diff --git a/writer/cmd/main.go b/writer/cmd/main.go index 897b60c..b59b9c6 100644 --- a/writer/cmd/main.go +++ b/writer/cmd/main.go @@ -1,35 +1,20 @@ package main import ( - "flag" "fmt" + logger "git.insit.tech/psa/rtsp_reader-writer/writer/internal/log" + "time" + "git.insit.tech/psa/rtsp_reader-writer/writer/internal/config" "git.insit.tech/psa/rtsp_reader-writer/writer/internal/ingest/rtsp" - "git.insit.tech/sas/rtsp_proxy/core/log" + log2 "git.insit.tech/sas/rtsp_proxy/core/log" ) func main() { - directory := flag.String("dir", "/home/psa/GoRepository", "directory") - flag.Parse() + config.LogsDirectory = log2.DirCreator(config.Local, "logs") + logger.Log = log2.MainLogging( + fmt.Sprintf("%s/Main_%s.log", config.LogsDirectory, time.Now().Format("15-04-05_02-01-2006"))) - // Parse camera links from YAML file into struct Cameras. - cams, err := config.ParseCamerasYAML(*directory) - if err != nil { - panic(err) - } - - Log := log.MainLogging("/home/psa/GoRepository/" + "/data/" + "camera54-centr-kirova-kalinina") - - // Connect to each camera. - for _, link := range cams { - fmt.Printf("process camera:\n %s\n", link) - - go func() { - err = rtsp.StartWriter(*directory, 60, link, Log) - if err != nil { - fmt.Printf("procRTSP function error for camera %s: %s", link, err.Error()) - } - }() - } + rtsp.StartWriter() select {} } diff --git a/writer/go.sum b/writer/go.sum index 349d664..70daf4c 100644 --- a/writer/go.sum +++ b/writer/go.sum @@ -24,8 +24,6 @@ github.com/golang/snappy v1.0.0 h1:Oy607GVXHs7RtbggtPBnr2RmDArIsAefDwvrdWvRhGs= github.com/golang/snappy v1.0.0/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/grafov/m3u8 v0.12.1 h1:DuP1uA1kvRRmGNAZ0m+ObLv1dvrfNO0TPx0c/enNk0s= -github.com/grafov/m3u8 v0.12.1/go.mod h1:nqzOkfBiZJENr52zTVd/Dcl03yzphIMbJqkXGu+u080= github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA= github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8= github.com/pion/rtcp v1.2.15 h1:LZQi2JbdipLOj4eBjK4wlVoQWfrZbh3Q6eHtWtJBZBo= @@ -48,8 +46,6 @@ github.com/youpy/go-wav v0.3.2 h1:NLM8L/7yZ0Bntadw/0h95OyUsen+DQIVf9gay+SUsMU= github.com/youpy/go-wav v0.3.2/go.mod h1:0FCieAXAeSdcxFfwLpRuEo0PFmAoc+8NU34h7TUvk50= github.com/zaf/g711 v1.4.0 h1:XZYkjjiAg9QTBnHqEg37m2I9q3IIDv5JRYXs2N8ma7c= github.com/zaf/g711 v1.4.0/go.mod h1:eCDXt3dSp/kYYAoooba7ukD/Q75jvAaS4WOMr0l1Roo= -github.com/zencoder/go-dash v0.0.0-20201006100653-2f93b14912b2 h1:0iAY2pL6yYhNYpdc1DbFq0p7ocyu5MlgKmkealhz3nk= -github.com/zencoder/go-dash v0.0.0-20201006100653-2f93b14912b2/go.mod h1:c8Gxxfmh0jmZ6G+ISlpa315WBVkzd8mEhu6gN9mn5Qg= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= diff --git a/writer/internal/config/source.yaml b/writer/internal/config/cameras.yaml similarity index 100% rename from writer/internal/config/source.yaml rename to writer/internal/config/cameras.yaml diff --git a/writer/internal/config/config.go b/writer/internal/config/config.go new file mode 100644 index 0000000..b303d14 --- /dev/null +++ b/writer/internal/config/config.go @@ -0,0 +1,7 @@ +package config + +var ( + Local = "reader-writer" + LogsDirectory string + Cameras = "/home/psa/GoRepository/rtsp_reader-writer/writer/internal/config/cameras.yaml" +) diff --git a/writer/internal/config/parser.go b/writer/internal/config/parser.go index 69a3b94..3e79f75 100644 --- a/writer/internal/config/parser.go +++ b/writer/internal/config/parser.go @@ -6,10 +6,10 @@ import ( ) // ParseCamerasYAML parses camera links from YAML file into struct Cameras. -func ParseCamerasYAML(dir string) (map[string]string, error) { +func ParseCamerasYAML() (map[string]string, error) { var CamerasYAML map[string]string - data, err := os.ReadFile(dir + "/rtsp_reader-writer/writer/internal/config/source.yaml") + data, err := os.ReadFile(Cameras) if err != nil { return CamerasYAML, err } diff --git a/writer/internal/ingest/formats/aac.go b/writer/internal/ingest/formats/aac.go index 5ce032c..b40619b 100644 --- a/writer/internal/ingest/formats/aac.go +++ b/writer/internal/ingest/formats/aac.go @@ -27,7 +27,6 @@ func ProcessAAC( aacMedia *description.Media, aacRTPDec *rtpmpeg4audio.Decoder, pkt *rtp.Packet, - t string, ) ( int64, [][]byte, @@ -35,13 +34,13 @@ func ProcessAAC( // Decode timestamp. pts, ok := c.PacketPTS2(aacMedia, pkt) if !ok { - return 0, nil, fmt.Errorf("[%v]: waiting for timestamp\n", t) + return 0, nil, fmt.Errorf("waiting for timestamp\n") } // Extract access unit from RTP packets. aus, err := aacRTPDec.Decode(pkt) if err != nil { - return 0, nil, fmt.Errorf("[%v]: decoding RTP packet error: %w", t, err) + return 0, nil, fmt.Errorf("decoding RTP packet error: %w", err) } return pts, aus, nil diff --git a/writer/internal/ingest/formats/av1.go b/writer/internal/ingest/formats/av1.go index 962228e..9c35641 100644 --- a/writer/internal/ingest/formats/av1.go +++ b/writer/internal/ingest/formats/av1.go @@ -32,7 +32,6 @@ func ProcessAV1( pkt *rtp.Packet, av1Dec *AV1Decoder, firstRandomReceived bool, - t string, ) ( int64, *image.RGBA, @@ -40,31 +39,31 @@ func ProcessAV1( // Decode timestamp. pts, ok := c.PacketPTS2(av1Media, pkt) if !ok { - return 0, nil, fmt.Errorf("[%v]: waiting for timestamp\n", t) + return 0, nil, fmt.Errorf("waiting for timestamp\n") } // Extract AV1 temporal units from RTP packets. tu, err := av1RTPDec.Decode(pkt) if err != rtpav1.ErrNonStartingPacketAndNoPrevious && err != rtpav1.ErrMorePacketsNeeded { - return 0, nil, fmt.Errorf("[%v]: decoding RTP packet error: %w", t, err) + return 0, nil, fmt.Errorf("decoding RTP packet error: %w", err) } // Wait for a random access unit. IsRandomAccess2, _ := av1.IsRandomAccess(tu) if !firstRandomReceived && !IsRandomAccess2 { - return 0, nil, fmt.Errorf("[%v]: waiting for a random access unit\n", t) + return 0, nil, fmt.Errorf("waiting for a random access unit\n") } firstRandomReceived = true // Convert AV1 temporal units into RGBA frames. img, err := av1Dec.Decode(tu) if err != nil { - return 0, nil, fmt.Errorf("[%v]: convert into RGBA frames error\n", t) + return 0, nil, fmt.Errorf("convert into RGBA frames error\n") } // Wait for a frame. if img == nil { - return 0, nil, fmt.Errorf("[%v]: frame not found\n", t) + return 0, nil, fmt.Errorf("frame not found\n") } return pts, img, nil diff --git a/writer/internal/ingest/formats/g711.go b/writer/internal/ingest/formats/g711.go index d0bcea3..26d6cac 100644 --- a/writer/internal/ingest/formats/g711.go +++ b/writer/internal/ingest/formats/g711.go @@ -22,19 +22,19 @@ func FindG711Format(desc *description.Session) (*format.G711, *description.Media } // ProcessG711 processes G711 flow and returns PTS and AU. -func ProcessG711(c *gortsplib.Client, g711Media *description.Media, g711RTPDec *rtplpcm.Decoder, pkt *rtp.Packet, t string, +func ProcessG711(c *gortsplib.Client, g711Media *description.Media, g711RTPDec *rtplpcm.Decoder, pkt *rtp.Packet, ) ( int64, []byte, error) { // Decode timestamp. pts, ok := c.PacketPTS2(g711Media, pkt) if !ok { - return 0, nil, fmt.Errorf("[%v]: waiting for timestamp\n", t) + return 0, nil, fmt.Errorf("waiting for timestamp\n") } // Extract access unit from RTP packets. au, err := g711RTPDec.Decode(pkt) if err != nil { - return 0, nil, fmt.Errorf("[%v]: decoding RTP packet error: %w", t, err) + return 0, nil, fmt.Errorf("decoding RTP packet error: %w", err) } return pts, au, nil diff --git a/writer/internal/ingest/formats/h264.go b/writer/internal/ingest/formats/h264.go index 9cb6670..eb30624 100644 --- a/writer/internal/ingest/formats/h264.go +++ b/writer/internal/ingest/formats/h264.go @@ -28,7 +28,6 @@ func ProcessH264( h264Media *description.Media, h264RTPDec *rtph264.Decoder, pkt *rtp.Packet, - t string, ) ( int64, [][]byte, @@ -36,14 +35,14 @@ func ProcessH264( // Decode timestamp. pts, ok := c.PacketPTS2(h264Media, pkt) if !ok { - return 0, nil, fmt.Errorf("[%v]: waiting for timestamp\n", t) + return 0, nil, fmt.Errorf("waiting for timestamp\n") } // Extract access unit from RTP packets. au, err := h264RTPDec.Decode(pkt) if err != nil { if err != rtph264.ErrNonStartingPacketAndNoPrevious && err != rtph264.ErrMorePacketsNeeded { - return 0, nil, fmt.Errorf("[%v]: decoding RTP packet error: %w", t, err) + return 0, nil, fmt.Errorf("decoding RTP packet error: %w", err) } } @@ -58,7 +57,6 @@ func ProcessH264RGBA( h264Dec *H264Decoder, pkt *rtp.Packet, firstRandomAccess bool, - t string, ) ( int64, *image.RGBA, @@ -66,13 +64,13 @@ func ProcessH264RGBA( // Decode timestamp. pts, ok := c.PacketPTS2(h264Media, pkt) if !ok { - return 0, nil, fmt.Errorf("[%v]: waiting for timestamp\n", t) + return 0, nil, fmt.Errorf("waiting for timestamp\n") } // Extract access units from RTP packets. au, err := h264RTPDec.Decode(pkt) if err != rtph264.ErrNonStartingPacketAndNoPrevious && err != rtph264.ErrMorePacketsNeeded { - return 0, nil, fmt.Errorf("[%v]: decoding RTP packet error: %w", t, err) + return 0, nil, fmt.Errorf("decoding RTP packet error: %w", err) } // Wait for a random access unit. @@ -84,12 +82,12 @@ func ProcessH264RGBA( // Convert H264 access units into RGBA frames. img, err := h264Dec.Decode(au) if err != nil { - return 0, nil, fmt.Errorf("[%v]: convert into RGBA frames error\n", t) + return 0, nil, fmt.Errorf("convert into RGBA frames error\n") } // Wait for a frame. if img == nil { - return 0, nil, fmt.Errorf("[%v]: frame not found\n", t) + return 0, nil, fmt.Errorf("frame not found\n") } return pts, img, nil diff --git a/writer/internal/ingest/formats/h265.go b/writer/internal/ingest/formats/h265.go index 8718fa7..b44d61d 100644 --- a/writer/internal/ingest/formats/h265.go +++ b/writer/internal/ingest/formats/h265.go @@ -31,7 +31,6 @@ func ProcessH265RGBA( h265Dec *H265Decoder, pkt *rtp.Packet, firstRandomAccess bool, - t string, ) ( int64, *image.RGBA, @@ -39,13 +38,13 @@ func ProcessH265RGBA( // Decode timestamp. pts, ok := c.PacketPTS2(h265Media, pkt) if !ok { - return 0, nil, fmt.Errorf("[%v]: waiting for timestamp\n", t) + return 0, nil, fmt.Errorf("waiting for timestamp\n") } // Extract access units from RTP packets. au, err := h265RTPDec.Decode(pkt) if err != rtph265.ErrNonStartingPacketAndNoPrevious && err != rtph265.ErrMorePacketsNeeded { - return 0, nil, fmt.Errorf("[%v]: decoding RTP packet error: %w", t, err) + return 0, nil, fmt.Errorf("decoding RTP packet error: %w", err) } // Wait for a random access unit. @@ -57,12 +56,12 @@ func ProcessH265RGBA( // Convert H265 access units into RGBA frames. img, err := h265Dec.Decode(au) if err != nil { - return 0, nil, fmt.Errorf("[%v]: convert into RGBA frames error\n", t) + return 0, nil, fmt.Errorf("convert into RGBA frames error\n") } // Wait for a frame. if img == nil { - return 0, nil, fmt.Errorf("[%v]: frame not found\n", t) + return 0, nil, fmt.Errorf("frame not found\n") } return pts, img, nil diff --git a/writer/internal/ingest/formats/lpcm.go b/writer/internal/ingest/formats/lpcm.go index 25471d0..d336f02 100644 --- a/writer/internal/ingest/formats/lpcm.go +++ b/writer/internal/ingest/formats/lpcm.go @@ -27,7 +27,6 @@ func ProcessLPCM( lpcmMedia *description.Media, lpcmRTPDec *rtplpcm.Decoder, pkt *rtp.Packet, - t string, ) ( int64, []byte, @@ -35,13 +34,13 @@ func ProcessLPCM( // Decode timestamp. pts, ok := c.PacketPTS2(lpcmMedia, pkt) if !ok { - return 0, nil, fmt.Errorf("[%v]: waiting for timestamp\n", t) + return 0, nil, fmt.Errorf("waiting for timestamp\n") } // Extract LPCM samples from RTP packets. samples, err := lpcmRTPDec.Decode(pkt) if err != nil { - return 0, nil, fmt.Errorf("[%v]: decoding RTP packet error: %w", t, err) + return 0, nil, fmt.Errorf("decoding RTP packet error: %w", err) } return pts, samples, err diff --git a/writer/internal/ingest/formats/mjpeg.go b/writer/internal/ingest/formats/mjpeg.go index 15e4b83..61aea45 100644 --- a/writer/internal/ingest/formats/mjpeg.go +++ b/writer/internal/ingest/formats/mjpeg.go @@ -30,7 +30,6 @@ func ProcessMJPEGRGBA( mjpegMedia *description.Media, mjpegRTPDec *rtpmjpeg.Decoder, pkt *rtp.Packet, - t string, ) ( int64, image.Image, @@ -38,19 +37,19 @@ func ProcessMJPEGRGBA( // Decode timestamp. pts, ok := c.PacketPTS2(mjpegMedia, pkt) if !ok { - return 0, nil, fmt.Errorf("[%v]: waiting for timestamp\n", t) + return 0, nil, fmt.Errorf("waiting for timestamp\n") } // Extract JPEG images from RTP packets. enc, err := mjpegRTPDec.Decode(pkt) if err != rtpmjpeg.ErrNonStartingPacketAndNoPrevious && err != rtpmjpeg.ErrMorePacketsNeeded { - return 0, nil, fmt.Errorf("[%v]: decoding RTP packet error: %w", t, err) + return 0, nil, fmt.Errorf("decoding RTP packet error: %w", err) } // Convert JPEG images into RGBA frames. img, err := jpeg.Decode(bytes.NewReader(enc)) if err != nil { - return 0, nil, fmt.Errorf("[%v]: convert into RGBA frames error\n", t) + return 0, nil, fmt.Errorf("convert into RGBA frames error\n") } return pts, img, nil diff --git a/writer/internal/ingest/formats/opus.go b/writer/internal/ingest/formats/opus.go index b446ee1..6a889a2 100644 --- a/writer/internal/ingest/formats/opus.go +++ b/writer/internal/ingest/formats/opus.go @@ -27,7 +27,6 @@ func ProcessOPUS( opusMedia *description.Media, opusRTPDec *rtpsimpleaudio.Decoder, pkt *rtp.Packet, - t string, ) ( int64, []byte, @@ -35,13 +34,13 @@ func ProcessOPUS( // Decode timestamp. pts, ok := c.PacketPTS2(opusMedia, pkt) if !ok { - return 0, nil, fmt.Errorf("[%v]: waiting for timestamp\n", t) + return 0, nil, fmt.Errorf("waiting for timestamp\n") } // Extract Opus packets from RTP packets. op, err := opusRTPDec.Decode(pkt) if err != nil { - return 0, nil, fmt.Errorf("[%v]: decoding RTP packet error: %w", t, err) + return 0, nil, fmt.Errorf("decoding RTP packet error: %w", err) } return pts, op, nil diff --git a/writer/internal/ingest/formats/vp8.go b/writer/internal/ingest/formats/vp8.go index 3c1efde..3ebb13b 100644 --- a/writer/internal/ingest/formats/vp8.go +++ b/writer/internal/ingest/formats/vp8.go @@ -29,7 +29,6 @@ func ProcessVP8RGBA( vp8RTPDec *rtpvp8.Decoder, vp8Dec *VPDecoder, pkt *rtp.Packet, - t string, ) ( int64, *image.RGBA, @@ -37,24 +36,24 @@ func ProcessVP8RGBA( // Decode timestamp. pts, ok := c.PacketPTS2(vp8Media, pkt) if !ok { - return 0, nil, fmt.Errorf("[%v]: waiting for timestamp\n", t) + return 0, nil, fmt.Errorf("waiting for timestamp\n") } // Extract access units from RTP packets. au, err := vp8RTPDec.Decode(pkt) if err != rtpvp8.ErrNonStartingPacketAndNoPrevious && err != rtpvp8.ErrMorePacketsNeeded { - return 0, nil, fmt.Errorf("[%v]: decoding RTP packet error: %w", t, err) + return 0, nil, fmt.Errorf("decoding RTP packet error: %w", err) } // Convert VP8 access units into RGBA frames. img, err := vp8Dec.Decode(au) if err != nil { - return 0, nil, fmt.Errorf("[%v]: convert into RGBA frames error\n", t) + return 0, nil, fmt.Errorf("convert into RGBA frames error\n") } // Wait for a frame. if img == nil { - return 0, nil, fmt.Errorf("[%v]: frame not found\n", t) + return 0, nil, fmt.Errorf("frame not found\n") } return pts, img, nil diff --git a/writer/internal/ingest/formats/vp9.go b/writer/internal/ingest/formats/vp9.go index 842e4eb..fb4d724 100644 --- a/writer/internal/ingest/formats/vp9.go +++ b/writer/internal/ingest/formats/vp9.go @@ -30,7 +30,6 @@ func ProcessVP9RGBA( vp9RTPDec *rtpvp9.Decoder, vp9Dec *VPDecoder, pkt *rtp.Packet, - t string, ) ( int64, *image.RGBA, @@ -38,24 +37,24 @@ func ProcessVP9RGBA( // Decode timestamp. pts, ok := c.PacketPTS2(vp9Media, pkt) if !ok { - return 0, nil, fmt.Errorf("[%v]: waiting for timestamp\n", t) + return 0, nil, fmt.Errorf("waiting for timestamp\n") } // Extract access units from RTP packets. au, err := vp9RTPDec.Decode(pkt) if err != rtpvp8.ErrNonStartingPacketAndNoPrevious && err != rtpvp8.ErrMorePacketsNeeded { - return 0, nil, fmt.Errorf("[%v]: decoding RTP packet error: %w", t, err) + return 0, nil, fmt.Errorf("decoding RTP packet error: %w", err) } // Convert VP8 access units into RGBA frames. img, err := vp9Dec.Decode(au) if err != nil { - return 0, nil, fmt.Errorf("[%v]: convert into RGBA frames error\n", t) + return 0, nil, fmt.Errorf("convert into RGBA frames error\n") } // Wait for a frame. if img == nil { - return 0, nil, fmt.Errorf("[%v]: frame not found\n", t) + return 0, nil, fmt.Errorf("frame not found\n") } return pts, img, nil diff --git a/writer/internal/ingest/rtsp/operator.go b/writer/internal/ingest/rtsp/operator.go index c59d3b5..85e53cd 100644 --- a/writer/internal/ingest/rtsp/operator.go +++ b/writer/internal/ingest/rtsp/operator.go @@ -4,22 +4,25 @@ import ( "log" "strings" "time" + + "go.uber.org/zap" ) -// cutURI returns the last part of the URI after "/". -func cutURI(URI string) (CutURI string) { - splitted := strings.Split(URI, "/") - return splitted[len(splitted)-1] +// lastPartURI returns the last part of the URI after "/". +func lastPartURI(URI string) (CutURI string) { + split := strings.Split(URI, "/") + return split[len(split)-1] } // waitPeriod waits for the next period. -func waitPeriod(period int) { +func waitPeriod(period int, cam *zap.Logger) { periodTD := time.Duration(period) * time.Second now := time.Now() nextSegment := now.Truncate(periodTD).Add(periodTD) waitDuration := nextSegment.Sub(now) - log.Printf("waiting for start recording: %v\n", waitDuration) + cam.Info("waiting for start recording:", zap.Duration("time", waitDuration)) + log.Println("waiting for start recording: ", waitDuration) time.Sleep(waitDuration) } diff --git a/writer/internal/ingest/rtsp/rtsp.go b/writer/internal/ingest/rtsp/rtsp.go index c797614..29cff4a 100644 --- a/writer/internal/ingest/rtsp/rtsp.go +++ b/writer/internal/ingest/rtsp/rtsp.go @@ -4,29 +4,54 @@ import ( "encoding/binary" "errors" "fmt" - "go.uber.org/zap" "log" "os" "strconv" "strings" "time" + "git.insit.tech/psa/rtsp_reader-writer/writer/internal/config" "git.insit.tech/psa/rtsp_reader-writer/writer/internal/ingest/formats" + logger "git.insit.tech/psa/rtsp_reader-writer/writer/internal/log" "git.insit.tech/psa/rtsp_reader-writer/writer/pkg/storage" + log2 "git.insit.tech/sas/rtsp_proxy/core/log" "github.com/bluenviron/gortsplib/v4" "github.com/bluenviron/gortsplib/v4/pkg/base" "github.com/bluenviron/gortsplib/v4/pkg/description" "github.com/bluenviron/gortsplib/v4/pkg/format" "github.com/bluenviron/mediacommon/v2/pkg/codecs/g711" "github.com/pion/rtp" + "go.uber.org/zap" ) // StartWriter starts the program. -func StartWriter(dir string, period int, link string, Log *zap.Logger) error { - err := RTSP(dir, period, link, Log) +func StartWriter() { + cams, err := config.ParseCamerasYAML() + if err != nil { + logger.Log.Fatal("func ParseCamerasYAML error:", zap.Error(err)) + log.Println("func ParseCamerasYAML error: ") + } + + for _, link := range cams { + logger.Log.Info("process camera:", zap.String("link", link)) + log.Println("process camera: ", link) + + go func() { + err = startRTSP(config.Local, 60, link) + if err != nil { + logger.Log.Error("procRTSP function error for camera:", zap.String("link", link), zap.Error(err)) + log.Println("procRTSP function error for camera: ", link) + } + }() + } +} + +// startRTSP starts RTSP protocol. +func startRTSP(dir string, period int, link string) error { + err := rtsp(dir, period, link) if err != nil { // Change domain if a camera was flipped to another domain. - err = changeDomain(dir, period, link, err, Log) + err = changeDomain(dir, period, link, err) if err != nil { return fmt.Errorf("change domain error: %w", err) } @@ -35,8 +60,15 @@ func StartWriter(dir string, period int, link string, Log *zap.Logger) error { return nil } -// RTSP processes RTSP protocol. -func RTSP(dir string, period int, link string) error { +// rtsp processes RTSP protocol. +func rtsp(dir string, period int, link string) error { + // Create data folder in the directory. + dirData := log2.DirCreator(dir, "data") + + // Create logger. + cutURI := lastPartURI(link) + cam := log2.CamLogging(fmt.Sprintf("%s/%s/Cam_%s.log", dirData, cutURI, time.Now().Format("15-04-05_02-01-2006"))) + // Connect to the server. c := gortsplib.Client{ UserAgent: "PSA", @@ -44,31 +76,33 @@ func RTSP(dir string, period int, link string) error { u, err := base.ParseURL(link) if err != nil { - return fmt.Errorf("parse URL error: %w", err) + cam.Error("parse URL error:", zap.Error(err)) + return err } err = c.Start(u.Scheme, u.Host) if err != nil { - return fmt.Errorf("connect to the server error: %w", err) + cam.Error("connect to the server error:", zap.Error(err)) + return err } defer c.Close() desc, res, err := c.Describe(u) if err != nil || desc == nil { - log.Printf("medias not found for camera [%s]: %v", link, err) - return nil + cam.Error("medias not found for camera:", zap.Error(err)) + return err } // Create file name structure and directory for files. resolution := findResolution(res.Body) resolutions := []string{resolution} - cutURI := cutURI(link) - fn := storage.CreateFileName(dir, resolutions, cutURI, period) + fn := storage.CreateFileName(dirData, resolutions, cutURI, period) err = os.MkdirAll(fmt.Sprintf("%s", fn.Path), 0755) if err != nil { - return fmt.Errorf("mkdirall error: %w", err) + cam.Error("mkdirall error: %w", zap.Error(err)) + return err } // Find formats. @@ -77,108 +111,121 @@ func RTSP(dir string, period int, link string) error { av1Format, av1Media, err := formats.FindAV1Format(desc) if av1Format != nil { - log.Println("[av1]: format found") videoFormat = "AV1" - } else { - log.Println(err) + cam.Info("AV1 format found") + } + if err != nil { + cam.Info("func FindAV1Format:", zap.Error(err)) } g711Format, g711Media, err := formats.FindG711Format(desc) if g711Format != nil { - log.Println("[g711]: format found") audioFormat = "G711" - } else { - log.Println(err) + cam.Info("G711 format found") + } + if err != nil { + cam.Info("func FindG711Format:", zap.Error(err)) } h264Format, h264Media, err := formats.FindH264Format(desc) if h264Format != nil { - log.Println("[h264]: format found") videoFormat = "H264" - } else { - log.Println(err) + cam.Info("H264 format found") + } + if err != nil { + cam.Info("func FindH264Format:", zap.Error(err)) } aacFormat, aacMedia, err := formats.FindAACFormat(desc) if aacFormat != nil { - log.Println("[aac]: format found") audioFormat = "AAC" - } else { - log.Println(err) + cam.Info("AAC format found") + } + if err != nil { + cam.Info("func FindAACFormat:", zap.Error(err)) } h265Format, h265Media, err := formats.FindH265Format(desc) if h265Format != nil { - log.Println("[h265]: format found") videoFormat = "H265" - } else { - log.Println(err) + cam.Info("H265 format found") + } + if err != nil { + cam.Info("func FindH265Format:", zap.Error(err)) } lpcmFormat, lpcmMedia, err := formats.FindLPCMFormat(desc) if lpcmFormat != nil { - log.Println("[lpcm]: format found") audioFormat = "LPCM" - } else { - log.Println(err) + cam.Info("LPCM format found") + } + if err != nil { + cam.Info("func FindLPCMFormat:", zap.Error(err)) } mjpegFormat, mjpegMedia, err := formats.FindMJPEGFormat(desc) if mjpegFormat != nil { - log.Println("[mjpeg]: format found") videoFormat = "MJPEG" - } else { - log.Println(err) + cam.Info("MJPEG format found") + } + if err != nil { + cam.Info("func FindLPCMFormat:", zap.Error(err)) } opusFormat, opusMedia, err := formats.FindOPUSFormat(desc) if opusFormat != nil { - log.Println("[opus]: format found") audioFormat = "OPUS" - } else { - log.Println(err) + cam.Info("OPUS format found") + } + if err != nil { + cam.Info("func FindOPUSFormat:", zap.Error(err)) } vp8Format, vp8Media, err := formats.FindVP8Format(desc) if vp8Format != nil { - log.Println("[vp8]: format found") videoFormat = "VP8" - } else { - log.Println(err) + cam.Info("VP8 format found") + } + if err != nil { + cam.Info("func FindVP8Format:", zap.Error(err)) } vp9Format, vp9Media, err := formats.FindVP9Format(desc) if vp9Format != nil { - log.Println("[vp9]: format found") videoFormat = "VP9" - } else { - log.Println(err) + cam.Info("VP9 format found") + } + if err != nil { + cam.Info("func FindVP9Format:", zap.Error(err)) } // Start program according to gotten formats. switch { case videoFormat == "AV1" && audioFormat == "": // Wait for the next period. - waitPeriod(period) - log.Printf("[%v]: start recording", videoFormat) + waitPeriod(period, cam) + cam.Info("start recording") // Create decoder. av1RTPDec, err := av1Format.CreateDecoder() if err != nil { - log.Printf("[%v]: create decoder error: %v\n", videoFormat, err) + cam.Error("create decoder error:", zap.Error(err)) + return err } av1Dec := &formats.AV1Decoder{} err = av1Dec.Initialize() if err != nil { - log.Printf("[%v]: init decoder error: %v\n", videoFormat, err) + cam.Error("init decoder error:", zap.Error(err)) + return err } defer av1Dec.Close() // Setup media. _, err = c.Setup(desc.BaseURL, av1Media, 0, 0) if err != nil { - return fmt.Errorf("[%v]: setup media error: %w", videoFormat, err) + cam.Error("setup media error:", zap.Error(err)) + return err } firstRandomReceived := false @@ -186,19 +233,19 @@ func RTSP(dir string, period int, link string) error { // Process input rtp packets. c.OnPacketRTP(av1Media, av1Format, func(pkt *rtp.Packet) { // Process AV1 flow and return PTS and IMG. - pts, img, err := formats.ProcessAV1(&c, av1Media, av1RTPDec, pkt, av1Dec, firstRandomReceived, videoFormat) + pts, _, err := formats.ProcessAV1(&c, av1Media, av1RTPDec, pkt, av1Dec, firstRandomReceived) if err != nil { - log.Printf("[%v]: process packet error: %v\n", videoFormat, err) - return + cam.Warn("process packet error:", zap.Error(err)) } - log.Printf("[%v]: decoded frame with PTS %v and size %v\n", videoFormat, pts, img.Bounds().Max) + cam.Info("decoded image:", zap.String("PTS", strconv.FormatInt(pts, 10))) }) // Start playing. _, err = c.Play(nil) if err != nil { - return fmt.Errorf("[%v]: sending PLAY request erorr: %w", videoFormat, err) + cam.Error("sending PLAY request error:", zap.Error(err)) + return err } // Create ticker for rotation files. @@ -218,37 +265,40 @@ func RTSP(dir string, period int, link string) error { panic(err) } - log.Println("New file for recording created:", currentMpegtsMuxer.fileName) + cam.Info("new file for recording created") + log.Println("new file for recording created") */ } }() - panic(c.Wait()) + cam.Error("c.Wait() error:", zap.Error(c.Wait())) + return fmt.Errorf("c.Wait() error") case videoFormat == "" && audioFormat == "G711": // Wait for the next period. - waitPeriod(period) - log.Printf("[%v]: start recording", audioFormat) + waitPeriod(period, cam) + cam.Info("start recording") // Create decoder. g711RTPDec, err := g711Format.CreateDecoder() if err != nil { - log.Printf("[%v]: create decoder error: %v\n", audioFormat, err) + cam.Error("create decoder error:", zap.Error(err)) + return err } // Setup media. _, err = c.Setup(desc.BaseURL, g711Media, 0, 0) if err != nil { - return fmt.Errorf("[%v]: setup media error: %w\n", audioFormat, err) + cam.Error("setup media error:", zap.Error(err)) + return err } // Process input rtp packets. c.OnPacketRTP(g711Media, g711Format, func(pkt *rtp.Packet) { // Process G711 flow and return PTS and AU. - pts, au, err := formats.ProcessG711(&c, g711Media, g711RTPDec, pkt, audioFormat) + pts, au, err := formats.ProcessG711(&c, g711Media, g711RTPDec, pkt) if err != nil { - log.Printf("[%v]: process packet error: %v\n", audioFormat, err) - return + cam.Warn("process packet error:", zap.Error(err)) } // Decode samples (these are 16-bit, big endian LPCM samples). @@ -258,13 +308,14 @@ func RTSP(dir string, period int, link string) error { g711.DecodeAlaw(au) } - log.Printf("[%v]: decoded audio samples with PTS %v and size %d\n", audioFormat, pts, len(au)) + cam.Info("decoded audio samples:", zap.String("PTS", strconv.FormatInt(pts, 10))) }) // Start playing. _, err = c.Play(nil) if err != nil { - return fmt.Errorf("[%v]: sending PLAY request erorr: %w", audioFormat, err) + cam.Error("sending PLAY request error:", zap.Error(err)) + return err } // Create ticker for rotation files. @@ -284,27 +335,29 @@ func RTSP(dir string, period int, link string) error { panic(err) } - log.Println("New file for recording created:", currentMpegtsMuxer.fileName) + cam.Info("new file for recording created") + log.Println("new file for recording created") */ } }() - panic(c.Wait()) + cam.Error("c.Wait() error:", zap.Error(c.Wait())) + return fmt.Errorf("c.Wait() error") case videoFormat == "H264" && audioFormat == "AAC": // Wait for the next period. - waitPeriod(period) - log.Printf("[%v-%v]: start recording", videoFormat, audioFormat) + waitPeriod(period, cam) + cam.Info("start recording") // Create decoders. //h264RTPDec, err := h264Format.CreateDecoder() //if err != nil { - // log.Printf("[%v-%v]: create decoder error: %v\n", videoFormat, audioFormat, err) + // cam.Error("create decoder error:", zap.Error(err)) //} // //aacRTPDec, err := aacFormat.CreateDecoder() //if err != nil { - // log.Printf("[%v-%v]: create decoder error: %v\n", videoFormat, audioFormat, err) + // cam.Error("create decoder error:", zap.Error(err)) //} //// Setup MPEG-TS muxer. @@ -322,7 +375,8 @@ func RTSP(dir string, period int, link string) error { // Setup all medias. err = c.SetupAll(desc.BaseURL, desc.Medias) if err != nil { - return fmt.Errorf("[%v-%v]: setup media error: %w\n", videoFormat, audioFormat, err) + cam.Error("setup media error:", zap.Error(err)) + return err } // Process input rtp packets. @@ -332,8 +386,7 @@ func RTSP(dir string, period int, link string) error { //// Process H264 flow and return PTS and AU. //pts, au, err := formats.ProcessH264(&c, h264Media, h264RTPDec, pkt, videoFormat) //if err != nil { - // log.Printf("[%v-%v]: process packet error: %v\n", videoFormat, audioFormat, err) - // return + // cam.Warn("process packet error:", zap.Error(err)) //} // //// Encode the access unit into MPEG-TS. @@ -346,8 +399,7 @@ func RTSP(dir string, period int, link string) error { //// Process AAC flow and return PTS and AUS. //pts, aus, err := formats.ProcessAAC(&c, aacMedia, aacRTPDec, pkt, audioFormat) //if err != nil { - // log.Printf("[%v-%v]: process packet error: %v\n", videoFormat, audioFormat, err) - // return + // cam.Warn("process packet error:", zap.Error(err)) //} // //// Encode access units into MPEG-TS. @@ -362,7 +414,8 @@ func RTSP(dir string, period int, link string) error { // Start playing. _, err = c.Play(nil) if err != nil { - return fmt.Errorf("[%v-%v]: sending PLAY request erorr: %w", videoFormat, audioFormat, err) + cam.Error("sending PLAY request error:", zap.Error(err)) + return err } // Create ticker for rotation files. @@ -382,27 +435,30 @@ func RTSP(dir string, period int, link string) error { // return //} // - //log.Printf("[%v-%v]: new file for recording created: %v", - // videoFormat, audioFormat, currentMpegtsMuxer.FileName) + // cam.Info("new file for recording created") + // log.Println("new file for recording created") } }() - panic(c.Wait()) + cam.Error("c.Wait() error:", zap.Error(c.Wait())) + return fmt.Errorf("c.Wait() error") case videoFormat == "H264" && audioFormat == "G711" || videoFormat == "H264" && audioFormat == "": // Wait for the next period. - waitPeriod(period) - log.Printf("[%v-%v]: start recording", videoFormat, audioFormat) + waitPeriod(period, cam) + cam.Info("start recording") // Create decoders. h264RTPDec, err := h264Format.CreateDecoder() if err != nil { - log.Printf("[%v-%v]: create decoder error: %v\n", videoFormat, audioFormat, err) + cam.Error("create decoder error:", zap.Error(err)) + return err } g711RTPDec, err := g711Format.CreateDecoder() if err != nil { - log.Printf("[%v-%v]: create decoder error: %v\n", videoFormat, audioFormat, err) + cam.Error("create decoder error:", zap.Error(err)) + return err } //// Setup MPEG-TS muxer. @@ -422,12 +478,14 @@ func RTSP(dir string, period int, link string) error { // Setup all medias. err = c.SetupAll(desc.BaseURL, desc.Medias) if err != nil { - return fmt.Errorf("[%v-%v]: setup media error: %w\n", videoFormat, audioFormat, err) + cam.Error("setup media error:", zap.Error(err)) + return err } file, err := os.Create(fn.SetNumNTime("insit")) if err != nil { - fmt.Println("creating file error:", err) + cam.Error("creating file error:", zap.Error(err)) + return err } defer file.Close() @@ -439,16 +497,19 @@ func RTSP(dir string, period int, link string) error { // Write StreamID. if err := binary.Write(file, binary.LittleEndian, int32(len(cutURI))); err != nil { - fmt.Println("write StreamID length error:", err) + cam.Error("write StreamID length error:", zap.Error(err)) + return err } if _, err := file.Write([]byte(cutURI)); err != nil { - fmt.Println("write StreamID error:", err) + cam.Error("write StreamID error:", zap.Error(err)) + return err } // Write header of the file. err = storage.WriteHeader(file, seg) if err != nil { - return fmt.Errorf("[%v-%v]: write header error: %w", videoFormat, audioFormat, err) + cam.Error("write header error:", zap.Error(err)) + return err } // Process input rtp packets. @@ -456,10 +517,9 @@ func RTSP(dir string, period int, link string) error { switch f := forma.(type) { case *format.H264: // Process H264 flow and return PTS and AU. - pts, au, err := formats.ProcessH264(&c, h264Media, h264RTPDec, pkt, videoFormat) + pts, au, err := formats.ProcessH264(&c, h264Media, h264RTPDec, pkt) if err != nil { - log.Printf("[%v-%v]: process packet error: %v\n", videoFormat, audioFormat, err) - return + cam.Warn("process packet error:", zap.Error(err)) } if au != nil { @@ -470,7 +530,7 @@ func RTSP(dir string, period int, link string) error { // Write segment with interleaved packets. if err := storage.WriteInterleavedPacket(file, seg); err != nil { - fmt.Println("write segment error:", err) + cam.Error("write segment error:", zap.Error(err)) return } } @@ -483,10 +543,9 @@ func RTSP(dir string, period int, link string) error { case *format.G711: // Process G711 flow and returns PTS and AU. - pts, au, err := formats.ProcessG711(&c, g711Media, g711RTPDec, pkt, videoFormat) + pts, au, err := formats.ProcessG711(&c, g711Media, g711RTPDec, pkt) if err != nil { - log.Printf("[%v-%v]: process packet error: %v\n", videoFormat, audioFormat, err) - return + cam.Warn("process packet error:", zap.Error(err)) } if au != nil { @@ -500,7 +559,7 @@ func RTSP(dir string, period int, link string) error { // Write segment with interleaved packets. if err := storage.WriteInterleavedPacket(file, seg); err != nil { - fmt.Println("write segment error:", err) + cam.Error("write segment error:", zap.Error(err)) return } } @@ -522,7 +581,8 @@ func RTSP(dir string, period int, link string) error { // Start playing. _, err = c.Play(nil) if err != nil { - return fmt.Errorf("[%v-%v]: sending PLAY request erorr: %w", videoFormat, audioFormat, err) + cam.Error("sending PLAY request error:", zap.Error(err)) + return err } // Create ticker for rotation files. @@ -548,7 +608,9 @@ func RTSP(dir string, period int, link string) error { file, err = os.Create(fn.SetNumNTime("insit")) if err != nil { - fmt.Println("creating file error:", err) + cam.Error("creating file error:", zap.Error(err)) + logger.Log.Error("creating file error:", zap.Error(err)) + return } seg = storage.Segment{ @@ -559,40 +621,49 @@ func RTSP(dir string, period int, link string) error { // Write StreamID. if err := binary.Write(file, binary.LittleEndian, int32(len(cutURI))); err != nil { - fmt.Println("write StreamID length error:", err) + cam.Error("write StreamID length error:", zap.Error(err)) + logger.Log.Error("write StreamID length error:", zap.Error(err)) + return } if _, err := file.Write([]byte(cutURI)); err != nil { - fmt.Println("write StreamID error:", err) + cam.Error("write StreamID error:", zap.Error(err)) + logger.Log.Error("write StreamID error:", zap.Error(err)) + return } // Write header of the file. err = storage.WriteHeader(file, seg) if err != nil { - log.Printf("[%v-%v]: write header error: %w", videoFormat, audioFormat, err) + cam.Error("write header error:", zap.Error(err)) + logger.Log.Error("write header error:", zap.Error(err)) + return } - log.Printf("[%v-%v]: new file for recording created: %v", - videoFormat, audioFormat, seg.Date+".insit") + cam.Info("new file for recording created") + log.Println("new file for recording created") } }() - panic(c.Wait()) + cam.Error("c.Wait() error:", zap.Error(c.Wait())) + return fmt.Errorf("c.Wait() error") case videoFormat == "H264-" && audioFormat == "": // Wait for the next period. - waitPeriod(period) - log.Printf("[%v]: start recording", videoFormat) + waitPeriod(period, cam) + cam.Info("start recording") // Create decoder. h264RTPDec, err := h264Format.CreateDecoder() if err != nil { - log.Printf("[%v]: create decoder error: %v\n", videoFormat, err) + cam.Error("create decoder error:", zap.Error(err)) + return err } // Setup H264 -> RGBA decoder. h264Dec := &formats.H264Decoder{} err = h264Dec.Initialize() if err != nil { - log.Printf("[%v]: init decoder error: %v\n", videoFormat, err) + cam.Error("init decoder error:", zap.Error(err)) + return err } defer h264Dec.Close() @@ -607,7 +678,8 @@ func RTSP(dir string, period int, link string) error { // Setup media. _, err = c.Setup(desc.BaseURL, h264Media, 0, 0) if err != nil { - return fmt.Errorf("[%v]: setup media error: %w", videoFormat, err) + cam.Error("setup media error:", zap.Error(err)) + return err } firstRandomAccess := false @@ -615,20 +687,20 @@ func RTSP(dir string, period int, link string) error { // Process input rtp packets. c.OnPacketRTP(h264Media, h264Format, func(pkt *rtp.Packet) { // Process H264 flow and return PTS and IMG. - pts, img, err := formats.ProcessH264RGBA( - &c, h264Media, h264RTPDec, h264Dec, pkt, firstRandomAccess, videoFormat) + pts, _, err := formats.ProcessH264RGBA( + &c, h264Media, h264RTPDec, h264Dec, pkt, firstRandomAccess) if err != nil { - log.Printf("[%v]: process packet error: %v\n", videoFormat, err) - return + cam.Warn("process packet error:", zap.Error(err)) } - log.Printf("[%v]: decoded frame with PTS %v and size %v\n", videoFormat, pts, img.Bounds().Max) + cam.Info("decoded image:", zap.String("PTS", strconv.FormatInt(pts, 10))) }) // Start playing. _, err = c.Play(nil) if err != nil { - return fmt.Errorf("[%v]: sending PLAY request erorr: %w", videoFormat, err) + cam.Error("sending PLAY request error:", zap.Error(err)) + return err } // Create ticker for rotation files. @@ -648,29 +720,33 @@ func RTSP(dir string, period int, link string) error { panic(err) } - log.Println("New file for recording created:", currentMpegtsMuxer.fileName) + cam.Info("new file for recording created") + log.Println("new file for recording created") */ } }() - panic(c.Wait()) + cam.Error("c.Wait() error:", zap.Error(c.Wait())) + return fmt.Errorf("c.Wait() error") case videoFormat == "H265" && audioFormat == "": // Wait for the next period. - waitPeriod(period) - log.Printf("[%v]: start recording", videoFormat) + waitPeriod(period, cam) + cam.Info("start recording") // Create decoder. h265RTPDec, err := h265Format.CreateDecoder() if err != nil { - log.Printf("[%v]: create decoder error: %v\n", videoFormat, err) + cam.Error("create decoder error:", zap.Error(err)) + return err } // Setup H264 -> RGBA decoder. h265Dec := &formats.H265Decoder{} err = h265Dec.Initialize() if err != nil { - log.Printf("[%v]: init decoder error: %v\n", videoFormat, err) + cam.Error("init decoder error:", zap.Error(err)) + return err } defer h265Dec.Close() @@ -688,7 +764,8 @@ func RTSP(dir string, period int, link string) error { // Setup media. _, err = c.Setup(desc.BaseURL, h265Media, 0, 0) if err != nil { - return fmt.Errorf("[%v]: setup media error: %w", videoFormat, err) + cam.Error("setup media error:", zap.Error(err)) + return err } firstRandomAccess := false @@ -696,20 +773,19 @@ func RTSP(dir string, period int, link string) error { // Process input rtp packets. c.OnPacketRTP(h265Media, h265Format, func(pkt *rtp.Packet) { // Process H265 flow and return PTS and IMG. - pts, img, err := formats.ProcessH265RGBA( - &c, h265Media, h265RTPDec, h265Dec, pkt, firstRandomAccess, videoFormat) + pts, _, err := formats.ProcessH265RGBA(&c, h265Media, h265RTPDec, h265Dec, pkt, firstRandomAccess) if err != nil { - log.Printf("[%v]: process packet error: %v\n", videoFormat, err) - return + cam.Warn("process packet error:", zap.Error(err)) } - log.Printf("[%v]: decoded frame with PTS %v and size %v\n", videoFormat, pts, img.Bounds().Max) + cam.Info("decoded image:", zap.String("PTS", strconv.FormatInt(pts, 10))) }) // Start playing. _, err = c.Play(nil) if err != nil { - return fmt.Errorf("[%v]: sending PLAY request erorr: %w", videoFormat, err) + cam.Error("sending PLAY request error:", zap.Error(err)) + return err } // Create ticker for rotation files. @@ -729,46 +805,50 @@ func RTSP(dir string, period int, link string) error { panic(err) } - log.Println("New file for recording created:", currentMpegtsMuxer.fileName) + cam.Info("new file for recording created") + log.Println("new file for recording created") */ } }() - panic(c.Wait()) + cam.Error("c.Wait() error:", zap.Error(c.Wait())) + return fmt.Errorf("c.Wait() error") case videoFormat == "" && audioFormat == "LPCM": // Wait for the next period. - waitPeriod(period) - log.Printf("[%v]: start recording", audioFormat) + waitPeriod(period, cam) + cam.Info("start recording") // Create decoder. lpcmRTPDec, err := lpcmFormat.CreateDecoder() if err != nil { - log.Printf("[%v]: create decoder error: %v\n", audioFormat, err) + cam.Error("create decoder error:", zap.Error(err)) + return err } // Setup media. _, err = c.Setup(desc.BaseURL, lpcmMedia, 0, 0) if err != nil { - log.Printf("[%v]: setup media error: %v\n", audioFormat, err) + cam.Error("setup media error:", zap.Error(err)) + return err } // Process input rtp packets. c.OnPacketRTP(lpcmMedia, lpcmFormat, func(pkt *rtp.Packet) { // Process LPCM flow and return PTS and SAMPLES. - pts, samples, err := formats.ProcessLPCM(&c, lpcmMedia, lpcmRTPDec, pkt, audioFormat) + pts, _, err := formats.ProcessLPCM(&c, lpcmMedia, lpcmRTPDec, pkt) if err != nil { - log.Printf("[%v]: process packet error: %v\n", audioFormat, err) - return + cam.Warn("process packet error:", zap.Error(err)) } - log.Printf("[%v]: decoded audio samples with PTS %v and size %d\n", audioFormat, pts, len(samples)) + cam.Info("decoded audio samples:", zap.String("PTS", strconv.FormatInt(pts, 10))) }) // Start playing. _, err = c.Play(nil) if err != nil { - return fmt.Errorf("[%v]: sending PLAY request erorr: %w", audioFormat, err) + cam.Error("sending PLAY request error:", zap.Error(err)) + return err } // Create ticker for rotation files. @@ -788,46 +868,50 @@ func RTSP(dir string, period int, link string) error { panic(err) } - log.Println("New file for recording created:", currentMpegtsMuxer.fileName) + cam.Info("new file for recording created") + log.Println("new file for recording created") */ } }() - panic(c.Wait()) + cam.Error("c.Wait() error:", zap.Error(c.Wait())) + return fmt.Errorf("c.Wait() error") case videoFormat == "MJPEG" && audioFormat == "": // Wait for the next period. - waitPeriod(period) - log.Printf("[%v]: start recording", audioFormat) + waitPeriod(period, cam) + cam.Info("start recording") // Create decoder. mjpegRTPDec, err := mjpegFormat.CreateDecoder() if err != nil { - log.Printf("[%v]: create decoder error: %v\n", videoFormat, err) + cam.Error("create decoder error:", zap.Error(err)) + return err } // Setup media. _, err = c.Setup(desc.BaseURL, mjpegMedia, 0, 0) if err != nil { - return fmt.Errorf("[%v]: setup media error: %w", videoFormat, err) + cam.Error("setup media error:", zap.Error(err)) + return err } // Process input rtp packets. c.OnPacketRTP(mjpegMedia, mjpegFormat, func(pkt *rtp.Packet) { // Process MJPEG flow and return PTS and IMG. - pts, img, err := formats.ProcessMJPEGRGBA(&c, mjpegMedia, mjpegRTPDec, pkt, videoFormat) + pts, _, err := formats.ProcessMJPEGRGBA(&c, mjpegMedia, mjpegRTPDec, pkt) if err != nil { - log.Printf("[%v]: process packet error: %v\n", videoFormat, err) - return + cam.Warn("process packet error:", zap.Error(err)) } - log.Printf("[%v]: decoded image with PTS %v and size %v", videoFormat, pts, img.Bounds().Max) + cam.Info("decoded image:", zap.String("PTS", strconv.FormatInt(pts, 10))) }) // Start playing. _, err = c.Play(nil) if err != nil { - return fmt.Errorf("[%v]: sending PLAY request erorr: %w", videoFormat, err) + cam.Error("sending PLAY request error:", zap.Error(err)) + return err } // Create ticker for rotation files. @@ -847,48 +931,52 @@ func RTSP(dir string, period int, link string) error { panic(err) } - log.Println("New file for recording created:", currentMpegtsMuxer.fileName) + cam.Info("new file for recording created") + log.Println("new file for recording created") */ } }() - panic(c.Wait()) + cam.Error("c.Wait() error:", zap.Error(c.Wait())) + return fmt.Errorf("c.Wait() error") case videoFormat == "" && audioFormat == "AAC": // Wait for the next period. - waitPeriod(period) - log.Printf("[%v]: start recording", audioFormat) + waitPeriod(period, cam) + cam.Info("start recording") // Create decoder. aacRTPDec, err := aacFormat.CreateDecoder() if err != nil { - log.Printf("[%v]: create decoder error: %v\n", audioFormat, err) + cam.Error("create decoder error:", zap.Error(err)) + return err } // Setup media. _, err = c.Setup(desc.BaseURL, aacMedia, 0, 0) if err != nil { - return fmt.Errorf("[%v]: setup media error: %w", audioFormat, err) + cam.Error("setup media error:", zap.Error(err)) + return err } // Process input rtp packets. c.OnPacketRTP(aacMedia, aacFormat, func(pkt *rtp.Packet) { // Process AAC flow and return PTS and AUS. - pts, aus, err := formats.ProcessAAC(&c, aacMedia, aacRTPDec, pkt, audioFormat) + pts, aus, err := formats.ProcessAAC(&c, aacMedia, aacRTPDec, pkt) if err != nil { - log.Printf("[%v]: process packet error: %v\n", audioFormat, err) - return + cam.Warn("process packet error:", zap.Error(err)) } - for _, au := range aus { - log.Printf("[%v]: received access unit with PTS %v size %d\n", audioFormat, pts, len(au)) + for _, _ = range aus { + cam.Info("received access unit:", zap.String("PTS", strconv.FormatInt(pts, 10))) } }) // Start playing. _, err = c.Play(nil) if err != nil { - return fmt.Errorf("[%v]: sending PLAY request erorr: %w", audioFormat, err) + cam.Error("sending PLAY request error:", zap.Error(err)) + return err } // Create ticker for rotation files. @@ -908,46 +996,50 @@ func RTSP(dir string, period int, link string) error { panic(err) } - log.Println("New file for recording created:", currentMpegtsMuxer.fileName) + cam.Info("new file for recording created") + log.Println("new file for recording created") */ } }() - panic(c.Wait()) + cam.Error("c.Wait() error:", zap.Error(c.Wait())) + return fmt.Errorf("c.Wait() error") case videoFormat == "" && audioFormat == "OPUS": // Wait for the next period. - waitPeriod(period) - log.Printf("[%v]: start recording", audioFormat) + waitPeriod(period, cam) + cam.Info("start recording") // Create decoder. opusRTPDec, err := opusFormat.CreateDecoder() if err != nil { - log.Printf("[%v]: create decoder error: %v\n", audioFormat, err) + cam.Error("create decoder error:", zap.Error(err)) + return err } // Setup media. _, err = c.Setup(desc.BaseURL, opusMedia, 0, 0) if err != nil { - return fmt.Errorf("[%v]: setup media error: %w", audioFormat, err) + cam.Error("setup media error:", zap.Error(err)) + return err } // Process input rtp packets. c.OnPacketRTP(opusMedia, opusFormat, func(pkt *rtp.Packet) { // Process OPUS flow and return PTS and OP. - pts, op, err := formats.ProcessOPUS(&c, opusMedia, opusRTPDec, pkt, audioFormat) + pts, _, err := formats.ProcessOPUS(&c, opusMedia, opusRTPDec, pkt) if err != nil { - log.Printf("[%v]: process packet error: %v\n", audioFormat, err) - return + cam.Warn("process packet error:", zap.Error(err)) } - log.Printf("[%v]: received OPUS packet with PTS %v size %d\n", audioFormat, pts, len(op)) + cam.Info("received OPUS packet:", zap.String("PTS", strconv.FormatInt(pts, 10))) }) // Start playing. _, err = c.Play(nil) if err != nil { - return fmt.Errorf("[%v]: sending PLAY request erorr: %w", audioFormat, err) + cam.Error("sending PLAY request error:", zap.Error(err)) + return err } // Create ticker for rotation files. @@ -967,54 +1059,59 @@ func RTSP(dir string, period int, link string) error { panic(err) } - log.Println("New file for recording created:", currentMpegtsMuxer.fileName) + cam.Info("new file for recording created") + log.Println("new file for recording created") */ } }() - panic(c.Wait()) + cam.Error("c.Wait() error:", zap.Error(c.Wait())) + return fmt.Errorf("c.Wait() error") case videoFormat == "VP8" && audioFormat == "": // Wait for the next period. - waitPeriod(period) - log.Printf("[%v]: start recording", videoFormat) + waitPeriod(period, cam) + cam.Info("start recording") // Create decoder. vp8RTPDec, err := vp8Format.CreateDecoder() if err != nil { - log.Printf("[%v]: create decoder error: %v\n", videoFormat, err) + cam.Error("create decoder error:", zap.Error(err)) + return err } // Setup VP8 -> RGBA decoder. vp8Dec := &formats.VPDecoder{} err = vp8Dec.Initialize() if err != nil { - log.Printf("[%v]: init decoder error: %v\n", videoFormat, err) + cam.Error("init decoder error:", zap.Error(err)) + return err } defer vp8Dec.Close() // Setup media. _, err = c.Setup(desc.BaseURL, vp8Media, 0, 0) if err != nil { - return fmt.Errorf("[%v]: setup media error: %w", videoFormat, err) + cam.Error("setup media error:", zap.Error(err)) + return err } // Process input rtp packets. c.OnPacketRTP(vp8Media, vp8Format, func(pkt *rtp.Packet) { // Process VP8 flow and return PTS and IMG. - pts, img, err := formats.ProcessVP8RGBA(&c, vp8Media, vp8RTPDec, vp8Dec, pkt, videoFormat) + pts, _, err := formats.ProcessVP8RGBA(&c, vp8Media, vp8RTPDec, vp8Dec, pkt) if err != nil { - log.Printf("[%v]: process packet error: %v\n", audioFormat, err) - return + cam.Warn("process packet error:", zap.Error(err)) } - log.Printf("[%v]: decoded frame with PTS %v and size %v", videoFormat, pts, img.Bounds().Max) + cam.Info("decoded image:", zap.String("PTS", strconv.FormatInt(pts, 10))) }) // Start playing. _, err = c.Play(nil) if err != nil { - return fmt.Errorf("[%v]: sending PLAY request erorr: %w", videoFormat, err) + cam.Error("sending PLAY request error:", zap.Error(err)) + return err } // Create ticker for rotation files. @@ -1034,54 +1131,59 @@ func RTSP(dir string, period int, link string) error { panic(err) } - log.Println("New file for recording created:", currentMpegtsMuxer.fileName) + cam.Info("new file for recording created") + log.Println("new file for recording created") */ } }() - panic(c.Wait()) + cam.Error("c.Wait() error:", zap.Error(c.Wait())) + return fmt.Errorf("c.Wait() error") case videoFormat == "VP9" && audioFormat == "": // Wait for the next period. - waitPeriod(period) - log.Printf("[%v]: start recording", videoFormat) + waitPeriod(period, cam) + cam.Info("start recording") // Create decoder. vp9RTPDec, err := vp9Format.CreateDecoder() if err != nil { - log.Printf("[%v]: create decoder error: %v\n", videoFormat, err) + cam.Error("create decoder error:", zap.Error(err)) + return err } // Setup VP9 -> RGBA decoder. vp9Dec := &formats.VPDecoder{} err = vp9Dec.Initialize() if err != nil { - log.Printf("[%v]: init decoder error: %v\n", videoFormat, err) + cam.Error("init decoder error:", zap.Error(err)) + return err } defer vp9Dec.Close() // Setup media. _, err = c.Setup(desc.BaseURL, vp9Media, 0, 0) if err != nil { - return fmt.Errorf("[%v]: setup media error: %w", videoFormat, err) + cam.Error("setup media error:", zap.Error(err)) + return err } // Process input rtp packets. c.OnPacketRTP(vp9Media, vp9Format, func(pkt *rtp.Packet) { // Process VP9 flow and return PTS and IMG. - pts, img, err := formats.ProcessVP9RGBA(&c, vp9Media, vp9RTPDec, vp9Dec, pkt, videoFormat) + pts, _, err := formats.ProcessVP9RGBA(&c, vp9Media, vp9RTPDec, vp9Dec, pkt) if err != nil { - log.Printf("[%v]: process packet error: %v\n", audioFormat, err) - return + cam.Warn("process packet error:", zap.Error(err)) } - log.Printf("[%v]: decoded frame with PTS %v and size %v", videoFormat, pts, img.Bounds().Max) + cam.Info("decoded image:", zap.String("PTS", strconv.FormatInt(pts, 10))) }) // Start playing. _, err = c.Play(nil) if err != nil { - return fmt.Errorf("[%v]: sending PLAY request erorr: %w", videoFormat, err) + cam.Error("sending PLAY request error:", zap.Error(err)) + return err } // Create ticker for rotation files. @@ -1101,34 +1203,36 @@ func RTSP(dir string, period int, link string) error { panic(err) } - log.Println("New file for recording created:", currentMpegtsMuxer.fileName) + cam.Info("new file for recording created") + log.Println("new file for recording created") */ } }() - panic(c.Wait()) + cam.Error("c.Wait() error:", zap.Error(c.Wait())) + return fmt.Errorf("c.Wait() error") } return nil } // changeDomain changes domain if a camera was flipped to another domain. -func changeDomain(dir string, period int, link string, err error, Log *zap.Logger) error { +func changeDomain(dir string, period int, link string, err error) error { err2 := errors.New("404 (Not found)") if errors.As(err, &err2) { if strings.Contains(link, "video-1") { - err = RTSP(dir, period, strings.Replace(link, "video-1", "video-2", 1), Log) + err = rtsp(dir, period, strings.Replace(link, "video-1", "video-2", 1)) if err != nil { return err } } else { - err = RTSP(dir, period, strings.Replace(link, "video-2", "video-1", 1), Log) + err = rtsp(dir, period, strings.Replace(link, "video-2", "video-1", 1)) if err != nil { return err } } } else { - panic(err) + return err2 } return nil diff --git a/writer/internal/log/logger.go b/writer/internal/log/logger.go index e5c6eed..2798951 100644 --- a/writer/internal/log/logger.go +++ b/writer/internal/log/logger.go @@ -2,4 +2,6 @@ package log import "go.uber.org/zap" -var Log *zap.Logger +var ( + Log *zap.Logger +) diff --git a/writer/pkg/storage/file_manager.go b/writer/pkg/storage/file_manager.go index ce0b5c1..9546a91 100644 --- a/writer/pkg/storage/file_manager.go +++ b/writer/pkg/storage/file_manager.go @@ -6,9 +6,9 @@ import ( ) // CreateFileName creates FileName structure. -func CreateFileName(dir string, resolutions []string, cuttedURI string, period int) *common.FileName { +func CreateFileName(dirData string, resolutions []string, cutURI string, period int) *common.FileName { fn := common.FileName{ - Path: dir + "/data/" + cuttedURI + "/" + resolutions[0], + Path: dirData + "/" + cutURI + "/" + resolutions[0], TimeNow: time.Now().Format("15-04-05_02-01-2006"), Name: "videoFragment", Number: -1,