// Repository web-view metadata: 1141 lines, 31 KiB, Go.
//
// Web-view notice: this file contains Unicode characters that might be
// confused with other characters. If this is intentional, the warning can be
// safely ignored.

package rtsp
import (
	"encoding/binary"
	"errors"
	"fmt"
	"log"
	"os"
	"strconv"
	"strings"
	"sync"
	"time"

	"git.insit.tech/psa/rtsp_reader-writer/writer/internal/ingest/formats"
	"git.insit.tech/psa/rtsp_reader-writer/writer/internal/storage"
	"git.insit.tech/sas/rtsp_proxy/protos/gens"
	"github.com/bluenviron/gortsplib/v4"
	"github.com/bluenviron/gortsplib/v4/pkg/base"
	"github.com/bluenviron/gortsplib/v4/pkg/description"
	"github.com/bluenviron/gortsplib/v4/pkg/format"
	"github.com/bluenviron/mediacommon/v2/pkg/codecs/g711"
	"github.com/pion/rtp"
	_ "github.com/zaf/g711"
)
// StartWriter is the entry point of the writer: it runs RTSP for link with
// the given dir and period.
//
// When RTSP fails, its error is handed to changeDomain, which covers the case
// of a camera that was flipped to another domain. An error reported by
// changeDomain is returned wrapped; in every other case StartWriter returns
// nil.
func StartWriter(dir string, period int, link string) error {
	rtspErr := RTSP(dir, period, link)
	if rtspErr == nil {
		return nil
	}

	// Change domain if a camera was flipped to another domain.
	if domainErr := changeDomain(dir, period, link, rtspErr); domainErr != nil {
		return fmt.Errorf("change domain error: %w", domainErr)
	}

	return nil
}
// RTSP processes RTSP protocol.
func RTSP(dir string, period int, link string) error {
// Connect to the server.
c := gortsplib.Client{
UserAgent: "PSA",
}
u, err := base.ParseURL(link)
if err != nil {
return fmt.Errorf("parse URL error: %w", err)
}
err = c.Start(u.Scheme, u.Host)
if err != nil {
return fmt.Errorf("connect to the server error: %w", err)
}
defer c.Close()
desc, res, err := c.Describe(u)
if err != nil || desc == nil {
log.Printf("medias not found for camera [%s]: %v", link, err)
return nil
}
// Create file name structure and directory for files.
resolution := storage.FindResolution(res.Body)
resolutions := []string{resolution}
cuttedURI := storage.CutURI(link)
fn := storage.CreateFileName(dir, resolutions, cuttedURI, period)
err = os.MkdirAll(fmt.Sprintf("%s", fn.Path), 0755)
if err != nil {
return fmt.Errorf("mkdirall error: %w", err)
}
// Create M3U8 playlist.
go gens.MediaPlaylistGenerator(dir+"/data/"+cuttedURI, "", fn.Duration, resolutions)
// Find formats.
var audioFormat string
var videoFormat string
av1Format, av1Media, err := formats.FindAV1Format(desc)
if av1Format != nil {
log.Println("[av1]: format found")
videoFormat = "AV1"
} else {
log.Println(err)
}
g711Format, g711Media, err := formats.FindG711Format(desc)
if g711Format != nil {
log.Println("[g711]: format found")
audioFormat = "G711"
} else {
log.Println(err)
}
h264Format, h264Media, err := formats.FindH264Format(desc)
if h264Format != nil {
log.Println("[h264]: format found")
videoFormat = "H264"
} else {
log.Println(err)
}
aacFormat, aacMedia, err := formats.FindAACFormat(desc)
if aacFormat != nil {
log.Println("[aac]: format found")
audioFormat = "AAC"
} else {
log.Println(err)
}
/*
h265Format, h265Media, err := formats.FindH265Format(desc)
if h265Format != nil {
log.Println("[h265]: format found")
videoFormat = "H265"
} else {
log.Println(err)
}
lpcmFormat, lpcmMedia, err := formats.FindLPCMFormat(desc)
if lpcmFormat != nil {
log.Println("[lpcm]: format found")
audioFormat = "LPCM"
} else {
log.Println(err)
}
mjpegFormat, mjpegMedia, err := formats.FindMJPEGFormat(desc)
if mjpegFormat != nil {
log.Println("[mjpeg]: format found")
videoFormat = "MJPEG"
} else {
log.Println(err)
}
opusFormat, opusMedia, err := formats.FindOPUSFormat(desc)
if opusFormat != nil {
log.Println("[opus]: format found")
audioFormat = "OPUS"
} else {
log.Println(err)
}
vp8Format, vp8Media, err := formats.FindVP8Format(desc)
if vp8Format != nil {
log.Println("[vp8]: format found")
videoFormat = "VP8"
} else {
log.Println(err)
}
vp9Format, vp9Media, err := formats.FindVP9Format(desc)
if vp9Format != nil {
log.Println("[vp9]: format found")
videoFormat = "VP9"
} else {
log.Println(err)
}
*/
// Start program according to gotten formats.
switch {
case videoFormat == "AV1" && audioFormat == "":
// Wait for the next period.
storage.WaitPeriod(period)
log.Printf("[%v]: start recording", videoFormat)
// Create decoder.
av1RTPDec, err := av1Format.CreateDecoder()
if err != nil {
log.Printf("[%v]: create decoder error: %v\n", videoFormat, err)
}
av1Dec := &formats.AV1Decoder{}
err = av1Dec.Initialize()
if err != nil {
log.Printf("[%v]: init decoder error: %v\n", videoFormat, err)
}
defer av1Dec.Close()
// Setup media.
_, err = c.Setup(desc.BaseURL, av1Media, 0, 0)
if err != nil {
return fmt.Errorf("[%v]: setup media error: %w", videoFormat, err)
}
firstRandomReceived := false
// Process input rtp packets.
c.OnPacketRTP(av1Media, av1Format, func(pkt *rtp.Packet) {
// Process AV1 flow and return PTS and IMG.
pts, img, err := formats.ProcessAV1(&c, av1Media, av1RTPDec, pkt, av1Dec, firstRandomReceived, videoFormat)
if err != nil {
log.Printf("[%v]: process packet error: %v\n", videoFormat, err)
return
}
log.Printf("[%v]: decoded frame with PTS %v and size %v\n", videoFormat, pts, img.Bounds().Max)
})
// Start playing.
_, err = c.Play(nil)
if err != nil {
return fmt.Errorf("[%v]: sending PLAY request erorr: %w", videoFormat, err)
}
// Create ticker for rotation files.
ticker := time.NewTicker(time.Duration(period) * time.Second)
defer ticker.Stop()
// Rotate files.
go func() {
for range ticker.C {
// Logic for rotation files.
/*
currentMpegtsMuxer.close()
currentMpegtsMuxer.fileName = fn.SetNumNTime()
err = currentMpegtsMuxer.initialize()
if err != nil {
panic(err)
}
log.Println("New file for recording created:", currentMpegtsMuxer.fileName)
*/
}
}()
panic(c.Wait())
case videoFormat == "" && audioFormat == "G711":
// Wait for the next period.
storage.WaitPeriod(period)
log.Printf("[%v]: start recording", audioFormat)
// Create decoder.
g711RTPDec, err := g711Format.CreateDecoder()
if err != nil {
log.Printf("[%v]: create decoder error: %v\n", audioFormat, err)
}
// Setup media.
_, err = c.Setup(desc.BaseURL, g711Media, 0, 0)
if err != nil {
return fmt.Errorf("[%v]: setup media error: %w\n", audioFormat, err)
}
// Process input rtp packets.
c.OnPacketRTP(g711Media, g711Format, func(pkt *rtp.Packet) {
// Process G711 flow and return PTS and AU.
pts, au, err := formats.ProcessG711(&c, g711Media, g711RTPDec, pkt, audioFormat)
if err != nil {
log.Printf("[%v]: process packet error: %v\n", audioFormat, err)
return
}
// Decode samples (these are 16-bit, big endian LPCM samples).
if g711Format.MULaw {
g711.DecodeMulaw(au)
} else {
g711.DecodeAlaw(au)
}
log.Printf("[%v]: decoded audio samples with PTS %v and size %d\n", audioFormat, pts, len(au))
})
// Start playing.
_, err = c.Play(nil)
if err != nil {
return fmt.Errorf("[%v]: sending PLAY request erorr: %w", audioFormat, err)
}
// Create ticker for rotation files.
ticker := time.NewTicker(time.Duration(period) * time.Second)
defer ticker.Stop()
// Rotate files.
go func() {
for range ticker.C {
// Logic for rotation files.
/*
currentMpegtsMuxer.close()
currentMpegtsMuxer.fileName = fn.SetNumNTime()
err = currentMpegtsMuxer.initialize()
if err != nil {
panic(err)
}
log.Println("New file for recording created:", currentMpegtsMuxer.fileName)
*/
}
}()
panic(c.Wait())
case videoFormat == "H264" && audioFormat == "AAC":
// Wait for the next period.
storage.WaitPeriod(period)
log.Printf("[%v-%v]: start recording", videoFormat, audioFormat)
// Create decoders.
h264RTPDec, err := h264Format.CreateDecoder()
if err != nil {
log.Printf("[%v-%v]: create decoder error: %v\n", videoFormat, audioFormat, err)
}
aacRTPDec, err := aacFormat.CreateDecoder()
if err != nil {
log.Printf("[%v-%v]: create decoder error: %v\n", videoFormat, audioFormat, err)
}
// Setup MPEG-TS muxer.
currentMpegtsMuxer := formats.MpegtsMuxer{
FileName: fn.SetNumNTime(),
H264Format: h264Format,
Mpeg4AudioFormat: aacFormat,
}
err = currentMpegtsMuxer.Initialize()
if err != nil {
return fmt.Errorf("[%v-%v]: init muxer error: %w\n", videoFormat, audioFormat, err)
}
// Setup all medias.
err = c.SetupAll(desc.BaseURL, desc.Medias)
if err != nil {
return fmt.Errorf("[%v-%v]: setup media error: %w\n", videoFormat, audioFormat, err)
}
// Process input rtp packets.
c.OnPacketRTPAny(func(medi *description.Media, forma format.Format, pkt *rtp.Packet) {
switch forma.(type) {
case *format.H264:
// Process H264 flow and return PTS and AU.
pts, au, err := formats.ProcessH264(&c, h264Media, h264RTPDec, pkt, videoFormat)
if err != nil {
log.Printf("[%v-%v]: process packet error: %v\n", videoFormat, audioFormat, err)
return
}
// Encode the access unit into MPEG-TS.
err = currentMpegtsMuxer.WriteH264(pts, au)
if err != nil {
return
}
case *format.MPEG4Audio:
// Process AAC flow and return PTS and AUS.
pts, aus, err := formats.ProcessAAC(&c, aacMedia, aacRTPDec, pkt, audioFormat)
if err != nil {
log.Printf("[%v-%v]: process packet error: %v\n", videoFormat, audioFormat, err)
return
}
// Encode access units into MPEG-TS.
err = currentMpegtsMuxer.WriteAAC(aus, pts)
if err != nil {
return
}
}
})
// Start playing.
_, err = c.Play(nil)
if err != nil {
return fmt.Errorf("[%v-%v]: sending PLAY request erorr: %w", videoFormat, audioFormat, err)
}
// Create ticker for rotation files.
ticker := time.NewTicker(time.Duration(period) * time.Second)
defer ticker.Stop()
// Rotate files.
go func() {
for range ticker.C {
// Logic for rotation files.
currentMpegtsMuxer.Close()
currentMpegtsMuxer.FileName = fn.SetNumNTime()
err = currentMpegtsMuxer.Initialize()
if err != nil {
log.Printf("[%v-%v]: init muxer error: %v\n", videoFormat, audioFormat, err)
return
}
log.Printf("[%v-%v]: new file for recording created: %v",
videoFormat, audioFormat, currentMpegtsMuxer.FileName)
}
}()
panic(c.Wait())
case videoFormat == "H264" && audioFormat == "G711":
// Wait for the next period.
storage.WaitPeriod(period)
log.Printf("[%v-%v]: start recording", videoFormat, audioFormat)
// Create decoders.
h264RTPDec, err := h264Format.CreateDecoder()
if err != nil {
log.Printf("[%v-%v]: create decoder error: %v\n", videoFormat, audioFormat, err)
}
g711RTPDec, err := g711Format.CreateDecoder()
if err != nil {
log.Printf("[%v-%v]: create decoder error: %v\n", videoFormat, audioFormat, err)
}
//// Setup MPEG-TS muxer.
//var aacFormat *format.MPEG4Audio
//
//currentMpegtsMuxer := formats.MpegtsMuxer{
// FileName: fn.SetNumNTime(),
// H264Format: h264Format,
// Mpeg4AudioFormat: aacFormat,
//}
//err = currentMpegtsMuxer.Initialize()
//if err != nil {
// return fmt.Errorf("[%v-%v]: init muxer error: %w\n", videoFormat, audioFormat, err)
//}
// Setup all medias.
err = c.SetupAll(desc.BaseURL, desc.Medias)
if err != nil {
return fmt.Errorf("[%v-%v]: setup media error: %w\n", videoFormat, audioFormat, err)
}
///////////////////////////////////////////////////////////
file, err := os.Create(
dir + "/data/" + cuttedURI + "/" + time.Now().Format("15-04-05_02-01-2006") + ".insit")
if err != nil {
fmt.Println("Ошибка создания файла:", err)
}
defer file.Close()
seg := storage.Segment{
Start: time.Now().Format("15-04-05_02-01-2006"),
Duration: strconv.Itoa(period),
Packets: storage.InterleavedPacket{},
}
// Записываем заголовок файла (например, streamID).
streamID := cuttedURI
if err := binary.Write(file, binary.LittleEndian, int32(len(streamID))); err != nil {
fmt.Println("Ошибка записи заголовка:", err)
}
if _, err := file.Write([]byte(streamID)); err != nil {
fmt.Println("Ошибка записи streamID:", err)
}
err = storage.WriteHeader(file, seg)
if err != nil {
return fmt.Errorf("[%v-%v]: write header error: %w", videoFormat, audioFormat, err)
}
// Process input rtp packets.
c.OnPacketRTPAny(func(medi *description.Media, forma format.Format, pkt *rtp.Packet) {
switch f := forma.(type) {
case *format.H264:
// Process H264 flow and return PTS and AU.
pts, au, err := formats.ProcessH264(&c, h264Media, h264RTPDec, pkt, videoFormat)
if err != nil {
log.Printf("[%v-%v]: process packet error: %v\n", videoFormat, audioFormat, err)
return
}
if au != nil {
seg.Packets.Type = storage.PacketTypeH264
seg.Packets.Pts = pts
seg.Packets.H264AUs = au
// Записываем сегмент с interleaved пакетами.
if err := storage.WriteInterleavedPacket(file, seg); err != nil {
fmt.Println("Ошибка записи сегмента:", err)
return
}
}
//// Encode the access unit into MPEG-TS.
//err = currentMpegtsMuxer.WriteH264(pts, au)
//if err != nil {
// return
//}
case *format.G711:
// Process G711 flow and returns PTS and AU.
pts, au, err := formats.ProcessG711(&c, g711Media, g711RTPDec, pkt, videoFormat)
if err != nil {
log.Printf("[%v-%v]: process packet error: %v\n", videoFormat, audioFormat, err)
return
}
if au != nil {
lpcmSamples := formats.ConvertG711ToLPCM(au, f.MULaw)
seg.Packets.Type = storage.PacketTypeH264
seg.Packets.Pts = pts
seg.Packets.LPCMSamples = lpcmSamples
// Записываем сегмент с interleaved пакетами.
if err := storage.WriteInterleavedPacket(file, seg); err != nil {
fmt.Println("Ошибка записи сегмента:", err)
return
}
}
//// Convert G711 to AAC.
//au, err = formats.ConvertLPCMToAAC(lpcmSamples)
//if err != nil {
// log.Printf("[%v-%v]: converting to AAC frame error: %v\n", videoFormat, audioFormat, err)
//}
//
//// Encode the access unit into MPEG-TS.
//err = currentMpegtsMuxer.WriteAAC([][]byte{au}, pts)
//if err != nil {
// return
}
})
// Start playing.
_, err = c.Play(nil)
if err != nil {
return fmt.Errorf("[%v-%v]: sending PLAY request erorr: %w", videoFormat, audioFormat, err)
}
// Create ticker for rotation files.
ticker := time.NewTicker(time.Duration(period) * time.Second)
defer ticker.Stop()
// Rotate files.
go func() {
for range ticker.C {
/*
// Logic for rotation files.
currentMpegtsMuxer.Close()
currentMpegtsMuxer.FileName = fn.SetNumNTime()
err = currentMpegtsMuxer.Initialize()
if err != nil {
log.Printf("[%v-%v]: init muxer error: %v\n", videoFormat, audioFormat, err)
return
}
*/
file.Close()
file, err = os.Create(
dir + "/data/" + cuttedURI + "/" + time.Now().Format("15-04-05_02-01-2006") + ".insit")
if err != nil {
fmt.Println("Ошибка создания файла:", err)
}
seg = storage.Segment{
Start: time.Now().Format("15-04-05_02-01-2006"),
Duration: strconv.Itoa(period),
Packets: storage.InterleavedPacket{},
}
// Записываем заголовок файла (например, streamID).
streamID = cuttedURI
if err := binary.Write(file, binary.LittleEndian, int32(len(streamID))); err != nil {
fmt.Println("Ошибка записи заголовка:", err)
}
if _, err := file.Write([]byte(streamID)); err != nil {
fmt.Println("Ошибка записи streamID:", err)
}
err = storage.WriteHeader(file, seg)
if err != nil {
log.Printf("[%v-%v]: write header error: %w", videoFormat, audioFormat, err)
}
log.Printf("[%v-%v]: new file for recording created: %v",
videoFormat, audioFormat, seg.Start+".insit")
}
}()
/*
panic(c.Wait())
case videoFormat == "H264" && audioFormat == "":
// Wait for the next period.
storage.WaitPeriod(period)
log.Printf("[%v]: start recording", videoFormat)
// Create decoder.
h264RTPDec, err := h264Format.CreateDecoder()
if err != nil {
log.Printf("[%v]: create decoder error: %v\n", videoFormat, err)
}
// Setup H264 -> RGBA decoder.
h264Dec := &formats.H264Decoder{}
err = h264Dec.Initialize()
if err != nil {
log.Printf("[%v]: init decoder error: %v\n", videoFormat, err)
}
defer h264Dec.Close()
// if SPS and PPS are present into the SDP, send them to the decoder
if h264Format.SPS != nil {
h264Dec.Decode([][]byte{h264Format.SPS})
}
if h264Format.PPS != nil {
h264Dec.Decode([][]byte{h264Format.PPS})
}
// Setup media.
_, err = c.Setup(desc.BaseURL, h264Media, 0, 0)
if err != nil {
return fmt.Errorf("[%v]: setup media error: %w", videoFormat, err)
}
firstRandomAccess := false
// Process input rtp packets.
c.OnPacketRTP(h264Media, h264Format, func(pkt *rtp.Packet) {
// Process H264 flow and return PTS and IMG.
pts, img, err := formats.ProcessH264RGBA(
&c, h264Media, h264RTPDec, h264Dec, pkt, firstRandomAccess, videoFormat)
if err != nil {
log.Printf("[%v]: process packet error: %v\n", videoFormat, err)
return
}
log.Printf("[%v]: decoded frame with PTS %v and size %v\n", videoFormat, pts, img.Bounds().Max)
})
// Start playing.
_, err = c.Play(nil)
if err != nil {
return fmt.Errorf("[%v]: sending PLAY request erorr: %w", videoFormat, err)
}
// Create ticker for rotation files.
ticker := time.NewTicker(time.Duration(period) * time.Second)
defer ticker.Stop()
// Rotate files.
go func() {
for range ticker.C {
// Logic for rotation files.
/*
currentMpegtsMuxer.close()
currentMpegtsMuxer.fileName = fn.SetNumNTime()
err = currentMpegtsMuxer.initialize()
if err != nil {
panic(err)
}
log.Println("New file for recording created:", currentMpegtsMuxer.fileName)
}
}()
panic(c.Wait())
case videoFormat == "H265" && audioFormat == "":
// Wait for the next period.
storage.WaitPeriod(period)
log.Printf("[%v]: start recording", videoFormat)
// Create decoder.
h265RTPDec, err := h265Format.CreateDecoder()
if err != nil {
log.Printf("[%v]: create decoder error: %v\n", videoFormat, err)
}
// Setup H264 -> RGBA decoder.
h265Dec := &formats.H265Decoder{}
err = h265Dec.Initialize()
if err != nil {
log.Printf("[%v]: init decoder error: %v\n", videoFormat, err)
}
defer h265Dec.Close()
// If VPS, SPS and PPS are present into the SDP, send them to the decoder.
if h265Format.VPS != nil {
h265Dec.Decode([][]byte{h265Format.VPS})
}
if h265Format.SPS != nil {
h265Dec.Decode([][]byte{h265Format.SPS})
}
if h265Format.PPS != nil {
h265Dec.Decode([][]byte{h265Format.PPS})
}
// Setup media.
_, err = c.Setup(desc.BaseURL, h265Media, 0, 0)
if err != nil {
return fmt.Errorf("[%v]: setup media error: %w", videoFormat, err)
}
firstRandomAccess := false
// Process input rtp packets.
c.OnPacketRTP(h265Media, h265Format, func(pkt *rtp.Packet) {
// Process H265 flow and return PTS and IMG.
pts, img, err := formats.ProcessH265RGBA(
&c, h265Media, h265RTPDec, h265Dec, pkt, firstRandomAccess, videoFormat)
if err != nil {
log.Printf("[%v]: process packet error: %v\n", videoFormat, err)
return
}
log.Printf("[%v]: decoded frame with PTS %v and size %v\n", videoFormat, pts, img.Bounds().Max)
})
// Start playing.
_, err = c.Play(nil)
if err != nil {
return fmt.Errorf("[%v]: sending PLAY request erorr: %w", videoFormat, err)
}
// Create ticker for rotation files.
ticker := time.NewTicker(time.Duration(period) * time.Second)
defer ticker.Stop()
// Rotate files.
go func() {
for range ticker.C {
// Logic for rotation files.
/*
currentMpegtsMuxer.close()
currentMpegtsMuxer.fileName = fn.SetNumNTime()
err = currentMpegtsMuxer.initialize()
if err != nil {
panic(err)
}
log.Println("New file for recording created:", currentMpegtsMuxer.fileName)
}
}()
panic(c.Wait())
case videoFormat == "" && audioFormat == "LPCM":
// Wait for the next period.
storage.WaitPeriod(period)
log.Printf("[%v]: start recording", audioFormat)
// Create decoder.
lpcmRTPDec, err := lpcmFormat.CreateDecoder()
if err != nil {
log.Printf("[%v]: create decoder error: %v\n", audioFormat, err)
}
// Setup media.
_, err = c.Setup(desc.BaseURL, lpcmMedia, 0, 0)
if err != nil {
log.Printf("[%v]: setup media error: %v\n", audioFormat, err)
}
// Process input rtp packets.
c.OnPacketRTP(lpcmMedia, lpcmFormat, func(pkt *rtp.Packet) {
// Process LPCM flow and return PTS and SAMPLES.
pts, samples, err := formats.ProcessLPCM(&c, lpcmMedia, lpcmRTPDec, pkt, audioFormat)
if err != nil {
log.Printf("[%v]: process packet error: %v\n", audioFormat, err)
return
}
log.Printf("[%v]: decoded audio samples with PTS %v and size %d\n", audioFormat, pts, len(samples))
})
// Start playing.
_, err = c.Play(nil)
if err != nil {
return fmt.Errorf("[%v]: sending PLAY request erorr: %w", audioFormat, err)
}
// Create ticker for rotation files.
ticker := time.NewTicker(time.Duration(period) * time.Second)
defer ticker.Stop()
// Rotate files.
go func() {
for range ticker.C {
// Logic for rotation files.
/*
currentMpegtsMuxer.close()
currentMpegtsMuxer.fileName = fn.SetNumNTime()
err = currentMpegtsMuxer.initialize()
if err != nil {
panic(err)
}
log.Println("New file for recording created:", currentMpegtsMuxer.fileName)
}
}()
panic(c.Wait())
case videoFormat == "MJPEG" && audioFormat == "":
// Wait for the next period.
storage.WaitPeriod(period)
log.Printf("[%v]: start recording", audioFormat)
// Create decoder.
mjpegRTPDec, err := mjpegFormat.CreateDecoder()
if err != nil {
log.Printf("[%v]: create decoder error: %v\n", videoFormat, err)
}
// Setup media.
_, err = c.Setup(desc.BaseURL, mjpegMedia, 0, 0)
if err != nil {
return fmt.Errorf("[%v]: setup media error: %w", videoFormat, err)
}
// Process input rtp packets.
c.OnPacketRTP(mjpegMedia, mjpegFormat, func(pkt *rtp.Packet) {
// Process MJPEG flow and return PTS and IMG.
pts, img, err := formats.ProcessMJPEGRGBA(&c, mjpegMedia, mjpegRTPDec, pkt, videoFormat)
if err != nil {
log.Printf("[%v]: process packet error: %v\n", videoFormat, err)
return
}
log.Printf("[%v]: decoded image with PTS %v and size %v", videoFormat, pts, img.Bounds().Max)
})
// Start playing.
_, err = c.Play(nil)
if err != nil {
return fmt.Errorf("[%v]: sending PLAY request erorr: %w", videoFormat, err)
}
// Create ticker for rotation files.
ticker := time.NewTicker(time.Duration(period) * time.Second)
defer ticker.Stop()
// Rotate files.
go func() {
for range ticker.C {
// Logic for rotation files.
/*
currentMpegtsMuxer.close()
currentMpegtsMuxer.fileName = fn.SetNumNTime()
err = currentMpegtsMuxer.initialize()
if err != nil {
panic(err)
}
log.Println("New file for recording created:", currentMpegtsMuxer.fileName)
}
}()
panic(c.Wait())
case videoFormat == "" && audioFormat == "AAC":
// Wait for the next period.
storage.WaitPeriod(period)
log.Printf("[%v]: start recording", audioFormat)
// Create decoder.
aacRTPDec, err := aacFormat.CreateDecoder()
if err != nil {
log.Printf("[%v]: create decoder error: %v\n", audioFormat, err)
}
// Setup media.
_, err = c.Setup(desc.BaseURL, aacMedia, 0, 0)
if err != nil {
return fmt.Errorf("[%v]: setup media error: %w", audioFormat, err)
}
// Process input rtp packets.
c.OnPacketRTP(aacMedia, aacFormat, func(pkt *rtp.Packet) {
// Process AAC flow and return PTS and AUS.
pts, aus, err := formats.ProcessAAC(&c, aacMedia, aacRTPDec, pkt, audioFormat)
if err != nil {
log.Printf("[%v]: process packet error: %v\n", audioFormat, err)
return
}
for _, au := range aus {
log.Printf("[%v]: received access unit with PTS %v size %d\n", audioFormat, pts, len(au))
}
})
// Start playing.
_, err = c.Play(nil)
if err != nil {
return fmt.Errorf("[%v]: sending PLAY request erorr: %w", audioFormat, err)
}
// Create ticker for rotation files.
ticker := time.NewTicker(time.Duration(period) * time.Second)
defer ticker.Stop()
// Rotate files.
go func() {
for range ticker.C {
// Logic for rotation files.
/*
currentMpegtsMuxer.close()
currentMpegtsMuxer.fileName = fn.SetNumNTime()
err = currentMpegtsMuxer.initialize()
if err != nil {
panic(err)
}
log.Println("New file for recording created:", currentMpegtsMuxer.fileName)
}
}()
panic(c.Wait())
case videoFormat == "" && audioFormat == "OPUS":
// Wait for the next period.
storage.WaitPeriod(period)
log.Printf("[%v]: start recording", audioFormat)
// Create decoder.
opusRTPDec, err := opusFormat.CreateDecoder()
if err != nil {
log.Printf("[%v]: create decoder error: %v\n", audioFormat, err)
}
// Setup media.
_, err = c.Setup(desc.BaseURL, opusMedia, 0, 0)
if err != nil {
return fmt.Errorf("[%v]: setup media error: %w", audioFormat, err)
}
// Process input rtp packets.
c.OnPacketRTP(opusMedia, opusFormat, func(pkt *rtp.Packet) {
// Process OPUS flow and return PTS and OP.
pts, op, err := formats.ProcessOPUS(&c, opusMedia, opusRTPDec, pkt, audioFormat)
if err != nil {
log.Printf("[%v]: process packet error: %v\n", audioFormat, err)
return
}
log.Printf("[%v]: received OPUS packet with PTS %v size %d\n", audioFormat, pts, len(op))
})
// Start playing.
_, err = c.Play(nil)
if err != nil {
return fmt.Errorf("[%v]: sending PLAY request erorr: %w", audioFormat, err)
}
// Create ticker for rotation files.
ticker := time.NewTicker(time.Duration(period) * time.Second)
defer ticker.Stop()
// Rotate files.
go func() {
for range ticker.C {
// Logic for rotation files.
/*
currentMpegtsMuxer.close()
currentMpegtsMuxer.fileName = fn.SetNumNTime()
err = currentMpegtsMuxer.initialize()
if err != nil {
panic(err)
}
log.Println("New file for recording created:", currentMpegtsMuxer.fileName)
}
}()
panic(c.Wait())
case videoFormat == "VP8" && audioFormat == "":
// Wait for the next period.
storage.WaitPeriod(period)
log.Printf("[%v]: start recording", videoFormat)
// Create decoder.
vp8RTPDec, err := vp8Format.CreateDecoder()
if err != nil {
log.Printf("[%v]: create decoder error: %v\n", videoFormat, err)
}
// Setup VP8 -> RGBA decoder.
vp8Dec := &formats.VPDecoder{}
err = vp8Dec.Initialize()
if err != nil {
log.Printf("[%v]: init decoder error: %v\n", videoFormat, err)
}
defer vp8Dec.Close()
// Setup media.
_, err = c.Setup(desc.BaseURL, vp8Media, 0, 0)
if err != nil {
return fmt.Errorf("[%v]: setup media error: %w", videoFormat, err)
}
// Process input rtp packets.
c.OnPacketRTP(vp8Media, vp8Format, func(pkt *rtp.Packet) {
// Process VP8 flow and return PTS and IMG.
pts, img, err := formats.ProcessVP8RGBA(&c, vp8Media, vp8RTPDec, vp8Dec, pkt, videoFormat)
if err != nil {
log.Printf("[%v]: process packet error: %v\n", audioFormat, err)
return
}
log.Printf("[%v]: decoded frame with PTS %v and size %v", videoFormat, pts, img.Bounds().Max)
})
// Start playing.
_, err = c.Play(nil)
if err != nil {
return fmt.Errorf("[%v]: sending PLAY request erorr: %w", videoFormat, err)
}
// Create ticker for rotation files.
ticker := time.NewTicker(time.Duration(period) * time.Second)
defer ticker.Stop()
// Rotate files.
go func() {
for range ticker.C {
// Logic for rotation files.
/*
currentMpegtsMuxer.close()
currentMpegtsMuxer.fileName = fn.SetNumNTime()
err = currentMpegtsMuxer.initialize()
if err != nil {
panic(err)
}
log.Println("New file for recording created:", currentMpegtsMuxer.fileName)
}
}()
panic(c.Wait())
case videoFormat == "VP9" && audioFormat == "":
// Wait for the next period.
storage.WaitPeriod(period)
log.Printf("[%v]: start recording", videoFormat)
// Create decoder.
vp9RTPDec, err := vp9Format.CreateDecoder()
if err != nil {
log.Printf("[%v]: create decoder error: %v\n", videoFormat, err)
}
// Setup VP9 -> RGBA decoder.
vp9Dec := &formats.VPDecoder{}
err = vp9Dec.Initialize()
if err != nil {
log.Printf("[%v]: init decoder error: %v\n", videoFormat, err)
}
defer vp9Dec.Close()
// Setup media.
_, err = c.Setup(desc.BaseURL, vp9Media, 0, 0)
if err != nil {
return fmt.Errorf("[%v]: setup media error: %w", videoFormat, err)
}
// Process input rtp packets.
c.OnPacketRTP(vp9Media, vp9Format, func(pkt *rtp.Packet) {
// Process VP9 flow and return PTS and IMG.
pts, img, err := formats.ProcessVP9RGBA(&c, vp9Media, vp9RTPDec, vp9Dec, pkt, videoFormat)
if err != nil {
log.Printf("[%v]: process packet error: %v\n", audioFormat, err)
return
}
log.Printf("[%v]: decoded frame with PTS %v and size %v", videoFormat, pts, img.Bounds().Max)
})
// Start playing.
_, err = c.Play(nil)
if err != nil {
return fmt.Errorf("[%v]: sending PLAY request erorr: %w", videoFormat, err)
}
// Create ticker for rotation files.
ticker := time.NewTicker(time.Duration(period) * time.Second)
defer ticker.Stop()
// Rotate files.
go func() {
for range ticker.C {
// Logic for rotation files.
/*
currentMpegtsMuxer.close()
currentMpegtsMuxer.fileName = fn.SetNumNTime()
err = currentMpegtsMuxer.initialize()
if err != nil {
panic(err)
}
log.Println("New file for recording created:", currentMpegtsMuxer.fileName)
}
}()
*/
panic(c.Wait())
}
return nil
}
// changeDomain retries RTSP on the camera's alternate domain when err says
// that the stream was not found (status 404), which is taken to mean that the
// camera was flipped to another domain.
//
// If link contains "video-1" it is replaced by "video-2"; otherwise "video-2"
// is replaced by "video-1".
//
// It returns nil if err is nil or the retry succeeds, err itself if it is not
// a 404, and err joined with the retry error if the retry fails as well.
func changeDomain(dir string, period int, link string, err error) error {
	if err == nil {
		return nil
	}

	// The previous check, errors.As with a target of type error, matched every
	// non-nil error, so it never told a 404 apart from other failures.
	// NOTE(review): matching on the message text assumes the RTSP client
	// reports the status as "404 (Not Found)" — confirm against the client
	// library and switch to its typed error if one is available.
	if !strings.Contains(strings.ToLower(err.Error()), "404 (not found)") {
		return err
	}

	var altLink string
	if strings.Contains(link, "video-1") {
		altLink = strings.Replace(link, "video-1", "video-2", 1)
	} else {
		altLink = strings.Replace(link, "video-2", "video-1", 1)
	}

	if retryErr := RTSP(dir, period, altLink); retryErr != nil {
		// Report both failures so the original cause is not lost.
		return errors.Join(err, retryErr)
	}

	return nil
}