From b37efdf40303cf850c81da922efb78c0e370999b Mon Sep 17 00:00:00 2001 From: Sergey Petrov Date: Fri, 21 Mar 2025 18:03:39 +0500 Subject: [PATCH] divide the program to 3 parts: ingest, storage, reader; ingest - gets packets from rtsp and send it to storage; storage - converts got packets to custom files; reader - reads custom files and creates containers (e.g. .ts). --- reader/cmd/main.go | 274 ++++- reader/go.mod | 22 +- reader/go.sum | 51 +- reader/internal/processor/file2.go | 1 + reader/internal/processor/g711_to_aac.go | 42 + reader/internal/processor/h264-aac_muxer.go | 133 ++ writer/go.mod | 2 +- writer/go.sum | 4 +- writer/internal/config/source.yaml | 62 +- .../internal/ingest/formats/h264-aac_muxer.go | 2 +- writer/internal/ingest/rtsp/rtsp.go | 1091 +++++++++-------- writer/internal/storage/editor.go | 4 +- writer/internal/storage/file.go | 32 +- writer/internal/storage/segmenter.go | 161 +++ 14 files changed, 1315 insertions(+), 566 deletions(-) create mode 100644 reader/internal/processor/file2.go create mode 100644 reader/internal/processor/g711_to_aac.go create mode 100644 reader/internal/processor/h264-aac_muxer.go diff --git a/reader/cmd/main.go b/reader/cmd/main.go index 6bc02f3..d84b5ac 100644 --- a/reader/cmd/main.go +++ b/reader/cmd/main.go @@ -1,20 +1,276 @@ package main import ( + "bytes" + "encoding/binary" "fmt" + "io" "log" - "net/http" - "reader/pkg/handlers" + "os" + "reader/internal/processor" + + "github.com/bluenviron/gortsplib/v4/pkg/format" ) -func main() { - port := 8080 +//port := 8080 +// +//http.HandleFunc("GET /download", handlers.Download) // example request: {"date": "07-03-2025", "start_time": "16-43", "end_time": "16-44"} +//http.HandleFunc("GET /hls/", handlers.HLS) +// +//log.Println("Starting server on:") +//log.Printf("Serving on HTTP port: %d\n", port) +// +//log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", port), nil)) - http.HandleFunc("GET /download", handlers.Download) // example request: {"date": "07-03-2025", "start_time": "16-43", "end_time": "16-44"} - http.HandleFunc("GET /hls/", handlers.HLS) +// Интерпретируем типы пакетов: +const ( + PacketTypeH264 = 1 + PacketTypeLPCM = 2 +) - log.Println("Starting server on:") - log.Printf("Serving on HTTP port: %d\n", port) +var ( + h264 string + g711 string +) - log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", port), nil)) +// InterleavedPacket описывает пакет, который может быть либо H264, либо G711. +type InterleavedPacket struct { + Type byte + Pts int64 + H264AUs [][]byte // для H264 + LPCMSamples []byte // для G711 +} + +// Segment содержит строки Start и Duration, а также набор пакетов. +type Segment struct { + Start string + Duration string + Packets []InterleavedPacket +} + +// readString читает строку: сначала длину (int32), затем данные строки. +func readString(r io.Reader) (string, error) { + var length int32 + if err := binary.Read(r, binary.LittleEndian, &length); err != nil { + return "", err + } + buf := make([]byte, length) + if _, err := io.ReadFull(r, buf); err != nil { + return "", err + } + return string(buf), nil +} + +// readHeaderSegment читает сегмент, записанный функцией WriteHeader (с Start и Duration). +func readHeaderSegment(r io.Reader) (Segment, error) { + var seg Segment + start, err := readString(r) + if err != nil { + return seg, err + } + seg.Start = start + + duration, err := readString(r) + if err != nil { + return seg, err + } + seg.Duration = duration + + return seg, nil +} + +// readPacket читает один interleaved пакет. +func readPacket(r io.Reader) (InterleavedPacket, error) { + var pkt InterleavedPacket + + // Читаем тип пакета (1 байт). + typeByte := make([]byte, 1) + if _, err := io.ReadFull(r, typeByte); err != nil { + return pkt, err + } + pkt.Type = typeByte[0] + + // Читаем pts (int64). + if err := binary.Read(r, binary.LittleEndian, &pkt.Pts); err != nil { + return pkt, err + } + + // В зависимости от типа, читаем данные. + if pkt.Type == PacketTypeH264 { + var numAUs int32 + if err := binary.Read(r, binary.LittleEndian, &numAUs); err != nil { + return pkt, err + } + var auList [][]byte + for i := 0; i < int(numAUs); i++ { + var auLen int32 + if err := binary.Read(r, binary.LittleEndian, &auLen); err != nil { + return pkt, err + } + auData := make([]byte, auLen) + if _, err := io.ReadFull(r, auData); err != nil { + return pkt, err + } + auList = append(auList, auData) + } + pkt.H264AUs = auList + } else if pkt.Type == PacketTypeLPCM { + var auLen int32 + if err := binary.Read(r, binary.LittleEndian, &auLen); err != nil { + return pkt, err + } + auData := make([]byte, auLen) + if _, err := io.ReadFull(r, auData); err != nil { + return pkt, err + } + pkt.LPCMSamples = auData + } else { + return pkt, fmt.Errorf("неизвестный тип пакета: %d", pkt.Type) + } + + return pkt, nil +} + +// readPacketSegment читает сегмент, записанный функцией WriteInterleavedPacket: +// сначала число пакетов (int32), затем каждый пакет. +func readPacketSegment(r io.Reader) ([]InterleavedPacket, error) { + var numPackets int32 + if err := binary.Read(r, binary.LittleEndian, &numPackets); err != nil { + return nil, err + } + + var packets []InterleavedPacket + for i := 0; i < int(numPackets); i++ { + pkt, err := readPacket(r) + if err != nil { + return nil, err + } + packets = append(packets, pkt) + } + return packets, nil +} + +func main() { + filename := "/home/psa/GoRepository/data/camera44-center-skver_temnika/12-10-00_21-03-2025.insit" + + segment := Segment{} + + // Открываем файл для чтения. + f, err := os.Open(filename) + if err != nil { + fmt.Println("Ошибка открытия файла:", err) + return + } + defer f.Close() + + // Читаем streamID (первые 4 байта — длина, затем сами байты). + var streamIDLen int32 + if err := binary.Read(f, binary.LittleEndian, &streamIDLen); err != nil { + fmt.Println("Ошибка чтения streamID длины:", err) + return + } + streamIDBytes := make([]byte, streamIDLen) + if _, err := io.ReadFull(f, streamIDBytes); err != nil { + fmt.Println("Ошибка чтения streamID:", err) + return + } + streamID := string(streamIDBytes) + fmt.Println("Stream ID:", streamID) + + // Теперь в файле идут сегменты. Первый сегмент — заголовочный, далее — сегменты с пакетами. + // Читаем первый сегмент как заголовочный. + var segLen int32 + if err := binary.Read(f, binary.LittleEndian, &segLen); err != nil { + fmt.Println("Ошибка чтения длины заголовочного сегмента:", err) + return + } + segData := make([]byte, segLen) + if _, err := io.ReadFull(f, segData); err != nil { + fmt.Println("Ошибка чтения заголовочного сегмента:", err) + return + } + headerReader := bytes.NewReader(segData) + headerSeg, err := readHeaderSegment(headerReader) + if err != nil { + fmt.Println("Ошибка чтения заголовочного сегмента:", err) + return + } + fmt.Println("Заголовочный сегмент:") + fmt.Println("\tStart:", headerSeg.Start) + fmt.Println("\tDuration:", headerSeg.Duration) + + segment = headerSeg + + // Setup MPEG-TS muxer. + var h264Format format.H264 + var aacFormat *format.MPEG4Audio + + h264Format.PayloadTyp = 96 + h264Format.PacketizationMode = 1 + + currentMpegtsMuxer := processor.MpegtsMuxer{ + FileName: segment.Start + "_videoFragment" + "_0" + ".ts", + H264Format: &h264Format, + Mpeg4AudioFormat: aacFormat, + } + + err = currentMpegtsMuxer.Initialize() + if err != nil { + fmt.Printf("[%v-%v]: init muxer error: %w\n", h264, g711, err) + } + + // Читаем последующие сегменты, содержащие пакеты. + segmentIndex := 1 + for { + var segLen int32 + err := binary.Read(f, binary.LittleEndian, &segLen) + if err != nil { + if err == io.EOF { + break // достигнут конец файла + } + fmt.Println("Ошибка чтения длины сегмента:", err) + return + } + segData = make([]byte, segLen) + if _, err := io.ReadFull(f, segData); err != nil { + fmt.Println("Ошибка чтения данных сегмента:", err) + return + } + packetReader := bytes.NewReader(segData) + packets, err := readPacketSegment(packetReader) + if err != nil { + fmt.Println("Ошибка чтения сегмента пакетов:", err) + return + } + + segment.Packets = append(segment.Packets, packets...) + + fmt.Printf("Сегмент пакетов #%d:\n", segmentIndex) + for _, pkt := range packets { + switch pkt.Type { + case PacketTypeH264: + h264 = "h264" + err = currentMpegtsMuxer.WriteH264(pkt.Pts, pkt.H264AUs) + if err != nil { + log.Printf("write h264 packet error: %v\n", err) + } + case PacketTypeLPCM: + g711 = "g711" + // Convert G711 to AAC. + au, err := processor.ConvertLPCMToAAC(pkt.LPCMSamples) + if err != nil { + log.Printf("converting to AAC frame error: %v\n", err) + } + + // Encode the access unit into MPEG-TS. + err = currentMpegtsMuxer.WriteAAC([][]byte{au}, pkt.Pts) + if err != nil { + return + } + } + } + segmentIndex++ + } + + currentMpegtsMuxer.Close() } diff --git a/reader/go.mod b/reader/go.mod index a799cc8..f80cd99 100644 --- a/reader/go.mod +++ b/reader/go.mod @@ -1,5 +1,23 @@ module reader -go 1.23.6 +go 1.24.1 -require git.insit.tech/sas/rtsp_proxy v0.0.0-20250310124520-82fa76149f4e // indirect +require ( + git.insit.tech/psa/rtsp_reader-writer/writer v0.0.0-20250321123217-883053a85cbc + github.com/bluenviron/gortsplib/v4 v4.12.3 + github.com/bluenviron/mediacommon v1.14.0 +) + +require ( + github.com/asticode/go-astikit v0.52.0 // indirect + github.com/asticode/go-astits v1.13.0 // indirect + github.com/bluenviron/mediacommon/v2 v2.0.0 // indirect + github.com/gen2brain/aac-go v0.0.0-20230119102159-ef1e76509d21 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/pion/randutil v0.1.0 // indirect + github.com/pion/rtcp v1.2.15 // indirect + github.com/pion/rtp v1.8.13 // indirect + github.com/pion/sdp/v3 v3.0.11 // indirect + golang.org/x/net v0.37.0 // indirect + golang.org/x/sys v0.31.0 // indirect +) diff --git a/reader/go.sum b/reader/go.sum index b4d241c..1057023 100644 --- a/reader/go.sum +++ b/reader/go.sum @@ -1,2 +1,49 @@ -git.insit.tech/sas/rtsp_proxy v0.0.0-20250310124520-82fa76149f4e h1:JeOZvcZA4JHfoBG5ES9tpSHrhOj1jmjFzSJAyKIjApU= -git.insit.tech/sas/rtsp_proxy v0.0.0-20250310124520-82fa76149f4e/go.mod h1:9Yw6g7jcG9fIkWh6FGXSWTyZs4d7EQWaRhdKnnH2TvA= +git.insit.tech/psa/rtsp_reader-writer/writer v0.0.0-20250321123217-883053a85cbc h1:dhJVA/B+q3CWBhI1ZBPhClAbY875j7+Ru6XIWimxjW8= +git.insit.tech/psa/rtsp_reader-writer/writer v0.0.0-20250321123217-883053a85cbc/go.mod h1:hY3OlYFPtsZ/fX/BVUPkjIHNPbfIpzgujKn9TKftI+E= +github.com/asticode/go-astikit v0.30.0/go.mod h1:h4ly7idim1tNhaVkdVBeXQZEE3L0xblP7fCWbgwipF0= +github.com/asticode/go-astikit v0.52.0 h1:kTl2XjgiVQhUl1H7kim7NhmTtCMwVBbPrXKqhQhbk8Y= +github.com/asticode/go-astikit v0.52.0/go.mod h1:fV43j20UZYfXzP9oBn33udkvCvDvCDhzjVqoLFuuYZE= +github.com/asticode/go-astits v1.13.0 h1:XOgkaadfZODnyZRR5Y0/DWkA9vrkLLPLeeOvDwfKZ1c= +github.com/asticode/go-astits v1.13.0/go.mod h1:QSHmknZ51pf6KJdHKZHJTLlMegIrhega3LPWz3ND/iI= +github.com/bluenviron/gortsplib/v4 v4.12.3 h1:3EzbyGb5+MIOJQYiWytRegFEP4EW5paiyTrscQj63WE= +github.com/bluenviron/gortsplib/v4 v4.12.3/go.mod h1:SkZPdaMNr+IvHt2PKRjUXxZN6FDutmSZn4eT0GmF0sk= +github.com/bluenviron/mediacommon v1.14.0 h1:lWCwOBKNKgqmspRpwpvvg3CidYm+XOc2+z/Jw7LM5dQ= +github.com/bluenviron/mediacommon v1.14.0/go.mod h1:z5LP9Tm1ZNfQV5Co54PyOzaIhGMusDfRKmh42nQSnyo= +github.com/bluenviron/mediacommon/v2 v2.0.0 h1:JinZ9v2x6QeAOzA0cDA6aFe8vQuCrU8OyWEhG2iNzwY= +github.com/bluenviron/mediacommon/v2 v2.0.0/go.mod h1:iHEz1SFIet6zBwAQoh1a92vTQ3dV3LpVFbom6/SLz3k= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/gen2brain/aac-go v0.0.0-20230119102159-ef1e76509d21 h1:yfrARW/aVlqKORCdKrYdU0PZUKPqQvYEUQBKfVlNa9Q= +github.com/gen2brain/aac-go v0.0.0-20230119102159-ef1e76509d21/go.mod h1:HZqGD/LXHB1VCGUGNzuyxSsD12f3KjbJbvImAmoK/mM= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA= +github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8= +github.com/pion/rtcp v1.2.15 h1:LZQi2JbdipLOj4eBjK4wlVoQWfrZbh3Q6eHtWtJBZBo= +github.com/pion/rtcp v1.2.15/go.mod h1:jlGuAjHMEXwMUHK78RgX0UmEJFV4zUKOFHR7OP+D3D0= +github.com/pion/rtp v1.8.13 h1:8uSUPpjSL4OlwZI8Ygqu7+h2p9NPFB+yAZ461Xn5sNg= +github.com/pion/rtp v1.8.13/go.mod h1:8uMBJj32Pa1wwx8Fuv/AsFhn8jsgw+3rUC2PfoBZ8p4= +github.com/pion/sdp/v3 v3.0.11 h1:VhgVSopdsBKwhCFoyyPmT1fKMeV9nLMrEKxNOdy3IVI= +github.com/pion/sdp/v3 v3.0.11/go.mod h1:88GMahN5xnScv1hIMTqLdu/cOcUkj6a9ytbncwMCq2E= +github.com/pkg/profile v1.4.0/go.mod h1:NWz/XGvpEW1FyYQ7fCx4dqYBLlfTcE+A9FLAkNKqjFE= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/youpy/go-riff v0.1.0 h1:vZO/37nI4tIET8tQI0Qn0Y79qQh99aEpponTPiPut7k= +github.com/youpy/go-riff v0.1.0/go.mod h1:83nxdDV4Z9RzrTut9losK7ve4hUnxUR8ASSz4BsKXwQ= +github.com/youpy/go-wav v0.3.2 h1:NLM8L/7yZ0Bntadw/0h95OyUsen+DQIVf9gay+SUsMU= +github.com/youpy/go-wav v0.3.2/go.mod h1:0FCieAXAeSdcxFfwLpRuEo0PFmAoc+8NU34h7TUvk50= +github.com/zaf/g711 v1.4.0 h1:XZYkjjiAg9QTBnHqEg37m2I9q3IIDv5JRYXs2N8ma7c= +github.com/zaf/g711 v1.4.0/go.mod h1:eCDXt3dSp/kYYAoooba7ukD/Q75jvAaS4WOMr0l1Roo= +golang.org/x/net v0.37.0 h1:1zLorHbz+LYj7MQlSf1+2tPIIgibq2eL5xkrGk6f+2c= +golang.org/x/net v0.37.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8= +golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik= +golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/reader/internal/processor/file2.go b/reader/internal/processor/file2.go new file mode 100644 index 0000000..95af6c9 --- /dev/null +++ b/reader/internal/processor/file2.go @@ -0,0 +1 @@ +package processor diff --git a/reader/internal/processor/g711_to_aac.go b/reader/internal/processor/g711_to_aac.go new file mode 100644 index 0000000..986d0ca --- /dev/null +++ b/reader/internal/processor/g711_to_aac.go @@ -0,0 +1,42 @@ +package processor + +import ( + "bytes" + "fmt" + "github.com/bluenviron/mediacommon/v2/pkg/codecs/g711" + "github.com/gen2brain/aac-go" +) + +// ConvertG711ToLPCM converts G711 to LPCM. +func ConvertG711ToLPCM(g711Samples []byte, mulaw bool) []byte { + var pcmSamples []byte + if mulaw { + pcmSamples = g711.DecodeMulaw(g711Samples) + } else { + pcmSamples = g711.DecodeAlaw(g711Samples) + } + + return pcmSamples +} + +// ConvertLPCMToAAC converts G711 to AAC. +func ConvertLPCMToAAC(pcmSamples []byte) ([]byte, error) { + var buf bytes.Buffer + opts := &aac.Options{ + SampleRate: 8000, // Исходная частота G711 + NumChannels: 1, + } + + enc, err := aac.NewEncoder(&buf, opts) + if err != nil { + return nil, fmt.Errorf("error creating encoder: %v", err) + } + defer enc.Close() + + r := bytes.NewReader(pcmSamples) + if err := enc.Encode(r); err != nil { + return nil, fmt.Errorf("error encoding: %v", err) + } + + return buf.Bytes(), nil +} diff --git a/reader/internal/processor/h264-aac_muxer.go b/reader/internal/processor/h264-aac_muxer.go new file mode 100644 index 0000000..39ce636 --- /dev/null +++ b/reader/internal/processor/h264-aac_muxer.go @@ -0,0 +1,133 @@ +package processor + +import ( + "bufio" + "github.com/bluenviron/mediacommon/pkg/codecs/mpeg4audio" + "os" + "sync" + + "github.com/bluenviron/gortsplib/v4/pkg/format" + "github.com/bluenviron/mediacommon/pkg/codecs/h264" + "github.com/bluenviron/mediacommon/pkg/formats/mpegts" +) + +func multiplyAndDivide(v, m, d int64) int64 { + secs := v / d + dec := v % d + return (secs*m + dec*m/d) +} + +// MpegtsMuxer allows to save a H264 / MPEG-4 audio stream into a MPEG-TS file. +type MpegtsMuxer struct { + FileName string + H264Format *format.H264 + Mpeg4AudioFormat *format.MPEG4Audio + + f *os.File + b *bufio.Writer + w *mpegts.Writer + h264Track *mpegts.Track + mpeg4AudioTrack *mpegts.Track + dtsExtractor *h264.DTSExtractor2 + mutex sync.Mutex +} + +// Initialize initializes a MpegtsMuxer. +func (e *MpegtsMuxer) Initialize() error { + var err error + e.f, err = os.Create(e.FileName) + if err != nil { + return err + } + e.b = bufio.NewWriter(e.f) + + e.h264Track = &mpegts.Track{ + Codec: &mpegts.CodecH264{}, + } + + e.mpeg4AudioTrack = &mpegts.Track{ + Codec: &mpegts.CodecMPEG4Audio{ + Config: mpeg4audio.Config{}, + }, + } + + e.w = mpegts.NewWriter(e.b, []*mpegts.Track{e.h264Track, e.mpeg4AudioTrack}) + + return nil +} + +// Close closes all the MpegtsMuxer resources. +func (e *MpegtsMuxer) Close() { + e.b.Flush() + e.f.Close() +} + +// WriteH264 writes a H264 access unit into MPEG-TS. +func (e *MpegtsMuxer) WriteH264(pts int64, au [][]byte) error { + e.mutex.Lock() + defer e.mutex.Unlock() + + var filteredAU [][]byte + + nonIDRPresent := false + idrPresent := false + + for _, nalu := range au { + typ := h264.NALUType(nalu[0] & 0x1F) + switch typ { + case h264.NALUTypeSPS: + e.H264Format.SPS = nalu + continue + + case h264.NALUTypePPS: + e.H264Format.PPS = nalu + continue + + case h264.NALUTypeAccessUnitDelimiter: + continue + + case h264.NALUTypeIDR: + idrPresent = true + + case h264.NALUTypeNonIDR: + nonIDRPresent = true + } + + filteredAU = append(filteredAU, nalu) + } + + au = filteredAU + + if au == nil || (!nonIDRPresent && !idrPresent) { + return nil + } + + // Add SPS and PPS before access unit that contains an IDR. + if idrPresent { + au = append([][]byte{e.H264Format.SPS, e.H264Format.PPS}, au...) + } + + if e.dtsExtractor == nil { + // Skip samples silently until we find one with an IDR. + if !idrPresent { + return nil + } + e.dtsExtractor = h264.NewDTSExtractor2() + } + + dts, err := e.dtsExtractor.Extract(au, pts) + if err != nil { + return err + } + + // Encode into MPEG-TS. + return e.w.WriteH2642(e.h264Track, pts, dts, au) +} + +// WriteAAC writes MPEG-4 audio access units into MPEG-TS. +func (e *MpegtsMuxer) WriteAAC(aus [][]byte, pts int64) error { + e.mutex.Lock() + defer e.mutex.Unlock() + + return e.w.WriteMPEG4Audio(e.mpeg4AudioTrack, multiplyAndDivide(pts, 90000, int64(8000)), aus) +} diff --git a/writer/go.mod b/writer/go.mod index d90b0e0..4c1ffb3 100644 --- a/writer/go.mod +++ b/writer/go.mod @@ -9,7 +9,7 @@ require ( github.com/bluenviron/mediacommon v1.14.0 github.com/bluenviron/mediacommon/v2 v2.0.0 github.com/gen2brain/aac-go v0.0.0-20230119102159-ef1e76509d21 - github.com/hraban/opus v0.0.0-20230925203106-0188a62cb302 + github.com/golang/snappy v1.0.0 github.com/pion/rtp v1.8.12 github.com/zaf/g711 v1.4.0 gopkg.in/yaml.v3 v3.0.1 diff --git a/writer/go.sum b/writer/go.sum index 44b79b8..f8b17ea 100644 --- a/writer/go.sum +++ b/writer/go.sum @@ -22,12 +22,12 @@ github.com/gen2brain/aac-go v0.0.0-20230119102159-ef1e76509d21 h1:yfrARW/aVlqKOR github.com/gen2brain/aac-go v0.0.0-20230119102159-ef1e76509d21/go.mod h1:HZqGD/LXHB1VCGUGNzuyxSsD12f3KjbJbvImAmoK/mM= github.com/go-test/deep v1.1.0 h1:WOcxcdHcvdgThNXjw0t76K42FXTU7HpNQWHpA2HHNlg= github.com/go-test/deep v1.1.0/go.mod h1:5C2ZWiW0ErCdrYzpqxLbTX7MG14M9iiw8DgHncVwcsE= +github.com/golang/snappy v1.0.0 h1:Oy607GVXHs7RtbggtPBnr2RmDArIsAefDwvrdWvRhGs= +github.com/golang/snappy v1.0.0/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/grafov/m3u8 v0.12.1 h1:DuP1uA1kvRRmGNAZ0m+ObLv1dvrfNO0TPx0c/enNk0s= github.com/grafov/m3u8 v0.12.1/go.mod h1:nqzOkfBiZJENr52zTVd/Dcl03yzphIMbJqkXGu+u080= -github.com/hraban/opus v0.0.0-20230925203106-0188a62cb302 h1:K7bmEmIesLcvCW0Ic2rCk6LtP5++nTnPmrO8mg5umlA= -github.com/hraban/opus v0.0.0-20230925203106-0188a62cb302/go.mod h1:YQQXrWHN3JEvCtw5ImyTCcPeU/ZLo/YMA+TpB64XdrU= github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA= github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8= github.com/pion/rtcp v1.2.15 h1:LZQi2JbdipLOj4eBjK4wlVoQWfrZbh3Q6eHtWtJBZBo= diff --git a/writer/internal/config/source.yaml b/writer/internal/config/source.yaml index 5b2d1d5..bdf9b2a 100644 --- a/writer/internal/config/source.yaml +++ b/writer/internal/config/source.yaml @@ -1,36 +1,36 @@ #camera_1: rtsp://intercom-video-1.insit.ru/camera01-centr-pl_slavy -camera_2: rtsp://intercom-video-1.insit.ru/camera03-centr-administraciya +#camera_2: rtsp://intercom-video-1.insit.ru/camera03-centr-administraciya #camera_3: rtsp://intercom-video-1.insit.ru/camera08-centr-skver_kalinina -camera_4: rtsp://intercom-video-1.insit.ru/camera04-centr-pobedy -camera_5: rtsp://intercom-video-1.insit.ru/camera11-centr-dk_kirova -camera_6: rtsp://intercom-video-1.insit.ru/camera13-centr-pobedy_sutyagina -camera_7: rtsp://intercom-video-1.insit.ru/camera09-centr-nalogovaya +#camera_4: rtsp://intercom-video-1.insit.ru/camera04-centr-pobedy +#camera_5: rtsp://intercom-video-1.insit.ru/camera11-centr-dk_kirova +#camera_6: rtsp://intercom-video-1.insit.ru/camera13-centr-pobedy_sutyagina +#camera_7: rtsp://intercom-video-1.insit.ru/camera09-centr-nalogovaya #camera_8: rtsp://intercom-video-1.insit.ru/camera59-centr-prktslavy -camera_9: rtsp://intercom-video-1.insit.ru/camera10-centr-kommunisticheskiy_ilycha -camera_10: rtsp://intercom-video-1.insit.ru/camera16-centr-sportschool2 -camera_11: rtsp://intercom-video-1.insit.ru/camera40-center-kommunisticheskiy -camera_12: rtsp://intercom-video-1.insit.ru/camera12-centr-skver_pavshih_geroev -camera_13: rtsp://intercom-video-1.insit.ru/camera39-center-kommunisticheskiy -camera_14: rtsp://intercom-video-1.insit.ru/camera14-centr-stadion_himik -camera_15: rtsp://intercom-video-1.insit.ru/camera41-center-kuznecova_borby -camera_16: rtsp://intercom-video-1.insit.ru/camera83-gorod-Kommunistichesky -camera_17: rtsp://intercom-video-1.insit.ru/camera54-centr-kirova-kalinina -camera_18: rtsp://intercom-video-1.insit.ru/camera53-centr-pobedy_slavy +#camera_9: rtsp://intercom-video-1.insit.ru/camera10-centr-kommunisticheskiy_ilycha +#camera_10: rtsp://intercom-video-1.insit.ru/camera16-centr-sportschool2 +#camera_11: rtsp://intercom-video-1.insit.ru/camera40-center-kommunisticheskiy +#camera_12: rtsp://intercom-video-1.insit.ru/camera12-centr-skver_pavshih_geroev +#camera_13: rtsp://intercom-video-1.insit.ru/camera39-center-kommunisticheskiy +#camera_14: rtsp://intercom-video-1.insit.ru/camera14-centr-stadion_himik +#camera_15: rtsp://intercom-video-1.insit.ru/camera41-center-kuznecova_borby +#camera_16: rtsp://intercom-video-1.insit.ru/camera83-gorod-Kommunistichesky +#camera_17: rtsp://intercom-video-1.insit.ru/camera54-centr-kirova-kalinina +#camera_18: rtsp://intercom-video-1.insit.ru/camera53-centr-pobedy_slavy camera_19: rtsp://intercom-video-1.insit.ru/camera44-center-skver_temnika -camera_20: rtsp://intercom-video-1.insit.ru/camera28-oktyabrskiy-severnaya23a -camera_21: rtsp://intercom-video-1.insit.ru/dp-qamumnrlkizuypnetljzzkjqamdoti -camera_22: rtsp://intercom-video-1.insit.ru/dp-ohusuxzcvzsnpzzvkpyhddnwxuyeyc -camera_23: rtsp://intercom-video-1.insit.ru/dp-bflbwjulvgfzurmcpejklrfvqairns -camera_24: rtsp://intercom-video-1.insit.ru/dp-ajiymmjyytokybrpganfcxlfyjcdbgezphn -camera_25: rtsp://intercom-video-2.insit.ru/dp-pobedi6a-ii-2125126423 -camera_26: rtsp://intercom-video-1.insit.ru/dp-pobedi11-ii-2108117729 -camera_27: rtsp://intercom-video-2.insit.ru/dp-pobedi11-i-2108117197 -camera_28: rtsp://intercom-video-2.insit.ru/dp-nfhapwbfjpqkmaymfeipraxtzcpedk -camera_29: rtsp://intercom-video-1.insit.ru/dp-swcixufwlheiwwrcvsrbkmhqzqvbxz +#camera_20: rtsp://intercom-video-1.insit.ru/camera28-oktyabrskiy-severnaya23a +#camera_21: rtsp://intercom-video-1.insit.ru/dp-qamumnrlkizuypnetljzzkjqamdoti +#camera_22: rtsp://intercom-video-1.insit.ru/dp-ohusuxzcvzsnpzzvkpyhddnwxuyeyc +#camera_23: rtsp://intercom-video-1.insit.ru/dp-bflbwjulvgfzurmcpejklrfvqairns +#camera_24: rtsp://intercom-video-1.insit.ru/dp-ajiymmjyytokybrpganfcxlfyjcdbgezphn +#camera_25: rtsp://intercom-video-2.insit.ru/dp-pobedi6a-ii-2125126423 +#camera_26: rtsp://intercom-video-1.insit.ru/dp-pobedi11-ii-2108117729 +#camera_27: rtsp://intercom-video-2.insit.ru/dp-pobedi11-i-2108117197 +#camera_28: rtsp://intercom-video-2.insit.ru/dp-nfhapwbfjpqkmaymfeipraxtzcpedk +#camera_29: rtsp://intercom-video-1.insit.ru/dp-swcixufwlheiwwrcvsrbkmhqzqvbxz #camera_30: rtsp://intercom-video-1.insit.ru/dp-nerrjszqrbhjvqmfxskunejafdiihj -camera_31: rtsp://intercom-video-1.insit.ru/dp-aiwukyujwonohpjyzeniispqqullyr -camera_32: rtsp://intercom-video-1.insit.ru/dp-woxvkbynctgfbuztsalttgburbpvjf -camera_33: rtsp://intercom-video-1.insit.ru/dp-fdzbasqehtptsuhxnjeqqnlrixfahcgvlcr -camera_34: rtsp://intercom-video-1.insit.ru/dp-exyeqscyamrbkwkjifagouyprtsdoe -camera_35: rtsp://intercom-video-2.insit.ru/dp-sutyagina3a-iv-uujtwbsjekv -camera_36: rtsp://intercom-video-1.insit.ru/dp-wyshispseamhqmnhkqwkbarshnrvni \ No newline at end of file +#camera_31: rtsp://intercom-video-1.insit.ru/dp-aiwukyujwonohpjyzeniispqqullyr +#camera_32: rtsp://intercom-video-1.insit.ru/dp-woxvkbynctgfbuztsalttgburbpvjf +#camera_33: rtsp://intercom-video-1.insit.ru/dp-fdzbasqehtptsuhxnjeqqnlrixfahcgvlcr +#camera_34: rtsp://intercom-video-1.insit.ru/dp-exyeqscyamrbkwkjifagouyprtsdoe +#camera_35: rtsp://intercom-video-2.insit.ru/dp-sutyagina3a-iv-uujtwbsjekv +#camera_36: rtsp://intercom-video-1.insit.ru/dp-wyshispseamhqmnhkqwkbarshnrvni \ No newline at end of file diff --git a/writer/internal/ingest/formats/h264-aac_muxer.go b/writer/internal/ingest/formats/h264-aac_muxer.go index 92a23b5..9ebca94 100644 --- a/writer/internal/ingest/formats/h264-aac_muxer.go +++ b/writer/internal/ingest/formats/h264-aac_muxer.go @@ -63,7 +63,7 @@ func (e *MpegtsMuxer) Close() { } // WriteH264 writes a H264 access unit into MPEG-TS. -func (e *MpegtsMuxer) WriteH264(au [][]byte, pts int64) error { +func (e *MpegtsMuxer) WriteH264(pts int64, au [][]byte) error { e.mutex.Lock() defer e.mutex.Unlock() diff --git a/writer/internal/ingest/rtsp/rtsp.go b/writer/internal/ingest/rtsp/rtsp.go index e5057a5..4d088ea 100644 --- a/writer/internal/ingest/rtsp/rtsp.go +++ b/writer/internal/ingest/rtsp/rtsp.go @@ -1,10 +1,12 @@ package rtsp import ( + "encoding/binary" "errors" "fmt" "log" "os" + "strconv" "strings" "time" @@ -108,55 +110,55 @@ func RTSP(dir string, period int, link string) error { } else { log.Println(err) } + /* + h265Format, h265Media, err := formats.FindH265Format(desc) + if h265Format != nil { + log.Println("[h265]: format found") + videoFormat = "H265" + } else { + log.Println(err) + } - h265Format, h265Media, err := formats.FindH265Format(desc) - if h265Format != nil { - log.Println("[h265]: format found") - videoFormat = "H265" - } else { - log.Println(err) - } + lpcmFormat, lpcmMedia, err := formats.FindLPCMFormat(desc) + if lpcmFormat != nil { + log.Println("[lpcm]: format found") + audioFormat = "LPCM" + } else { + log.Println(err) + } - lpcmFormat, lpcmMedia, err := formats.FindLPCMFormat(desc) - if lpcmFormat != nil { - log.Println("[lpcm]: format found") - audioFormat = "LPCM" - } else { - log.Println(err) - } + mjpegFormat, mjpegMedia, err := formats.FindMJPEGFormat(desc) + if mjpegFormat != nil { + log.Println("[mjpeg]: format found") + videoFormat = "MJPEG" + } else { + log.Println(err) + } - mjpegFormat, mjpegMedia, err := formats.FindMJPEGFormat(desc) - if mjpegFormat != nil { - log.Println("[mjpeg]: format found") - videoFormat = "MJPEG" - } else { - log.Println(err) - } + opusFormat, opusMedia, err := formats.FindOPUSFormat(desc) + if opusFormat != nil { + log.Println("[opus]: format found") + audioFormat = "OPUS" + } else { + log.Println(err) + } - opusFormat, opusMedia, err := formats.FindOPUSFormat(desc) - if opusFormat != nil { - log.Println("[opus]: format found") - audioFormat = "OPUS" - } else { - log.Println(err) - } - - vp8Format, vp8Media, err := formats.FindVP8Format(desc) - if vp8Format != nil { - log.Println("[vp8]: format found") - videoFormat = "VP8" - } else { - log.Println(err) - } - - vp9Format, vp9Media, err := formats.FindVP9Format(desc) - if vp9Format != nil { - log.Println("[vp9]: format found") - videoFormat = "VP9" - } else { - log.Println(err) - } + vp8Format, vp8Media, err := formats.FindVP8Format(desc) + if vp8Format != nil { + log.Println("[vp8]: format found") + videoFormat = "VP8" + } else { + log.Println(err) + } + vp9Format, vp9Media, err := formats.FindVP9Format(desc) + if vp9Format != nil { + log.Println("[vp9]: format found") + videoFormat = "VP9" + } else { + log.Println(err) + } + */ // Start program according to gotten formats. switch { case videoFormat == "AV1" && audioFormat == "": @@ -339,7 +341,7 @@ func RTSP(dir string, period int, link string) error { } // Encode the access unit into MPEG-TS. - err = currentMpegtsMuxer.WriteH264(au, pts) + err = currentMpegtsMuxer.WriteH264(pts, au) if err != nil { return } @@ -407,25 +409,52 @@ func RTSP(dir string, period int, link string) error { log.Printf("[%v-%v]: create decoder error: %v\n", videoFormat, audioFormat, err) } - // Setup MPEG-TS muxer. - var aacFormat *format.MPEG4Audio + //// Setup MPEG-TS muxer. + //var aacFormat *format.MPEG4Audio + // + //currentMpegtsMuxer := formats.MpegtsMuxer{ + // FileName: fn.SetNumNTime(), + // H264Format: h264Format, + // Mpeg4AudioFormat: aacFormat, + //} - currentMpegtsMuxer := formats.MpegtsMuxer{ - FileName: fn.SetNumNTime(), - H264Format: h264Format, - Mpeg4AudioFormat: aacFormat, - } - - err = currentMpegtsMuxer.Initialize() - if err != nil { - return fmt.Errorf("[%v-%v]: init muxer error: %w\n", videoFormat, audioFormat, err) - } + //err = currentMpegtsMuxer.Initialize() + //if err != nil { + // return fmt.Errorf("[%v-%v]: init muxer error: %w\n", videoFormat, audioFormat, err) + //} // Setup all medias. err = c.SetupAll(desc.BaseURL, desc.Medias) if err != nil { return fmt.Errorf("[%v-%v]: setup media error: %w\n", videoFormat, audioFormat, err) } + /////////////////////////////////////////////////////////// + file, err := os.Create( + dir + "/data/" + cuttedURI + "/" + time.Now().Format("15-04-05_02-01-2006") + ".insit") + if err != nil { + fmt.Println("Ошибка создания файла:", err) + } + defer file.Close() + + seg := storage.Segment{ + Start: time.Now().Format("15-04-05_02-01-2006"), + Duration: strconv.Itoa(period), + Packets: storage.InterleavedPacket{}, + } + + // Записываем заголовок файла (например, streamID). + streamID := cuttedURI + if err := binary.Write(file, binary.LittleEndian, int32(len(streamID))); err != nil { + fmt.Println("Ошибка записи заголовка:", err) + } + if _, err := file.Write([]byte(streamID)); err != nil { + fmt.Println("Ошибка записи streamID:", err) + } + + err = storage.WriteHeader(file, seg) + if err != nil { + return fmt.Errorf("[%v-%v]: write header error: %w", videoFormat, audioFormat, err) + } // Process input rtp packets. c.OnPacketRTPAny(func(medi *description.Media, forma format.Format, pkt *rtp.Packet) { @@ -438,11 +467,23 @@ func RTSP(dir string, period int, link string) error { return } - // Encode the access unit into MPEG-TS. - err = currentMpegtsMuxer.WriteH264(au, pts) - if err != nil { - return + if au != nil { + + seg.Packets.Type = storage.PacketTypeH264 + seg.Packets.Pts = pts + seg.Packets.H264AUs = au + + // Записываем сегмент с interleaved пакетами. + if err := storage.WriteInterleavedPacket(file, seg); err != nil { + fmt.Println("Ошибка записи сегмента:", err) + return + } } + //// Encode the access unit into MPEG-TS. + //err = currentMpegtsMuxer.WriteH264(pts, au) + //if err != nil { + // return + //} case *format.G711: // Process G711 flow and returns PTS and AU. @@ -452,18 +493,33 @@ func RTSP(dir string, period int, link string) error { return } - // Convert G711 to AAC. - au, err = formats.ConvertG711ToAAC(au, f.MULaw) - if err != nil { - log.Printf("[%v-%v]: converting to AAC frame error: %v\n", videoFormat, audioFormat, err) + if au != nil { + + lpcmSamples := formats.ConvertG711ToLPCM(au, f.MULaw) + + seg.Packets.Type = storage.PacketTypeH264 + seg.Packets.Pts = pts + seg.Packets.LPCMSamples = lpcmSamples + + // Записываем сегмент с interleaved пакетами. + if err := storage.WriteInterleavedPacket(file, seg); err != nil { + fmt.Println("Ошибка записи сегмента:", err) + return + } } - // Encode the access unit into MPEG-TS. - err = currentMpegtsMuxer.WriteAAC([][]byte{au}, pts) - if err != nil { - return - } + //// Convert G711 to AAC. + //au, err = formats.ConvertLPCMToAAC(lpcmSamples) + //if err != nil { + // log.Printf("[%v-%v]: converting to AAC frame error: %v\n", videoFormat, audioFormat, err) + //} + // + //// Encode the access unit into MPEG-TS. + //err = currentMpegtsMuxer.WriteAAC([][]byte{au}, pts) + //if err != nil { + // return } + }) // Start playing. @@ -479,553 +535,584 @@ func RTSP(dir string, period int, link string) error { // Rotate files. go func() { for range ticker.C { - // Logic for rotation files. - currentMpegtsMuxer.Close() - currentMpegtsMuxer.FileName = fn.SetNumNTime() + /* + // Logic for rotation files. + currentMpegtsMuxer.Close() + currentMpegtsMuxer.FileName = fn.SetNumNTime() - err = currentMpegtsMuxer.Initialize() + err = currentMpegtsMuxer.Initialize() + if err != nil { + log.Printf("[%v-%v]: init muxer error: %v\n", videoFormat, audioFormat, err) + return + } + */ + + file.Close() + + file, err = os.Create( + dir + "/data/" + cuttedURI + "/" + time.Now().Format("15-04-05_02-01-2006") + ".insit") if err != nil { - log.Printf("[%v-%v]: init muxer error: %v\n", videoFormat, audioFormat, err) - return + fmt.Println("Ошибка создания файла:", err) + } + + seg = storage.Segment{ + Start: time.Now().Format("15-04-05_02-01-2006"), + Duration: strconv.Itoa(period), + Packets: storage.InterleavedPacket{}, + } + + // Записываем заголовок файла (например, streamID). + streamID = cuttedURI + if err := binary.Write(file, binary.LittleEndian, int32(len(streamID))); err != nil { + fmt.Println("Ошибка записи заголовка:", err) + } + if _, err := file.Write([]byte(streamID)); err != nil { + fmt.Println("Ошибка записи streamID:", err) + } + + err = storage.WriteHeader(file, seg) + if err != nil { + log.Printf("[%v-%v]: write header error: %w", videoFormat, audioFormat, err) } log.Printf("[%v-%v]: new file for recording created: %v", - videoFormat, audioFormat, currentMpegtsMuxer.FileName) + videoFormat, audioFormat, seg.Start+".insit") } }() + /* + panic(c.Wait()) - panic(c.Wait()) + case videoFormat == "H264" && audioFormat == "": + // Wait for the next period. + storage.WaitPeriod(period) + log.Printf("[%v]: start recording", videoFormat) - case videoFormat == "H264" && audioFormat == "": - // Wait for the next period. - storage.WaitPeriod(period) - log.Printf("[%v]: start recording", videoFormat) + // Create decoder. + h264RTPDec, err := h264Format.CreateDecoder() + if err != nil { + log.Printf("[%v]: create decoder error: %v\n", videoFormat, err) + } - // Create decoder. - h264RTPDec, err := h264Format.CreateDecoder() - if err != nil { - log.Printf("[%v]: create decoder error: %v\n", videoFormat, err) - } + // Setup H264 -> RGBA decoder. + h264Dec := &formats.H264Decoder{} + err = h264Dec.Initialize() + if err != nil { + log.Printf("[%v]: init decoder error: %v\n", videoFormat, err) + } + defer h264Dec.Close() - // Setup H264 -> RGBA decoder. - h264Dec := &formats.H264Decoder{} - err = h264Dec.Initialize() - if err != nil { - log.Printf("[%v]: init decoder error: %v\n", videoFormat, err) - } - defer h264Dec.Close() + // if SPS and PPS are present into the SDP, send them to the decoder + if h264Format.SPS != nil { + h264Dec.Decode([][]byte{h264Format.SPS}) + } + if h264Format.PPS != nil { + h264Dec.Decode([][]byte{h264Format.PPS}) + } - // if SPS and PPS are present into the SDP, send them to the decoder - if h264Format.SPS != nil { - h264Dec.Decode([][]byte{h264Format.SPS}) - } - if h264Format.PPS != nil { - h264Dec.Decode([][]byte{h264Format.PPS}) - } + // Setup media. + _, err = c.Setup(desc.BaseURL, h264Media, 0, 0) + if err != nil { + return fmt.Errorf("[%v]: setup media error: %w", videoFormat, err) + } - // Setup media. - _, err = c.Setup(desc.BaseURL, h264Media, 0, 0) - if err != nil { - return fmt.Errorf("[%v]: setup media error: %w", videoFormat, err) - } + firstRandomAccess := false - firstRandomAccess := false - - // Process input rtp packets. - c.OnPacketRTP(h264Media, h264Format, func(pkt *rtp.Packet) { - // Process H264 flow and return PTS and IMG. - pts, img, err := formats.ProcessH264RGBA( - &c, h264Media, h264RTPDec, h264Dec, pkt, firstRandomAccess, videoFormat) - if err != nil { - log.Printf("[%v]: process packet error: %v\n", videoFormat, err) - return - } - - log.Printf("[%v]: decoded frame with PTS %v and size %v\n", videoFormat, pts, img.Bounds().Max) - }) - - // Start playing. - _, err = c.Play(nil) - if err != nil { - return fmt.Errorf("[%v]: sending PLAY request erorr: %w", videoFormat, err) - } - - // Create ticker for rotation files. - ticker := time.NewTicker(time.Duration(period) * time.Second) - defer ticker.Stop() - - // Rotate files. - go func() { - for range ticker.C { - // Logic for rotation files. - /* - currentMpegtsMuxer.close() - currentMpegtsMuxer.fileName = fn.SetNumNTime() - - err = currentMpegtsMuxer.initialize() + // Process input rtp packets. + c.OnPacketRTP(h264Media, h264Format, func(pkt *rtp.Packet) { + // Process H264 flow and return PTS and IMG. + pts, img, err := formats.ProcessH264RGBA( + &c, h264Media, h264RTPDec, h264Dec, pkt, firstRandomAccess, videoFormat) if err != nil { - panic(err) + log.Printf("[%v]: process packet error: %v\n", videoFormat, err) + return } - log.Println("New file for recording created:", currentMpegtsMuxer.fileName) - */ - } - }() + log.Printf("[%v]: decoded frame with PTS %v and size %v\n", videoFormat, pts, img.Bounds().Max) + }) - panic(c.Wait()) + // Start playing. + _, err = c.Play(nil) + if err != nil { + return fmt.Errorf("[%v]: sending PLAY request erorr: %w", videoFormat, err) + } - case videoFormat == "H265" && audioFormat == "": - // Wait for the next period. - storage.WaitPeriod(period) - log.Printf("[%v]: start recording", videoFormat) + // Create ticker for rotation files. + ticker := time.NewTicker(time.Duration(period) * time.Second) + defer ticker.Stop() - // Create decoder. - h265RTPDec, err := h265Format.CreateDecoder() - if err != nil { - log.Printf("[%v]: create decoder error: %v\n", videoFormat, err) - } + // Rotate files. + go func() { + for range ticker.C { + // Logic for rotation files. + /* + currentMpegtsMuxer.close() + currentMpegtsMuxer.fileName = fn.SetNumNTime() - // Setup H264 -> RGBA decoder. - h265Dec := &formats.H265Decoder{} - err = h265Dec.Initialize() - if err != nil { - log.Printf("[%v]: init decoder error: %v\n", videoFormat, err) - } - defer h265Dec.Close() + err = currentMpegtsMuxer.initialize() + if err != nil { + panic(err) + } - // If VPS, SPS and PPS are present into the SDP, send them to the decoder. - if h265Format.VPS != nil { - h265Dec.Decode([][]byte{h265Format.VPS}) - } - if h265Format.SPS != nil { - h265Dec.Decode([][]byte{h265Format.SPS}) - } - if h265Format.PPS != nil { - h265Dec.Decode([][]byte{h265Format.PPS}) - } + log.Println("New file for recording created:", currentMpegtsMuxer.fileName) - // Setup media. - _, err = c.Setup(desc.BaseURL, h265Media, 0, 0) - if err != nil { - return fmt.Errorf("[%v]: setup media error: %w", videoFormat, err) - } + } + }() - firstRandomAccess := false + panic(c.Wait()) - // Process input rtp packets. - c.OnPacketRTP(h265Media, h265Format, func(pkt *rtp.Packet) { - // Process H265 flow and return PTS and IMG. - pts, img, err := formats.ProcessH265RGBA( - &c, h265Media, h265RTPDec, h265Dec, pkt, firstRandomAccess, videoFormat) - if err != nil { - log.Printf("[%v]: process packet error: %v\n", videoFormat, err) - return - } + case videoFormat == "H265" && audioFormat == "": + // Wait for the next period. + storage.WaitPeriod(period) + log.Printf("[%v]: start recording", videoFormat) - log.Printf("[%v]: decoded frame with PTS %v and size %v\n", videoFormat, pts, img.Bounds().Max) - }) + // Create decoder. + h265RTPDec, err := h265Format.CreateDecoder() + if err != nil { + log.Printf("[%v]: create decoder error: %v\n", videoFormat, err) + } - // Start playing. - _, err = c.Play(nil) - if err != nil { - return fmt.Errorf("[%v]: sending PLAY request erorr: %w", videoFormat, err) - } + // Setup H264 -> RGBA decoder. + h265Dec := &formats.H265Decoder{} + err = h265Dec.Initialize() + if err != nil { + log.Printf("[%v]: init decoder error: %v\n", videoFormat, err) + } + defer h265Dec.Close() - // Create ticker for rotation files. - ticker := time.NewTicker(time.Duration(period) * time.Second) - defer ticker.Stop() + // If VPS, SPS and PPS are present into the SDP, send them to the decoder. + if h265Format.VPS != nil { + h265Dec.Decode([][]byte{h265Format.VPS}) + } + if h265Format.SPS != nil { + h265Dec.Decode([][]byte{h265Format.SPS}) + } + if h265Format.PPS != nil { + h265Dec.Decode([][]byte{h265Format.PPS}) + } - // Rotate files. - go func() { - for range ticker.C { - // Logic for rotation files. - /* - currentMpegtsMuxer.close() - currentMpegtsMuxer.fileName = fn.SetNumNTime() + // Setup media. + _, err = c.Setup(desc.BaseURL, h265Media, 0, 0) + if err != nil { + return fmt.Errorf("[%v]: setup media error: %w", videoFormat, err) + } - err = currentMpegtsMuxer.initialize() + firstRandomAccess := false + + // Process input rtp packets. + c.OnPacketRTP(h265Media, h265Format, func(pkt *rtp.Packet) { + // Process H265 flow and return PTS and IMG. + pts, img, err := formats.ProcessH265RGBA( + &c, h265Media, h265RTPDec, h265Dec, pkt, firstRandomAccess, videoFormat) if err != nil { - panic(err) + log.Printf("[%v]: process packet error: %v\n", videoFormat, err) + return } - log.Println("New file for recording created:", currentMpegtsMuxer.fileName) - */ - } - }() + log.Printf("[%v]: decoded frame with PTS %v and size %v\n", videoFormat, pts, img.Bounds().Max) + }) - panic(c.Wait()) + // Start playing. + _, err = c.Play(nil) + if err != nil { + return fmt.Errorf("[%v]: sending PLAY request erorr: %w", videoFormat, err) + } - case videoFormat == "" && audioFormat == "LPCM": - // Wait for the next period. - storage.WaitPeriod(period) - log.Printf("[%v]: start recording", audioFormat) + // Create ticker for rotation files. + ticker := time.NewTicker(time.Duration(period) * time.Second) + defer ticker.Stop() - // Create decoder. - lpcmRTPDec, err := lpcmFormat.CreateDecoder() - if err != nil { - log.Printf("[%v]: create decoder error: %v\n", audioFormat, err) - } + // Rotate files. + go func() { + for range ticker.C { + // Logic for rotation files. + /* + currentMpegtsMuxer.close() + currentMpegtsMuxer.fileName = fn.SetNumNTime() - // Setup media. - _, err = c.Setup(desc.BaseURL, lpcmMedia, 0, 0) - if err != nil { - log.Printf("[%v]: setup media error: %v\n", audioFormat, err) - } + err = currentMpegtsMuxer.initialize() + if err != nil { + panic(err) + } - // Process input rtp packets. - c.OnPacketRTP(lpcmMedia, lpcmFormat, func(pkt *rtp.Packet) { - // Process LPCM flow and return PTS and SAMPLES. - pts, samples, err := formats.ProcessLPCM(&c, lpcmMedia, lpcmRTPDec, pkt, audioFormat) - if err != nil { - log.Printf("[%v]: process packet error: %v\n", audioFormat, err) - return - } + log.Println("New file for recording created:", currentMpegtsMuxer.fileName) - log.Printf("[%v]: decoded audio samples with PTS %v and size %d\n", audioFormat, pts, len(samples)) - }) + } + }() - // Start playing. - _, err = c.Play(nil) - if err != nil { - return fmt.Errorf("[%v]: sending PLAY request erorr: %w", audioFormat, err) - } + panic(c.Wait()) - // Create ticker for rotation files. - ticker := time.NewTicker(time.Duration(period) * time.Second) - defer ticker.Stop() + case videoFormat == "" && audioFormat == "LPCM": + // Wait for the next period. + storage.WaitPeriod(period) + log.Printf("[%v]: start recording", audioFormat) - // Rotate files. - go func() { - for range ticker.C { - // Logic for rotation files. - /* - currentMpegtsMuxer.close() - currentMpegtsMuxer.fileName = fn.SetNumNTime() + // Create decoder. + lpcmRTPDec, err := lpcmFormat.CreateDecoder() + if err != nil { + log.Printf("[%v]: create decoder error: %v\n", audioFormat, err) + } - err = currentMpegtsMuxer.initialize() + // Setup media. + _, err = c.Setup(desc.BaseURL, lpcmMedia, 0, 0) + if err != nil { + log.Printf("[%v]: setup media error: %v\n", audioFormat, err) + } + + // Process input rtp packets. + c.OnPacketRTP(lpcmMedia, lpcmFormat, func(pkt *rtp.Packet) { + // Process LPCM flow and return PTS and SAMPLES. + pts, samples, err := formats.ProcessLPCM(&c, lpcmMedia, lpcmRTPDec, pkt, audioFormat) if err != nil { - panic(err) + log.Printf("[%v]: process packet error: %v\n", audioFormat, err) + return } - log.Println("New file for recording created:", currentMpegtsMuxer.fileName) - */ - } - }() + log.Printf("[%v]: decoded audio samples with PTS %v and size %d\n", audioFormat, pts, len(samples)) + }) - panic(c.Wait()) + // Start playing. + _, err = c.Play(nil) + if err != nil { + return fmt.Errorf("[%v]: sending PLAY request erorr: %w", audioFormat, err) + } - case videoFormat == "MJPEG" && audioFormat == "": - // Wait for the next period. - storage.WaitPeriod(period) - log.Printf("[%v]: start recording", audioFormat) + // Create ticker for rotation files. + ticker := time.NewTicker(time.Duration(period) * time.Second) + defer ticker.Stop() - // Create decoder. - mjpegRTPDec, err := mjpegFormat.CreateDecoder() - if err != nil { - log.Printf("[%v]: create decoder error: %v\n", videoFormat, err) - } + // Rotate files. + go func() { + for range ticker.C { + // Logic for rotation files. + /* + currentMpegtsMuxer.close() + currentMpegtsMuxer.fileName = fn.SetNumNTime() - // Setup media. - _, err = c.Setup(desc.BaseURL, mjpegMedia, 0, 0) - if err != nil { - return fmt.Errorf("[%v]: setup media error: %w", videoFormat, err) - } + err = currentMpegtsMuxer.initialize() + if err != nil { + panic(err) + } - // Process input rtp packets. - c.OnPacketRTP(mjpegMedia, mjpegFormat, func(pkt *rtp.Packet) { - // Process MJPEG flow and return PTS and IMG. - pts, img, err := formats.ProcessMJPEGRGBA(&c, mjpegMedia, mjpegRTPDec, pkt, videoFormat) - if err != nil { - log.Printf("[%v]: process packet error: %v\n", videoFormat, err) - return - } + log.Println("New file for recording created:", currentMpegtsMuxer.fileName) - log.Printf("[%v]: decoded image with PTS %v and size %v", videoFormat, pts, img.Bounds().Max) - }) + } + }() - // Start playing. - _, err = c.Play(nil) - if err != nil { - return fmt.Errorf("[%v]: sending PLAY request erorr: %w", videoFormat, err) - } + panic(c.Wait()) - // Create ticker for rotation files. - ticker := time.NewTicker(time.Duration(period) * time.Second) - defer ticker.Stop() + case videoFormat == "MJPEG" && audioFormat == "": + // Wait for the next period. + storage.WaitPeriod(period) + log.Printf("[%v]: start recording", audioFormat) - // Rotate files. - go func() { - for range ticker.C { - // Logic for rotation files. - /* - currentMpegtsMuxer.close() - currentMpegtsMuxer.fileName = fn.SetNumNTime() + // Create decoder. + mjpegRTPDec, err := mjpegFormat.CreateDecoder() + if err != nil { + log.Printf("[%v]: create decoder error: %v\n", videoFormat, err) + } - err = currentMpegtsMuxer.initialize() + // Setup media. + _, err = c.Setup(desc.BaseURL, mjpegMedia, 0, 0) + if err != nil { + return fmt.Errorf("[%v]: setup media error: %w", videoFormat, err) + } + + // Process input rtp packets. + c.OnPacketRTP(mjpegMedia, mjpegFormat, func(pkt *rtp.Packet) { + // Process MJPEG flow and return PTS and IMG. + pts, img, err := formats.ProcessMJPEGRGBA(&c, mjpegMedia, mjpegRTPDec, pkt, videoFormat) if err != nil { - panic(err) + log.Printf("[%v]: process packet error: %v\n", videoFormat, err) + return } - log.Println("New file for recording created:", currentMpegtsMuxer.fileName) - */ - } - }() + log.Printf("[%v]: decoded image with PTS %v and size %v", videoFormat, pts, img.Bounds().Max) + }) - panic(c.Wait()) + // Start playing. + _, err = c.Play(nil) + if err != nil { + return fmt.Errorf("[%v]: sending PLAY request erorr: %w", videoFormat, err) + } - case videoFormat == "" && audioFormat == "AAC": - // Wait for the next period. - storage.WaitPeriod(period) - log.Printf("[%v]: start recording", audioFormat) + // Create ticker for rotation files. + ticker := time.NewTicker(time.Duration(period) * time.Second) + defer ticker.Stop() - // Create decoder. - aacRTPDec, err := aacFormat.CreateDecoder() - if err != nil { - log.Printf("[%v]: create decoder error: %v\n", audioFormat, err) - } + // Rotate files. + go func() { + for range ticker.C { + // Logic for rotation files. + /* + currentMpegtsMuxer.close() + currentMpegtsMuxer.fileName = fn.SetNumNTime() - // Setup media. - _, err = c.Setup(desc.BaseURL, aacMedia, 0, 0) - if err != nil { - return fmt.Errorf("[%v]: setup media error: %w", audioFormat, err) - } + err = currentMpegtsMuxer.initialize() + if err != nil { + panic(err) + } - // Process input rtp packets. - c.OnPacketRTP(aacMedia, aacFormat, func(pkt *rtp.Packet) { - // Process AAC flow and return PTS and AUS. - pts, aus, err := formats.ProcessAAC(&c, aacMedia, aacRTPDec, pkt, audioFormat) - if err != nil { - log.Printf("[%v]: process packet error: %v\n", audioFormat, err) - return - } + log.Println("New file for recording created:", currentMpegtsMuxer.fileName) - for _, au := range aus { - log.Printf("[%v]: received access unit with PTS %v size %d\n", audioFormat, pts, len(au)) - } - }) + } + }() - // Start playing. - _, err = c.Play(nil) - if err != nil { - return fmt.Errorf("[%v]: sending PLAY request erorr: %w", audioFormat, err) - } + panic(c.Wait()) - // Create ticker for rotation files. - ticker := time.NewTicker(time.Duration(period) * time.Second) - defer ticker.Stop() + case videoFormat == "" && audioFormat == "AAC": + // Wait for the next period. + storage.WaitPeriod(period) + log.Printf("[%v]: start recording", audioFormat) - // Rotate files. - go func() { - for range ticker.C { - // Logic for rotation files. - /* - currentMpegtsMuxer.close() - currentMpegtsMuxer.fileName = fn.SetNumNTime() + // Create decoder. + aacRTPDec, err := aacFormat.CreateDecoder() + if err != nil { + log.Printf("[%v]: create decoder error: %v\n", audioFormat, err) + } - err = currentMpegtsMuxer.initialize() + // Setup media. + _, err = c.Setup(desc.BaseURL, aacMedia, 0, 0) + if err != nil { + return fmt.Errorf("[%v]: setup media error: %w", audioFormat, err) + } + + // Process input rtp packets. + c.OnPacketRTP(aacMedia, aacFormat, func(pkt *rtp.Packet) { + // Process AAC flow and return PTS and AUS. + pts, aus, err := formats.ProcessAAC(&c, aacMedia, aacRTPDec, pkt, audioFormat) if err != nil { - panic(err) + log.Printf("[%v]: process packet error: %v\n", audioFormat, err) + return } - log.Println("New file for recording created:", currentMpegtsMuxer.fileName) - */ - } - }() + for _, au := range aus { + log.Printf("[%v]: received access unit with PTS %v size %d\n", audioFormat, pts, len(au)) + } + }) - panic(c.Wait()) + // Start playing. + _, err = c.Play(nil) + if err != nil { + return fmt.Errorf("[%v]: sending PLAY request erorr: %w", audioFormat, err) + } - case videoFormat == "" && audioFormat == "OPUS": - // Wait for the next period. - storage.WaitPeriod(period) - log.Printf("[%v]: start recording", audioFormat) + // Create ticker for rotation files. + ticker := time.NewTicker(time.Duration(period) * time.Second) + defer ticker.Stop() - // Create decoder. - opusRTPDec, err := opusFormat.CreateDecoder() - if err != nil { - log.Printf("[%v]: create decoder error: %v\n", audioFormat, err) - } + // Rotate files. + go func() { + for range ticker.C { + // Logic for rotation files. + /* + currentMpegtsMuxer.close() + currentMpegtsMuxer.fileName = fn.SetNumNTime() - // Setup media. - _, err = c.Setup(desc.BaseURL, opusMedia, 0, 0) - if err != nil { - return fmt.Errorf("[%v]: setup media error: %w", audioFormat, err) - } + err = currentMpegtsMuxer.initialize() + if err != nil { + panic(err) + } - // Process input rtp packets. - c.OnPacketRTP(opusMedia, opusFormat, func(pkt *rtp.Packet) { - // Process OPUS flow and return PTS and OP. - pts, op, err := formats.ProcessOPUS(&c, opusMedia, opusRTPDec, pkt, audioFormat) - if err != nil { - log.Printf("[%v]: process packet error: %v\n", audioFormat, err) - return - } + log.Println("New file for recording created:", currentMpegtsMuxer.fileName) - log.Printf("[%v]: received OPUS packet with PTS %v size %d\n", audioFormat, pts, len(op)) - }) + } + }() - // Start playing. - _, err = c.Play(nil) - if err != nil { - return fmt.Errorf("[%v]: sending PLAY request erorr: %w", audioFormat, err) - } + panic(c.Wait()) - // Create ticker for rotation files. - ticker := time.NewTicker(time.Duration(period) * time.Second) - defer ticker.Stop() + case videoFormat == "" && audioFormat == "OPUS": + // Wait for the next period. + storage.WaitPeriod(period) + log.Printf("[%v]: start recording", audioFormat) - // Rotate files. - go func() { - for range ticker.C { - // Logic for rotation files. - /* - currentMpegtsMuxer.close() - currentMpegtsMuxer.fileName = fn.SetNumNTime() + // Create decoder. + opusRTPDec, err := opusFormat.CreateDecoder() + if err != nil { + log.Printf("[%v]: create decoder error: %v\n", audioFormat, err) + } - err = currentMpegtsMuxer.initialize() + // Setup media. + _, err = c.Setup(desc.BaseURL, opusMedia, 0, 0) + if err != nil { + return fmt.Errorf("[%v]: setup media error: %w", audioFormat, err) + } + + // Process input rtp packets. + c.OnPacketRTP(opusMedia, opusFormat, func(pkt *rtp.Packet) { + // Process OPUS flow and return PTS and OP. + pts, op, err := formats.ProcessOPUS(&c, opusMedia, opusRTPDec, pkt, audioFormat) if err != nil { - panic(err) + log.Printf("[%v]: process packet error: %v\n", audioFormat, err) + return } - log.Println("New file for recording created:", currentMpegtsMuxer.fileName) - */ - } - }() + log.Printf("[%v]: received OPUS packet with PTS %v size %d\n", audioFormat, pts, len(op)) + }) - panic(c.Wait()) + // Start playing. + _, err = c.Play(nil) + if err != nil { + return fmt.Errorf("[%v]: sending PLAY request erorr: %w", audioFormat, err) + } - case videoFormat == "VP8" && audioFormat == "": - // Wait for the next period. - storage.WaitPeriod(period) - log.Printf("[%v]: start recording", videoFormat) + // Create ticker for rotation files. + ticker := time.NewTicker(time.Duration(period) * time.Second) + defer ticker.Stop() - // Create decoder. - vp8RTPDec, err := vp8Format.CreateDecoder() - if err != nil { - log.Printf("[%v]: create decoder error: %v\n", videoFormat, err) - } + // Rotate files. + go func() { + for range ticker.C { + // Logic for rotation files. + /* + currentMpegtsMuxer.close() + currentMpegtsMuxer.fileName = fn.SetNumNTime() - // Setup VP8 -> RGBA decoder. - vp8Dec := &formats.VPDecoder{} - err = vp8Dec.Initialize() - if err != nil { - log.Printf("[%v]: init decoder error: %v\n", videoFormat, err) - } - defer vp8Dec.Close() + err = currentMpegtsMuxer.initialize() + if err != nil { + panic(err) + } - // Setup media. - _, err = c.Setup(desc.BaseURL, vp8Media, 0, 0) - if err != nil { - return fmt.Errorf("[%v]: setup media error: %w", videoFormat, err) - } + log.Println("New file for recording created:", currentMpegtsMuxer.fileName) - // Process input rtp packets. - c.OnPacketRTP(vp8Media, vp8Format, func(pkt *rtp.Packet) { - // Process VP8 flow and return PTS and IMG. - pts, img, err := formats.ProcessVP8RGBA(&c, vp8Media, vp8RTPDec, vp8Dec, pkt, videoFormat) - if err != nil { - log.Printf("[%v]: process packet error: %v\n", audioFormat, err) - return - } + } + }() - log.Printf("[%v]: decoded frame with PTS %v and size %v", videoFormat, pts, img.Bounds().Max) - }) + panic(c.Wait()) - // Start playing. - _, err = c.Play(nil) - if err != nil { - return fmt.Errorf("[%v]: sending PLAY request erorr: %w", videoFormat, err) - } + case videoFormat == "VP8" && audioFormat == "": + // Wait for the next period. + storage.WaitPeriod(period) + log.Printf("[%v]: start recording", videoFormat) - // Create ticker for rotation files. - ticker := time.NewTicker(time.Duration(period) * time.Second) - defer ticker.Stop() + // Create decoder. + vp8RTPDec, err := vp8Format.CreateDecoder() + if err != nil { + log.Printf("[%v]: create decoder error: %v\n", videoFormat, err) + } - // Rotate files. - go func() { - for range ticker.C { - // Logic for rotation files. - /* - currentMpegtsMuxer.close() - currentMpegtsMuxer.fileName = fn.SetNumNTime() + // Setup VP8 -> RGBA decoder. + vp8Dec := &formats.VPDecoder{} + err = vp8Dec.Initialize() + if err != nil { + log.Printf("[%v]: init decoder error: %v\n", videoFormat, err) + } + defer vp8Dec.Close() - err = currentMpegtsMuxer.initialize() + // Setup media. + _, err = c.Setup(desc.BaseURL, vp8Media, 0, 0) + if err != nil { + return fmt.Errorf("[%v]: setup media error: %w", videoFormat, err) + } + + // Process input rtp packets. + c.OnPacketRTP(vp8Media, vp8Format, func(pkt *rtp.Packet) { + // Process VP8 flow and return PTS and IMG. + pts, img, err := formats.ProcessVP8RGBA(&c, vp8Media, vp8RTPDec, vp8Dec, pkt, videoFormat) if err != nil { - panic(err) + log.Printf("[%v]: process packet error: %v\n", audioFormat, err) + return } - log.Println("New file for recording created:", currentMpegtsMuxer.fileName) - */ - } - }() + log.Printf("[%v]: decoded frame with PTS %v and size %v", videoFormat, pts, img.Bounds().Max) + }) - panic(c.Wait()) + // Start playing. + _, err = c.Play(nil) + if err != nil { + return fmt.Errorf("[%v]: sending PLAY request erorr: %w", videoFormat, err) + } - case videoFormat == "VP9" && audioFormat == "": - // Wait for the next period. - storage.WaitPeriod(period) - log.Printf("[%v]: start recording", videoFormat) + // Create ticker for rotation files. + ticker := time.NewTicker(time.Duration(period) * time.Second) + defer ticker.Stop() - // Create decoder. - vp9RTPDec, err := vp9Format.CreateDecoder() - if err != nil { - log.Printf("[%v]: create decoder error: %v\n", videoFormat, err) - } + // Rotate files. + go func() { + for range ticker.C { + // Logic for rotation files. + /* + currentMpegtsMuxer.close() + currentMpegtsMuxer.fileName = fn.SetNumNTime() - // Setup VP9 -> RGBA decoder. - vp9Dec := &formats.VPDecoder{} - err = vp9Dec.Initialize() - if err != nil { - log.Printf("[%v]: init decoder error: %v\n", videoFormat, err) - } - defer vp9Dec.Close() + err = currentMpegtsMuxer.initialize() + if err != nil { + panic(err) + } - // Setup media. - _, err = c.Setup(desc.BaseURL, vp9Media, 0, 0) - if err != nil { - return fmt.Errorf("[%v]: setup media error: %w", videoFormat, err) - } + log.Println("New file for recording created:", currentMpegtsMuxer.fileName) - // Process input rtp packets. - c.OnPacketRTP(vp9Media, vp9Format, func(pkt *rtp.Packet) { - // Process VP9 flow and return PTS and IMG. - pts, img, err := formats.ProcessVP9RGBA(&c, vp9Media, vp9RTPDec, vp9Dec, pkt, videoFormat) - if err != nil { - log.Printf("[%v]: process packet error: %v\n", audioFormat, err) - return - } + } + }() - log.Printf("[%v]: decoded frame with PTS %v and size %v", videoFormat, pts, img.Bounds().Max) - }) + panic(c.Wait()) - // Start playing. - _, err = c.Play(nil) - if err != nil { - return fmt.Errorf("[%v]: sending PLAY request erorr: %w", videoFormat, err) - } + case videoFormat == "VP9" && audioFormat == "": + // Wait for the next period. + storage.WaitPeriod(period) + log.Printf("[%v]: start recording", videoFormat) - // Create ticker for rotation files. - ticker := time.NewTicker(time.Duration(period) * time.Second) - defer ticker.Stop() + // Create decoder. + vp9RTPDec, err := vp9Format.CreateDecoder() + if err != nil { + log.Printf("[%v]: create decoder error: %v\n", videoFormat, err) + } - // Rotate files. - go func() { - for range ticker.C { - // Logic for rotation files. - /* - currentMpegtsMuxer.close() - currentMpegtsMuxer.fileName = fn.SetNumNTime() + // Setup VP9 -> RGBA decoder. + vp9Dec := &formats.VPDecoder{} + err = vp9Dec.Initialize() + if err != nil { + log.Printf("[%v]: init decoder error: %v\n", videoFormat, err) + } + defer vp9Dec.Close() - err = currentMpegtsMuxer.initialize() + // Setup media. + _, err = c.Setup(desc.BaseURL, vp9Media, 0, 0) + if err != nil { + return fmt.Errorf("[%v]: setup media error: %w", videoFormat, err) + } + + // Process input rtp packets. + c.OnPacketRTP(vp9Media, vp9Format, func(pkt *rtp.Packet) { + // Process VP9 flow and return PTS and IMG. + pts, img, err := formats.ProcessVP9RGBA(&c, vp9Media, vp9RTPDec, vp9Dec, pkt, videoFormat) if err != nil { - panic(err) + log.Printf("[%v]: process packet error: %v\n", audioFormat, err) + return } - log.Println("New file for recording created:", currentMpegtsMuxer.fileName) - */ - } - }() + log.Printf("[%v]: decoded frame with PTS %v and size %v", videoFormat, pts, img.Bounds().Max) + }) + // Start playing. + _, err = c.Play(nil) + if err != nil { + return fmt.Errorf("[%v]: sending PLAY request erorr: %w", videoFormat, err) + } + + // Create ticker for rotation files. + ticker := time.NewTicker(time.Duration(period) * time.Second) + defer ticker.Stop() + + // Rotate files. + go func() { + for range ticker.C { + // Logic for rotation files. + /* + currentMpegtsMuxer.close() + currentMpegtsMuxer.fileName = fn.SetNumNTime() + + err = currentMpegtsMuxer.initialize() + if err != nil { + panic(err) + } + + log.Println("New file for recording created:", currentMpegtsMuxer.fileName) + + } + }() + */ panic(c.Wait()) + } return nil } diff --git a/writer/internal/storage/editor.go b/writer/internal/storage/editor.go index afdd262..24d3de8 100644 --- a/writer/internal/storage/editor.go +++ b/writer/internal/storage/editor.go @@ -14,9 +14,9 @@ func CutURI(URI string) (CutterURI string) { } // CreateFileName creates FileName structure. -func CreateFileName(dir string, resolutions []string, URI string, period int) *protos.FileName { +func CreateFileName(dir string, resolutions []string, cuttedURI string, period int) *protos.FileName { fn := protos.FileName{ - Path: dir + "/data/" + URI + "/" + resolutions[0], + Path: dir + "/data/" + cuttedURI + "/" + resolutions[0], TimeNow: time.Now().Format("15-04-05_02-01-2006"), Name: "videoFragment", Number: -1, diff --git a/writer/internal/storage/file.go b/writer/internal/storage/file.go index 7b7eb39..44fad45 100644 --- a/writer/internal/storage/file.go +++ b/writer/internal/storage/file.go @@ -1,19 +1,23 @@ package storage -import "time" +const ( + PacketTypeH264 = 1 + PacketTypeLPCM = 2 +) +// InterleavedPacket представляет пакет, который может быть либо H264, либо G711. +type InterleavedPacket struct { + // Type: 1 для H264, 2 для G711. + Type byte + Pts int64 + + H264AUs [][]byte + LPCMSamples []byte +} + +// Segment содержит строковые поля Start и Duration, а также набор interleaved пакетов. type Segment struct { - Start time.Time - Duration time.Duration - - H264 byte - G711 byte - - SPS []byte - PPS []byte -} - -type StreamRecord struct { - ID string - Segments []Segment + Start string + Duration string + Packets InterleavedPacket } diff --git a/writer/internal/storage/segmenter.go b/writer/internal/storage/segmenter.go index 82be054..e3de726 100644 --- a/writer/internal/storage/segmenter.go +++ b/writer/internal/storage/segmenter.go @@ -1 +1,162 @@ package storage + +import ( + "bytes" + "encoding/binary" + "fmt" + "io" + "os" + "time" + + "github.com/golang/snappy" +) + +// writeString записывает строку: сначала длину (int32), затем байты строки. +func writeString(w io.Writer, s string) error { + if err := binary.Write(w, binary.LittleEndian, int32(len(s))); err != nil { + return err + } + _, err := w.Write([]byte(s)) + return err +} + +// WritePacket записывает один interleaved пакет в writer. +func WritePacket(w io.Writer, pkt InterleavedPacket) error { + // Записываем тип пакета (1 байт). + if err := binary.Write(w, binary.LittleEndian, pkt.Type); err != nil { + return err + } + // Записываем pts. + if err := binary.Write(w, binary.LittleEndian, pkt.Pts); err != nil { + return err + } + // В зависимости от типа пакета записываем данные access unit. + if pkt.Type == PacketTypeH264 { + // Для H264 AU — [][]byte. + numAUs := int32(len(pkt.H264AUs)) + if err := binary.Write(w, binary.LittleEndian, numAUs); err != nil { + return err + } + for _, au := range pkt.H264AUs { + // Сначала длина AU. + if err := binary.Write(w, binary.LittleEndian, int32(len(au))); err != nil { + return err + } + // Затем сами данные. + if _, err := w.Write(au); err != nil { + return err + } + } + } else if pkt.Type == PacketTypeLPCM { + // Для G711 AU — []byte. + if err := binary.Write(w, binary.LittleEndian, int32(len(pkt.LPCMSamples))); err != nil { + return err + } + if _, err := w.Write(pkt.LPCMSamples); err != nil { + return err + } + } else { + return fmt.Errorf("неизвестный тип пакета: %d", pkt.Type) + } + return nil +} + +// WriteHeader writes header to a file. +func WriteHeader(w io.Writer, seg Segment) error { + var buf bytes.Buffer + + // Записываем строки Start и Duration. + if err := writeString(&buf, seg.Start); err != nil { + return err + } + if err := writeString(&buf, seg.Duration); err != nil { + return err + } + + // Сначала записываем длину сегмента, затем данные сегмента. + segData := buf.Bytes() + if err := binary.Write(w, binary.LittleEndian, int32(len(segData))); err != nil { + return err + } + _, err := w.Write(segData) + return err +} + +// WriteInterleavedPacket записывает один сегмент в writer. Сначала собирается содержимое сегмента в буфер, +// затем записывается его длина (int32) и данные. +func WriteInterleavedPacket(w io.Writer, seg Segment) error { + var buf bytes.Buffer + + // Записываем количество пакетов. + if err := binary.Write(&buf, binary.LittleEndian, int32(1)); err != nil { + return err + } + + // Для каждого пакета записываем данные. + if err := WritePacket(&buf, seg.Packets); err != nil { + return err + } + + // Сначала записываем длину сегмента, затем данные сегмента. + segData := buf.Bytes() + if err := binary.Write(w, binary.LittleEndian, int32(len(segData))); err != nil { + return err + } + _, err := w.Write(segData) + return err +} + +func main() { + now := time.Now() + // Пример сегмента с interleaved пакетами. + seg := Segment{ + Start: now.Format(time.RFC3339), + Duration: "1m", // длительность сегмента в виде строки + Packets: InterleavedPacket{ + + Type: PacketTypeH264, + Pts: now.UnixNano(), + H264AUs: [][]byte{ + []byte{0x00, 0x01, 0x02}, + []byte{0x03, 0x04}, + }, + }, + } + + // Открываем файл для записи. + f, err := os.Create("stream_interleaved.bin") + if err != nil { + fmt.Println("Ошибка создания файла:", err) + return + } + defer f.Close() + + // Оборачиваем writer через snappy для быстрой компрессии (если требуется). + writer := snappy.NewBufferedWriter(f) + defer writer.Close() + + // Записываем заголовок файла (например, streamID). + streamID := "example_stream" + if err := binary.Write(writer, binary.LittleEndian, int32(len(streamID))); err != nil { + fmt.Println("Ошибка записи заголовка:", err) + return + } + if _, err := writer.Write([]byte(streamID)); err != nil { + fmt.Println("Ошибка записи streamID:", err) + return + } + + // Записываем сегмент с interleaved пакетами. + if err := WriteInterleavedPacket(writer, seg); err != nil { + fmt.Println("Ошибка записи сегмента:", err) + return + } + + // Обязательно делаем Flush, чтобы данные точно записались. + if err := writer.Flush(); err != nil { + fmt.Println("Ошибка при сбросе данных:", err) + return + } + + fmt.Println("Сегмент с interleaved пакетами успешно записан.") +}