package rtsp

import (
	"encoding/binary"
	"errors"
	"fmt"
	"log"
	"os"
	"strconv"
	"strings"
	"sync"
	"time"

	"git.insit.tech/psa/rtsp_reader-writer/writer/internal/ingest/formats"
	"git.insit.tech/psa/rtsp_reader-writer/writer/pkg/storage"
	"github.com/bluenviron/gortsplib/v4"
	"github.com/bluenviron/gortsplib/v4/pkg/base"
	"github.com/bluenviron/gortsplib/v4/pkg/description"
	"github.com/bluenviron/gortsplib/v4/pkg/format"
	"github.com/bluenviron/mediacommon/v2/pkg/codecs/g711"
	"github.com/pion/rtp"
)

// errNoSessionDescription is returned when DESCRIBE succeeds but yields no
// session description to pick medias from.
var errNoSessionDescription = errors.New("no session description returned")

// StartWriter starts the program.
//
// It runs RTSP for link and, if that fails, hands the error to changeDomain,
// which may retry the camera on its sibling domain. A nil result means that
// either the first attempt or the retry finished without error.
func StartWriter(dir string, period int, link string) error {
	err := RTSP(dir, period, link)
	if err == nil {
		return nil
	}

	// Change domain if a camera was flipped to another domain.
	if err = changeDomain(dir, period, link, err); err != nil {
		return fmt.Errorf("change domain error: %w", err)
	}

	return nil
}

