diff --git a/writer/internal/config/config.go b/writer/internal/config/config.go index b303d14..f295580 100644 --- a/writer/internal/config/config.go +++ b/writer/internal/config/config.go @@ -1,7 +1,7 @@ package config var ( - Local = "reader-writer" + Local = "writer" LogsDirectory string Cameras = "/home/psa/GoRepository/rtsp_reader-writer/writer/internal/config/cameras.yaml" ) diff --git a/writer/internal/ingest/rtsp/rtsp.go b/writer/internal/ingest/rtsp/rtsp.go index ccc0a49..0ee47ed 100644 --- a/writer/internal/ingest/rtsp/rtsp.go +++ b/writer/internal/ingest/rtsp/rtsp.go @@ -37,7 +37,7 @@ func StartWriter() { log.Println("process camera: ", link) go func() { - err = startRTSP(config.Local, 60, link) + err = rtsp(config.Local, 60, link, -1) if err != nil { logger.Log.Error("procRTSP function error for camera:", zap.String("link", link), zap.Error(err)) log.Println("procRTSP function error for camera: ", link) @@ -46,22 +46,8 @@ func StartWriter() { } } -// startRTSP starts RTSP protocol. -func startRTSP(dir string, period int, link string) error { - err := rtsp(dir, period, link) - if err != nil { - // Change domain if a camera was flipped to another domain. - err = changeDomain(dir, period, link, err) - if err != nil { - return fmt.Errorf("change domain error: %w", err) - } - - } - return nil -} - // rtsp processes RTSP protocol. -func rtsp(dir string, period int, link string) error { +func rtsp(dir string, period int, link string, number int) error { // Create data folder in the directory. dirData := log2.DirCreator(dir, "data") @@ -90,14 +76,16 @@ func rtsp(dir string, period int, link string) error { desc, res, err := c.Describe(u) if err != nil || desc == nil { cam.Error("medias not found for camera:", zap.Error(err)) - return err + log.Println("medias not found for camera: ", cutURI) + changeDomain(dir, period, link, number+1, cam, err) + return nil } // Create file name structure and directory for files. resolution := findResolution(res.Body) resolutions := []string{resolution} - fn := storage.CreateFileName(dirData, resolutions, cutURI, period) + fn := storage.CreateFileName(dirData, resolutions, cutURI, period, number) err = os.MkdirAll(fmt.Sprintf("%s", fn.Path), 0755) if err != nil { @@ -295,8 +283,10 @@ func rtsp(dir string, period int, link string) error { } }() - cam.Error("c.Wait() error:", zap.Error(c.Wait())) - return fmt.Errorf("c.Wait() error") + if err = c.Wait(); err != nil { + rtsp(dir, period, link, fn.Number+1) + cam.Error("c.Wait() error:", zap.Error(c.Wait())) + } case videoFormat == "" && audioFormat == "G711": // Wait for the next period. @@ -388,9 +378,10 @@ func rtsp(dir string, period int, link string) error { */ } }() - - cam.Error("c.Wait() error:", zap.Error(c.Wait())) - return fmt.Errorf("c.Wait() error") + if err = c.Wait(); err != nil { + rtsp(dir, period, link, fn.Number+1) + cam.Error("c.Wait() error:", zap.Error(c.Wait())) + } case videoFormat == "H264" && audioFormat == "AAC": // Wait for the next period. @@ -512,8 +503,10 @@ func rtsp(dir string, period int, link string) error { } }() - cam.Error("c.Wait() error:", zap.Error(c.Wait())) - return fmt.Errorf("c.Wait() error") + if err = c.Wait(); err != nil { + rtsp(dir, period, link, fn.Number+1) + cam.Error("c.Wait() error:", zap.Error(c.Wait())) + } case videoFormat == "H264" && audioFormat == "G711" || videoFormat == "H264" && audioFormat == "": // Wait for the next period. @@ -739,8 +732,11 @@ func rtsp(dir string, period int, link string) error { log.Println("new file for recording created") } }() - cam.Error("c.Wait() error:", zap.Error(c.Wait())) - return fmt.Errorf("c.Wait() error") + + if err = c.Wait(); err != nil { + rtsp(dir, period, link, fn.Number+1) + cam.Error("c.Wait() error:", zap.Error(c.Wait())) + } case videoFormat == "H264-" && audioFormat == "": // Wait for the next period. @@ -846,8 +842,10 @@ func rtsp(dir string, period int, link string) error { } }() - cam.Error("c.Wait() error:", zap.Error(c.Wait())) - return fmt.Errorf("c.Wait() error") + if err = c.Wait(); err != nil { + rtsp(dir, period, link, fn.Number+1) + cam.Error("c.Wait() error:", zap.Error(c.Wait())) + } case videoFormat == "H265" && audioFormat == "": // Wait for the next period. @@ -955,8 +953,10 @@ func rtsp(dir string, period int, link string) error { } }() - cam.Error("c.Wait() error:", zap.Error(c.Wait())) - return fmt.Errorf("c.Wait() error") + if err = c.Wait(); err != nil { + rtsp(dir, period, link, fn.Number+1) + cam.Error("c.Wait() error:", zap.Error(c.Wait())) + } case videoFormat == "" && audioFormat == "LPCM": // Wait for the next period. @@ -1042,8 +1042,10 @@ func rtsp(dir string, period int, link string) error { } }() - cam.Error("c.Wait() error:", zap.Error(c.Wait())) - return fmt.Errorf("c.Wait() error") + if err = c.Wait(); err != nil { + rtsp(dir, period, link, fn.Number+1) + cam.Error("c.Wait() error:", zap.Error(c.Wait())) + } case videoFormat == "MJPEG" && audioFormat == "": // Wait for the next period. @@ -1129,8 +1131,10 @@ func rtsp(dir string, period int, link string) error { } }() - cam.Error("c.Wait() error:", zap.Error(c.Wait())) - return fmt.Errorf("c.Wait() error") + if err = c.Wait(); err != nil { + rtsp(dir, period, link, fn.Number+1) + cam.Error("c.Wait() error:", zap.Error(c.Wait())) + } case videoFormat == "" && audioFormat == "AAC": // Wait for the next period. @@ -1218,8 +1222,10 @@ func rtsp(dir string, period int, link string) error { } }() - cam.Error("c.Wait() error:", zap.Error(c.Wait())) - return fmt.Errorf("c.Wait() error") + if err = c.Wait(); err != nil { + rtsp(dir, period, link, fn.Number+1) + cam.Error("c.Wait() error:", zap.Error(c.Wait())) + } case videoFormat == "" && audioFormat == "OPUS": // Wait for the next period. @@ -1305,8 +1311,10 @@ func rtsp(dir string, period int, link string) error { } }() - cam.Error("c.Wait() error:", zap.Error(c.Wait())) - return fmt.Errorf("c.Wait() error") + if err = c.Wait(); err != nil { + rtsp(dir, period, link, fn.Number+1) + cam.Error("c.Wait() error:", zap.Error(c.Wait())) + } case videoFormat == "VP8" && audioFormat == "": // Wait for the next period. @@ -1401,8 +1409,10 @@ func rtsp(dir string, period int, link string) error { } }() - cam.Error("c.Wait() error:", zap.Error(c.Wait())) - return fmt.Errorf("c.Wait() error") + if err = c.Wait(); err != nil { + rtsp(dir, period, link, fn.Number+1) + cam.Error("c.Wait() error:", zap.Error(c.Wait())) + } case videoFormat == "VP9" && audioFormat == "": // Wait for the next period. @@ -1497,31 +1507,35 @@ func rtsp(dir string, period int, link string) error { } }() - cam.Error("c.Wait() error:", zap.Error(c.Wait())) - return fmt.Errorf("c.Wait() error") + if err = c.Wait(); err != nil { + rtsp(dir, period, link, fn.Number+1) + cam.Error("c.Wait() error:", zap.Error(c.Wait())) + } } return nil } // changeDomain changes domain if a camera was flipped to another domain. -func changeDomain(dir string, period int, link string, err error) error { +func changeDomain(dir string, period int, link string, number int, cam *zap.Logger, err error) { err2 := errors.New("404 (Not found)") if errors.As(err, &err2) { if strings.Contains(link, "video-1") { - err = rtsp(dir, period, strings.Replace(link, "video-1", "video-2", 1)) + err = rtsp(dir, period, strings.Replace(link, "video-1", "video-2", 1), number) if err != nil { - return err + cam.Error("changeDomain rtsp error:", zap.Error(err)) + logger.Log.Error("changeDomain rtsp error:", zap.Error(err)) + log.Println("changeDomain rtsp error for camera: ", link) + return } } else { - err = rtsp(dir, period, strings.Replace(link, "video-2", "video-1", 1)) + err = rtsp(dir, period, strings.Replace(link, "video-2", "video-1", 1), number) if err != nil { - return err + cam.Error("changeDomain rtsp error:", zap.Error(err)) + logger.Log.Error("changeDomain rtsp error:", zap.Error(err)) + log.Println("changeDomain rtsp error for camera: ", link) + return } } - } else { - return err2 } - - return nil } diff --git a/writer/pkg/storage/file_manager.go b/writer/pkg/storage/file_manager.go index 9546a91..4d80664 100644 --- a/writer/pkg/storage/file_manager.go +++ b/writer/pkg/storage/file_manager.go @@ -6,12 +6,12 @@ import ( ) // CreateFileName creates FileName structure. -func CreateFileName(dirData string, resolutions []string, cutURI string, period int) *common.FileName { +func CreateFileName(dirData string, resolutions []string, cutURI string, period int, number int) *common.FileName { fn := common.FileName{ Path: dirData + "/" + cutURI + "/" + resolutions[0], TimeNow: time.Now().Format("15-04-05_02-01-2006"), Name: "videoFragment", - Number: -1, + Number: number, Duration: float64(period), }