From 489b60b11e7849362aa7a76faf5fe26a917537f1 Mon Sep 17 00:00:00 2001 From: Sergey Petrov Date: Tue, 25 Mar 2025 15:19:12 +0500 Subject: [PATCH] Part program in two logical units: ingest and storage. --- writer/cmd/main.go | 4 +- writer/go.mod | 11 +- writer/go.sum | 37 +- .../editor.go => ingest/rtsp/operator.go} | 27 +- writer/internal/ingest/rtsp/rtsp.go | 1133 +++++++++-------- writer/internal/storage/file.go | 121 +- writer/internal/storage/file_manager.go | 19 + writer/internal/storage/segmenter.go | 162 --- writer/internal/storage/temp.go | 67 + 9 files changed, 802 insertions(+), 779 deletions(-) rename writer/internal/{storage/editor.go => ingest/rtsp/operator.go} (50%) create mode 100644 writer/internal/storage/file_manager.go delete mode 100644 writer/internal/storage/segmenter.go create mode 100644 writer/internal/storage/temp.go diff --git a/writer/cmd/main.go b/writer/cmd/main.go index c4ff8d2..eef343f 100644 --- a/writer/cmd/main.go +++ b/writer/cmd/main.go @@ -12,13 +12,13 @@ func main() { flag.Parse() // Parse camera links from YAML file into struct Cameras. - c, err := config.ParseCamerasYAML(*directory) + cams, err := config.ParseCamerasYAML(*directory) if err != nil { panic(err) } // Connect to each camera. - for _, link := range c { + for _, link := range cams { log.Printf("process camera:\n %s\n", link) go func() { diff --git a/writer/go.mod b/writer/go.mod index 4c1ffb3..046600c 100644 --- a/writer/go.mod +++ b/writer/go.mod @@ -3,14 +3,14 @@ module git.insit.tech/psa/rtsp_reader-writer/writer go 1.24.1 require ( - git.insit.tech/sas/rtsp_proxy v0.0.0-20250310124520-82fa76149f4e + git.insit.tech/sas/rtsp_proxy v0.0.0-20250325071536-7b97f96d64e6 github.com/Eyevinn/mp4ff v0.47.0 github.com/bluenviron/gortsplib/v4 v4.12.3 github.com/bluenviron/mediacommon v1.14.0 github.com/bluenviron/mediacommon/v2 v2.0.0 github.com/gen2brain/aac-go v0.0.0-20230119102159-ef1e76509d21 github.com/golang/snappy v1.0.0 - github.com/pion/rtp v1.8.12 + github.com/pion/rtp v1.8.13 github.com/zaf/g711 v1.4.0 gopkg.in/yaml.v3 v3.0.1 ) @@ -18,12 +18,15 @@ require ( require ( github.com/asticode/go-astikit v0.52.0 // indirect github.com/asticode/go-astits v1.13.0 // indirect - github.com/cyub/ringbuffer v0.0.0-20221202135829-35445cc89929 // indirect github.com/google/uuid v1.6.0 // indirect github.com/grafov/m3u8 v0.12.1 // indirect github.com/pion/randutil v0.1.0 // indirect github.com/pion/rtcp v1.2.15 // indirect - github.com/pion/sdp/v3 v3.0.10 // indirect + github.com/pion/sdp/v3 v3.0.11 // indirect + github.com/zencoder/go-dash v0.0.0-20201006100653-2f93b14912b2 // indirect + go.uber.org/multierr v1.11.0 // indirect + go.uber.org/zap v1.27.0 // indirect golang.org/x/net v0.37.0 // indirect golang.org/x/sys v0.31.0 // indirect + gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect ) diff --git a/writer/go.sum b/writer/go.sum index f8b17ea..848c717 100644 --- a/writer/go.sum +++ b/writer/go.sum @@ -1,5 +1,5 @@ -git.insit.tech/sas/rtsp_proxy v0.0.0-20250310124520-82fa76149f4e h1:JeOZvcZA4JHfoBG5ES9tpSHrhOj1jmjFzSJAyKIjApU= -git.insit.tech/sas/rtsp_proxy v0.0.0-20250310124520-82fa76149f4e/go.mod h1:9Yw6g7jcG9fIkWh6FGXSWTyZs4d7EQWaRhdKnnH2TvA= +git.insit.tech/sas/rtsp_proxy v0.0.0-20250325071536-7b97f96d64e6 h1:9WyrbPswZtjr3sqoCYaqus22j5st2HuVjWMiBTAorsM= +git.insit.tech/sas/rtsp_proxy v0.0.0-20250325071536-7b97f96d64e6/go.mod h1:/AHWd1Otr+ikOLWzpXtoozzifEx9ZKou+R6DgwaEzr0= github.com/Eyevinn/mp4ff v0.47.0 h1:XSSHYt5+I0fyOnHWoNwM72DtivlmHFR0V9azgIi+ZVU= github.com/Eyevinn/mp4ff v0.47.0/go.mod h1:hJNUUqOBryLAzUW9wpCJyw2HaI+TCd2rUPhafoS5lgg= github.com/asticode/go-astikit v0.30.0/go.mod h1:h4ly7idim1tNhaVkdVBeXQZEE3L0xblP7fCWbgwipF0= @@ -13,11 +13,9 @@ github.com/bluenviron/mediacommon v1.14.0 h1:lWCwOBKNKgqmspRpwpvvg3CidYm+XOc2+z/ github.com/bluenviron/mediacommon v1.14.0/go.mod h1:z5LP9Tm1ZNfQV5Co54PyOzaIhGMusDfRKmh42nQSnyo= github.com/bluenviron/mediacommon/v2 v2.0.0 h1:JinZ9v2x6QeAOzA0cDA6aFe8vQuCrU8OyWEhG2iNzwY= github.com/bluenviron/mediacommon/v2 v2.0.0/go.mod h1:iHEz1SFIet6zBwAQoh1a92vTQ3dV3LpVFbom6/SLz3k= -github.com/cyub/ringbuffer v0.0.0-20221202135829-35445cc89929 h1:ZPgY61C6ZvVsr4YIBOOcXZ2lw/FuJ4GcoTRPDkHQ3Zg= -github.com/cyub/ringbuffer v0.0.0-20221202135829-35445cc89929/go.mod h1:049xI0W4vQexZfPJl70HOMs0eJ7FAZxTtm4iPp1cEDY= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= -github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/gen2brain/aac-go v0.0.0-20230119102159-ef1e76509d21 h1:yfrARW/aVlqKORCdKrYdU0PZUKPqQvYEUQBKfVlNa9Q= github.com/gen2brain/aac-go v0.0.0-20230119102159-ef1e76509d21/go.mod h1:HZqGD/LXHB1VCGUGNzuyxSsD12f3KjbJbvImAmoK/mM= github.com/go-test/deep v1.1.0 h1:WOcxcdHcvdgThNXjw0t76K42FXTU7HpNQWHpA2HHNlg= @@ -32,20 +30,16 @@ github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA= github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8= github.com/pion/rtcp v1.2.15 h1:LZQi2JbdipLOj4eBjK4wlVoQWfrZbh3Q6eHtWtJBZBo= github.com/pion/rtcp v1.2.15/go.mod h1:jlGuAjHMEXwMUHK78RgX0UmEJFV4zUKOFHR7OP+D3D0= -github.com/pion/rtp v1.8.12 h1:nsKs8Wi0jQyBFHU3qmn/OvtZrhktVfJY0vRxwACsL5U= -github.com/pion/rtp v1.8.12/go.mod h1:8uMBJj32Pa1wwx8Fuv/AsFhn8jsgw+3rUC2PfoBZ8p4= -github.com/pion/sdp/v3 v3.0.10 h1:6MChLE/1xYB+CjumMw+gZ9ufp2DPApuVSnDT8t5MIgA= -github.com/pion/sdp/v3 v3.0.10/go.mod h1:88GMahN5xnScv1hIMTqLdu/cOcUkj6a9ytbncwMCq2E= +github.com/pion/rtp v1.8.13 h1:8uSUPpjSL4OlwZI8Ygqu7+h2p9NPFB+yAZ461Xn5sNg= +github.com/pion/rtp v1.8.13/go.mod h1:8uMBJj32Pa1wwx8Fuv/AsFhn8jsgw+3rUC2PfoBZ8p4= +github.com/pion/sdp/v3 v3.0.11 h1:VhgVSopdsBKwhCFoyyPmT1fKMeV9nLMrEKxNOdy3IVI= +github.com/pion/sdp/v3 v3.0.11/go.mod h1:88GMahN5xnScv1hIMTqLdu/cOcUkj6a9ytbncwMCq2E= github.com/pkg/profile v1.4.0/go.mod h1:NWz/XGvpEW1FyYQ7fCx4dqYBLlfTcE+A9FLAkNKqjFE= -github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= -github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= -github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= -github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/youpy/go-riff v0.1.0 h1:vZO/37nI4tIET8tQI0Qn0Y79qQh99aEpponTPiPut7k= @@ -54,13 +48,22 @@ github.com/youpy/go-wav v0.3.2 h1:NLM8L/7yZ0Bntadw/0h95OyUsen+DQIVf9gay+SUsMU= github.com/youpy/go-wav v0.3.2/go.mod h1:0FCieAXAeSdcxFfwLpRuEo0PFmAoc+8NU34h7TUvk50= github.com/zaf/g711 v1.4.0 h1:XZYkjjiAg9QTBnHqEg37m2I9q3IIDv5JRYXs2N8ma7c= github.com/zaf/g711 v1.4.0/go.mod h1:eCDXt3dSp/kYYAoooba7ukD/Q75jvAaS4WOMr0l1Roo= +github.com/zencoder/go-dash v0.0.0-20201006100653-2f93b14912b2 h1:0iAY2pL6yYhNYpdc1DbFq0p7ocyu5MlgKmkealhz3nk= +github.com/zencoder/go-dash v0.0.0-20201006100653-2f93b14912b2/go.mod h1:c8Gxxfmh0jmZ6G+ISlpa315WBVkzd8mEhu6gN9mn5Qg= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= +go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= +go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= golang.org/x/net v0.37.0 h1:1zLorHbz+LYj7MQlSf1+2tPIIgibq2eL5xkrGk6f+2c= golang.org/x/net v0.37.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8= golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik= golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc= +gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/writer/internal/storage/editor.go b/writer/internal/ingest/rtsp/operator.go similarity index 50% rename from writer/internal/storage/editor.go rename to writer/internal/ingest/rtsp/operator.go index 24d3de8..c59d3b5 100644 --- a/writer/internal/storage/editor.go +++ b/writer/internal/ingest/rtsp/operator.go @@ -1,33 +1,19 @@ -package storage +package rtsp import ( - "git.insit.tech/sas/rtsp_proxy/protos" "log" "strings" "time" ) -// CutURI returns the last part of the URI after "/". -func CutURI(URI string) (CutterURI string) { +// cutURI returns the last part of the URI after "/". +func cutURI(URI string) (CutURI string) { splitted := strings.Split(URI, "/") return splitted[len(splitted)-1] } -// CreateFileName creates FileName structure. -func CreateFileName(dir string, resolutions []string, cuttedURI string, period int) *protos.FileName { - fn := protos.FileName{ - Path: dir + "/data/" + cuttedURI + "/" + resolutions[0], - TimeNow: time.Now().Format("15-04-05_02-01-2006"), - Name: "videoFragment", - Number: -1, - Duration: float64(period), - } - - return &fn -} - -// WaitPeriod waits for the next period. -func WaitPeriod(period int) { +// waitPeriod waits for the next period. +func waitPeriod(period int) { periodTD := time.Duration(period) * time.Second now := time.Now() @@ -37,7 +23,8 @@ func WaitPeriod(period int) { time.Sleep(waitDuration) } -func FindResolution(Body []byte) string { +// findResolution finds resolution in SDP. +func findResolution(Body []byte) string { split := strings.Split(string(Body), "\r\n") for _, line := range split { if strings.Contains(line, "a=x-dimensions:") { diff --git a/writer/internal/ingest/rtsp/rtsp.go b/writer/internal/ingest/rtsp/rtsp.go index 4d088ea..b330196 100644 --- a/writer/internal/ingest/rtsp/rtsp.go +++ b/writer/internal/ingest/rtsp/rtsp.go @@ -12,14 +12,12 @@ import ( "git.insit.tech/psa/rtsp_reader-writer/writer/internal/ingest/formats" "git.insit.tech/psa/rtsp_reader-writer/writer/internal/storage" - "git.insit.tech/sas/rtsp_proxy/protos/gens" "github.com/bluenviron/gortsplib/v4" "github.com/bluenviron/gortsplib/v4/pkg/base" "github.com/bluenviron/gortsplib/v4/pkg/description" "github.com/bluenviron/gortsplib/v4/pkg/format" "github.com/bluenviron/mediacommon/v2/pkg/codecs/g711" "github.com/pion/rtp" - _ "github.com/zaf/g711" ) // StartWriter starts the program. @@ -61,19 +59,25 @@ func RTSP(dir string, period int, link string) error { } // Create file name structure and directory for files. - resolution := storage.FindResolution(res.Body) + resolution := findResolution(res.Body) resolutions := []string{resolution} + cutURI := cutURI(link) - cuttedURI := storage.CutURI(link) - fn := storage.CreateFileName(dir, resolutions, cuttedURI, period) + fn := storage.CreateFileName(dir, resolutions, cutURI, period) err = os.MkdirAll(fmt.Sprintf("%s", fn.Path), 0755) if err != nil { return fmt.Errorf("mkdirall error: %w", err) } - // Create M3U8 playlist. - go gens.MediaPlaylistGenerator(dir+"/data/"+cuttedURI, "", fn.Duration, resolutions) + //////////////////////////////////////////////////////////////////////////////////////////////////// + + /* + // Create M3U8 playlist. + go gen.MediaPlaylistGenerator(dir+"/data/"+cutURI, "", fn.Duration, resolutions) + */ + + //////////////////////////////////////////////////////////////////////////////////////////////////// // Find formats. var audioFormat string @@ -110,60 +114,60 @@ func RTSP(dir string, period int, link string) error { } else { log.Println(err) } - /* - h265Format, h265Media, err := formats.FindH265Format(desc) - if h265Format != nil { - log.Println("[h265]: format found") - videoFormat = "H265" - } else { - log.Println(err) - } - lpcmFormat, lpcmMedia, err := formats.FindLPCMFormat(desc) - if lpcmFormat != nil { - log.Println("[lpcm]: format found") - audioFormat = "LPCM" - } else { - log.Println(err) - } + h265Format, h265Media, err := formats.FindH265Format(desc) + if h265Format != nil { + log.Println("[h265]: format found") + videoFormat = "H265" + } else { + log.Println(err) + } - mjpegFormat, mjpegMedia, err := formats.FindMJPEGFormat(desc) - if mjpegFormat != nil { - log.Println("[mjpeg]: format found") - videoFormat = "MJPEG" - } else { - log.Println(err) - } + lpcmFormat, lpcmMedia, err := formats.FindLPCMFormat(desc) + if lpcmFormat != nil { + log.Println("[lpcm]: format found") + audioFormat = "LPCM" + } else { + log.Println(err) + } - opusFormat, opusMedia, err := formats.FindOPUSFormat(desc) - if opusFormat != nil { - log.Println("[opus]: format found") - audioFormat = "OPUS" - } else { - log.Println(err) - } + mjpegFormat, mjpegMedia, err := formats.FindMJPEGFormat(desc) + if mjpegFormat != nil { + log.Println("[mjpeg]: format found") + videoFormat = "MJPEG" + } else { + log.Println(err) + } - vp8Format, vp8Media, err := formats.FindVP8Format(desc) - if vp8Format != nil { - log.Println("[vp8]: format found") - videoFormat = "VP8" - } else { - log.Println(err) - } + opusFormat, opusMedia, err := formats.FindOPUSFormat(desc) + if opusFormat != nil { + log.Println("[opus]: format found") + audioFormat = "OPUS" + } else { + log.Println(err) + } + + vp8Format, vp8Media, err := formats.FindVP8Format(desc) + if vp8Format != nil { + log.Println("[vp8]: format found") + videoFormat = "VP8" + } else { + log.Println(err) + } + + vp9Format, vp9Media, err := formats.FindVP9Format(desc) + if vp9Format != nil { + log.Println("[vp9]: format found") + videoFormat = "VP9" + } else { + log.Println(err) + } - vp9Format, vp9Media, err := formats.FindVP9Format(desc) - if vp9Format != nil { - log.Println("[vp9]: format found") - videoFormat = "VP9" - } else { - log.Println(err) - } - */ // Start program according to gotten formats. switch { case videoFormat == "AV1" && audioFormat == "": // Wait for the next period. - storage.WaitPeriod(period) + waitPeriod(period) log.Printf("[%v]: start recording", videoFormat) // Create decoder. @@ -231,7 +235,7 @@ func RTSP(dir string, period int, link string) error { case videoFormat == "" && audioFormat == "G711": // Wait for the next period. - storage.WaitPeriod(period) + waitPeriod(period) log.Printf("[%v]: start recording", audioFormat) // Create decoder. @@ -297,31 +301,31 @@ func RTSP(dir string, period int, link string) error { case videoFormat == "H264" && audioFormat == "AAC": // Wait for the next period. - storage.WaitPeriod(period) + waitPeriod(period) log.Printf("[%v-%v]: start recording", videoFormat, audioFormat) // Create decoders. - h264RTPDec, err := h264Format.CreateDecoder() - if err != nil { - log.Printf("[%v-%v]: create decoder error: %v\n", videoFormat, audioFormat, err) - } + //h264RTPDec, err := h264Format.CreateDecoder() + //if err != nil { + // log.Printf("[%v-%v]: create decoder error: %v\n", videoFormat, audioFormat, err) + //} + // + //aacRTPDec, err := aacFormat.CreateDecoder() + //if err != nil { + // log.Printf("[%v-%v]: create decoder error: %v\n", videoFormat, audioFormat, err) + //} - aacRTPDec, err := aacFormat.CreateDecoder() - if err != nil { - log.Printf("[%v-%v]: create decoder error: %v\n", videoFormat, audioFormat, err) - } + //// Setup MPEG-TS muxer. + //currentMpegtsMuxer := formats.MpegtsMuxer{ + // FileName: fn.SetNumNTime(), + // H264Format: h264Format, + // Mpeg4AudioFormat: aacFormat, + //} - // Setup MPEG-TS muxer. - currentMpegtsMuxer := formats.MpegtsMuxer{ - FileName: fn.SetNumNTime(), - H264Format: h264Format, - Mpeg4AudioFormat: aacFormat, - } - - err = currentMpegtsMuxer.Initialize() - if err != nil { - return fmt.Errorf("[%v-%v]: init muxer error: %w\n", videoFormat, audioFormat, err) - } + //err = currentMpegtsMuxer.Initialize() + //if err != nil { + // return fmt.Errorf("[%v-%v]: init muxer error: %w\n", videoFormat, audioFormat, err) + //} // Setup all medias. err = c.SetupAll(desc.BaseURL, desc.Medias) @@ -333,32 +337,32 @@ func RTSP(dir string, period int, link string) error { c.OnPacketRTPAny(func(medi *description.Media, forma format.Format, pkt *rtp.Packet) { switch forma.(type) { case *format.H264: - // Process H264 flow and return PTS and AU. - pts, au, err := formats.ProcessH264(&c, h264Media, h264RTPDec, pkt, videoFormat) - if err != nil { - log.Printf("[%v-%v]: process packet error: %v\n", videoFormat, audioFormat, err) - return - } - - // Encode the access unit into MPEG-TS. - err = currentMpegtsMuxer.WriteH264(pts, au) - if err != nil { - return - } + //// Process H264 flow and return PTS and AU. + //pts, au, err := formats.ProcessH264(&c, h264Media, h264RTPDec, pkt, videoFormat) + //if err != nil { + // log.Printf("[%v-%v]: process packet error: %v\n", videoFormat, audioFormat, err) + // return + //} + // + //// Encode the access unit into MPEG-TS. + //err = currentMpegtsMuxer.WriteH264(pts, au) + //if err != nil { + // return + //} case *format.MPEG4Audio: - // Process AAC flow and return PTS and AUS. - pts, aus, err := formats.ProcessAAC(&c, aacMedia, aacRTPDec, pkt, audioFormat) - if err != nil { - log.Printf("[%v-%v]: process packet error: %v\n", videoFormat, audioFormat, err) - return - } - - // Encode access units into MPEG-TS. - err = currentMpegtsMuxer.WriteAAC(aus, pts) - if err != nil { - return - } + //// Process AAC flow and return PTS and AUS. + //pts, aus, err := formats.ProcessAAC(&c, aacMedia, aacRTPDec, pkt, audioFormat) + //if err != nil { + // log.Printf("[%v-%v]: process packet error: %v\n", videoFormat, audioFormat, err) + // return + //} + // + //// Encode access units into MPEG-TS. + //err = currentMpegtsMuxer.WriteAAC(aus, pts) + //if err != nil { + // return + //} } }) @@ -376,26 +380,26 @@ func RTSP(dir string, period int, link string) error { // Rotate files. go func() { for range ticker.C { - // Logic for rotation files. - currentMpegtsMuxer.Close() - currentMpegtsMuxer.FileName = fn.SetNumNTime() - - err = currentMpegtsMuxer.Initialize() - if err != nil { - log.Printf("[%v-%v]: init muxer error: %v\n", videoFormat, audioFormat, err) - return - } - - log.Printf("[%v-%v]: new file for recording created: %v", - videoFormat, audioFormat, currentMpegtsMuxer.FileName) + //// Logic for rotation files. + //currentMpegtsMuxer.Close() + //currentMpegtsMuxer.FileName = fn.SetNumNTime() + // + //err = currentMpegtsMuxer.Initialize() + //if err != nil { + // log.Printf("[%v-%v]: init muxer error: %v\n", videoFormat, audioFormat, err) + // return + //} + // + //log.Printf("[%v-%v]: new file for recording created: %v", + // videoFormat, audioFormat, currentMpegtsMuxer.FileName) } }() panic(c.Wait()) - case videoFormat == "H264" && audioFormat == "G711": + case videoFormat == "H264" && audioFormat == "G711" || videoFormat == "H264" && audioFormat == "": // Wait for the next period. - storage.WaitPeriod(period) + waitPeriod(period) log.Printf("[%v-%v]: start recording", videoFormat, audioFormat) // Create decoders. @@ -428,29 +432,28 @@ func RTSP(dir string, period int, link string) error { if err != nil { return fmt.Errorf("[%v-%v]: setup media error: %w\n", videoFormat, audioFormat, err) } - /////////////////////////////////////////////////////////// - file, err := os.Create( - dir + "/data/" + cuttedURI + "/" + time.Now().Format("15-04-05_02-01-2006") + ".insit") + + file, err := os.Create(fn.SetNumNTime("insit")) if err != nil { - fmt.Println("Ошибка создания файла:", err) + fmt.Println("creating file error:", err) } defer file.Close() seg := storage.Segment{ - Start: time.Now().Format("15-04-05_02-01-2006"), + Date: time.Now().Format("15-04-05_02-01-2006"), Duration: strconv.Itoa(period), Packets: storage.InterleavedPacket{}, } - // Записываем заголовок файла (например, streamID). - streamID := cuttedURI - if err := binary.Write(file, binary.LittleEndian, int32(len(streamID))); err != nil { - fmt.Println("Ошибка записи заголовка:", err) + // Write StreamID. + if err := binary.Write(file, binary.LittleEndian, int32(len(cutURI))); err != nil { + fmt.Println("write StreamID length error:", err) } - if _, err := file.Write([]byte(streamID)); err != nil { - fmt.Println("Ошибка записи streamID:", err) + if _, err := file.Write([]byte(cutURI)); err != nil { + fmt.Println("write StreamID error:", err) } + // Write header of the file. err = storage.WriteHeader(file, seg) if err != nil { return fmt.Errorf("[%v-%v]: write header error: %w", videoFormat, audioFormat, err) @@ -468,17 +471,18 @@ func RTSP(dir string, period int, link string) error { } if au != nil { - + // Add appropriate lines to the interleaved packet. seg.Packets.Type = storage.PacketTypeH264 seg.Packets.Pts = pts seg.Packets.H264AUs = au - // Записываем сегмент с interleaved пакетами. + // Write segment with interleaved packets. if err := storage.WriteInterleavedPacket(file, seg); err != nil { - fmt.Println("Ошибка записи сегмента:", err) + fmt.Println("write segment error:", err) return } } + //// Encode the access unit into MPEG-TS. //err = currentMpegtsMuxer.WriteH264(pts, au) //if err != nil { @@ -494,16 +498,17 @@ func RTSP(dir string, period int, link string) error { } if au != nil { - + // Convert G711 to LPCM. lpcmSamples := formats.ConvertG711ToLPCM(au, f.MULaw) - seg.Packets.Type = storage.PacketTypeH264 + // Add appropriate lines to the interleaved packet. + seg.Packets.Type = storage.PacketTypeLPCM seg.Packets.Pts = pts seg.Packets.LPCMSamples = lpcmSamples - // Записываем сегмент с interleaved пакетами. + // Write segment with interleaved packets. if err := storage.WriteInterleavedPacket(file, seg); err != nil { - fmt.Println("Ошибка записи сегмента:", err) + fmt.Println("write segment error:", err) return } } @@ -549,568 +554,566 @@ func RTSP(dir string, period int, link string) error { file.Close() - file, err = os.Create( - dir + "/data/" + cuttedURI + "/" + time.Now().Format("15-04-05_02-01-2006") + ".insit") + file, err = os.Create(fn.SetNumNTime("insit")) if err != nil { - fmt.Println("Ошибка создания файла:", err) + fmt.Println("creating file error:", err) } seg = storage.Segment{ - Start: time.Now().Format("15-04-05_02-01-2006"), + Date: time.Now().Format("15-04-05_02-01-2006"), Duration: strconv.Itoa(period), Packets: storage.InterleavedPacket{}, } - // Записываем заголовок файла (например, streamID). - streamID = cuttedURI - if err := binary.Write(file, binary.LittleEndian, int32(len(streamID))); err != nil { - fmt.Println("Ошибка записи заголовка:", err) + // Write StreamID. + if err := binary.Write(file, binary.LittleEndian, int32(len(cutURI))); err != nil { + fmt.Println("write StreamID length error:", err) } - if _, err := file.Write([]byte(streamID)); err != nil { - fmt.Println("Ошибка записи streamID:", err) + if _, err := file.Write([]byte(cutURI)); err != nil { + fmt.Println("write StreamID error:", err) } + // Write header of the file. err = storage.WriteHeader(file, seg) if err != nil { log.Printf("[%v-%v]: write header error: %w", videoFormat, audioFormat, err) } log.Printf("[%v-%v]: new file for recording created: %v", - videoFormat, audioFormat, seg.Start+".insit") + videoFormat, audioFormat, seg.Date+".insit") } }() - /* - panic(c.Wait()) + panic(c.Wait()) - case videoFormat == "H264" && audioFormat == "": - // Wait for the next period. - storage.WaitPeriod(period) - log.Printf("[%v]: start recording", videoFormat) + case videoFormat == "H264-" && audioFormat == "": + // Wait for the next period. + waitPeriod(period) + log.Printf("[%v]: start recording", videoFormat) - // Create decoder. - h264RTPDec, err := h264Format.CreateDecoder() - if err != nil { - log.Printf("[%v]: create decoder error: %v\n", videoFormat, err) - } + // Create decoder. + h264RTPDec, err := h264Format.CreateDecoder() + if err != nil { + log.Printf("[%v]: create decoder error: %v\n", videoFormat, err) + } - // Setup H264 -> RGBA decoder. - h264Dec := &formats.H264Decoder{} - err = h264Dec.Initialize() - if err != nil { - log.Printf("[%v]: init decoder error: %v\n", videoFormat, err) - } - defer h264Dec.Close() + // Setup H264 -> RGBA decoder. + h264Dec := &formats.H264Decoder{} + err = h264Dec.Initialize() + if err != nil { + log.Printf("[%v]: init decoder error: %v\n", videoFormat, err) + } + defer h264Dec.Close() - // if SPS and PPS are present into the SDP, send them to the decoder - if h264Format.SPS != nil { - h264Dec.Decode([][]byte{h264Format.SPS}) - } - if h264Format.PPS != nil { - h264Dec.Decode([][]byte{h264Format.PPS}) - } + // if SPS and PPS are present into the SDP, send them to the decoder + if h264Format.SPS != nil { + h264Dec.Decode([][]byte{h264Format.SPS}) + } + if h264Format.PPS != nil { + h264Dec.Decode([][]byte{h264Format.PPS}) + } - // Setup media. - _, err = c.Setup(desc.BaseURL, h264Media, 0, 0) - if err != nil { - return fmt.Errorf("[%v]: setup media error: %w", videoFormat, err) - } + // Setup media. + _, err = c.Setup(desc.BaseURL, h264Media, 0, 0) + if err != nil { + return fmt.Errorf("[%v]: setup media error: %w", videoFormat, err) + } - firstRandomAccess := false + firstRandomAccess := false - // Process input rtp packets. - c.OnPacketRTP(h264Media, h264Format, func(pkt *rtp.Packet) { - // Process H264 flow and return PTS and IMG. - pts, img, err := formats.ProcessH264RGBA( - &c, h264Media, h264RTPDec, h264Dec, pkt, firstRandomAccess, videoFormat) + // Process input rtp packets. + c.OnPacketRTP(h264Media, h264Format, func(pkt *rtp.Packet) { + // Process H264 flow and return PTS and IMG. + pts, img, err := formats.ProcessH264RGBA( + &c, h264Media, h264RTPDec, h264Dec, pkt, firstRandomAccess, videoFormat) + if err != nil { + log.Printf("[%v]: process packet error: %v\n", videoFormat, err) + return + } + + log.Printf("[%v]: decoded frame with PTS %v and size %v\n", videoFormat, pts, img.Bounds().Max) + }) + + // Start playing. + _, err = c.Play(nil) + if err != nil { + return fmt.Errorf("[%v]: sending PLAY request erorr: %w", videoFormat, err) + } + + // Create ticker for rotation files. + ticker := time.NewTicker(time.Duration(period) * time.Second) + defer ticker.Stop() + + // Rotate files. + go func() { + for range ticker.C { + // Logic for rotation files. + /* + currentMpegtsMuxer.close() + currentMpegtsMuxer.fileName = fn.SetNumNTime() + + err = currentMpegtsMuxer.initialize() if err != nil { - log.Printf("[%v]: process packet error: %v\n", videoFormat, err) - return + panic(err) } - log.Printf("[%v]: decoded frame with PTS %v and size %v\n", videoFormat, pts, img.Bounds().Max) - }) + log.Println("New file for recording created:", currentMpegtsMuxer.fileName) + */ + } + }() - // Start playing. - _, err = c.Play(nil) - if err != nil { - return fmt.Errorf("[%v]: sending PLAY request erorr: %w", videoFormat, err) - } + panic(c.Wait()) - // Create ticker for rotation files. - ticker := time.NewTicker(time.Duration(period) * time.Second) - defer ticker.Stop() + case videoFormat == "H265" && audioFormat == "": + // Wait for the next period. + waitPeriod(period) + log.Printf("[%v]: start recording", videoFormat) - // Rotate files. - go func() { - for range ticker.C { - // Logic for rotation files. - /* - currentMpegtsMuxer.close() - currentMpegtsMuxer.fileName = fn.SetNumNTime() + // Create decoder. + h265RTPDec, err := h265Format.CreateDecoder() + if err != nil { + log.Printf("[%v]: create decoder error: %v\n", videoFormat, err) + } - err = currentMpegtsMuxer.initialize() - if err != nil { - panic(err) - } + // Setup H264 -> RGBA decoder. + h265Dec := &formats.H265Decoder{} + err = h265Dec.Initialize() + if err != nil { + log.Printf("[%v]: init decoder error: %v\n", videoFormat, err) + } + defer h265Dec.Close() - log.Println("New file for recording created:", currentMpegtsMuxer.fileName) + // If VPS, SPS and PPS are present into the SDP, send them to the decoder. + if h265Format.VPS != nil { + h265Dec.Decode([][]byte{h265Format.VPS}) + } + if h265Format.SPS != nil { + h265Dec.Decode([][]byte{h265Format.SPS}) + } + if h265Format.PPS != nil { + h265Dec.Decode([][]byte{h265Format.PPS}) + } - } - }() + // Setup media. + _, err = c.Setup(desc.BaseURL, h265Media, 0, 0) + if err != nil { + return fmt.Errorf("[%v]: setup media error: %w", videoFormat, err) + } - panic(c.Wait()) + firstRandomAccess := false - case videoFormat == "H265" && audioFormat == "": - // Wait for the next period. - storage.WaitPeriod(period) - log.Printf("[%v]: start recording", videoFormat) + // Process input rtp packets. + c.OnPacketRTP(h265Media, h265Format, func(pkt *rtp.Packet) { + // Process H265 flow and return PTS and IMG. + pts, img, err := formats.ProcessH265RGBA( + &c, h265Media, h265RTPDec, h265Dec, pkt, firstRandomAccess, videoFormat) + if err != nil { + log.Printf("[%v]: process packet error: %v\n", videoFormat, err) + return + } - // Create decoder. - h265RTPDec, err := h265Format.CreateDecoder() - if err != nil { - log.Printf("[%v]: create decoder error: %v\n", videoFormat, err) - } + log.Printf("[%v]: decoded frame with PTS %v and size %v\n", videoFormat, pts, img.Bounds().Max) + }) - // Setup H264 -> RGBA decoder. - h265Dec := &formats.H265Decoder{} - err = h265Dec.Initialize() - if err != nil { - log.Printf("[%v]: init decoder error: %v\n", videoFormat, err) - } - defer h265Dec.Close() + // Start playing. + _, err = c.Play(nil) + if err != nil { + return fmt.Errorf("[%v]: sending PLAY request erorr: %w", videoFormat, err) + } - // If VPS, SPS and PPS are present into the SDP, send them to the decoder. - if h265Format.VPS != nil { - h265Dec.Decode([][]byte{h265Format.VPS}) - } - if h265Format.SPS != nil { - h265Dec.Decode([][]byte{h265Format.SPS}) - } - if h265Format.PPS != nil { - h265Dec.Decode([][]byte{h265Format.PPS}) - } + // Create ticker for rotation files. + ticker := time.NewTicker(time.Duration(period) * time.Second) + defer ticker.Stop() - // Setup media. - _, err = c.Setup(desc.BaseURL, h265Media, 0, 0) - if err != nil { - return fmt.Errorf("[%v]: setup media error: %w", videoFormat, err) - } + // Rotate files. + go func() { + for range ticker.C { + // Logic for rotation files. + /* + currentMpegtsMuxer.close() + currentMpegtsMuxer.fileName = fn.SetNumNTime() - firstRandomAccess := false - - // Process input rtp packets. - c.OnPacketRTP(h265Media, h265Format, func(pkt *rtp.Packet) { - // Process H265 flow and return PTS and IMG. - pts, img, err := formats.ProcessH265RGBA( - &c, h265Media, h265RTPDec, h265Dec, pkt, firstRandomAccess, videoFormat) + err = currentMpegtsMuxer.initialize() if err != nil { - log.Printf("[%v]: process packet error: %v\n", videoFormat, err) - return + panic(err) } - log.Printf("[%v]: decoded frame with PTS %v and size %v\n", videoFormat, pts, img.Bounds().Max) - }) + log.Println("New file for recording created:", currentMpegtsMuxer.fileName) + */ + } + }() - // Start playing. - _, err = c.Play(nil) - if err != nil { - return fmt.Errorf("[%v]: sending PLAY request erorr: %w", videoFormat, err) - } + panic(c.Wait()) - // Create ticker for rotation files. - ticker := time.NewTicker(time.Duration(period) * time.Second) - defer ticker.Stop() + case videoFormat == "" && audioFormat == "LPCM": + // Wait for the next period. + waitPeriod(period) + log.Printf("[%v]: start recording", audioFormat) - // Rotate files. - go func() { - for range ticker.C { - // Logic for rotation files. - /* - currentMpegtsMuxer.close() - currentMpegtsMuxer.fileName = fn.SetNumNTime() + // Create decoder. + lpcmRTPDec, err := lpcmFormat.CreateDecoder() + if err != nil { + log.Printf("[%v]: create decoder error: %v\n", audioFormat, err) + } - err = currentMpegtsMuxer.initialize() - if err != nil { - panic(err) - } + // Setup media. + _, err = c.Setup(desc.BaseURL, lpcmMedia, 0, 0) + if err != nil { + log.Printf("[%v]: setup media error: %v\n", audioFormat, err) + } - log.Println("New file for recording created:", currentMpegtsMuxer.fileName) + // Process input rtp packets. + c.OnPacketRTP(lpcmMedia, lpcmFormat, func(pkt *rtp.Packet) { + // Process LPCM flow and return PTS and SAMPLES. + pts, samples, err := formats.ProcessLPCM(&c, lpcmMedia, lpcmRTPDec, pkt, audioFormat) + if err != nil { + log.Printf("[%v]: process packet error: %v\n", audioFormat, err) + return + } - } - }() + log.Printf("[%v]: decoded audio samples with PTS %v and size %d\n", audioFormat, pts, len(samples)) + }) - panic(c.Wait()) + // Start playing. + _, err = c.Play(nil) + if err != nil { + return fmt.Errorf("[%v]: sending PLAY request erorr: %w", audioFormat, err) + } - case videoFormat == "" && audioFormat == "LPCM": - // Wait for the next period. - storage.WaitPeriod(period) - log.Printf("[%v]: start recording", audioFormat) + // Create ticker for rotation files. + ticker := time.NewTicker(time.Duration(period) * time.Second) + defer ticker.Stop() - // Create decoder. - lpcmRTPDec, err := lpcmFormat.CreateDecoder() - if err != nil { - log.Printf("[%v]: create decoder error: %v\n", audioFormat, err) - } + // Rotate files. + go func() { + for range ticker.C { + // Logic for rotation files. + /* + currentMpegtsMuxer.close() + currentMpegtsMuxer.fileName = fn.SetNumNTime() - // Setup media. - _, err = c.Setup(desc.BaseURL, lpcmMedia, 0, 0) - if err != nil { - log.Printf("[%v]: setup media error: %v\n", audioFormat, err) - } - - // Process input rtp packets. - c.OnPacketRTP(lpcmMedia, lpcmFormat, func(pkt *rtp.Packet) { - // Process LPCM flow and return PTS and SAMPLES. - pts, samples, err := formats.ProcessLPCM(&c, lpcmMedia, lpcmRTPDec, pkt, audioFormat) + err = currentMpegtsMuxer.initialize() if err != nil { - log.Printf("[%v]: process packet error: %v\n", audioFormat, err) - return + panic(err) } - log.Printf("[%v]: decoded audio samples with PTS %v and size %d\n", audioFormat, pts, len(samples)) - }) + log.Println("New file for recording created:", currentMpegtsMuxer.fileName) + */ + } + }() - // Start playing. - _, err = c.Play(nil) - if err != nil { - return fmt.Errorf("[%v]: sending PLAY request erorr: %w", audioFormat, err) - } + panic(c.Wait()) - // Create ticker for rotation files. - ticker := time.NewTicker(time.Duration(period) * time.Second) - defer ticker.Stop() + case videoFormat == "MJPEG" && audioFormat == "": + // Wait for the next period. + waitPeriod(period) + log.Printf("[%v]: start recording", audioFormat) - // Rotate files. - go func() { - for range ticker.C { - // Logic for rotation files. - /* - currentMpegtsMuxer.close() - currentMpegtsMuxer.fileName = fn.SetNumNTime() + // Create decoder. + mjpegRTPDec, err := mjpegFormat.CreateDecoder() + if err != nil { + log.Printf("[%v]: create decoder error: %v\n", videoFormat, err) + } - err = currentMpegtsMuxer.initialize() - if err != nil { - panic(err) - } + // Setup media. + _, err = c.Setup(desc.BaseURL, mjpegMedia, 0, 0) + if err != nil { + return fmt.Errorf("[%v]: setup media error: %w", videoFormat, err) + } - log.Println("New file for recording created:", currentMpegtsMuxer.fileName) + // Process input rtp packets. + c.OnPacketRTP(mjpegMedia, mjpegFormat, func(pkt *rtp.Packet) { + // Process MJPEG flow and return PTS and IMG. + pts, img, err := formats.ProcessMJPEGRGBA(&c, mjpegMedia, mjpegRTPDec, pkt, videoFormat) + if err != nil { + log.Printf("[%v]: process packet error: %v\n", videoFormat, err) + return + } - } - }() + log.Printf("[%v]: decoded image with PTS %v and size %v", videoFormat, pts, img.Bounds().Max) + }) - panic(c.Wait()) + // Start playing. + _, err = c.Play(nil) + if err != nil { + return fmt.Errorf("[%v]: sending PLAY request erorr: %w", videoFormat, err) + } - case videoFormat == "MJPEG" && audioFormat == "": - // Wait for the next period. - storage.WaitPeriod(period) - log.Printf("[%v]: start recording", audioFormat) + // Create ticker for rotation files. + ticker := time.NewTicker(time.Duration(period) * time.Second) + defer ticker.Stop() - // Create decoder. - mjpegRTPDec, err := mjpegFormat.CreateDecoder() - if err != nil { - log.Printf("[%v]: create decoder error: %v\n", videoFormat, err) - } + // Rotate files. + go func() { + for range ticker.C { + // Logic for rotation files. + /* + currentMpegtsMuxer.close() + currentMpegtsMuxer.fileName = fn.SetNumNTime() - // Setup media. - _, err = c.Setup(desc.BaseURL, mjpegMedia, 0, 0) - if err != nil { - return fmt.Errorf("[%v]: setup media error: %w", videoFormat, err) - } - - // Process input rtp packets. - c.OnPacketRTP(mjpegMedia, mjpegFormat, func(pkt *rtp.Packet) { - // Process MJPEG flow and return PTS and IMG. - pts, img, err := formats.ProcessMJPEGRGBA(&c, mjpegMedia, mjpegRTPDec, pkt, videoFormat) + err = currentMpegtsMuxer.initialize() if err != nil { - log.Printf("[%v]: process packet error: %v\n", videoFormat, err) - return + panic(err) } - log.Printf("[%v]: decoded image with PTS %v and size %v", videoFormat, pts, img.Bounds().Max) - }) + log.Println("New file for recording created:", currentMpegtsMuxer.fileName) + */ + } + }() - // Start playing. - _, err = c.Play(nil) - if err != nil { - return fmt.Errorf("[%v]: sending PLAY request erorr: %w", videoFormat, err) - } + panic(c.Wait()) - // Create ticker for rotation files. - ticker := time.NewTicker(time.Duration(period) * time.Second) - defer ticker.Stop() + case videoFormat == "" && audioFormat == "AAC": + // Wait for the next period. + waitPeriod(period) + log.Printf("[%v]: start recording", audioFormat) - // Rotate files. - go func() { - for range ticker.C { - // Logic for rotation files. - /* - currentMpegtsMuxer.close() - currentMpegtsMuxer.fileName = fn.SetNumNTime() + // Create decoder. + aacRTPDec, err := aacFormat.CreateDecoder() + if err != nil { + log.Printf("[%v]: create decoder error: %v\n", audioFormat, err) + } - err = currentMpegtsMuxer.initialize() - if err != nil { - panic(err) - } + // Setup media. + _, err = c.Setup(desc.BaseURL, aacMedia, 0, 0) + if err != nil { + return fmt.Errorf("[%v]: setup media error: %w", audioFormat, err) + } - log.Println("New file for recording created:", currentMpegtsMuxer.fileName) + // Process input rtp packets. + c.OnPacketRTP(aacMedia, aacFormat, func(pkt *rtp.Packet) { + // Process AAC flow and return PTS and AUS. + pts, aus, err := formats.ProcessAAC(&c, aacMedia, aacRTPDec, pkt, audioFormat) + if err != nil { + log.Printf("[%v]: process packet error: %v\n", audioFormat, err) + return + } - } - }() + for _, au := range aus { + log.Printf("[%v]: received access unit with PTS %v size %d\n", audioFormat, pts, len(au)) + } + }) - panic(c.Wait()) + // Start playing. + _, err = c.Play(nil) + if err != nil { + return fmt.Errorf("[%v]: sending PLAY request erorr: %w", audioFormat, err) + } - case videoFormat == "" && audioFormat == "AAC": - // Wait for the next period. - storage.WaitPeriod(period) - log.Printf("[%v]: start recording", audioFormat) + // Create ticker for rotation files. + ticker := time.NewTicker(time.Duration(period) * time.Second) + defer ticker.Stop() - // Create decoder. - aacRTPDec, err := aacFormat.CreateDecoder() - if err != nil { - log.Printf("[%v]: create decoder error: %v\n", audioFormat, err) - } + // Rotate files. + go func() { + for range ticker.C { + // Logic for rotation files. + /* + currentMpegtsMuxer.close() + currentMpegtsMuxer.fileName = fn.SetNumNTime() - // Setup media. - _, err = c.Setup(desc.BaseURL, aacMedia, 0, 0) - if err != nil { - return fmt.Errorf("[%v]: setup media error: %w", audioFormat, err) - } - - // Process input rtp packets. - c.OnPacketRTP(aacMedia, aacFormat, func(pkt *rtp.Packet) { - // Process AAC flow and return PTS and AUS. - pts, aus, err := formats.ProcessAAC(&c, aacMedia, aacRTPDec, pkt, audioFormat) + err = currentMpegtsMuxer.initialize() if err != nil { - log.Printf("[%v]: process packet error: %v\n", audioFormat, err) - return + panic(err) } - for _, au := range aus { - log.Printf("[%v]: received access unit with PTS %v size %d\n", audioFormat, pts, len(au)) - } - }) + log.Println("New file for recording created:", currentMpegtsMuxer.fileName) + */ + } + }() - // Start playing. - _, err = c.Play(nil) - if err != nil { - return fmt.Errorf("[%v]: sending PLAY request erorr: %w", audioFormat, err) - } + panic(c.Wait()) - // Create ticker for rotation files. - ticker := time.NewTicker(time.Duration(period) * time.Second) - defer ticker.Stop() + case videoFormat == "" && audioFormat == "OPUS": + // Wait for the next period. + waitPeriod(period) + log.Printf("[%v]: start recording", audioFormat) - // Rotate files. - go func() { - for range ticker.C { - // Logic for rotation files. - /* - currentMpegtsMuxer.close() - currentMpegtsMuxer.fileName = fn.SetNumNTime() + // Create decoder. + opusRTPDec, err := opusFormat.CreateDecoder() + if err != nil { + log.Printf("[%v]: create decoder error: %v\n", audioFormat, err) + } - err = currentMpegtsMuxer.initialize() - if err != nil { - panic(err) - } + // Setup media. + _, err = c.Setup(desc.BaseURL, opusMedia, 0, 0) + if err != nil { + return fmt.Errorf("[%v]: setup media error: %w", audioFormat, err) + } - log.Println("New file for recording created:", currentMpegtsMuxer.fileName) + // Process input rtp packets. + c.OnPacketRTP(opusMedia, opusFormat, func(pkt *rtp.Packet) { + // Process OPUS flow and return PTS and OP. + pts, op, err := formats.ProcessOPUS(&c, opusMedia, opusRTPDec, pkt, audioFormat) + if err != nil { + log.Printf("[%v]: process packet error: %v\n", audioFormat, err) + return + } - } - }() + log.Printf("[%v]: received OPUS packet with PTS %v size %d\n", audioFormat, pts, len(op)) + }) - panic(c.Wait()) + // Start playing. + _, err = c.Play(nil) + if err != nil { + return fmt.Errorf("[%v]: sending PLAY request erorr: %w", audioFormat, err) + } - case videoFormat == "" && audioFormat == "OPUS": - // Wait for the next period. - storage.WaitPeriod(period) - log.Printf("[%v]: start recording", audioFormat) + // Create ticker for rotation files. + ticker := time.NewTicker(time.Duration(period) * time.Second) + defer ticker.Stop() - // Create decoder. - opusRTPDec, err := opusFormat.CreateDecoder() - if err != nil { - log.Printf("[%v]: create decoder error: %v\n", audioFormat, err) - } + // Rotate files. + go func() { + for range ticker.C { + // Logic for rotation files. + /* + currentMpegtsMuxer.close() + currentMpegtsMuxer.fileName = fn.SetNumNTime() - // Setup media. - _, err = c.Setup(desc.BaseURL, opusMedia, 0, 0) - if err != nil { - return fmt.Errorf("[%v]: setup media error: %w", audioFormat, err) - } - - // Process input rtp packets. - c.OnPacketRTP(opusMedia, opusFormat, func(pkt *rtp.Packet) { - // Process OPUS flow and return PTS and OP. - pts, op, err := formats.ProcessOPUS(&c, opusMedia, opusRTPDec, pkt, audioFormat) + err = currentMpegtsMuxer.initialize() if err != nil { - log.Printf("[%v]: process packet error: %v\n", audioFormat, err) - return + panic(err) } - log.Printf("[%v]: received OPUS packet with PTS %v size %d\n", audioFormat, pts, len(op)) - }) + log.Println("New file for recording created:", currentMpegtsMuxer.fileName) + */ + } + }() - // Start playing. - _, err = c.Play(nil) - if err != nil { - return fmt.Errorf("[%v]: sending PLAY request erorr: %w", audioFormat, err) - } + panic(c.Wait()) - // Create ticker for rotation files. - ticker := time.NewTicker(time.Duration(period) * time.Second) - defer ticker.Stop() + case videoFormat == "VP8" && audioFormat == "": + // Wait for the next period. + waitPeriod(period) + log.Printf("[%v]: start recording", videoFormat) - // Rotate files. - go func() { - for range ticker.C { - // Logic for rotation files. - /* - currentMpegtsMuxer.close() - currentMpegtsMuxer.fileName = fn.SetNumNTime() + // Create decoder. + vp8RTPDec, err := vp8Format.CreateDecoder() + if err != nil { + log.Printf("[%v]: create decoder error: %v\n", videoFormat, err) + } - err = currentMpegtsMuxer.initialize() - if err != nil { - panic(err) - } + // Setup VP8 -> RGBA decoder. + vp8Dec := &formats.VPDecoder{} + err = vp8Dec.Initialize() + if err != nil { + log.Printf("[%v]: init decoder error: %v\n", videoFormat, err) + } + defer vp8Dec.Close() - log.Println("New file for recording created:", currentMpegtsMuxer.fileName) + // Setup media. + _, err = c.Setup(desc.BaseURL, vp8Media, 0, 0) + if err != nil { + return fmt.Errorf("[%v]: setup media error: %w", videoFormat, err) + } - } - }() + // Process input rtp packets. + c.OnPacketRTP(vp8Media, vp8Format, func(pkt *rtp.Packet) { + // Process VP8 flow and return PTS and IMG. + pts, img, err := formats.ProcessVP8RGBA(&c, vp8Media, vp8RTPDec, vp8Dec, pkt, videoFormat) + if err != nil { + log.Printf("[%v]: process packet error: %v\n", audioFormat, err) + return + } - panic(c.Wait()) + log.Printf("[%v]: decoded frame with PTS %v and size %v", videoFormat, pts, img.Bounds().Max) + }) - case videoFormat == "VP8" && audioFormat == "": - // Wait for the next period. - storage.WaitPeriod(period) - log.Printf("[%v]: start recording", videoFormat) + // Start playing. + _, err = c.Play(nil) + if err != nil { + return fmt.Errorf("[%v]: sending PLAY request erorr: %w", videoFormat, err) + } - // Create decoder. - vp8RTPDec, err := vp8Format.CreateDecoder() - if err != nil { - log.Printf("[%v]: create decoder error: %v\n", videoFormat, err) - } + // Create ticker for rotation files. + ticker := time.NewTicker(time.Duration(period) * time.Second) + defer ticker.Stop() - // Setup VP8 -> RGBA decoder. - vp8Dec := &formats.VPDecoder{} - err = vp8Dec.Initialize() - if err != nil { - log.Printf("[%v]: init decoder error: %v\n", videoFormat, err) - } - defer vp8Dec.Close() + // Rotate files. + go func() { + for range ticker.C { + // Logic for rotation files. + /* + currentMpegtsMuxer.close() + currentMpegtsMuxer.fileName = fn.SetNumNTime() - // Setup media. - _, err = c.Setup(desc.BaseURL, vp8Media, 0, 0) - if err != nil { - return fmt.Errorf("[%v]: setup media error: %w", videoFormat, err) - } - - // Process input rtp packets. - c.OnPacketRTP(vp8Media, vp8Format, func(pkt *rtp.Packet) { - // Process VP8 flow and return PTS and IMG. - pts, img, err := formats.ProcessVP8RGBA(&c, vp8Media, vp8RTPDec, vp8Dec, pkt, videoFormat) + err = currentMpegtsMuxer.initialize() if err != nil { - log.Printf("[%v]: process packet error: %v\n", audioFormat, err) - return + panic(err) } - log.Printf("[%v]: decoded frame with PTS %v and size %v", videoFormat, pts, img.Bounds().Max) - }) + log.Println("New file for recording created:", currentMpegtsMuxer.fileName) + */ + } + }() - // Start playing. - _, err = c.Play(nil) - if err != nil { - return fmt.Errorf("[%v]: sending PLAY request erorr: %w", videoFormat, err) - } + panic(c.Wait()) - // Create ticker for rotation files. - ticker := time.NewTicker(time.Duration(period) * time.Second) - defer ticker.Stop() + case videoFormat == "VP9" && audioFormat == "": + // Wait for the next period. + waitPeriod(period) + log.Printf("[%v]: start recording", videoFormat) - // Rotate files. - go func() { - for range ticker.C { - // Logic for rotation files. - /* - currentMpegtsMuxer.close() - currentMpegtsMuxer.fileName = fn.SetNumNTime() + // Create decoder. + vp9RTPDec, err := vp9Format.CreateDecoder() + if err != nil { + log.Printf("[%v]: create decoder error: %v\n", videoFormat, err) + } - err = currentMpegtsMuxer.initialize() - if err != nil { - panic(err) - } + // Setup VP9 -> RGBA decoder. + vp9Dec := &formats.VPDecoder{} + err = vp9Dec.Initialize() + if err != nil { + log.Printf("[%v]: init decoder error: %v\n", videoFormat, err) + } + defer vp9Dec.Close() - log.Println("New file for recording created:", currentMpegtsMuxer.fileName) + // Setup media. + _, err = c.Setup(desc.BaseURL, vp9Media, 0, 0) + if err != nil { + return fmt.Errorf("[%v]: setup media error: %w", videoFormat, err) + } - } - }() + // Process input rtp packets. + c.OnPacketRTP(vp9Media, vp9Format, func(pkt *rtp.Packet) { + // Process VP9 flow and return PTS and IMG. + pts, img, err := formats.ProcessVP9RGBA(&c, vp9Media, vp9RTPDec, vp9Dec, pkt, videoFormat) + if err != nil { + log.Printf("[%v]: process packet error: %v\n", audioFormat, err) + return + } - panic(c.Wait()) + log.Printf("[%v]: decoded frame with PTS %v and size %v", videoFormat, pts, img.Bounds().Max) + }) - case videoFormat == "VP9" && audioFormat == "": - // Wait for the next period. - storage.WaitPeriod(period) - log.Printf("[%v]: start recording", videoFormat) + // Start playing. + _, err = c.Play(nil) + if err != nil { + return fmt.Errorf("[%v]: sending PLAY request erorr: %w", videoFormat, err) + } - // Create decoder. - vp9RTPDec, err := vp9Format.CreateDecoder() - if err != nil { - log.Printf("[%v]: create decoder error: %v\n", videoFormat, err) - } + // Create ticker for rotation files. + ticker := time.NewTicker(time.Duration(period) * time.Second) + defer ticker.Stop() - // Setup VP9 -> RGBA decoder. - vp9Dec := &formats.VPDecoder{} - err = vp9Dec.Initialize() - if err != nil { - log.Printf("[%v]: init decoder error: %v\n", videoFormat, err) - } - defer vp9Dec.Close() + // Rotate files. + go func() { + for range ticker.C { + // Logic for rotation files. + /* + currentMpegtsMuxer.close() + currentMpegtsMuxer.fileName = fn.SetNumNTime() - // Setup media. - _, err = c.Setup(desc.BaseURL, vp9Media, 0, 0) - if err != nil { - return fmt.Errorf("[%v]: setup media error: %w", videoFormat, err) - } - - // Process input rtp packets. - c.OnPacketRTP(vp9Media, vp9Format, func(pkt *rtp.Packet) { - // Process VP9 flow and return PTS and IMG. - pts, img, err := formats.ProcessVP9RGBA(&c, vp9Media, vp9RTPDec, vp9Dec, pkt, videoFormat) + err = currentMpegtsMuxer.initialize() if err != nil { - log.Printf("[%v]: process packet error: %v\n", audioFormat, err) - return + panic(err) } - log.Printf("[%v]: decoded frame with PTS %v and size %v", videoFormat, pts, img.Bounds().Max) - }) + log.Println("New file for recording created:", currentMpegtsMuxer.fileName) + */ + } + }() - // Start playing. - _, err = c.Play(nil) - if err != nil { - return fmt.Errorf("[%v]: sending PLAY request erorr: %w", videoFormat, err) - } - - // Create ticker for rotation files. - ticker := time.NewTicker(time.Duration(period) * time.Second) - defer ticker.Stop() - - // Rotate files. - go func() { - for range ticker.C { - // Logic for rotation files. - /* - currentMpegtsMuxer.close() - currentMpegtsMuxer.fileName = fn.SetNumNTime() - - err = currentMpegtsMuxer.initialize() - if err != nil { - panic(err) - } - - log.Println("New file for recording created:", currentMpegtsMuxer.fileName) - - } - }() - */ panic(c.Wait()) } diff --git a/writer/internal/storage/file.go b/writer/internal/storage/file.go index 44fad45..ac41257 100644 --- a/writer/internal/storage/file.go +++ b/writer/internal/storage/file.go @@ -1,23 +1,126 @@ package storage -const ( - PacketTypeH264 = 1 - PacketTypeLPCM = 2 +import ( + "bytes" + "encoding/binary" + "fmt" + "io" ) -// InterleavedPacket представляет пакет, который может быть либо H264, либо G711. +// Constants for detection video and audio types. +const ( + PacketTypeAV1 = 1 + PacketTypeH264 = 2 + PacketTypeH265 = 3 + PacketTypeMJPEG = 4 + PacketTypeVP8 = 5 + PacketTypeVP9 = 6 + + PacketTypeG711 = 21 + PacketTypeAAC = 22 + PacketTypeLPCM = 23 + PacketTypeOPUS = 24 +) + +// InterleavedPacket is the structure of each inner packet. type InterleavedPacket struct { - // Type: 1 для H264, 2 для G711. Type byte Pts int64 - - H264AUs [][]byte + // Для H264 – access units как [][]byte. + H264AUs [][]byte + // Для LPCM (из G711) – samples. LPCMSamples []byte } -// Segment содержит строковые поля Start и Duration, а также набор interleaved пакетов. +// Segment is overall structure according to each period. type Segment struct { - Start string + Date string Duration string Packets InterleavedPacket } + +// writeString записывает строку: сначала int32 длина, затем данные. +func writeString(w io.Writer, s string) error { + if err := binary.Write(w, binary.LittleEndian, int32(len(s))); err != nil { + return err + } + _, err := w.Write([]byte(s)) + return err +} + +// WritePacket записывает один interleaved пакет. +func WritePacket(w io.Writer, pkt InterleavedPacket) error { + // Записываем тип пакета (1 байт) + if err := binary.Write(w, binary.LittleEndian, pkt.Type); err != nil { + return err + } + // Записываем pts (int64) + if err := binary.Write(w, binary.LittleEndian, pkt.Pts); err != nil { + return err + } + + if pkt.Type == PacketTypeH264 { + // Для H264 – AU как [][]byte. + numAUs := int32(len(pkt.H264AUs)) + if err := binary.Write(w, binary.LittleEndian, numAUs); err != nil { + return err + } + for _, au := range pkt.H264AUs { + if err := binary.Write(w, binary.LittleEndian, int32(len(au))); err != nil { + return err + } + if _, err := w.Write(au); err != nil { + return err + } + } + } else if pkt.Type == PacketTypeLPCM { + // Для LPCM – просто длина и данные. + if err := binary.Write(w, binary.LittleEndian, int32(len(pkt.LPCMSamples))); err != nil { + return err + } + if _, err := w.Write(pkt.LPCMSamples); err != nil { + return err + } + } else { + return fmt.Errorf("неизвестный тип пакета: %d", pkt.Type) + } + return nil +} + +// WriteHeader записывает заголовок сегмента (Start и Duration). +func WriteHeader(w io.Writer, seg Segment) error { + var buf bytes.Buffer + if err := writeString(&buf, seg.Date); err != nil { + return err + } + if err := writeString(&buf, seg.Duration); err != nil { + return err + } + segData := buf.Bytes() + if err := binary.Write(w, binary.LittleEndian, int32(len(segData))); err != nil { + return err + } + _, err := w.Write(segData) + return err +} + +// WriteInterleavedPacket записывает сегмент с пакетами. +// Теперь количество пакетов определяется динамически. +func WriteInterleavedPacket(w io.Writer, seg Segment) error { + var buf bytes.Buffer + + if err := binary.Write(&buf, binary.LittleEndian, int32(1)); err != nil { + return err + } + // Записываем каждый пакет. + if err := WritePacket(&buf, seg.Packets); err != nil { + return err + } + + segData := buf.Bytes() + if err := binary.Write(w, binary.LittleEndian, int32(len(segData))); err != nil { + return err + } + _, err := w.Write(segData) + return err +} diff --git a/writer/internal/storage/file_manager.go b/writer/internal/storage/file_manager.go new file mode 100644 index 0000000..ce0b5c1 --- /dev/null +++ b/writer/internal/storage/file_manager.go @@ -0,0 +1,19 @@ +package storage + +import ( + "git.insit.tech/sas/rtsp_proxy/proto/common" + "time" +) + +// CreateFileName creates FileName structure. +func CreateFileName(dir string, resolutions []string, cuttedURI string, period int) *common.FileName { + fn := common.FileName{ + Path: dir + "/data/" + cuttedURI + "/" + resolutions[0], + TimeNow: time.Now().Format("15-04-05_02-01-2006"), + Name: "videoFragment", + Number: -1, + Duration: float64(period), + } + + return &fn +} diff --git a/writer/internal/storage/segmenter.go b/writer/internal/storage/segmenter.go deleted file mode 100644 index e3de726..0000000 --- a/writer/internal/storage/segmenter.go +++ /dev/null @@ -1,162 +0,0 @@ -package storage - -import ( - "bytes" - "encoding/binary" - "fmt" - "io" - "os" - "time" - - "github.com/golang/snappy" -) - -// writeString записывает строку: сначала длину (int32), затем байты строки. -func writeString(w io.Writer, s string) error { - if err := binary.Write(w, binary.LittleEndian, int32(len(s))); err != nil { - return err - } - _, err := w.Write([]byte(s)) - return err -} - -// WritePacket записывает один interleaved пакет в writer. -func WritePacket(w io.Writer, pkt InterleavedPacket) error { - // Записываем тип пакета (1 байт). - if err := binary.Write(w, binary.LittleEndian, pkt.Type); err != nil { - return err - } - // Записываем pts. - if err := binary.Write(w, binary.LittleEndian, pkt.Pts); err != nil { - return err - } - // В зависимости от типа пакета записываем данные access unit. - if pkt.Type == PacketTypeH264 { - // Для H264 AU — [][]byte. - numAUs := int32(len(pkt.H264AUs)) - if err := binary.Write(w, binary.LittleEndian, numAUs); err != nil { - return err - } - for _, au := range pkt.H264AUs { - // Сначала длина AU. - if err := binary.Write(w, binary.LittleEndian, int32(len(au))); err != nil { - return err - } - // Затем сами данные. - if _, err := w.Write(au); err != nil { - return err - } - } - } else if pkt.Type == PacketTypeLPCM { - // Для G711 AU — []byte. - if err := binary.Write(w, binary.LittleEndian, int32(len(pkt.LPCMSamples))); err != nil { - return err - } - if _, err := w.Write(pkt.LPCMSamples); err != nil { - return err - } - } else { - return fmt.Errorf("неизвестный тип пакета: %d", pkt.Type) - } - return nil -} - -// WriteHeader writes header to a file. -func WriteHeader(w io.Writer, seg Segment) error { - var buf bytes.Buffer - - // Записываем строки Start и Duration. - if err := writeString(&buf, seg.Start); err != nil { - return err - } - if err := writeString(&buf, seg.Duration); err != nil { - return err - } - - // Сначала записываем длину сегмента, затем данные сегмента. - segData := buf.Bytes() - if err := binary.Write(w, binary.LittleEndian, int32(len(segData))); err != nil { - return err - } - _, err := w.Write(segData) - return err -} - -// WriteInterleavedPacket записывает один сегмент в writer. Сначала собирается содержимое сегмента в буфер, -// затем записывается его длина (int32) и данные. -func WriteInterleavedPacket(w io.Writer, seg Segment) error { - var buf bytes.Buffer - - // Записываем количество пакетов. - if err := binary.Write(&buf, binary.LittleEndian, int32(1)); err != nil { - return err - } - - // Для каждого пакета записываем данные. - if err := WritePacket(&buf, seg.Packets); err != nil { - return err - } - - // Сначала записываем длину сегмента, затем данные сегмента. - segData := buf.Bytes() - if err := binary.Write(w, binary.LittleEndian, int32(len(segData))); err != nil { - return err - } - _, err := w.Write(segData) - return err -} - -func main() { - now := time.Now() - // Пример сегмента с interleaved пакетами. - seg := Segment{ - Start: now.Format(time.RFC3339), - Duration: "1m", // длительность сегмента в виде строки - Packets: InterleavedPacket{ - - Type: PacketTypeH264, - Pts: now.UnixNano(), - H264AUs: [][]byte{ - []byte{0x00, 0x01, 0x02}, - []byte{0x03, 0x04}, - }, - }, - } - - // Открываем файл для записи. - f, err := os.Create("stream_interleaved.bin") - if err != nil { - fmt.Println("Ошибка создания файла:", err) - return - } - defer f.Close() - - // Оборачиваем writer через snappy для быстрой компрессии (если требуется). - writer := snappy.NewBufferedWriter(f) - defer writer.Close() - - // Записываем заголовок файла (например, streamID). - streamID := "example_stream" - if err := binary.Write(writer, binary.LittleEndian, int32(len(streamID))); err != nil { - fmt.Println("Ошибка записи заголовка:", err) - return - } - if _, err := writer.Write([]byte(streamID)); err != nil { - fmt.Println("Ошибка записи streamID:", err) - return - } - - // Записываем сегмент с interleaved пакетами. - if err := WriteInterleavedPacket(writer, seg); err != nil { - fmt.Println("Ошибка записи сегмента:", err) - return - } - - // Обязательно делаем Flush, чтобы данные точно записались. - if err := writer.Flush(); err != nil { - fmt.Println("Ошибка при сбросе данных:", err) - return - } - - fmt.Println("Сегмент с interleaved пакетами успешно записан.") -} diff --git a/writer/internal/storage/temp.go b/writer/internal/storage/temp.go new file mode 100644 index 0000000..0ed8ea8 --- /dev/null +++ b/writer/internal/storage/temp.go @@ -0,0 +1,67 @@ +package storage + +import ( + "encoding/binary" + "fmt" + "os" + "time" + + "github.com/golang/snappy" +) + +func main() { + now := time.Now() + // Обратите внимание: Packets – срез, поэтому оборачиваем пакет в литерал среза. + seg := Segment{ + Date: now.Format(time.RFC3339), + Duration: "1m", + Packets: InterleavedPacket{ + + Type: PacketTypeH264, + Pts: now.UnixNano(), + H264AUs: [][]byte{ + []byte{0x00, 0x01, 0x02}, + []byte{0x03, 0x04}, + }, + }, + } + + f, err := os.Create("stream_interleaved.bin") + if err != nil { + fmt.Println("Ошибка создания файла:", err) + return + } + defer f.Close() + + writer := snappy.NewBufferedWriter(f) + defer writer.Close() + + streamID := "example_stream" + if err := binary.Write(writer, binary.LittleEndian, int32(len(streamID))); err != nil { + fmt.Println("Ошибка записи заголовка:", err) + return + } + if _, err := writer.Write([]byte(streamID)); err != nil { + fmt.Println("Ошибка записи streamID:", err) + return + } + + // Здесь можно записывать сначала заголовочный сегмент, затем сегменты с пакетами. + // Например, если заголовочный сегмент содержит только Start и Duration: + if err := WriteHeader(writer, seg); err != nil { + fmt.Println("Ошибка записи заголовочного сегмента:", err) + return + } + // А затем записываем сегмент с пакетами: + if err := WriteInterleavedPacket(writer, seg); err != nil { + fmt.Println("Ошибка записи сегмента пакетов:", err) + return + } + + if err := writer.Flush(); err != nil { + fmt.Println("Ошибка при сбросе данных:", err) + return + } + + fmt.Println("Сегмент с interleaved пакетами успешно записан.") +}