// RTSP processes RTSP protocol.
//
// It connects to link, reads the session description, creates the output
// directory, detects the offered formats and then runs the branch matching the
// detected video/audio combination until the client terminates.
//
// dir, period and the stream identifier derived from link are passed to
// storage.CreateFileName; period is also the file rotation interval in
// seconds. The returned error is non-nil when the connection, DESCRIBE, SETUP,
// PLAY, decoder creation or segment file creation fails, or when the session
// ends with an error. An unsupported format combination is logged and yields
// nil.
func RTSP(dir string, period int, link string) error {
	// Connect to the server.
	c := gortsplib.Client{
		UserAgent: "PSA",
	}

	u, err := base.ParseURL(link)
	if err != nil {
		return fmt.Errorf("parse URL error: %w", err)
	}

	if err = c.Start(u.Scheme, u.Host); err != nil {
		return fmt.Errorf("connect to the server error: %w", err)
	}
	defer c.Close()

	// The DESCRIBE error is returned (not only logged) so that StartWriter can
	// pass it to changeDomain.
	desc, res, err := c.Describe(u)
	if err != nil {
		return fmt.Errorf("describe camera [%s] error: %w", link, err)
	}
	if desc == nil {
		return fmt.Errorf("describe camera [%s] error: %w", link, errNoSessionDescription)
	}

	// Create file name structure and directory for files.
	resolution := findResolution(res.Body)
	resolutions := []string{resolution}
	streamID := cutURI(link)

	fn := storage.CreateFileName(dir, resolutions, streamID, period)

	if err = os.MkdirAll(fmt.Sprintf("%s", fn.Path), 0755); err != nil {
		return fmt.Errorf("mkdirall error: %w", err)
	}

	// TODO: M3U8 playlist generation (gen.MediaPlaylistGenerator) is not wired in yet.

	// Find formats. Later matches override earlier ones, so the effective
	// priority is the reverse of the probing order below.
	var audioFormat string
	var videoFormat string

	av1Format, av1Media, err := formats.FindAV1Format(desc)
	if av1Format != nil {
		log.Println("[av1]: format found")
		videoFormat = "AV1"
	} else {
		log.Println(err)
	}

	g711Format, g711Media, err := formats.FindG711Format(desc)
	if g711Format != nil {
		log.Println("[g711]: format found")
		audioFormat = "G711"
	} else {
		log.Println(err)
	}

	h264Format, h264Media, err := formats.FindH264Format(desc)
	if h264Format != nil {
		log.Println("[h264]: format found")
		videoFormat = "H264"
	} else {
		log.Println(err)
	}

	aacFormat, aacMedia, err := formats.FindAACFormat(desc)
	if aacFormat != nil {
		log.Println("[aac]: format found")
		audioFormat = "AAC"
	} else {
		log.Println(err)
	}

	h265Format, h265Media, err := formats.FindH265Format(desc)
	if h265Format != nil {
		log.Println("[h265]: format found")
		videoFormat = "H265"
	} else {
		log.Println(err)
	}

	lpcmFormat, lpcmMedia, err := formats.FindLPCMFormat(desc)
	if lpcmFormat != nil {
		log.Println("[lpcm]: format found")
		audioFormat = "LPCM"
	} else {
		log.Println(err)
	}

	mjpegFormat, mjpegMedia, err := formats.FindMJPEGFormat(desc)
	if mjpegFormat != nil {
		log.Println("[mjpeg]: format found")
		videoFormat = "MJPEG"
	} else {
		log.Println(err)
	}

	opusFormat, opusMedia, err := formats.FindOPUSFormat(desc)
	if opusFormat != nil {
		log.Println("[opus]: format found")
		audioFormat = "OPUS"
	} else {
		log.Println(err)
	}

	vp8Format, vp8Media, err := formats.FindVP8Format(desc)
	if vp8Format != nil {
		log.Println("[vp8]: format found")
		videoFormat = "VP8"
	} else {
		log.Println(err)
	}

	vp9Format, vp9Media, err := formats.FindVP9Format(desc)
	if vp9Format != nil {
		log.Println("[vp9]: format found")
		videoFormat = "VP9"
	} else {
		log.Println(err)
	}

	// Start program according to gotten formats.
	switch {
	case videoFormat == "AV1" && audioFormat == "":
		// Wait for the next period.
		waitPeriod(period)

		log.Printf("[%v]: start recording", videoFormat)

		// Create decoders.
		av1RTPDec, err := av1Format.CreateDecoder()
		if err != nil {
			return fmt.Errorf("[%v]: create decoder error: %w", videoFormat, err)
		}

		av1Dec := &formats.AV1Decoder{}
		if err = av1Dec.Initialize(); err != nil {
			return fmt.Errorf("[%v]: init decoder error: %w", videoFormat, err)
		}
		defer av1Dec.Close()

		// Setup media.
		if _, err = c.Setup(desc.BaseURL, av1Media, 0, 0); err != nil {
			return fmt.Errorf("[%v]: setup media error: %w", videoFormat, err)
		}

		// NOTE(review): this flag is handed to ProcessAV1 by value and is never
		// updated here, so the callee always sees false - confirm that is intended.
		firstRandomReceived := false

		// Process input rtp packets.
		c.OnPacketRTP(av1Media, av1Format, func(pkt *rtp.Packet) {
			// Process AV1 flow and return PTS and IMG.
			pts, img, err := formats.ProcessAV1(&c, av1Media, av1RTPDec, pkt, av1Dec, firstRandomReceived, videoFormat)
			if err != nil {
				log.Printf("[%v]: process packet error: %v\n", videoFormat, err)
				return
			}

			log.Printf("[%v]: decoded frame with PTS %v and size %v\n", videoFormat, pts, img.Bounds().Max)
		})

		// TODO: file rotation is not implemented for this format.
		return playAndWait(&c, period, videoFormat, nil)

	case videoFormat == "" && audioFormat == "G711":
		// Wait for the next period.
		waitPeriod(period)

		log.Printf("[%v]: start recording", audioFormat)

		// Create decoder.
		g711RTPDec, err := g711Format.CreateDecoder()
		if err != nil {
			return fmt.Errorf("[%v]: create decoder error: %w", audioFormat, err)
		}

		// Setup media.
		if _, err = c.Setup(desc.BaseURL, g711Media, 0, 0); err != nil {
			return fmt.Errorf("[%v]: setup media error: %w", audioFormat, err)
		}

		// Process input rtp packets.
		c.OnPacketRTP(g711Media, g711Format, func(pkt *rtp.Packet) {
			// Process G711 flow and return PTS and AU.
			pts, au, err := formats.ProcessG711(&c, g711Media, g711RTPDec, pkt, audioFormat)
			if err != nil {
				log.Printf("[%v]: process packet error: %v\n", audioFormat, err)
				return
			}

			// Decode samples (these are 16-bit, big endian LPCM samples).
			// NOTE(review): any result of the decode call is discarded and the
			// log below reports the size of the undecoded unit - confirm.
			if g711Format.MULaw {
				g711.DecodeMulaw(au)
			} else {
				g711.DecodeAlaw(au)
			}

			log.Printf("[%v]: decoded audio samples with PTS %v and size %d\n", audioFormat, pts, len(au))
		})

		// TODO: file rotation is not implemented for this format.
		return playAndWait(&c, period, audioFormat, nil)

	case videoFormat == "H264" && audioFormat == "AAC":
		tag := videoFormat + "-" + audioFormat

		// Wait for the next period.
		waitPeriod(period)

		log.Printf("[%v]: start recording", tag)

		// TODO: decoding and MPEG-TS muxing (formats.MpegtsMuxer with
		// WriteH264/WriteAAC and per-period file rotation) are not wired in
		// yet; packets are received and dropped.

		// Setup all medias.
		if err = c.SetupAll(desc.BaseURL, desc.Medias); err != nil {
			return fmt.Errorf("[%v]: setup media error: %w", tag, err)
		}

		// Process input rtp packets.
		c.OnPacketRTPAny(func(medi *description.Media, forma format.Format, pkt *rtp.Packet) {
			switch forma.(type) {
			case *format.H264:
				// TODO: process the H264 flow and write access units to the muxer.
			case *format.MPEG4Audio:
				// TODO: process the AAC flow and write access units to the muxer.
			}
		})

		return playAndWait(&c, period, tag, nil)

	case videoFormat == "H264" && (audioFormat == "G711" || audioFormat == ""):
		// audioFormat may be empty here, in which case tag ends with "-".
		tag := videoFormat + "-" + audioFormat

		// Wait for the next period.
		waitPeriod(period)

		log.Printf("[%v]: start recording", tag)

		// Create decoder.
		h264RTPDec, err := h264Format.CreateDecoder()
		if err != nil {
			return fmt.Errorf("[%v]: create decoder error: %w", tag, err)
		}

		// mu guards file and seg: they are written by the packet callback and
		// swapped by the rotation goroutine.
		var (
			mu   sync.Mutex
			file *os.File
			seg  storage.Segment
		)

		// The G711 handler exists only when a G711 format was found; in the
		// video-only variant of this branch g711Format is nil.
		var handleG711 func(f *format.G711, pkt *rtp.Packet)
		if g711Format != nil {
			g711RTPDec, err := g711Format.CreateDecoder()
			if err != nil {
				return fmt.Errorf("[%v]: create decoder error: %w", tag, err)
			}

			handleG711 = func(f *format.G711, pkt *rtp.Packet) {
				// Process G711 flow and return PTS and AU.
				// NOTE(review): videoFormat (not audioFormat) is passed as the
				// last argument, as in the original code - confirm.
				pts, au, err := formats.ProcessG711(&c, g711Media, g711RTPDec, pkt, videoFormat)
				if err != nil {
					log.Printf("[%v]: process packet error: %v\n", tag, err)
					return
				}
				if au == nil {
					return
				}

				// Convert G711 to LPCM.
				lpcmSamples := formats.ConvertG711ToLPCM(au, f.MULaw)

				// Add appropriate lines to the interleaved packet and write it.
				mu.Lock()
				seg.Packet.Type = storage.PacketTypeLPCM
				seg.Packet.Pts = pts
				seg.Packet.LPCMSamples = lpcmSamples
				err = storage.WriteInterleavedPacket(file, seg)
				mu.Unlock()

				if err != nil {
					log.Printf("[%v]: write segment error: %v\n", tag, err)
				}

				// TODO: convert LPCM to AAC and write it to the MPEG-TS muxer.
			}
		}

		// Setup all medias.
		if err = c.SetupAll(desc.BaseURL, desc.Medias); err != nil {
			return fmt.Errorf("[%v]: setup media error: %w", tag, err)
		}

		// Create the first segment file (StreamID + header).
		file, seg, err = newSegmentFile(fn.SetNumNTime("insit"), streamID, period)
		if err != nil {
			return fmt.Errorf("[%v]: %w", tag, err)
		}

		// Close whichever file is current when the branch returns.
		defer func() {
			mu.Lock()
			defer mu.Unlock()

			if cerr := file.Close(); cerr != nil {
				log.Printf("[%v]: close file error: %v\n", tag, cerr)
			}
		}()

		// Process input rtp packets.
		c.OnPacketRTPAny(func(medi *description.Media, forma format.Format, pkt *rtp.Packet) {
			switch f := forma.(type) {
			case *format.H264:
				// Process H264 flow and return PTS and AU.
				pts, au, err := formats.ProcessH264(&c, h264Media, h264RTPDec, pkt, videoFormat)
				if err != nil {
					log.Printf("[%v]: process packet error: %v\n", tag, err)
					return
				}
				if au == nil {
					return
				}

				// Add appropriate lines to the interleaved packet and write it.
				mu.Lock()
				seg.Packet.Type = storage.PacketTypeH264
				seg.Packet.Pts = pts
				seg.Packet.H264AUs = au
				err = storage.WriteInterleavedPacket(file, seg)
				mu.Unlock()

				if err != nil {
					log.Printf("[%v]: write segment error: %v\n", tag, err)
				}

				// TODO: encode the access unit into MPEG-TS.

			case *format.G711:
				if handleG711 != nil {
					handleG711(f, pkt)
				}
			}
		})

		// Rotate files: open the next file first, so that a failure leaves the
		// current file in use instead of a closed one.
		rotate := func() {
			next, nextSeg, err := newSegmentFile(fn.SetNumNTime("insit"), streamID, period)
			if err != nil {
				log.Printf("[%v]: rotate file error: %v\n", tag, err)
				return
			}

			mu.Lock()
			prev := file
			file, seg = next, nextSeg
			mu.Unlock()

			if cerr := prev.Close(); cerr != nil {
				log.Printf("[%v]: close file error: %v\n", tag, cerr)
			}

			log.Printf("[%v-%v]: new file for recording created: %v",
				videoFormat, audioFormat, nextSeg.Date+".insit")
		}

		return playAndWait(&c, period, tag, rotate)

	// NOTE(review): no detection step above ever sets videoFormat to "H264-",
	// so this H264 -> RGBA branch is unreachable; it looks deliberately
	// disabled in favour of the file-writing branch above - confirm.
	case videoFormat == "H264-" && audioFormat == "":
		// Wait for the next period.
		waitPeriod(period)

		log.Printf("[%v]: start recording", videoFormat)

		// Create decoder.
		h264RTPDec, err := h264Format.CreateDecoder()
		if err != nil {
			return fmt.Errorf("[%v]: create decoder error: %w", videoFormat, err)
		}

		// Setup H264 -> RGBA decoder.
		h264Dec := &formats.H264Decoder{}
		if err = h264Dec.Initialize(); err != nil {
			return fmt.Errorf("[%v]: init decoder error: %w", videoFormat, err)
		}
		defer h264Dec.Close()

		// If SPS and PPS are present into the SDP, send them to the decoder.
		if h264Format.SPS != nil {
			h264Dec.Decode([][]byte{h264Format.SPS})
		}
		if h264Format.PPS != nil {
			h264Dec.Decode([][]byte{h264Format.PPS})
		}

		// Setup media.
		if _, err = c.Setup(desc.BaseURL, h264Media, 0, 0); err != nil {
			return fmt.Errorf("[%v]: setup media error: %w", videoFormat, err)
		}

		// NOTE(review): passed by value and never updated here - confirm.
		firstRandomAccess := false

		// Process input rtp packets.
		c.OnPacketRTP(h264Media, h264Format, func(pkt *rtp.Packet) {
			// Process H264 flow and return PTS and IMG.
			pts, img, err := formats.ProcessH264RGBA(
				&c, h264Media, h264RTPDec, h264Dec, pkt, firstRandomAccess, videoFormat)
			if err != nil {
				log.Printf("[%v]: process packet error: %v\n", videoFormat, err)
				return
			}

			log.Printf("[%v]: decoded frame with PTS %v and size %v\n", videoFormat, pts, img.Bounds().Max)
		})

		// TODO: file rotation is not implemented for this format.
		return playAndWait(&c, period, videoFormat, nil)

	case videoFormat == "H265" && audioFormat == "":
		// Wait for the next period.
		waitPeriod(period)

		log.Printf("[%v]: start recording", videoFormat)

		// Create decoder.
		h265RTPDec, err := h265Format.CreateDecoder()
		if err != nil {
			return fmt.Errorf("[%v]: create decoder error: %w", videoFormat, err)
		}

		// Setup H265 -> RGBA decoder.
		h265Dec := &formats.H265Decoder{}
		if err = h265Dec.Initialize(); err != nil {
			return fmt.Errorf("[%v]: init decoder error: %w", videoFormat, err)
		}
		defer h265Dec.Close()

		// If VPS, SPS and PPS are present into the SDP, send them to the decoder.
		if h265Format.VPS != nil {
			h265Dec.Decode([][]byte{h265Format.VPS})
		}
		if h265Format.SPS != nil {
			h265Dec.Decode([][]byte{h265Format.SPS})
		}
		if h265Format.PPS != nil {
			h265Dec.Decode([][]byte{h265Format.PPS})
		}

		// Setup media.
		if _, err = c.Setup(desc.BaseURL, h265Media, 0, 0); err != nil {
			return fmt.Errorf("[%v]: setup media error: %w", videoFormat, err)
		}

		// NOTE(review): passed by value and never updated here - confirm.
		firstRandomAccess := false

		// Process input rtp packets.
		c.OnPacketRTP(h265Media, h265Format, func(pkt *rtp.Packet) {
			// Process H265 flow and return PTS and IMG.
			pts, img, err := formats.ProcessH265RGBA(
				&c, h265Media, h265RTPDec, h265Dec, pkt, firstRandomAccess, videoFormat)
			if err != nil {
				log.Printf("[%v]: process packet error: %v\n", videoFormat, err)
				return
			}

			log.Printf("[%v]: decoded frame with PTS %v and size %v\n", videoFormat, pts, img.Bounds().Max)
		})

		// TODO: file rotation is not implemented for this format.
		return playAndWait(&c, period, videoFormat, nil)

	case videoFormat == "" && audioFormat == "LPCM":
		// Wait for the next period.
		waitPeriod(period)

		log.Printf("[%v]: start recording", audioFormat)

		// Create decoder.
		lpcmRTPDec, err := lpcmFormat.CreateDecoder()
		if err != nil {
			return fmt.Errorf("[%v]: create decoder error: %w", audioFormat, err)
		}

		// Setup media.
		if _, err = c.Setup(desc.BaseURL, lpcmMedia, 0, 0); err != nil {
			return fmt.Errorf("[%v]: setup media error: %w", audioFormat, err)
		}

		// Process input rtp packets.
		c.OnPacketRTP(lpcmMedia, lpcmFormat, func(pkt *rtp.Packet) {
			// Process LPCM flow and return PTS and SAMPLES.
			pts, samples, err := formats.ProcessLPCM(&c, lpcmMedia, lpcmRTPDec, pkt, audioFormat)
			if err != nil {
				log.Printf("[%v]: process packet error: %v\n", audioFormat, err)
				return
			}

			log.Printf("[%v]: decoded audio samples with PTS %v and size %d\n", audioFormat, pts, len(samples))
		})

		// TODO: file rotation is not implemented for this format.
		return playAndWait(&c, period, audioFormat, nil)

	case videoFormat == "MJPEG" && audioFormat == "":
		// Wait for the next period.
		waitPeriod(period)

		log.Printf("[%v]: start recording", videoFormat)

		// Create decoder.
		mjpegRTPDec, err := mjpegFormat.CreateDecoder()
		if err != nil {
			return fmt.Errorf("[%v]: create decoder error: %w", videoFormat, err)
		}

		// Setup media.
		if _, err = c.Setup(desc.BaseURL, mjpegMedia, 0, 0); err != nil {
			return fmt.Errorf("[%v]: setup media error: %w", videoFormat, err)
		}

		// Process input rtp packets.
		c.OnPacketRTP(mjpegMedia, mjpegFormat, func(pkt *rtp.Packet) {
			// Process MJPEG flow and return PTS and IMG.
			pts, img, err := formats.ProcessMJPEGRGBA(&c, mjpegMedia, mjpegRTPDec, pkt, videoFormat)
			if err != nil {
				log.Printf("[%v]: process packet error: %v\n", videoFormat, err)
				return
			}

			log.Printf("[%v]: decoded image with PTS %v and size %v", videoFormat, pts, img.Bounds().Max)
		})

		// TODO: file rotation is not implemented for this format.
		return playAndWait(&c, period, videoFormat, nil)

	case videoFormat == "" && audioFormat == "AAC":
		// Wait for the next period.
		waitPeriod(period)

		log.Printf("[%v]: start recording", audioFormat)

		// Create decoder.
		aacRTPDec, err := aacFormat.CreateDecoder()
		if err != nil {
			return fmt.Errorf("[%v]: create decoder error: %w", audioFormat, err)
		}

		// Setup media.
		if _, err = c.Setup(desc.BaseURL, aacMedia, 0, 0); err != nil {
			return fmt.Errorf("[%v]: setup media error: %w", audioFormat, err)
		}

		// Process input rtp packets.
		c.OnPacketRTP(aacMedia, aacFormat, func(pkt *rtp.Packet) {
			// Process AAC flow and return PTS and AUS.
			pts, aus, err := formats.ProcessAAC(&c, aacMedia, aacRTPDec, pkt, audioFormat)
			if err != nil {
				log.Printf("[%v]: process packet error: %v\n", audioFormat, err)
				return
			}

			for _, au := range aus {
				log.Printf("[%v]: received access unit with PTS %v size %d\n", audioFormat, pts, len(au))
			}
		})

		// TODO: file rotation is not implemented for this format.
		return playAndWait(&c, period, audioFormat, nil)

	case videoFormat == "" && audioFormat == "OPUS":
		// Wait for the next period.
		waitPeriod(period)

		log.Printf("[%v]: start recording", audioFormat)

		// Create decoder.
		opusRTPDec, err := opusFormat.CreateDecoder()
		if err != nil {
			return fmt.Errorf("[%v]: create decoder error: %w", audioFormat, err)
		}

		// Setup media.
		if _, err = c.Setup(desc.BaseURL, opusMedia, 0, 0); err != nil {
			return fmt.Errorf("[%v]: setup media error: %w", audioFormat, err)
		}

		// Process input rtp packets.
		c.OnPacketRTP(opusMedia, opusFormat, func(pkt *rtp.Packet) {
			// Process OPUS flow and return PTS and OP.
			pts, op, err := formats.ProcessOPUS(&c, opusMedia, opusRTPDec, pkt, audioFormat)
			if err != nil {
				log.Printf("[%v]: process packet error: %v\n", audioFormat, err)
				return
			}

			log.Printf("[%v]: received OPUS packet with PTS %v size %d\n", audioFormat, pts, len(op))
		})

		// TODO: file rotation is not implemented for this format.
		return playAndWait(&c, period, audioFormat, nil)

	case videoFormat == "VP8" && audioFormat == "":
		// Wait for the next period.
		waitPeriod(period)

		log.Printf("[%v]: start recording", videoFormat)

		// Create decoder.
		vp8RTPDec, err := vp8Format.CreateDecoder()
		if err != nil {
			return fmt.Errorf("[%v]: create decoder error: %w", videoFormat, err)
		}

		// Setup VP8 -> RGBA decoder.
		vp8Dec := &formats.VPDecoder{}
		if err = vp8Dec.Initialize(); err != nil {
			return fmt.Errorf("[%v]: init decoder error: %w", videoFormat, err)
		}
		defer vp8Dec.Close()

		// Setup media.
		if _, err = c.Setup(desc.BaseURL, vp8Media, 0, 0); err != nil {
			return fmt.Errorf("[%v]: setup media error: %w", videoFormat, err)
		}

		// Process input rtp packets.
		c.OnPacketRTP(vp8Media, vp8Format, func(pkt *rtp.Packet) {
			// Process VP8 flow and return PTS and IMG.
			pts, img, err := formats.ProcessVP8RGBA(&c, vp8Media, vp8RTPDec, vp8Dec, pkt, videoFormat)
			if err != nil {
				log.Printf("[%v]: process packet error: %v\n", videoFormat, err)
				return
			}

			log.Printf("[%v]: decoded frame with PTS %v and size %v", videoFormat, pts, img.Bounds().Max)
		})

		// TODO: file rotation is not implemented for this format.
		return playAndWait(&c, period, videoFormat, nil)

	case videoFormat == "VP9" && audioFormat == "":
		// Wait for the next period.
		waitPeriod(period)

		log.Printf("[%v]: start recording", videoFormat)

		// Create decoder.
		vp9RTPDec, err := vp9Format.CreateDecoder()
		if err != nil {
			return fmt.Errorf("[%v]: create decoder error: %w", videoFormat, err)
		}

		// Setup VP9 -> RGBA decoder.
		vp9Dec := &formats.VPDecoder{}
		if err = vp9Dec.Initialize(); err != nil {
			return fmt.Errorf("[%v]: init decoder error: %w", videoFormat, err)
		}
		defer vp9Dec.Close()

		// Setup media.
		if _, err = c.Setup(desc.BaseURL, vp9Media, 0, 0); err != nil {
			return fmt.Errorf("[%v]: setup media error: %w", videoFormat, err)
		}

		// Process input rtp packets.
		c.OnPacketRTP(vp9Media, vp9Format, func(pkt *rtp.Packet) {
			// Process VP9 flow and return PTS and IMG.
			pts, img, err := formats.ProcessVP9RGBA(&c, vp9Media, vp9RTPDec, vp9Dec, pkt, videoFormat)
			if err != nil {
				log.Printf("[%v]: process packet error: %v\n", videoFormat, err)
				return
			}

			log.Printf("[%v]: decoded frame with PTS %v and size %v", videoFormat, pts, img.Bounds().Max)
		})

		// TODO: file rotation is not implemented for this format.
		return playAndWait(&c, period, videoFormat, nil)

	default:
		// No branch handles this combination; nothing is recorded.
		log.Printf("camera [%s]: unsupported format combination (video %q, audio %q)",
			link, videoFormat, audioFormat)
	}

	return nil
}

