From ad5f7c7520e79dd7d2623f8ebe10c550da9584e5 Mon Sep 17 00:00:00 2001 From: Sergey Petrov Date: Thu, 27 Mar 2025 12:05:29 +0500 Subject: [PATCH] Added logger. --- reader/cmd/main.go | 21 ++- reader/go.mod | 5 +- reader/go.sum | 12 +- reader/internal/config/config.go | 6 + reader/internal/unpacker/proc.go | 293 +++++++++++++++++++------------ 5 files changed, 210 insertions(+), 127 deletions(-) create mode 100644 reader/internal/config/config.go diff --git a/reader/cmd/main.go b/reader/cmd/main.go index 6d3e8c4..405793f 100644 --- a/reader/cmd/main.go +++ b/reader/cmd/main.go @@ -1,9 +1,14 @@ package main import ( - "git.insit.tech/sas/rtsp_proxy/core/log" - "git.insit.tech/sas/rtsp_proxy/proto/common" + "fmt" + "go.uber.org/zap" + "reader/internal/config" "reader/internal/unpacker" + "time" + + log2 "git.insit.tech/sas/rtsp_proxy/core/log" + logger "reader/internal/log" ) func main() { @@ -17,10 +22,12 @@ func main() { // //log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", port), nil)) - Log := log.MainLogging("/home/psa/GoRepository/" + "/data/" + "camera54-centr-kirova-kalinina") + config.LogsDirectory = log2.DirCreator(config.Local, "logs") + logger.Log = log2.MainLogging( + fmt.Sprintf("%s/reader-main_%s.log", config.LogsDirectory, time.Now().Format("15-04-05_02-01-2006"))) - unpacker.CreateFlow("/home/psa/GoRepository", - "camera54-centr-kirova-kalinina", - &common.FileName{Duration: 60}, - []string{"1920-1080"}, Log) + err := unpacker.CreateVideo(config.Local) + if err != nil { + logger.Log.Error("Failed to create flow", zap.Error(err)) + } } diff --git a/reader/go.mod b/reader/go.mod index 37e28dc..5d5968c 100644 --- a/reader/go.mod +++ b/reader/go.mod @@ -3,8 +3,8 @@ module reader go 1.24.1 require ( - git.insit.tech/psa/rtsp_reader-writer/writer v0.0.0-20250325111946-a4678342ee71 - git.insit.tech/sas/rtsp_proxy v0.0.0-20250326040356-446f7f0578d9 + git.insit.tech/psa/rtsp_reader-writer/writer v0.0.0-20250327060836-29e7a51f8c5e + git.insit.tech/sas/rtsp_proxy v0.0.0-20250326124321-cb817660066c github.com/bluenviron/gortsplib/v4 v4.12.3 github.com/bluenviron/mediacommon v1.14.0 github.com/bluenviron/mediacommon/v2 v2.0.0 @@ -15,6 +15,7 @@ require ( require ( github.com/asticode/go-astikit v0.52.0 // indirect github.com/asticode/go-astits v1.13.0 // indirect + github.com/golang/snappy v1.0.0 // indirect github.com/google/uuid v1.6.0 // indirect github.com/grafov/m3u8 v0.12.1 // indirect github.com/pion/randutil v0.1.0 // indirect diff --git a/reader/go.sum b/reader/go.sum index 7469968..2e7a467 100644 --- a/reader/go.sum +++ b/reader/go.sum @@ -1,7 +1,9 @@ -git.insit.tech/psa/rtsp_reader-writer/writer v0.0.0-20250325111946-a4678342ee71 h1:ObdTAquutHq2Abh9J02S8wBkeHuOYS+6fd8chf2yQC4= -git.insit.tech/psa/rtsp_reader-writer/writer v0.0.0-20250325111946-a4678342ee71/go.mod h1:TjxUTANLCPQpjNFVlGZWlGn0ICGV9oQP7aed0xexFkw= -git.insit.tech/sas/rtsp_proxy v0.0.0-20250326040356-446f7f0578d9 h1:8aOLn23rkkXR/mpkOtbaNrCWsiBs0Pl+/ds3CzUuHZo= -git.insit.tech/sas/rtsp_proxy v0.0.0-20250326040356-446f7f0578d9/go.mod h1:/AHWd1Otr+ikOLWzpXtoozzifEx9ZKou+R6DgwaEzr0= +git.insit.tech/psa/rtsp_reader-writer/writer v0.0.0-20250327043613-6b15e0f4ae74 h1:g2g1TM0aHNwo9no0WWucqxRIaUzku1lfFGOdKxbt4Uk= +git.insit.tech/psa/rtsp_reader-writer/writer v0.0.0-20250327043613-6b15e0f4ae74/go.mod h1:jXcr5WE8GwhuGdtVjHRyWIemXJJ6GL8QK1M7r4F3cz0= +git.insit.tech/psa/rtsp_reader-writer/writer v0.0.0-20250327060836-29e7a51f8c5e h1:xcc9QLroFdHuabwtuYHrYgQxqa6hcsu5xH860dD8RFE= +git.insit.tech/psa/rtsp_reader-writer/writer v0.0.0-20250327060836-29e7a51f8c5e/go.mod h1:jXcr5WE8GwhuGdtVjHRyWIemXJJ6GL8QK1M7r4F3cz0= +git.insit.tech/sas/rtsp_proxy v0.0.0-20250326124321-cb817660066c h1:A/D1INKJI/jkgy4TRamf7HTvQVqGy7qPBdTlZmYWIm0= +git.insit.tech/sas/rtsp_proxy v0.0.0-20250326124321-cb817660066c/go.mod h1:/AHWd1Otr+ikOLWzpXtoozzifEx9ZKou+R6DgwaEzr0= github.com/asticode/go-astikit v0.30.0/go.mod h1:h4ly7idim1tNhaVkdVBeXQZEE3L0xblP7fCWbgwipF0= github.com/asticode/go-astikit v0.52.0 h1:kTl2XjgiVQhUl1H7kim7NhmTtCMwVBbPrXKqhQhbk8Y= github.com/asticode/go-astikit v0.52.0/go.mod h1:fV43j20UZYfXzP9oBn33udkvCvDvCDhzjVqoLFuuYZE= @@ -18,6 +20,8 @@ github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1 github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/gen2brain/aac-go v0.0.0-20230119102159-ef1e76509d21 h1:yfrARW/aVlqKORCdKrYdU0PZUKPqQvYEUQBKfVlNa9Q= github.com/gen2brain/aac-go v0.0.0-20230119102159-ef1e76509d21/go.mod h1:HZqGD/LXHB1VCGUGNzuyxSsD12f3KjbJbvImAmoK/mM= +github.com/golang/snappy v1.0.0 h1:Oy607GVXHs7RtbggtPBnr2RmDArIsAefDwvrdWvRhGs= +github.com/golang/snappy v1.0.0/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/grafov/m3u8 v0.12.1 h1:DuP1uA1kvRRmGNAZ0m+ObLv1dvrfNO0TPx0c/enNk0s= diff --git a/reader/internal/config/config.go b/reader/internal/config/config.go new file mode 100644 index 0000000..ab6f324 --- /dev/null +++ b/reader/internal/config/config.go @@ -0,0 +1,6 @@ +package config + +var ( + Local = "storage" + LogsDirectory string +) diff --git a/reader/internal/unpacker/proc.go b/reader/internal/unpacker/proc.go index b8ee5ff..f2062cd 100644 --- a/reader/internal/unpacker/proc.go +++ b/reader/internal/unpacker/proc.go @@ -3,156 +3,221 @@ package unpacker import ( "bytes" "encoding/binary" + "errors" "fmt" - "git.insit.tech/psa/rtsp_reader-writer/writer/pkg/storage" - "git.insit.tech/sas/rtsp_proxy/core/gen" - "git.insit.tech/sas/rtsp_proxy/proto/common" - "github.com/bluenviron/gortsplib/v4/pkg/format" - "go.uber.org/zap" "io" "log" "os" - "reader/internal/processor" "strconv" "strings" "time" + + "git.insit.tech/psa/rtsp_reader-writer/writer/pkg/storage" + "git.insit.tech/sas/rtsp_proxy/core/gen" + log2 "git.insit.tech/sas/rtsp_proxy/core/log" + "github.com/bluenviron/gortsplib/v4/pkg/format" + "go.uber.org/zap" + logger "reader/internal/log" + "reader/internal/processor" ) -// CreateFlow generate TS files and M3U8 playlists. -func CreateFlow(dir string, cutURI string, fn *common.FileName, resolutions []string, Log *zap.Logger) { - fmt.Println("Reader started") +// dir - // Establish starting period. - per := 60 - period := time.Duration(per) +// CreateVideo generate TS files and M3U8 playlists. +func CreateVideo(dir string) error { + // Check if the data folder in the directory. + homeDir, err := os.UserHomeDir() + if err != nil { + return err + } + dirData := fmt.Sprintf("%s/%s/data", homeDir, dir) - // Create M3U8 playlist. - go gen.MediaPlaylistGenerator(dir+"/data", cutURI, fn.Duration, resolutions, Log) + exist := storage.Exists(dirData) + if !exist { + return errors.New("directory does not exist") + } - for { - time.Sleep(period * time.Second) - for _, resolution := range resolutions { - filenames := gen.StringDirEntryList(dir+"/data/"+cutURI+"/"+resolution, "insit", Log) + // Read directory. + files, err := storage.ReadDir(dirData) + if err != nil { + return err + } - for i := len(filenames) - 2; i < len(filenames)-1; i++ { + for _, file := range files { + // Create logger. + cam := log2.CamLogging( + fmt.Sprintf("%s/%s/reader-cam_%s.log", dirData, file.Name(), time.Now().Format("15-04-05_02-01-2006"))) - segment := storage.Segment{} + res, err := storage.ReadDir(fmt.Sprintf("%s/%s", dirData, file.Name())) + if err != nil { + cam.Error("error reading directory", zap.String("dir", dir), zap.Error(err)) + return err + } + resolutions := make([]string, 0) + for _, r := range res { + if r.IsDir() { + resolutions = append(resolutions, r.Name()) + } + } - // Open file for reading. - f, err := os.Open(dir + "/data/" + cutURI + "/" + resolution + "/" + filenames[i]) - if err != nil { - fmt.Println("opening file error: ", err) - return - } - defer f.Close() + logger.Log.Info("start process camera:", zap.String("cam_name", file.Name())) + log.Println("start process camera: ", file.Name()) - // Read StreamID. - var streamIDLen int32 - if err := binary.Read(f, binary.LittleEndian, &streamIDLen); err != nil { - fmt.Println("reading StreamID length error: ", err) - return - } - streamIDBytes := make([]byte, streamIDLen) - if _, err := io.ReadFull(f, streamIDBytes); err != nil { - fmt.Println("reading StreamID error: ", err) - return - } - streamID := string(streamIDBytes) - fmt.Println("Stream ID:", streamID) + // Establish starting period. + per := 60 + period := time.Duration(per) - // Read header of the file. - var segLen int32 - if err := binary.Read(f, binary.LittleEndian, &segLen); err != nil { - fmt.Println("reading header length error: ", err) - return - } - segData := make([]byte, segLen) - if _, err := io.ReadFull(f, segData); err != nil { - fmt.Println("reading header error: ", err) - return - } - headerReader := bytes.NewReader(segData) - headerSeg, err := readHeaderSegment(headerReader) - if err != nil { - fmt.Println("func readHeaderSegment error: ", err) - return + // Create M3U8 playlist. + go gen.MediaPlaylistGenerator(dirData, file.Name(), float64(period), resolutions, cam) + + for { + for _, resolution := range resolutions { + filenames := gen.StringDirEntryList( + fmt.Sprintf("%s/%s/%s", dirData, file.Name(), resolution), "insit", logger.Log) + if len(filenames) == 0 { + break } - // Parse duration of a segment. - per, err = strconv.Atoi(headerSeg.Duration) - if err != nil { - fmt.Println("parsing duration error: ", err) - } + for i := len(filenames) - 2; i < len(filenames)-1; i++ { + segment := storage.Segment{} - // Setup MPEG-TS muxer. - var h264Format format.H264 - var aacFormat format.MPEG4Audio + // Open file for reading. + f, err := os.Open(fmt.Sprintf("%s/%s/%s/%s", dirData, file.Name(), resolution, filenames[i])) + if err != nil { + cam.Error( + "opening file error for file:", zap.String("filename", filenames[i]), zap.Error(err)) + return err + } + defer f.Close() - filename, _ := strings.CutSuffix(filenames[i], ".insit") + // Read StreamID. + var streamIDLen int32 + if err := binary.Read(f, binary.LittleEndian, &streamIDLen); err != nil { + cam.Error( + "reading StreamID length error:", zap.String("filename", filenames[i]), zap.Error(err)) + return err + } + streamIDBytes := make([]byte, streamIDLen) + if _, err := io.ReadFull(f, streamIDBytes); err != nil { + cam.Error("reading StreamID error:", zap.String("filename", filenames[i]), zap.Error(err)) + return err + } + streamID := string(streamIDBytes) + cam.Info( + "cam with Stream ID started to convert:", + zap.String("filename", filenames[i]), + zap.String("stream_id", streamID)) - currentMpegtsMuxer := processor.MpegtsMuxer{ - FileName: dir + "/data/" + cutURI + "/" + resolution + "/" + filename + ".ts", - H264Format: &h264Format, - Mpeg4AudioFormat: &aacFormat, - } - - err = currentMpegtsMuxer.Initialize() - if err != nil { - fmt.Printf("init muxer error: %w\n", err) - } - - // Read segments. - for { - // Read segments length. + // Read header of the file. var segLen int32 - err := binary.Read(f, binary.LittleEndian, &segLen) - if err != nil { - if err == io.EOF { - break - } - fmt.Println("read segments length error: ", err) - return + if err := binary.Read(f, binary.LittleEndian, &segLen); err != nil { + cam.Error( + "reading header length error:", zap.String("filename", filenames[i]), zap.Error(err)) + return err } - // Read segments data. - segData = make([]byte, segLen) + segData := make([]byte, segLen) if _, err := io.ReadFull(f, segData); err != nil { - fmt.Println("read segments error: ", err) - return + cam.Error("reading header error:", zap.String("filename", filenames[i]), zap.Error(err)) + return err } - packetReader := bytes.NewReader(segData) - packets, err := readPacketSegment(packetReader) + headerReader := bytes.NewReader(segData) + headerSeg, err := readHeaderSegment(headerReader) if err != nil { - fmt.Println("func readPacketSegment error: ", err) - return + cam.Error( + "func readHeaderSegment error:", zap.String("filename", filenames[i]), zap.Error(err)) + return err } - segment.Packets = append(segment.Packets, packets...) + // Parse duration of a segment. + per, err = strconv.Atoi(headerSeg.Duration) + if err != nil { + cam.Error("parsing duration error:", zap.String("filename", filenames[i]), zap.Error(err)) + return err + } - for _, pkt := range packets { - switch pkt.Type { - case storage.PacketTypeH264: - // Encode the access unit into MPEG-TS. - err = currentMpegtsMuxer.WriteH264(pkt.Pts, pkt.H264AUs) - if err != nil { - log.Printf("write H264 packet error: %v\n", err) - } - case storage.PacketTypeLPCM: - // Convert G711 to AAC. - au, err := processor.ConvertLPCMToAAC(pkt.LPCMSamples) - if err != nil { - log.Printf("converting to AAC frame error: %v\n", err) - } + // Setup MPEG-TS muxer. + var h264Format format.H264 + var aacFormat format.MPEG4Audio - // Encode the access unit into MPEG-TS. - err = currentMpegtsMuxer.WriteAAC([][]byte{au}, pkt.Pts) - if err != nil { - log.Printf("write G711 packet error: %v\n", err) + filename, _ := strings.CutSuffix(filenames[i], ".insit") + tsFilename := fmt.Sprintf("%s/%s/%s/%s.ts", dirData, file.Name(), resolution, filename) + + currentMpegtsMuxer := processor.MpegtsMuxer{ + FileName: tsFilename, + H264Format: &h264Format, + Mpeg4AudioFormat: &aacFormat, + } + + err = currentMpegtsMuxer.Initialize() + if err != nil { + cam.Error("init muxer error:", zap.String("filename", filenames[i]), zap.Error(err)) + return err + } + + // Read segments. + for { + // Read segments length. + var segLen int32 + err := binary.Read(f, binary.LittleEndian, &segLen) + if err != nil { + if err == io.EOF { + break + } + cam.Error( + "read segments length error:", zap.String("filename", filenames[i]), zap.Error(err)) + return err + } + // Read segments data. + segData = make([]byte, segLen) + if _, err := io.ReadFull(f, segData); err != nil { + cam.Error( + "read segments error:", zap.String("filename", filenames[i]), zap.Error(err)) + return err + } + packetReader := bytes.NewReader(segData) + packets, err := readPacketSegment(packetReader) + if err != nil { + cam.Error( + "func readPacketSegment error:", zap.String("filename", filenames[i]), zap.Error(err)) + return err + } + + segment.Packets = append(segment.Packets, packets...) + + for _, pkt := range packets { + switch pkt.Type { + case storage.PacketTypeH264: + // Encode the access unit into MPEG-TS. + err = currentMpegtsMuxer.WriteH264(pkt.Pts, pkt.H264AUs) + if err != nil { + cam.Warn( + "write H264 packet error:", zap.String("filename", filenames[i]), zap.Error(err)) + } + case storage.PacketTypeLPCM: + // Convert G711 to AAC. + au, err := processor.ConvertLPCMToAAC(pkt.LPCMSamples) + if err != nil { + cam.Warn( + "converting to AAC frame error:", zap.String("filename", filenames[i]), zap.Error(err)) + } + + // Encode the access unit into MPEG-TS. + err = currentMpegtsMuxer.WriteAAC([][]byte{au}, pkt.Pts) + if err != nil { + cam.Warn( + "write G711 packet error:", zap.String("filename", filenames[i]), zap.Error(err)) + } } } } + currentMpegtsMuxer.Close() + + if storage.Exists(tsFilename) { + time.Sleep(period * time.Second) + } } - currentMpegtsMuxer.Close() } } } + return errors.New("func CreateVideo downed") }