// Package procRTSP records an RTSP stream into rotating MPEG-TS files.
package procRTSP

import (
	"fmt"
	"log"
	"sync"
	"time"

	"git.insit.tech/sas/rtsp_proxy/gens"
	"github.com/bluenviron/gortsplib/v4"
	"github.com/bluenviron/gortsplib/v4/pkg/base"
	"github.com/bluenviron/gortsplib/v4/pkg/description"
	"github.com/bluenviron/gortsplib/v4/pkg/format"
	"github.com/pion/rtp"
	// Blank import kept from the original file.
	// NOTE(review): nothing in this file references it directly — confirm it is still needed.
	_ "github.com/zaf/g711"

	"writer/internal/config"
	"writer/internal/media"
	"writer/pkg/converter"
)

// resolutions is the list of resolutions handed to the file-name generator,
// the playlist generator and the MPEG-TS muxer.
var (
	resolutions = []string{"1280x720"}
)

// ProcRTSP connects to the RTSP server at URI, sets up all announced medias and
// writes the incoming H264 access units into an MPEG-TS file. A new output file
// is started every period minutes. G711 audio is decoded and converted to AAC,
// but the result is not written to the container yet.
//
// period must be positive. The call blocks until the RTSP client terminates or
// a file rotation fails, and returns the corresponding error. Setup failures
// are returned as errors instead of panicking or exiting the process, so the
// deferred clean-up always runs.
func ProcRTSP(period int, URI string) error {
	// time.NewTicker panics on a non-positive duration, so reject it up front.
	if period <= 0 {
		return fmt.Errorf("invalid period %d: must be positive", period)
	}

	// Create FileName structure.
	fn := config.CreateFileName(resolutions, period)

	// Create M3U8 playlist.
	// NOTE(review): the output directory is hard-coded and this goroutine is
	// never stopped, even when ProcRTSP returns early — consider making both
	// configurable/cancellable.
	go gens.MediaPlaylistGenerator("/home/psa/GoRepository/data", "", fn.Duration, resolutions)

	// Initialise client.
	c := gortsplib.Client{}

	// Parse URL.
	u, err := base.ParseURL(URI)
	if err != nil {
		return fmt.Errorf("parse URL error: %w", err)
	}

	// Connect to the server.
	if err = c.Start(u.Scheme, u.Host); err != nil {
		return fmt.Errorf("connect to the server error: %w", err)
	}
	defer c.Close()

	// Find available medias.
	desc, _, err := c.Describe(u)
	if err != nil {
		return fmt.Errorf("medias not found: %w", err)
	}
	if desc == nil {
		// Kept separate from the branch above: wrapping a nil error with %w
		// would produce a malformed message.
		return fmt.Errorf("medias not found: empty session description")
	}

	// Find the H264 media and format.
	h264Format, h264Media, err := media.CheckH264Format(desc)
	if err != nil {
		return fmt.Errorf("find H264 media: %w", err)
	}

	// Find the G711 media and format.
	g711Format, g711Media, err := media.CheckG711Format(desc)
	if err != nil {
		return fmt.Errorf("find G711 media: %w", err)
	}

	// AAC format placeholder; it stays nil until AAC output is implemented.
	var mpeg4AudioFormat *format.MPEG4Audio

	// Create RTP -> H264 decoder.
	h264RTPDec, err := h264Format.CreateDecoder()
	if err != nil {
		return fmt.Errorf("create H264 decoder error: %w", err)
	}

	// Create RTP -> G711 decoder.
	g711RTPDec, err := g711Format.CreateDecoder()
	if err != nil {
		return fmt.Errorf("create G711 decoder error: %w", err)
	}

	// Wait for the next period.
	config.Waiter(period)
	log.Println("Start recording")

	// Setup MPEG-TS muxer.
	muxer := &converter.MpegtsMuxer{
		FileName:         fn.SetNumNTime(),
		H264Format:       h264Format,
		Mpeg4AudioFormat: mpeg4AudioFormat,
	}
	if err = muxer.Initialize(resolutions); err != nil {
		return fmt.Errorf("initialize MPEG-TS muxer: %w", err)
	}

	// mu serialises every access to muxer: the RTP callback writes to it while
	// the rotation goroutine closes and re-initialises it. muxerOpen records
	// whether muxer currently has an initialised output, so that it is never
	// written to or closed twice after a failed rotation or during shutdown.
	var (
		mu        sync.Mutex
		muxerOpen = true
	)
	defer func() {
		mu.Lock()
		defer mu.Unlock()
		if muxerOpen {
			muxer.Close()
			muxerOpen = false
		}
	}()

	// Setup all medias.
	if err = c.SetupAll(desc.BaseURL, desc.Medias); err != nil {
		return fmt.Errorf("setup medias: %w", err)
	}

	// Called when a H264/RTP or G711/RTP packet arrives.
	c.OnPacketRTPAny(func(_ *description.Media, forma format.Format, pkt *rtp.Packet) {
		switch f := forma.(type) {
		case *format.H264:
			// Process H264 flow and return PTS and AU.
			pts, au, err := media.ProcH264(&c, h264Media, h264RTPDec, pkt)
			if err != nil {
				log.Printf("process H264 error: %s\n", err)
				return // the access unit is not usable when processing failed
			}

			// Encode the access unit into MPEG-TS.
			mu.Lock()
			defer mu.Unlock()
			if !muxerOpen {
				return
			}
			if err = muxer.WriteH264(au, pts); err != nil {
				log.Printf("writing H264 packet: %s\n", err)
			}

		case *format.G711:
			// Process G711 flow and return PTS and AU.
			_, au, err := media.ProcG711(&c, g711Media, g711RTPDec, pkt)
			if err != nil {
				log.Printf("process G711 error: %s\n", err)
				return // nothing to convert when processing failed
			}

			// Convert G711 to AAC. The converted frame is discarded for now.
			if _, err = converter.ConvertG711ToAAC(au, f.MULaw); err != nil {
				log.Printf("converting G711 to AAC frame: %s\n", err)
			}

			// TODO: write the AAC frame once the muxer supports it:
			//
			//	// Encode the access unit into MPEG-TS.
			//	err = MpegtsMuxer.writeMPEG4Audio([][]byte{aacAu}, pts)
			//	if err != nil {
			//		log.Printf("MPEG-TS write error: %v", err)
			//		return
			//	}
		}
	})

	// Send PLAY request.
	if _, err = c.Play(nil); err != nil {
		return fmt.Errorf("send PLAY request: %w", err)
	}

	// Create ticker for rotation files.
	ticker := time.NewTicker(time.Duration(period) * time.Minute)
	defer ticker.Stop()

	// done stops the rotation goroutine when ProcRTSP returns; ticker.Stop
	// alone does not close ticker.C, so a plain range over it would leak.
	done := make(chan struct{})
	defer close(done)

	// rotateErr carries a rotation failure back to this goroutine.
	rotateErr := make(chan error, 1)

	// Rotate files.
	go func() {
		for {
			select {
			case <-done:
				return
			case <-ticker.C:
			}

			mu.Lock()
			if !muxerOpen {
				// ProcRTSP is shutting down and has already closed the muxer.
				mu.Unlock()
				return
			}
			muxer.Close()
			muxer.FileName = fn.SetNumNTime()
			initErr := muxer.Initialize(resolutions)
			if initErr != nil {
				muxerOpen = false
			}
			fileName := muxer.FileName
			mu.Unlock()

			if initErr != nil {
				rotateErr <- fmt.Errorf("rotate output file: %w", initErr)
				return
			}
			log.Println("New file for recording created:", fileName)
		}
	}()

	// Wait for the client in the background so a rotation failure can also
	// end the session. The buffered channel lets this goroutine finish once
	// the deferred c.Close() unblocks Wait.
	waitErr := make(chan error, 1)
	go func() { waitErr <- c.Wait() }()

	select {
	case err = <-waitErr:
		return err
	case err = <-rotateErr:
		return err
	}
}