1571 lines
44 KiB
Go
1571 lines
44 KiB
Go
package rtsp
|
|
|
|
import (
|
|
"encoding/binary"
|
|
"errors"
|
|
"fmt"
|
|
"log"
|
|
"os"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"git.insit.tech/psa/rtsp_reader-writer/writer/internal/config"
|
|
"git.insit.tech/psa/rtsp_reader-writer/writer/internal/ingest/formats"
|
|
logger "git.insit.tech/psa/rtsp_reader-writer/writer/internal/log"
|
|
"git.insit.tech/psa/rtsp_reader-writer/writer/internal/metrics"
|
|
"git.insit.tech/psa/rtsp_reader-writer/writer/pkg/storage"
|
|
log2 "git.insit.tech/sas/rtsp_proxy/core/log"
|
|
"github.com/bluenviron/gortsplib/v4"
|
|
"github.com/bluenviron/gortsplib/v4/pkg/base"
|
|
"github.com/bluenviron/gortsplib/v4/pkg/description"
|
|
"github.com/bluenviron/gortsplib/v4/pkg/format"
|
|
"github.com/bluenviron/mediacommon/pkg/codecs/h264"
|
|
"github.com/bluenviron/mediacommon/v2/pkg/codecs/g711"
|
|
"github.com/pion/rtp"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
// StartWriter starts the program.
|
|
func StartWriter() {
|
|
cams, err := config.ParseCamerasYAML()
|
|
if err != nil {
|
|
logger.Log.Fatal("func ParseCamerasYAML error:", zap.Error(err))
|
|
log.Println("func ParseCamerasYAML error: ")
|
|
}
|
|
|
|
for _, link := range cams {
|
|
logger.Log.Info("start process camera:", zap.String("link", link))
|
|
log.Println("start process camera: ", link)
|
|
|
|
go func() {
|
|
err = rtsp(config.Local, 60, link, -1)
|
|
if err != nil {
|
|
logger.Log.Error("procRTSP function error for camera:", zap.String("link", link), zap.Error(err))
|
|
log.Println("procRTSP function error for camera: ", link)
|
|
}
|
|
}()
|
|
}
|
|
}
|
|
|
|
// rtsp processes RTSP protocol.
|
|
func rtsp(dir string, period int, link string, number int) error {
|
|
|
|
var marker bool
|
|
|
|
// Calculate the number of active cameras (number of active rtsp functions).
|
|
metrics.ActiveCameras.Inc()
|
|
defer metrics.ActiveCameras.Dec()
|
|
|
|
// Create data folder in the directory.
|
|
config.DirData = log2.DirCreator(dir, "vod")
|
|
|
|
// Create cam logger.
|
|
cutURI := LastPartURI(link)
|
|
cam := log2.CamLogging(fmt.Sprintf("%s/%s/log/writer-cam_%s.log", config.DirData, cutURI, strconv.FormatInt(time.Now().Unix(), 10)))
|
|
|
|
// Connect to the server.
|
|
c := gortsplib.Client{
|
|
UserAgent: "PSA",
|
|
}
|
|
|
|
u, err := base.ParseURL(link)
|
|
if err != nil {
|
|
cam.Error("parse URL error:", zap.Error(err))
|
|
return err
|
|
}
|
|
|
|
err = c.Start(u.Scheme, u.Host)
|
|
if err != nil {
|
|
cam.Error("connect to the server error:", zap.Error(err))
|
|
return err
|
|
}
|
|
defer c.Close()
|
|
|
|
desc, res, err := c.Describe(u)
|
|
if err != nil || desc == nil {
|
|
cam.Error("medias not found for camera:", zap.Error(err))
|
|
log.Println("medias not found for camera: ", cutURI)
|
|
changeDomain(dir, period, link, number+1, cam, err)
|
|
return nil
|
|
}
|
|
|
|
// Create file name structure and directory for files.
|
|
resolution := findResolution(res.Body)
|
|
resolutions := []string{resolution}
|
|
|
|
for i, itemRes := range resolutions {
|
|
switch itemRes {
|
|
case "1920-1080":
|
|
resolutions[i] = "tracks_v1"
|
|
case "1280-720":
|
|
resolutions[i] = "tracks_v2"
|
|
}
|
|
}
|
|
|
|
fn := storage.CreateFileName(config.DirData, resolutions, cutURI, period, number)
|
|
|
|
err = os.MkdirAll(fmt.Sprintf("%s", fn.Path), 0755)
|
|
if err != nil {
|
|
cam.Error("mkdirall error: %w", zap.Error(err))
|
|
return err
|
|
}
|
|
|
|
// Find formats.
|
|
var audioFormat string
|
|
var videoFormat string
|
|
|
|
av1Format, av1Media, err := formats.FindAV1Format(desc)
|
|
if av1Format != nil {
|
|
videoFormat = "AV1"
|
|
cam.Info("AV1 format found")
|
|
}
|
|
if err != nil {
|
|
cam.Info("func FindAV1Format:", zap.Error(err))
|
|
}
|
|
|
|
g711Format, g711Media, err := formats.FindG711Format(desc)
|
|
if g711Format != nil {
|
|
audioFormat = "G711"
|
|
cam.Info("G711 format found")
|
|
}
|
|
if err != nil {
|
|
cam.Info("func FindG711Format:", zap.Error(err))
|
|
}
|
|
|
|
h264Format, h264Media, err := formats.FindH264Format(desc)
|
|
if h264Format != nil {
|
|
videoFormat = "H264"
|
|
cam.Info("H264 format found")
|
|
}
|
|
if err != nil {
|
|
cam.Info("func FindH264Format:", zap.Error(err))
|
|
}
|
|
|
|
aacFormat, aacMedia, err := formats.FindAACFormat(desc)
|
|
if aacFormat != nil {
|
|
audioFormat = "AAC"
|
|
cam.Info("AAC format found")
|
|
}
|
|
if err != nil {
|
|
cam.Info("func FindAACFormat:", zap.Error(err))
|
|
}
|
|
|
|
h265Format, h265Media, err := formats.FindH265Format(desc)
|
|
if h265Format != nil {
|
|
videoFormat = "H265"
|
|
cam.Info("H265 format found")
|
|
}
|
|
if err != nil {
|
|
cam.Info("func FindH265Format:", zap.Error(err))
|
|
}
|
|
|
|
lpcmFormat, lpcmMedia, err := formats.FindLPCMFormat(desc)
|
|
if lpcmFormat != nil {
|
|
audioFormat = "LPCM"
|
|
cam.Info("LPCM format found")
|
|
}
|
|
if err != nil {
|
|
cam.Info("func FindLPCMFormat:", zap.Error(err))
|
|
}
|
|
|
|
mjpegFormat, mjpegMedia, err := formats.FindMJPEGFormat(desc)
|
|
if mjpegFormat != nil {
|
|
videoFormat = "MJPEG"
|
|
cam.Info("MJPEG format found")
|
|
}
|
|
if err != nil {
|
|
cam.Info("func FindLPCMFormat:", zap.Error(err))
|
|
}
|
|
|
|
opusFormat, opusMedia, err := formats.FindOPUSFormat(desc)
|
|
if opusFormat != nil {
|
|
audioFormat = "OPUS"
|
|
cam.Info("OPUS format found")
|
|
}
|
|
if err != nil {
|
|
cam.Info("func FindOPUSFormat:", zap.Error(err))
|
|
}
|
|
|
|
vp8Format, vp8Media, err := formats.FindVP8Format(desc)
|
|
if vp8Format != nil {
|
|
videoFormat = "VP8"
|
|
cam.Info("VP8 format found")
|
|
}
|
|
if err != nil {
|
|
cam.Info("func FindVP8Format:", zap.Error(err))
|
|
}
|
|
|
|
vp9Format, vp9Media, err := formats.FindVP9Format(desc)
|
|
if vp9Format != nil {
|
|
videoFormat = "VP9"
|
|
cam.Info("VP9 format found")
|
|
}
|
|
if err != nil {
|
|
cam.Info("func FindVP9Format:", zap.Error(err))
|
|
}
|
|
|
|
// Start program according to gotten formats.
|
|
switch {
|
|
case videoFormat == "AV1" && audioFormat == "":
|
|
// Wait for the next period.
|
|
waitPeriod(period, cam)
|
|
cam.Info("start recording")
|
|
|
|
// Create decoder.
|
|
av1RTPDec, err := av1Format.CreateDecoder()
|
|
if err != nil {
|
|
cam.Error("create decoder error:", zap.Error(err))
|
|
return err
|
|
}
|
|
|
|
av1Dec := &formats.AV1Decoder{}
|
|
err = av1Dec.Initialize()
|
|
if err != nil {
|
|
cam.Error("init decoder error:", zap.Error(err))
|
|
return err
|
|
}
|
|
defer av1Dec.Close()
|
|
|
|
// Setup media.
|
|
_, err = c.Setup(desc.BaseURL, av1Media, 0, 0)
|
|
if err != nil {
|
|
cam.Error("setup media error:", zap.Error(err))
|
|
return err
|
|
}
|
|
|
|
firstRandomReceived := false
|
|
|
|
// Process input rtp packets.
|
|
c.OnPacketRTP(av1Media, av1Format, func(pkt *rtp.Packet) {
|
|
// Process AV1 flow and return PTS and IMG.
|
|
pts, _, err := formats.ProcessAV1(&c, av1Media, av1RTPDec, pkt, av1Dec, firstRandomReceived)
|
|
if err != nil {
|
|
cam.Warn("process packet error:", zap.Error(err))
|
|
}
|
|
|
|
cam.Info("decoded image:", zap.String("PTS", strconv.FormatInt(pts, 10)))
|
|
})
|
|
|
|
// Start playing.
|
|
_, err = c.Play(nil)
|
|
if err != nil {
|
|
cam.Error("sending PLAY request error:", zap.Error(err))
|
|
return err
|
|
}
|
|
|
|
// Set program if the client gets request.
|
|
c.OnRequest = func(req *base.Request) {
|
|
switch req.Method {
|
|
case base.Teardown:
|
|
cam.Warn("got TEARDOWN request from server:", zap.String("request", req.String()))
|
|
case base.Method(rune(base.StatusRequestTimeout)):
|
|
cam.Warn("got STATUS_REQUEST_TIMEOUT from server:", zap.String("request", req.String()))
|
|
case base.GetParameter:
|
|
cam.Info("got Get_Parameter from server:", zap.String("request", req.String()))
|
|
case "TCP timeout":
|
|
cam.Warn("got TCP timeout from server:", zap.String("request", req.String()))
|
|
case "EOF":
|
|
cam.Warn("got EOF from server:", zap.String("request", req.String()))
|
|
default:
|
|
cam.Warn("got method from server:", zap.String("request", req.String()))
|
|
}
|
|
}
|
|
|
|
// Create ticker for rotation files.
|
|
ticker := time.NewTicker(time.Duration(period) * time.Second)
|
|
defer ticker.Stop()
|
|
|
|
// Rotate files.
|
|
go func() {
|
|
for range ticker.C {
|
|
// Logic for rotation files.
|
|
/*
|
|
currentMpegtsMuxer.close()
|
|
currentMpegtsMuxer.fileName = fn.SetNumNTime()
|
|
|
|
err = currentMpegtsMuxer.initialize()
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
cam.Info("new file for recording created")
|
|
log.Println("new file for recording created")
|
|
*/
|
|
}
|
|
}()
|
|
|
|
if err = c.Wait(); err != nil {
|
|
cam.Error("c.Wait() error:", zap.Error(c.Wait()))
|
|
logger.Log.Error("c.Wait() error for camera:", zap.String("camera:", link), zap.Error(c.Wait()))
|
|
go func() {
|
|
time.Sleep(40 * time.Second)
|
|
rtsp(dir, period, link, fn.Number+1)
|
|
}()
|
|
time.Sleep(1 * time.Second)
|
|
}
|
|
|
|
case videoFormat == "" && audioFormat == "G711":
|
|
// Wait for the next period.
|
|
waitPeriod(period, cam)
|
|
cam.Info("start recording")
|
|
|
|
// Create decoder.
|
|
g711RTPDec, err := g711Format.CreateDecoder()
|
|
if err != nil {
|
|
cam.Error("create decoder error:", zap.Error(err))
|
|
return err
|
|
}
|
|
|
|
// Setup media.
|
|
_, err = c.Setup(desc.BaseURL, g711Media, 0, 0)
|
|
if err != nil {
|
|
cam.Error("setup media error:", zap.Error(err))
|
|
return err
|
|
}
|
|
|
|
// Process input rtp packets.
|
|
c.OnPacketRTP(g711Media, g711Format, func(pkt *rtp.Packet) {
|
|
// Process G711 flow and return PTS and AU.
|
|
pts, au, err := formats.ProcessG711(&c, g711Media, g711RTPDec, pkt)
|
|
if err != nil {
|
|
cam.Warn("process packet error:", zap.Error(err))
|
|
}
|
|
|
|
// Decode samples (these are 16-bit, big endian LPCM samples).
|
|
if g711Format.MULaw {
|
|
g711.DecodeMulaw(au)
|
|
} else {
|
|
g711.DecodeAlaw(au)
|
|
}
|
|
|
|
cam.Info("decoded audio samples:", zap.String("PTS", strconv.FormatInt(pts, 10)))
|
|
})
|
|
|
|
// Start playing.
|
|
_, err = c.Play(nil)
|
|
if err != nil {
|
|
cam.Error("sending PLAY request error:", zap.Error(err))
|
|
return err
|
|
}
|
|
|
|
// Set program if the client gets request.
|
|
c.OnRequest = func(req *base.Request) {
|
|
switch req.Method {
|
|
case base.Teardown:
|
|
cam.Warn("got TEARDOWN request from server:", zap.String("request", req.String()))
|
|
case base.Method(rune(base.StatusRequestTimeout)):
|
|
cam.Warn("got STATUS_REQUEST_TIMEOUT from server:", zap.String("request", req.String()))
|
|
case base.GetParameter:
|
|
cam.Info("got Get_Parameter from server:", zap.String("request", req.String()))
|
|
case "TCP timeout":
|
|
cam.Warn("got TCP timeout from server:", zap.String("request", req.String()))
|
|
case "EOF":
|
|
cam.Warn("got EOF from server:", zap.String("request", req.String()))
|
|
default:
|
|
cam.Warn("got method from server:", zap.String("request", req.String()))
|
|
}
|
|
}
|
|
|
|
// Create ticker for rotation files.
|
|
ticker := time.NewTicker(time.Duration(period) * time.Second)
|
|
defer ticker.Stop()
|
|
|
|
// Rotate files.
|
|
go func() {
|
|
for range ticker.C {
|
|
// Logic for rotation files.
|
|
/*
|
|
currentMpegtsMuxer.close()
|
|
currentMpegtsMuxer.fileName = fn.SetNumNTime()
|
|
|
|
err = currentMpegtsMuxer.initialize()
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
cam.Info("new file for recording created")
|
|
log.Println("new file for recording created")
|
|
*/
|
|
}
|
|
}()
|
|
|
|
if err = c.Wait(); err != nil {
|
|
cam.Error("c.Wait() error:", zap.Error(c.Wait()))
|
|
logger.Log.Error("c.Wait() error for camera:", zap.String("camera:", link), zap.Error(c.Wait()))
|
|
go func() {
|
|
time.Sleep(40 * time.Second)
|
|
rtsp(dir, period, link, fn.Number+1)
|
|
}()
|
|
time.Sleep(1 * time.Second)
|
|
}
|
|
|
|
case videoFormat == "H264" && audioFormat == "AAC":
|
|
// Wait for the next period.
|
|
waitPeriod(period, cam)
|
|
cam.Info("start recording")
|
|
|
|
// Create decoders.
|
|
//h264RTPDec, err := h264Format.CreateDecoder()
|
|
//if err != nil {
|
|
// cam.Error("create decoder error:", zap.Error(err))
|
|
//}
|
|
//
|
|
//aacRTPDec, err := aacFormat.CreateDecoder()
|
|
//if err != nil {
|
|
// cam.Error("create decoder error:", zap.Error(err))
|
|
//}
|
|
|
|
//// Setup MPEG-TS muxer.
|
|
//currentMpegtsMuxer := formats.MpegtsMuxer{
|
|
// FileName: fn.SetNumNTime(),
|
|
// H264Format: h264Format,
|
|
// Mpeg4AudioFormat: aacFormat,
|
|
//}
|
|
|
|
//err = currentMpegtsMuxer.Initialize()
|
|
//if err != nil {
|
|
// return fmt.Errorf("[%v-%v]: init muxer error: %w\n", videoFormat, audioFormat, err)
|
|
//}
|
|
|
|
// Setup all medias.
|
|
err = c.SetupAll(desc.BaseURL, desc.Medias)
|
|
if err != nil {
|
|
cam.Error("setup media error:", zap.Error(err))
|
|
return err
|
|
}
|
|
|
|
// Process input rtp packets.
|
|
c.OnPacketRTPAny(func(medi *description.Media, forma format.Format, pkt *rtp.Packet) {
|
|
switch forma.(type) {
|
|
case *format.H264:
|
|
//// Process H264 flow and return PTS and AU.
|
|
//pts, au, err := formats.ProcessH264(&c, h264Media, h264RTPDec, pkt, videoFormat)
|
|
//if err != nil {
|
|
// cam.Warn("process packet error:", zap.Error(err))
|
|
//}
|
|
//
|
|
//// Encode the access unit into MPEG-TS.
|
|
//err = currentMpegtsMuxer.WriteH264(pts, au)
|
|
//if err != nil {
|
|
// return
|
|
//}
|
|
|
|
case *format.MPEG4Audio:
|
|
//// Process AAC flow and return PTS and AUS.
|
|
//pts, aus, err := formats.ProcessAAC(&c, aacMedia, aacRTPDec, pkt, audioFormat)
|
|
//if err != nil {
|
|
// cam.Warn("process packet error:", zap.Error(err))
|
|
//}
|
|
//
|
|
//// Encode access units into MPEG-TS.
|
|
//err = currentMpegtsMuxer.WriteAAC(aus, pts)
|
|
//if err != nil {
|
|
// return
|
|
//}
|
|
|
|
}
|
|
})
|
|
|
|
// Start playing.
|
|
_, err = c.Play(nil)
|
|
if err != nil {
|
|
cam.Error("sending PLAY request error:", zap.Error(err))
|
|
return err
|
|
}
|
|
|
|
// Set program if the client gets request.
|
|
c.OnRequest = func(req *base.Request) {
|
|
switch req.Method {
|
|
case base.Teardown:
|
|
cam.Warn("got TEARDOWN request from server:", zap.String("request", req.String()))
|
|
case base.Method(rune(base.StatusRequestTimeout)):
|
|
cam.Warn("got STATUS_REQUEST_TIMEOUT from server:", zap.String("request", req.String()))
|
|
case base.GetParameter:
|
|
cam.Info("got Get_Parameter from server:", zap.String("request", req.String()))
|
|
case "TCP timeout":
|
|
cam.Warn("got TCP timeout from server:", zap.String("request", req.String()))
|
|
case "EOF":
|
|
cam.Warn("got EOF from server:", zap.String("request", req.String()))
|
|
default:
|
|
cam.Warn("got method from server:", zap.String("request", req.String()))
|
|
}
|
|
}
|
|
|
|
// Create ticker for rotation files.
|
|
ticker := time.NewTicker(time.Duration(period) * time.Second)
|
|
defer ticker.Stop()
|
|
|
|
// Rotate files.
|
|
go func() {
|
|
for range ticker.C {
|
|
//// Logic for rotation files.
|
|
//currentMpegtsMuxer.Close()
|
|
//currentMpegtsMuxer.FileName = fn.SetNumNTime()
|
|
//
|
|
//err = currentMpegtsMuxer.Initialize()
|
|
//if err != nil {
|
|
// log.Printf("[%v-%v]: init muxer error: %v\n", videoFormat, audioFormat, err)
|
|
// return
|
|
//}
|
|
//
|
|
// cam.Info("new file for recording created")
|
|
// log.Println("new file for recording created")
|
|
}
|
|
}()
|
|
|
|
if err = c.Wait(); err != nil {
|
|
cam.Error("c.Wait() error:", zap.Error(c.Wait()))
|
|
logger.Log.Error("c.Wait() error for camera:", zap.String("camera:", link), zap.Error(c.Wait()))
|
|
go func() {
|
|
time.Sleep(40 * time.Second)
|
|
rtsp(dir, period, link, fn.Number+1)
|
|
}()
|
|
time.Sleep(1 * time.Second)
|
|
}
|
|
|
|
case videoFormat == "H264" && audioFormat == "G711" || videoFormat == "H264" && audioFormat == "":
|
|
// Wait for the next period.
|
|
waitPeriod(period, cam)
|
|
cam.Info("start recording")
|
|
|
|
// Create decoders.
|
|
h264RTPDec, err := h264Format.CreateDecoder()
|
|
if err != nil {
|
|
cam.Error("create decoder error:", zap.Error(err))
|
|
return err
|
|
}
|
|
|
|
//g711RTPDec, err := g711Format.CreateDecoder()
|
|
//if err != nil {
|
|
// cam.Error("create decoder error:", zap.Error(err))
|
|
// return err
|
|
//}
|
|
|
|
//// Setup MPEG-TS muxer.
|
|
//var aacFormat *format.MPEG4Audio
|
|
//
|
|
//currentMpegtsMuxer := formats.MpegtsMuxer{
|
|
// FileName: fn.SetNumNTime(),
|
|
// H264Format: h264Format,
|
|
// Mpeg4AudioFormat: aacFormat,
|
|
//}
|
|
|
|
//err = currentMpegtsMuxer.Initialize()
|
|
//if err != nil {
|
|
// return fmt.Errorf("[%v-%v]: init muxer error: %w\n", videoFormat, audioFormat, err)
|
|
//}
|
|
|
|
// Setup all medias.
|
|
err = c.SetupAll(desc.BaseURL, desc.Medias)
|
|
if err != nil {
|
|
cam.Error("setup media error:", zap.Error(err))
|
|
return err
|
|
}
|
|
|
|
file, err := os.Create(fn.SetNumNTime("insit"))
|
|
if err != nil {
|
|
cam.Error("creating file error:", zap.Error(err))
|
|
return err
|
|
}
|
|
defer file.Close()
|
|
|
|
seg := storage.Segment{
|
|
Date: strconv.FormatInt(time.Now().Unix(), 10),
|
|
Duration: strconv.Itoa(period),
|
|
Packet: storage.InterleavedPacket{},
|
|
}
|
|
|
|
// Write StreamID.
|
|
if err := binary.Write(file, binary.LittleEndian, int32(len(cutURI))); err != nil {
|
|
cam.Error("write StreamID length error:", zap.Error(err))
|
|
return err
|
|
}
|
|
if _, err := file.Write([]byte(cutURI)); err != nil {
|
|
cam.Error("write StreamID error:", zap.Error(err))
|
|
return err
|
|
}
|
|
|
|
// Write header of the file.
|
|
err = storage.WriteHeader(file, seg)
|
|
if err != nil {
|
|
cam.Error("write header error:", zap.Error(err))
|
|
return err
|
|
}
|
|
|
|
// Process input rtp packets.
|
|
c.OnPacketRTPAny(func(medi *description.Media, forma format.Format, pkt *rtp.Packet) {
|
|
switch forma.(type) {
|
|
case *format.H264:
|
|
// Process H264 flow and return PTS and AU.
|
|
pts, au, err := formats.ProcessH264(&c, h264Media, h264RTPDec, pkt)
|
|
if err != nil {
|
|
cam.Warn("process packet error:", zap.Error(err))
|
|
}
|
|
|
|
for _, nalu := range au {
|
|
typ := h264.NALUType(nalu[0] & 0x1F)
|
|
|
|
switch typ {
|
|
case h264.NALUTypeSPS:
|
|
continue
|
|
|
|
case h264.NALUTypePPS:
|
|
continue
|
|
|
|
case h264.NALUTypeAccessUnitDelimiter:
|
|
continue
|
|
|
|
case h264.NALUTypeIDR:
|
|
marker = true
|
|
time.Sleep(time.Millisecond)
|
|
}
|
|
}
|
|
|
|
if au != nil {
|
|
// Add appropriate lines to the interleaved packet.
|
|
seg.Packet.Type = storage.PacketTypeH264
|
|
seg.Packet.Pts = pts
|
|
seg.Packet.H264AUs = au
|
|
|
|
// Write segment with interleaved packets.
|
|
if err := storage.WriteInterleavedPacket(file, seg); err != nil {
|
|
cam.Error("write segment error:", zap.Error(err))
|
|
return
|
|
}
|
|
}
|
|
|
|
marker = false
|
|
|
|
//// Encode the access unit into MPEG-TS.
|
|
//err = currentMpegtsMuxer.WriteH264(pts, au)
|
|
//if err != nil {
|
|
// return
|
|
//}
|
|
|
|
case *format.G711:
|
|
// Process G711 flow and returns PTS and AU.
|
|
//pts, au, err := formats.ProcessG711(&c, g711Media, g711RTPDec, pkt)
|
|
//if err != nil {
|
|
// cam.Warn("process packet error:", zap.Error(err))
|
|
//}
|
|
//
|
|
//if au != nil {
|
|
// // Convert G711 to LPCM.
|
|
// lpcmSamples := formats.ConvertG711ToLPCM(au, f.MULaw)
|
|
//
|
|
// // Add appropriate lines to the interleaved packet.
|
|
// seg.Packet.Type = storage.PacketTypeLPCM
|
|
// seg.Packet.Pts = pts
|
|
// seg.Packet.LPCMSamples = lpcmSamples
|
|
//
|
|
// // Write segment with interleaved packets.
|
|
// if err := storage.WriteInterleavedPacket(file, seg); err != nil {
|
|
// cam.Error("write segment error:", zap.Error(err))
|
|
// return
|
|
// }
|
|
//}
|
|
|
|
//// Convert G711 to AAC.
|
|
//au, err = formats.ConvertLPCMToAAC(lpcmSamples)
|
|
//if err != nil {
|
|
// log.Printf("[%v-%v]: converting to AAC frame error: %v\n", videoFormat, audioFormat, err)
|
|
//}
|
|
//
|
|
//// Encode the access unit into MPEG-TS.
|
|
//err = currentMpegtsMuxer.WriteAAC([][]byte{au}, pts)
|
|
//if err != nil {
|
|
// return
|
|
}
|
|
|
|
})
|
|
|
|
// Start playing.
|
|
_, err = c.Play(nil)
|
|
if err != nil {
|
|
cam.Error("sending PLAY request error:", zap.Error(err))
|
|
return err
|
|
}
|
|
|
|
// Set program if the client gets request.
|
|
c.OnRequest = func(req *base.Request) {
|
|
switch req.Method {
|
|
case base.Teardown:
|
|
cam.Warn("got TEARDOWN request from server:", zap.String("request", req.String()))
|
|
case base.Method(rune(base.StatusRequestTimeout)):
|
|
cam.Warn("got STATUS_REQUEST_TIMEOUT from server:", zap.String("request", req.String()))
|
|
case base.GetParameter:
|
|
cam.Info("got Get_Parameter from server:", zap.String("request", req.String()))
|
|
case "TCP timeout":
|
|
cam.Warn("got TCP timeout from server:", zap.String("request", req.String()))
|
|
case "EOF":
|
|
cam.Warn("got EOF from server:", zap.String("request", req.String()))
|
|
default:
|
|
cam.Warn("got method from server:", zap.String("request", req.String()))
|
|
}
|
|
}
|
|
|
|
// Create ticker for rotation files.
|
|
ticker := time.NewTicker(time.Duration(period) * time.Second)
|
|
defer ticker.Stop()
|
|
|
|
// Rotate files.
|
|
go func() {
|
|
for range ticker.C {
|
|
fileChanged := false
|
|
for {
|
|
if marker {
|
|
file.Close()
|
|
|
|
file, err = os.Create(fn.SetNumNTime("insit"))
|
|
if err != nil {
|
|
cam.Error("creating file error:", zap.Error(err))
|
|
logger.Log.Error("creating file error:", zap.Error(err))
|
|
return
|
|
}
|
|
|
|
seg = storage.Segment{
|
|
Date: strconv.FormatInt(time.Now().Unix(), 10),
|
|
Duration: strconv.Itoa(period),
|
|
Packet: storage.InterleavedPacket{},
|
|
}
|
|
|
|
// Write StreamID.
|
|
if err := binary.Write(file, binary.LittleEndian, int32(len(cutURI))); err != nil {
|
|
cam.Error("write StreamID length error:", zap.Error(err))
|
|
logger.Log.Error("write StreamID length error:", zap.Error(err))
|
|
return
|
|
}
|
|
if _, err := file.Write([]byte(cutURI)); err != nil {
|
|
cam.Error("write StreamID error:", zap.Error(err))
|
|
logger.Log.Error("write StreamID error:", zap.Error(err))
|
|
return
|
|
}
|
|
|
|
// Write header of the file.
|
|
err = storage.WriteHeader(file, seg)
|
|
if err != nil {
|
|
cam.Error("write header error:", zap.Error(err))
|
|
logger.Log.Error("write header error:", zap.Error(err))
|
|
return
|
|
}
|
|
|
|
cam.Info("new file for recording created")
|
|
log.Println("new file for recording created")
|
|
|
|
fileChanged = true
|
|
break
|
|
}
|
|
|
|
if fileChanged {
|
|
fileChanged = false
|
|
break
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
|
|
if err = c.Wait(); err != nil {
|
|
cam.Error("c.Wait() error:", zap.Error(c.Wait()))
|
|
logger.Log.Error("c.Wait() error for camera:", zap.String("camera:", link), zap.Error(c.Wait()))
|
|
go func() {
|
|
time.Sleep(40 * time.Second)
|
|
rtsp(dir, period, link, fn.Number+1)
|
|
}()
|
|
time.Sleep(1 * time.Second)
|
|
}
|
|
|
|
case videoFormat == "H264-" && audioFormat == "":
|
|
// Wait for the next period.
|
|
waitPeriod(period, cam)
|
|
cam.Info("start recording")
|
|
|
|
// Create decoder.
|
|
h264RTPDec, err := h264Format.CreateDecoder()
|
|
if err != nil {
|
|
cam.Error("create decoder error:", zap.Error(err))
|
|
return err
|
|
}
|
|
|
|
// Setup H264 -> RGBA decoder.
|
|
h264Dec := &formats.H264Decoder{}
|
|
err = h264Dec.Initialize()
|
|
if err != nil {
|
|
cam.Error("init decoder error:", zap.Error(err))
|
|
return err
|
|
}
|
|
defer h264Dec.Close()
|
|
|
|
// if SPS and PPS are present into the SDP, send them to the decoder
|
|
if h264Format.SPS != nil {
|
|
h264Dec.Decode([][]byte{h264Format.SPS})
|
|
}
|
|
if h264Format.PPS != nil {
|
|
h264Dec.Decode([][]byte{h264Format.PPS})
|
|
}
|
|
|
|
// Setup media.
|
|
_, err = c.Setup(desc.BaseURL, h264Media, 0, 0)
|
|
if err != nil {
|
|
cam.Error("setup media error:", zap.Error(err))
|
|
return err
|
|
}
|
|
|
|
firstRandomAccess := false
|
|
|
|
// Process input rtp packets.
|
|
c.OnPacketRTP(h264Media, h264Format, func(pkt *rtp.Packet) {
|
|
// Process H264 flow and return PTS and IMG.
|
|
pts, _, err := formats.ProcessH264RGBA(
|
|
&c, h264Media, h264RTPDec, h264Dec, pkt, firstRandomAccess)
|
|
if err != nil {
|
|
cam.Warn("process packet error:", zap.Error(err))
|
|
}
|
|
|
|
cam.Info("decoded image:", zap.String("PTS", strconv.FormatInt(pts, 10)))
|
|
})
|
|
|
|
// Start playing.
|
|
_, err = c.Play(nil)
|
|
if err != nil {
|
|
cam.Error("sending PLAY request error:", zap.Error(err))
|
|
return err
|
|
}
|
|
|
|
// Set program if the client gets request.
|
|
c.OnRequest = func(req *base.Request) {
|
|
switch req.Method {
|
|
case base.Teardown:
|
|
cam.Warn("got TEARDOWN request from server:", zap.String("request", req.String()))
|
|
case base.Method(rune(base.StatusRequestTimeout)):
|
|
cam.Warn("got STATUS_REQUEST_TIMEOUT from server:", zap.String("request", req.String()))
|
|
case base.GetParameter:
|
|
cam.Info("got Get_Parameter from server:", zap.String("request", req.String()))
|
|
case "TCP timeout":
|
|
cam.Warn("got TCP timeout from server:", zap.String("request", req.String()))
|
|
case "EOF":
|
|
cam.Warn("got EOF from server:", zap.String("request", req.String()))
|
|
default:
|
|
cam.Warn("got method from server:", zap.String("request", req.String()))
|
|
}
|
|
}
|
|
|
|
// Create ticker for rotation files.
|
|
ticker := time.NewTicker(time.Duration(period) * time.Second)
|
|
defer ticker.Stop()
|
|
|
|
// Rotate files.
|
|
go func() {
|
|
for range ticker.C {
|
|
// Logic for rotation files.
|
|
/*
|
|
currentMpegtsMuxer.close()
|
|
currentMpegtsMuxer.fileName = fn.SetNumNTime()
|
|
|
|
err = currentMpegtsMuxer.initialize()
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
cam.Info("new file for recording created")
|
|
log.Println("new file for recording created")
|
|
*/
|
|
}
|
|
}()
|
|
|
|
if err = c.Wait(); err != nil {
|
|
cam.Error("c.Wait() error:", zap.Error(c.Wait()))
|
|
logger.Log.Error("c.Wait() error for camera:", zap.String("camera:", link), zap.Error(c.Wait()))
|
|
go func() {
|
|
time.Sleep(40 * time.Second)
|
|
rtsp(dir, period, link, fn.Number+1)
|
|
}()
|
|
time.Sleep(1 * time.Second)
|
|
}
|
|
|
|
case videoFormat == "H265" && audioFormat == "":
|
|
// Wait for the next period.
|
|
waitPeriod(period, cam)
|
|
cam.Info("start recording")
|
|
|
|
// Create decoder.
|
|
h265RTPDec, err := h265Format.CreateDecoder()
|
|
if err != nil {
|
|
cam.Error("create decoder error:", zap.Error(err))
|
|
return err
|
|
}
|
|
|
|
// Setup H264 -> RGBA decoder.
|
|
h265Dec := &formats.H265Decoder{}
|
|
err = h265Dec.Initialize()
|
|
if err != nil {
|
|
cam.Error("init decoder error:", zap.Error(err))
|
|
return err
|
|
}
|
|
defer h265Dec.Close()
|
|
|
|
// If VPS, SPS and PPS are present into the SDP, send them to the decoder.
|
|
if h265Format.VPS != nil {
|
|
h265Dec.Decode([][]byte{h265Format.VPS})
|
|
}
|
|
if h265Format.SPS != nil {
|
|
h265Dec.Decode([][]byte{h265Format.SPS})
|
|
}
|
|
if h265Format.PPS != nil {
|
|
h265Dec.Decode([][]byte{h265Format.PPS})
|
|
}
|
|
|
|
// Setup media.
|
|
_, err = c.Setup(desc.BaseURL, h265Media, 0, 0)
|
|
if err != nil {
|
|
cam.Error("setup media error:", zap.Error(err))
|
|
return err
|
|
}
|
|
|
|
firstRandomAccess := false
|
|
|
|
// Process input rtp packets.
|
|
c.OnPacketRTP(h265Media, h265Format, func(pkt *rtp.Packet) {
|
|
// Process H265 flow and return PTS and IMG.
|
|
pts, _, err := formats.ProcessH265RGBA(&c, h265Media, h265RTPDec, h265Dec, pkt, firstRandomAccess)
|
|
if err != nil {
|
|
cam.Warn("process packet error:", zap.Error(err))
|
|
}
|
|
|
|
cam.Info("decoded image:", zap.String("PTS", strconv.FormatInt(pts, 10)))
|
|
})
|
|
|
|
// Start playing.
|
|
_, err = c.Play(nil)
|
|
if err != nil {
|
|
cam.Error("sending PLAY request error:", zap.Error(err))
|
|
return err
|
|
}
|
|
|
|
// Set program if the client gets request.
|
|
c.OnRequest = func(req *base.Request) {
|
|
switch req.Method {
|
|
case base.Teardown:
|
|
cam.Warn("got TEARDOWN request from server:", zap.String("request", req.String()))
|
|
case base.Method(rune(base.StatusRequestTimeout)):
|
|
cam.Warn("got STATUS_REQUEST_TIMEOUT from server:", zap.String("request", req.String()))
|
|
case base.GetParameter:
|
|
cam.Info("got Get_Parameter from server:", zap.String("request", req.String()))
|
|
case "TCP timeout":
|
|
cam.Warn("got TCP timeout from server:", zap.String("request", req.String()))
|
|
case "EOF":
|
|
cam.Warn("got EOF from server:", zap.String("request", req.String()))
|
|
default:
|
|
cam.Warn("got method from server:", zap.String("request", req.String()))
|
|
}
|
|
}
|
|
|
|
// Create ticker for rotation files.
|
|
ticker := time.NewTicker(time.Duration(period) * time.Second)
|
|
defer ticker.Stop()
|
|
|
|
// Rotate files.
|
|
go func() {
|
|
for range ticker.C {
|
|
// Logic for rotation files.
|
|
/*
|
|
currentMpegtsMuxer.close()
|
|
currentMpegtsMuxer.fileName = fn.SetNumNTime()
|
|
|
|
err = currentMpegtsMuxer.initialize()
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
cam.Info("new file for recording created")
|
|
log.Println("new file for recording created")
|
|
*/
|
|
}
|
|
}()
|
|
|
|
if err = c.Wait(); err != nil {
|
|
cam.Error("c.Wait() error:", zap.Error(c.Wait()))
|
|
logger.Log.Error("c.Wait() error for camera:", zap.String("camera:", link), zap.Error(c.Wait()))
|
|
go func() {
|
|
time.Sleep(40 * time.Second)
|
|
rtsp(dir, period, link, fn.Number+1)
|
|
}()
|
|
time.Sleep(1 * time.Second)
|
|
}
|
|
|
|
case videoFormat == "" && audioFormat == "LPCM":
|
|
// Wait for the next period.
|
|
waitPeriod(period, cam)
|
|
cam.Info("start recording")
|
|
|
|
// Create decoder.
|
|
lpcmRTPDec, err := lpcmFormat.CreateDecoder()
|
|
if err != nil {
|
|
cam.Error("create decoder error:", zap.Error(err))
|
|
return err
|
|
}
|
|
|
|
// Setup media.
|
|
_, err = c.Setup(desc.BaseURL, lpcmMedia, 0, 0)
|
|
if err != nil {
|
|
cam.Error("setup media error:", zap.Error(err))
|
|
return err
|
|
}
|
|
|
|
// Process input rtp packets.
|
|
c.OnPacketRTP(lpcmMedia, lpcmFormat, func(pkt *rtp.Packet) {
|
|
// Process LPCM flow and return PTS and SAMPLES.
|
|
pts, _, err := formats.ProcessLPCM(&c, lpcmMedia, lpcmRTPDec, pkt)
|
|
if err != nil {
|
|
cam.Warn("process packet error:", zap.Error(err))
|
|
}
|
|
|
|
cam.Info("decoded audio samples:", zap.String("PTS", strconv.FormatInt(pts, 10)))
|
|
})
|
|
|
|
// Start playing.
|
|
_, err = c.Play(nil)
|
|
if err != nil {
|
|
cam.Error("sending PLAY request error:", zap.Error(err))
|
|
return err
|
|
}
|
|
|
|
// Set program if the client gets request.
|
|
c.OnRequest = func(req *base.Request) {
|
|
switch req.Method {
|
|
case base.Teardown:
|
|
cam.Warn("got TEARDOWN request from server:", zap.String("request", req.String()))
|
|
case base.Method(rune(base.StatusRequestTimeout)):
|
|
cam.Warn("got STATUS_REQUEST_TIMEOUT from server:", zap.String("request", req.String()))
|
|
case base.GetParameter:
|
|
cam.Info("got Get_Parameter from server:", zap.String("request", req.String()))
|
|
case "TCP timeout":
|
|
cam.Warn("got TCP timeout from server:", zap.String("request", req.String()))
|
|
case "EOF":
|
|
cam.Warn("got EOF from server:", zap.String("request", req.String()))
|
|
default:
|
|
cam.Warn("got method from server:", zap.String("request", req.String()))
|
|
}
|
|
}
|
|
|
|
// Create ticker for rotation files.
|
|
ticker := time.NewTicker(time.Duration(period) * time.Second)
|
|
defer ticker.Stop()
|
|
|
|
// Rotate files.
|
|
go func() {
|
|
for range ticker.C {
|
|
// Logic for rotation files.
|
|
/*
|
|
currentMpegtsMuxer.close()
|
|
currentMpegtsMuxer.fileName = fn.SetNumNTime()
|
|
|
|
err = currentMpegtsMuxer.initialize()
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
cam.Info("new file for recording created")
|
|
log.Println("new file for recording created")
|
|
*/
|
|
}
|
|
}()
|
|
|
|
if err = c.Wait(); err != nil {
|
|
cam.Error("c.Wait() error:", zap.Error(c.Wait()))
|
|
logger.Log.Error("c.Wait() error for camera:", zap.String("camera:", link), zap.Error(c.Wait()))
|
|
go func() {
|
|
time.Sleep(40 * time.Second)
|
|
rtsp(dir, period, link, fn.Number+1)
|
|
}()
|
|
time.Sleep(1 * time.Second)
|
|
}
|
|
|
|
case videoFormat == "MJPEG" && audioFormat == "":
|
|
// Wait for the next period.
|
|
waitPeriod(period, cam)
|
|
cam.Info("start recording")
|
|
|
|
// Create decoder.
|
|
mjpegRTPDec, err := mjpegFormat.CreateDecoder()
|
|
if err != nil {
|
|
cam.Error("create decoder error:", zap.Error(err))
|
|
return err
|
|
}
|
|
|
|
// Setup media.
|
|
_, err = c.Setup(desc.BaseURL, mjpegMedia, 0, 0)
|
|
if err != nil {
|
|
cam.Error("setup media error:", zap.Error(err))
|
|
return err
|
|
}
|
|
|
|
// Process input rtp packets.
|
|
c.OnPacketRTP(mjpegMedia, mjpegFormat, func(pkt *rtp.Packet) {
|
|
// Process MJPEG flow and return PTS and IMG.
|
|
pts, _, err := formats.ProcessMJPEGRGBA(&c, mjpegMedia, mjpegRTPDec, pkt)
|
|
if err != nil {
|
|
cam.Warn("process packet error:", zap.Error(err))
|
|
}
|
|
|
|
cam.Info("decoded image:", zap.String("PTS", strconv.FormatInt(pts, 10)))
|
|
})
|
|
|
|
// Start playing.
|
|
_, err = c.Play(nil)
|
|
if err != nil {
|
|
cam.Error("sending PLAY request error:", zap.Error(err))
|
|
return err
|
|
}
|
|
|
|
// Set program if the client gets request.
|
|
c.OnRequest = func(req *base.Request) {
|
|
switch req.Method {
|
|
case base.Teardown:
|
|
cam.Warn("got TEARDOWN request from server:", zap.String("request", req.String()))
|
|
case base.Method(rune(base.StatusRequestTimeout)):
|
|
cam.Warn("got STATUS_REQUEST_TIMEOUT from server:", zap.String("request", req.String()))
|
|
case base.GetParameter:
|
|
cam.Info("got Get_Parameter from server:", zap.String("request", req.String()))
|
|
case "TCP timeout":
|
|
cam.Warn("got TCP timeout from server:", zap.String("request", req.String()))
|
|
case "EOF":
|
|
cam.Warn("got EOF from server:", zap.String("request", req.String()))
|
|
default:
|
|
cam.Warn("got method from server:", zap.String("request", req.String()))
|
|
}
|
|
}
|
|
|
|
// Create ticker for rotation files.
|
|
ticker := time.NewTicker(time.Duration(period) * time.Second)
|
|
defer ticker.Stop()
|
|
|
|
// Rotate files.
|
|
go func() {
|
|
for range ticker.C {
|
|
// Logic for rotation files.
|
|
/*
|
|
currentMpegtsMuxer.close()
|
|
currentMpegtsMuxer.fileName = fn.SetNumNTime()
|
|
|
|
err = currentMpegtsMuxer.initialize()
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
cam.Info("new file for recording created")
|
|
log.Println("new file for recording created")
|
|
*/
|
|
}
|
|
}()
|
|
|
|
if err = c.Wait(); err != nil {
|
|
cam.Error("c.Wait() error:", zap.Error(c.Wait()))
|
|
logger.Log.Error("c.Wait() error for camera:", zap.String("camera:", link), zap.Error(c.Wait()))
|
|
go func() {
|
|
time.Sleep(40 * time.Second)
|
|
rtsp(dir, period, link, fn.Number+1)
|
|
}()
|
|
time.Sleep(1 * time.Second)
|
|
}
|
|
|
|
case videoFormat == "" && audioFormat == "AAC":
|
|
// Wait for the next period.
|
|
waitPeriod(period, cam)
|
|
cam.Info("start recording")
|
|
|
|
// Create decoder.
|
|
aacRTPDec, err := aacFormat.CreateDecoder()
|
|
if err != nil {
|
|
cam.Error("create decoder error:", zap.Error(err))
|
|
return err
|
|
}
|
|
|
|
// Setup media.
|
|
_, err = c.Setup(desc.BaseURL, aacMedia, 0, 0)
|
|
if err != nil {
|
|
cam.Error("setup media error:", zap.Error(err))
|
|
return err
|
|
}
|
|
|
|
// Process input rtp packets.
|
|
c.OnPacketRTP(aacMedia, aacFormat, func(pkt *rtp.Packet) {
|
|
// Process AAC flow and return PTS and AUS.
|
|
pts, aus, err := formats.ProcessAAC(&c, aacMedia, aacRTPDec, pkt)
|
|
if err != nil {
|
|
cam.Warn("process packet error:", zap.Error(err))
|
|
}
|
|
|
|
for _, _ = range aus {
|
|
cam.Info("received access unit:", zap.String("PTS", strconv.FormatInt(pts, 10)))
|
|
}
|
|
})
|
|
|
|
// Start playing.
|
|
_, err = c.Play(nil)
|
|
if err != nil {
|
|
cam.Error("sending PLAY request error:", zap.Error(err))
|
|
return err
|
|
}
|
|
|
|
// Set program if the client gets request.
|
|
c.OnRequest = func(req *base.Request) {
|
|
switch req.Method {
|
|
case base.Teardown:
|
|
cam.Warn("got TEARDOWN request from server:", zap.String("request", req.String()))
|
|
case base.Method(rune(base.StatusRequestTimeout)):
|
|
cam.Warn("got STATUS_REQUEST_TIMEOUT from server:", zap.String("request", req.String()))
|
|
case base.GetParameter:
|
|
cam.Info("got Get_Parameter from server:", zap.String("request", req.String()))
|
|
case "TCP timeout":
|
|
cam.Warn("got TCP timeout from server:", zap.String("request", req.String()))
|
|
case "EOF":
|
|
cam.Warn("got EOF from server:", zap.String("request", req.String()))
|
|
default:
|
|
cam.Warn("got method from server:", zap.String("request", req.String()))
|
|
}
|
|
}
|
|
|
|
// Create ticker for rotation files.
|
|
ticker := time.NewTicker(time.Duration(period) * time.Second)
|
|
defer ticker.Stop()
|
|
|
|
// Rotate files.
|
|
go func() {
|
|
for range ticker.C {
|
|
// Logic for rotation files.
|
|
/*
|
|
currentMpegtsMuxer.close()
|
|
currentMpegtsMuxer.fileName = fn.SetNumNTime()
|
|
|
|
err = currentMpegtsMuxer.initialize()
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
cam.Info("new file for recording created")
|
|
log.Println("new file for recording created")
|
|
*/
|
|
}
|
|
}()
|
|
|
|
if err = c.Wait(); err != nil {
|
|
cam.Error("c.Wait() error:", zap.Error(c.Wait()))
|
|
logger.Log.Error("c.Wait() error for camera:", zap.String("camera:", link), zap.Error(c.Wait()))
|
|
go func() {
|
|
time.Sleep(40 * time.Second)
|
|
rtsp(dir, period, link, fn.Number+1)
|
|
}()
|
|
time.Sleep(1 * time.Second)
|
|
}
|
|
|
|
case videoFormat == "" && audioFormat == "OPUS":
|
|
// Wait for the next period.
|
|
waitPeriod(period, cam)
|
|
cam.Info("start recording")
|
|
|
|
// Create decoder.
|
|
opusRTPDec, err := opusFormat.CreateDecoder()
|
|
if err != nil {
|
|
cam.Error("create decoder error:", zap.Error(err))
|
|
return err
|
|
}
|
|
|
|
// Setup media.
|
|
_, err = c.Setup(desc.BaseURL, opusMedia, 0, 0)
|
|
if err != nil {
|
|
cam.Error("setup media error:", zap.Error(err))
|
|
return err
|
|
}
|
|
|
|
// Process input rtp packets.
|
|
c.OnPacketRTP(opusMedia, opusFormat, func(pkt *rtp.Packet) {
|
|
// Process OPUS flow and return PTS and OP.
|
|
pts, _, err := formats.ProcessOPUS(&c, opusMedia, opusRTPDec, pkt)
|
|
if err != nil {
|
|
cam.Warn("process packet error:", zap.Error(err))
|
|
}
|
|
|
|
cam.Info("received OPUS packet:", zap.String("PTS", strconv.FormatInt(pts, 10)))
|
|
})
|
|
|
|
// Start playing.
|
|
_, err = c.Play(nil)
|
|
if err != nil {
|
|
cam.Error("sending PLAY request error:", zap.Error(err))
|
|
return err
|
|
}
|
|
|
|
// Set program if the client gets request.
|
|
c.OnRequest = func(req *base.Request) {
|
|
switch req.Method {
|
|
case base.Teardown:
|
|
cam.Warn("got TEARDOWN request from server:", zap.String("request", req.String()))
|
|
case base.Method(rune(base.StatusRequestTimeout)):
|
|
cam.Warn("got STATUS_REQUEST_TIMEOUT from server:", zap.String("request", req.String()))
|
|
case base.GetParameter:
|
|
cam.Info("got Get_Parameter from server:", zap.String("request", req.String()))
|
|
case "TCP timeout":
|
|
cam.Warn("got TCP timeout from server:", zap.String("request", req.String()))
|
|
case "EOF":
|
|
cam.Warn("got EOF from server:", zap.String("request", req.String()))
|
|
default:
|
|
cam.Warn("got method from server:", zap.String("request", req.String()))
|
|
}
|
|
}
|
|
|
|
// Create ticker for rotation files.
|
|
ticker := time.NewTicker(time.Duration(period) * time.Second)
|
|
defer ticker.Stop()
|
|
|
|
// Rotate files.
|
|
go func() {
|
|
for range ticker.C {
|
|
// Logic for rotation files.
|
|
/*
|
|
currentMpegtsMuxer.close()
|
|
currentMpegtsMuxer.fileName = fn.SetNumNTime()
|
|
|
|
err = currentMpegtsMuxer.initialize()
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
cam.Info("new file for recording created")
|
|
log.Println("new file for recording created")
|
|
*/
|
|
}
|
|
}()
|
|
|
|
if err = c.Wait(); err != nil {
|
|
cam.Error("c.Wait() error:", zap.Error(c.Wait()))
|
|
logger.Log.Error("c.Wait() error for camera:", zap.String("camera:", link), zap.Error(c.Wait()))
|
|
go func() {
|
|
time.Sleep(40 * time.Second)
|
|
rtsp(dir, period, link, fn.Number+1)
|
|
}()
|
|
time.Sleep(1 * time.Second)
|
|
}
|
|
|
|
case videoFormat == "VP8" && audioFormat == "":
|
|
// Wait for the next period.
|
|
waitPeriod(period, cam)
|
|
cam.Info("start recording")
|
|
|
|
// Create decoder.
|
|
vp8RTPDec, err := vp8Format.CreateDecoder()
|
|
if err != nil {
|
|
cam.Error("create decoder error:", zap.Error(err))
|
|
return err
|
|
}
|
|
|
|
// Setup VP8 -> RGBA decoder.
|
|
vp8Dec := &formats.VPDecoder{}
|
|
err = vp8Dec.Initialize()
|
|
if err != nil {
|
|
cam.Error("init decoder error:", zap.Error(err))
|
|
return err
|
|
}
|
|
defer vp8Dec.Close()
|
|
|
|
// Setup media.
|
|
_, err = c.Setup(desc.BaseURL, vp8Media, 0, 0)
|
|
if err != nil {
|
|
cam.Error("setup media error:", zap.Error(err))
|
|
return err
|
|
}
|
|
|
|
// Process input rtp packets.
|
|
c.OnPacketRTP(vp8Media, vp8Format, func(pkt *rtp.Packet) {
|
|
// Process VP8 flow and return PTS and IMG.
|
|
pts, _, err := formats.ProcessVP8RGBA(&c, vp8Media, vp8RTPDec, vp8Dec, pkt)
|
|
if err != nil {
|
|
cam.Warn("process packet error:", zap.Error(err))
|
|
}
|
|
|
|
cam.Info("decoded image:", zap.String("PTS", strconv.FormatInt(pts, 10)))
|
|
})
|
|
|
|
// Start playing.
|
|
_, err = c.Play(nil)
|
|
if err != nil {
|
|
cam.Error("sending PLAY request error:", zap.Error(err))
|
|
return err
|
|
}
|
|
|
|
// Set program if the client gets request.
|
|
c.OnRequest = func(req *base.Request) {
|
|
switch req.Method {
|
|
case base.Teardown:
|
|
cam.Warn("got TEARDOWN request from server:", zap.String("request", req.String()))
|
|
case base.Method(rune(base.StatusRequestTimeout)):
|
|
cam.Warn("got STATUS_REQUEST_TIMEOUT from server:", zap.String("request", req.String()))
|
|
case base.GetParameter:
|
|
cam.Info("got Get_Parameter from server:", zap.String("request", req.String()))
|
|
case "TCP timeout":
|
|
cam.Warn("got TCP timeout from server:", zap.String("request", req.String()))
|
|
case "EOF":
|
|
cam.Warn("got EOF from server:", zap.String("request", req.String()))
|
|
default:
|
|
cam.Warn("got method from server:", zap.String("request", req.String()))
|
|
}
|
|
}
|
|
|
|
// Create ticker for rotation files.
|
|
ticker := time.NewTicker(time.Duration(period) * time.Second)
|
|
defer ticker.Stop()
|
|
|
|
// Rotate files.
|
|
go func() {
|
|
for range ticker.C {
|
|
// Logic for rotation files.
|
|
/*
|
|
currentMpegtsMuxer.close()
|
|
currentMpegtsMuxer.fileName = fn.SetNumNTime()
|
|
|
|
err = currentMpegtsMuxer.initialize()
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
cam.Info("new file for recording created")
|
|
log.Println("new file for recording created")
|
|
*/
|
|
}
|
|
}()
|
|
|
|
if err = c.Wait(); err != nil {
|
|
cam.Error("c.Wait() error:", zap.Error(c.Wait()))
|
|
logger.Log.Error("c.Wait() error for camera:", zap.String("camera:", link), zap.Error(c.Wait()))
|
|
go func() {
|
|
time.Sleep(40 * time.Second)
|
|
rtsp(dir, period, link, fn.Number+1)
|
|
}()
|
|
time.Sleep(1 * time.Second)
|
|
}
|
|
|
|
case videoFormat == "VP9" && audioFormat == "":
|
|
// Wait for the next period.
|
|
waitPeriod(period, cam)
|
|
cam.Info("start recording")
|
|
|
|
// Create decoder.
|
|
vp9RTPDec, err := vp9Format.CreateDecoder()
|
|
if err != nil {
|
|
cam.Error("create decoder error:", zap.Error(err))
|
|
return err
|
|
}
|
|
|
|
// Setup VP9 -> RGBA decoder.
|
|
vp9Dec := &formats.VPDecoder{}
|
|
err = vp9Dec.Initialize()
|
|
if err != nil {
|
|
cam.Error("init decoder error:", zap.Error(err))
|
|
return err
|
|
}
|
|
defer vp9Dec.Close()
|
|
|
|
// Setup media.
|
|
_, err = c.Setup(desc.BaseURL, vp9Media, 0, 0)
|
|
if err != nil {
|
|
cam.Error("setup media error:", zap.Error(err))
|
|
return err
|
|
}
|
|
|
|
// Process input rtp packets.
|
|
c.OnPacketRTP(vp9Media, vp9Format, func(pkt *rtp.Packet) {
|
|
// Process VP9 flow and return PTS and IMG.
|
|
pts, _, err := formats.ProcessVP9RGBA(&c, vp9Media, vp9RTPDec, vp9Dec, pkt)
|
|
if err != nil {
|
|
cam.Warn("process packet error:", zap.Error(err))
|
|
}
|
|
|
|
cam.Info("decoded image:", zap.String("PTS", strconv.FormatInt(pts, 10)))
|
|
})
|
|
|
|
// Start playing.
|
|
_, err = c.Play(nil)
|
|
if err != nil {
|
|
cam.Error("sending PLAY request error:", zap.Error(err))
|
|
return err
|
|
}
|
|
|
|
// Set program if the client gets request.
|
|
c.OnRequest = func(req *base.Request) {
|
|
switch req.Method {
|
|
case base.Teardown:
|
|
cam.Warn("got TEARDOWN request from server:", zap.String("request", req.String()))
|
|
case base.Method(rune(base.StatusRequestTimeout)):
|
|
cam.Warn("got STATUS_REQUEST_TIMEOUT from server:", zap.String("request", req.String()))
|
|
case base.GetParameter:
|
|
cam.Info("got Get_Parameter from server:", zap.String("request", req.String()))
|
|
case "TCP timeout":
|
|
cam.Warn("got TCP timeout from server:", zap.String("request", req.String()))
|
|
case "EOF":
|
|
cam.Warn("got EOF from server:", zap.String("request", req.String()))
|
|
default:
|
|
cam.Warn("got method from server:", zap.String("request", req.String()))
|
|
}
|
|
}
|
|
|
|
// Create ticker for rotation files.
|
|
ticker := time.NewTicker(time.Duration(period) * time.Second)
|
|
defer ticker.Stop()
|
|
|
|
// Rotate files.
|
|
go func() {
|
|
for range ticker.C {
|
|
// Logic for rotation files.
|
|
/*
|
|
currentMpegtsMuxer.close()
|
|
currentMpegtsMuxer.fileName = fn.SetNumNTime()
|
|
|
|
err = currentMpegtsMuxer.initialize()
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
cam.Info("new file for recording created")
|
|
log.Println("new file for recording created")
|
|
*/
|
|
}
|
|
}()
|
|
|
|
if err = c.Wait(); err != nil {
|
|
cam.Error("c.Wait() error:", zap.Error(c.Wait()))
|
|
logger.Log.Error("c.Wait() error for camera:", zap.String("camera:", link), zap.Error(c.Wait()))
|
|
go func() {
|
|
time.Sleep(40 * time.Second)
|
|
rtsp(dir, period, link, fn.Number+1)
|
|
}()
|
|
time.Sleep(1 * time.Second)
|
|
}
|
|
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// changeDomain changes domain if a camera was flipped to another domain.
|
|
func changeDomain(dir string, period int, link string, number int, cam *zap.Logger, err error) {
|
|
err2 := errors.New("404 (Not found)")
|
|
if errors.As(err, &err2) {
|
|
if strings.Contains(link, "video-1") {
|
|
err = rtsp(dir, period, strings.Replace(link, "video-1", "video-2", 1), number)
|
|
if err != nil {
|
|
cam.Error("changeDomain rtsp error:", zap.Error(err))
|
|
logger.Log.Error("changeDomain rtsp error:", zap.Error(err))
|
|
log.Println("changeDomain rtsp error for camera: ", link)
|
|
return
|
|
}
|
|
} else {
|
|
err = rtsp(dir, period, strings.Replace(link, "video-2", "video-1", 1), number)
|
|
if err != nil {
|
|
cam.Error("changeDomain rtsp error:", zap.Error(err))
|
|
logger.Log.Error("changeDomain rtsp error:", zap.Error(err))
|
|
log.Println("changeDomain rtsp error for camera: ", link)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}
|