2025-04-07 17:58:23 +05:00

220 lines
6.5 KiB
Go

package unpacker
import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"io"
"log"
"os"
"reader/internal/config"
"strconv"
"strings"
"time"
"git.insit.tech/psa/rtsp_reader-writer/writer/pkg/storage"
"git.insit.tech/sas/rtsp_proxy/core/gen"
log2 "git.insit.tech/sas/rtsp_proxy/core/log"
"github.com/bluenviron/gortsplib/v4/pkg/format"
"go.uber.org/zap"
logger "reader/internal/log"
"reader/internal/processor"
)
// CreateVideo generates TS files and M3U8 playlists.
//
// Every entry of config.DirData is treated as a camera directory. For each
// camera a playlist generator goroutine and a converter goroutine are started;
// the converter repeatedly turns recent ".insit" recordings of every
// resolution sub-directory (all sub-directories except "log") into MPEG-TS
// files placed next to the source file.
//
// An error is returned only while setting up: when config.DirData does not
// exist or when a directory cannot be listed. Once all cameras have been
// started the function blocks forever and never returns.
func CreateVideo() error {
	if !storage.Exists(config.DirData) {
		return errors.New("directory does not exist")
	}

	// Read directory.
	files, err := storage.ReadDir(config.DirData)
	if err != nil {
		return err
	}

	// Nominal segment period in seconds, handed to the playlist generator.
	const periodSec = 60
	// Pause between conversions (periodSec/15 seconds, i.e. 4s).
	const pause = periodSec / 15 * time.Second

	for _, file := range files {
		// Copy the name out of the loop variable: the goroutines below outlive
		// this iteration and must not observe a later camera's entry.
		camName := file.Name()
		camDir := fmt.Sprintf("%s/%s", config.DirData, camName)

		// Create logger.
		cam := log2.CamLogging(
			fmt.Sprintf("%s/log/reader-cam_%s.log", camDir, strconv.FormatInt(time.Now().Unix(), 10)))

		res, err := storage.ReadDir(camDir)
		if err != nil {
			cam.Error("error reading directory", zap.String("dir", camDir), zap.Error(err))
			return err
		}

		resolutions := make([]string, 0, len(res))
		for _, r := range res {
			if r.IsDir() && r.Name() != "log" {
				resolutions = append(resolutions, r.Name())
			}
		}

		logger.Log.Info("start process camera:", zap.String("cam_name", camName))
		log.Println("start process camera: ", camName)

		// Create M3U8 playlist.
		go gen.MediaPlaylistGenerator(config.DirData, camName, float64(periodSec), resolutions, cam)

		// convertFile converts the recording srcPath into the MPEG-TS file
		// tsPath using muxer. name is the bare file name used in log records.
		// It returns false when a fatal error was logged and the converter
		// goroutine has to stop. Being a separate function, its defers release
		// the source file and the muxer after every single file.
		convertFile := func(muxer *processor.MpegTSMuxer, srcPath, tsPath, name string) bool {
			// Open file for reading.
			f, err := os.Open(srcPath)
			if err != nil {
				cam.Error(
					"opening file error for file:", zap.String("filename", name), zap.Error(err))
				return false
			}
			defer f.Close()

			// Read StreamID.
			streamIDLen, err := readChunkLen(f)
			if err != nil {
				cam.Error(
					"reading StreamID length error:", zap.String("filename", name), zap.Error(err))
				return false
			}
			streamIDBytes := make([]byte, streamIDLen)
			if _, err := io.ReadFull(f, streamIDBytes); err != nil {
				cam.Error("reading StreamID error:", zap.String("filename", name), zap.Error(err))
				return false
			}
			cam.Info(
				"cam with Stream ID started to convert:",
				zap.String("filename", name),
				zap.String("stream_id", string(streamIDBytes)))

			// Read header of the file.
			headerLen, err := readChunkLen(f)
			if err != nil {
				cam.Error(
					"reading header length error:", zap.String("filename", name), zap.Error(err))
				return false
			}
			headerData := make([]byte, headerLen)
			if _, err := io.ReadFull(f, headerData); err != nil {
				cam.Error("reading header error:", zap.String("filename", name), zap.Error(err))
				return false
			}
			headerSeg, err := readHeaderSegment(bytes.NewReader(headerData))
			if err != nil {
				cam.Error(
					"func readHeaderSegment error:", zap.String("filename", name), zap.Error(err))
				return false
			}

			// Parse duration of a segment. The value itself is not needed
			// here; a malformed duration marks the file as unusable.
			if _, err := strconv.Atoi(headerSeg.Duration); err != nil {
				cam.Error("parsing duration error:", zap.String("filename", name), zap.Error(err))
				return false
			}

			muxer.FileName = tsPath
			if err := muxer.Initialize(); err != nil {
				cam.Error("init muxer error:", zap.String("filename", name), zap.Error(err))
				return false
			}
			// Close the muxer on every exit path, including the error ones.
			defer muxer.Close()

			// Read segments.
			for {
				// Read segments length.
				segLen, err := readChunkLen(f)
				if err != nil {
					if errors.Is(err, io.EOF) {
						// Clean end of file: all segments were consumed.
						return true
					}
					cam.Error(
						"read segments length error:", zap.String("filename", name), zap.Error(err))
					return false
				}

				// Read segments data.
				segData := make([]byte, segLen)
				if _, err := io.ReadFull(f, segData); err != nil {
					cam.Error(
						"read segments error:", zap.String("filename", name), zap.Error(err))
					return false
				}

				packets, err := readPacketSegment(bytes.NewReader(segData))
				if err != nil {
					cam.Error(
						"func readPacketSegment error:", zap.String("filename", name), zap.Error(err))
					return false
				}

				for _, pkt := range packets {
					switch pkt.Type {
					case storage.PacketTypeH264:
						// Encode the access unit into MPEG-TS.
						if err := muxer.WriteH264(pkt.Pts, pkt.H264AUs); err != nil {
							cam.Warn(
								"write H264 packet error:", zap.String("filename", name), zap.Error(err))
						}
					case storage.PacketTypeLPCM:
						// Convert LPCM samples to an AAC access unit.
						au, err := processor.ConvertLPCMToAAC(pkt.LPCMSamples)
						if err != nil {
							cam.Warn(
								"converting to AAC frame error:", zap.String("filename", name), zap.Error(err))
							// Nothing valid to write for this packet.
							continue
						}

						// Encode the access unit into MPEG-TS.
						if err := muxer.WriteAAC([][]byte{au}, pkt.Pts); err != nil {
							cam.Warn(
								"write G711 packet error:", zap.String("filename", name), zap.Error(err))
						}
					}
				}
			}
		}

		go func() {
			for {
				paused := false

				for _, resolution := range resolutions {
					resDir := fmt.Sprintf("%s/%s", camDir, resolution)
					filenames := gen.StringDirEntryList(resDir, "insit", logger.Log)
					if len(filenames) == 0 {
						// Nothing recorded for this resolution yet; the other
						// resolutions still have to be served.
						continue
					}

					// Setup MPEG-TS muxer.
					var h264Format format.H264
					var aacFormat format.MPEG4Audio
					muxer := processor.MpegTSMuxer{
						H264Format:       &h264Format,
						Mpeg4AudioFormat: &aacFormat,
					}

					// Convert up to three files preceding the last list entry
					// (presumably the last one is still being written - confirm).
					// The start index is clamped so short lists cannot produce a
					// negative index.
					first := len(filenames) - 4
					if first < 0 {
						first = 0
					}
					for i := first; i < len(filenames)-1; i++ {
						name := filenames[i]
						base, _ := strings.CutSuffix(name, ".insit")
						srcPath := fmt.Sprintf("%s/%s", resDir, name)
						tsFilename := fmt.Sprintf("%s/%s.ts", resDir, base)

						if !convertFile(&muxer, srcPath, tsFilename, name) {
							return
						}

						if storage.Exists(tsFilename) {
							time.Sleep(pause)
							paused = true
						}
					}
				}

				// Never start the next pass immediately when this one did not
				// pause at all; otherwise an idle camera spins a CPU core.
				if !paused {
					time.Sleep(pause)
				}
			}
		}()
	}

	// Keep the caller blocked while the background goroutines run.
	select {}
}

// readChunkLen reads the little-endian int32 length prefix that precedes a
// chunk of a recording. A negative value is reported as an error because using
// it as a slice size would panic. Read errors, including io.EOF, are returned
// unchanged.
func readChunkLen(r io.Reader) (int32, error) {
	var n int32
	if err := binary.Read(r, binary.LittleEndian, &n); err != nil {
		return 0, err
	}
	if n < 0 {
		return 0, fmt.Errorf("negative chunk length %d", n)
	}
	return n, nil
}