Refactoring.

This commit is contained in:
Сергей Петров 2025-03-31 14:55:02 +05:00
parent 3dc656b6a2
commit 790a292b0b
9 changed files with 184 additions and 153 deletions

View File

@ -1,14 +1,10 @@
package main package main
import ( import (
"fmt"
"go.uber.org/zap" "go.uber.org/zap"
"reader/internal/config" "reader/internal/config"
"reader/internal/unpacker"
"time"
log2 "git.insit.tech/sas/rtsp_proxy/core/log"
logger "reader/internal/log" logger "reader/internal/log"
"reader/internal/unpacker"
) )
func main() { func main() {
@ -22,9 +18,7 @@ func main() {
// //
//log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", port), nil)) //log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", port), nil))
config.LogsDirectory = log2.DirCreator(config.Local, "logs") logger.StartMainLogger(config.Local, "reader")
logger.Log = log2.MainLogging(
fmt.Sprintf("%s/reader-main_%s.log", config.LogsDirectory, time.Now().Format("15-04-05_02-01-2006")))
err := unpacker.CreateVideo(config.Local) err := unpacker.CreateVideo(config.Local)
if err != nil { if err != nil {

View File

@ -3,7 +3,7 @@ module reader
go 1.24.1 go 1.24.1
require ( require (
git.insit.tech/psa/rtsp_reader-writer/writer v0.0.0-20250327060836-29e7a51f8c5e git.insit.tech/psa/rtsp_reader-writer/writer v0.0.0-20250328095722-6d9cef974261
git.insit.tech/sas/rtsp_proxy v0.0.0-20250326124321-cb817660066c git.insit.tech/sas/rtsp_proxy v0.0.0-20250326124321-cb817660066c
github.com/bluenviron/gortsplib/v4 v4.12.3 github.com/bluenviron/gortsplib/v4 v4.12.3
github.com/bluenviron/mediacommon v1.14.0 github.com/bluenviron/mediacommon v1.14.0

View File

@ -1,7 +1,5 @@
git.insit.tech/psa/rtsp_reader-writer/writer v0.0.0-20250327043613-6b15e0f4ae74 h1:g2g1TM0aHNwo9no0WWucqxRIaUzku1lfFGOdKxbt4Uk= git.insit.tech/psa/rtsp_reader-writer/writer v0.0.0-20250328095722-6d9cef974261 h1:oYqTZakNnU7efuuPAbHQAXY0WXkw9eRvdQxnsALz6b8=
git.insit.tech/psa/rtsp_reader-writer/writer v0.0.0-20250327043613-6b15e0f4ae74/go.mod h1:jXcr5WE8GwhuGdtVjHRyWIemXJJ6GL8QK1M7r4F3cz0= git.insit.tech/psa/rtsp_reader-writer/writer v0.0.0-20250328095722-6d9cef974261/go.mod h1:vFg2FZ/v01qLRLionpgocR5MaznCijzUiCSrPou45Ts=
git.insit.tech/psa/rtsp_reader-writer/writer v0.0.0-20250327060836-29e7a51f8c5e h1:xcc9QLroFdHuabwtuYHrYgQxqa6hcsu5xH860dD8RFE=
git.insit.tech/psa/rtsp_reader-writer/writer v0.0.0-20250327060836-29e7a51f8c5e/go.mod h1:jXcr5WE8GwhuGdtVjHRyWIemXJJ6GL8QK1M7r4F3cz0=
git.insit.tech/sas/rtsp_proxy v0.0.0-20250326124321-cb817660066c h1:A/D1INKJI/jkgy4TRamf7HTvQVqGy7qPBdTlZmYWIm0= git.insit.tech/sas/rtsp_proxy v0.0.0-20250326124321-cb817660066c h1:A/D1INKJI/jkgy4TRamf7HTvQVqGy7qPBdTlZmYWIm0=
git.insit.tech/sas/rtsp_proxy v0.0.0-20250326124321-cb817660066c/go.mod h1:/AHWd1Otr+ikOLWzpXtoozzifEx9ZKou+R6DgwaEzr0= git.insit.tech/sas/rtsp_proxy v0.0.0-20250326124321-cb817660066c/go.mod h1:/AHWd1Otr+ikOLWzpXtoozzifEx9ZKou+R6DgwaEzr0=
github.com/asticode/go-astikit v0.30.0/go.mod h1:h4ly7idim1tNhaVkdVBeXQZEE3L0xblP7fCWbgwipF0= github.com/asticode/go-astikit v0.30.0/go.mod h1:h4ly7idim1tNhaVkdVBeXQZEE3L0xblP7fCWbgwipF0=

View File

@ -1,7 +1,24 @@
package log package log
import "go.uber.org/zap" import (
"fmt"
"time"
log2 "git.insit.tech/sas/rtsp_proxy/core/log"
"go.uber.org/zap"
"reader/internal/config"
)
var ( var (
Log *zap.Logger Log *zap.Logger
) )
// StartMainLogger establishes main logger.
func StartMainLogger(local, program string) {
config.LogsDirectory = log2.DirCreator(local, "logs")
Log = log2.MainLogging(
fmt.Sprintf("%s/%s-main_%s.log",
config.LogsDirectory,
program,
time.Now().Format("15-04-05_02-01-2006")))
}

View File

@ -2,6 +2,7 @@ package processor
import ( import (
"bufio" "bufio"
"fmt"
"github.com/bluenviron/mediacommon/pkg/codecs/mpeg4audio" "github.com/bluenviron/mediacommon/pkg/codecs/mpeg4audio"
"log" "log"
"os" "os"
@ -12,6 +13,12 @@ import (
"github.com/bluenviron/mediacommon/pkg/formats/mpegts" "github.com/bluenviron/mediacommon/pkg/formats/mpegts"
) )
var (
i = 0
f = 0
j = 0
)
func multiplyAndDivide(v, m, d int64) int64 { func multiplyAndDivide(v, m, d int64) int64 {
secs := v / d secs := v / d
dec := v % d dec := v % d
@ -74,7 +81,9 @@ func (e *MpegtsMuxer) WriteH264(pts int64, au [][]byte) error {
idrPresent := false idrPresent := false
for _, nalu := range au { for _, nalu := range au {
i++
typ := h264.NALUType(nalu[0] & 0x1F) typ := h264.NALUType(nalu[0] & 0x1F)
f++
switch typ { switch typ {
case h264.NALUTypeSPS: case h264.NALUTypeSPS:
e.H264Format.SPS = nalu e.H264Format.SPS = nalu
@ -89,6 +98,7 @@ func (e *MpegtsMuxer) WriteH264(pts int64, au [][]byte) error {
case h264.NALUTypeIDR: case h264.NALUTypeIDR:
idrPresent = true idrPresent = true
j++
case h264.NALUTypeNonIDR: case h264.NALUTypeNonIDR:
nonIDRPresent = true nonIDRPresent = true
@ -97,6 +107,8 @@ func (e *MpegtsMuxer) WriteH264(pts int64, au [][]byte) error {
filteredAU = append(filteredAU, nalu) filteredAU = append(filteredAU, nalu)
} }
fmt.Println(i, f, j)
au = filteredAU au = filteredAU
if au == nil || (!nonIDRPresent && !idrPresent) { if au == nil || (!nonIDRPresent && !idrPresent) {

View File

@ -21,8 +21,6 @@ import (
"reader/internal/processor" "reader/internal/processor"
) )
// dir
// CreateVideo generate TS files and M3U8 playlists. // CreateVideo generate TS files and M3U8 playlists.
func CreateVideo(dir string) error { func CreateVideo(dir string) error {
// Check if the data folder in the directory. // Check if the data folder in the directory.
@ -70,154 +68,156 @@ func CreateVideo(dir string) error {
// Create M3U8 playlist. // Create M3U8 playlist.
go gen.MediaPlaylistGenerator(dirData, file.Name(), float64(period), resolutions, cam) go gen.MediaPlaylistGenerator(dirData, file.Name(), float64(period), resolutions, cam)
for { go func() {
for _, resolution := range resolutions { for {
filenames := gen.StringDirEntryList( for _, resolution := range resolutions {
fmt.Sprintf("%s/%s/%s", dirData, file.Name(), resolution), "insit", logger.Log) filenames := gen.StringDirEntryList(
if len(filenames) == 0 { fmt.Sprintf("%s/%s/%s", dirData, file.Name(), resolution), "insit", logger.Log)
break if len(filenames) == 0 {
} break
for i := len(filenames) - 2; i < len(filenames)-1; i++ {
segment := storage.Segment{}
// Open file for reading.
f, err := os.Open(fmt.Sprintf("%s/%s/%s/%s", dirData, file.Name(), resolution, filenames[i]))
if err != nil {
cam.Error(
"opening file error for file:", zap.String("filename", filenames[i]), zap.Error(err))
return err
}
defer f.Close()
// Read StreamID.
var streamIDLen int32
if err := binary.Read(f, binary.LittleEndian, &streamIDLen); err != nil {
cam.Error(
"reading StreamID length error:", zap.String("filename", filenames[i]), zap.Error(err))
return err
}
streamIDBytes := make([]byte, streamIDLen)
if _, err := io.ReadFull(f, streamIDBytes); err != nil {
cam.Error("reading StreamID error:", zap.String("filename", filenames[i]), zap.Error(err))
return err
}
streamID := string(streamIDBytes)
cam.Info(
"cam with Stream ID started to convert:",
zap.String("filename", filenames[i]),
zap.String("stream_id", streamID))
// Read header of the file.
var segLen int32
if err := binary.Read(f, binary.LittleEndian, &segLen); err != nil {
cam.Error(
"reading header length error:", zap.String("filename", filenames[i]), zap.Error(err))
return err
}
segData := make([]byte, segLen)
if _, err := io.ReadFull(f, segData); err != nil {
cam.Error("reading header error:", zap.String("filename", filenames[i]), zap.Error(err))
return err
}
headerReader := bytes.NewReader(segData)
headerSeg, err := readHeaderSegment(headerReader)
if err != nil {
cam.Error(
"func readHeaderSegment error:", zap.String("filename", filenames[i]), zap.Error(err))
return err
} }
// Parse duration of a segment. for i := len(filenames) - 2; i < len(filenames)-1; i++ {
per, err = strconv.Atoi(headerSeg.Duration) segment := storage.Segment{}
if err != nil {
cam.Error("parsing duration error:", zap.String("filename", filenames[i]), zap.Error(err))
return err
}
// Setup MPEG-TS muxer. // Open file for reading.
var h264Format format.H264 f, err := os.Open(fmt.Sprintf("%s/%s/%s/%s", dirData, file.Name(), resolution, filenames[i]))
var aacFormat format.MPEG4Audio if err != nil {
cam.Error(
"opening file error for file:", zap.String("filename", filenames[i]), zap.Error(err))
return
}
defer f.Close()
filename, _ := strings.CutSuffix(filenames[i], ".insit") // Read StreamID.
tsFilename := fmt.Sprintf("%s/%s/%s/%s.ts", dirData, file.Name(), resolution, filename) var streamIDLen int32
if err := binary.Read(f, binary.LittleEndian, &streamIDLen); err != nil {
cam.Error(
"reading StreamID length error:", zap.String("filename", filenames[i]), zap.Error(err))
return
}
streamIDBytes := make([]byte, streamIDLen)
if _, err := io.ReadFull(f, streamIDBytes); err != nil {
cam.Error("reading StreamID error:", zap.String("filename", filenames[i]), zap.Error(err))
return
}
streamID := string(streamIDBytes)
cam.Info(
"cam with Stream ID started to convert:",
zap.String("filename", filenames[i]),
zap.String("stream_id", streamID))
currentMpegtsMuxer := processor.MpegtsMuxer{ // Read header of the file.
FileName: tsFilename,
H264Format: &h264Format,
Mpeg4AudioFormat: &aacFormat,
}
err = currentMpegtsMuxer.Initialize()
if err != nil {
cam.Error("init muxer error:", zap.String("filename", filenames[i]), zap.Error(err))
return err
}
// Read segments.
for {
// Read segments length.
var segLen int32 var segLen int32
err := binary.Read(f, binary.LittleEndian, &segLen) if err := binary.Read(f, binary.LittleEndian, &segLen); err != nil {
if err != nil {
if err == io.EOF {
break
}
cam.Error( cam.Error(
"read segments length error:", zap.String("filename", filenames[i]), zap.Error(err)) "reading header length error:", zap.String("filename", filenames[i]), zap.Error(err))
return err return
} }
// Read segments data. segData := make([]byte, segLen)
segData = make([]byte, segLen)
if _, err := io.ReadFull(f, segData); err != nil { if _, err := io.ReadFull(f, segData); err != nil {
cam.Error( cam.Error("reading header error:", zap.String("filename", filenames[i]), zap.Error(err))
"read segments error:", zap.String("filename", filenames[i]), zap.Error(err)) return
return err
} }
packetReader := bytes.NewReader(segData) headerReader := bytes.NewReader(segData)
packets, err := readPacketSegment(packetReader) headerSeg, err := readHeaderSegment(headerReader)
if err != nil { if err != nil {
cam.Error( cam.Error(
"func readPacketSegment error:", zap.String("filename", filenames[i]), zap.Error(err)) "func readHeaderSegment error:", zap.String("filename", filenames[i]), zap.Error(err))
return err return
} }
segment.Packets = append(segment.Packets, packets...) // Parse duration of a segment.
per, err = strconv.Atoi(headerSeg.Duration)
if err != nil {
cam.Error("parsing duration error:", zap.String("filename", filenames[i]), zap.Error(err))
return
}
for _, pkt := range packets { // Setup MPEG-TS muxer.
switch pkt.Type { var h264Format format.H264
case storage.PacketTypeH264: var aacFormat format.MPEG4Audio
// Encode the access unit into MPEG-TS.
err = currentMpegtsMuxer.WriteH264(pkt.Pts, pkt.H264AUs)
if err != nil {
cam.Warn(
"write H264 packet error:", zap.String("filename", filenames[i]), zap.Error(err))
}
case storage.PacketTypeLPCM:
// Convert G711 to AAC.
au, err := processor.ConvertLPCMToAAC(pkt.LPCMSamples)
if err != nil {
cam.Warn(
"converting to AAC frame error:", zap.String("filename", filenames[i]), zap.Error(err))
}
// Encode the access unit into MPEG-TS. filename, _ := strings.CutSuffix(filenames[i], ".insit")
err = currentMpegtsMuxer.WriteAAC([][]byte{au}, pkt.Pts) tsFilename := fmt.Sprintf("%s/%s/%s/%s.ts", dirData, file.Name(), resolution, filename)
if err != nil {
cam.Warn( currentMpegtsMuxer := processor.MpegtsMuxer{
"write G711 packet error:", zap.String("filename", filenames[i]), zap.Error(err)) FileName: tsFilename,
H264Format: &h264Format,
Mpeg4AudioFormat: &aacFormat,
}
err = currentMpegtsMuxer.Initialize()
if err != nil {
cam.Error("init muxer error:", zap.String("filename", filenames[i]), zap.Error(err))
return
}
// Read segments.
for {
// Read segments length.
var segLen int32
err := binary.Read(f, binary.LittleEndian, &segLen)
if err != nil {
if err == io.EOF {
break
}
cam.Error(
"read segments length error:", zap.String("filename", filenames[i]), zap.Error(err))
return
}
// Read segments data.
segData = make([]byte, segLen)
if _, err := io.ReadFull(f, segData); err != nil {
cam.Error(
"read segments error:", zap.String("filename", filenames[i]), zap.Error(err))
return
}
packetReader := bytes.NewReader(segData)
packets, err := readPacketSegment(packetReader)
if err != nil {
cam.Error(
"func readPacketSegment error:", zap.String("filename", filenames[i]), zap.Error(err))
return
}
segment.Packets = append(segment.Packets, packets...)
for _, pkt := range packets {
switch pkt.Type {
case storage.PacketTypeH264:
// Encode the access unit into MPEG-TS.
err = currentMpegtsMuxer.WriteH264(pkt.Pts, pkt.H264AUs)
if err != nil {
cam.Warn(
"write H264 packet error:", zap.String("filename", filenames[i]), zap.Error(err))
}
case storage.PacketTypeLPCM:
// Convert G711 to AAC.
au, err := processor.ConvertLPCMToAAC(pkt.LPCMSamples)
if err != nil {
cam.Warn(
"converting to AAC frame error:", zap.String("filename", filenames[i]), zap.Error(err))
}
// Encode the access unit into MPEG-TS.
err = currentMpegtsMuxer.WriteAAC([][]byte{au}, pkt.Pts)
if err != nil {
cam.Warn(
"write G711 packet error:", zap.String("filename", filenames[i]), zap.Error(err))
}
} }
} }
} }
} currentMpegtsMuxer.Close()
currentMpegtsMuxer.Close()
if storage.Exists(tsFilename) { if storage.Exists(tsFilename) {
time.Sleep(period * time.Second) time.Sleep(period * time.Second)
}
} }
} }
} }
} }()
} }
return errors.New("func CreateVideo downed") select {}
} }

View File

@ -1,22 +1,15 @@
package main package main
import ( import (
"fmt"
"git.insit.tech/psa/rtsp_reader-writer/writer/internal/metrics"
"time"
"git.insit.tech/psa/rtsp_reader-writer/writer/internal/config" "git.insit.tech/psa/rtsp_reader-writer/writer/internal/config"
"git.insit.tech/psa/rtsp_reader-writer/writer/internal/ingest/rtsp" "git.insit.tech/psa/rtsp_reader-writer/writer/internal/ingest/rtsp"
logger "git.insit.tech/psa/rtsp_reader-writer/writer/internal/log" logger "git.insit.tech/psa/rtsp_reader-writer/writer/internal/log"
log2 "git.insit.tech/sas/rtsp_proxy/core/log" "git.insit.tech/psa/rtsp_reader-writer/writer/internal/metrics"
) )
func main() { func main() {
go metrics.Metrics() go metrics.Metrics()
logger.StartMainLogger(config.Local, "writer")
config.LogsDirectory = log2.DirCreator(config.Local, "logs")
logger.Log = log2.MainLogging(
fmt.Sprintf("%s/writer-main_%s.log", config.LogsDirectory, time.Now().Format("15-04-05_02-01-2006")))
rtsp.StartWriter() rtsp.StartWriter()

View File

@ -56,7 +56,7 @@ func rtsp(dir string, period int, link string, number int) error {
// Create data folder in the directory. // Create data folder in the directory.
dirData := log2.DirCreator(dir, "data") dirData := log2.DirCreator(dir, "data")
// Create logger. // Create cam logger.
cutURI := LastPartURI(link) cutURI := LastPartURI(link)
cam := log2.CamLogging(fmt.Sprintf("%s/%s/writer-cam_%s.log", dirData, cutURI, time.Now().Format("15-04-05_02-01-2006"))) cam := log2.CamLogging(fmt.Sprintf("%s/%s/writer-cam_%s.log", dirData, cutURI, time.Now().Format("15-04-05_02-01-2006")))

View File

@ -1,7 +1,24 @@
package log package log
import "go.uber.org/zap" import (
"fmt"
"time"
"git.insit.tech/psa/rtsp_reader-writer/writer/internal/config"
log2 "git.insit.tech/sas/rtsp_proxy/core/log"
"go.uber.org/zap"
)
var ( var (
Log *zap.Logger Log *zap.Logger
) )
// StartMainLogger establishes main logger.
func StartMainLogger(local, program string) {
config.LogsDirectory = log2.DirCreator(local, "logs")
Log = log2.MainLogging(
fmt.Sprintf("%s/%s-main_%s.log",
config.LogsDirectory,
program,
time.Now().Format("15-04-05_02-01-2006")))
}