Part program in two logical units: ingest and storage.

This commit is contained in:
Сергей Петров 2025-03-25 15:19:12 +05:00
parent c47438f2b1
commit 489b60b11e
9 changed files with 802 additions and 779 deletions

View File

@ -12,13 +12,13 @@ func main() {
flag.Parse()
// Parse camera links from YAML file into struct Cameras.
c, err := config.ParseCamerasYAML(*directory)
cams, err := config.ParseCamerasYAML(*directory)
if err != nil {
panic(err)
}
// Connect to each camera.
for _, link := range c {
for _, link := range cams {
log.Printf("process camera:\n %s\n", link)
go func() {

View File

@ -3,14 +3,14 @@ module git.insit.tech/psa/rtsp_reader-writer/writer
go 1.24.1
require (
git.insit.tech/sas/rtsp_proxy v0.0.0-20250310124520-82fa76149f4e
git.insit.tech/sas/rtsp_proxy v0.0.0-20250325071536-7b97f96d64e6
github.com/Eyevinn/mp4ff v0.47.0
github.com/bluenviron/gortsplib/v4 v4.12.3
github.com/bluenviron/mediacommon v1.14.0
github.com/bluenviron/mediacommon/v2 v2.0.0
github.com/gen2brain/aac-go v0.0.0-20230119102159-ef1e76509d21
github.com/golang/snappy v1.0.0
github.com/pion/rtp v1.8.12
github.com/pion/rtp v1.8.13
github.com/zaf/g711 v1.4.0
gopkg.in/yaml.v3 v3.0.1
)
@ -18,12 +18,15 @@ require (
require (
github.com/asticode/go-astikit v0.52.0 // indirect
github.com/asticode/go-astits v1.13.0 // indirect
github.com/cyub/ringbuffer v0.0.0-20221202135829-35445cc89929 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/grafov/m3u8 v0.12.1 // indirect
github.com/pion/randutil v0.1.0 // indirect
github.com/pion/rtcp v1.2.15 // indirect
github.com/pion/sdp/v3 v3.0.10 // indirect
github.com/pion/sdp/v3 v3.0.11 // indirect
github.com/zencoder/go-dash v0.0.0-20201006100653-2f93b14912b2 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.0 // indirect
golang.org/x/net v0.37.0 // indirect
golang.org/x/sys v0.31.0 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
)

View File

@ -1,5 +1,5 @@
git.insit.tech/sas/rtsp_proxy v0.0.0-20250310124520-82fa76149f4e h1:JeOZvcZA4JHfoBG5ES9tpSHrhOj1jmjFzSJAyKIjApU=
git.insit.tech/sas/rtsp_proxy v0.0.0-20250310124520-82fa76149f4e/go.mod h1:9Yw6g7jcG9fIkWh6FGXSWTyZs4d7EQWaRhdKnnH2TvA=
git.insit.tech/sas/rtsp_proxy v0.0.0-20250325071536-7b97f96d64e6 h1:9WyrbPswZtjr3sqoCYaqus22j5st2HuVjWMiBTAorsM=
git.insit.tech/sas/rtsp_proxy v0.0.0-20250325071536-7b97f96d64e6/go.mod h1:/AHWd1Otr+ikOLWzpXtoozzifEx9ZKou+R6DgwaEzr0=
github.com/Eyevinn/mp4ff v0.47.0 h1:XSSHYt5+I0fyOnHWoNwM72DtivlmHFR0V9azgIi+ZVU=
github.com/Eyevinn/mp4ff v0.47.0/go.mod h1:hJNUUqOBryLAzUW9wpCJyw2HaI+TCd2rUPhafoS5lgg=
github.com/asticode/go-astikit v0.30.0/go.mod h1:h4ly7idim1tNhaVkdVBeXQZEE3L0xblP7fCWbgwipF0=
@ -13,11 +13,9 @@ github.com/bluenviron/mediacommon v1.14.0 h1:lWCwOBKNKgqmspRpwpvvg3CidYm+XOc2+z/
github.com/bluenviron/mediacommon v1.14.0/go.mod h1:z5LP9Tm1ZNfQV5Co54PyOzaIhGMusDfRKmh42nQSnyo=
github.com/bluenviron/mediacommon/v2 v2.0.0 h1:JinZ9v2x6QeAOzA0cDA6aFe8vQuCrU8OyWEhG2iNzwY=
github.com/bluenviron/mediacommon/v2 v2.0.0/go.mod h1:iHEz1SFIet6zBwAQoh1a92vTQ3dV3LpVFbom6/SLz3k=
github.com/cyub/ringbuffer v0.0.0-20221202135829-35445cc89929 h1:ZPgY61C6ZvVsr4YIBOOcXZ2lw/FuJ4GcoTRPDkHQ3Zg=
github.com/cyub/ringbuffer v0.0.0-20221202135829-35445cc89929/go.mod h1:049xI0W4vQexZfPJl70HOMs0eJ7FAZxTtm4iPp1cEDY=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/gen2brain/aac-go v0.0.0-20230119102159-ef1e76509d21 h1:yfrARW/aVlqKORCdKrYdU0PZUKPqQvYEUQBKfVlNa9Q=
github.com/gen2brain/aac-go v0.0.0-20230119102159-ef1e76509d21/go.mod h1:HZqGD/LXHB1VCGUGNzuyxSsD12f3KjbJbvImAmoK/mM=
github.com/go-test/deep v1.1.0 h1:WOcxcdHcvdgThNXjw0t76K42FXTU7HpNQWHpA2HHNlg=
@ -32,20 +30,16 @@ github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA=
github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8=
github.com/pion/rtcp v1.2.15 h1:LZQi2JbdipLOj4eBjK4wlVoQWfrZbh3Q6eHtWtJBZBo=
github.com/pion/rtcp v1.2.15/go.mod h1:jlGuAjHMEXwMUHK78RgX0UmEJFV4zUKOFHR7OP+D3D0=
github.com/pion/rtp v1.8.12 h1:nsKs8Wi0jQyBFHU3qmn/OvtZrhktVfJY0vRxwACsL5U=
github.com/pion/rtp v1.8.12/go.mod h1:8uMBJj32Pa1wwx8Fuv/AsFhn8jsgw+3rUC2PfoBZ8p4=
github.com/pion/sdp/v3 v3.0.10 h1:6MChLE/1xYB+CjumMw+gZ9ufp2DPApuVSnDT8t5MIgA=
github.com/pion/sdp/v3 v3.0.10/go.mod h1:88GMahN5xnScv1hIMTqLdu/cOcUkj6a9ytbncwMCq2E=
github.com/pion/rtp v1.8.13 h1:8uSUPpjSL4OlwZI8Ygqu7+h2p9NPFB+yAZ461Xn5sNg=
github.com/pion/rtp v1.8.13/go.mod h1:8uMBJj32Pa1wwx8Fuv/AsFhn8jsgw+3rUC2PfoBZ8p4=
github.com/pion/sdp/v3 v3.0.11 h1:VhgVSopdsBKwhCFoyyPmT1fKMeV9nLMrEKxNOdy3IVI=
github.com/pion/sdp/v3 v3.0.11/go.mod h1:88GMahN5xnScv1hIMTqLdu/cOcUkj6a9ytbncwMCq2E=
github.com/pkg/profile v1.4.0/go.mod h1:NWz/XGvpEW1FyYQ7fCx4dqYBLlfTcE+A9FLAkNKqjFE=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/youpy/go-riff v0.1.0 h1:vZO/37nI4tIET8tQI0Qn0Y79qQh99aEpponTPiPut7k=
@ -54,13 +48,22 @@ github.com/youpy/go-wav v0.3.2 h1:NLM8L/7yZ0Bntadw/0h95OyUsen+DQIVf9gay+SUsMU=
github.com/youpy/go-wav v0.3.2/go.mod h1:0FCieAXAeSdcxFfwLpRuEo0PFmAoc+8NU34h7TUvk50=
github.com/zaf/g711 v1.4.0 h1:XZYkjjiAg9QTBnHqEg37m2I9q3IIDv5JRYXs2N8ma7c=
github.com/zaf/g711 v1.4.0/go.mod h1:eCDXt3dSp/kYYAoooba7ukD/Q75jvAaS4WOMr0l1Roo=
github.com/zencoder/go-dash v0.0.0-20201006100653-2f93b14912b2 h1:0iAY2pL6yYhNYpdc1DbFq0p7ocyu5MlgKmkealhz3nk=
github.com/zencoder/go-dash v0.0.0-20201006100653-2f93b14912b2/go.mod h1:c8Gxxfmh0jmZ6G+ISlpa315WBVkzd8mEhu6gN9mn5Qg=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8=
go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
golang.org/x/net v0.37.0 h1:1zLorHbz+LYj7MQlSf1+2tPIIgibq2eL5xkrGk6f+2c=
golang.org/x/net v0.37.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8=
golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik=
golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc=
gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@ -1,33 +1,19 @@
package storage
package rtsp
import (
"git.insit.tech/sas/rtsp_proxy/protos"
"log"
"strings"
"time"
)
// CutURI returns the last part of the URI after "/".
func CutURI(URI string) (CutterURI string) {
// cutURI returns the last part of the URI after "/".
func cutURI(URI string) (CutURI string) {
splitted := strings.Split(URI, "/")
return splitted[len(splitted)-1]
}
// CreateFileName creates FileName structure.
func CreateFileName(dir string, resolutions []string, cuttedURI string, period int) *protos.FileName {
fn := protos.FileName{
Path: dir + "/data/" + cuttedURI + "/" + resolutions[0],
TimeNow: time.Now().Format("15-04-05_02-01-2006"),
Name: "videoFragment",
Number: -1,
Duration: float64(period),
}
return &fn
}
// WaitPeriod waits for the next period.
func WaitPeriod(period int) {
// waitPeriod waits for the next period.
func waitPeriod(period int) {
periodTD := time.Duration(period) * time.Second
now := time.Now()
@ -37,7 +23,8 @@ func WaitPeriod(period int) {
time.Sleep(waitDuration)
}
func FindResolution(Body []byte) string {
// findResolution finds resolution in SDP.
func findResolution(Body []byte) string {
split := strings.Split(string(Body), "\r\n")
for _, line := range split {
if strings.Contains(line, "a=x-dimensions:") {

View File

@ -12,14 +12,12 @@ import (
"git.insit.tech/psa/rtsp_reader-writer/writer/internal/ingest/formats"
"git.insit.tech/psa/rtsp_reader-writer/writer/internal/storage"
"git.insit.tech/sas/rtsp_proxy/protos/gens"
"github.com/bluenviron/gortsplib/v4"
"github.com/bluenviron/gortsplib/v4/pkg/base"
"github.com/bluenviron/gortsplib/v4/pkg/description"
"github.com/bluenviron/gortsplib/v4/pkg/format"
"github.com/bluenviron/mediacommon/v2/pkg/codecs/g711"
"github.com/pion/rtp"
_ "github.com/zaf/g711"
)
// StartWriter starts the program.
@ -61,19 +59,25 @@ func RTSP(dir string, period int, link string) error {
}
// Create file name structure and directory for files.
resolution := storage.FindResolution(res.Body)
resolution := findResolution(res.Body)
resolutions := []string{resolution}
cutURI := cutURI(link)
cuttedURI := storage.CutURI(link)
fn := storage.CreateFileName(dir, resolutions, cuttedURI, period)
fn := storage.CreateFileName(dir, resolutions, cutURI, period)
err = os.MkdirAll(fmt.Sprintf("%s", fn.Path), 0755)
if err != nil {
return fmt.Errorf("mkdirall error: %w", err)
}
////////////////////////////////////////////////////////////////////////////////////////////////////
/*
// Create M3U8 playlist.
go gens.MediaPlaylistGenerator(dir+"/data/"+cuttedURI, "", fn.Duration, resolutions)
go gen.MediaPlaylistGenerator(dir+"/data/"+cutURI, "", fn.Duration, resolutions)
*/
////////////////////////////////////////////////////////////////////////////////////////////////////
// Find formats.
var audioFormat string
@ -110,7 +114,7 @@ func RTSP(dir string, period int, link string) error {
} else {
log.Println(err)
}
/*
h265Format, h265Media, err := formats.FindH265Format(desc)
if h265Format != nil {
log.Println("[h265]: format found")
@ -158,12 +162,12 @@ func RTSP(dir string, period int, link string) error {
} else {
log.Println(err)
}
*/
// Start program according to gotten formats.
switch {
case videoFormat == "AV1" && audioFormat == "":
// Wait for the next period.
storage.WaitPeriod(period)
waitPeriod(period)
log.Printf("[%v]: start recording", videoFormat)
// Create decoder.
@ -231,7 +235,7 @@ func RTSP(dir string, period int, link string) error {
case videoFormat == "" && audioFormat == "G711":
// Wait for the next period.
storage.WaitPeriod(period)
waitPeriod(period)
log.Printf("[%v]: start recording", audioFormat)
// Create decoder.
@ -297,31 +301,31 @@ func RTSP(dir string, period int, link string) error {
case videoFormat == "H264" && audioFormat == "AAC":
// Wait for the next period.
storage.WaitPeriod(period)
waitPeriod(period)
log.Printf("[%v-%v]: start recording", videoFormat, audioFormat)
// Create decoders.
h264RTPDec, err := h264Format.CreateDecoder()
if err != nil {
log.Printf("[%v-%v]: create decoder error: %v\n", videoFormat, audioFormat, err)
}
//h264RTPDec, err := h264Format.CreateDecoder()
//if err != nil {
// log.Printf("[%v-%v]: create decoder error: %v\n", videoFormat, audioFormat, err)
//}
//
//aacRTPDec, err := aacFormat.CreateDecoder()
//if err != nil {
// log.Printf("[%v-%v]: create decoder error: %v\n", videoFormat, audioFormat, err)
//}
aacRTPDec, err := aacFormat.CreateDecoder()
if err != nil {
log.Printf("[%v-%v]: create decoder error: %v\n", videoFormat, audioFormat, err)
}
//// Setup MPEG-TS muxer.
//currentMpegtsMuxer := formats.MpegtsMuxer{
// FileName: fn.SetNumNTime(),
// H264Format: h264Format,
// Mpeg4AudioFormat: aacFormat,
//}
// Setup MPEG-TS muxer.
currentMpegtsMuxer := formats.MpegtsMuxer{
FileName: fn.SetNumNTime(),
H264Format: h264Format,
Mpeg4AudioFormat: aacFormat,
}
err = currentMpegtsMuxer.Initialize()
if err != nil {
return fmt.Errorf("[%v-%v]: init muxer error: %w\n", videoFormat, audioFormat, err)
}
//err = currentMpegtsMuxer.Initialize()
//if err != nil {
// return fmt.Errorf("[%v-%v]: init muxer error: %w\n", videoFormat, audioFormat, err)
//}
// Setup all medias.
err = c.SetupAll(desc.BaseURL, desc.Medias)
@ -333,32 +337,32 @@ func RTSP(dir string, period int, link string) error {
c.OnPacketRTPAny(func(medi *description.Media, forma format.Format, pkt *rtp.Packet) {
switch forma.(type) {
case *format.H264:
// Process H264 flow and return PTS and AU.
pts, au, err := formats.ProcessH264(&c, h264Media, h264RTPDec, pkt, videoFormat)
if err != nil {
log.Printf("[%v-%v]: process packet error: %v\n", videoFormat, audioFormat, err)
return
}
// Encode the access unit into MPEG-TS.
err = currentMpegtsMuxer.WriteH264(pts, au)
if err != nil {
return
}
//// Process H264 flow and return PTS and AU.
//pts, au, err := formats.ProcessH264(&c, h264Media, h264RTPDec, pkt, videoFormat)
//if err != nil {
// log.Printf("[%v-%v]: process packet error: %v\n", videoFormat, audioFormat, err)
// return
//}
//
//// Encode the access unit into MPEG-TS.
//err = currentMpegtsMuxer.WriteH264(pts, au)
//if err != nil {
// return
//}
case *format.MPEG4Audio:
// Process AAC flow and return PTS and AUS.
pts, aus, err := formats.ProcessAAC(&c, aacMedia, aacRTPDec, pkt, audioFormat)
if err != nil {
log.Printf("[%v-%v]: process packet error: %v\n", videoFormat, audioFormat, err)
return
}
// Encode access units into MPEG-TS.
err = currentMpegtsMuxer.WriteAAC(aus, pts)
if err != nil {
return
}
//// Process AAC flow and return PTS and AUS.
//pts, aus, err := formats.ProcessAAC(&c, aacMedia, aacRTPDec, pkt, audioFormat)
//if err != nil {
// log.Printf("[%v-%v]: process packet error: %v\n", videoFormat, audioFormat, err)
// return
//}
//
//// Encode access units into MPEG-TS.
//err = currentMpegtsMuxer.WriteAAC(aus, pts)
//if err != nil {
// return
//}
}
})
@ -376,26 +380,26 @@ func RTSP(dir string, period int, link string) error {
// Rotate files.
go func() {
for range ticker.C {
// Logic for rotation files.
currentMpegtsMuxer.Close()
currentMpegtsMuxer.FileName = fn.SetNumNTime()
err = currentMpegtsMuxer.Initialize()
if err != nil {
log.Printf("[%v-%v]: init muxer error: %v\n", videoFormat, audioFormat, err)
return
}
log.Printf("[%v-%v]: new file for recording created: %v",
videoFormat, audioFormat, currentMpegtsMuxer.FileName)
//// Logic for rotation files.
//currentMpegtsMuxer.Close()
//currentMpegtsMuxer.FileName = fn.SetNumNTime()
//
//err = currentMpegtsMuxer.Initialize()
//if err != nil {
// log.Printf("[%v-%v]: init muxer error: %v\n", videoFormat, audioFormat, err)
// return
//}
//
//log.Printf("[%v-%v]: new file for recording created: %v",
// videoFormat, audioFormat, currentMpegtsMuxer.FileName)
}
}()
panic(c.Wait())
case videoFormat == "H264" && audioFormat == "G711":
case videoFormat == "H264" && audioFormat == "G711" || videoFormat == "H264" && audioFormat == "":
// Wait for the next period.
storage.WaitPeriod(period)
waitPeriod(period)
log.Printf("[%v-%v]: start recording", videoFormat, audioFormat)
// Create decoders.
@ -428,29 +432,28 @@ func RTSP(dir string, period int, link string) error {
if err != nil {
return fmt.Errorf("[%v-%v]: setup media error: %w\n", videoFormat, audioFormat, err)
}
///////////////////////////////////////////////////////////
file, err := os.Create(
dir + "/data/" + cuttedURI + "/" + time.Now().Format("15-04-05_02-01-2006") + ".insit")
file, err := os.Create(fn.SetNumNTime("insit"))
if err != nil {
fmt.Println("Ошибка создания файла:", err)
fmt.Println("creating file error:", err)
}
defer file.Close()
seg := storage.Segment{
Start: time.Now().Format("15-04-05_02-01-2006"),
Date: time.Now().Format("15-04-05_02-01-2006"),
Duration: strconv.Itoa(period),
Packets: storage.InterleavedPacket{},
}
// Записываем заголовок файла (например, streamID).
streamID := cuttedURI
if err := binary.Write(file, binary.LittleEndian, int32(len(streamID))); err != nil {
fmt.Println("Ошибка записи заголовка:", err)
// Write StreamID.
if err := binary.Write(file, binary.LittleEndian, int32(len(cutURI))); err != nil {
fmt.Println("write StreamID length error:", err)
}
if _, err := file.Write([]byte(streamID)); err != nil {
fmt.Println("Ошибка записи streamID:", err)
if _, err := file.Write([]byte(cutURI)); err != nil {
fmt.Println("write StreamID error:", err)
}
// Write header of the file.
err = storage.WriteHeader(file, seg)
if err != nil {
return fmt.Errorf("[%v-%v]: write header error: %w", videoFormat, audioFormat, err)
@ -468,17 +471,18 @@ func RTSP(dir string, period int, link string) error {
}
if au != nil {
// Add appropriate lines to the interleaved packet.
seg.Packets.Type = storage.PacketTypeH264
seg.Packets.Pts = pts
seg.Packets.H264AUs = au
// Записываем сегмент с interleaved пакетами.
// Write segment with interleaved packets.
if err := storage.WriteInterleavedPacket(file, seg); err != nil {
fmt.Println("Ошибка записи сегмента:", err)
fmt.Println("write segment error:", err)
return
}
}
//// Encode the access unit into MPEG-TS.
//err = currentMpegtsMuxer.WriteH264(pts, au)
//if err != nil {
@ -494,16 +498,17 @@ func RTSP(dir string, period int, link string) error {
}
if au != nil {
// Convert G711 to LPCM.
lpcmSamples := formats.ConvertG711ToLPCM(au, f.MULaw)
seg.Packets.Type = storage.PacketTypeH264
// Add appropriate lines to the interleaved packet.
seg.Packets.Type = storage.PacketTypeLPCM
seg.Packets.Pts = pts
seg.Packets.LPCMSamples = lpcmSamples
// Записываем сегмент с interleaved пакетами.
// Write segment with interleaved packets.
if err := storage.WriteInterleavedPacket(file, seg); err != nil {
fmt.Println("Ошибка записи сегмента:", err)
fmt.Println("write segment error:", err)
return
}
}
@ -549,42 +554,40 @@ func RTSP(dir string, period int, link string) error {
file.Close()
file, err = os.Create(
dir + "/data/" + cuttedURI + "/" + time.Now().Format("15-04-05_02-01-2006") + ".insit")
file, err = os.Create(fn.SetNumNTime("insit"))
if err != nil {
fmt.Println("Ошибка создания файла:", err)
fmt.Println("creating file error:", err)
}
seg = storage.Segment{
Start: time.Now().Format("15-04-05_02-01-2006"),
Date: time.Now().Format("15-04-05_02-01-2006"),
Duration: strconv.Itoa(period),
Packets: storage.InterleavedPacket{},
}
// Записываем заголовок файла (например, streamID).
streamID = cuttedURI
if err := binary.Write(file, binary.LittleEndian, int32(len(streamID))); err != nil {
fmt.Println("Ошибка записи заголовка:", err)
// Write StreamID.
if err := binary.Write(file, binary.LittleEndian, int32(len(cutURI))); err != nil {
fmt.Println("write StreamID length error:", err)
}
if _, err := file.Write([]byte(streamID)); err != nil {
fmt.Println("Ошибка записи streamID:", err)
if _, err := file.Write([]byte(cutURI)); err != nil {
fmt.Println("write StreamID error:", err)
}
// Write header of the file.
err = storage.WriteHeader(file, seg)
if err != nil {
log.Printf("[%v-%v]: write header error: %w", videoFormat, audioFormat, err)
}
log.Printf("[%v-%v]: new file for recording created: %v",
videoFormat, audioFormat, seg.Start+".insit")
videoFormat, audioFormat, seg.Date+".insit")
}
}()
/*
panic(c.Wait())
case videoFormat == "H264" && audioFormat == "":
case videoFormat == "H264-" && audioFormat == "":
// Wait for the next period.
storage.WaitPeriod(period)
waitPeriod(period)
log.Printf("[%v]: start recording", videoFormat)
// Create decoder.
@ -654,7 +657,7 @@ func RTSP(dir string, period int, link string) error {
}
log.Println("New file for recording created:", currentMpegtsMuxer.fileName)
*/
}
}()
@ -662,7 +665,7 @@ func RTSP(dir string, period int, link string) error {
case videoFormat == "H265" && audioFormat == "":
// Wait for the next period.
storage.WaitPeriod(period)
waitPeriod(period)
log.Printf("[%v]: start recording", videoFormat)
// Create decoder.
@ -735,7 +738,7 @@ func RTSP(dir string, period int, link string) error {
}
log.Println("New file for recording created:", currentMpegtsMuxer.fileName)
*/
}
}()
@ -743,7 +746,7 @@ func RTSP(dir string, period int, link string) error {
case videoFormat == "" && audioFormat == "LPCM":
// Wait for the next period.
storage.WaitPeriod(period)
waitPeriod(period)
log.Printf("[%v]: start recording", audioFormat)
// Create decoder.
@ -794,7 +797,7 @@ func RTSP(dir string, period int, link string) error {
}
log.Println("New file for recording created:", currentMpegtsMuxer.fileName)
*/
}
}()
@ -802,7 +805,7 @@ func RTSP(dir string, period int, link string) error {
case videoFormat == "MJPEG" && audioFormat == "":
// Wait for the next period.
storage.WaitPeriod(period)
waitPeriod(period)
log.Printf("[%v]: start recording", audioFormat)
// Create decoder.
@ -853,7 +856,7 @@ func RTSP(dir string, period int, link string) error {
}
log.Println("New file for recording created:", currentMpegtsMuxer.fileName)
*/
}
}()
@ -861,7 +864,7 @@ func RTSP(dir string, period int, link string) error {
case videoFormat == "" && audioFormat == "AAC":
// Wait for the next period.
storage.WaitPeriod(period)
waitPeriod(period)
log.Printf("[%v]: start recording", audioFormat)
// Create decoder.
@ -914,7 +917,7 @@ func RTSP(dir string, period int, link string) error {
}
log.Println("New file for recording created:", currentMpegtsMuxer.fileName)
*/
}
}()
@ -922,7 +925,7 @@ func RTSP(dir string, period int, link string) error {
case videoFormat == "" && audioFormat == "OPUS":
// Wait for the next period.
storage.WaitPeriod(period)
waitPeriod(period)
log.Printf("[%v]: start recording", audioFormat)
// Create decoder.
@ -973,7 +976,7 @@ func RTSP(dir string, period int, link string) error {
}
log.Println("New file for recording created:", currentMpegtsMuxer.fileName)
*/
}
}()
@ -981,7 +984,7 @@ func RTSP(dir string, period int, link string) error {
case videoFormat == "VP8" && audioFormat == "":
// Wait for the next period.
storage.WaitPeriod(period)
waitPeriod(period)
log.Printf("[%v]: start recording", videoFormat)
// Create decoder.
@ -1040,7 +1043,7 @@ func RTSP(dir string, period int, link string) error {
}
log.Println("New file for recording created:", currentMpegtsMuxer.fileName)
*/
}
}()
@ -1048,7 +1051,7 @@ func RTSP(dir string, period int, link string) error {
case videoFormat == "VP9" && audioFormat == "":
// Wait for the next period.
storage.WaitPeriod(period)
waitPeriod(period)
log.Printf("[%v]: start recording", videoFormat)
// Create decoder.
@ -1107,10 +1110,10 @@ func RTSP(dir string, period int, link string) error {
}
log.Println("New file for recording created:", currentMpegtsMuxer.fileName)
*/
}
}()
*/
panic(c.Wait())
}

View File

@ -1,23 +1,126 @@
package storage
const (
PacketTypeH264 = 1
PacketTypeLPCM = 2
import (
"bytes"
"encoding/binary"
"fmt"
"io"
)
// InterleavedPacket представляет пакет, который может быть либо H264, либо G711.
// Constants for detection video and audio types.
const (
PacketTypeAV1 = 1
PacketTypeH264 = 2
PacketTypeH265 = 3
PacketTypeMJPEG = 4
PacketTypeVP8 = 5
PacketTypeVP9 = 6
PacketTypeG711 = 21
PacketTypeAAC = 22
PacketTypeLPCM = 23
PacketTypeOPUS = 24
)
// InterleavedPacket is the structure of each inner packet.
type InterleavedPacket struct {
// Type: 1 для H264, 2 для G711.
Type byte
Pts int64
// Для H264 access units как [][]byte.
H264AUs [][]byte
// Для LPCM (из G711) samples.
LPCMSamples []byte
}
// Segment содержит строковые поля Start и Duration, а также набор interleaved пакетов.
// Segment is overall structure according to each period.
type Segment struct {
Start string
Date string
Duration string
Packets InterleavedPacket
}
// writeString записывает строку: сначала int32 длина, затем данные.
func writeString(w io.Writer, s string) error {
if err := binary.Write(w, binary.LittleEndian, int32(len(s))); err != nil {
return err
}
_, err := w.Write([]byte(s))
return err
}
// WritePacket записывает один interleaved пакет.
func WritePacket(w io.Writer, pkt InterleavedPacket) error {
// Записываем тип пакета (1 байт)
if err := binary.Write(w, binary.LittleEndian, pkt.Type); err != nil {
return err
}
// Записываем pts (int64)
if err := binary.Write(w, binary.LittleEndian, pkt.Pts); err != nil {
return err
}
if pkt.Type == PacketTypeH264 {
// Для H264 AU как [][]byte.
numAUs := int32(len(pkt.H264AUs))
if err := binary.Write(w, binary.LittleEndian, numAUs); err != nil {
return err
}
for _, au := range pkt.H264AUs {
if err := binary.Write(w, binary.LittleEndian, int32(len(au))); err != nil {
return err
}
if _, err := w.Write(au); err != nil {
return err
}
}
} else if pkt.Type == PacketTypeLPCM {
// Для LPCM просто длина и данные.
if err := binary.Write(w, binary.LittleEndian, int32(len(pkt.LPCMSamples))); err != nil {
return err
}
if _, err := w.Write(pkt.LPCMSamples); err != nil {
return err
}
} else {
return fmt.Errorf("неизвестный тип пакета: %d", pkt.Type)
}
return nil
}
// WriteHeader записывает заголовок сегмента (Start и Duration).
func WriteHeader(w io.Writer, seg Segment) error {
var buf bytes.Buffer
if err := writeString(&buf, seg.Date); err != nil {
return err
}
if err := writeString(&buf, seg.Duration); err != nil {
return err
}
segData := buf.Bytes()
if err := binary.Write(w, binary.LittleEndian, int32(len(segData))); err != nil {
return err
}
_, err := w.Write(segData)
return err
}
// WriteInterleavedPacket записывает сегмент с пакетами.
// Теперь количество пакетов определяется динамически.
func WriteInterleavedPacket(w io.Writer, seg Segment) error {
var buf bytes.Buffer
if err := binary.Write(&buf, binary.LittleEndian, int32(1)); err != nil {
return err
}
// Записываем каждый пакет.
if err := WritePacket(&buf, seg.Packets); err != nil {
return err
}
segData := buf.Bytes()
if err := binary.Write(w, binary.LittleEndian, int32(len(segData))); err != nil {
return err
}
_, err := w.Write(segData)
return err
}

View File

@ -0,0 +1,19 @@
package storage
import (
"git.insit.tech/sas/rtsp_proxy/proto/common"
"time"
)
// CreateFileName creates FileName structure.
func CreateFileName(dir string, resolutions []string, cuttedURI string, period int) *common.FileName {
fn := common.FileName{
Path: dir + "/data/" + cuttedURI + "/" + resolutions[0],
TimeNow: time.Now().Format("15-04-05_02-01-2006"),
Name: "videoFragment",
Number: -1,
Duration: float64(period),
}
return &fn
}

View File

@ -1,162 +0,0 @@
package storage
import (
"bytes"
"encoding/binary"
"fmt"
"io"
"os"
"time"
"github.com/golang/snappy"
)
// writeString записывает строку: сначала длину (int32), затем байты строки.
func writeString(w io.Writer, s string) error {
if err := binary.Write(w, binary.LittleEndian, int32(len(s))); err != nil {
return err
}
_, err := w.Write([]byte(s))
return err
}
// WritePacket записывает один interleaved пакет в writer.
func WritePacket(w io.Writer, pkt InterleavedPacket) error {
// Записываем тип пакета (1 байт).
if err := binary.Write(w, binary.LittleEndian, pkt.Type); err != nil {
return err
}
// Записываем pts.
if err := binary.Write(w, binary.LittleEndian, pkt.Pts); err != nil {
return err
}
// В зависимости от типа пакета записываем данные access unit.
if pkt.Type == PacketTypeH264 {
// Для H264 AU — [][]byte.
numAUs := int32(len(pkt.H264AUs))
if err := binary.Write(w, binary.LittleEndian, numAUs); err != nil {
return err
}
for _, au := range pkt.H264AUs {
// Сначала длина AU.
if err := binary.Write(w, binary.LittleEndian, int32(len(au))); err != nil {
return err
}
// Затем сами данные.
if _, err := w.Write(au); err != nil {
return err
}
}
} else if pkt.Type == PacketTypeLPCM {
// Для G711 AU — []byte.
if err := binary.Write(w, binary.LittleEndian, int32(len(pkt.LPCMSamples))); err != nil {
return err
}
if _, err := w.Write(pkt.LPCMSamples); err != nil {
return err
}
} else {
return fmt.Errorf("неизвестный тип пакета: %d", pkt.Type)
}
return nil
}
// WriteHeader writes header to a file.
func WriteHeader(w io.Writer, seg Segment) error {
var buf bytes.Buffer
// Записываем строки Start и Duration.
if err := writeString(&buf, seg.Start); err != nil {
return err
}
if err := writeString(&buf, seg.Duration); err != nil {
return err
}
// Сначала записываем длину сегмента, затем данные сегмента.
segData := buf.Bytes()
if err := binary.Write(w, binary.LittleEndian, int32(len(segData))); err != nil {
return err
}
_, err := w.Write(segData)
return err
}
// WriteInterleavedPacket записывает один сегмент в writer. Сначала собирается содержимое сегмента в буфер,
// затем записывается его длина (int32) и данные.
func WriteInterleavedPacket(w io.Writer, seg Segment) error {
var buf bytes.Buffer
// Записываем количество пакетов.
if err := binary.Write(&buf, binary.LittleEndian, int32(1)); err != nil {
return err
}
// Для каждого пакета записываем данные.
if err := WritePacket(&buf, seg.Packets); err != nil {
return err
}
// Сначала записываем длину сегмента, затем данные сегмента.
segData := buf.Bytes()
if err := binary.Write(w, binary.LittleEndian, int32(len(segData))); err != nil {
return err
}
_, err := w.Write(segData)
return err
}
func main() {
now := time.Now()
// Пример сегмента с interleaved пакетами.
seg := Segment{
Start: now.Format(time.RFC3339),
Duration: "1m", // длительность сегмента в виде строки
Packets: InterleavedPacket{
Type: PacketTypeH264,
Pts: now.UnixNano(),
H264AUs: [][]byte{
[]byte{0x00, 0x01, 0x02},
[]byte{0x03, 0x04},
},
},
}
// Открываем файл для записи.
f, err := os.Create("stream_interleaved.bin")
if err != nil {
fmt.Println("Ошибка создания файла:", err)
return
}
defer f.Close()
// Оборачиваем writer через snappy для быстрой компрессии (если требуется).
writer := snappy.NewBufferedWriter(f)
defer writer.Close()
// Записываем заголовок файла (например, streamID).
streamID := "example_stream"
if err := binary.Write(writer, binary.LittleEndian, int32(len(streamID))); err != nil {
fmt.Println("Ошибка записи заголовка:", err)
return
}
if _, err := writer.Write([]byte(streamID)); err != nil {
fmt.Println("Ошибка записи streamID:", err)
return
}
// Записываем сегмент с interleaved пакетами.
if err := WriteInterleavedPacket(writer, seg); err != nil {
fmt.Println("Ошибка записи сегмента:", err)
return
}
// Обязательно делаем Flush, чтобы данные точно записались.
if err := writer.Flush(); err != nil {
fmt.Println("Ошибка при сбросе данных:", err)
return
}
fmt.Println("Сегмент с interleaved пакетами успешно записан.")
}

View File

@ -0,0 +1,67 @@
package storage
import (
"encoding/binary"
"fmt"
"os"
"time"
"github.com/golang/snappy"
)
func main() {
now := time.Now()
// Обратите внимание: Packets срез, поэтому оборачиваем пакет в литерал среза.
seg := Segment{
Date: now.Format(time.RFC3339),
Duration: "1m",
Packets: InterleavedPacket{
Type: PacketTypeH264,
Pts: now.UnixNano(),
H264AUs: [][]byte{
[]byte{0x00, 0x01, 0x02},
[]byte{0x03, 0x04},
},
},
}
f, err := os.Create("stream_interleaved.bin")
if err != nil {
fmt.Println("Ошибка создания файла:", err)
return
}
defer f.Close()
writer := snappy.NewBufferedWriter(f)
defer writer.Close()
streamID := "example_stream"
if err := binary.Write(writer, binary.LittleEndian, int32(len(streamID))); err != nil {
fmt.Println("Ошибка записи заголовка:", err)
return
}
if _, err := writer.Write([]byte(streamID)); err != nil {
fmt.Println("Ошибка записи streamID:", err)
return
}
// Здесь можно записывать сначала заголовочный сегмент, затем сегменты с пакетами.
// Например, если заголовочный сегмент содержит только Start и Duration:
if err := WriteHeader(writer, seg); err != nil {
fmt.Println("Ошибка записи заголовочного сегмента:", err)
return
}
// А затем записываем сегмент с пакетами:
if err := WriteInterleavedPacket(writer, seg); err != nil {
fmt.Println("Ошибка записи сегмента пакетов:", err)
return
}
if err := writer.Flush(); err != nil {
fmt.Println("Ошибка при сбросе данных:", err)
return
}
fmt.Println("Сегмент с interleaved пакетами успешно записан.")
}