255 lines
6.6 KiB
Go
255 lines
6.6 KiB
Go
package procRTSP
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"git.insit.tech/sas/rtsp_proxy/protos/gens"
|
|
"github.com/bluenviron/gortsplib/v4"
|
|
"github.com/bluenviron/gortsplib/v4/pkg/base"
|
|
"github.com/bluenviron/gortsplib/v4/pkg/description"
|
|
"github.com/bluenviron/gortsplib/v4/pkg/format"
|
|
"github.com/bluenviron/gortsplib/v4/pkg/format/rtph264"
|
|
"github.com/bluenviron/gortsplib/v4/pkg/format/rtplpcm"
|
|
"github.com/pion/rtp"
|
|
_ "github.com/zaf/g711"
|
|
"log"
|
|
"os"
|
|
"strings"
|
|
"time"
|
|
"writer/internal/config"
|
|
"writer/internal/media"
|
|
"writer/pkg/converter"
|
|
)
|
|
|
|
var (
|
|
resolutions = []string{"1280x720"}
|
|
h264RTPDec *rtph264.Decoder
|
|
g711RTPDec *rtplpcm.Decoder
|
|
)
|
|
|
|
// StartWriter starts the program.
|
|
func StartWriter(dir string, period int, URI string) error {
|
|
err := ProcRTSP(dir, period, URI)
|
|
if err != nil {
|
|
|
|
// Temporary solution for inner cameras.
|
|
//
|
|
// Change domain if a camera was flipped to another domain.
|
|
err = changeDomain(dir, period, URI, err)
|
|
if err != nil {
|
|
return fmt.Errorf("change domain error: %w", err)
|
|
}
|
|
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// ProcRTSP process RTSP protocol and writes H264 and PCM flows into TS container.
|
|
func ProcRTSP(dir string, period int, URI string) error {
|
|
// Return the last part of the URI after "/".
|
|
cuttedURI := config.CutURI(URI)
|
|
|
|
// Create FileName structure
|
|
fn := config.CreateFileName(dir, resolutions, cuttedURI, period)
|
|
|
|
////////////////////////////////////////////////////////////////////////////////////////
|
|
|
|
// Create directory for files according to resolutions.
|
|
err := os.MkdirAll(fmt.Sprintf("%s", fn.Path), 0755)
|
|
if err != nil {
|
|
return fmt.Errorf("mkdirall error: %w", err)
|
|
}
|
|
|
|
// Create M3U8 playlist.
|
|
go gens.MediaPlaylistGenerator(dir+"/data/"+cuttedURI, "", fn.Duration, resolutions)
|
|
|
|
////////////////////////////////////////////////////////////////////////////////////////
|
|
|
|
// Initialise client.
|
|
c := gortsplib.Client{
|
|
UserAgent: "PSA",
|
|
}
|
|
|
|
// Parse URL.
|
|
u, err := base.ParseURL(URI)
|
|
if err != nil {
|
|
return fmt.Errorf("parse URL error: %w", err)
|
|
}
|
|
|
|
// Connect to the server.
|
|
err = c.Start(u.Scheme, u.Host)
|
|
if err != nil {
|
|
return fmt.Errorf("connect to the server error: %w", err)
|
|
}
|
|
defer c.Close()
|
|
|
|
// Find available medias.
|
|
desc, _, err := c.Describe(u)
|
|
if err != nil || desc == nil {
|
|
return fmt.Errorf("medias not found: %w", err)
|
|
}
|
|
|
|
////////////////////////////////////////////////////////////////////////////////////////
|
|
|
|
// Find the H264 media and format.
|
|
h264Format, h264Media, err := media.CheckH264Format(desc)
|
|
|
|
// Find the G711 media and format.
|
|
g711Format, g711Media, err := media.CheckG711Format(desc)
|
|
|
|
// Initialising variable for AAC.
|
|
// var mpeg4AudioFormat *format.MPEG4Audio
|
|
|
|
////////////////////////////////////////////////////////////////////////////////////////
|
|
|
|
// Create RTP -> H264 decoder.
|
|
if h264Format != nil {
|
|
h264RTPDec, err = h264Format.CreateDecoder()
|
|
if err != nil {
|
|
return fmt.Errorf("create H264 decoder error: %w", err)
|
|
}
|
|
}
|
|
|
|
// Create RTP -> H264 decoder.
|
|
if g711Format != nil {
|
|
g711RTPDec, err = g711Format.CreateDecoder()
|
|
if err != nil {
|
|
return fmt.Errorf("create G711 decoder error: %w", err)
|
|
}
|
|
}
|
|
|
|
////////////////////////////////////////////////////////////////////////////////////////
|
|
|
|
// Wait for the next period.
|
|
config.Waiter(period)
|
|
log.Println("Start recording")
|
|
|
|
////////////////////////////////////////////////////////////////////////////////////////
|
|
|
|
// Setup MPEG-TS muxer.
|
|
currentMpegtsMuxer := &converter.MpegtsMuxer{
|
|
FileName: fn.SetNumNTime(),
|
|
H264Format: h264Format,
|
|
// Mpeg4AudioFormat: mpeg4AudioFormat,
|
|
}
|
|
|
|
err = currentMpegtsMuxer.Initialize()
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
defer currentMpegtsMuxer.Close()
|
|
|
|
////////////////////////////////////////////////////////////////////////////////////////
|
|
|
|
// Setup all medias.
|
|
err = c.SetupAll(desc.BaseURL, desc.Medias)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
////////////////////////////////////////////////////////////////////////////////////////
|
|
|
|
// Called when a H264/RTP or G711/RTP packet arrives.
|
|
c.OnPacketRTPAny(func(medi *description.Media, forma format.Format, pkt *rtp.Packet) {
|
|
switch f := forma.(type) {
|
|
case *format.H264:
|
|
// Process H264 flow and return PTS and AU.
|
|
pts, au, err := media.ProcH264(&c, h264Media, h264RTPDec, pkt)
|
|
if err != nil {
|
|
//log.Printf("%s: process H264 error: %s\n", cuttedURI, err)
|
|
}
|
|
|
|
// Encode the access unit into MPEG-TS.
|
|
err = currentMpegtsMuxer.WriteH264(au, pts)
|
|
if err != nil {
|
|
//log.Printf("%s: writing H264 packet: %s\n", cuttedURI, err)
|
|
}
|
|
|
|
case *format.G711:
|
|
// Process G711 flow and returns PTS and AU.
|
|
_, au, err := media.ProcG711(&c, g711Media, g711RTPDec, pkt)
|
|
if err != nil {
|
|
log.Printf("%s: process G711 error: %s\n", cuttedURI, err)
|
|
}
|
|
|
|
// Convert G711 to AAC.
|
|
_, err = converter.ConvertG711ToAAC(au, f.MULaw) // take aacAu
|
|
if err != nil {
|
|
log.Printf("%s: converting G711 to AAC frame: %s\n", cuttedURI, err)
|
|
}
|
|
|
|
/*
|
|
// Encode the access unit into MPEG-TS.
|
|
err = MpegtsMuxer.writeMPEG4Audio([][]byte{aacAu}, pts)
|
|
if err != nil {
|
|
log.Printf("MPEG-TS write error: %v", err)
|
|
return
|
|
}
|
|
*/
|
|
}
|
|
})
|
|
|
|
////////////////////////////////////////////////////////////////////////////////////////
|
|
|
|
// Send PLAY request.
|
|
_, err = c.Play(nil)
|
|
if err != nil {
|
|
log.Fatalln("Ошибка запуска воспроизведения:", err)
|
|
}
|
|
|
|
// Create ticker for rotation files.
|
|
ticker := time.NewTicker(time.Duration(period) * time.Minute)
|
|
defer ticker.Stop()
|
|
|
|
// Rotate files.
|
|
go func() {
|
|
for range ticker.C {
|
|
currentMpegtsMuxer.Close()
|
|
currentMpegtsMuxer.FileName = fn.SetNumNTime()
|
|
|
|
err = currentMpegtsMuxer.Initialize()
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
log.Println("New file for recording created:", currentMpegtsMuxer.FileName)
|
|
}
|
|
}()
|
|
|
|
// Restart program if client gets TEARDOWN request.
|
|
c.OnRequest = func(req *base.Request) {
|
|
if req.Method == base.Teardown {
|
|
log.Printf("got TEARDOWN request from server: %v", req)
|
|
|
|
//err = procRTSP(period, URI)
|
|
//if err != nil {
|
|
// log.Fatalf("restart RTSP error: %s\n", err)
|
|
//}
|
|
}
|
|
}
|
|
|
|
panic(c.Wait())
|
|
}
|
|
|
|
// changeDomain changes domain if a camera was flipped to another domain.
|
|
func changeDomain(directory string, period int, URI string, err error) error {
|
|
err2 := errors.New("404 (Not found)")
|
|
if errors.As(err, &err2) {
|
|
if strings.Contains(URI, "video-1") {
|
|
err = ProcRTSP(directory, period, strings.Replace(URI, "video-1", "video-2", 1))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
} else {
|
|
err = ProcRTSP(directory, period, strings.Replace(URI, "video-2", "video-1", 1))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
} else {
|
|
panic(err)
|
|
}
|
|
|
|
return nil
|
|
}
|