package unpacker import ( "bytes" "encoding/binary" "errors" "fmt" "io" "log" "os" "reader/internal/config" "strconv" "strings" "time" "git.insit.tech/psa/rtsp_reader-writer/writer/pkg/storage" "git.insit.tech/sas/rtsp_proxy/core/gen" log2 "git.insit.tech/sas/rtsp_proxy/core/log" "github.com/bluenviron/gortsplib/v4/pkg/format" "go.uber.org/zap" logger "reader/internal/log" "reader/internal/processor" ) // CreateVideo generate TS files and M3U8 playlists. func CreateVideo() error { exist := storage.Exists(config.DirData) if !exist { return errors.New("directory does not exist") } // Read directory. files, err := storage.ReadDir(config.DirData) if err != nil { return err } for _, file := range files { // Create logger. cam := log2.CamLogging( fmt.Sprintf("%s/%s/log/reader-cam_%s.log", config.DirData, file.Name(), strconv.FormatInt(time.Now().Unix(), 10))) res, err := storage.ReadDir(fmt.Sprintf("%s/%s", config.DirData, file.Name())) if err != nil { cam.Error( "error reading directory", zap.String("dir", fmt.Sprintf("%s/%s", config.DirData, file.Name())), zap.Error(err)) return err } resolutions := make([]string, 0) for _, r := range res { if r.IsDir() && r.Name() != "log" { resolutions = append(resolutions, r.Name()) } } logger.Log.Info("start process camera:", zap.String("cam_name", file.Name())) log.Println("start process camera: ", file.Name()) // Establish starting period. per := 60 period := time.Duration(per) // Create M3U8 playlist. go gen.MediaPlaylistGenerator(config.DirData, file.Name(), float64(period), resolutions, cam) go func() { for { for _, resolution := range resolutions { filenames := gen.StringDirEntryList( fmt.Sprintf("%s/%s/%s", config.DirData, file.Name(), resolution), "insit", logger.Log) if len(filenames) == 0 { break } // Setup MPEG-TS muxer. var h264Format format.H264 var aacFormat format.MPEG4Audio currentMpegTSMuxer := processor.MpegTSMuxer{ H264Format: &h264Format, Mpeg4AudioFormat: &aacFormat, } for i := len(filenames) - 4; i < len(filenames)-1; i++ { segment := storage.Segment{} // Open file for reading. f, err := os.Open(fmt.Sprintf("%s/%s/%s/%s", config.DirData, file.Name(), resolution, filenames[i])) if err != nil { cam.Error( "opening file error for file:", zap.String("filename", filenames[i]), zap.Error(err)) return } defer f.Close() // Read StreamID. var streamIDLen int32 if err := binary.Read(f, binary.LittleEndian, &streamIDLen); err != nil { cam.Error( "reading StreamID length error:", zap.String("filename", filenames[i]), zap.Error(err)) return } streamIDBytes := make([]byte, streamIDLen) if _, err := io.ReadFull(f, streamIDBytes); err != nil { cam.Error("reading StreamID error:", zap.String("filename", filenames[i]), zap.Error(err)) return } streamID := string(streamIDBytes) cam.Info( "cam with Stream ID started to convert:", zap.String("filename", filenames[i]), zap.String("stream_id", streamID)) // Read header of the file. var segLen int32 if err := binary.Read(f, binary.LittleEndian, &segLen); err != nil { cam.Error( "reading header length error:", zap.String("filename", filenames[i]), zap.Error(err)) return } segData := make([]byte, segLen) if _, err := io.ReadFull(f, segData); err != nil { cam.Error("reading header error:", zap.String("filename", filenames[i]), zap.Error(err)) return } headerReader := bytes.NewReader(segData) headerSeg, err := readHeaderSegment(headerReader) if err != nil { cam.Error( "func readHeaderSegment error:", zap.String("filename", filenames[i]), zap.Error(err)) return } // Parse duration of a segment. per, err = strconv.Atoi(headerSeg.Duration) if err != nil { cam.Error("parsing duration error:", zap.String("filename", filenames[i]), zap.Error(err)) return } filename, _ := strings.CutSuffix(filenames[i], ".insit") tsFilename := fmt.Sprintf("%s/%s/%s/%s.ts", config.DirData, file.Name(), resolution, filename) currentMpegTSMuxer.FileName = tsFilename err = currentMpegTSMuxer.Initialize() if err != nil { cam.Error("init muxer error:", zap.String("filename", filenames[i]), zap.Error(err)) return } // Read segments. for { // Read segments length. var segLen int32 err := binary.Read(f, binary.LittleEndian, &segLen) if err != nil { if err == io.EOF { break } cam.Error( "read segments length error:", zap.String("filename", filenames[i]), zap.Error(err)) return } // Read segments data. segData = make([]byte, segLen) if _, err := io.ReadFull(f, segData); err != nil { cam.Error( "read segments error:", zap.String("filename", filenames[i]), zap.Error(err)) return } packetReader := bytes.NewReader(segData) packets, err := readPacketSegment(packetReader) if err != nil { cam.Error( "func readPacketSegment error:", zap.String("filename", filenames[i]), zap.Error(err)) return } segment.Packets = append(segment.Packets, packets...) for _, pkt := range packets { switch pkt.Type { case storage.PacketTypeH264: // Encode the access unit into MPEG-TS. err = currentMpegTSMuxer.WriteH264(pkt.Pts, pkt.H264AUs) if err != nil { cam.Warn( "write H264 packet error:", zap.String("filename", filenames[i]), zap.Error(err)) } case storage.PacketTypeLPCM: // Convert G711 to AAC. au, err := processor.ConvertLPCMToAAC(pkt.LPCMSamples) if err != nil { cam.Warn( "converting to AAC frame error:", zap.String("filename", filenames[i]), zap.Error(err)) } // Encode the access unit into MPEG-TS. err = currentMpegTSMuxer.WriteAAC([][]byte{au}, pkt.Pts) if err != nil { cam.Warn( "write G711 packet error:", zap.String("filename", filenames[i]), zap.Error(err)) } } } } currentMpegTSMuxer.Close() if storage.Exists(tsFilename) { time.Sleep(3 * time.Second) } } } } }() } select {} }