package unpacker

import (
	"bytes"
	"encoding/binary"
	"errors"
	"fmt"
	"io"
	"log"
	"os"
	"strconv"
	"strings"
	"time"

	"git.insit.tech/psa/rtsp_reader-writer/writer/pkg/storage"
	"git.insit.tech/sas/rtsp_proxy/core/gen"
	"git.insit.tech/sas/rtsp_proxy/proto/common"
	"github.com/bluenviron/gortsplib/v4/pkg/format"
	"go.uber.org/zap"

	"reader/internal/processor"
)

// CreateFlow generates TS files and M3U8 playlists.
//
// It starts gen.MediaPlaylistGenerator in a background goroutine and then
// loops forever: sleep for the current period, and for every resolution
// convert the second-to-last ".insit" file found under
// dir/data/cutURI/resolution into an MPEG-TS file with the same base name.
//
// The period starts at 60 seconds and is replaced by the duration parsed from
// the header of each processed file (when it parses to a positive number).
//
// CreateFlow returns only when a file cannot be opened, read or decoded, or
// when the muxer cannot be initialized; the error is printed to stdout first.
// Nothing in this function stops the playlist goroutine when that happens.
func CreateFlow(dir string, cutURI string, fn *common.FileName, resolutions []string, Log *zap.Logger) {
	fmt.Println("Reader started")

	// Establish starting period.
	period := 60 * time.Second

	// Create M3U8 playlist.
	go gen.MediaPlaylistGenerator(dir+"/data", cutURI, fn.Duration, resolutions, Log)

	for {
		time.Sleep(period)

		for _, resolution := range resolutions {
			segDir := dir + "/data/" + cutURI + "/" + resolution
			filenames := gen.StringDirEntryList(segDir, "insit", Log)

			// Only the second-to-last entry is converted; the last one is
			// presumably still being written — TODO confirm with the writer.
			// With fewer than two entries there is nothing to convert yet
			// (and indexing len-2 would panic).
			if len(filenames) < 2 {
				continue
			}
			name := filenames[len(filenames)-2]

			segDur, err := convertInsitToTS(segDir, name)
			if err != nil {
				fmt.Println(err)
				return
			}

			// Follow the segment duration announced by the file header.
			// Assumes the header duration is in whole seconds, the same unit
			// as the starting period — TODO confirm.
			if segDur > 0 {
				period = segDur
			}
		}
	}
}

// convertInsitToTS reads the ".insit" file name located in segDir and writes
// its packets into an MPEG-TS file with the same base name and a ".ts"
// extension in the same directory.
//
// It returns the segment duration parsed from the file header, or 0 when the
// header value is not a valid integer (that failure is printed, not
// returned). A non-nil error means the file could not be opened, read or
// decoded, or the muxer could not be initialized.
//
// Both the input file and the muxer are released on every return path.
func convertInsitToTS(segDir, name string) (time.Duration, error) {
	// Open file for reading.
	f, err := os.Open(segDir + "/" + name)
	if err != nil {
		return 0, fmt.Errorf("opening file error: %w", err)
	}
	defer f.Close()

	// Read StreamID.
	streamID, err := readLengthPrefixed(f, "reading StreamID")
	if err != nil {
		return 0, err
	}
	fmt.Println("Stream ID:", string(streamID))

	// Read header of the file.
	headerData, err := readLengthPrefixed(f, "reading header")
	if err != nil {
		return 0, err
	}
	headerSeg, err := readHeaderSegment(bytes.NewReader(headerData))
	if err != nil {
		return 0, fmt.Errorf("func readHeaderSegment error: %w", err)
	}

	// Parse duration of a segment. A malformed value is reported but does not
	// abort the conversion; the caller keeps its previous period.
	var segDur time.Duration
	secs, err := strconv.Atoi(headerSeg.Duration)
	if err != nil {
		fmt.Println("parsing duration error: ", err)
	} else {
		segDur = time.Duration(secs) * time.Second
	}

	// Setup MPEG-TS muxer.
	var h264Format format.H264
	var aacFormat format.MPEG4Audio
	muxer := processor.MpegtsMuxer{
		FileName:         segDir + "/" + strings.TrimSuffix(name, ".insit") + ".ts",
		H264Format:       &h264Format,
		Mpeg4AudioFormat: &aacFormat,
	}
	if err := muxer.Initialize(); err != nil {
		// Do not write to (or close) a muxer that failed to initialize.
		return 0, fmt.Errorf("init muxer error: %w", err)
	}
	defer muxer.Close()

	// Read segments.
	for {
		segData, err := readLengthPrefixed(f, "read segments")
		if errors.Is(err, io.EOF) {
			// Clean end of file on a record boundary.
			return segDur, nil
		}
		if err != nil {
			return 0, err
		}

		packets, err := readPacketSegment(bytes.NewReader(segData))
		if err != nil {
			return 0, fmt.Errorf("func readPacketSegment error: %w", err)
		}

		for _, pkt := range packets {
			switch pkt.Type {
			case storage.PacketTypeH264:
				// Encode the access unit into MPEG-TS.
				if err := muxer.WriteH264(pkt.Pts, pkt.H264AUs); err != nil {
					log.Printf("write H264 packet error: %v\n", err)
				}

			case storage.PacketTypeLPCM:
				// Convert the LPCM samples to an AAC access unit.
				au, err := processor.ConvertLPCMToAAC(pkt.LPCMSamples)
				if err != nil {
					log.Printf("converting to AAC frame error: %v\n", err)
					// There is no valid access unit to write for this packet.
					continue
				}

				// Encode the access unit into MPEG-TS.
				if err := muxer.WriteAAC([][]byte{au}, pkt.Pts); err != nil {
					log.Printf("write G711 packet error: %v\n", err)
				}
			}
		}
	}
}

// readLengthPrefixed reads one record from r: a little-endian int32 byte
// count followed by that many bytes. label prefixes the error messages.
//
// The returned error wraps io.EOF only when r was already exhausted before
// the length prefix, so callers can use errors.Is(err, io.EOF) to detect a
// clean end of input. A payload cut short is reported as
// io.ErrUnexpectedEOF, and a negative length is rejected instead of being
// passed to make, which would panic.
func readLengthPrefixed(r io.Reader, label string) ([]byte, error) {
	var n int32
	if err := binary.Read(r, binary.LittleEndian, &n); err != nil {
		return nil, fmt.Errorf("%s length error: %w", label, err)
	}
	if n < 0 {
		return nil, fmt.Errorf("%s length error: negative length %d", label, n)
	}

	data := make([]byte, n)
	if _, err := io.ReadFull(r, data); err != nil {
		// io.ReadFull returns a bare io.EOF when no payload byte could be
		// read; after a length prefix that is still a truncated record.
		if errors.Is(err, io.EOF) {
			err = io.ErrUnexpectedEOF
		}
		return nil, fmt.Errorf("%s error: %w", label, err)
	}
	return data, nil
}