package rtsp import ( "encoding/binary" "errors" "fmt" "log" "os" "strconv" "strings" "time" "git.insit.tech/psa/rtsp_reader-writer/writer/internal/config" "git.insit.tech/psa/rtsp_reader-writer/writer/internal/ingest/formats" logger "git.insit.tech/psa/rtsp_reader-writer/writer/internal/log" "git.insit.tech/psa/rtsp_reader-writer/writer/pkg/storage" log2 "git.insit.tech/sas/rtsp_proxy/core/log" "github.com/bluenviron/gortsplib/v4" "github.com/bluenviron/gortsplib/v4/pkg/base" "github.com/bluenviron/gortsplib/v4/pkg/description" "github.com/bluenviron/gortsplib/v4/pkg/format" "github.com/bluenviron/mediacommon/v2/pkg/codecs/g711" "github.com/pion/rtp" "go.uber.org/zap" ) // StartWriter starts the program. func StartWriter() { cams, err := config.ParseCamerasYAML() if err != nil { logger.Log.Fatal("func ParseCamerasYAML error:", zap.Error(err)) log.Println("func ParseCamerasYAML error: ") } for _, link := range cams { logger.Log.Info("start process camera:", zap.String("link", link)) log.Println("start process camera: ", link) go func() { err = rtsp(config.Local, 60, link, -1) if err != nil { logger.Log.Error("procRTSP function error for camera:", zap.String("link", link), zap.Error(err)) log.Println("procRTSP function error for camera: ", link) } }() } } // rtsp processes RTSP protocol. func rtsp(dir string, period int, link string, number int) error { // Create data folder in the directory. dirData := log2.DirCreator(dir, "data") // Create logger. cutURI := LastPartURI(link) cam := log2.CamLogging(fmt.Sprintf("%s/%s/writer-cam_%s.log", dirData, cutURI, time.Now().Format("15-04-05_02-01-2006"))) // Connect to the server. c := gortsplib.Client{ UserAgent: "PSA", } u, err := base.ParseURL(link) if err != nil { cam.Error("parse URL error:", zap.Error(err)) return err } err = c.Start(u.Scheme, u.Host) if err != nil { cam.Error("connect to the server error:", zap.Error(err)) return err } defer c.Close() desc, res, err := c.Describe(u) if err != nil || desc == nil { cam.Error("medias not found for camera:", zap.Error(err)) log.Println("medias not found for camera: ", cutURI) changeDomain(dir, period, link, number+1, cam, err) return nil } // Create file name structure and directory for files. resolution := findResolution(res.Body) resolutions := []string{resolution} fn := storage.CreateFileName(dirData, resolutions, cutURI, period, number) err = os.MkdirAll(fmt.Sprintf("%s", fn.Path), 0755) if err != nil { cam.Error("mkdirall error: %w", zap.Error(err)) return err } // Find formats. var audioFormat string var videoFormat string av1Format, av1Media, err := formats.FindAV1Format(desc) if av1Format != nil { videoFormat = "AV1" cam.Info("AV1 format found") } if err != nil { cam.Info("func FindAV1Format:", zap.Error(err)) } g711Format, g711Media, err := formats.FindG711Format(desc) if g711Format != nil { audioFormat = "G711" cam.Info("G711 format found") } if err != nil { cam.Info("func FindG711Format:", zap.Error(err)) } h264Format, h264Media, err := formats.FindH264Format(desc) if h264Format != nil { videoFormat = "H264" cam.Info("H264 format found") } if err != nil { cam.Info("func FindH264Format:", zap.Error(err)) } aacFormat, aacMedia, err := formats.FindAACFormat(desc) if aacFormat != nil { audioFormat = "AAC" cam.Info("AAC format found") } if err != nil { cam.Info("func FindAACFormat:", zap.Error(err)) } h265Format, h265Media, err := formats.FindH265Format(desc) if h265Format != nil { videoFormat = "H265" cam.Info("H265 format found") } if err != nil { cam.Info("func FindH265Format:", zap.Error(err)) } lpcmFormat, lpcmMedia, err := formats.FindLPCMFormat(desc) if lpcmFormat != nil { audioFormat = "LPCM" cam.Info("LPCM format found") } if err != nil { cam.Info("func FindLPCMFormat:", zap.Error(err)) } mjpegFormat, mjpegMedia, err := formats.FindMJPEGFormat(desc) if mjpegFormat != nil { videoFormat = "MJPEG" cam.Info("MJPEG format found") } if err != nil { cam.Info("func FindLPCMFormat:", zap.Error(err)) } opusFormat, opusMedia, err := formats.FindOPUSFormat(desc) if opusFormat != nil { audioFormat = "OPUS" cam.Info("OPUS format found") } if err != nil { cam.Info("func FindOPUSFormat:", zap.Error(err)) } vp8Format, vp8Media, err := formats.FindVP8Format(desc) if vp8Format != nil { videoFormat = "VP8" cam.Info("VP8 format found") } if err != nil { cam.Info("func FindVP8Format:", zap.Error(err)) } vp9Format, vp9Media, err := formats.FindVP9Format(desc) if vp9Format != nil { videoFormat = "VP9" cam.Info("VP9 format found") } if err != nil { cam.Info("func FindVP9Format:", zap.Error(err)) } // Start program according to gotten formats. switch { case videoFormat == "AV1" && audioFormat == "": // Wait for the next period. waitPeriod(period, cam) cam.Info("start recording") // Create decoder. av1RTPDec, err := av1Format.CreateDecoder() if err != nil { cam.Error("create decoder error:", zap.Error(err)) return err } av1Dec := &formats.AV1Decoder{} err = av1Dec.Initialize() if err != nil { cam.Error("init decoder error:", zap.Error(err)) return err } defer av1Dec.Close() // Setup media. _, err = c.Setup(desc.BaseURL, av1Media, 0, 0) if err != nil { cam.Error("setup media error:", zap.Error(err)) return err } firstRandomReceived := false // Process input rtp packets. c.OnPacketRTP(av1Media, av1Format, func(pkt *rtp.Packet) { // Process AV1 flow and return PTS and IMG. pts, _, err := formats.ProcessAV1(&c, av1Media, av1RTPDec, pkt, av1Dec, firstRandomReceived) if err != nil { cam.Warn("process packet error:", zap.Error(err)) } cam.Info("decoded image:", zap.String("PTS", strconv.FormatInt(pts, 10))) }) // Start playing. _, err = c.Play(nil) if err != nil { cam.Error("sending PLAY request error:", zap.Error(err)) return err } // Set program if the client gets request. c.OnRequest = func(req *base.Request) { switch req.Method { case base.Teardown: cam.Warn("got TEARDOWN request from server:", zap.String("request", req.String())) case base.Method(rune(base.StatusRequestTimeout)): cam.Warn("got STATUS_REQUEST_TIMEOUT from server:", zap.String("request", req.String())) case base.GetParameter: cam.Info("got Get_Parameter from server:", zap.String("request", req.String())) case "TCP timeout": cam.Warn("got TCP timeout from server:", zap.String("request", req.String())) case "EOF": cam.Warn("got EOF from server:", zap.String("request", req.String())) default: cam.Warn("got method from server:", zap.String("request", req.String())) } } // Create ticker for rotation files. ticker := time.NewTicker(time.Duration(period) * time.Second) defer ticker.Stop() // Rotate files. go func() { for range ticker.C { // Logic for rotation files. /* currentMpegtsMuxer.close() currentMpegtsMuxer.fileName = fn.SetNumNTime() err = currentMpegtsMuxer.initialize() if err != nil { panic(err) } cam.Info("new file for recording created") log.Println("new file for recording created") */ } }() if err = c.Wait(); err != nil { rtsp(dir, period, link, fn.Number+1) cam.Error("c.Wait() error:", zap.Error(c.Wait())) } case videoFormat == "" && audioFormat == "G711": // Wait for the next period. waitPeriod(period, cam) cam.Info("start recording") // Create decoder. g711RTPDec, err := g711Format.CreateDecoder() if err != nil { cam.Error("create decoder error:", zap.Error(err)) return err } // Setup media. _, err = c.Setup(desc.BaseURL, g711Media, 0, 0) if err != nil { cam.Error("setup media error:", zap.Error(err)) return err } // Process input rtp packets. c.OnPacketRTP(g711Media, g711Format, func(pkt *rtp.Packet) { // Process G711 flow and return PTS and AU. pts, au, err := formats.ProcessG711(&c, g711Media, g711RTPDec, pkt) if err != nil { cam.Warn("process packet error:", zap.Error(err)) } // Decode samples (these are 16-bit, big endian LPCM samples). if g711Format.MULaw { g711.DecodeMulaw(au) } else { g711.DecodeAlaw(au) } cam.Info("decoded audio samples:", zap.String("PTS", strconv.FormatInt(pts, 10))) }) // Start playing. _, err = c.Play(nil) if err != nil { cam.Error("sending PLAY request error:", zap.Error(err)) return err } // Set program if the client gets request. c.OnRequest = func(req *base.Request) { switch req.Method { case base.Teardown: cam.Warn("got TEARDOWN request from server:", zap.String("request", req.String())) case base.Method(rune(base.StatusRequestTimeout)): cam.Warn("got STATUS_REQUEST_TIMEOUT from server:", zap.String("request", req.String())) case base.GetParameter: cam.Info("got Get_Parameter from server:", zap.String("request", req.String())) case "TCP timeout": cam.Warn("got TCP timeout from server:", zap.String("request", req.String())) case "EOF": cam.Warn("got EOF from server:", zap.String("request", req.String())) default: cam.Warn("got method from server:", zap.String("request", req.String())) } } // Create ticker for rotation files. ticker := time.NewTicker(time.Duration(period) * time.Second) defer ticker.Stop() // Rotate files. go func() { for range ticker.C { // Logic for rotation files. /* currentMpegtsMuxer.close() currentMpegtsMuxer.fileName = fn.SetNumNTime() err = currentMpegtsMuxer.initialize() if err != nil { panic(err) } cam.Info("new file for recording created") log.Println("new file for recording created") */ } }() if err = c.Wait(); err != nil { rtsp(dir, period, link, fn.Number+1) cam.Error("c.Wait() error:", zap.Error(c.Wait())) } case videoFormat == "H264" && audioFormat == "AAC": // Wait for the next period. waitPeriod(period, cam) cam.Info("start recording") // Create decoders. //h264RTPDec, err := h264Format.CreateDecoder() //if err != nil { // cam.Error("create decoder error:", zap.Error(err)) //} // //aacRTPDec, err := aacFormat.CreateDecoder() //if err != nil { // cam.Error("create decoder error:", zap.Error(err)) //} //// Setup MPEG-TS muxer. //currentMpegtsMuxer := formats.MpegtsMuxer{ // FileName: fn.SetNumNTime(), // H264Format: h264Format, // Mpeg4AudioFormat: aacFormat, //} //err = currentMpegtsMuxer.Initialize() //if err != nil { // return fmt.Errorf("[%v-%v]: init muxer error: %w\n", videoFormat, audioFormat, err) //} // Setup all medias. err = c.SetupAll(desc.BaseURL, desc.Medias) if err != nil { cam.Error("setup media error:", zap.Error(err)) return err } // Process input rtp packets. c.OnPacketRTPAny(func(medi *description.Media, forma format.Format, pkt *rtp.Packet) { switch forma.(type) { case *format.H264: //// Process H264 flow and return PTS and AU. //pts, au, err := formats.ProcessH264(&c, h264Media, h264RTPDec, pkt, videoFormat) //if err != nil { // cam.Warn("process packet error:", zap.Error(err)) //} // //// Encode the access unit into MPEG-TS. //err = currentMpegtsMuxer.WriteH264(pts, au) //if err != nil { // return //} case *format.MPEG4Audio: //// Process AAC flow and return PTS and AUS. //pts, aus, err := formats.ProcessAAC(&c, aacMedia, aacRTPDec, pkt, audioFormat) //if err != nil { // cam.Warn("process packet error:", zap.Error(err)) //} // //// Encode access units into MPEG-TS. //err = currentMpegtsMuxer.WriteAAC(aus, pts) //if err != nil { // return //} } }) // Start playing. _, err = c.Play(nil) if err != nil { cam.Error("sending PLAY request error:", zap.Error(err)) return err } // Set program if the client gets request. c.OnRequest = func(req *base.Request) { switch req.Method { case base.Teardown: cam.Warn("got TEARDOWN request from server:", zap.String("request", req.String())) case base.Method(rune(base.StatusRequestTimeout)): cam.Warn("got STATUS_REQUEST_TIMEOUT from server:", zap.String("request", req.String())) case base.GetParameter: cam.Info("got Get_Parameter from server:", zap.String("request", req.String())) case "TCP timeout": cam.Warn("got TCP timeout from server:", zap.String("request", req.String())) case "EOF": cam.Warn("got EOF from server:", zap.String("request", req.String())) default: cam.Warn("got method from server:", zap.String("request", req.String())) } } // Create ticker for rotation files. ticker := time.NewTicker(time.Duration(period) * time.Second) defer ticker.Stop() // Rotate files. go func() { for range ticker.C { //// Logic for rotation files. //currentMpegtsMuxer.Close() //currentMpegtsMuxer.FileName = fn.SetNumNTime() // //err = currentMpegtsMuxer.Initialize() //if err != nil { // log.Printf("[%v-%v]: init muxer error: %v\n", videoFormat, audioFormat, err) // return //} // // cam.Info("new file for recording created") // log.Println("new file for recording created") } }() if err = c.Wait(); err != nil { rtsp(dir, period, link, fn.Number+1) cam.Error("c.Wait() error:", zap.Error(c.Wait())) } case videoFormat == "H264" && audioFormat == "G711" || videoFormat == "H264" && audioFormat == "": // Wait for the next period. waitPeriod(period, cam) cam.Info("start recording") // Create decoders. h264RTPDec, err := h264Format.CreateDecoder() if err != nil { cam.Error("create decoder error:", zap.Error(err)) return err } g711RTPDec, err := g711Format.CreateDecoder() if err != nil { cam.Error("create decoder error:", zap.Error(err)) return err } //// Setup MPEG-TS muxer. //var aacFormat *format.MPEG4Audio // //currentMpegtsMuxer := formats.MpegtsMuxer{ // FileName: fn.SetNumNTime(), // H264Format: h264Format, // Mpeg4AudioFormat: aacFormat, //} //err = currentMpegtsMuxer.Initialize() //if err != nil { // return fmt.Errorf("[%v-%v]: init muxer error: %w\n", videoFormat, audioFormat, err) //} // Setup all medias. err = c.SetupAll(desc.BaseURL, desc.Medias) if err != nil { cam.Error("setup media error:", zap.Error(err)) return err } file, err := os.Create(fn.SetNumNTime("insit")) if err != nil { cam.Error("creating file error:", zap.Error(err)) return err } defer file.Close() seg := storage.Segment{ Date: time.Now().Format("15-04-05_02-01-2006"), Duration: strconv.Itoa(period), Packet: storage.InterleavedPacket{}, } // Write StreamID. if err := binary.Write(file, binary.LittleEndian, int32(len(cutURI))); err != nil { cam.Error("write StreamID length error:", zap.Error(err)) return err } if _, err := file.Write([]byte(cutURI)); err != nil { cam.Error("write StreamID error:", zap.Error(err)) return err } // Write header of the file. err = storage.WriteHeader(file, seg) if err != nil { cam.Error("write header error:", zap.Error(err)) return err } // Process input rtp packets. c.OnPacketRTPAny(func(medi *description.Media, forma format.Format, pkt *rtp.Packet) { switch f := forma.(type) { case *format.H264: // Process H264 flow and return PTS and AU. pts, au, err := formats.ProcessH264(&c, h264Media, h264RTPDec, pkt) if err != nil { cam.Warn("process packet error:", zap.Error(err)) } if au != nil { // Add appropriate lines to the interleaved packet. seg.Packet.Type = storage.PacketTypeH264 seg.Packet.Pts = pts seg.Packet.H264AUs = au // Write segment with interleaved packets. if err := storage.WriteInterleavedPacket(file, seg); err != nil { cam.Error("write segment error:", zap.Error(err)) return } } //// Encode the access unit into MPEG-TS. //err = currentMpegtsMuxer.WriteH264(pts, au) //if err != nil { // return //} case *format.G711: // Process G711 flow and returns PTS and AU. pts, au, err := formats.ProcessG711(&c, g711Media, g711RTPDec, pkt) if err != nil { cam.Warn("process packet error:", zap.Error(err)) } if au != nil { // Convert G711 to LPCM. lpcmSamples := formats.ConvertG711ToLPCM(au, f.MULaw) // Add appropriate lines to the interleaved packet. seg.Packet.Type = storage.PacketTypeLPCM seg.Packet.Pts = pts seg.Packet.LPCMSamples = lpcmSamples // Write segment with interleaved packets. if err := storage.WriteInterleavedPacket(file, seg); err != nil { cam.Error("write segment error:", zap.Error(err)) return } } //// Convert G711 to AAC. //au, err = formats.ConvertLPCMToAAC(lpcmSamples) //if err != nil { // log.Printf("[%v-%v]: converting to AAC frame error: %v\n", videoFormat, audioFormat, err) //} // //// Encode the access unit into MPEG-TS. //err = currentMpegtsMuxer.WriteAAC([][]byte{au}, pts) //if err != nil { // return } }) // Start playing. _, err = c.Play(nil) if err != nil { cam.Error("sending PLAY request error:", zap.Error(err)) return err } // Set program if the client gets request. c.OnRequest = func(req *base.Request) { switch req.Method { case base.Teardown: cam.Warn("got TEARDOWN request from server:", zap.String("request", req.String())) case base.Method(rune(base.StatusRequestTimeout)): cam.Warn("got STATUS_REQUEST_TIMEOUT from server:", zap.String("request", req.String())) case base.GetParameter: cam.Info("got Get_Parameter from server:", zap.String("request", req.String())) case "TCP timeout": cam.Warn("got TCP timeout from server:", zap.String("request", req.String())) case "EOF": cam.Warn("got EOF from server:", zap.String("request", req.String())) default: cam.Warn("got method from server:", zap.String("request", req.String())) } } // Create ticker for rotation files. ticker := time.NewTicker(time.Duration(period) * time.Second) defer ticker.Stop() // Rotate files. go func() { for range ticker.C { /* // Logic for rotation files. currentMpegtsMuxer.Close() currentMpegtsMuxer.FileName = fn.SetNumNTime() err = currentMpegtsMuxer.Initialize() if err != nil { log.Printf("[%v-%v]: init muxer error: %v\n", videoFormat, audioFormat, err) return } */ file.Close() file, err = os.Create(fn.SetNumNTime("insit")) if err != nil { cam.Error("creating file error:", zap.Error(err)) logger.Log.Error("creating file error:", zap.Error(err)) return } seg = storage.Segment{ Date: time.Now().Format("15-04-05_02-01-2006"), Duration: strconv.Itoa(period), Packet: storage.InterleavedPacket{}, } // Write StreamID. if err := binary.Write(file, binary.LittleEndian, int32(len(cutURI))); err != nil { cam.Error("write StreamID length error:", zap.Error(err)) logger.Log.Error("write StreamID length error:", zap.Error(err)) return } if _, err := file.Write([]byte(cutURI)); err != nil { cam.Error("write StreamID error:", zap.Error(err)) logger.Log.Error("write StreamID error:", zap.Error(err)) return } // Write header of the file. err = storage.WriteHeader(file, seg) if err != nil { cam.Error("write header error:", zap.Error(err)) logger.Log.Error("write header error:", zap.Error(err)) return } cam.Info("new file for recording created") log.Println("new file for recording created") } }() if err = c.Wait(); err != nil { rtsp(dir, period, link, fn.Number+1) cam.Error("c.Wait() error:", zap.Error(c.Wait())) } case videoFormat == "H264-" && audioFormat == "": // Wait for the next period. waitPeriod(period, cam) cam.Info("start recording") // Create decoder. h264RTPDec, err := h264Format.CreateDecoder() if err != nil { cam.Error("create decoder error:", zap.Error(err)) return err } // Setup H264 -> RGBA decoder. h264Dec := &formats.H264Decoder{} err = h264Dec.Initialize() if err != nil { cam.Error("init decoder error:", zap.Error(err)) return err } defer h264Dec.Close() // if SPS and PPS are present into the SDP, send them to the decoder if h264Format.SPS != nil { h264Dec.Decode([][]byte{h264Format.SPS}) } if h264Format.PPS != nil { h264Dec.Decode([][]byte{h264Format.PPS}) } // Setup media. _, err = c.Setup(desc.BaseURL, h264Media, 0, 0) if err != nil { cam.Error("setup media error:", zap.Error(err)) return err } firstRandomAccess := false // Process input rtp packets. c.OnPacketRTP(h264Media, h264Format, func(pkt *rtp.Packet) { // Process H264 flow and return PTS and IMG. pts, _, err := formats.ProcessH264RGBA( &c, h264Media, h264RTPDec, h264Dec, pkt, firstRandomAccess) if err != nil { cam.Warn("process packet error:", zap.Error(err)) } cam.Info("decoded image:", zap.String("PTS", strconv.FormatInt(pts, 10))) }) // Start playing. _, err = c.Play(nil) if err != nil { cam.Error("sending PLAY request error:", zap.Error(err)) return err } // Set program if the client gets request. c.OnRequest = func(req *base.Request) { switch req.Method { case base.Teardown: cam.Warn("got TEARDOWN request from server:", zap.String("request", req.String())) case base.Method(rune(base.StatusRequestTimeout)): cam.Warn("got STATUS_REQUEST_TIMEOUT from server:", zap.String("request", req.String())) case base.GetParameter: cam.Info("got Get_Parameter from server:", zap.String("request", req.String())) case "TCP timeout": cam.Warn("got TCP timeout from server:", zap.String("request", req.String())) case "EOF": cam.Warn("got EOF from server:", zap.String("request", req.String())) default: cam.Warn("got method from server:", zap.String("request", req.String())) } } // Create ticker for rotation files. ticker := time.NewTicker(time.Duration(period) * time.Second) defer ticker.Stop() // Rotate files. go func() { for range ticker.C { // Logic for rotation files. /* currentMpegtsMuxer.close() currentMpegtsMuxer.fileName = fn.SetNumNTime() err = currentMpegtsMuxer.initialize() if err != nil { panic(err) } cam.Info("new file for recording created") log.Println("new file for recording created") */ } }() if err = c.Wait(); err != nil { rtsp(dir, period, link, fn.Number+1) cam.Error("c.Wait() error:", zap.Error(c.Wait())) } case videoFormat == "H265" && audioFormat == "": // Wait for the next period. waitPeriod(period, cam) cam.Info("start recording") // Create decoder. h265RTPDec, err := h265Format.CreateDecoder() if err != nil { cam.Error("create decoder error:", zap.Error(err)) return err } // Setup H264 -> RGBA decoder. h265Dec := &formats.H265Decoder{} err = h265Dec.Initialize() if err != nil { cam.Error("init decoder error:", zap.Error(err)) return err } defer h265Dec.Close() // If VPS, SPS and PPS are present into the SDP, send them to the decoder. if h265Format.VPS != nil { h265Dec.Decode([][]byte{h265Format.VPS}) } if h265Format.SPS != nil { h265Dec.Decode([][]byte{h265Format.SPS}) } if h265Format.PPS != nil { h265Dec.Decode([][]byte{h265Format.PPS}) } // Setup media. _, err = c.Setup(desc.BaseURL, h265Media, 0, 0) if err != nil { cam.Error("setup media error:", zap.Error(err)) return err } firstRandomAccess := false // Process input rtp packets. c.OnPacketRTP(h265Media, h265Format, func(pkt *rtp.Packet) { // Process H265 flow and return PTS and IMG. pts, _, err := formats.ProcessH265RGBA(&c, h265Media, h265RTPDec, h265Dec, pkt, firstRandomAccess) if err != nil { cam.Warn("process packet error:", zap.Error(err)) } cam.Info("decoded image:", zap.String("PTS", strconv.FormatInt(pts, 10))) }) // Start playing. _, err = c.Play(nil) if err != nil { cam.Error("sending PLAY request error:", zap.Error(err)) return err } // Set program if the client gets request. c.OnRequest = func(req *base.Request) { switch req.Method { case base.Teardown: cam.Warn("got TEARDOWN request from server:", zap.String("request", req.String())) case base.Method(rune(base.StatusRequestTimeout)): cam.Warn("got STATUS_REQUEST_TIMEOUT from server:", zap.String("request", req.String())) case base.GetParameter: cam.Info("got Get_Parameter from server:", zap.String("request", req.String())) case "TCP timeout": cam.Warn("got TCP timeout from server:", zap.String("request", req.String())) case "EOF": cam.Warn("got EOF from server:", zap.String("request", req.String())) default: cam.Warn("got method from server:", zap.String("request", req.String())) } } // Create ticker for rotation files. ticker := time.NewTicker(time.Duration(period) * time.Second) defer ticker.Stop() // Rotate files. go func() { for range ticker.C { // Logic for rotation files. /* currentMpegtsMuxer.close() currentMpegtsMuxer.fileName = fn.SetNumNTime() err = currentMpegtsMuxer.initialize() if err != nil { panic(err) } cam.Info("new file for recording created") log.Println("new file for recording created") */ } }() if err = c.Wait(); err != nil { rtsp(dir, period, link, fn.Number+1) cam.Error("c.Wait() error:", zap.Error(c.Wait())) } case videoFormat == "" && audioFormat == "LPCM": // Wait for the next period. waitPeriod(period, cam) cam.Info("start recording") // Create decoder. lpcmRTPDec, err := lpcmFormat.CreateDecoder() if err != nil { cam.Error("create decoder error:", zap.Error(err)) return err } // Setup media. _, err = c.Setup(desc.BaseURL, lpcmMedia, 0, 0) if err != nil { cam.Error("setup media error:", zap.Error(err)) return err } // Process input rtp packets. c.OnPacketRTP(lpcmMedia, lpcmFormat, func(pkt *rtp.Packet) { // Process LPCM flow and return PTS and SAMPLES. pts, _, err := formats.ProcessLPCM(&c, lpcmMedia, lpcmRTPDec, pkt) if err != nil { cam.Warn("process packet error:", zap.Error(err)) } cam.Info("decoded audio samples:", zap.String("PTS", strconv.FormatInt(pts, 10))) }) // Start playing. _, err = c.Play(nil) if err != nil { cam.Error("sending PLAY request error:", zap.Error(err)) return err } // Set program if the client gets request. c.OnRequest = func(req *base.Request) { switch req.Method { case base.Teardown: cam.Warn("got TEARDOWN request from server:", zap.String("request", req.String())) case base.Method(rune(base.StatusRequestTimeout)): cam.Warn("got STATUS_REQUEST_TIMEOUT from server:", zap.String("request", req.String())) case base.GetParameter: cam.Info("got Get_Parameter from server:", zap.String("request", req.String())) case "TCP timeout": cam.Warn("got TCP timeout from server:", zap.String("request", req.String())) case "EOF": cam.Warn("got EOF from server:", zap.String("request", req.String())) default: cam.Warn("got method from server:", zap.String("request", req.String())) } } // Create ticker for rotation files. ticker := time.NewTicker(time.Duration(period) * time.Second) defer ticker.Stop() // Rotate files. go func() { for range ticker.C { // Logic for rotation files. /* currentMpegtsMuxer.close() currentMpegtsMuxer.fileName = fn.SetNumNTime() err = currentMpegtsMuxer.initialize() if err != nil { panic(err) } cam.Info("new file for recording created") log.Println("new file for recording created") */ } }() if err = c.Wait(); err != nil { rtsp(dir, period, link, fn.Number+1) cam.Error("c.Wait() error:", zap.Error(c.Wait())) } case videoFormat == "MJPEG" && audioFormat == "": // Wait for the next period. waitPeriod(period, cam) cam.Info("start recording") // Create decoder. mjpegRTPDec, err := mjpegFormat.CreateDecoder() if err != nil { cam.Error("create decoder error:", zap.Error(err)) return err } // Setup media. _, err = c.Setup(desc.BaseURL, mjpegMedia, 0, 0) if err != nil { cam.Error("setup media error:", zap.Error(err)) return err } // Process input rtp packets. c.OnPacketRTP(mjpegMedia, mjpegFormat, func(pkt *rtp.Packet) { // Process MJPEG flow and return PTS and IMG. pts, _, err := formats.ProcessMJPEGRGBA(&c, mjpegMedia, mjpegRTPDec, pkt) if err != nil { cam.Warn("process packet error:", zap.Error(err)) } cam.Info("decoded image:", zap.String("PTS", strconv.FormatInt(pts, 10))) }) // Start playing. _, err = c.Play(nil) if err != nil { cam.Error("sending PLAY request error:", zap.Error(err)) return err } // Set program if the client gets request. c.OnRequest = func(req *base.Request) { switch req.Method { case base.Teardown: cam.Warn("got TEARDOWN request from server:", zap.String("request", req.String())) case base.Method(rune(base.StatusRequestTimeout)): cam.Warn("got STATUS_REQUEST_TIMEOUT from server:", zap.String("request", req.String())) case base.GetParameter: cam.Info("got Get_Parameter from server:", zap.String("request", req.String())) case "TCP timeout": cam.Warn("got TCP timeout from server:", zap.String("request", req.String())) case "EOF": cam.Warn("got EOF from server:", zap.String("request", req.String())) default: cam.Warn("got method from server:", zap.String("request", req.String())) } } // Create ticker for rotation files. ticker := time.NewTicker(time.Duration(period) * time.Second) defer ticker.Stop() // Rotate files. go func() { for range ticker.C { // Logic for rotation files. /* currentMpegtsMuxer.close() currentMpegtsMuxer.fileName = fn.SetNumNTime() err = currentMpegtsMuxer.initialize() if err != nil { panic(err) } cam.Info("new file for recording created") log.Println("new file for recording created") */ } }() if err = c.Wait(); err != nil { rtsp(dir, period, link, fn.Number+1) cam.Error("c.Wait() error:", zap.Error(c.Wait())) } case videoFormat == "" && audioFormat == "AAC": // Wait for the next period. waitPeriod(period, cam) cam.Info("start recording") // Create decoder. aacRTPDec, err := aacFormat.CreateDecoder() if err != nil { cam.Error("create decoder error:", zap.Error(err)) return err } // Setup media. _, err = c.Setup(desc.BaseURL, aacMedia, 0, 0) if err != nil { cam.Error("setup media error:", zap.Error(err)) return err } // Process input rtp packets. c.OnPacketRTP(aacMedia, aacFormat, func(pkt *rtp.Packet) { // Process AAC flow and return PTS and AUS. pts, aus, err := formats.ProcessAAC(&c, aacMedia, aacRTPDec, pkt) if err != nil { cam.Warn("process packet error:", zap.Error(err)) } for _, _ = range aus { cam.Info("received access unit:", zap.String("PTS", strconv.FormatInt(pts, 10))) } }) // Start playing. _, err = c.Play(nil) if err != nil { cam.Error("sending PLAY request error:", zap.Error(err)) return err } // Set program if the client gets request. c.OnRequest = func(req *base.Request) { switch req.Method { case base.Teardown: cam.Warn("got TEARDOWN request from server:", zap.String("request", req.String())) case base.Method(rune(base.StatusRequestTimeout)): cam.Warn("got STATUS_REQUEST_TIMEOUT from server:", zap.String("request", req.String())) case base.GetParameter: cam.Info("got Get_Parameter from server:", zap.String("request", req.String())) case "TCP timeout": cam.Warn("got TCP timeout from server:", zap.String("request", req.String())) case "EOF": cam.Warn("got EOF from server:", zap.String("request", req.String())) default: cam.Warn("got method from server:", zap.String("request", req.String())) } } // Create ticker for rotation files. ticker := time.NewTicker(time.Duration(period) * time.Second) defer ticker.Stop() // Rotate files. go func() { for range ticker.C { // Logic for rotation files. /* currentMpegtsMuxer.close() currentMpegtsMuxer.fileName = fn.SetNumNTime() err = currentMpegtsMuxer.initialize() if err != nil { panic(err) } cam.Info("new file for recording created") log.Println("new file for recording created") */ } }() if err = c.Wait(); err != nil { rtsp(dir, period, link, fn.Number+1) cam.Error("c.Wait() error:", zap.Error(c.Wait())) } case videoFormat == "" && audioFormat == "OPUS": // Wait for the next period. waitPeriod(period, cam) cam.Info("start recording") // Create decoder. opusRTPDec, err := opusFormat.CreateDecoder() if err != nil { cam.Error("create decoder error:", zap.Error(err)) return err } // Setup media. _, err = c.Setup(desc.BaseURL, opusMedia, 0, 0) if err != nil { cam.Error("setup media error:", zap.Error(err)) return err } // Process input rtp packets. c.OnPacketRTP(opusMedia, opusFormat, func(pkt *rtp.Packet) { // Process OPUS flow and return PTS and OP. pts, _, err := formats.ProcessOPUS(&c, opusMedia, opusRTPDec, pkt) if err != nil { cam.Warn("process packet error:", zap.Error(err)) } cam.Info("received OPUS packet:", zap.String("PTS", strconv.FormatInt(pts, 10))) }) // Start playing. _, err = c.Play(nil) if err != nil { cam.Error("sending PLAY request error:", zap.Error(err)) return err } // Set program if the client gets request. c.OnRequest = func(req *base.Request) { switch req.Method { case base.Teardown: cam.Warn("got TEARDOWN request from server:", zap.String("request", req.String())) case base.Method(rune(base.StatusRequestTimeout)): cam.Warn("got STATUS_REQUEST_TIMEOUT from server:", zap.String("request", req.String())) case base.GetParameter: cam.Info("got Get_Parameter from server:", zap.String("request", req.String())) case "TCP timeout": cam.Warn("got TCP timeout from server:", zap.String("request", req.String())) case "EOF": cam.Warn("got EOF from server:", zap.String("request", req.String())) default: cam.Warn("got method from server:", zap.String("request", req.String())) } } // Create ticker for rotation files. ticker := time.NewTicker(time.Duration(period) * time.Second) defer ticker.Stop() // Rotate files. go func() { for range ticker.C { // Logic for rotation files. /* currentMpegtsMuxer.close() currentMpegtsMuxer.fileName = fn.SetNumNTime() err = currentMpegtsMuxer.initialize() if err != nil { panic(err) } cam.Info("new file for recording created") log.Println("new file for recording created") */ } }() if err = c.Wait(); err != nil { rtsp(dir, period, link, fn.Number+1) cam.Error("c.Wait() error:", zap.Error(c.Wait())) } case videoFormat == "VP8" && audioFormat == "": // Wait for the next period. waitPeriod(period, cam) cam.Info("start recording") // Create decoder. vp8RTPDec, err := vp8Format.CreateDecoder() if err != nil { cam.Error("create decoder error:", zap.Error(err)) return err } // Setup VP8 -> RGBA decoder. vp8Dec := &formats.VPDecoder{} err = vp8Dec.Initialize() if err != nil { cam.Error("init decoder error:", zap.Error(err)) return err } defer vp8Dec.Close() // Setup media. _, err = c.Setup(desc.BaseURL, vp8Media, 0, 0) if err != nil { cam.Error("setup media error:", zap.Error(err)) return err } // Process input rtp packets. c.OnPacketRTP(vp8Media, vp8Format, func(pkt *rtp.Packet) { // Process VP8 flow and return PTS and IMG. pts, _, err := formats.ProcessVP8RGBA(&c, vp8Media, vp8RTPDec, vp8Dec, pkt) if err != nil { cam.Warn("process packet error:", zap.Error(err)) } cam.Info("decoded image:", zap.String("PTS", strconv.FormatInt(pts, 10))) }) // Start playing. _, err = c.Play(nil) if err != nil { cam.Error("sending PLAY request error:", zap.Error(err)) return err } // Set program if the client gets request. c.OnRequest = func(req *base.Request) { switch req.Method { case base.Teardown: cam.Warn("got TEARDOWN request from server:", zap.String("request", req.String())) case base.Method(rune(base.StatusRequestTimeout)): cam.Warn("got STATUS_REQUEST_TIMEOUT from server:", zap.String("request", req.String())) case base.GetParameter: cam.Info("got Get_Parameter from server:", zap.String("request", req.String())) case "TCP timeout": cam.Warn("got TCP timeout from server:", zap.String("request", req.String())) case "EOF": cam.Warn("got EOF from server:", zap.String("request", req.String())) default: cam.Warn("got method from server:", zap.String("request", req.String())) } } // Create ticker for rotation files. ticker := time.NewTicker(time.Duration(period) * time.Second) defer ticker.Stop() // Rotate files. go func() { for range ticker.C { // Logic for rotation files. /* currentMpegtsMuxer.close() currentMpegtsMuxer.fileName = fn.SetNumNTime() err = currentMpegtsMuxer.initialize() if err != nil { panic(err) } cam.Info("new file for recording created") log.Println("new file for recording created") */ } }() if err = c.Wait(); err != nil { rtsp(dir, period, link, fn.Number+1) cam.Error("c.Wait() error:", zap.Error(c.Wait())) } case videoFormat == "VP9" && audioFormat == "": // Wait for the next period. waitPeriod(period, cam) cam.Info("start recording") // Create decoder. vp9RTPDec, err := vp9Format.CreateDecoder() if err != nil { cam.Error("create decoder error:", zap.Error(err)) return err } // Setup VP9 -> RGBA decoder. vp9Dec := &formats.VPDecoder{} err = vp9Dec.Initialize() if err != nil { cam.Error("init decoder error:", zap.Error(err)) return err } defer vp9Dec.Close() // Setup media. _, err = c.Setup(desc.BaseURL, vp9Media, 0, 0) if err != nil { cam.Error("setup media error:", zap.Error(err)) return err } // Process input rtp packets. c.OnPacketRTP(vp9Media, vp9Format, func(pkt *rtp.Packet) { // Process VP9 flow and return PTS and IMG. pts, _, err := formats.ProcessVP9RGBA(&c, vp9Media, vp9RTPDec, vp9Dec, pkt) if err != nil { cam.Warn("process packet error:", zap.Error(err)) } cam.Info("decoded image:", zap.String("PTS", strconv.FormatInt(pts, 10))) }) // Start playing. _, err = c.Play(nil) if err != nil { cam.Error("sending PLAY request error:", zap.Error(err)) return err } // Set program if the client gets request. c.OnRequest = func(req *base.Request) { switch req.Method { case base.Teardown: cam.Warn("got TEARDOWN request from server:", zap.String("request", req.String())) case base.Method(rune(base.StatusRequestTimeout)): cam.Warn("got STATUS_REQUEST_TIMEOUT from server:", zap.String("request", req.String())) case base.GetParameter: cam.Info("got Get_Parameter from server:", zap.String("request", req.String())) case "TCP timeout": cam.Warn("got TCP timeout from server:", zap.String("request", req.String())) case "EOF": cam.Warn("got EOF from server:", zap.String("request", req.String())) default: cam.Warn("got method from server:", zap.String("request", req.String())) } } // Create ticker for rotation files. ticker := time.NewTicker(time.Duration(period) * time.Second) defer ticker.Stop() // Rotate files. go func() { for range ticker.C { // Logic for rotation files. /* currentMpegtsMuxer.close() currentMpegtsMuxer.fileName = fn.SetNumNTime() err = currentMpegtsMuxer.initialize() if err != nil { panic(err) } cam.Info("new file for recording created") log.Println("new file for recording created") */ } }() if err = c.Wait(); err != nil { rtsp(dir, period, link, fn.Number+1) cam.Error("c.Wait() error:", zap.Error(c.Wait())) } } return nil } // changeDomain changes domain if a camera was flipped to another domain. func changeDomain(dir string, period int, link string, number int, cam *zap.Logger, err error) { err2 := errors.New("404 (Not found)") if errors.As(err, &err2) { if strings.Contains(link, "video-1") { err = rtsp(dir, period, strings.Replace(link, "video-1", "video-2", 1), number) if err != nil { cam.Error("changeDomain rtsp error:", zap.Error(err)) logger.Log.Error("changeDomain rtsp error:", zap.Error(err)) log.Println("changeDomain rtsp error for camera: ", link) return } } else { err = rtsp(dir, period, strings.Replace(link, "video-2", "video-1", 1), number) if err != nil { cam.Error("changeDomain rtsp error:", zap.Error(err)) logger.Log.Error("changeDomain rtsp error:", zap.Error(err)) log.Println("changeDomain rtsp error for camera: ", link) return } } } }