191 lines
5.0 KiB
Go

package procRTSP
import (
	"fmt"
	"log"
	"sync"
	"time"

	"git.insit.tech/sas/rtsp_proxy/protos/gens"
	"github.com/bluenviron/gortsplib/v4"
	"github.com/bluenviron/gortsplib/v4/pkg/base"
	"github.com/bluenviron/gortsplib/v4/pkg/description"
	"github.com/bluenviron/gortsplib/v4/pkg/format"
	"github.com/bluenviron/gortsplib/v4/pkg/format/rtph264"
	"github.com/bluenviron/gortsplib/v4/pkg/format/rtplpcm"
	"github.com/pion/rtp"
	_ "github.com/zaf/g711"

	"writer/internal/config"
	"writer/internal/media"
	"writer/pkg/converter"
)
// resolutions lists the video resolutions handed to the file-name, playlist
// and MPEG-TS muxer helpers.
var resolutions = []string{"1280x720"}

// RTP depacketizers kept at package level; ProcRTSP assigns them once the
// matching format has been found in the stream description.
var (
	h264RTPDec *rtph264.Decoder
	g711RTPDec *rtplpcm.Decoder
)
// ProcRTSP connects to the RTSP server at URI, reads its H264 and G711 RTP
// streams and writes the H264 access units into MPEG-TS files, switching to a
// new file every period minutes.
//
// It blocks until the RTSP session ends and returns the error that ended it.
// Setup failures (invalid period, URL parsing, connection, DESCRIBE, decoder
// creation, muxer initialisation, SETUP, PLAY) and file-rotation failures are
// returned as errors instead of panicking or exiting the process.
func ProcRTSP(period int, URI string) error {
	// time.NewTicker panics on a non-positive interval, so reject it up front.
	if period <= 0 {
		return fmt.Errorf("invalid rotation period: %d", period)
	}

	// Create FileName structure.
	fn := config.CreateFileName(resolutions, period)

	// Create M3U8 playlist.
	// NOTE(review): the output directory is hard-coded and this goroutine is
	// never stopped from here — confirm that is intended.
	go gens.MediaPlaylistGenerator("/home/psa/GoRepository/data", "", fn.Duration, resolutions)

	// Initialise client.
	c := gortsplib.Client{
		UserAgent: "PSA",
	}

	// Parse URL.
	u, err := base.ParseURL(URI)
	if err != nil {
		return fmt.Errorf("parse URL error: %w", err)
	}

	// Connect to the server.
	if err = c.Start(u.Scheme, u.Host); err != nil {
		return fmt.Errorf("connect to the server error: %w", err)
	}
	defer c.Close()

	// Find available medias.
	desc, _, err := c.Describe(u)
	if err != nil {
		return fmt.Errorf("medias not found: %w", err)
	}
	if desc == nil {
		// Reported separately: wrapping a nil error with %w would render as
		// "%!w(<nil>)".
		return fmt.Errorf("medias not found: empty session description")
	}

	// Find the H264 and G711 medias and formats. A lookup failure is logged
	// rather than treated as fatal; the nil checks below skip the decoder of a
	// format that was not found.
	h264Format, h264Media, err := media.CheckH264Format(desc)
	if err != nil {
		log.Printf("find H264 media: %s\n", err)
	}
	g711Format, g711Media, err := media.CheckG711Format(desc)
	if err != nil {
		log.Printf("find G711 media: %s\n", err)
	}

	// Left nil: no AAC track is configured for the muxer yet.
	var mpeg4AudioFormat *format.MPEG4Audio

	// Create RTP -> H264 decoder.
	if h264Format != nil {
		h264RTPDec, err = h264Format.CreateDecoder()
		if err != nil {
			return fmt.Errorf("create H264 decoder error: %w", err)
		}
	}

	// Create RTP -> G711 decoder.
	if g711Format != nil {
		g711RTPDec, err = g711Format.CreateDecoder()
		if err != nil {
			return fmt.Errorf("create G711 decoder error: %w", err)
		}
	}

	// Wait for the next period.
	config.Waiter(period)
	log.Println("Start recording")

	// Setup MPEG-TS muxer.
	currentMpegtsMuxer := &converter.MpegtsMuxer{
		FileName:         fn.SetNumNTime(),
		H264Format:       h264Format,
		Mpeg4AudioFormat: mpeg4AudioFormat,
	}
	if err = currentMpegtsMuxer.Initialize(resolutions); err != nil {
		return fmt.Errorf("initialize MPEG-TS muxer: %w", err)
	}

	// The RTP callback runs on the client's goroutine while file rotation runs
	// on this one, so every access to the muxer is serialised by muxMu.
	// muxOpen records whether the muxer currently has an initialised file.
	var (
		muxMu   sync.Mutex
		muxOpen = true
	)
	defer func() {
		muxMu.Lock()
		defer muxMu.Unlock()
		if muxOpen {
			currentMpegtsMuxer.Close()
			muxOpen = false
		}
	}()

	// Setup all medias.
	if err = c.SetupAll(desc.BaseURL, desc.Medias); err != nil {
		return fmt.Errorf("setup medias: %w", err)
	}

	// Called when a H264/RTP or G711/RTP packet arrives.
	c.OnPacketRTPAny(func(_ *description.Media, forma format.Format, pkt *rtp.Packet) {
		switch f := forma.(type) {
		case *format.H264:
			// Process H264 flow and return PTS and AU.
			pts, au, err := media.ProcH264(&c, h264Media, h264RTPDec, pkt)
			if err != nil {
				log.Printf("process H264 error: %s\n", err)
				return
			}

			// Encode the access unit into MPEG-TS, unless the muxer has
			// already been closed (shutdown or failed rotation).
			muxMu.Lock()
			defer muxMu.Unlock()
			if !muxOpen {
				return
			}
			if err = currentMpegtsMuxer.WriteH264(au, pts); err != nil {
				log.Printf("writing H264 packet: %s\n", err)
			}

		case *format.G711:
			// Process G711 flow and return PTS and AU.
			_, au, err := media.ProcG711(&c, g711Media, g711RTPDec, pkt)
			if err != nil {
				log.Printf("process G711 error: %s\n", err)
				return
			}

			// Convert G711 to AAC. The converted frame is discarded for now.
			// TODO: write the AAC access unit into the MPEG-TS muxer together
			// with its PTS once audio muxing is implemented.
			if _, err = converter.ConvertG711ToAAC(au, f.MULaw); err != nil {
				log.Printf("converting G711 to AAC frame: %s\n", err)
			}
		}
	})

	// Send PLAY request.
	if _, err = c.Play(nil); err != nil {
		return fmt.Errorf("start playback error: %w", err)
	}

	// Create ticker for rotation files.
	ticker := time.NewTicker(time.Duration(period) * time.Minute)
	defer ticker.Stop()

	// rotate closes the current file and opens the next one under the lock so
	// the RTP callback never writes to a half-switched muxer.
	rotate := func() error {
		muxMu.Lock()
		defer muxMu.Unlock()

		currentMpegtsMuxer.Close()
		muxOpen = false
		currentMpegtsMuxer.FileName = fn.SetNumNTime()
		if err := currentMpegtsMuxer.Initialize(resolutions); err != nil {
			return fmt.Errorf("rotate MPEG-TS file: %w", err)
		}
		muxOpen = true
		log.Println("New file for recording created:", currentMpegtsMuxer.FileName)
		return nil
	}

	// Wait for the session on a helper goroutine so this one can also serve
	// the ticker. The channel is buffered so the helper can always deliver its
	// result and exit, even when this function has already returned.
	sessionDone := make(chan error, 1)
	go func() {
		sessionDone <- c.Wait()
	}()

	for {
		select {
		case waitErr := <-sessionDone:
			return waitErr
		case <-ticker.C:
			if rotErr := rotate(); rotErr != nil {
				return rotErr
			}
		}
	}
}