// playAndWait sends PLAY and blocks in c.Wait, returning its result instead of
// panicking with it.
//
// tag prefixes error messages. When rotate is non-nil it is called every
// period seconds from a separate goroutine, which is stopped before
// playAndWait returns; in that case period must be positive.
func playAndWait(c *gortsplib.Client, period int, tag string, rotate func()) error {
	// time.NewTicker panics on a non-positive interval, so reject it up front.
	if rotate != nil && period <= 0 {
		return fmt.Errorf("[%v]: invalid rotation period %d", tag, period)
	}

	// Start playing.
	if _, err := c.Play(nil); err != nil {
		return fmt.Errorf("[%v]: sending PLAY request error: %w", tag, err)
	}

	if rotate != nil {
		// Create ticker for rotation files.
		ticker := time.NewTicker(time.Duration(period) * time.Second)
		defer ticker.Stop()

		// Stopping a ticker does not close its channel, so the goroutine
		// needs an explicit signal to exit.
		done := make(chan struct{})
		defer close(done)

		go func() {
			for {
				select {
				case <-ticker.C:
					rotate()
				case <-done:
					return
				}
			}
		}()
	}

	if err := c.Wait(); err != nil {
		return fmt.Errorf("[%v]: session closed: %w", tag, err)
	}

	return nil
}

