// Package rtsp connects to RTSP sources and processes the media streams they advertise.
package rtsp
import (
"encoding/binary"
"errors"
"fmt"
"go.uber.org/zap"
"log"
"os"
"strconv"
"strings"
"time"
"git.insit.tech/psa/rtsp_reader-writer/writer/internal/ingest/formats"
"git.insit.tech/psa/rtsp_reader-writer/writer/pkg/storage"
"github.com/bluenviron/gortsplib/v4"
"github.com/bluenviron/gortsplib/v4/pkg/base"
"github.com/bluenviron/gortsplib/v4/pkg/description"
"github.com/bluenviron/gortsplib/v4/pkg/format"
"github.com/bluenviron/mediacommon/v2/pkg/codecs/g711"
"github.com/pion/rtp"
)
// StartWriter runs RTSP for link and, when RTSP fails, passes the failure to
// changeDomain so the stream can be retried on the alternate domain.
//
// dir, period and link are forwarded to RTSP unchanged. Log is forwarded to
// changeDomain only: RTSP is declared with three parameters and does not
// accept a logger.
//
// It returns nil when RTSP succeeds or when changeDomain recovers from the
// failure, and the changeDomain error wrapped with "change domain error"
// otherwise.
func StartWriter(dir string, period int, link string, Log *zap.Logger) error {
	err := RTSP(dir, period, link)
	if err == nil {
		return nil
	}

	// Change domain if a camera was flipped to another domain.
	if err = changeDomain(dir, period, link, err, Log); err != nil {
		return fmt.Errorf("change domain error: %w", err)
	}

	return nil
}
// RTSP connects to the RTSP source at link, inspects the formats it
// advertises and then receives the stream until the session ends.
//
// dir and period are forwarded to storage.CreateFileName; period is also the
// length, in seconds, of one recorded segment. Only the H264 branch (with or
// without G711 audio) writes to disk: it creates files named by
// fn.SetNumNTime("insit") and starts a new one every period seconds. The
// H264+AAC branch sets the stream up but drops every packet, and the remaining
// branches only depacketize/decode and log what they receive.
//
// It returns nil without processing when DESCRIBE fails or when the detected
// format combination has no branch, and a wrapped error for every other
// failure, including the reason the session terminated.
func RTSP(dir string, period int, link string) error {
	// Connect to the server.
	c := gortsplib.Client{
		UserAgent: "PSA",
	}

	u, err := base.ParseURL(link)
	if err != nil {
		return fmt.Errorf("parse URL error: %w", err)
	}

	err = c.Start(u.Scheme, u.Host)
	if err != nil {
		return fmt.Errorf("connect to the server error: %w", err)
	}
	defer c.Close()

	desc, res, err := c.Describe(u)
	if err != nil || desc == nil {
		// NOTE(review): a failed DESCRIBE is logged and reported as success, so
		// callers (including the 404 handling in changeDomain) never see it —
		// confirm this is intended.
		log.Printf("medias not found for camera [%s]: %v", link, err)
		return nil
	}

	// Create file name structure and directory for files.
	resolution := findResolution(res.Body)
	resolutions := []string{resolution}
	streamID := cutURI(link)
	fn := storage.CreateFileName(dir, resolutions, streamID, period)

	err = os.MkdirAll(fmt.Sprintf("%s", fn.Path), 0755)
	if err != nil {
		return fmt.Errorf("mkdirall error: %w", err)
	}

	// Find formats. When several video (or audio) formats are present, the one
	// probed last wins because it overwrites videoFormat/audioFormat.
	var audioFormat string
	var videoFormat string

	av1Format, av1Media, err := formats.FindAV1Format(desc)
	if av1Format != nil {
		log.Println("[av1]: format found")
		videoFormat = "AV1"
	} else {
		log.Println(err)
	}

	g711Format, g711Media, err := formats.FindG711Format(desc)
	if g711Format != nil {
		log.Println("[g711]: format found")
		audioFormat = "G711"
	} else {
		log.Println(err)
	}

	h264Format, h264Media, err := formats.FindH264Format(desc)
	if h264Format != nil {
		log.Println("[h264]: format found")
		videoFormat = "H264"
	} else {
		log.Println(err)
	}

	aacFormat, aacMedia, err := formats.FindAACFormat(desc)
	if aacFormat != nil {
		log.Println("[aac]: format found")
		audioFormat = "AAC"
	} else {
		log.Println(err)
	}

	h265Format, h265Media, err := formats.FindH265Format(desc)
	if h265Format != nil {
		log.Println("[h265]: format found")
		videoFormat = "H265"
	} else {
		log.Println(err)
	}

	lpcmFormat, lpcmMedia, err := formats.FindLPCMFormat(desc)
	if lpcmFormat != nil {
		log.Println("[lpcm]: format found")
		audioFormat = "LPCM"
	} else {
		log.Println(err)
	}

	mjpegFormat, mjpegMedia, err := formats.FindMJPEGFormat(desc)
	if mjpegFormat != nil {
		log.Println("[mjpeg]: format found")
		videoFormat = "MJPEG"
	} else {
		log.Println(err)
	}

	opusFormat, opusMedia, err := formats.FindOPUSFormat(desc)
	if opusFormat != nil {
		log.Println("[opus]: format found")
		audioFormat = "OPUS"
	} else {
		log.Println(err)
	}

	vp8Format, vp8Media, err := formats.FindVP8Format(desc)
	if vp8Format != nil {
		log.Println("[vp8]: format found")
		videoFormat = "VP8"
	} else {
		log.Println(err)
	}

	vp9Format, vp9Media, err := formats.FindVP9Format(desc)
	if vp9Format != nil {
		log.Println("[vp9]: format found")
		videoFormat = "VP9"
	} else {
		log.Println(err)
	}

	// setup sends SETUP for a single media.
	setup := func(label string, medi *description.Media) error {
		if _, err := c.Setup(desc.BaseURL, medi, 0, 0); err != nil {
			return fmt.Errorf("[%v]: setup media error: %w", label, err)
		}
		return nil
	}

	// play sends PLAY and blocks until the session terminates. The termination
	// reason is returned to the caller rather than raised as a panic.
	play := func(label string) error {
		if _, err := c.Play(nil); err != nil {
			return fmt.Errorf("[%v]: sending PLAY request erorr: %w", label, err)
		}
		if err := c.Wait(); err != nil {
			return fmt.Errorf("[%v]: session closed: %w", label, err)
		}
		return nil
	}

	// Start program according to gotten formats.
	//
	// TODO: only the H264(+G711) branch writes files, so it is the only one
	// that rotates them. Add rotation to the other branches once they persist
	// data.
	switch {
	case videoFormat == "AV1" && audioFormat == "":
		// Wait for the next period.
		waitPeriod(period)

		log.Printf("[%v]: start recording", videoFormat)

		// Create decoders.
		av1RTPDec, err := av1Format.CreateDecoder()
		if err != nil {
			return fmt.Errorf("[%v]: create decoder error: %w", videoFormat, err)
		}

		av1Dec := &formats.AV1Decoder{}
		if err = av1Dec.Initialize(); err != nil {
			return fmt.Errorf("[%v]: init decoder error: %w", videoFormat, err)
		}
		defer av1Dec.Close()

		// Setup media.
		if err = setup(videoFormat, av1Media); err != nil {
			return err
		}

		// NOTE(review): passed by value, so ProcessAV1 cannot update it and
		// every packet sees false — confirm against ProcessAV1.
		firstRandomReceived := false

		// Process input rtp packets.
		c.OnPacketRTP(av1Media, av1Format, func(pkt *rtp.Packet) {
			// Process AV1 flow and return PTS and IMG.
			pts, img, err := formats.ProcessAV1(&c, av1Media, av1RTPDec, pkt, av1Dec, firstRandomReceived, videoFormat)
			if err != nil {
				log.Printf("[%v]: process packet error: %v\n", videoFormat, err)
				return
			}

			log.Printf("[%v]: decoded frame with PTS %v and size %v\n", videoFormat, pts, img.Bounds().Max)
		})

		// Start playing and wait for the end of the session.
		return play(videoFormat)

	case videoFormat == "" && audioFormat == "G711":
		// Wait for the next period.
		waitPeriod(period)

		log.Printf("[%v]: start recording", audioFormat)

		// Create decoder.
		g711RTPDec, err := g711Format.CreateDecoder()
		if err != nil {
			return fmt.Errorf("[%v]: create decoder error: %w", audioFormat, err)
		}

		// Setup media.
		if err = setup(audioFormat, g711Media); err != nil {
			return err
		}

		// Process input rtp packets.
		c.OnPacketRTP(g711Media, g711Format, func(pkt *rtp.Packet) {
			// Process G711 flow and return PTS and AU.
			pts, au, err := formats.ProcessG711(&c, g711Media, g711RTPDec, pkt, audioFormat)
			if err != nil {
				log.Printf("[%v]: process packet error: %v\n", audioFormat, err)
				return
			}

			// Decode samples (these are 16-bit, big endian LPCM samples).
			// NOTE(review): the decoded result is discarded; only the size of
			// the encoded unit is logged below.
			if g711Format.MULaw {
				g711.DecodeMulaw(au)
			} else {
				g711.DecodeAlaw(au)
			}

			log.Printf("[%v]: decoded audio samples with PTS %v and size %d\n", audioFormat, pts, len(au))
		})

		// Start playing and wait for the end of the session.
		return play(audioFormat)

	case videoFormat == "H264" && audioFormat == "AAC":
		// Wait for the next period.
		waitPeriod(period)

		label := videoFormat + "-" + audioFormat
		log.Printf("[%v]: start recording", label)

		// TODO: recording for this combination is disabled. The planned
		// pipeline was: create the H264 and AAC RTP decoders, initialize a
		// formats.MpegtsMuxer (FileName from fn.SetNumNTime, H264Format,
		// Mpeg4AudioFormat), feed it with WriteH264(pts, au) / WriteAAC(aus, pts)
		// from formats.ProcessH264 / formats.ProcessAAC, and re-initialize the
		// muxer with a new file name on every period tick.

		// Setup all medias.
		if err := c.SetupAll(desc.BaseURL, desc.Medias); err != nil {
			return fmt.Errorf("[%v]: setup media error: %w", label, err)
		}

		// Receive input rtp packets; they are dropped until muxing is restored.
		c.OnPacketRTPAny(func(medi *description.Media, forma format.Format, pkt *rtp.Packet) {
			switch forma.(type) {
			case *format.H264:
				// TODO: depacketize and write the access unit.
			case *format.MPEG4Audio:
				// TODO: depacketize and write the access units.
			}
		})

		// Start playing and wait for the end of the session.
		return play(label)

	case videoFormat == "H264" && (audioFormat == "G711" || audioFormat == ""):
		// Wait for the next period.
		waitPeriod(period)

		label := videoFormat + "-" + audioFormat
		log.Printf("[%v]: start recording", label)

		// Create the H264 decoder.
		h264RTPDec, err := h264Format.CreateDecoder()
		if err != nil {
			return fmt.Errorf("[%v]: create decoder error: %w", label, err)
		}

		// file and seg are shared between the RTP callback(s) and the rotation
		// goroutine, so every access is serialized through segLock. A one-slot
		// channel is used as the lock so this function needs no extra import.
		var (
			file *os.File
			seg  storage.Segment
		)
		segLock := make(chan struct{}, 1)
		lock := func() { segLock <- struct{}{} }
		unlock := func() { <-segLock }

		// onG711 stays nil for video-only streams: audioFormat is empty only
		// when no audio format was found, in which case g711Format is nil and
		// must not be used to create a decoder.
		var onG711 func(f *format.G711, pkt *rtp.Packet)
		if g711Format != nil {
			g711RTPDec, err := g711Format.CreateDecoder()
			if err != nil {
				return fmt.Errorf("[%v]: create decoder error: %w", label, err)
			}

			onG711 = func(f *format.G711, pkt *rtp.Packet) {
				// Process G711 flow and return PTS and AU.
				// NOTE(review): videoFormat is passed here while the G711-only
				// branch passes audioFormat — confirm which one ProcessG711 expects.
				pts, au, err := formats.ProcessG711(&c, g711Media, g711RTPDec, pkt, videoFormat)
				if err != nil {
					log.Printf("[%v]: process packet error: %v\n", label, err)
					return
				}
				if au == nil {
					return
				}

				// Convert G711 to LPCM.
				lpcmSamples := formats.ConvertG711ToLPCM(au, f.MULaw)

				// Fill the interleaved packet and write it to the current file.
				lock()
				seg.Packet.Type = storage.PacketTypeLPCM
				seg.Packet.Pts = pts
				seg.Packet.LPCMSamples = lpcmSamples
				err = storage.WriteInterleavedPacket(file, seg)
				unlock()

				if err != nil {
					fmt.Println("write segment error:", err)
				}
			}
		}

		// Setup all medias.
		if err = c.SetupAll(desc.BaseURL, desc.Medias); err != nil {
			return fmt.Errorf("[%v]: setup media error: %w", label, err)
		}

		// openSegment creates the next segment file and writes the stream ID
		// (int32 little-endian length followed by the bytes) and the header.
		openSegment := func() (*os.File, storage.Segment, error) {
			f, err := os.Create(fn.SetNumNTime("insit"))
			if err != nil {
				return nil, storage.Segment{}, fmt.Errorf("creating file error: %w", err)
			}

			s := storage.Segment{
				Date:     time.Now().Format("15-04-05_02-01-2006"),
				Duration: strconv.Itoa(period),
				Packet:   storage.InterleavedPacket{},
			}

			// Write StreamID.
			if err := binary.Write(f, binary.LittleEndian, int32(len(streamID))); err != nil {
				f.Close()
				return nil, storage.Segment{}, fmt.Errorf("write StreamID length error: %w", err)
			}
			if _, err := f.Write([]byte(streamID)); err != nil {
				f.Close()
				return nil, storage.Segment{}, fmt.Errorf("write StreamID error: %w", err)
			}

			// Write header of the file.
			if err := storage.WriteHeader(f, s); err != nil {
				f.Close()
				return nil, storage.Segment{}, fmt.Errorf("write header error: %w", err)
			}

			return f, s, nil
		}

		file, seg, err = openSegment()
		if err != nil {
			return fmt.Errorf("[%v]: %w", label, err)
		}
		// Close whichever file is current on return, not the one opened first.
		defer func() {
			lock()
			defer unlock()
			if err := file.Close(); err != nil {
				log.Printf("[%v]: close file error: %v", label, err)
			}
		}()

		// Process input rtp packets.
		c.OnPacketRTPAny(func(medi *description.Media, forma format.Format, pkt *rtp.Packet) {
			switch f := forma.(type) {
			case *format.H264:
				// Process H264 flow and return PTS and AU.
				pts, au, err := formats.ProcessH264(&c, h264Media, h264RTPDec, pkt, videoFormat)
				if err != nil {
					log.Printf("[%v]: process packet error: %v\n", label, err)
					return
				}
				if au == nil {
					return
				}

				// Fill the interleaved packet and write it to the current file.
				lock()
				seg.Packet.Type = storage.PacketTypeH264
				seg.Packet.Pts = pts
				seg.Packet.H264AUs = au
				err = storage.WriteInterleavedPacket(file, seg)
				unlock()

				if err != nil {
					fmt.Println("write segment error:", err)
				}

			case *format.G711:
				if onG711 != nil {
					onG711(f, pkt)
				}
			}
		})

		// Create ticker for rotation files.
		ticker := time.NewTicker(time.Duration(period) * time.Second)
		defer ticker.Stop()

		// Rotate files until the function returns.
		stop := make(chan struct{})
		stopped := make(chan struct{})
		go func() {
			defer close(stopped)

			for {
				select {
				case <-stop:
					return
				case <-ticker.C:
				}

				// Prepare the new file first so a failure leaves the current
				// file in place and recording continues.
				next, nextSeg, err := openSegment()
				if err != nil {
					log.Printf("[%v]: rotate file error: %v", label, err)
					continue
				}

				lock()
				prev := file
				file, seg = next, nextSeg
				unlock()

				if err := prev.Close(); err != nil {
					log.Printf("[%v]: close file error: %v", label, err)
				}

				log.Printf("[%v-%v]: new file for recording created: %v",
					videoFormat, audioFormat, nextSeg.Date+".insit")
			}
		}()
		// Runs before the file-closing defer above, so no rotation can swap
		// the file after it has been closed.
		defer func() {
			close(stop)
			<-stopped
		}()

		// Start playing and wait for the end of the session.
		return play(label)

	case videoFormat == "H264-" && audioFormat == "":
		// videoFormat is never set to "H264-", so this H264 -> RGBA branch is
		// currently disabled.

		// Wait for the next period.
		waitPeriod(period)

		log.Printf("[%v]: start recording", videoFormat)

		// Create decoder.
		h264RTPDec, err := h264Format.CreateDecoder()
		if err != nil {
			return fmt.Errorf("[%v]: create decoder error: %w", videoFormat, err)
		}

		// Setup H264 -> RGBA decoder.
		h264Dec := &formats.H264Decoder{}
		if err = h264Dec.Initialize(); err != nil {
			return fmt.Errorf("[%v]: init decoder error: %w", videoFormat, err)
		}
		defer h264Dec.Close()

		// If SPS and PPS are present into the SDP, send them to the decoder.
		if h264Format.SPS != nil {
			h264Dec.Decode([][]byte{h264Format.SPS})
		}
		if h264Format.PPS != nil {
			h264Dec.Decode([][]byte{h264Format.PPS})
		}

		// Setup media.
		if err = setup(videoFormat, h264Media); err != nil {
			return err
		}

		// NOTE(review): passed by value, so the callee cannot update it.
		firstRandomAccess := false

		// Process input rtp packets.
		c.OnPacketRTP(h264Media, h264Format, func(pkt *rtp.Packet) {
			// Process H264 flow and return PTS and IMG.
			pts, img, err := formats.ProcessH264RGBA(
				&c, h264Media, h264RTPDec, h264Dec, pkt, firstRandomAccess, videoFormat)
			if err != nil {
				log.Printf("[%v]: process packet error: %v\n", videoFormat, err)
				return
			}

			log.Printf("[%v]: decoded frame with PTS %v and size %v\n", videoFormat, pts, img.Bounds().Max)
		})

		// Start playing and wait for the end of the session.
		return play(videoFormat)

	case videoFormat == "H265" && audioFormat == "":
		// Wait for the next period.
		waitPeriod(period)

		log.Printf("[%v]: start recording", videoFormat)

		// Create decoder.
		h265RTPDec, err := h265Format.CreateDecoder()
		if err != nil {
			return fmt.Errorf("[%v]: create decoder error: %w", videoFormat, err)
		}

		// Setup H265 -> RGBA decoder.
		h265Dec := &formats.H265Decoder{}
		if err = h265Dec.Initialize(); err != nil {
			return fmt.Errorf("[%v]: init decoder error: %w", videoFormat, err)
		}
		defer h265Dec.Close()

		// If VPS, SPS and PPS are present into the SDP, send them to the decoder.
		if h265Format.VPS != nil {
			h265Dec.Decode([][]byte{h265Format.VPS})
		}
		if h265Format.SPS != nil {
			h265Dec.Decode([][]byte{h265Format.SPS})
		}
		if h265Format.PPS != nil {
			h265Dec.Decode([][]byte{h265Format.PPS})
		}

		// Setup media.
		if err = setup(videoFormat, h265Media); err != nil {
			return err
		}

		// NOTE(review): passed by value, so the callee cannot update it.
		firstRandomAccess := false

		// Process input rtp packets.
		c.OnPacketRTP(h265Media, h265Format, func(pkt *rtp.Packet) {
			// Process H265 flow and return PTS and IMG.
			pts, img, err := formats.ProcessH265RGBA(
				&c, h265Media, h265RTPDec, h265Dec, pkt, firstRandomAccess, videoFormat)
			if err != nil {
				log.Printf("[%v]: process packet error: %v\n", videoFormat, err)
				return
			}

			log.Printf("[%v]: decoded frame with PTS %v and size %v\n", videoFormat, pts, img.Bounds().Max)
		})

		// Start playing and wait for the end of the session.
		return play(videoFormat)

	case videoFormat == "" && audioFormat == "LPCM":
		// Wait for the next period.
		waitPeriod(period)

		log.Printf("[%v]: start recording", audioFormat)

		// Create decoder.
		lpcmRTPDec, err := lpcmFormat.CreateDecoder()
		if err != nil {
			return fmt.Errorf("[%v]: create decoder error: %w", audioFormat, err)
		}

		// Setup media.
		if err = setup(audioFormat, lpcmMedia); err != nil {
			return err
		}

		// Process input rtp packets.
		c.OnPacketRTP(lpcmMedia, lpcmFormat, func(pkt *rtp.Packet) {
			// Process LPCM flow and return PTS and SAMPLES.
			pts, samples, err := formats.ProcessLPCM(&c, lpcmMedia, lpcmRTPDec, pkt, audioFormat)
			if err != nil {
				log.Printf("[%v]: process packet error: %v\n", audioFormat, err)
				return
			}

			log.Printf("[%v]: decoded audio samples with PTS %v and size %d\n", audioFormat, pts, len(samples))
		})

		// Start playing and wait for the end of the session.
		return play(audioFormat)

	case videoFormat == "MJPEG" && audioFormat == "":
		// Wait for the next period.
		waitPeriod(period)

		log.Printf("[%v]: start recording", videoFormat)

		// Create decoder.
		mjpegRTPDec, err := mjpegFormat.CreateDecoder()
		if err != nil {
			return fmt.Errorf("[%v]: create decoder error: %w", videoFormat, err)
		}

		// Setup media.
		if err = setup(videoFormat, mjpegMedia); err != nil {
			return err
		}

		// Process input rtp packets.
		c.OnPacketRTP(mjpegMedia, mjpegFormat, func(pkt *rtp.Packet) {
			// Process MJPEG flow and return PTS and IMG.
			pts, img, err := formats.ProcessMJPEGRGBA(&c, mjpegMedia, mjpegRTPDec, pkt, videoFormat)
			if err != nil {
				log.Printf("[%v]: process packet error: %v\n", videoFormat, err)
				return
			}

			log.Printf("[%v]: decoded image with PTS %v and size %v", videoFormat, pts, img.Bounds().Max)
		})

		// Start playing and wait for the end of the session.
		return play(videoFormat)

	case videoFormat == "" && audioFormat == "AAC":
		// Wait for the next period.
		waitPeriod(period)

		log.Printf("[%v]: start recording", audioFormat)

		// Create decoder.
		aacRTPDec, err := aacFormat.CreateDecoder()
		if err != nil {
			return fmt.Errorf("[%v]: create decoder error: %w", audioFormat, err)
		}

		// Setup media.
		if err = setup(audioFormat, aacMedia); err != nil {
			return err
		}

		// Process input rtp packets.
		c.OnPacketRTP(aacMedia, aacFormat, func(pkt *rtp.Packet) {
			// Process AAC flow and return PTS and AUS.
			pts, aus, err := formats.ProcessAAC(&c, aacMedia, aacRTPDec, pkt, audioFormat)
			if err != nil {
				log.Printf("[%v]: process packet error: %v\n", audioFormat, err)
				return
			}

			for _, au := range aus {
				log.Printf("[%v]: received access unit with PTS %v size %d\n", audioFormat, pts, len(au))
			}
		})

		// Start playing and wait for the end of the session.
		return play(audioFormat)

	case videoFormat == "" && audioFormat == "OPUS":
		// Wait for the next period.
		waitPeriod(period)

		log.Printf("[%v]: start recording", audioFormat)

		// Create decoder.
		opusRTPDec, err := opusFormat.CreateDecoder()
		if err != nil {
			return fmt.Errorf("[%v]: create decoder error: %w", audioFormat, err)
		}

		// Setup media.
		if err = setup(audioFormat, opusMedia); err != nil {
			return err
		}

		// Process input rtp packets.
		c.OnPacketRTP(opusMedia, opusFormat, func(pkt *rtp.Packet) {
			// Process OPUS flow and return PTS and OP.
			pts, op, err := formats.ProcessOPUS(&c, opusMedia, opusRTPDec, pkt, audioFormat)
			if err != nil {
				log.Printf("[%v]: process packet error: %v\n", audioFormat, err)
				return
			}

			log.Printf("[%v]: received OPUS packet with PTS %v size %d\n", audioFormat, pts, len(op))
		})

		// Start playing and wait for the end of the session.
		return play(audioFormat)

	case videoFormat == "VP8" && audioFormat == "":
		// Wait for the next period.
		waitPeriod(period)

		log.Printf("[%v]: start recording", videoFormat)

		// Create decoder.
		vp8RTPDec, err := vp8Format.CreateDecoder()
		if err != nil {
			return fmt.Errorf("[%v]: create decoder error: %w", videoFormat, err)
		}

		// Setup VP8 -> RGBA decoder.
		vp8Dec := &formats.VPDecoder{}
		if err = vp8Dec.Initialize(); err != nil {
			return fmt.Errorf("[%v]: init decoder error: %w", videoFormat, err)
		}
		defer vp8Dec.Close()

		// Setup media.
		if err = setup(videoFormat, vp8Media); err != nil {
			return err
		}

		// Process input rtp packets.
		c.OnPacketRTP(vp8Media, vp8Format, func(pkt *rtp.Packet) {
			// Process VP8 flow and return PTS and IMG.
			pts, img, err := formats.ProcessVP8RGBA(&c, vp8Media, vp8RTPDec, vp8Dec, pkt, videoFormat)
			if err != nil {
				log.Printf("[%v]: process packet error: %v\n", videoFormat, err)
				return
			}

			log.Printf("[%v]: decoded frame with PTS %v and size %v", videoFormat, pts, img.Bounds().Max)
		})

		// Start playing and wait for the end of the session.
		return play(videoFormat)

	case videoFormat == "VP9" && audioFormat == "":
		// Wait for the next period.
		waitPeriod(period)

		log.Printf("[%v]: start recording", videoFormat)

		// Create decoder.
		vp9RTPDec, err := vp9Format.CreateDecoder()
		if err != nil {
			return fmt.Errorf("[%v]: create decoder error: %w", videoFormat, err)
		}

		// Setup VP9 -> RGBA decoder.
		vp9Dec := &formats.VPDecoder{}
		if err = vp9Dec.Initialize(); err != nil {
			return fmt.Errorf("[%v]: init decoder error: %w", videoFormat, err)
		}
		defer vp9Dec.Close()

		// Setup media.
		if err = setup(videoFormat, vp9Media); err != nil {
			return err
		}

		// Process input rtp packets.
		c.OnPacketRTP(vp9Media, vp9Format, func(pkt *rtp.Packet) {
			// Process VP9 flow and return PTS and IMG.
			pts, img, err := formats.ProcessVP9RGBA(&c, vp9Media, vp9RTPDec, vp9Dec, pkt, videoFormat)
			if err != nil {
				log.Printf("[%v]: process packet error: %v\n", videoFormat, err)
				return
			}

			log.Printf("[%v]: decoded frame with PTS %v and size %v", videoFormat, pts, img.Bounds().Max)
		})

		// Start playing and wait for the end of the session.
		return play(videoFormat)

	default:
		// No branch handles this combination; say so instead of returning silently.
		log.Printf("unsupported format combination (video %q, audio %q) for camera [%s]",
			videoFormat, audioFormat, link)
	}

	return nil
}
// changeDomain retries RTSP on the alternate domain when err reports that the
// stream was not found, which is what happens when a camera was flipped to
// another domain.
//
// err is the failure returned by the previous RTSP call. It is treated as
// "not found" when its message contains "404 (not found)", compared
// case-insensitively.
// NOTE(review): this assumes the RTSP client puts the status code and reason
// in the error text in that form — confirm against the client library.
//
// When link contains "video-1" the retry uses "video-2"; otherwise "video-2"
// is replaced by "video-1". Only the first occurrence is replaced. Log is not
// used because RTSP does not accept a logger.
//
// It returns nil when err is nil or the retry succeeds, err unchanged when it
// is not a "not found" failure, and err joined with the retry failure when
// the retry fails as well.
func changeDomain(dir string, period int, link string, err error, Log *zap.Logger) error {
	if err == nil {
		return nil
	}

	// errors.As with a target of type *error matches every non-nil error, so
	// the status is recognized from the message text instead.
	if !strings.Contains(strings.ToLower(err.Error()), "404 (not found)") {
		return err
	}

	from, to := "video-2", "video-1"
	if strings.Contains(link, "video-1") {
		from, to = "video-1", "video-2"
	}

	if retryErr := RTSP(dir, period, strings.Replace(link, from, to, 1)); retryErr != nil {
		// Keep both failures so the caller can see why the flip was attempted.
		return errors.Join(err, retryErr)
	}

	return nil
}