// Listing metadata: 1054 lines, 27 KiB, Go.

package rtsp
import (
"errors"
"fmt"
"log"
"os"
"strings"
"time"
"git.insit.tech/psa/rtsp_reader-writer/writer/internal/ingest/formats"
"git.insit.tech/psa/rtsp_reader-writer/writer/internal/storage"
"git.insit.tech/sas/rtsp_proxy/protos/gens"
"github.com/bluenviron/gortsplib/v4"
"github.com/bluenviron/gortsplib/v4/pkg/base"
"github.com/bluenviron/gortsplib/v4/pkg/description"
"github.com/bluenviron/gortsplib/v4/pkg/format"
"github.com/bluenviron/mediacommon/v2/pkg/codecs/g711"
"github.com/pion/rtp"
_ "github.com/zaf/g711"
)
// StartWriter runs the RTSP writer for link. When RTSP reports an error, the
// error is handed to changeDomain, which may retry on another domain; an error
// from changeDomain is returned wrapped.
func StartWriter(dir string, period int, link string) error {
	rtspErr := RTSP(dir, period, link)
	if rtspErr == nil {
		return nil
	}

	// Change domain if a camera was flipped to another domain.
	if switchErr := changeDomain(dir, period, link, rtspErr); switchErr != nil {
		return fmt.Errorf("change domain error: %w", switchErr)
	}

	return nil
}
// RTSP connects to the RTSP source at link, selects the audio/video formats
// announced in its description and processes the stream until the session
// ends.
//
// H264 video combined with AAC or G711 audio is muxed into MPEG-TS files whose
// names come from storage.CreateFileName(dir, ...); a new file is started every
// period seconds. Every other supported format is only depacketized/decoded
// and logged.
//
// A failed DESCRIBE and an unsupported format combination are logged and
// reported as nil. Any other failure, including the termination of the stream,
// is returned as an error instead of panicking.
func RTSP(dir string, period int, link string) error {
	// Connect to the server.
	c := gortsplib.Client{
		UserAgent: "PSA",
	}

	u, err := base.ParseURL(link)
	if err != nil {
		return fmt.Errorf("parse URL error: %w", err)
	}

	if err = c.Start(u.Scheme, u.Host); err != nil {
		return fmt.Errorf("connect to the server error: %w", err)
	}
	defer c.Close()

	desc, res, err := c.Describe(u)
	if err != nil || desc == nil {
		// Kept as a logged, non-fatal outcome: callers see nil here.
		log.Printf("medias not found for camera [%s]: %v", link, err)
		return nil
	}

	// Create file name structure and directory for files.
	resolutions := []string{storage.FindResolution(res.Body)}
	cuttedURI := storage.CutURI(link)
	fn := storage.CreateFileName(dir, resolutions, cuttedURI, period)

	if err = os.MkdirAll(fmt.Sprintf("%s", fn.Path), 0755); err != nil {
		return fmt.Errorf("mkdirall error: %w", err)
	}

	// Create M3U8 playlist.
	go gens.MediaPlaylistGenerator(dir+"/data/"+cuttedURI, "", fn.Duration, resolutions)

	// Find formats. Lookups run in a fixed order and a later hit overrides an
	// earlier one of the same kind (audio or video).
	var audioFormat string
	var videoFormat string

	// found logs the outcome of one format lookup and reports whether it
	// succeeded.
	found := func(name string, ok bool, err error) bool {
		if ok {
			log.Printf("[%s]: format found", name)
			return true
		}
		log.Println(err)
		return false
	}

	av1Format, av1Media, err := formats.FindAV1Format(desc)
	if found("av1", av1Format != nil, err) {
		videoFormat = "AV1"
	}

	g711Format, g711Media, err := formats.FindG711Format(desc)
	if found("g711", g711Format != nil, err) {
		audioFormat = "G711"
	}

	h264Format, h264Media, err := formats.FindH264Format(desc)
	if found("h264", h264Format != nil, err) {
		videoFormat = "H264"
	}

	aacFormat, aacMedia, err := formats.FindAACFormat(desc)
	if found("aac", aacFormat != nil, err) {
		audioFormat = "AAC"
	}

	h265Format, h265Media, err := formats.FindH265Format(desc)
	if found("h265", h265Format != nil, err) {
		videoFormat = "H265"
	}

	lpcmFormat, lpcmMedia, err := formats.FindLPCMFormat(desc)
	if found("lpcm", lpcmFormat != nil, err) {
		audioFormat = "LPCM"
	}

	mjpegFormat, mjpegMedia, err := formats.FindMJPEGFormat(desc)
	if found("mjpeg", mjpegFormat != nil, err) {
		videoFormat = "MJPEG"
	}

	opusFormat, opusMedia, err := formats.FindOPUSFormat(desc)
	if found("opus", opusFormat != nil, err) {
		audioFormat = "OPUS"
	}

	vp8Format, vp8Media, err := formats.FindVP8Format(desc)
	if found("vp8", vp8Format != nil, err) {
		videoFormat = "VP8"
	}

	vp9Format, vp9Media, err := formats.FindVP9Format(desc)
	if found("vp9", vp9Format != nil, err) {
		videoFormat = "VP9"
	}

	// tag is the label used in every log line and error of the selected
	// pipeline: "H264", "G711" or "H264-AAC".
	tag := videoFormat
	switch {
	case videoFormat == "":
		tag = audioFormat
	case audioFormat != "":
		tag = videoFormat + "-" + audioFormat
	}

	// begin waits for the next period and announces the recording.
	begin := func() {
		storage.WaitPeriod(period)
		log.Printf("[%v]: start recording", tag)
	}

	// setupOne sends SETUP for a single media.
	setupOne := func(medi *description.Media) error {
		if _, err := c.Setup(desc.BaseURL, medi, 0, 0); err != nil {
			return fmt.Errorf("[%v]: setup media error: %w", tag, err)
		}
		return nil
	}

	// guardMuxer serializes access to an already initialized muxer, which is
	// written by the RTP callbacks and re-created by the rotation goroutine.
	// It returns a guarded write, the rotation step and a final close.
	// A one-slot channel is used as the lock so that no additional import is
	// required.
	guardMuxer := func(m *formats.MpegtsMuxer) (write func(func() error), rotate func() error, shut func()) {
		lock := make(chan struct{}, 1)
		open := true

		write = func(op func() error) {
			lock <- struct{}{}
			defer func() { <-lock }()

			if !open {
				// The last rotation failed; drop data until a file exists again.
				return
			}
			if err := op(); err != nil {
				log.Printf("[%v]: write to muxer error: %v\n", tag, err)
			}
		}

		rotate = func() error {
			lock <- struct{}{}
			defer func() { <-lock }()

			if open {
				m.Close()
				open = false
			}
			m.FileName = fn.SetNumNTime()
			if err := m.Initialize(); err != nil {
				return fmt.Errorf("init muxer error: %w", err)
			}
			open = true
			log.Printf("[%v]: new file for recording created: %v", tag, m.FileName)
			return nil
		}

		shut = func() {
			lock <- struct{}{}
			defer func() { <-lock }()

			if open {
				m.Close()
				open = false
			}
		}

		return write, rotate, shut
	}

	// stream sends PLAY and blocks until the session ends. When rotate is
	// set it is invoked every period seconds; a failed rotation is logged and
	// retried on the next tick.
	stream := func(rotate func() error) error {
		if _, err := c.Play(nil); err != nil {
			return fmt.Errorf("[%v]: sending PLAY request error: %w", tag, err)
		}

		// time.NewTicker panics on a non-positive interval.
		if rotate != nil && period > 0 {
			ticker := time.NewTicker(time.Duration(period) * time.Second)
			defer ticker.Stop()

			done := make(chan struct{})
			finished := make(chan struct{})
			go func() {
				defer close(finished)
				for {
					select {
					case <-done:
						return
					case <-ticker.C:
						if err := rotate(); err != nil {
							log.Printf("[%v]: %v\n", tag, err)
						}
					}
				}
			}()
			// Make sure the rotation goroutine is gone before returning.
			defer func() {
				close(done)
				<-finished
			}()
		}

		if err := c.Wait(); err != nil {
			return fmt.Errorf("[%v]: stream terminated: %w", tag, err)
		}
		return nil
	}

	// Start program according to gotten formats.
	switch {
	case videoFormat == "AV1" && audioFormat == "":
		begin()

		// Create decoders.
		av1RTPDec, err := av1Format.CreateDecoder()
		if err != nil {
			return fmt.Errorf("[%v]: create decoder error: %w", tag, err)
		}

		av1Dec := &formats.AV1Decoder{}
		if err = av1Dec.Initialize(); err != nil {
			return fmt.Errorf("[%v]: init decoder error: %w", tag, err)
		}
		defer av1Dec.Close()

		if err = setupOne(av1Media); err != nil {
			return err
		}

		// NOTE(review): passed by value, so an update made inside ProcessAV1
		// is not visible to the next packet - confirm this is intended.
		firstRandomReceived := false

		// Process input rtp packets.
		c.OnPacketRTP(av1Media, av1Format, func(pkt *rtp.Packet) {
			pts, img, err := formats.ProcessAV1(&c, av1Media, av1RTPDec, pkt, av1Dec, firstRandomReceived, videoFormat)
			if err != nil {
				log.Printf("[%v]: process packet error: %v\n", tag, err)
				return
			}
			log.Printf("[%v]: decoded frame with PTS %v and size %v\n", tag, pts, img.Bounds().Max)
		})

		return stream(nil)

	case videoFormat == "" && audioFormat == "G711":
		begin()

		// Create decoder.
		g711RTPDec, err := g711Format.CreateDecoder()
		if err != nil {
			return fmt.Errorf("[%v]: create decoder error: %w", tag, err)
		}

		if err = setupOne(g711Media); err != nil {
			return err
		}

		// Process input rtp packets.
		c.OnPacketRTP(g711Media, g711Format, func(pkt *rtp.Packet) {
			pts, au, err := formats.ProcessG711(&c, g711Media, g711RTPDec, pkt, audioFormat)
			if err != nil {
				log.Printf("[%v]: process packet error: %v\n", tag, err)
				return
			}

			// Decode samples (these are 16-bit, big endian LPCM samples).
			// NOTE(review): the value produced by the decode call is not
			// used, so the size logged below is that of au as returned by
			// ProcessG711 - confirm.
			if g711Format.MULaw {
				g711.DecodeMulaw(au)
			} else {
				g711.DecodeAlaw(au)
			}

			log.Printf("[%v]: decoded audio samples with PTS %v and size %d\n", tag, pts, len(au))
		})

		return stream(nil)

	case videoFormat == "H264" && audioFormat == "AAC":
		begin()

		// Create decoders.
		h264RTPDec, err := h264Format.CreateDecoder()
		if err != nil {
			return fmt.Errorf("[%v]: create decoder error: %w", tag, err)
		}

		aacRTPDec, err := aacFormat.CreateDecoder()
		if err != nil {
			return fmt.Errorf("[%v]: create decoder error: %w", tag, err)
		}

		// Setup MPEG-TS muxer.
		muxer := formats.MpegtsMuxer{
			FileName:         fn.SetNumNTime(),
			H264Format:       h264Format,
			Mpeg4AudioFormat: aacFormat,
		}
		if err = muxer.Initialize(); err != nil {
			return fmt.Errorf("[%v]: init muxer error: %w", tag, err)
		}
		write, rotate, shut := guardMuxer(&muxer)
		defer shut()

		// Setup all medias.
		if err = c.SetupAll(desc.BaseURL, desc.Medias); err != nil {
			return fmt.Errorf("[%v]: setup media error: %w", tag, err)
		}

		// Process input rtp packets.
		c.OnPacketRTPAny(func(medi *description.Media, forma format.Format, pkt *rtp.Packet) {
			switch forma.(type) {
			case *format.H264:
				pts, au, err := formats.ProcessH264(&c, h264Media, h264RTPDec, pkt, videoFormat)
				if err != nil {
					log.Printf("[%v]: process packet error: %v\n", tag, err)
					return
				}

				// Encode the access unit into MPEG-TS.
				write(func() error { return muxer.WriteH264(au, pts) })

			case *format.MPEG4Audio:
				pts, aus, err := formats.ProcessAAC(&c, aacMedia, aacRTPDec, pkt, audioFormat)
				if err != nil {
					log.Printf("[%v]: process packet error: %v\n", tag, err)
					return
				}

				// Encode access units into MPEG-TS.
				write(func() error { return muxer.WriteAAC(aus, pts) })
			}
		})

		return stream(rotate)

	case videoFormat == "H264" && audioFormat == "G711":
		begin()

		// Create decoders.
		h264RTPDec, err := h264Format.CreateDecoder()
		if err != nil {
			return fmt.Errorf("[%v]: create decoder error: %w", tag, err)
		}

		g711RTPDec, err := g711Format.CreateDecoder()
		if err != nil {
			return fmt.Errorf("[%v]: create decoder error: %w", tag, err)
		}

		// Setup MPEG-TS muxer.
		// NOTE(review): the source announces no AAC track, so the muxer is
		// given a nil MPEG-4 audio format while converted AAC frames are
		// written to it - confirm MpegtsMuxer supports this.
		var noAACFormat *format.MPEG4Audio
		muxer := formats.MpegtsMuxer{
			FileName:         fn.SetNumNTime(),
			H264Format:       h264Format,
			Mpeg4AudioFormat: noAACFormat,
		}
		if err = muxer.Initialize(); err != nil {
			return fmt.Errorf("[%v]: init muxer error: %w", tag, err)
		}
		write, rotate, shut := guardMuxer(&muxer)
		defer shut()

		// Setup all medias.
		if err = c.SetupAll(desc.BaseURL, desc.Medias); err != nil {
			return fmt.Errorf("[%v]: setup media error: %w", tag, err)
		}

		// Process input rtp packets.
		c.OnPacketRTPAny(func(medi *description.Media, forma format.Format, pkt *rtp.Packet) {
			switch f := forma.(type) {
			case *format.H264:
				pts, au, err := formats.ProcessH264(&c, h264Media, h264RTPDec, pkt, videoFormat)
				if err != nil {
					log.Printf("[%v]: process packet error: %v\n", tag, err)
					return
				}

				// Encode the access unit into MPEG-TS.
				write(func() error { return muxer.WriteH264(au, pts) })

			case *format.G711:
				pts, au, err := formats.ProcessG711(&c, g711Media, g711RTPDec, pkt, audioFormat)
				if err != nil {
					log.Printf("[%v]: process packet error: %v\n", tag, err)
					return
				}

				// Convert G711 to AAC; a frame that failed to convert must
				// not reach the muxer.
				au, err = formats.ConvertG711ToAAC(au, f.MULaw)
				if err != nil {
					log.Printf("[%v]: converting to AAC frame error: %v\n", tag, err)
					return
				}

				// Encode the access unit into MPEG-TS.
				write(func() error { return muxer.WriteAAC([][]byte{au}, pts) })
			}
		})

		return stream(rotate)

	case videoFormat == "H264" && audioFormat == "":
		begin()

		// Create decoder.
		h264RTPDec, err := h264Format.CreateDecoder()
		if err != nil {
			return fmt.Errorf("[%v]: create decoder error: %w", tag, err)
		}

		// Setup H264 -> RGBA decoder.
		h264Dec := &formats.H264Decoder{}
		if err = h264Dec.Initialize(); err != nil {
			return fmt.Errorf("[%v]: init decoder error: %w", tag, err)
		}
		defer h264Dec.Close()

		// If SPS and PPS are present into the SDP, send them to the decoder.
		if h264Format.SPS != nil {
			h264Dec.Decode([][]byte{h264Format.SPS})
		}
		if h264Format.PPS != nil {
			h264Dec.Decode([][]byte{h264Format.PPS})
		}

		if err = setupOne(h264Media); err != nil {
			return err
		}

		// NOTE(review): passed by value, see the AV1 case - confirm.
		firstRandomAccess := false

		// Process input rtp packets.
		c.OnPacketRTP(h264Media, h264Format, func(pkt *rtp.Packet) {
			pts, img, err := formats.ProcessH264RGBA(
				&c, h264Media, h264RTPDec, h264Dec, pkt, firstRandomAccess, videoFormat)
			if err != nil {
				log.Printf("[%v]: process packet error: %v\n", tag, err)
				return
			}
			log.Printf("[%v]: decoded frame with PTS %v and size %v\n", tag, pts, img.Bounds().Max)
		})

		return stream(nil)

	case videoFormat == "H265" && audioFormat == "":
		begin()

		// Create decoder.
		h265RTPDec, err := h265Format.CreateDecoder()
		if err != nil {
			return fmt.Errorf("[%v]: create decoder error: %w", tag, err)
		}

		// Setup H265 -> RGBA decoder.
		h265Dec := &formats.H265Decoder{}
		if err = h265Dec.Initialize(); err != nil {
			return fmt.Errorf("[%v]: init decoder error: %w", tag, err)
		}
		defer h265Dec.Close()

		// If VPS, SPS and PPS are present into the SDP, send them to the decoder.
		if h265Format.VPS != nil {
			h265Dec.Decode([][]byte{h265Format.VPS})
		}
		if h265Format.SPS != nil {
			h265Dec.Decode([][]byte{h265Format.SPS})
		}
		if h265Format.PPS != nil {
			h265Dec.Decode([][]byte{h265Format.PPS})
		}

		if err = setupOne(h265Media); err != nil {
			return err
		}

		// NOTE(review): passed by value, see the AV1 case - confirm.
		firstRandomAccess := false

		// Process input rtp packets.
		c.OnPacketRTP(h265Media, h265Format, func(pkt *rtp.Packet) {
			pts, img, err := formats.ProcessH265RGBA(
				&c, h265Media, h265RTPDec, h265Dec, pkt, firstRandomAccess, videoFormat)
			if err != nil {
				log.Printf("[%v]: process packet error: %v\n", tag, err)
				return
			}
			log.Printf("[%v]: decoded frame with PTS %v and size %v\n", tag, pts, img.Bounds().Max)
		})

		return stream(nil)

	case videoFormat == "" && audioFormat == "LPCM":
		begin()

		// Create decoder.
		lpcmRTPDec, err := lpcmFormat.CreateDecoder()
		if err != nil {
			return fmt.Errorf("[%v]: create decoder error: %w", tag, err)
		}

		if err = setupOne(lpcmMedia); err != nil {
			return err
		}

		// Process input rtp packets.
		c.OnPacketRTP(lpcmMedia, lpcmFormat, func(pkt *rtp.Packet) {
			pts, samples, err := formats.ProcessLPCM(&c, lpcmMedia, lpcmRTPDec, pkt, audioFormat)
			if err != nil {
				log.Printf("[%v]: process packet error: %v\n", tag, err)
				return
			}
			log.Printf("[%v]: decoded audio samples with PTS %v and size %d\n", tag, pts, len(samples))
		})

		return stream(nil)

	case videoFormat == "MJPEG" && audioFormat == "":
		begin()

		// Create decoder.
		mjpegRTPDec, err := mjpegFormat.CreateDecoder()
		if err != nil {
			return fmt.Errorf("[%v]: create decoder error: %w", tag, err)
		}

		if err = setupOne(mjpegMedia); err != nil {
			return err
		}

		// Process input rtp packets.
		c.OnPacketRTP(mjpegMedia, mjpegFormat, func(pkt *rtp.Packet) {
			pts, img, err := formats.ProcessMJPEGRGBA(&c, mjpegMedia, mjpegRTPDec, pkt, videoFormat)
			if err != nil {
				log.Printf("[%v]: process packet error: %v\n", tag, err)
				return
			}
			log.Printf("[%v]: decoded image with PTS %v and size %v", tag, pts, img.Bounds().Max)
		})

		return stream(nil)

	case videoFormat == "" && audioFormat == "AAC":
		begin()

		// Create decoder.
		aacRTPDec, err := aacFormat.CreateDecoder()
		if err != nil {
			return fmt.Errorf("[%v]: create decoder error: %w", tag, err)
		}

		if err = setupOne(aacMedia); err != nil {
			return err
		}

		// Process input rtp packets.
		c.OnPacketRTP(aacMedia, aacFormat, func(pkt *rtp.Packet) {
			pts, aus, err := formats.ProcessAAC(&c, aacMedia, aacRTPDec, pkt, audioFormat)
			if err != nil {
				log.Printf("[%v]: process packet error: %v\n", tag, err)
				return
			}
			for _, au := range aus {
				log.Printf("[%v]: received access unit with PTS %v size %d\n", tag, pts, len(au))
			}
		})

		return stream(nil)

	case videoFormat == "" && audioFormat == "OPUS":
		begin()

		// Create decoder.
		opusRTPDec, err := opusFormat.CreateDecoder()
		if err != nil {
			return fmt.Errorf("[%v]: create decoder error: %w", tag, err)
		}

		if err = setupOne(opusMedia); err != nil {
			return err
		}

		// Process input rtp packets.
		c.OnPacketRTP(opusMedia, opusFormat, func(pkt *rtp.Packet) {
			pts, op, err := formats.ProcessOPUS(&c, opusMedia, opusRTPDec, pkt, audioFormat)
			if err != nil {
				log.Printf("[%v]: process packet error: %v\n", tag, err)
				return
			}
			log.Printf("[%v]: received OPUS packet with PTS %v size %d\n", tag, pts, len(op))
		})

		return stream(nil)

	case videoFormat == "VP8" && audioFormat == "":
		begin()

		// Create decoder.
		vp8RTPDec, err := vp8Format.CreateDecoder()
		if err != nil {
			return fmt.Errorf("[%v]: create decoder error: %w", tag, err)
		}

		// Setup VP8 -> RGBA decoder.
		vp8Dec := &formats.VPDecoder{}
		if err = vp8Dec.Initialize(); err != nil {
			return fmt.Errorf("[%v]: init decoder error: %w", tag, err)
		}
		defer vp8Dec.Close()

		if err = setupOne(vp8Media); err != nil {
			return err
		}

		// Process input rtp packets.
		c.OnPacketRTP(vp8Media, vp8Format, func(pkt *rtp.Packet) {
			pts, img, err := formats.ProcessVP8RGBA(&c, vp8Media, vp8RTPDec, vp8Dec, pkt, videoFormat)
			if err != nil {
				log.Printf("[%v]: process packet error: %v\n", tag, err)
				return
			}
			log.Printf("[%v]: decoded frame with PTS %v and size %v", tag, pts, img.Bounds().Max)
		})

		return stream(nil)

	case videoFormat == "VP9" && audioFormat == "":
		begin()

		// Create decoder.
		vp9RTPDec, err := vp9Format.CreateDecoder()
		if err != nil {
			return fmt.Errorf("[%v]: create decoder error: %w", tag, err)
		}

		// Setup VP9 -> RGBA decoder.
		vp9Dec := &formats.VPDecoder{}
		if err = vp9Dec.Initialize(); err != nil {
			return fmt.Errorf("[%v]: init decoder error: %w", tag, err)
		}
		defer vp9Dec.Close()

		if err = setupOne(vp9Media); err != nil {
			return err
		}

		// Process input rtp packets.
		c.OnPacketRTP(vp9Media, vp9Format, func(pkt *rtp.Packet) {
			pts, img, err := formats.ProcessVP9RGBA(&c, vp9Media, vp9RTPDec, vp9Dec, pkt, videoFormat)
			if err != nil {
				log.Printf("[%v]: process packet error: %v\n", tag, err)
				return
			}
			log.Printf("[%v]: decoded frame with PTS %v and size %v", tag, pts, img.Bounds().Max)
		})

		return stream(nil)

	default:
		// No pipeline exists for this combination; keep reporting nil as
		// before, but leave a trace of why nothing is recorded.
		log.Printf("unsupported format combination for camera [%s]: video %q, audio %q",
			link, videoFormat, audioFormat)
		return nil
	}
}
// changeDomain retries the recording on the sibling domain when err shows
// that the stream was not found, which is what happens when a camera was
// flipped to another domain.
//
// The domains are recognized by the "video-1" and "video-2" markers in link.
// err is returned unchanged when it is not a "404 (Not Found)" failure or when
// link carries neither marker. When the retry fails as well, both errors are
// returned joined. A nil err yields nil.
func changeDomain(dir string, period int, link string, err error) error {
	if err == nil {
		return nil
	}

	// errors.As with a *error target matches every non-nil error, so it cannot
	// single out a 404. The status is only available through the message here,
	// which is compared case-insensitively.
	if !strings.Contains(strings.ToLower(err.Error()), "404 (not found)") {
		return err
	}

	var swapped string
	switch {
	case strings.Contains(link, "video-1"):
		swapped = strings.Replace(link, "video-1", "video-2", 1)
	case strings.Contains(link, "video-2"):
		swapped = strings.Replace(link, "video-2", "video-1", 1)
	default:
		// There is no other domain to try; retrying the same link is pointless.
		return err
	}

	if retryErr := RTSP(dir, period, swapped); retryErr != nil {
		return errors.Join(err, retryErr)
	}

	return nil
}