159 lines
4.3 KiB
Go
159 lines
4.3 KiB
Go
package unpacker
|
|
|
|
import (
|
|
"bytes"
|
|
"encoding/binary"
|
|
"fmt"
|
|
"git.insit.tech/psa/rtsp_reader-writer/writer/pkg/storage"
|
|
"git.insit.tech/sas/rtsp_proxy/core/gen"
|
|
"git.insit.tech/sas/rtsp_proxy/proto/common"
|
|
"github.com/bluenviron/gortsplib/v4/pkg/format"
|
|
"go.uber.org/zap"
|
|
"io"
|
|
"log"
|
|
"os"
|
|
"reader/internal/processor"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
)
|
|
|
|
// CreateFlow generate TS files and M3U8 playlists.
|
|
func CreateFlow(dir string, cutURI string, fn *common.FileName, resolutions []string, Log *zap.Logger) {
|
|
fmt.Println("Reader started")
|
|
|
|
// Establish starting period.
|
|
per := 60
|
|
period := time.Duration(per)
|
|
|
|
// Create M3U8 playlist.
|
|
go gen.MediaPlaylistGenerator(dir+"/data", cutURI, fn.Duration, resolutions, Log)
|
|
|
|
for {
|
|
time.Sleep(period * time.Second)
|
|
for _, resolution := range resolutions {
|
|
filenames := gen.StringDirEntryList(dir+"/data/"+cutURI+"/"+resolution, "insit", Log)
|
|
|
|
for i := len(filenames) - 2; i < len(filenames)-1; i++ {
|
|
|
|
segment := storage.Segment{}
|
|
|
|
// Open file for reading.
|
|
f, err := os.Open(dir + "/data/" + cutURI + "/" + resolution + "/" + filenames[i])
|
|
if err != nil {
|
|
fmt.Println("opening file error: ", err)
|
|
return
|
|
}
|
|
defer f.Close()
|
|
|
|
// Read StreamID.
|
|
var streamIDLen int32
|
|
if err := binary.Read(f, binary.LittleEndian, &streamIDLen); err != nil {
|
|
fmt.Println("reading StreamID length error: ", err)
|
|
return
|
|
}
|
|
streamIDBytes := make([]byte, streamIDLen)
|
|
if _, err := io.ReadFull(f, streamIDBytes); err != nil {
|
|
fmt.Println("reading StreamID error: ", err)
|
|
return
|
|
}
|
|
streamID := string(streamIDBytes)
|
|
fmt.Println("Stream ID:", streamID)
|
|
|
|
// Read header of the file.
|
|
var segLen int32
|
|
if err := binary.Read(f, binary.LittleEndian, &segLen); err != nil {
|
|
fmt.Println("reading header length error: ", err)
|
|
return
|
|
}
|
|
segData := make([]byte, segLen)
|
|
if _, err := io.ReadFull(f, segData); err != nil {
|
|
fmt.Println("reading header error: ", err)
|
|
return
|
|
}
|
|
headerReader := bytes.NewReader(segData)
|
|
headerSeg, err := readHeaderSegment(headerReader)
|
|
if err != nil {
|
|
fmt.Println("func readHeaderSegment error: ", err)
|
|
return
|
|
}
|
|
|
|
// Parse duration of a segment.
|
|
per, err = strconv.Atoi(headerSeg.Duration)
|
|
if err != nil {
|
|
fmt.Println("parsing duration error: ", err)
|
|
}
|
|
|
|
// Setup MPEG-TS muxer.
|
|
var h264Format format.H264
|
|
var aacFormat format.MPEG4Audio
|
|
|
|
filename, _ := strings.CutSuffix(filenames[i], ".insit")
|
|
|
|
currentMpegtsMuxer := processor.MpegtsMuxer{
|
|
FileName: dir + "/data/" + cutURI + "/" + resolution + "/" + filename + ".ts",
|
|
H264Format: &h264Format,
|
|
Mpeg4AudioFormat: &aacFormat,
|
|
}
|
|
|
|
err = currentMpegtsMuxer.Initialize()
|
|
if err != nil {
|
|
fmt.Printf("init muxer error: %w\n", err)
|
|
}
|
|
|
|
// Read segments.
|
|
for {
|
|
// Read segments length.
|
|
var segLen int32
|
|
err := binary.Read(f, binary.LittleEndian, &segLen)
|
|
if err != nil {
|
|
if err == io.EOF {
|
|
break
|
|
}
|
|
fmt.Println("read segments length error: ", err)
|
|
return
|
|
}
|
|
// Read segments data.
|
|
segData = make([]byte, segLen)
|
|
if _, err := io.ReadFull(f, segData); err != nil {
|
|
fmt.Println("read segments error: ", err)
|
|
return
|
|
}
|
|
packetReader := bytes.NewReader(segData)
|
|
packets, err := readPacketSegment(packetReader)
|
|
if err != nil {
|
|
fmt.Println("func readPacketSegment error: ", err)
|
|
return
|
|
}
|
|
|
|
segment.Packets = append(segment.Packets, packets...)
|
|
|
|
for _, pkt := range packets {
|
|
switch pkt.Type {
|
|
case storage.PacketTypeH264:
|
|
// Encode the access unit into MPEG-TS.
|
|
err = currentMpegtsMuxer.WriteH264(pkt.Pts, pkt.H264AUs)
|
|
if err != nil {
|
|
log.Printf("write H264 packet error: %v\n", err)
|
|
}
|
|
case storage.PacketTypeLPCM:
|
|
// Convert G711 to AAC.
|
|
au, err := processor.ConvertLPCMToAAC(pkt.LPCMSamples)
|
|
if err != nil {
|
|
log.Printf("converting to AAC frame error: %v\n", err)
|
|
}
|
|
|
|
// Encode the access unit into MPEG-TS.
|
|
err = currentMpegtsMuxer.WriteAAC([][]byte{au}, pkt.Pts)
|
|
if err != nil {
|
|
log.Printf("write G711 packet error: %v\n", err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
currentMpegtsMuxer.Close()
|
|
}
|
|
}
|
|
}
|
|
}
|