From 61971d0acba6827e766f0e86292e225978cac7c2 Mon Sep 17 00:00:00 2001 From: Sergey Petrov Date: Wed, 26 Mar 2025 10:58:07 +0500 Subject: [PATCH] Added creating M3U8 playlist and consistent checking of each new file in storage. --- reader/cmd/main.go | 284 ++------------------ reader/go.mod | 12 +- reader/go.sum | 25 +- reader/internal/processor/file2.go | 1 - reader/internal/processor/h264-aac_muxer.go | 3 + reader/internal/unpacker/proc.go | 158 +++++++++++ reader/internal/unpacker/unpack.go | 109 ++++++++ 7 files changed, 316 insertions(+), 276 deletions(-) delete mode 100644 reader/internal/processor/file2.go create mode 100644 reader/internal/unpacker/proc.go create mode 100644 reader/internal/unpacker/unpack.go diff --git a/reader/cmd/main.go b/reader/cmd/main.go index d84b5ac..6d3e8c4 100644 --- a/reader/cmd/main.go +++ b/reader/cmd/main.go @@ -1,276 +1,26 @@ package main import ( - "bytes" - "encoding/binary" - "fmt" - "io" - "log" - "os" - "reader/internal/processor" - - "github.com/bluenviron/gortsplib/v4/pkg/format" + "git.insit.tech/sas/rtsp_proxy/core/log" + "git.insit.tech/sas/rtsp_proxy/proto/common" + "reader/internal/unpacker" ) -//port := 8080 -// -//http.HandleFunc("GET /download", handlers.Download) // example request: {"date": "07-03-2025", "start_time": "16-43", "end_time": "16-44"} -//http.HandleFunc("GET /hls/", handlers.HLS) -// -//log.Println("Starting server on:") -//log.Printf("Serving on HTTP port: %d\n", port) -// -//log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", port), nil)) - -// Интерпретируем типы пакетов: -const ( - PacketTypeH264 = 1 - PacketTypeLPCM = 2 -) - -var ( - h264 string - g711 string -) - -// InterleavedPacket описывает пакет, который может быть либо H264, либо G711. -type InterleavedPacket struct { - Type byte - Pts int64 - H264AUs [][]byte // для H264 - LPCMSamples []byte // для G711 -} - -// Segment содержит строки Start и Duration, а также набор пакетов. -type Segment struct { - Start string - Duration string - Packets []InterleavedPacket -} - -// readString читает строку: сначала длину (int32), затем данные строки. -func readString(r io.Reader) (string, error) { - var length int32 - if err := binary.Read(r, binary.LittleEndian, &length); err != nil { - return "", err - } - buf := make([]byte, length) - if _, err := io.ReadFull(r, buf); err != nil { - return "", err - } - return string(buf), nil -} - -// readHeaderSegment читает сегмент, записанный функцией WriteHeader (с Start и Duration). -func readHeaderSegment(r io.Reader) (Segment, error) { - var seg Segment - start, err := readString(r) - if err != nil { - return seg, err - } - seg.Start = start - - duration, err := readString(r) - if err != nil { - return seg, err - } - seg.Duration = duration - - return seg, nil -} - -// readPacket читает один interleaved пакет. -func readPacket(r io.Reader) (InterleavedPacket, error) { - var pkt InterleavedPacket - - // Читаем тип пакета (1 байт). - typeByte := make([]byte, 1) - if _, err := io.ReadFull(r, typeByte); err != nil { - return pkt, err - } - pkt.Type = typeByte[0] - - // Читаем pts (int64). - if err := binary.Read(r, binary.LittleEndian, &pkt.Pts); err != nil { - return pkt, err - } - - // В зависимости от типа, читаем данные. - if pkt.Type == PacketTypeH264 { - var numAUs int32 - if err := binary.Read(r, binary.LittleEndian, &numAUs); err != nil { - return pkt, err - } - var auList [][]byte - for i := 0; i < int(numAUs); i++ { - var auLen int32 - if err := binary.Read(r, binary.LittleEndian, &auLen); err != nil { - return pkt, err - } - auData := make([]byte, auLen) - if _, err := io.ReadFull(r, auData); err != nil { - return pkt, err - } - auList = append(auList, auData) - } - pkt.H264AUs = auList - } else if pkt.Type == PacketTypeLPCM { - var auLen int32 - if err := binary.Read(r, binary.LittleEndian, &auLen); err != nil { - return pkt, err - } - auData := make([]byte, auLen) - if _, err := io.ReadFull(r, auData); err != nil { - return pkt, err - } - pkt.LPCMSamples = auData - } else { - return pkt, fmt.Errorf("неизвестный тип пакета: %d", pkt.Type) - } - - return pkt, nil -} - -// readPacketSegment читает сегмент, записанный функцией WriteInterleavedPacket: -// сначала число пакетов (int32), затем каждый пакет. -func readPacketSegment(r io.Reader) ([]InterleavedPacket, error) { - var numPackets int32 - if err := binary.Read(r, binary.LittleEndian, &numPackets); err != nil { - return nil, err - } - - var packets []InterleavedPacket - for i := 0; i < int(numPackets); i++ { - pkt, err := readPacket(r) - if err != nil { - return nil, err - } - packets = append(packets, pkt) - } - return packets, nil -} - func main() { - filename := "/home/psa/GoRepository/data/camera44-center-skver_temnika/12-10-00_21-03-2025.insit" + //port := 8080 + // + //http.HandleFunc("GET /download", handlers.Download) // example request: {"date": "07-03-2025", "start_time": "16-43", "end_time": "16-44"} + //http.HandleFunc("GET /hls/", handlers.HLS) + // + //log.Println("Starting server on:") + //log.Printf("Serving on HTTP port: %d\n", port) + // + //log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", port), nil)) - segment := Segment{} + Log := log.MainLogging("/home/psa/GoRepository/" + "/data/" + "camera54-centr-kirova-kalinina") - // Открываем файл для чтения. - f, err := os.Open(filename) - if err != nil { - fmt.Println("Ошибка открытия файла:", err) - return - } - defer f.Close() - - // Читаем streamID (первые 4 байта — длина, затем сами байты). - var streamIDLen int32 - if err := binary.Read(f, binary.LittleEndian, &streamIDLen); err != nil { - fmt.Println("Ошибка чтения streamID длины:", err) - return - } - streamIDBytes := make([]byte, streamIDLen) - if _, err := io.ReadFull(f, streamIDBytes); err != nil { - fmt.Println("Ошибка чтения streamID:", err) - return - } - streamID := string(streamIDBytes) - fmt.Println("Stream ID:", streamID) - - // Теперь в файле идут сегменты. Первый сегмент — заголовочный, далее — сегменты с пакетами. - // Читаем первый сегмент как заголовочный. - var segLen int32 - if err := binary.Read(f, binary.LittleEndian, &segLen); err != nil { - fmt.Println("Ошибка чтения длины заголовочного сегмента:", err) - return - } - segData := make([]byte, segLen) - if _, err := io.ReadFull(f, segData); err != nil { - fmt.Println("Ошибка чтения заголовочного сегмента:", err) - return - } - headerReader := bytes.NewReader(segData) - headerSeg, err := readHeaderSegment(headerReader) - if err != nil { - fmt.Println("Ошибка чтения заголовочного сегмента:", err) - return - } - fmt.Println("Заголовочный сегмент:") - fmt.Println("\tStart:", headerSeg.Start) - fmt.Println("\tDuration:", headerSeg.Duration) - - segment = headerSeg - - // Setup MPEG-TS muxer. - var h264Format format.H264 - var aacFormat *format.MPEG4Audio - - h264Format.PayloadTyp = 96 - h264Format.PacketizationMode = 1 - - currentMpegtsMuxer := processor.MpegtsMuxer{ - FileName: segment.Start + "_videoFragment" + "_0" + ".ts", - H264Format: &h264Format, - Mpeg4AudioFormat: aacFormat, - } - - err = currentMpegtsMuxer.Initialize() - if err != nil { - fmt.Printf("[%v-%v]: init muxer error: %w\n", h264, g711, err) - } - - // Читаем последующие сегменты, содержащие пакеты. - segmentIndex := 1 - for { - var segLen int32 - err := binary.Read(f, binary.LittleEndian, &segLen) - if err != nil { - if err == io.EOF { - break // достигнут конец файла - } - fmt.Println("Ошибка чтения длины сегмента:", err) - return - } - segData = make([]byte, segLen) - if _, err := io.ReadFull(f, segData); err != nil { - fmt.Println("Ошибка чтения данных сегмента:", err) - return - } - packetReader := bytes.NewReader(segData) - packets, err := readPacketSegment(packetReader) - if err != nil { - fmt.Println("Ошибка чтения сегмента пакетов:", err) - return - } - - segment.Packets = append(segment.Packets, packets...) - - fmt.Printf("Сегмент пакетов #%d:\n", segmentIndex) - for _, pkt := range packets { - switch pkt.Type { - case PacketTypeH264: - h264 = "h264" - err = currentMpegtsMuxer.WriteH264(pkt.Pts, pkt.H264AUs) - if err != nil { - log.Printf("write h264 packet error: %v\n", err) - } - case PacketTypeLPCM: - g711 = "g711" - // Convert G711 to AAC. - au, err := processor.ConvertLPCMToAAC(pkt.LPCMSamples) - if err != nil { - log.Printf("converting to AAC frame error: %v\n", err) - } - - // Encode the access unit into MPEG-TS. - err = currentMpegtsMuxer.WriteAAC([][]byte{au}, pkt.Pts) - if err != nil { - return - } - } - } - segmentIndex++ - } - - currentMpegtsMuxer.Close() + unpacker.CreateFlow("/home/psa/GoRepository", + "camera54-centr-kirova-kalinina", + &common.FileName{Duration: 60}, + []string{"1920-1080"}, Log) } diff --git a/reader/go.mod b/reader/go.mod index f80cd99..37e28dc 100644 --- a/reader/go.mod +++ b/reader/go.mod @@ -3,21 +3,27 @@ module reader go 1.24.1 require ( - git.insit.tech/psa/rtsp_reader-writer/writer v0.0.0-20250321123217-883053a85cbc + git.insit.tech/psa/rtsp_reader-writer/writer v0.0.0-20250325111946-a4678342ee71 + git.insit.tech/sas/rtsp_proxy v0.0.0-20250326040356-446f7f0578d9 github.com/bluenviron/gortsplib/v4 v4.12.3 github.com/bluenviron/mediacommon v1.14.0 + github.com/bluenviron/mediacommon/v2 v2.0.0 + github.com/gen2brain/aac-go v0.0.0-20230119102159-ef1e76509d21 + go.uber.org/zap v1.27.0 ) require ( github.com/asticode/go-astikit v0.52.0 // indirect github.com/asticode/go-astits v1.13.0 // indirect - github.com/bluenviron/mediacommon/v2 v2.0.0 // indirect - github.com/gen2brain/aac-go v0.0.0-20230119102159-ef1e76509d21 // indirect github.com/google/uuid v1.6.0 // indirect + github.com/grafov/m3u8 v0.12.1 // indirect github.com/pion/randutil v0.1.0 // indirect github.com/pion/rtcp v1.2.15 // indirect github.com/pion/rtp v1.8.13 // indirect github.com/pion/sdp/v3 v3.0.11 // indirect + github.com/zencoder/go-dash v0.0.0-20201006100653-2f93b14912b2 // indirect + go.uber.org/multierr v1.11.0 // indirect golang.org/x/net v0.37.0 // indirect golang.org/x/sys v0.31.0 // indirect + gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect ) diff --git a/reader/go.sum b/reader/go.sum index 1057023..7469968 100644 --- a/reader/go.sum +++ b/reader/go.sum @@ -1,5 +1,7 @@ -git.insit.tech/psa/rtsp_reader-writer/writer v0.0.0-20250321123217-883053a85cbc h1:dhJVA/B+q3CWBhI1ZBPhClAbY875j7+Ru6XIWimxjW8= -git.insit.tech/psa/rtsp_reader-writer/writer v0.0.0-20250321123217-883053a85cbc/go.mod h1:hY3OlYFPtsZ/fX/BVUPkjIHNPbfIpzgujKn9TKftI+E= +git.insit.tech/psa/rtsp_reader-writer/writer v0.0.0-20250325111946-a4678342ee71 h1:ObdTAquutHq2Abh9J02S8wBkeHuOYS+6fd8chf2yQC4= +git.insit.tech/psa/rtsp_reader-writer/writer v0.0.0-20250325111946-a4678342ee71/go.mod h1:TjxUTANLCPQpjNFVlGZWlGn0ICGV9oQP7aed0xexFkw= +git.insit.tech/sas/rtsp_proxy v0.0.0-20250326040356-446f7f0578d9 h1:8aOLn23rkkXR/mpkOtbaNrCWsiBs0Pl+/ds3CzUuHZo= +git.insit.tech/sas/rtsp_proxy v0.0.0-20250326040356-446f7f0578d9/go.mod h1:/AHWd1Otr+ikOLWzpXtoozzifEx9ZKou+R6DgwaEzr0= github.com/asticode/go-astikit v0.30.0/go.mod h1:h4ly7idim1tNhaVkdVBeXQZEE3L0xblP7fCWbgwipF0= github.com/asticode/go-astikit v0.52.0 h1:kTl2XjgiVQhUl1H7kim7NhmTtCMwVBbPrXKqhQhbk8Y= github.com/asticode/go-astikit v0.52.0/go.mod h1:fV43j20UZYfXzP9oBn33udkvCvDvCDhzjVqoLFuuYZE= @@ -12,12 +14,14 @@ github.com/bluenviron/mediacommon v1.14.0/go.mod h1:z5LP9Tm1ZNfQV5Co54PyOzaIhGMu github.com/bluenviron/mediacommon/v2 v2.0.0 h1:JinZ9v2x6QeAOzA0cDA6aFe8vQuCrU8OyWEhG2iNzwY= github.com/bluenviron/mediacommon/v2 v2.0.0/go.mod h1:iHEz1SFIet6zBwAQoh1a92vTQ3dV3LpVFbom6/SLz3k= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= -github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/gen2brain/aac-go v0.0.0-20230119102159-ef1e76509d21 h1:yfrARW/aVlqKORCdKrYdU0PZUKPqQvYEUQBKfVlNa9Q= github.com/gen2brain/aac-go v0.0.0-20230119102159-ef1e76509d21/go.mod h1:HZqGD/LXHB1VCGUGNzuyxSsD12f3KjbJbvImAmoK/mM= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/grafov/m3u8 v0.12.1 h1:DuP1uA1kvRRmGNAZ0m+ObLv1dvrfNO0TPx0c/enNk0s= +github.com/grafov/m3u8 v0.12.1/go.mod h1:nqzOkfBiZJENr52zTVd/Dcl03yzphIMbJqkXGu+u080= github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA= github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8= github.com/pion/rtcp v1.2.15 h1:LZQi2JbdipLOj4eBjK4wlVoQWfrZbh3Q6eHtWtJBZBo= @@ -27,8 +31,9 @@ github.com/pion/rtp v1.8.13/go.mod h1:8uMBJj32Pa1wwx8Fuv/AsFhn8jsgw+3rUC2PfoBZ8p github.com/pion/sdp/v3 v3.0.11 h1:VhgVSopdsBKwhCFoyyPmT1fKMeV9nLMrEKxNOdy3IVI= github.com/pion/sdp/v3 v3.0.11/go.mod h1:88GMahN5xnScv1hIMTqLdu/cOcUkj6a9ytbncwMCq2E= github.com/pkg/profile v1.4.0/go.mod h1:NWz/XGvpEW1FyYQ7fCx4dqYBLlfTcE+A9FLAkNKqjFE= -github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= @@ -39,11 +44,21 @@ github.com/youpy/go-wav v0.3.2 h1:NLM8L/7yZ0Bntadw/0h95OyUsen+DQIVf9gay+SUsMU= github.com/youpy/go-wav v0.3.2/go.mod h1:0FCieAXAeSdcxFfwLpRuEo0PFmAoc+8NU34h7TUvk50= github.com/zaf/g711 v1.4.0 h1:XZYkjjiAg9QTBnHqEg37m2I9q3IIDv5JRYXs2N8ma7c= github.com/zaf/g711 v1.4.0/go.mod h1:eCDXt3dSp/kYYAoooba7ukD/Q75jvAaS4WOMr0l1Roo= +github.com/zencoder/go-dash v0.0.0-20201006100653-2f93b14912b2 h1:0iAY2pL6yYhNYpdc1DbFq0p7ocyu5MlgKmkealhz3nk= +github.com/zencoder/go-dash v0.0.0-20201006100653-2f93b14912b2/go.mod h1:c8Gxxfmh0jmZ6G+ISlpa315WBVkzd8mEhu6gN9mn5Qg= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= +go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= +go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= golang.org/x/net v0.37.0 h1:1zLorHbz+LYj7MQlSf1+2tPIIgibq2eL5xkrGk6f+2c= golang.org/x/net v0.37.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8= golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik= golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc= +gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/reader/internal/processor/file2.go b/reader/internal/processor/file2.go deleted file mode 100644 index 95af6c9..0000000 --- a/reader/internal/processor/file2.go +++ /dev/null @@ -1 +0,0 @@ -package processor diff --git a/reader/internal/processor/h264-aac_muxer.go b/reader/internal/processor/h264-aac_muxer.go index 39ce636..342031e 100644 --- a/reader/internal/processor/h264-aac_muxer.go +++ b/reader/internal/processor/h264-aac_muxer.go @@ -3,6 +3,7 @@ package processor import ( "bufio" "github.com/bluenviron/mediacommon/pkg/codecs/mpeg4audio" + "log" "os" "sync" @@ -115,6 +116,8 @@ func (e *MpegtsMuxer) WriteH264(pts int64, au [][]byte) error { e.dtsExtractor = h264.NewDTSExtractor2() } + log.Printf("pts: %d", pts) + dts, err := e.dtsExtractor.Extract(au, pts) if err != nil { return err diff --git a/reader/internal/unpacker/proc.go b/reader/internal/unpacker/proc.go new file mode 100644 index 0000000..b8ee5ff --- /dev/null +++ b/reader/internal/unpacker/proc.go @@ -0,0 +1,158 @@ +package unpacker + +import ( + "bytes" + "encoding/binary" + "fmt" + "git.insit.tech/psa/rtsp_reader-writer/writer/pkg/storage" + "git.insit.tech/sas/rtsp_proxy/core/gen" + "git.insit.tech/sas/rtsp_proxy/proto/common" + "github.com/bluenviron/gortsplib/v4/pkg/format" + "go.uber.org/zap" + "io" + "log" + "os" + "reader/internal/processor" + "strconv" + "strings" + "time" +) + +// CreateFlow generate TS files and M3U8 playlists. +func CreateFlow(dir string, cutURI string, fn *common.FileName, resolutions []string, Log *zap.Logger) { + fmt.Println("Reader started") + + // Establish starting period. + per := 60 + period := time.Duration(per) + + // Create M3U8 playlist. + go gen.MediaPlaylistGenerator(dir+"/data", cutURI, fn.Duration, resolutions, Log) + + for { + time.Sleep(period * time.Second) + for _, resolution := range resolutions { + filenames := gen.StringDirEntryList(dir+"/data/"+cutURI+"/"+resolution, "insit", Log) + + for i := len(filenames) - 2; i < len(filenames)-1; i++ { + + segment := storage.Segment{} + + // Open file for reading. + f, err := os.Open(dir + "/data/" + cutURI + "/" + resolution + "/" + filenames[i]) + if err != nil { + fmt.Println("opening file error: ", err) + return + } + defer f.Close() + + // Read StreamID. + var streamIDLen int32 + if err := binary.Read(f, binary.LittleEndian, &streamIDLen); err != nil { + fmt.Println("reading StreamID length error: ", err) + return + } + streamIDBytes := make([]byte, streamIDLen) + if _, err := io.ReadFull(f, streamIDBytes); err != nil { + fmt.Println("reading StreamID error: ", err) + return + } + streamID := string(streamIDBytes) + fmt.Println("Stream ID:", streamID) + + // Read header of the file. + var segLen int32 + if err := binary.Read(f, binary.LittleEndian, &segLen); err != nil { + fmt.Println("reading header length error: ", err) + return + } + segData := make([]byte, segLen) + if _, err := io.ReadFull(f, segData); err != nil { + fmt.Println("reading header error: ", err) + return + } + headerReader := bytes.NewReader(segData) + headerSeg, err := readHeaderSegment(headerReader) + if err != nil { + fmt.Println("func readHeaderSegment error: ", err) + return + } + + // Parse duration of a segment. + per, err = strconv.Atoi(headerSeg.Duration) + if err != nil { + fmt.Println("parsing duration error: ", err) + } + + // Setup MPEG-TS muxer. + var h264Format format.H264 + var aacFormat format.MPEG4Audio + + filename, _ := strings.CutSuffix(filenames[i], ".insit") + + currentMpegtsMuxer := processor.MpegtsMuxer{ + FileName: dir + "/data/" + cutURI + "/" + resolution + "/" + filename + ".ts", + H264Format: &h264Format, + Mpeg4AudioFormat: &aacFormat, + } + + err = currentMpegtsMuxer.Initialize() + if err != nil { + fmt.Printf("init muxer error: %w\n", err) + } + + // Read segments. + for { + // Read segments length. + var segLen int32 + err := binary.Read(f, binary.LittleEndian, &segLen) + if err != nil { + if err == io.EOF { + break + } + fmt.Println("read segments length error: ", err) + return + } + // Read segments data. + segData = make([]byte, segLen) + if _, err := io.ReadFull(f, segData); err != nil { + fmt.Println("read segments error: ", err) + return + } + packetReader := bytes.NewReader(segData) + packets, err := readPacketSegment(packetReader) + if err != nil { + fmt.Println("func readPacketSegment error: ", err) + return + } + + segment.Packets = append(segment.Packets, packets...) + + for _, pkt := range packets { + switch pkt.Type { + case storage.PacketTypeH264: + // Encode the access unit into MPEG-TS. + err = currentMpegtsMuxer.WriteH264(pkt.Pts, pkt.H264AUs) + if err != nil { + log.Printf("write H264 packet error: %v\n", err) + } + case storage.PacketTypeLPCM: + // Convert G711 to AAC. + au, err := processor.ConvertLPCMToAAC(pkt.LPCMSamples) + if err != nil { + log.Printf("converting to AAC frame error: %v\n", err) + } + + // Encode the access unit into MPEG-TS. + err = currentMpegtsMuxer.WriteAAC([][]byte{au}, pkt.Pts) + if err != nil { + log.Printf("write G711 packet error: %v\n", err) + } + } + } + } + currentMpegtsMuxer.Close() + } + } + } +} diff --git a/reader/internal/unpacker/unpack.go b/reader/internal/unpacker/unpack.go new file mode 100644 index 0000000..77feef7 --- /dev/null +++ b/reader/internal/unpacker/unpack.go @@ -0,0 +1,109 @@ +package unpacker + +import ( + "encoding/binary" + "fmt" + "git.insit.tech/psa/rtsp_reader-writer/writer/pkg/storage" + "io" +) + +// readString reads string length and then reads string data. +func readString(r io.Reader) (string, error) { + var length int32 + if err := binary.Read(r, binary.LittleEndian, &length); err != nil { + return "", err + } + buf := make([]byte, length) + if _, err := io.ReadFull(r, buf); err != nil { + return "", err + } + return string(buf), nil +} + +// readHeaderSegment reads header of the segment. +func readHeaderSegment(r io.Reader) (storage.Segment, error) { + var seg storage.Segment + date, err := readString(r) + if err != nil { + return seg, err + } + seg.Date = date + + duration, err := readString(r) + if err != nil { + return seg, err + } + seg.Duration = duration + + return seg, nil +} + +// readPacket reads one interleaved packet. +func readPacket(r io.Reader) (storage.InterleavedPacket, error) { + var pkt storage.InterleavedPacket + + // Read type of the packet. + typeByte := make([]byte, 1) + if _, err := io.ReadFull(r, typeByte); err != nil { + return pkt, err + } + pkt.Type = typeByte[0] + + // Read PTS (int64). + if err := binary.Read(r, binary.LittleEndian, &pkt.Pts); err != nil { + return pkt, err + } + + // Read data of the segment. + if pkt.Type == storage.PacketTypeH264 { + var numAUs int32 + if err := binary.Read(r, binary.LittleEndian, &numAUs); err != nil { + return pkt, err + } + var auList [][]byte + for i := 0; i < int(numAUs); i++ { + var auLen int32 + if err := binary.Read(r, binary.LittleEndian, &auLen); err != nil { + return pkt, err + } + auData := make([]byte, auLen) + if _, err := io.ReadFull(r, auData); err != nil { + return pkt, err + } + auList = append(auList, auData) + } + pkt.H264AUs = auList + } else if pkt.Type == storage.PacketTypeLPCM { + var auLen int32 + if err := binary.Read(r, binary.LittleEndian, &auLen); err != nil { + return pkt, err + } + auData := make([]byte, auLen) + if _, err := io.ReadFull(r, auData); err != nil { + return pkt, err + } + pkt.LPCMSamples = auData + } else { + return pkt, fmt.Errorf("unknown type of the packet: %d", pkt.Type) + } + + return pkt, nil +} + +// readPacketSegment reads segment packets. +func readPacketSegment(r io.Reader) ([]storage.InterleavedPacket, error) { + var numPackets int32 + if err := binary.Read(r, binary.LittleEndian, &numPackets); err != nil { + return nil, err + } + + var packets []storage.InterleavedPacket + for i := 0; i < int(numPackets); i++ { + pkt, err := readPacket(r) + if err != nil { + return nil, err + } + packets = append(packets, pkt) + } + return packets, nil +}