diff --git a/reader/cmd/main.go b/reader/cmd/main.go index 405793f..3dde7d6 100644 --- a/reader/cmd/main.go +++ b/reader/cmd/main.go @@ -1,14 +1,10 @@ package main import ( - "fmt" "go.uber.org/zap" "reader/internal/config" - "reader/internal/unpacker" - "time" - - log2 "git.insit.tech/sas/rtsp_proxy/core/log" logger "reader/internal/log" + "reader/internal/unpacker" ) func main() { @@ -22,9 +18,7 @@ func main() { // //log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", port), nil)) - config.LogsDirectory = log2.DirCreator(config.Local, "logs") - logger.Log = log2.MainLogging( - fmt.Sprintf("%s/reader-main_%s.log", config.LogsDirectory, time.Now().Format("15-04-05_02-01-2006"))) + logger.StartMainLogger(config.Local, "reader") err := unpacker.CreateVideo(config.Local) if err != nil { diff --git a/reader/go.mod b/reader/go.mod index 5d5968c..43953b9 100644 --- a/reader/go.mod +++ b/reader/go.mod @@ -3,7 +3,7 @@ module reader go 1.24.1 require ( - git.insit.tech/psa/rtsp_reader-writer/writer v0.0.0-20250327060836-29e7a51f8c5e + git.insit.tech/psa/rtsp_reader-writer/writer v0.0.0-20250328095722-6d9cef974261 git.insit.tech/sas/rtsp_proxy v0.0.0-20250326124321-cb817660066c github.com/bluenviron/gortsplib/v4 v4.12.3 github.com/bluenviron/mediacommon v1.14.0 diff --git a/reader/go.sum b/reader/go.sum index 2e7a467..efc9d51 100644 --- a/reader/go.sum +++ b/reader/go.sum @@ -1,7 +1,5 @@ -git.insit.tech/psa/rtsp_reader-writer/writer v0.0.0-20250327043613-6b15e0f4ae74 h1:g2g1TM0aHNwo9no0WWucqxRIaUzku1lfFGOdKxbt4Uk= -git.insit.tech/psa/rtsp_reader-writer/writer v0.0.0-20250327043613-6b15e0f4ae74/go.mod h1:jXcr5WE8GwhuGdtVjHRyWIemXJJ6GL8QK1M7r4F3cz0= -git.insit.tech/psa/rtsp_reader-writer/writer v0.0.0-20250327060836-29e7a51f8c5e h1:xcc9QLroFdHuabwtuYHrYgQxqa6hcsu5xH860dD8RFE= -git.insit.tech/psa/rtsp_reader-writer/writer v0.0.0-20250327060836-29e7a51f8c5e/go.mod h1:jXcr5WE8GwhuGdtVjHRyWIemXJJ6GL8QK1M7r4F3cz0= +git.insit.tech/psa/rtsp_reader-writer/writer v0.0.0-20250328095722-6d9cef974261 h1:oYqTZakNnU7efuuPAbHQAXY0WXkw9eRvdQxnsALz6b8= +git.insit.tech/psa/rtsp_reader-writer/writer v0.0.0-20250328095722-6d9cef974261/go.mod h1:vFg2FZ/v01qLRLionpgocR5MaznCijzUiCSrPou45Ts= git.insit.tech/sas/rtsp_proxy v0.0.0-20250326124321-cb817660066c h1:A/D1INKJI/jkgy4TRamf7HTvQVqGy7qPBdTlZmYWIm0= git.insit.tech/sas/rtsp_proxy v0.0.0-20250326124321-cb817660066c/go.mod h1:/AHWd1Otr+ikOLWzpXtoozzifEx9ZKou+R6DgwaEzr0= github.com/asticode/go-astikit v0.30.0/go.mod h1:h4ly7idim1tNhaVkdVBeXQZEE3L0xblP7fCWbgwipF0= diff --git a/reader/internal/log/logger.go b/reader/internal/log/logger.go index 2798951..5160b06 100644 --- a/reader/internal/log/logger.go +++ b/reader/internal/log/logger.go @@ -1,7 +1,24 @@ package log -import "go.uber.org/zap" +import ( + "fmt" + "time" + + log2 "git.insit.tech/sas/rtsp_proxy/core/log" + "go.uber.org/zap" + "reader/internal/config" +) var ( Log *zap.Logger ) + +// StartMainLogger establishes main logger. +func StartMainLogger(local, program string) { + config.LogsDirectory = log2.DirCreator(local, "logs") + Log = log2.MainLogging( + fmt.Sprintf("%s/%s-main_%s.log", + config.LogsDirectory, + program, + time.Now().Format("15-04-05_02-01-2006"))) +} diff --git a/reader/internal/processor/h264-aac_muxer.go b/reader/internal/processor/h264-aac_muxer.go index 342031e..3e4bd59 100644 --- a/reader/internal/processor/h264-aac_muxer.go +++ b/reader/internal/processor/h264-aac_muxer.go @@ -2,6 +2,7 @@ package processor import ( "bufio" + "fmt" "github.com/bluenviron/mediacommon/pkg/codecs/mpeg4audio" "log" "os" @@ -12,6 +13,12 @@ import ( "github.com/bluenviron/mediacommon/pkg/formats/mpegts" ) +var ( + i = 0 + f = 0 + j = 0 +) + func multiplyAndDivide(v, m, d int64) int64 { secs := v / d dec := v % d @@ -74,7 +81,9 @@ func (e *MpegtsMuxer) WriteH264(pts int64, au [][]byte) error { idrPresent := false for _, nalu := range au { + i++ typ := h264.NALUType(nalu[0] & 0x1F) + f++ switch typ { case h264.NALUTypeSPS: e.H264Format.SPS = nalu @@ -89,6 +98,7 @@ func (e *MpegtsMuxer) WriteH264(pts int64, au [][]byte) error { case h264.NALUTypeIDR: idrPresent = true + j++ case h264.NALUTypeNonIDR: nonIDRPresent = true @@ -97,6 +107,8 @@ func (e *MpegtsMuxer) WriteH264(pts int64, au [][]byte) error { filteredAU = append(filteredAU, nalu) } + fmt.Println(i, f, j) + au = filteredAU if au == nil || (!nonIDRPresent && !idrPresent) { diff --git a/reader/internal/unpacker/proc.go b/reader/internal/unpacker/proc.go index f2062cd..2f919b4 100644 --- a/reader/internal/unpacker/proc.go +++ b/reader/internal/unpacker/proc.go @@ -21,8 +21,6 @@ import ( "reader/internal/processor" ) -// dir - // CreateVideo generate TS files and M3U8 playlists. func CreateVideo(dir string) error { // Check if the data folder in the directory. @@ -70,154 +68,156 @@ func CreateVideo(dir string) error { // Create M3U8 playlist. go gen.MediaPlaylistGenerator(dirData, file.Name(), float64(period), resolutions, cam) - for { - for _, resolution := range resolutions { - filenames := gen.StringDirEntryList( - fmt.Sprintf("%s/%s/%s", dirData, file.Name(), resolution), "insit", logger.Log) - if len(filenames) == 0 { - break - } - - for i := len(filenames) - 2; i < len(filenames)-1; i++ { - segment := storage.Segment{} - - // Open file for reading. - f, err := os.Open(fmt.Sprintf("%s/%s/%s/%s", dirData, file.Name(), resolution, filenames[i])) - if err != nil { - cam.Error( - "opening file error for file:", zap.String("filename", filenames[i]), zap.Error(err)) - return err - } - defer f.Close() - - // Read StreamID. - var streamIDLen int32 - if err := binary.Read(f, binary.LittleEndian, &streamIDLen); err != nil { - cam.Error( - "reading StreamID length error:", zap.String("filename", filenames[i]), zap.Error(err)) - return err - } - streamIDBytes := make([]byte, streamIDLen) - if _, err := io.ReadFull(f, streamIDBytes); err != nil { - cam.Error("reading StreamID error:", zap.String("filename", filenames[i]), zap.Error(err)) - return err - } - streamID := string(streamIDBytes) - cam.Info( - "cam with Stream ID started to convert:", - zap.String("filename", filenames[i]), - zap.String("stream_id", streamID)) - - // Read header of the file. - var segLen int32 - if err := binary.Read(f, binary.LittleEndian, &segLen); err != nil { - cam.Error( - "reading header length error:", zap.String("filename", filenames[i]), zap.Error(err)) - return err - } - segData := make([]byte, segLen) - if _, err := io.ReadFull(f, segData); err != nil { - cam.Error("reading header error:", zap.String("filename", filenames[i]), zap.Error(err)) - return err - } - headerReader := bytes.NewReader(segData) - headerSeg, err := readHeaderSegment(headerReader) - if err != nil { - cam.Error( - "func readHeaderSegment error:", zap.String("filename", filenames[i]), zap.Error(err)) - return err + go func() { + for { + for _, resolution := range resolutions { + filenames := gen.StringDirEntryList( + fmt.Sprintf("%s/%s/%s", dirData, file.Name(), resolution), "insit", logger.Log) + if len(filenames) == 0 { + break } - // Parse duration of a segment. - per, err = strconv.Atoi(headerSeg.Duration) - if err != nil { - cam.Error("parsing duration error:", zap.String("filename", filenames[i]), zap.Error(err)) - return err - } + for i := len(filenames) - 2; i < len(filenames)-1; i++ { + segment := storage.Segment{} - // Setup MPEG-TS muxer. - var h264Format format.H264 - var aacFormat format.MPEG4Audio + // Open file for reading. + f, err := os.Open(fmt.Sprintf("%s/%s/%s/%s", dirData, file.Name(), resolution, filenames[i])) + if err != nil { + cam.Error( + "opening file error for file:", zap.String("filename", filenames[i]), zap.Error(err)) + return + } + defer f.Close() - filename, _ := strings.CutSuffix(filenames[i], ".insit") - tsFilename := fmt.Sprintf("%s/%s/%s/%s.ts", dirData, file.Name(), resolution, filename) + // Read StreamID. + var streamIDLen int32 + if err := binary.Read(f, binary.LittleEndian, &streamIDLen); err != nil { + cam.Error( + "reading StreamID length error:", zap.String("filename", filenames[i]), zap.Error(err)) + return + } + streamIDBytes := make([]byte, streamIDLen) + if _, err := io.ReadFull(f, streamIDBytes); err != nil { + cam.Error("reading StreamID error:", zap.String("filename", filenames[i]), zap.Error(err)) + return + } + streamID := string(streamIDBytes) + cam.Info( + "cam with Stream ID started to convert:", + zap.String("filename", filenames[i]), + zap.String("stream_id", streamID)) - currentMpegtsMuxer := processor.MpegtsMuxer{ - FileName: tsFilename, - H264Format: &h264Format, - Mpeg4AudioFormat: &aacFormat, - } - - err = currentMpegtsMuxer.Initialize() - if err != nil { - cam.Error("init muxer error:", zap.String("filename", filenames[i]), zap.Error(err)) - return err - } - - // Read segments. - for { - // Read segments length. + // Read header of the file. var segLen int32 - err := binary.Read(f, binary.LittleEndian, &segLen) - if err != nil { - if err == io.EOF { - break - } + if err := binary.Read(f, binary.LittleEndian, &segLen); err != nil { cam.Error( - "read segments length error:", zap.String("filename", filenames[i]), zap.Error(err)) - return err + "reading header length error:", zap.String("filename", filenames[i]), zap.Error(err)) + return } - // Read segments data. - segData = make([]byte, segLen) + segData := make([]byte, segLen) if _, err := io.ReadFull(f, segData); err != nil { - cam.Error( - "read segments error:", zap.String("filename", filenames[i]), zap.Error(err)) - return err + cam.Error("reading header error:", zap.String("filename", filenames[i]), zap.Error(err)) + return } - packetReader := bytes.NewReader(segData) - packets, err := readPacketSegment(packetReader) + headerReader := bytes.NewReader(segData) + headerSeg, err := readHeaderSegment(headerReader) if err != nil { cam.Error( - "func readPacketSegment error:", zap.String("filename", filenames[i]), zap.Error(err)) - return err + "func readHeaderSegment error:", zap.String("filename", filenames[i]), zap.Error(err)) + return } - segment.Packets = append(segment.Packets, packets...) + // Parse duration of a segment. + per, err = strconv.Atoi(headerSeg.Duration) + if err != nil { + cam.Error("parsing duration error:", zap.String("filename", filenames[i]), zap.Error(err)) + return + } - for _, pkt := range packets { - switch pkt.Type { - case storage.PacketTypeH264: - // Encode the access unit into MPEG-TS. - err = currentMpegtsMuxer.WriteH264(pkt.Pts, pkt.H264AUs) - if err != nil { - cam.Warn( - "write H264 packet error:", zap.String("filename", filenames[i]), zap.Error(err)) - } - case storage.PacketTypeLPCM: - // Convert G711 to AAC. - au, err := processor.ConvertLPCMToAAC(pkt.LPCMSamples) - if err != nil { - cam.Warn( - "converting to AAC frame error:", zap.String("filename", filenames[i]), zap.Error(err)) - } + // Setup MPEG-TS muxer. + var h264Format format.H264 + var aacFormat format.MPEG4Audio - // Encode the access unit into MPEG-TS. - err = currentMpegtsMuxer.WriteAAC([][]byte{au}, pkt.Pts) - if err != nil { - cam.Warn( - "write G711 packet error:", zap.String("filename", filenames[i]), zap.Error(err)) + filename, _ := strings.CutSuffix(filenames[i], ".insit") + tsFilename := fmt.Sprintf("%s/%s/%s/%s.ts", dirData, file.Name(), resolution, filename) + + currentMpegtsMuxer := processor.MpegtsMuxer{ + FileName: tsFilename, + H264Format: &h264Format, + Mpeg4AudioFormat: &aacFormat, + } + + err = currentMpegtsMuxer.Initialize() + if err != nil { + cam.Error("init muxer error:", zap.String("filename", filenames[i]), zap.Error(err)) + return + } + + // Read segments. + for { + // Read segments length. + var segLen int32 + err := binary.Read(f, binary.LittleEndian, &segLen) + if err != nil { + if err == io.EOF { + break + } + cam.Error( + "read segments length error:", zap.String("filename", filenames[i]), zap.Error(err)) + return + } + // Read segments data. + segData = make([]byte, segLen) + if _, err := io.ReadFull(f, segData); err != nil { + cam.Error( + "read segments error:", zap.String("filename", filenames[i]), zap.Error(err)) + return + } + packetReader := bytes.NewReader(segData) + packets, err := readPacketSegment(packetReader) + if err != nil { + cam.Error( + "func readPacketSegment error:", zap.String("filename", filenames[i]), zap.Error(err)) + return + } + + segment.Packets = append(segment.Packets, packets...) + + for _, pkt := range packets { + switch pkt.Type { + case storage.PacketTypeH264: + // Encode the access unit into MPEG-TS. + err = currentMpegtsMuxer.WriteH264(pkt.Pts, pkt.H264AUs) + if err != nil { + cam.Warn( + "write H264 packet error:", zap.String("filename", filenames[i]), zap.Error(err)) + } + case storage.PacketTypeLPCM: + // Convert G711 to AAC. + au, err := processor.ConvertLPCMToAAC(pkt.LPCMSamples) + if err != nil { + cam.Warn( + "converting to AAC frame error:", zap.String("filename", filenames[i]), zap.Error(err)) + } + + // Encode the access unit into MPEG-TS. + err = currentMpegtsMuxer.WriteAAC([][]byte{au}, pkt.Pts) + if err != nil { + cam.Warn( + "write G711 packet error:", zap.String("filename", filenames[i]), zap.Error(err)) + } } } } - } - currentMpegtsMuxer.Close() + currentMpegtsMuxer.Close() - if storage.Exists(tsFilename) { - time.Sleep(period * time.Second) + if storage.Exists(tsFilename) { + time.Sleep(period * time.Second) + } } } } - } + }() } - return errors.New("func CreateVideo downed") + select {} } diff --git a/writer/cmd/main.go b/writer/cmd/main.go index edf2e58..8d0a629 100644 --- a/writer/cmd/main.go +++ b/writer/cmd/main.go @@ -1,22 +1,15 @@ package main import ( - "fmt" - "git.insit.tech/psa/rtsp_reader-writer/writer/internal/metrics" - "time" - "git.insit.tech/psa/rtsp_reader-writer/writer/internal/config" "git.insit.tech/psa/rtsp_reader-writer/writer/internal/ingest/rtsp" logger "git.insit.tech/psa/rtsp_reader-writer/writer/internal/log" - log2 "git.insit.tech/sas/rtsp_proxy/core/log" + "git.insit.tech/psa/rtsp_reader-writer/writer/internal/metrics" ) func main() { go metrics.Metrics() - - config.LogsDirectory = log2.DirCreator(config.Local, "logs") - logger.Log = log2.MainLogging( - fmt.Sprintf("%s/writer-main_%s.log", config.LogsDirectory, time.Now().Format("15-04-05_02-01-2006"))) + logger.StartMainLogger(config.Local, "writer") rtsp.StartWriter() diff --git a/writer/internal/ingest/rtsp/rtsp.go b/writer/internal/ingest/rtsp/rtsp.go index d1232f8..d0942d4 100644 --- a/writer/internal/ingest/rtsp/rtsp.go +++ b/writer/internal/ingest/rtsp/rtsp.go @@ -56,7 +56,7 @@ func rtsp(dir string, period int, link string, number int) error { // Create data folder in the directory. dirData := log2.DirCreator(dir, "data") - // Create logger. + // Create cam logger. cutURI := LastPartURI(link) cam := log2.CamLogging(fmt.Sprintf("%s/%s/writer-cam_%s.log", dirData, cutURI, time.Now().Format("15-04-05_02-01-2006"))) diff --git a/writer/internal/log/logger.go b/writer/internal/log/logger.go index 2798951..5c4afe0 100644 --- a/writer/internal/log/logger.go +++ b/writer/internal/log/logger.go @@ -1,7 +1,24 @@ package log -import "go.uber.org/zap" +import ( + "fmt" + "time" + + "git.insit.tech/psa/rtsp_reader-writer/writer/internal/config" + log2 "git.insit.tech/sas/rtsp_proxy/core/log" + "go.uber.org/zap" +) var ( Log *zap.Logger ) + +// StartMainLogger establishes main logger. +func StartMainLogger(local, program string) { + config.LogsDirectory = log2.DirCreator(local, "logs") + Log = log2.MainLogging( + fmt.Sprintf("%s/%s-main_%s.log", + config.LogsDirectory, + program, + time.Now().Format("15-04-05_02-01-2006"))) +}