package procRTSP import ( "errors" "fmt" "git.insit.tech/sas/rtsp_proxy/protos/gens" "github.com/bluenviron/gortsplib/v4" "github.com/bluenviron/gortsplib/v4/pkg/base" "github.com/bluenviron/gortsplib/v4/pkg/description" "github.com/bluenviron/gortsplib/v4/pkg/format" "github.com/bluenviron/gortsplib/v4/pkg/format/rtph264" "github.com/bluenviron/gortsplib/v4/pkg/format/rtplpcm" "github.com/pion/rtp" _ "github.com/zaf/g711" "log" "strings" "time" "writer/internal/config" "writer/internal/media" "writer/pkg/converter" ) var ( resolutions = []string{"1280x720"} h264RTPDec *rtph264.Decoder g711RTPDec *rtplpcm.Decoder ) // StartWriter starts the program. func StartWriter(period int, URI string) error { err := procRTSP(period, URI) if err != nil { // Change domain if a camera was flipped to another domain. err = changeDomain(period, URI, err) if err != nil { return fmt.Errorf("change domain error: %w", err) } } return nil } // procRTSP process RTSP protocol and writes H264 and PCM flows into TS container. func procRTSP(period int, URI string) error { // Create FileName structure fn := config.CreateFileName(resolutions, period) //////////////////////////////////////////////////////////////////////////////////////// // Create M3U8 playlist. go gens.MediaPlaylistGenerator("/home/psa/GoRepository/data", "", fn.Duration, resolutions) //////////////////////////////////////////////////////////////////////////////////////// // Initialise client. c := gortsplib.Client{ UserAgent: "PSA", } // Parse URL. u, err := base.ParseURL(URI) if err != nil { return fmt.Errorf("parse URL error: %w", err) } // Connect to the server. err = c.Start(u.Scheme, u.Host) if err != nil { return fmt.Errorf("connect to the server error: %w", err) } defer c.Close() // Find available medias. desc, _, err := c.Describe(u) if err != nil || desc == nil { return fmt.Errorf("medias not found: %w", err) } //////////////////////////////////////////////////////////////////////////////////////// // Find the H264 media and format. h264Format, h264Media, err := media.CheckH264Format(desc) // Find the G711 media and format. g711Format, g711Media, err := media.CheckG711Format(desc) // Initialising variable for AAC. var mpeg4AudioFormat *format.MPEG4Audio //////////////////////////////////////////////////////////////////////////////////////// // Create RTP -> H264 decoder. if h264Format != nil { h264RTPDec, err = h264Format.CreateDecoder() if err != nil { return fmt.Errorf("create H264 decoder error: %w", err) } } // Create RTP -> H264 decoder. if g711Format != nil { g711RTPDec, err = g711Format.CreateDecoder() if err != nil { return fmt.Errorf("create G711 decoder error: %w", err) } } //////////////////////////////////////////////////////////////////////////////////////// // Wait for the next period. config.Waiter(period) log.Println("Start recording") //////////////////////////////////////////////////////////////////////////////////////// // Setup MPEG-TS muxer. currentMpegtsMuxer := &converter.MpegtsMuxer{ FileName: fn.SetNumNTime(), H264Format: h264Format, Mpeg4AudioFormat: mpeg4AudioFormat, } err = currentMpegtsMuxer.Initialize(resolutions) if err != nil { panic(err) } defer currentMpegtsMuxer.Close() //////////////////////////////////////////////////////////////////////////////////////// // Setup all medias. err = c.SetupAll(desc.BaseURL, desc.Medias) if err != nil { panic(err) } //////////////////////////////////////////////////////////////////////////////////////// // Called when a H264/RTP or G711/RTP packet arrives. c.OnPacketRTPAny(func(medi *description.Media, forma format.Format, pkt *rtp.Packet) { switch f := forma.(type) { case *format.H264: // Process H264 flow and return PTS and AU. pts, au, err := media.ProcH264(&c, h264Media, h264RTPDec, pkt) if err != nil { fmt.Printf("process G711 error: %s\n", err) } // Encode the access unit into MPEG-TS. err = currentMpegtsMuxer.WriteH264(au, pts) if err != nil { log.Printf("writing H264 packet: %s\n", err) } case *format.G711: // Process G711 flow and returns PTS and AU. _, au, err := media.ProcG711(&c, g711Media, g711RTPDec, pkt) if err != nil { fmt.Printf("process G711 error: %s\n", err) } // Convert G711 to AAC. _, err = converter.ConvertG711ToAAC(au, f.MULaw) // take aacAu if err != nil { log.Printf("converting G711 to AAC frame: %s\n", err) } /* // Encode the access unit into MPEG-TS. err = MpegtsMuxer.writeMPEG4Audio([][]byte{aacAu}, pts) if err != nil { log.Printf("MPEG-TS write error: %v", err) return } */ } }) //////////////////////////////////////////////////////////////////////////////////////// // Send PLAY request. _, err = c.Play(nil) if err != nil { log.Fatalln("Ошибка запуска воспроизведения:", err) } // Create ticker for rotation files. ticker := time.NewTicker(time.Duration(period) * time.Minute) defer ticker.Stop() // Rotate files. go func() { for range ticker.C { currentMpegtsMuxer.Close() currentMpegtsMuxer.FileName = fn.SetNumNTime() err = currentMpegtsMuxer.Initialize(resolutions) if err != nil { panic(err) } log.Println("New file for recording created:", currentMpegtsMuxer.FileName) } }() // Restart program if client gets TEARDOWN request. c.OnRequest = func(req *base.Request) { if req.Method == base.Teardown { log.Printf("got TEARDOWN request from server: %v", req) } go func() { err = procRTSP(period, URI) if err != nil { log.Fatalf("restart RTSP error: %s\n", err) } }() } panic(c.Wait()) } // changeDomain changes domain if a camera was flipped to another domain. func changeDomain(period int, URI string, err error) error { err2 := errors.New("404 (Not found)") if errors.As(err, &err2) { if strings.Contains(URI, "video-1") { err = procRTSP(period, strings.Replace(URI, "video-1", "video-2", 1)) if err != nil { return err } } else { err = procRTSP(period, strings.Replace(URI, "video-2", "video-1", 1)) if err != nil { return err } } } else { panic(err) } return nil }