// newSegmentFile creates the file name and writes the segment preamble to it:
// the StreamID length as a little-endian int32, the StreamID bytes, and the
// header produced by storage.WriteHeader.
//
// It returns the open file together with the segment descriptor whose Date is
// the creation time and whose Duration is period. On any failure the file is
// closed and an error is returned.
func newSegmentFile(name, streamID string, period int) (*os.File, storage.Segment, error) {
	file, err := os.Create(name)
	if err != nil {
		return nil, storage.Segment{}, fmt.Errorf("creating file error: %w", err)
	}

	seg := storage.Segment{
		Date:     time.Now().Format("15-04-05_02-01-2006"),
		Duration: strconv.Itoa(period),
		Packet:   storage.InterleavedPacket{},
	}

	// fail closes the half-written file and reports why the preamble failed.
	fail := func(what string, err error) (*os.File, storage.Segment, error) {
		file.Close()
		return nil, storage.Segment{}, fmt.Errorf("%s: %w", what, err)
	}

	// Write StreamID.
	if err := binary.Write(file, binary.LittleEndian, int32(len(streamID))); err != nil {
		return fail("write StreamID length error", err)
	}
	if _, err := file.Write([]byte(streamID)); err != nil {
		return fail("write StreamID error", err)
	}

	// Write header of the file.
	if err := storage.WriteHeader(file, seg); err != nil {
		return fail("write header error", err)
	}

	return file, seg, nil
}

// changeDomain changes domain if a camera was flipped to another domain.
//
// When err reports a "404 (Not Found)" status, RTSP is retried with "video-1"
// and "video-2" swapped in link, and the result of that retry is returned.
// Any other error, or a link containing neither marker, is returned unchanged.
func changeDomain(dir string, period int, link string, err error) error {
	if err == nil {
		return nil
	}

	// The status is matched on the message text because the error arrives
	// wrapped. errors.As with a *error target matches every error and so
	// cannot be used for this check.
	if !strings.Contains(strings.ToLower(err.Error()), "404 (not found)") {
		return err
	}

	switch {
	case strings.Contains(link, "video-1"):
		return RTSP(dir, period, strings.Replace(link, "video-1", "video-2", 1))
	case strings.Contains(link, "video-2"):
		return RTSP(dir, period, strings.Replace(link, "video-2", "video-1", 1))
	default:
		// There is no sibling domain to try.
		return err
	}
}