From c59702fcff75a6ac6432f7bcfb2de1ea2828a53b Mon Sep 17 00:00:00 2001 From: Sergey Petrov Date: Thu, 20 Mar 2025 09:58:31 +0500 Subject: [PATCH] Added th opportunity to get more formats. --- writer/cmd/main.go | 4 +- writer/go.mod | 5 +- writer/go.sum | 3 + writer/internal/config/source.yaml | 62 +- writer/internal/ingest/formats/aac.go | 48 + writer/internal/ingest/formats/av1.go | 71 ++ writer/internal/ingest/formats/av1_decoder.go | 161 +++ .../internal/ingest/{rtsp => formats}/g711.go | 17 +- .../pcm_to_aac.go => formats/g711_to_aac.go} | 4 +- .../ingest/formats/h264-aac_muxer.go} | 97 +- writer/internal/ingest/formats/h264.go | 96 ++ .../internal/ingest/formats/h264_decoder.go | 161 +++ writer/internal/ingest/formats/h265.go | 69 ++ .../internal/ingest/formats/h265_decoder.go | 161 +++ writer/internal/ingest/formats/lpcm.go | 48 + writer/internal/ingest/formats/mjpeg.go | 57 + writer/internal/ingest/formats/opus.go | 48 + .../ingest/formats/vp8-vp9_decoder.go | 153 +++ writer/internal/ingest/formats/vp8.go | 61 + writer/internal/ingest/formats/vp9.go | 62 + writer/internal/ingest/rtsp/h264.go | 43 - writer/internal/ingest/rtsp/rtsp.go | 1089 ++++++++++++++--- writer/internal/storage/editor.go | 2 +- 23 files changed, 2241 insertions(+), 281 deletions(-) create mode 100644 writer/internal/ingest/formats/aac.go create mode 100644 writer/internal/ingest/formats/av1.go create mode 100644 writer/internal/ingest/formats/av1_decoder.go rename writer/internal/ingest/{rtsp => formats}/g711.go (57%) rename writer/internal/ingest/{rtsp/pcm_to_aac.go => formats/g711_to_aac.go} (92%) rename writer/{pkg/converter/mpegts_muxer.go => internal/ingest/formats/h264-aac_muxer.go} (50%) create mode 100644 writer/internal/ingest/formats/h264.go create mode 100644 writer/internal/ingest/formats/h264_decoder.go create mode 100644 writer/internal/ingest/formats/h265.go create mode 100644 writer/internal/ingest/formats/h265_decoder.go create mode 100644 writer/internal/ingest/formats/lpcm.go create mode 100644 writer/internal/ingest/formats/mjpeg.go create mode 100644 writer/internal/ingest/formats/opus.go create mode 100644 writer/internal/ingest/formats/vp8-vp9_decoder.go create mode 100644 writer/internal/ingest/formats/vp8.go create mode 100644 writer/internal/ingest/formats/vp9.go delete mode 100644 writer/internal/ingest/rtsp/h264.go diff --git a/writer/cmd/main.go b/writer/cmd/main.go index ea6632d..c4ff8d2 100644 --- a/writer/cmd/main.go +++ b/writer/cmd/main.go @@ -19,10 +19,10 @@ func main() { // Connect to each camera. for _, link := range c { - log.Printf("start recording on camera: %s\n", link) + log.Printf("process camera:\n %s\n", link) go func() { - err = rtsp.RTSP(*directory, 60, link) + err = rtsp.StartWriter(*directory, 60, link) if err != nil { log.Printf("procRTSP function error for camera %s: %s", link, err.Error()) } diff --git a/writer/go.mod b/writer/go.mod index 9408495..d90b0e0 100644 --- a/writer/go.mod +++ b/writer/go.mod @@ -6,16 +6,18 @@ require ( git.insit.tech/sas/rtsp_proxy v0.0.0-20250310124520-82fa76149f4e github.com/Eyevinn/mp4ff v0.47.0 github.com/bluenviron/gortsplib/v4 v4.12.3 + github.com/bluenviron/mediacommon v1.14.0 github.com/bluenviron/mediacommon/v2 v2.0.0 github.com/gen2brain/aac-go v0.0.0-20230119102159-ef1e76509d21 + github.com/hraban/opus v0.0.0-20230925203106-0188a62cb302 github.com/pion/rtp v1.8.12 github.com/zaf/g711 v1.4.0 + gopkg.in/yaml.v3 v3.0.1 ) require ( github.com/asticode/go-astikit v0.52.0 // indirect github.com/asticode/go-astits v1.13.0 // indirect - github.com/bluenviron/mediacommon v1.14.0 // indirect github.com/cyub/ringbuffer v0.0.0-20221202135829-35445cc89929 // indirect github.com/google/uuid v1.6.0 // indirect github.com/grafov/m3u8 v0.12.1 // indirect @@ -24,5 +26,4 @@ require ( github.com/pion/sdp/v3 v3.0.10 // indirect golang.org/x/net v0.37.0 // indirect golang.org/x/sys v0.31.0 // indirect - gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/writer/go.sum b/writer/go.sum index 784180c..44b79b8 100644 --- a/writer/go.sum +++ b/writer/go.sum @@ -26,6 +26,8 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/grafov/m3u8 v0.12.1 h1:DuP1uA1kvRRmGNAZ0m+ObLv1dvrfNO0TPx0c/enNk0s= github.com/grafov/m3u8 v0.12.1/go.mod h1:nqzOkfBiZJENr52zTVd/Dcl03yzphIMbJqkXGu+u080= +github.com/hraban/opus v0.0.0-20230925203106-0188a62cb302 h1:K7bmEmIesLcvCW0Ic2rCk6LtP5++nTnPmrO8mg5umlA= +github.com/hraban/opus v0.0.0-20230925203106-0188a62cb302/go.mod h1:YQQXrWHN3JEvCtw5ImyTCcPeU/ZLo/YMA+TpB64XdrU= github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA= github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8= github.com/pion/rtcp v1.2.15 h1:LZQi2JbdipLOj4eBjK4wlVoQWfrZbh3Q6eHtWtJBZBo= @@ -56,6 +58,7 @@ golang.org/x/net v0.37.0 h1:1zLorHbz+LYj7MQlSf1+2tPIIgibq2eL5xkrGk6f+2c= golang.org/x/net v0.37.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8= golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik= golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/writer/internal/config/source.yaml b/writer/internal/config/source.yaml index c46964e..5b2d1d5 100644 --- a/writer/internal/config/source.yaml +++ b/writer/internal/config/source.yaml @@ -1,36 +1,36 @@ #camera_1: rtsp://intercom-video-1.insit.ru/camera01-centr-pl_slavy -#camera_2: rtsp://intercom-video-1.insit.ru/camera03-centr-administraciya +camera_2: rtsp://intercom-video-1.insit.ru/camera03-centr-administraciya #camera_3: rtsp://intercom-video-1.insit.ru/camera08-centr-skver_kalinina -#camera_4: rtsp://intercom-video-1.insit.ru/camera04-centr-pobedy -#camera_5: rtsp://intercom-video-1.insit.ru/camera11-centr-dk_kirova -#camera_6: rtsp://intercom-video-1.insit.ru/camera13-centr-pobedy_sutyagina -#camera_7: rtsp://intercom-video-1.insit.ru/camera09-centr-nalogovaya +camera_4: rtsp://intercom-video-1.insit.ru/camera04-centr-pobedy +camera_5: rtsp://intercom-video-1.insit.ru/camera11-centr-dk_kirova +camera_6: rtsp://intercom-video-1.insit.ru/camera13-centr-pobedy_sutyagina +camera_7: rtsp://intercom-video-1.insit.ru/camera09-centr-nalogovaya #camera_8: rtsp://intercom-video-1.insit.ru/camera59-centr-prktslavy -#camera_9: rtsp://intercom-video-1.insit.ru/camera10-centr-kommunisticheskiy_ilycha -#camera_10: rtsp://intercom-video-1.insit.ru/camera16-centr-sportschool2 -#camera_11: rtsp://intercom-video-1.insit.ru/camera40-center-kommunisticheskiy -#camera_12: rtsp://intercom-video-1.insit.ru/camera12-centr-skver_pavshih_geroev -#camera_13: rtsp://intercom-video-1.insit.ru/camera39-center-kommunisticheskiy -#camera_14: rtsp://intercom-video-1.insit.ru/camera14-centr-stadion_himik -#camera_15: rtsp://intercom-video-1.insit.ru/camera41-center-kuznecova_borby -#camera_16: rtsp://intercom-video-1.insit.ru/camera83-gorod-Kommunistichesky -#camera_17: rtsp://intercom-video-1.insit.ru/camera54-centr-kirova-kalinina -#camera_18: rtsp://intercom-video-1.insit.ru/camera53-centr-pobedy_slavy -#camera_19: rtsp://intercom-video-1.insit.ru/camera44-center-skver_temnika -#camera_20: rtsp://intercom-video-1.insit.ru/camera28-oktyabrskiy-severnaya23a -#camera_21: rtsp://intercom-video-1.insit.ru/dp-qamumnrlkizuypnetljzzkjqamdoti -#camera_22: rtsp://intercom-video-1.insit.ru/dp-ohusuxzcvzsnpzzvkpyhddnwxuyeyc -#camera_23: rtsp://intercom-video-1.insit.ru/dp-bflbwjulvgfzurmcpejklrfvqairns -#camera_24: rtsp://intercom-video-1.insit.ru/dp-ajiymmjyytokybrpganfcxlfyjcdbgezphn -#camera_25: rtsp://intercom-video-2.insit.ru/dp-pobedi6a-ii-2125126423 -#camera_26: rtsp://intercom-video-1.insit.ru/dp-pobedi11-ii-2108117729 -#camera_27: rtsp://intercom-video-2.insit.ru/dp-pobedi11-i-2108117197 -#camera_28: rtsp://intercom-video-2.insit.ru/dp-nfhapwbfjpqkmaymfeipraxtzcpedk -#camera_29: rtsp://intercom-video-1.insit.ru/dp-swcixufwlheiwwrcvsrbkmhqzqvbxz +camera_9: rtsp://intercom-video-1.insit.ru/camera10-centr-kommunisticheskiy_ilycha +camera_10: rtsp://intercom-video-1.insit.ru/camera16-centr-sportschool2 +camera_11: rtsp://intercom-video-1.insit.ru/camera40-center-kommunisticheskiy +camera_12: rtsp://intercom-video-1.insit.ru/camera12-centr-skver_pavshih_geroev +camera_13: rtsp://intercom-video-1.insit.ru/camera39-center-kommunisticheskiy +camera_14: rtsp://intercom-video-1.insit.ru/camera14-centr-stadion_himik +camera_15: rtsp://intercom-video-1.insit.ru/camera41-center-kuznecova_borby +camera_16: rtsp://intercom-video-1.insit.ru/camera83-gorod-Kommunistichesky +camera_17: rtsp://intercom-video-1.insit.ru/camera54-centr-kirova-kalinina +camera_18: rtsp://intercom-video-1.insit.ru/camera53-centr-pobedy_slavy +camera_19: rtsp://intercom-video-1.insit.ru/camera44-center-skver_temnika +camera_20: rtsp://intercom-video-1.insit.ru/camera28-oktyabrskiy-severnaya23a +camera_21: rtsp://intercom-video-1.insit.ru/dp-qamumnrlkizuypnetljzzkjqamdoti +camera_22: rtsp://intercom-video-1.insit.ru/dp-ohusuxzcvzsnpzzvkpyhddnwxuyeyc +camera_23: rtsp://intercom-video-1.insit.ru/dp-bflbwjulvgfzurmcpejklrfvqairns +camera_24: rtsp://intercom-video-1.insit.ru/dp-ajiymmjyytokybrpganfcxlfyjcdbgezphn +camera_25: rtsp://intercom-video-2.insit.ru/dp-pobedi6a-ii-2125126423 +camera_26: rtsp://intercom-video-1.insit.ru/dp-pobedi11-ii-2108117729 +camera_27: rtsp://intercom-video-2.insit.ru/dp-pobedi11-i-2108117197 +camera_28: rtsp://intercom-video-2.insit.ru/dp-nfhapwbfjpqkmaymfeipraxtzcpedk +camera_29: rtsp://intercom-video-1.insit.ru/dp-swcixufwlheiwwrcvsrbkmhqzqvbxz #camera_30: rtsp://intercom-video-1.insit.ru/dp-nerrjszqrbhjvqmfxskunejafdiihj -#camera_31: rtsp://intercom-video-1.insit.ru/dp-aiwukyujwonohpjyzeniispqqullyr -#camera_32: rtsp://intercom-video-1.insit.ru/dp-woxvkbynctgfbuztsalttgburbpvjf -#camera_33: rtsp://intercom-video-1.insit.ru/dp-fdzbasqehtptsuhxnjeqqnlrixfahcgvlcr -#camera_34: rtsp://intercom-video-1.insit.ru/dp-exyeqscyamrbkwkjifagouyprtsdoe -#camera_35: rtsp://intercom-video-2.insit.ru/dp-sutyagina3a-iv-uujtwbsjekv +camera_31: rtsp://intercom-video-1.insit.ru/dp-aiwukyujwonohpjyzeniispqqullyr +camera_32: rtsp://intercom-video-1.insit.ru/dp-woxvkbynctgfbuztsalttgburbpvjf +camera_33: rtsp://intercom-video-1.insit.ru/dp-fdzbasqehtptsuhxnjeqqnlrixfahcgvlcr +camera_34: rtsp://intercom-video-1.insit.ru/dp-exyeqscyamrbkwkjifagouyprtsdoe +camera_35: rtsp://intercom-video-2.insit.ru/dp-sutyagina3a-iv-uujtwbsjekv camera_36: rtsp://intercom-video-1.insit.ru/dp-wyshispseamhqmnhkqwkbarshnrvni \ No newline at end of file diff --git a/writer/internal/ingest/formats/aac.go b/writer/internal/ingest/formats/aac.go new file mode 100644 index 0000000..5ce032c --- /dev/null +++ b/writer/internal/ingest/formats/aac.go @@ -0,0 +1,48 @@ +package formats + +import ( + "errors" + "fmt" + "github.com/bluenviron/gortsplib/v4" + "github.com/bluenviron/gortsplib/v4/pkg/description" + "github.com/bluenviron/gortsplib/v4/pkg/format" + "github.com/bluenviron/gortsplib/v4/pkg/format/rtpmpeg4audio" + "github.com/pion/rtp" +) + +// FindAACFormat finds the AAC media and format. +func FindAACFormat(desc *description.Session) (*format.MPEG4Audio, *description.Media, error) { + var aacFormat *format.MPEG4Audio + aacMedia := desc.FindFormat(&aacFormat) + if aacMedia == nil { + return nil, nil, errors.New("media AAC not found") + } + + return aacFormat, aacMedia, nil +} + +// ProcessAAC processes AAC flow and returns PTS and AUS. +func ProcessAAC( + c *gortsplib.Client, + aacMedia *description.Media, + aacRTPDec *rtpmpeg4audio.Decoder, + pkt *rtp.Packet, + t string, +) ( + int64, + [][]byte, + error) { + // Decode timestamp. + pts, ok := c.PacketPTS2(aacMedia, pkt) + if !ok { + return 0, nil, fmt.Errorf("[%v]: waiting for timestamp\n", t) + } + + // Extract access unit from RTP packets. + aus, err := aacRTPDec.Decode(pkt) + if err != nil { + return 0, nil, fmt.Errorf("[%v]: decoding RTP packet error: %w", t, err) + } + + return pts, aus, nil +} diff --git a/writer/internal/ingest/formats/av1.go b/writer/internal/ingest/formats/av1.go new file mode 100644 index 0000000..962228e --- /dev/null +++ b/writer/internal/ingest/formats/av1.go @@ -0,0 +1,71 @@ +package formats + +import ( + "C" + "errors" + "fmt" + "github.com/bluenviron/gortsplib/v4" + "github.com/bluenviron/gortsplib/v4/pkg/description" + "github.com/bluenviron/gortsplib/v4/pkg/format" + "github.com/bluenviron/gortsplib/v4/pkg/format/rtpav1" + "github.com/bluenviron/mediacommon/v2/pkg/codecs/av1" + "github.com/pion/rtp" + "image" +) + +// FindAV1Format finds the AV1 media and format. +func FindAV1Format(desc *description.Session) (*format.AV1, *description.Media, error) { + var av1Format *format.AV1 + av1Media := desc.FindFormat(&av1Format) + if av1Media == nil { + return nil, nil, errors.New("media AV1 not found") + } + + return av1Format, av1Media, nil +} + +// ProcessAV1 processes AV1 flow and returns PTS and IMG. +func ProcessAV1( + c *gortsplib.Client, + av1Media *description.Media, + av1RTPDec *rtpav1.Decoder, + pkt *rtp.Packet, + av1Dec *AV1Decoder, + firstRandomReceived bool, + t string, +) ( + int64, + *image.RGBA, + error) { + // Decode timestamp. + pts, ok := c.PacketPTS2(av1Media, pkt) + if !ok { + return 0, nil, fmt.Errorf("[%v]: waiting for timestamp\n", t) + } + + // Extract AV1 temporal units from RTP packets. + tu, err := av1RTPDec.Decode(pkt) + if err != rtpav1.ErrNonStartingPacketAndNoPrevious && err != rtpav1.ErrMorePacketsNeeded { + return 0, nil, fmt.Errorf("[%v]: decoding RTP packet error: %w", t, err) + } + + // Wait for a random access unit. + IsRandomAccess2, _ := av1.IsRandomAccess(tu) + if !firstRandomReceived && !IsRandomAccess2 { + return 0, nil, fmt.Errorf("[%v]: waiting for a random access unit\n", t) + } + firstRandomReceived = true + + // Convert AV1 temporal units into RGBA frames. + img, err := av1Dec.Decode(tu) + if err != nil { + return 0, nil, fmt.Errorf("[%v]: convert into RGBA frames error\n", t) + } + + // Wait for a frame. + if img == nil { + return 0, nil, fmt.Errorf("[%v]: frame not found\n", t) + } + + return pts, img, nil +} diff --git a/writer/internal/ingest/formats/av1_decoder.go b/writer/internal/ingest/formats/av1_decoder.go new file mode 100644 index 0000000..e15ac86 --- /dev/null +++ b/writer/internal/ingest/formats/av1_decoder.go @@ -0,0 +1,161 @@ +package formats + +import ( + "fmt" + "image" + "runtime" + "unsafe" + + "github.com/bluenviron/mediacommon/v2/pkg/codecs/av1" +) + +// #cgo pkg-config: libavcodec libavutil libswscale +// #include +// #include +// #include +import "C" + +func frameDataAV1(frame *C.AVFrame) **C.uint8_t { + return (**C.uint8_t)(unsafe.Pointer(&frame.data[0])) +} + +func frameLineSizeAV1(frame *C.AVFrame) *C.int { + return (*C.int)(unsafe.Pointer(&frame.linesize[0])) +} + +// AV1Decoder is a wrapper around FFmpeg's AV1 decoder. +type AV1Decoder struct { + codecCtx *C.AVCodecContext + yuv420Frame *C.AVFrame + rgbaFrame *C.AVFrame + rgbaFramePtr []uint8 + swsCtx *C.struct_SwsContext +} + +// Initialize initializes a AV1Decoder. +func (d *AV1Decoder) Initialize() error { + codec := C.avcodec_find_decoder(C.AV_CODEC_ID_AV1) + if codec == nil { + return fmt.Errorf("avcodec_find_decoder() failed") + } + + d.codecCtx = C.avcodec_alloc_context3(codec) + if d.codecCtx == nil { + return fmt.Errorf("avcodec_alloc_context3() failed") + } + + res := C.avcodec_open2(d.codecCtx, codec, nil) + if res < 0 { + C.avcodec_close(d.codecCtx) + return fmt.Errorf("avcodec_open2() failed") + } + + d.yuv420Frame = C.av_frame_alloc() + if d.yuv420Frame == nil { + C.avcodec_close(d.codecCtx) + return fmt.Errorf("av_frame_alloc() failed") + } + + return nil +} + +// Close closes the decoder. +func (d *AV1Decoder) Close() { + if d.swsCtx != nil { + C.sws_freeContext(d.swsCtx) + } + + if d.rgbaFrame != nil { + C.av_frame_free(&d.rgbaFrame) + } + + C.av_frame_free(&d.yuv420Frame) + C.avcodec_close(d.codecCtx) +} + +func (d *AV1Decoder) reinitDynamicStuff() error { + if d.swsCtx != nil { + C.sws_freeContext(d.swsCtx) + } + + if d.rgbaFrame != nil { + C.av_frame_free(&d.rgbaFrame) + } + + d.rgbaFrame = C.av_frame_alloc() + if d.rgbaFrame == nil { + return fmt.Errorf("av_frame_alloc() failed") + } + + d.rgbaFrame.format = C.AV_PIX_FMT_RGBA + d.rgbaFrame.width = d.yuv420Frame.width + d.rgbaFrame.height = d.yuv420Frame.height + d.rgbaFrame.color_range = C.AVCOL_RANGE_JPEG + + res := C.av_frame_get_buffer(d.rgbaFrame, 1) + if res < 0 { + return fmt.Errorf("av_frame_get_buffer() failed") + } + + d.swsCtx = C.sws_getContext(d.yuv420Frame.width, d.yuv420Frame.height, int32(d.yuv420Frame.format), + d.rgbaFrame.width, d.rgbaFrame.height, (int32)(d.rgbaFrame.format), C.SWS_BILINEAR, nil, nil, nil) + if d.swsCtx == nil { + return fmt.Errorf("sws_getContext() failed") + } + + rgbaFrameSize := C.av_image_get_buffer_size((int32)(d.rgbaFrame.format), d.rgbaFrame.width, d.rgbaFrame.height, 1) + d.rgbaFramePtr = (*[1 << 30]uint8)(unsafe.Pointer(d.rgbaFrame.data[0]))[:rgbaFrameSize:rgbaFrameSize] + return nil +} + +// Decode decodes a RGBA image from AV1. +func (d *AV1Decoder) Decode(tu [][]byte) (*image.RGBA, error) { + // encode temporal unit into bytestream + bs, err := av1.Bitstream(tu).Marshal() + if err != nil { + return nil, err + } + + // send temporal unit to decoder + var pkt C.AVPacket + ptr := &bs[0] + var p runtime.Pinner + p.Pin(ptr) + pkt.data = (*C.uint8_t)(ptr) + pkt.size = (C.int)(len(bs)) + res := C.avcodec_send_packet(d.codecCtx, &pkt) + p.Unpin() + if res < 0 { + return nil, nil + } + + // receive frame if available + res = C.avcodec_receive_frame(d.codecCtx, d.yuv420Frame) + if res < 0 { + return nil, nil + } + + // if frame size has changed, allocate needed objects + if d.rgbaFrame == nil || d.rgbaFrame.width != d.yuv420Frame.width || d.rgbaFrame.height != d.yuv420Frame.height { + err := d.reinitDynamicStuff() + if err != nil { + return nil, err + } + } + + // convert color space from YUV420 to RGBA + res = C.sws_scale(d.swsCtx, frameDataAV1(d.yuv420Frame), frameLineSizeAV1(d.yuv420Frame), + 0, d.yuv420Frame.height, frameDataAV1(d.rgbaFrame), frameLineSizeAV1(d.rgbaFrame)) + if res < 0 { + return nil, fmt.Errorf("sws_scale() failed") + } + + // embed frame into an image.RGBA + return &image.RGBA{ + Pix: d.rgbaFramePtr, + Stride: 4 * (int)(d.rgbaFrame.width), + Rect: image.Rectangle{ + Max: image.Point{(int)(d.rgbaFrame.width), (int)(d.rgbaFrame.height)}, + }, + }, nil +} diff --git a/writer/internal/ingest/rtsp/g711.go b/writer/internal/ingest/formats/g711.go similarity index 57% rename from writer/internal/ingest/rtsp/g711.go rename to writer/internal/ingest/formats/g711.go index c9d4454..d0bcea3 100644 --- a/writer/internal/ingest/rtsp/g711.go +++ b/writer/internal/ingest/formats/g711.go @@ -1,4 +1,4 @@ -package rtsp +package formats import ( "errors" @@ -8,11 +8,10 @@ import ( "github.com/bluenviron/gortsplib/v4/pkg/format" "github.com/bluenviron/gortsplib/v4/pkg/format/rtplpcm" "github.com/pion/rtp" - "log" ) -// CheckG711Format finds the G711 media and format. -func CheckG711Format(desc *description.Session) (*format.G711, *description.Media, error) { +// FindG711Format finds the G711 media and format. +func FindG711Format(desc *description.Session) (*format.G711, *description.Media, error) { var g711Format *format.G711 g711Media := desc.FindFormat(&g711Format) if g711Media == nil { @@ -22,20 +21,20 @@ func CheckG711Format(desc *description.Session) (*format.G711, *description.Medi return g711Format, g711Media, nil } -// ProcG711 processes G711 flow and returns PTS and AU. -func ProcG711(c *gortsplib.Client, g711Media *description.Media, g711RTPDec *rtplpcm.Decoder, pkt *rtp.Packet) ( +// ProcessG711 processes G711 flow and returns PTS and AU. +func ProcessG711(c *gortsplib.Client, g711Media *description.Media, g711RTPDec *rtplpcm.Decoder, pkt *rtp.Packet, t string, +) ( int64, []byte, error) { // Decode timestamp. pts, ok := c.PacketPTS2(g711Media, pkt) if !ok { - log.Printf("waiting for timestamp\n") - return 0, nil, nil + return 0, nil, fmt.Errorf("[%v]: waiting for timestamp\n", t) } // Extract access unit from RTP packets. au, err := g711RTPDec.Decode(pkt) if err != nil { - return 0, nil, fmt.Errorf("decoding G711 RTP packet: %w", err) + return 0, nil, fmt.Errorf("[%v]: decoding RTP packet error: %w", t, err) } return pts, au, nil diff --git a/writer/internal/ingest/rtsp/pcm_to_aac.go b/writer/internal/ingest/formats/g711_to_aac.go similarity index 92% rename from writer/internal/ingest/rtsp/pcm_to_aac.go rename to writer/internal/ingest/formats/g711_to_aac.go index 2b1ace7..c1eaad1 100644 --- a/writer/internal/ingest/rtsp/pcm_to_aac.go +++ b/writer/internal/ingest/formats/g711_to_aac.go @@ -1,4 +1,4 @@ -package rtsp +package formats import ( "bytes" @@ -7,7 +7,7 @@ import ( "github.com/gen2brain/aac-go" ) -// ConvertG711ToAAC converts PCM to AAC. +// ConvertG711ToAAC converts G711 to AAC. func ConvertG711ToAAC(g711Samples []byte, mulaw bool) ([]byte, error) { var pcmSamples []byte if mulaw { diff --git a/writer/pkg/converter/mpegts_muxer.go b/writer/internal/ingest/formats/h264-aac_muxer.go similarity index 50% rename from writer/pkg/converter/mpegts_muxer.go rename to writer/internal/ingest/formats/h264-aac_muxer.go index 9fba0dc..92a23b5 100644 --- a/writer/pkg/converter/mpegts_muxer.go +++ b/writer/internal/ingest/formats/h264-aac_muxer.go @@ -1,13 +1,14 @@ -package converter +package formats import ( "bufio" + "github.com/bluenviron/mediacommon/pkg/codecs/mpeg4audio" "os" "sync" "github.com/bluenviron/gortsplib/v4/pkg/format" - "github.com/bluenviron/mediacommon/v2/pkg/codecs/h264" - "github.com/bluenviron/mediacommon/v2/pkg/formats/mpegts" + "github.com/bluenviron/mediacommon/pkg/codecs/h264" + "github.com/bluenviron/mediacommon/pkg/formats/mpegts" ) func multiplyAndDivide(v, m, d int64) int64 { @@ -22,19 +23,18 @@ type MpegtsMuxer struct { H264Format *format.H264 Mpeg4AudioFormat *format.MPEG4Audio - f *os.File - b *bufio.Writer - w *mpegts.Writer - h264Track *mpegts.Track - // mpeg4AudioTrack *mpegts.Track - dtsExtractor *h264.DTSExtractor - mutex sync.Mutex + f *os.File + b *bufio.Writer + w *mpegts.Writer + h264Track *mpegts.Track + mpeg4AudioTrack *mpegts.Track + dtsExtractor *h264.DTSExtractor2 + mutex sync.Mutex } // Initialize initializes a MpegtsMuxer. func (e *MpegtsMuxer) Initialize() error { var err error - e.f, err = os.Create(e.FileName) if err != nil { return err @@ -45,27 +45,21 @@ func (e *MpegtsMuxer) Initialize() error { Codec: &mpegts.CodecH264{}, } - //e.mpeg4AudioTrack = &mpegts.Track{ - // Codec: &mpegts.CodecMPEG4Audio{ - // // Config: *e.mpeg4AudioFormat.Config, - // }, - //} + e.mpeg4AudioTrack = &mpegts.Track{ + Codec: &mpegts.CodecMPEG4Audio{ + Config: mpeg4audio.Config{}, + }, + } - e.w = mpegts.NewWriter(e.b, []*mpegts.Track{e.h264Track}) // add e.mpeg4AudioTrack + e.w = mpegts.NewWriter(e.b, []*mpegts.Track{e.h264Track, e.mpeg4AudioTrack}) return nil } // Close closes all the MpegtsMuxer resources. func (e *MpegtsMuxer) Close() { - err := e.b.Flush() - if err != nil { - panic(err) - } - err = e.f.Close() - if err != nil { - panic(err) - } + e.b.Flush() + e.f.Close() } // WriteH264 writes a H264 access unit into MPEG-TS. @@ -79,29 +73,27 @@ func (e *MpegtsMuxer) WriteH264(au [][]byte, pts int64) error { idrPresent := false for _, nalu := range au { - if len(nalu) != 0 { - typ := h264.NALUType(nalu[0] & 0x1F) - switch typ { - case h264.NALUTypeSPS: - e.H264Format.SPS = nalu - continue + typ := h264.NALUType(nalu[0] & 0x1F) + switch typ { + case h264.NALUTypeSPS: + e.H264Format.SPS = nalu + continue - case h264.NALUTypePPS: - e.H264Format.PPS = nalu - continue + case h264.NALUTypePPS: + e.H264Format.PPS = nalu + continue - case h264.NALUTypeAccessUnitDelimiter: - continue + case h264.NALUTypeAccessUnitDelimiter: + continue - case h264.NALUTypeIDR: - idrPresent = true + case h264.NALUTypeIDR: + idrPresent = true - case h264.NALUTypeNonIDR: - nonIDRPresent = true - } - - filteredAU = append(filteredAU, nalu) + case h264.NALUTypeNonIDR: + nonIDRPresent = true } + + filteredAU = append(filteredAU, nalu) } au = filteredAU @@ -120,7 +112,7 @@ func (e *MpegtsMuxer) WriteH264(au [][]byte, pts int64) error { if !idrPresent { return nil } - e.dtsExtractor = h264.NewDTSExtractor() + e.dtsExtractor = h264.NewDTSExtractor2() } dts, err := e.dtsExtractor.Extract(au, pts) @@ -129,14 +121,13 @@ func (e *MpegtsMuxer) WriteH264(au [][]byte, pts int64) error { } // Encode into MPEG-TS. - return e.w.WriteH264(e.h264Track, pts, dts, au) + return e.w.WriteH2642(e.h264Track, pts, dts, au) } -// writeMPEG4Audio writes MPEG-4 audio access units into MPEG-TS. -//func (e *MpegtsMuxer) writeMPEG4Audio(aus [][]byte, pts int64) error { -// e.mutex.Lock() -// defer e.mutex.Unlock() -// -// return e.w.WriteMPEG4Audio( -// e.mpeg4AudioTrack, multiplyAndDivide(pts, 90000, int64(e.Mpeg4AudioFormat.ClockRate())), aus) -//} +// WriteAAC writes MPEG-4 audio access units into MPEG-TS. +func (e *MpegtsMuxer) WriteAAC(aus [][]byte, pts int64) error { + e.mutex.Lock() + defer e.mutex.Unlock() + + return e.w.WriteMPEG4Audio(e.mpeg4AudioTrack, multiplyAndDivide(pts, 90000, int64(8000)), aus) +} diff --git a/writer/internal/ingest/formats/h264.go b/writer/internal/ingest/formats/h264.go new file mode 100644 index 0000000..9cb6670 --- /dev/null +++ b/writer/internal/ingest/formats/h264.go @@ -0,0 +1,96 @@ +package formats + +import ( + "errors" + "fmt" + "github.com/bluenviron/gortsplib/v4" + "github.com/bluenviron/gortsplib/v4/pkg/description" + "github.com/bluenviron/gortsplib/v4/pkg/format" + "github.com/bluenviron/gortsplib/v4/pkg/format/rtph264" + "github.com/bluenviron/mediacommon/v2/pkg/codecs/h264" + "github.com/pion/rtp" + "image" +) + +// FindH264Format finds the H264 media and format. +func FindH264Format(desc *description.Session) (*format.H264, *description.Media, error) { + var h264Format *format.H264 + h264Media := desc.FindFormat(&h264Format) + if h264Media == nil { + return nil, nil, errors.New("media H264 not found") + } + return h264Format, h264Media, nil +} + +// ProcessH264 processes H264 flow and returns PTS and AU. +func ProcessH264( + c *gortsplib.Client, + h264Media *description.Media, + h264RTPDec *rtph264.Decoder, + pkt *rtp.Packet, + t string, +) ( + int64, + [][]byte, + error) { + // Decode timestamp. + pts, ok := c.PacketPTS2(h264Media, pkt) + if !ok { + return 0, nil, fmt.Errorf("[%v]: waiting for timestamp\n", t) + } + + // Extract access unit from RTP packets. + au, err := h264RTPDec.Decode(pkt) + if err != nil { + if err != rtph264.ErrNonStartingPacketAndNoPrevious && err != rtph264.ErrMorePacketsNeeded { + return 0, nil, fmt.Errorf("[%v]: decoding RTP packet error: %w", t, err) + } + } + + return pts, au, nil +} + +// ProcessH264RGBA processes H264 flow and returns PTS and IMG. +func ProcessH264RGBA( + c *gortsplib.Client, + h264Media *description.Media, + h264RTPDec *rtph264.Decoder, + h264Dec *H264Decoder, + pkt *rtp.Packet, + firstRandomAccess bool, + t string, +) ( + int64, + *image.RGBA, + error) { + // Decode timestamp. + pts, ok := c.PacketPTS2(h264Media, pkt) + if !ok { + return 0, nil, fmt.Errorf("[%v]: waiting for timestamp\n", t) + } + + // Extract access units from RTP packets. + au, err := h264RTPDec.Decode(pkt) + if err != rtph264.ErrNonStartingPacketAndNoPrevious && err != rtph264.ErrMorePacketsNeeded { + return 0, nil, fmt.Errorf("[%v]: decoding RTP packet error: %w", t, err) + } + + // Wait for a random access unit. + if !firstRandomAccess && !h264.IsRandomAccess(au) { + return 0, nil, nil + } + firstRandomAccess = true + + // Convert H264 access units into RGBA frames. + img, err := h264Dec.Decode(au) + if err != nil { + return 0, nil, fmt.Errorf("[%v]: convert into RGBA frames error\n", t) + } + + // Wait for a frame. + if img == nil { + return 0, nil, fmt.Errorf("[%v]: frame not found\n", t) + } + + return pts, img, nil +} diff --git a/writer/internal/ingest/formats/h264_decoder.go b/writer/internal/ingest/formats/h264_decoder.go new file mode 100644 index 0000000..42e651c --- /dev/null +++ b/writer/internal/ingest/formats/h264_decoder.go @@ -0,0 +1,161 @@ +package formats + +import ( + "fmt" + "image" + "runtime" + "unsafe" + + "github.com/bluenviron/mediacommon/v2/pkg/codecs/h264" +) + +// #cgo pkg-config: libavcodec libavutil libswscale +// #include +// #include +// #include +import "C" + +func frameDataH264(frame *C.AVFrame) **C.uint8_t { + return (**C.uint8_t)(unsafe.Pointer(&frame.data[0])) +} + +func frameLineSizeH264(frame *C.AVFrame) *C.int { + return (*C.int)(unsafe.Pointer(&frame.linesize[0])) +} + +// H264Decoder is a wrapper around FFmpeg's H264 decoder. +type H264Decoder struct { + codecCtx *C.AVCodecContext + yuv420Frame *C.AVFrame + rgbaFrame *C.AVFrame + rgbaFramePtr []uint8 + swsCtx *C.struct_SwsContext +} + +// Initialize initializes a H264Decoder. +func (d *H264Decoder) Initialize() error { + codec := C.avcodec_find_decoder(C.AV_CODEC_ID_H264) + if codec == nil { + return fmt.Errorf("avcodec_find_decoder() failed") + } + + d.codecCtx = C.avcodec_alloc_context3(codec) + if d.codecCtx == nil { + return fmt.Errorf("avcodec_alloc_context3() failed") + } + + res := C.avcodec_open2(d.codecCtx, codec, nil) + if res < 0 { + C.avcodec_close(d.codecCtx) + return fmt.Errorf("avcodec_open2() failed") + } + + d.yuv420Frame = C.av_frame_alloc() + if d.yuv420Frame == nil { + C.avcodec_close(d.codecCtx) + return fmt.Errorf("av_frame_alloc() failed") + } + + return nil +} + +// Close closes the decoder. +func (d *H264Decoder) Close() { + if d.swsCtx != nil { + C.sws_freeContext(d.swsCtx) + } + + if d.rgbaFrame != nil { + C.av_frame_free(&d.rgbaFrame) + } + + C.av_frame_free(&d.yuv420Frame) + C.avcodec_close(d.codecCtx) +} + +func (d *H264Decoder) reinitDynamicStuff() error { + if d.swsCtx != nil { + C.sws_freeContext(d.swsCtx) + } + + if d.rgbaFrame != nil { + C.av_frame_free(&d.rgbaFrame) + } + + d.rgbaFrame = C.av_frame_alloc() + if d.rgbaFrame == nil { + return fmt.Errorf("av_frame_alloc() failed") + } + + d.rgbaFrame.format = C.AV_PIX_FMT_RGBA + d.rgbaFrame.width = d.yuv420Frame.width + d.rgbaFrame.height = d.yuv420Frame.height + d.rgbaFrame.color_range = C.AVCOL_RANGE_JPEG + + res := C.av_frame_get_buffer(d.rgbaFrame, 1) + if res < 0 { + return fmt.Errorf("av_frame_get_buffer() failed") + } + + d.swsCtx = C.sws_getContext(d.yuv420Frame.width, d.yuv420Frame.height, int32(d.yuv420Frame.format), + d.rgbaFrame.width, d.rgbaFrame.height, (int32)(d.rgbaFrame.format), C.SWS_BILINEAR, nil, nil, nil) + if d.swsCtx == nil { + return fmt.Errorf("sws_getContext() failed") + } + + rgbaFrameSize := C.av_image_get_buffer_size((int32)(d.rgbaFrame.format), d.rgbaFrame.width, d.rgbaFrame.height, 1) + d.rgbaFramePtr = (*[1 << 30]uint8)(unsafe.Pointer(d.rgbaFrame.data[0]))[:rgbaFrameSize:rgbaFrameSize] + return nil +} + +// Decode decodes a RGBA image from H264. +func (d *H264Decoder) Decode(au [][]byte) (*image.RGBA, error) { + // Encode access unit into Annex-B. + annexb, err := h264.AnnexB(au).Marshal() + if err != nil { + return nil, err + } + + // Send access unit to decoder. + var pkt C.AVPacket + ptr := &annexb[0] + var p runtime.Pinner + p.Pin(ptr) + pkt.data = (*C.uint8_t)(ptr) + pkt.size = (C.int)(len(annexb)) + res := C.avcodec_send_packet(d.codecCtx, &pkt) + p.Unpin() + if res < 0 { + return nil, nil + } + + // Receive frame if available. + res = C.avcodec_receive_frame(d.codecCtx, d.yuv420Frame) + if res < 0 { + return nil, nil + } + + // If frame size has changed, allocate needed objects. + if d.rgbaFrame == nil || d.rgbaFrame.width != d.yuv420Frame.width || d.rgbaFrame.height != d.yuv420Frame.height { + err := d.reinitDynamicStuff() + if err != nil { + return nil, err + } + } + + // Convert color space from YUV420 to RGBA. + res = C.sws_scale(d.swsCtx, frameDataH264(d.yuv420Frame), frameLineSizeH264(d.yuv420Frame), + 0, d.yuv420Frame.height, frameDataH264(d.rgbaFrame), frameLineSizeH264(d.rgbaFrame)) + if res < 0 { + return nil, fmt.Errorf("sws_scale() failed") + } + + // Embed frame into an image.RGBA. + return &image.RGBA{ + Pix: d.rgbaFramePtr, + Stride: 4 * (int)(d.rgbaFrame.width), + Rect: image.Rectangle{ + Max: image.Point{(int)(d.rgbaFrame.width), (int)(d.rgbaFrame.height)}, + }, + }, nil +} diff --git a/writer/internal/ingest/formats/h265.go b/writer/internal/ingest/formats/h265.go new file mode 100644 index 0000000..8718fa7 --- /dev/null +++ b/writer/internal/ingest/formats/h265.go @@ -0,0 +1,69 @@ +package formats + +import ( + "errors" + "fmt" + "github.com/bluenviron/gortsplib/v4" + "github.com/bluenviron/gortsplib/v4/pkg/description" + "github.com/bluenviron/gortsplib/v4/pkg/format" + "github.com/bluenviron/gortsplib/v4/pkg/format/rtph265" + "github.com/bluenviron/mediacommon/v2/pkg/codecs/h265" + "github.com/pion/rtp" + "image" +) + +// FindH265Format finds the H265 media and format. +func FindH265Format(desc *description.Session) (*format.H265, *description.Media, error) { + var h265Format *format.H265 + h265Media := desc.FindFormat(&h265Format) + if h265Media == nil { + return nil, nil, errors.New("media H265 not found") + } + + return h265Format, h265Media, nil +} + +// ProcessH265RGBA processes H265 flow and returns PTS and IMG. +func ProcessH265RGBA( + c *gortsplib.Client, + h265Media *description.Media, + h265RTPDec *rtph265.Decoder, + h265Dec *H265Decoder, + pkt *rtp.Packet, + firstRandomAccess bool, + t string, +) ( + int64, + *image.RGBA, + error) { + // Decode timestamp. + pts, ok := c.PacketPTS2(h265Media, pkt) + if !ok { + return 0, nil, fmt.Errorf("[%v]: waiting for timestamp\n", t) + } + + // Extract access units from RTP packets. + au, err := h265RTPDec.Decode(pkt) + if err != rtph265.ErrNonStartingPacketAndNoPrevious && err != rtph265.ErrMorePacketsNeeded { + return 0, nil, fmt.Errorf("[%v]: decoding RTP packet error: %w", t, err) + } + + // Wait for a random access unit. + if !firstRandomAccess && !h265.IsRandomAccess(au) { + return 0, nil, nil + } + firstRandomAccess = true + + // Convert H265 access units into RGBA frames. + img, err := h265Dec.Decode(au) + if err != nil { + return 0, nil, fmt.Errorf("[%v]: convert into RGBA frames error\n", t) + } + + // Wait for a frame. + if img == nil { + return 0, nil, fmt.Errorf("[%v]: frame not found\n", t) + } + + return pts, img, nil +} diff --git a/writer/internal/ingest/formats/h265_decoder.go b/writer/internal/ingest/formats/h265_decoder.go new file mode 100644 index 0000000..ee47987 --- /dev/null +++ b/writer/internal/ingest/formats/h265_decoder.go @@ -0,0 +1,161 @@ +package formats + +import ( + "fmt" + "image" + "runtime" + "unsafe" + + "github.com/bluenviron/mediacommon/v2/pkg/codecs/h264" +) + +// #cgo pkg-config: libavcodec libavutil libswscale +// #include +// #include +// #include +import "C" + +func frameDataH265(frame *C.AVFrame) **C.uint8_t { + return (**C.uint8_t)(unsafe.Pointer(&frame.data[0])) +} + +func frameLineSizeH265(frame *C.AVFrame) *C.int { + return (*C.int)(unsafe.Pointer(&frame.linesize[0])) +} + +// H265Decoder is a wrapper around FFmpeg's H265 decoder. +type H265Decoder struct { + codecCtx *C.AVCodecContext + yuv420Frame *C.AVFrame + rgbaFrame *C.AVFrame + rgbaFramePtr []uint8 + swsCtx *C.struct_SwsContext +} + +// Initialize initializes a H265Decoder. +func (d *H265Decoder) Initialize() error { + codec := C.avcodec_find_decoder(C.AV_CODEC_ID_H265) + if codec == nil { + return fmt.Errorf("avcodec_find_decoder() failed") + } + + d.codecCtx = C.avcodec_alloc_context3(codec) + if d.codecCtx == nil { + return fmt.Errorf("avcodec_alloc_context3() failed") + } + + res := C.avcodec_open2(d.codecCtx, codec, nil) + if res < 0 { + C.avcodec_close(d.codecCtx) + return fmt.Errorf("avcodec_open2() failed") + } + + d.yuv420Frame = C.av_frame_alloc() + if d.yuv420Frame == nil { + C.avcodec_close(d.codecCtx) + return fmt.Errorf("av_frame_alloc() failed") + } + + return nil +} + +// Close closes the decoder. +func (d *H265Decoder) Close() { + if d.swsCtx != nil { + C.sws_freeContext(d.swsCtx) + } + + if d.rgbaFrame != nil { + C.av_frame_free(&d.rgbaFrame) + } + + C.av_frame_free(&d.yuv420Frame) + C.avcodec_close(d.codecCtx) +} + +func (d *H265Decoder) reinitDynamicStuff() error { + if d.swsCtx != nil { + C.sws_freeContext(d.swsCtx) + } + + if d.rgbaFrame != nil { + C.av_frame_free(&d.rgbaFrame) + } + + d.rgbaFrame = C.av_frame_alloc() + if d.rgbaFrame == nil { + return fmt.Errorf("av_frame_alloc() failed") + } + + d.rgbaFrame.format = C.AV_PIX_FMT_RGBA + d.rgbaFrame.width = d.yuv420Frame.width + d.rgbaFrame.height = d.yuv420Frame.height + d.rgbaFrame.color_range = C.AVCOL_RANGE_JPEG + + res := C.av_frame_get_buffer(d.rgbaFrame, 1) + if res < 0 { + return fmt.Errorf("av_frame_get_buffer() failed") + } + + d.swsCtx = C.sws_getContext(d.yuv420Frame.width, d.yuv420Frame.height, int32(d.yuv420Frame.format), + d.rgbaFrame.width, d.rgbaFrame.height, (int32)(d.rgbaFrame.format), C.SWS_BILINEAR, nil, nil, nil) + if d.swsCtx == nil { + return fmt.Errorf("sws_getContext() failed") + } + + rgbaFrameSize := C.av_image_get_buffer_size((int32)(d.rgbaFrame.format), d.rgbaFrame.width, d.rgbaFrame.height, 1) + d.rgbaFramePtr = (*[1 << 30]uint8)(unsafe.Pointer(d.rgbaFrame.data[0]))[:rgbaFrameSize:rgbaFrameSize] + return nil +} + +// Decode decodes a RGBA image from H265. +func (d *H265Decoder) Decode(au [][]byte) (*image.RGBA, error) { + // encode access unit into Annex-B + annexb, err := h264.AnnexB(au).Marshal() + if err != nil { + return nil, err + } + + // send access unit to decoder + var pkt C.AVPacket + ptr := &annexb[0] + var p runtime.Pinner + p.Pin(ptr) + pkt.data = (*C.uint8_t)(ptr) + pkt.size = (C.int)(len(annexb)) + res := C.avcodec_send_packet(d.codecCtx, &pkt) + p.Unpin() + if res < 0 { + return nil, nil + } + + // receive frame if available + res = C.avcodec_receive_frame(d.codecCtx, d.yuv420Frame) + if res < 0 { + return nil, nil + } + + // if frame size has changed, reallocate needed objects + if d.rgbaFrame == nil || d.rgbaFrame.width != d.yuv420Frame.width || d.rgbaFrame.height != d.yuv420Frame.height { + err := d.reinitDynamicStuff() + if err != nil { + return nil, err + } + } + + // convert color space from YUV420 to RGBA + res = C.sws_scale(d.swsCtx, frameDataH265(d.yuv420Frame), frameLineSizeH265(d.yuv420Frame), + 0, d.yuv420Frame.height, frameDataH265(d.rgbaFrame), frameLineSizeH265(d.rgbaFrame)) + if res < 0 { + return nil, fmt.Errorf("sws_scale() failed") + } + + // embed frame into an image.RGBA + return &image.RGBA{ + Pix: d.rgbaFramePtr, + Stride: 4 * (int)(d.rgbaFrame.width), + Rect: image.Rectangle{ + Max: image.Point{(int)(d.rgbaFrame.width), (int)(d.rgbaFrame.height)}, + }, + }, nil +} diff --git a/writer/internal/ingest/formats/lpcm.go b/writer/internal/ingest/formats/lpcm.go new file mode 100644 index 0000000..25471d0 --- /dev/null +++ b/writer/internal/ingest/formats/lpcm.go @@ -0,0 +1,48 @@ +package formats + +import ( + "errors" + "fmt" + "github.com/bluenviron/gortsplib/v4" + "github.com/bluenviron/gortsplib/v4/pkg/description" + "github.com/bluenviron/gortsplib/v4/pkg/format" + "github.com/bluenviron/gortsplib/v4/pkg/format/rtplpcm" + "github.com/pion/rtp" +) + +// FindLPCMFormat finds the LPCM media and format. +func FindLPCMFormat(desc *description.Session) (*format.LPCM, *description.Media, error) { + var lpcmFormat *format.LPCM + lpcmMedia := desc.FindFormat(&lpcmFormat) + if lpcmMedia == nil { + return nil, nil, errors.New("media LPCM not found") + } + + return lpcmFormat, lpcmMedia, nil +} + +// ProcessLPCM processes LPCM flow and returns PTS and SAMPLES. +func ProcessLPCM( + c *gortsplib.Client, + lpcmMedia *description.Media, + lpcmRTPDec *rtplpcm.Decoder, + pkt *rtp.Packet, + t string, +) ( + int64, + []byte, + error) { + // Decode timestamp. + pts, ok := c.PacketPTS2(lpcmMedia, pkt) + if !ok { + return 0, nil, fmt.Errorf("[%v]: waiting for timestamp\n", t) + } + + // Extract LPCM samples from RTP packets. + samples, err := lpcmRTPDec.Decode(pkt) + if err != nil { + return 0, nil, fmt.Errorf("[%v]: decoding RTP packet error: %w", t, err) + } + + return pts, samples, err +} diff --git a/writer/internal/ingest/formats/mjpeg.go b/writer/internal/ingest/formats/mjpeg.go new file mode 100644 index 0000000..15e4b83 --- /dev/null +++ b/writer/internal/ingest/formats/mjpeg.go @@ -0,0 +1,57 @@ +package formats + +import ( + "bytes" + "errors" + "fmt" + "github.com/bluenviron/gortsplib/v4" + "github.com/bluenviron/gortsplib/v4/pkg/description" + "github.com/bluenviron/gortsplib/v4/pkg/format" + "github.com/bluenviron/gortsplib/v4/pkg/format/rtpmjpeg" + "github.com/pion/rtp" + "image" + "image/jpeg" +) + +// FindMJPEGFormat finds the MJPEG media and format. +func FindMJPEGFormat(desc *description.Session) (*format.MJPEG, *description.Media, error) { + var mjpegFormat *format.MJPEG + mjpegMedia := desc.FindFormat(&mjpegFormat) + if mjpegMedia == nil { + return nil, nil, errors.New("media MJPEG not found") + } + + return mjpegFormat, mjpegMedia, nil +} + +// ProcessMJPEGRGBA processes MJPEG flow and returns PTS and IMG. +func ProcessMJPEGRGBA( + c *gortsplib.Client, + mjpegMedia *description.Media, + mjpegRTPDec *rtpmjpeg.Decoder, + pkt *rtp.Packet, + t string, +) ( + int64, + image.Image, + error) { + // Decode timestamp. + pts, ok := c.PacketPTS2(mjpegMedia, pkt) + if !ok { + return 0, nil, fmt.Errorf("[%v]: waiting for timestamp\n", t) + } + + // Extract JPEG images from RTP packets. + enc, err := mjpegRTPDec.Decode(pkt) + if err != rtpmjpeg.ErrNonStartingPacketAndNoPrevious && err != rtpmjpeg.ErrMorePacketsNeeded { + return 0, nil, fmt.Errorf("[%v]: decoding RTP packet error: %w", t, err) + } + + // Convert JPEG images into RGBA frames. + img, err := jpeg.Decode(bytes.NewReader(enc)) + if err != nil { + return 0, nil, fmt.Errorf("[%v]: convert into RGBA frames error\n", t) + } + + return pts, img, nil +} diff --git a/writer/internal/ingest/formats/opus.go b/writer/internal/ingest/formats/opus.go new file mode 100644 index 0000000..b446ee1 --- /dev/null +++ b/writer/internal/ingest/formats/opus.go @@ -0,0 +1,48 @@ +package formats + +import ( + "errors" + "fmt" + "github.com/bluenviron/gortsplib/v4" + "github.com/bluenviron/gortsplib/v4/pkg/description" + "github.com/bluenviron/gortsplib/v4/pkg/format" + "github.com/bluenviron/gortsplib/v4/pkg/format/rtpsimpleaudio" + "github.com/pion/rtp" +) + +// FindOPUSFormat finds the OPUS media and format. +func FindOPUSFormat(desc *description.Session) (*format.Opus, *description.Media, error) { + var opusFormat *format.Opus + opusMedia := desc.FindFormat(&opusFormat) + if opusMedia == nil { + return nil, nil, errors.New("media OPUS not found") + } + + return opusFormat, opusMedia, nil +} + +// ProcessOPUS processes OPUS flow and returns PTS and OP. +func ProcessOPUS( + c *gortsplib.Client, + opusMedia *description.Media, + opusRTPDec *rtpsimpleaudio.Decoder, + pkt *rtp.Packet, + t string, +) ( + int64, + []byte, + error) { + // Decode timestamp. + pts, ok := c.PacketPTS2(opusMedia, pkt) + if !ok { + return 0, nil, fmt.Errorf("[%v]: waiting for timestamp\n", t) + } + + // Extract Opus packets from RTP packets. + op, err := opusRTPDec.Decode(pkt) + if err != nil { + return 0, nil, fmt.Errorf("[%v]: decoding RTP packet error: %w", t, err) + } + + return pts, op, nil +} diff --git a/writer/internal/ingest/formats/vp8-vp9_decoder.go b/writer/internal/ingest/formats/vp8-vp9_decoder.go new file mode 100644 index 0000000..71c9d8e --- /dev/null +++ b/writer/internal/ingest/formats/vp8-vp9_decoder.go @@ -0,0 +1,153 @@ +package formats + +import ( + "fmt" + "image" + "runtime" + "unsafe" +) + +// #cgo pkg-config: libavcodec libavutil libswscale +// #include +// #include +// #include +import "C" + +func frameDataVP(frame *C.AVFrame) **C.uint8_t { + return (**C.uint8_t)(unsafe.Pointer(&frame.data[0])) +} + +func frameLineSizeVP(frame *C.AVFrame) *C.int { + return (*C.int)(unsafe.Pointer(&frame.linesize[0])) +} + +// VP8Decoder is a wrapper around FFmpeg's VP8 decoder. +type VPDecoder struct { + codecCtx *C.AVCodecContext + yuv420Frame *C.AVFrame + rgbaFrame *C.AVFrame + rgbaFramePtr []uint8 + swsCtx *C.struct_SwsContext +} + +// Initialize initializes a VP8Decoder. +func (d *VPDecoder) Initialize() error { + codec := C.avcodec_find_decoder(C.AV_CODEC_ID_VP8) + if codec == nil { + return fmt.Errorf("avcodec_find_decoder() failed") + } + + d.codecCtx = C.avcodec_alloc_context3(codec) + if d.codecCtx == nil { + return fmt.Errorf("avcodec_alloc_context3() failed") + } + + res := C.avcodec_open2(d.codecCtx, codec, nil) + if res < 0 { + C.avcodec_close(d.codecCtx) + return fmt.Errorf("avcodec_open2() failed") + } + + d.yuv420Frame = C.av_frame_alloc() + if d.yuv420Frame == nil { + C.avcodec_close(d.codecCtx) + return fmt.Errorf("av_frame_alloc() failed") + } + + return nil +} + +// Close closes the decoder. +func (d *VPDecoder) Close() { + if d.swsCtx != nil { + C.sws_freeContext(d.swsCtx) + } + + if d.rgbaFrame != nil { + C.av_frame_free(&d.rgbaFrame) + } + + C.av_frame_free(&d.yuv420Frame) + C.avcodec_close(d.codecCtx) +} + +func (d *VPDecoder) reinitDynamicStuff() error { + if d.swsCtx != nil { + C.sws_freeContext(d.swsCtx) + } + + if d.rgbaFrame != nil { + C.av_frame_free(&d.rgbaFrame) + } + + d.rgbaFrame = C.av_frame_alloc() + if d.rgbaFrame == nil { + return fmt.Errorf("av_frame_alloc() failed") + } + + d.rgbaFrame.format = C.AV_PIX_FMT_RGBA + d.rgbaFrame.width = d.yuv420Frame.width + d.rgbaFrame.height = d.yuv420Frame.height + d.rgbaFrame.color_range = C.AVCOL_RANGE_JPEG + + res := C.av_frame_get_buffer(d.rgbaFrame, 1) + if res < 0 { + return fmt.Errorf("av_frame_get_buffer() failed") + } + + d.swsCtx = C.sws_getContext(d.yuv420Frame.width, d.yuv420Frame.height, int32(d.yuv420Frame.format), + d.rgbaFrame.width, d.rgbaFrame.height, (int32)(d.rgbaFrame.format), C.SWS_BILINEAR, nil, nil, nil) + if d.swsCtx == nil { + return fmt.Errorf("sws_getContext() failed") + } + + rgbaFrameSize := C.av_image_get_buffer_size((int32)(d.rgbaFrame.format), d.rgbaFrame.width, d.rgbaFrame.height, 1) + d.rgbaFramePtr = (*[1 << 30]uint8)(unsafe.Pointer(d.rgbaFrame.data[0]))[:rgbaFrameSize:rgbaFrameSize] + return nil +} + +// Decode decodes a RGBA image from VP8. +func (d *VPDecoder) Decode(au []byte) (*image.RGBA, error) { + // send access unit to decoder + var pkt C.AVPacket + ptr := &au[0] + var p runtime.Pinner + p.Pin(ptr) + pkt.data = (*C.uint8_t)(ptr) + pkt.size = (C.int)(len(au)) + res := C.avcodec_send_packet(d.codecCtx, &pkt) + p.Unpin() + if res < 0 { + return nil, nil + } + + // receive frame if available + res = C.avcodec_receive_frame(d.codecCtx, d.yuv420Frame) + if res < 0 { + return nil, nil + } + + // if frame size has changed, allocate needed objects + if d.rgbaFrame == nil || d.rgbaFrame.width != d.yuv420Frame.width || d.rgbaFrame.height != d.yuv420Frame.height { + err := d.reinitDynamicStuff() + if err != nil { + return nil, err + } + } + + // convert color space from YUV420 to RGBA + res = C.sws_scale(d.swsCtx, frameDataVP(d.yuv420Frame), frameLineSizeVP(d.yuv420Frame), + 0, d.yuv420Frame.height, frameDataVP(d.rgbaFrame), frameLineSizeVP(d.rgbaFrame)) + if res < 0 { + return nil, fmt.Errorf("sws_scale() failed") + } + + // embed frame into an image.RGBA + return &image.RGBA{ + Pix: d.rgbaFramePtr, + Stride: 4 * (int)(d.rgbaFrame.width), + Rect: image.Rectangle{ + Max: image.Point{(int)(d.rgbaFrame.width), (int)(d.rgbaFrame.height)}, + }, + }, nil +} diff --git a/writer/internal/ingest/formats/vp8.go b/writer/internal/ingest/formats/vp8.go new file mode 100644 index 0000000..3c1efde --- /dev/null +++ b/writer/internal/ingest/formats/vp8.go @@ -0,0 +1,61 @@ +package formats + +import ( + "errors" + "fmt" + "github.com/bluenviron/gortsplib/v4" + "github.com/bluenviron/gortsplib/v4/pkg/description" + "github.com/bluenviron/gortsplib/v4/pkg/format" + "github.com/bluenviron/gortsplib/v4/pkg/format/rtpvp8" + "github.com/pion/rtp" + "image" +) + +// FindVP8Format finds the VP8 media and format. +func FindVP8Format(desc *description.Session) (*format.VP8, *description.Media, error) { + var vp8Format *format.VP8 + vp8Media := desc.FindFormat(&vp8Format) + if vp8Media == nil { + return nil, nil, errors.New("media VP8 not found") + } + + return vp8Format, vp8Media, nil +} + +// ProcessVP8RGBA processes VP8 flow and returns PTS and IMG. +func ProcessVP8RGBA( + c *gortsplib.Client, + vp8Media *description.Media, + vp8RTPDec *rtpvp8.Decoder, + vp8Dec *VPDecoder, + pkt *rtp.Packet, + t string, +) ( + int64, + *image.RGBA, + error) { + // Decode timestamp. + pts, ok := c.PacketPTS2(vp8Media, pkt) + if !ok { + return 0, nil, fmt.Errorf("[%v]: waiting for timestamp\n", t) + } + + // Extract access units from RTP packets. + au, err := vp8RTPDec.Decode(pkt) + if err != rtpvp8.ErrNonStartingPacketAndNoPrevious && err != rtpvp8.ErrMorePacketsNeeded { + return 0, nil, fmt.Errorf("[%v]: decoding RTP packet error: %w", t, err) + } + + // Convert VP8 access units into RGBA frames. + img, err := vp8Dec.Decode(au) + if err != nil { + return 0, nil, fmt.Errorf("[%v]: convert into RGBA frames error\n", t) + } + + // Wait for a frame. + if img == nil { + return 0, nil, fmt.Errorf("[%v]: frame not found\n", t) + } + + return pts, img, nil +} diff --git a/writer/internal/ingest/formats/vp9.go b/writer/internal/ingest/formats/vp9.go new file mode 100644 index 0000000..842e4eb --- /dev/null +++ b/writer/internal/ingest/formats/vp9.go @@ -0,0 +1,62 @@ +package formats + +import ( + "errors" + "fmt" + "github.com/bluenviron/gortsplib/v4" + "github.com/bluenviron/gortsplib/v4/pkg/description" + "github.com/bluenviron/gortsplib/v4/pkg/format" + "github.com/bluenviron/gortsplib/v4/pkg/format/rtpvp8" + "github.com/bluenviron/gortsplib/v4/pkg/format/rtpvp9" + "github.com/pion/rtp" + "image" +) + +// FindVP9Format finds the VP9 media and format. +func FindVP9Format(desc *description.Session) (*format.VP9, *description.Media, error) { + var vp9Format *format.VP9 + vp9Media := desc.FindFormat(&vp9Format) + if vp9Media == nil { + return nil, nil, errors.New("media VP9 not found") + } + + return vp9Format, vp9Media, nil +} + +// ProcessVP9RGBA processes VP9 flow and returns PTS and IMG. +func ProcessVP9RGBA( + c *gortsplib.Client, + vp9Media *description.Media, + vp9RTPDec *rtpvp9.Decoder, + vp9Dec *VPDecoder, + pkt *rtp.Packet, + t string, +) ( + int64, + *image.RGBA, + error) { + // Decode timestamp. + pts, ok := c.PacketPTS2(vp9Media, pkt) + if !ok { + return 0, nil, fmt.Errorf("[%v]: waiting for timestamp\n", t) + } + + // Extract access units from RTP packets. + au, err := vp9RTPDec.Decode(pkt) + if err != rtpvp8.ErrNonStartingPacketAndNoPrevious && err != rtpvp8.ErrMorePacketsNeeded { + return 0, nil, fmt.Errorf("[%v]: decoding RTP packet error: %w", t, err) + } + + // Convert VP8 access units into RGBA frames. + img, err := vp9Dec.Decode(au) + if err != nil { + return 0, nil, fmt.Errorf("[%v]: convert into RGBA frames error\n", t) + } + + // Wait for a frame. + if img == nil { + return 0, nil, fmt.Errorf("[%v]: frame not found\n", t) + } + + return pts, img, nil +} diff --git a/writer/internal/ingest/rtsp/h264.go b/writer/internal/ingest/rtsp/h264.go deleted file mode 100644 index 885fcad..0000000 --- a/writer/internal/ingest/rtsp/h264.go +++ /dev/null @@ -1,43 +0,0 @@ -package rtsp - -import ( - "errors" - "fmt" - "github.com/bluenviron/gortsplib/v4" - "github.com/bluenviron/gortsplib/v4/pkg/description" - "github.com/bluenviron/gortsplib/v4/pkg/format" - "github.com/bluenviron/gortsplib/v4/pkg/format/rtph264" - "github.com/pion/rtp" - "log" -) - -// CheckH264Format finds the H264 media and format. -func CheckH264Format(desc *description.Session) (*format.H264, *description.Media, error) { - var h264Format *format.H264 - h264Media := desc.FindFormat(&h264Format) - if h264Media == nil { - return nil, nil, errors.New("media H264 not found") - } - return h264Format, h264Media, nil -} - -// ProcH264 processes H264 flow and returns PTS and AU. -func ProcH264(c *gortsplib.Client, h264Media *description.Media, h264RTPDec *rtph264.Decoder, pkt *rtp.Packet) ( - int64, [][]byte, error) { - // Decode timestamp. - pts, ok := c.PacketPTS2(h264Media, pkt) - if !ok { - log.Printf("waiting for timestamp\n") - return 0, nil, nil - } - - // Extract access unit from RTP packets. - au, err := h264RTPDec.Decode(pkt) - if err != nil { - if err != rtph264.ErrNonStartingPacketAndNoPrevious && err != rtph264.ErrMorePacketsNeeded { - return 0, nil, fmt.Errorf("decoding H264 RTP packet: %w", err) - } - } - - return pts, au, nil -} diff --git a/writer/internal/ingest/rtsp/rtsp.go b/writer/internal/ingest/rtsp/rtsp.go index c27ef3b..e5057a5 100644 --- a/writer/internal/ingest/rtsp/rtsp.go +++ b/writer/internal/ingest/rtsp/rtsp.go @@ -3,30 +3,29 @@ package rtsp import ( "errors" "fmt" + "log" + "os" + "strings" + "time" + + "git.insit.tech/psa/rtsp_reader-writer/writer/internal/ingest/formats" "git.insit.tech/psa/rtsp_reader-writer/writer/internal/storage" - "git.insit.tech/psa/rtsp_reader-writer/writer/pkg/converter" "git.insit.tech/sas/rtsp_proxy/protos/gens" "github.com/bluenviron/gortsplib/v4" "github.com/bluenviron/gortsplib/v4/pkg/base" "github.com/bluenviron/gortsplib/v4/pkg/description" "github.com/bluenviron/gortsplib/v4/pkg/format" + "github.com/bluenviron/mediacommon/v2/pkg/codecs/g711" "github.com/pion/rtp" _ "github.com/zaf/g711" - "log" - "os" - "strings" - "time" ) // StartWriter starts the program. -func StartWriter(dir string, period int, URI string) error { - err := RTSP(dir, period, URI) +func StartWriter(dir string, period int, link string) error { + err := RTSP(dir, period, link) if err != nil { - - // Temporary solution for inner cameras. - // // Change domain if a camera was flipped to another domain. - err = changeDomain(dir, period, URI, err) + err = changeDomain(dir, period, link, err) if err != nil { return fmt.Errorf("change domain error: %w", err) } @@ -37,20 +36,6 @@ func StartWriter(dir string, period int, URI string) error { // RTSP processes RTSP protocol. func RTSP(dir string, period int, link string) error { - resolutions := []string{"1280x720"} - - // Create file name structure and directory for files. - cuttedURI := storage.CutURI(link) - fn := storage.CreateFileName(dir, resolutions, cuttedURI, period) - - err := os.MkdirAll(fmt.Sprintf("%s", fn.Path), 0755) - if err != nil { - return fmt.Errorf("mkdirall error: %w", err) - } - - // Create M3U8 playlist. - go gens.MediaPlaylistGenerator(dir+"/data/"+cuttedURI, "", fn.Duration, resolutions) - // Connect to the server. c := gortsplib.Client{ UserAgent: "PSA", @@ -67,167 +52,995 @@ func RTSP(dir string, period int, link string) error { } defer c.Close() - desc, r, err := c.Describe(u) + desc, res, err := c.Describe(u) if err != nil || desc == nil { log.Printf("medias not found for camera [%s]: %v", link, err) return nil } - /////////////// + // Create file name structure and directory for files. + resolution := storage.FindResolution(res.Body) + resolutions := []string{resolution} + + cuttedURI := storage.CutURI(link) + fn := storage.CreateFileName(dir, resolutions, cuttedURI, period) + + err = os.MkdirAll(fmt.Sprintf("%s", fn.Path), 0755) + if err != nil { + return fmt.Errorf("mkdirall error: %w", err) + } + + // Create M3U8 playlist. + go gens.MediaPlaylistGenerator(dir+"/data/"+cuttedURI, "", fn.Duration, resolutions) // Find formats. - //var run bool - h264Format, h264Media, err := CheckH264Format(desc) - if err != nil { - log.Printf("H264 format not found: %v", err) - //run = true + var audioFormat string + var videoFormat string + + av1Format, av1Media, err := formats.FindAV1Format(desc) + if av1Format != nil { + log.Println("[av1]: format found") + videoFormat = "AV1" + } else { + log.Println(err) } - //g711Format, g711Media, err := CheckG711Format(desc) - //if err != nil { - // log.Printf("G711 format not found: %v", err) - // // run = true - //} - - // Initialising variable for AAC. - // var mpeg4AudioFormat *format.MPEG4Audio - - //////////////////////////////////////////////////////////////////////////////////////// - // Create RTP -> H264 decoder. - h264RTPDec, err := h264Format.CreateDecoder() - if err != nil { - log.Printf("create H264 decoder error: %v", err) + g711Format, g711Media, err := formats.FindG711Format(desc) + if g711Format != nil { + log.Println("[g711]: format found") + audioFormat = "G711" + } else { + log.Println(err) } - // Create RTP -> H264 decoder. - //g711RTPDec, err := g711Format.CreateDecoder() - //if err != nil { - // log.Printf("create G711 decoder error: %v", err) - //} - - //////////////////////////////////////////////////////////////////////////////////////// - - // Wait for the next period. - storage.WaitPeriod(period) - log.Println("Start recording") - - //////////////////////////////////////////////////////////////////////////////////////// - - // Setup MPEG-TS muxer. - currentMpegtsMuxer := converter.MpegtsMuxer{ - FileName: fn.SetNumNTime(), - H264Format: h264Format, - // Mpeg4AudioFormat: mpeg4AudioFormat, + h264Format, h264Media, err := formats.FindH264Format(desc) + if h264Format != nil { + log.Println("[h264]: format found") + videoFormat = "H264" + } else { + log.Println(err) } - err = currentMpegtsMuxer.Initialize() - if err != nil { - panic(err) - } - defer currentMpegtsMuxer.Close() - - //////////////////////////////////////////////////////////////////////////////////////// - - // Setup all medias. - err = c.SetupAll(desc.BaseURL, desc.Medias) - if err != nil { - panic(err) + aacFormat, aacMedia, err := formats.FindAACFormat(desc) + if aacFormat != nil { + log.Println("[aac]: format found") + audioFormat = "AAC" + } else { + log.Println(err) } - //////////////////////////////////////////////////////////////////////////////////////// + h265Format, h265Media, err := formats.FindH265Format(desc) + if h265Format != nil { + log.Println("[h265]: format found") + videoFormat = "H265" + } else { + log.Println(err) + } - // Called when a H264/RTP or G711/RTP packet arrives. - c.OnPacketRTPAny(func(medi *description.Media, forma format.Format, pkt *rtp.Packet) { - switch forma.(type) { - case *format.H264: - // Process H264 flow and return PTS and AU. - pts, au, err := ProcH264(&c, h264Media, h264RTPDec, pkt) + lpcmFormat, lpcmMedia, err := formats.FindLPCMFormat(desc) + if lpcmFormat != nil { + log.Println("[lpcm]: format found") + audioFormat = "LPCM" + } else { + log.Println(err) + } + + mjpegFormat, mjpegMedia, err := formats.FindMJPEGFormat(desc) + if mjpegFormat != nil { + log.Println("[mjpeg]: format found") + videoFormat = "MJPEG" + } else { + log.Println(err) + } + + opusFormat, opusMedia, err := formats.FindOPUSFormat(desc) + if opusFormat != nil { + log.Println("[opus]: format found") + audioFormat = "OPUS" + } else { + log.Println(err) + } + + vp8Format, vp8Media, err := formats.FindVP8Format(desc) + if vp8Format != nil { + log.Println("[vp8]: format found") + videoFormat = "VP8" + } else { + log.Println(err) + } + + vp9Format, vp9Media, err := formats.FindVP9Format(desc) + if vp9Format != nil { + log.Println("[vp9]: format found") + videoFormat = "VP9" + } else { + log.Println(err) + } + + // Start program according to gotten formats. + switch { + case videoFormat == "AV1" && audioFormat == "": + // Wait for the next period. + storage.WaitPeriod(period) + log.Printf("[%v]: start recording", videoFormat) + + // Create decoder. + av1RTPDec, err := av1Format.CreateDecoder() + if err != nil { + log.Printf("[%v]: create decoder error: %v\n", videoFormat, err) + } + + av1Dec := &formats.AV1Decoder{} + err = av1Dec.Initialize() + if err != nil { + log.Printf("[%v]: init decoder error: %v\n", videoFormat, err) + } + defer av1Dec.Close() + + // Setup media. + _, err = c.Setup(desc.BaseURL, av1Media, 0, 0) + if err != nil { + return fmt.Errorf("[%v]: setup media error: %w", videoFormat, err) + } + + firstRandomReceived := false + + // Process input rtp packets. + c.OnPacketRTP(av1Media, av1Format, func(pkt *rtp.Packet) { + // Process AV1 flow and return PTS and IMG. + pts, img, err := formats.ProcessAV1(&c, av1Media, av1RTPDec, pkt, av1Dec, firstRandomReceived, videoFormat) if err != nil { - //log.Printf("%s: process H264 error: %s\n", cuttedURI, err) + log.Printf("[%v]: process packet error: %v\n", videoFormat, err) + return } - // Encode the access unit into MPEG-TS. - err = currentMpegtsMuxer.WriteH264(au, pts) + log.Printf("[%v]: decoded frame with PTS %v and size %v\n", videoFormat, pts, img.Bounds().Max) + }) + + // Start playing. + _, err = c.Play(nil) + if err != nil { + return fmt.Errorf("[%v]: sending PLAY request erorr: %w", videoFormat, err) + } + + // Create ticker for rotation files. + ticker := time.NewTicker(time.Duration(period) * time.Second) + defer ticker.Stop() + + // Rotate files. + go func() { + for range ticker.C { + // Logic for rotation files. + /* + currentMpegtsMuxer.close() + currentMpegtsMuxer.fileName = fn.SetNumNTime() + + err = currentMpegtsMuxer.initialize() + if err != nil { + panic(err) + } + + log.Println("New file for recording created:", currentMpegtsMuxer.fileName) + */ + } + }() + + panic(c.Wait()) + + case videoFormat == "" && audioFormat == "G711": + // Wait for the next period. + storage.WaitPeriod(period) + log.Printf("[%v]: start recording", audioFormat) + + // Create decoder. + g711RTPDec, err := g711Format.CreateDecoder() + if err != nil { + log.Printf("[%v]: create decoder error: %v\n", audioFormat, err) + } + + // Setup media. + _, err = c.Setup(desc.BaseURL, g711Media, 0, 0) + if err != nil { + return fmt.Errorf("[%v]: setup media error: %w\n", audioFormat, err) + } + + // Process input rtp packets. + c.OnPacketRTP(g711Media, g711Format, func(pkt *rtp.Packet) { + // Process G711 flow and return PTS and AU. + pts, au, err := formats.ProcessG711(&c, g711Media, g711RTPDec, pkt, audioFormat) if err != nil { - //log.Printf("%s: writing H264 packet: %s\n", cuttedURI, err) + log.Printf("[%v]: process packet error: %v\n", audioFormat, err) + return } - case *format.G711: - //// Process G711 flow and returns PTS and AU. - //_, _, err := media.ProcG711(&c, g711Media, g711RTPDec, pkt) - //if err != nil { - // log.Printf("%s: process G711 error: %s\n", cuttedURI, err) - //} + // Decode samples (these are 16-bit, big endian LPCM samples). + if g711Format.MULaw { + g711.DecodeMulaw(au) + } else { + g711.DecodeAlaw(au) + } - //// Convert G711 to AAC. - //_, err = converter.ConvertG711ToAAC(au, f.MULaw) // take aacAu - //if err != nil { - // log.Printf("%s: converting G711 to AAC frame: %s\n", cuttedURI, err) - //} + log.Printf("[%v]: decoded audio samples with PTS %v and size %d\n", audioFormat, pts, len(au)) + }) - /* - // Encode the access unit into MPEG-TS. - err = MpegtsMuxer.writeMPEG4Audio([][]byte{aacAu}, pts) + // Start playing. + _, err = c.Play(nil) + if err != nil { + return fmt.Errorf("[%v]: sending PLAY request erorr: %w", audioFormat, err) + } + + // Create ticker for rotation files. + ticker := time.NewTicker(time.Duration(period) * time.Second) + defer ticker.Stop() + + // Rotate files. + go func() { + for range ticker.C { + // Logic for rotation files. + /* + currentMpegtsMuxer.close() + currentMpegtsMuxer.fileName = fn.SetNumNTime() + + err = currentMpegtsMuxer.initialize() + if err != nil { + panic(err) + } + + log.Println("New file for recording created:", currentMpegtsMuxer.fileName) + */ + } + }() + + panic(c.Wait()) + + case videoFormat == "H264" && audioFormat == "AAC": + // Wait for the next period. + storage.WaitPeriod(period) + log.Printf("[%v-%v]: start recording", videoFormat, audioFormat) + + // Create decoders. + h264RTPDec, err := h264Format.CreateDecoder() + if err != nil { + log.Printf("[%v-%v]: create decoder error: %v\n", videoFormat, audioFormat, err) + } + + aacRTPDec, err := aacFormat.CreateDecoder() + if err != nil { + log.Printf("[%v-%v]: create decoder error: %v\n", videoFormat, audioFormat, err) + } + + // Setup MPEG-TS muxer. + currentMpegtsMuxer := formats.MpegtsMuxer{ + FileName: fn.SetNumNTime(), + H264Format: h264Format, + Mpeg4AudioFormat: aacFormat, + } + + err = currentMpegtsMuxer.Initialize() + if err != nil { + return fmt.Errorf("[%v-%v]: init muxer error: %w\n", videoFormat, audioFormat, err) + } + + // Setup all medias. + err = c.SetupAll(desc.BaseURL, desc.Medias) + if err != nil { + return fmt.Errorf("[%v-%v]: setup media error: %w\n", videoFormat, audioFormat, err) + } + + // Process input rtp packets. + c.OnPacketRTPAny(func(medi *description.Media, forma format.Format, pkt *rtp.Packet) { + switch forma.(type) { + case *format.H264: + // Process H264 flow and return PTS and AU. + pts, au, err := formats.ProcessH264(&c, h264Media, h264RTPDec, pkt, videoFormat) if err != nil { - log.Printf("MPEG-TS write error: %v", err) + log.Printf("[%v-%v]: process packet error: %v\n", videoFormat, audioFormat, err) return } - */ + + // Encode the access unit into MPEG-TS. + err = currentMpegtsMuxer.WriteH264(au, pts) + if err != nil { + return + } + + case *format.MPEG4Audio: + // Process AAC flow and return PTS and AUS. + pts, aus, err := formats.ProcessAAC(&c, aacMedia, aacRTPDec, pkt, audioFormat) + if err != nil { + log.Printf("[%v-%v]: process packet error: %v\n", videoFormat, audioFormat, err) + return + } + + // Encode access units into MPEG-TS. + err = currentMpegtsMuxer.WriteAAC(aus, pts) + if err != nil { + return + } + + } + }) + + // Start playing. + _, err = c.Play(nil) + if err != nil { + return fmt.Errorf("[%v-%v]: sending PLAY request erorr: %w", videoFormat, audioFormat, err) } - }) - //////////////////////////////////////////////////////////////////////////////////////// + // Create ticker for rotation files. + ticker := time.NewTicker(time.Duration(period) * time.Second) + defer ticker.Stop() - // Send PLAY request. - _, err = c.Play(nil) - if err != nil { - log.Fatalln("Ошибка запуска воспроизведения:", err) - } + // Rotate files. + go func() { + for range ticker.C { + // Logic for rotation files. + currentMpegtsMuxer.Close() + currentMpegtsMuxer.FileName = fn.SetNumNTime() - // Create ticker for rotation files. - ticker := time.NewTicker(time.Duration(period) * time.Second) - defer ticker.Stop() + err = currentMpegtsMuxer.Initialize() + if err != nil { + log.Printf("[%v-%v]: init muxer error: %v\n", videoFormat, audioFormat, err) + return + } - // Rotate files. - go func() { - for range ticker.C { - currentMpegtsMuxer.Close() - currentMpegtsMuxer.FileName = fn.SetNumNTime() + log.Printf("[%v-%v]: new file for recording created: %v", + videoFormat, audioFormat, currentMpegtsMuxer.FileName) + } + }() - err = currentMpegtsMuxer.Initialize() + panic(c.Wait()) + + case videoFormat == "H264" && audioFormat == "G711": + // Wait for the next period. + storage.WaitPeriod(period) + log.Printf("[%v-%v]: start recording", videoFormat, audioFormat) + + // Create decoders. + h264RTPDec, err := h264Format.CreateDecoder() + if err != nil { + log.Printf("[%v-%v]: create decoder error: %v\n", videoFormat, audioFormat, err) + } + + g711RTPDec, err := g711Format.CreateDecoder() + if err != nil { + log.Printf("[%v-%v]: create decoder error: %v\n", videoFormat, audioFormat, err) + } + + // Setup MPEG-TS muxer. + var aacFormat *format.MPEG4Audio + + currentMpegtsMuxer := formats.MpegtsMuxer{ + FileName: fn.SetNumNTime(), + H264Format: h264Format, + Mpeg4AudioFormat: aacFormat, + } + + err = currentMpegtsMuxer.Initialize() + if err != nil { + return fmt.Errorf("[%v-%v]: init muxer error: %w\n", videoFormat, audioFormat, err) + } + + // Setup all medias. + err = c.SetupAll(desc.BaseURL, desc.Medias) + if err != nil { + return fmt.Errorf("[%v-%v]: setup media error: %w\n", videoFormat, audioFormat, err) + } + + // Process input rtp packets. + c.OnPacketRTPAny(func(medi *description.Media, forma format.Format, pkt *rtp.Packet) { + switch f := forma.(type) { + case *format.H264: + // Process H264 flow and return PTS and AU. + pts, au, err := formats.ProcessH264(&c, h264Media, h264RTPDec, pkt, videoFormat) + if err != nil { + log.Printf("[%v-%v]: process packet error: %v\n", videoFormat, audioFormat, err) + return + } + + // Encode the access unit into MPEG-TS. + err = currentMpegtsMuxer.WriteH264(au, pts) + if err != nil { + return + } + + case *format.G711: + // Process G711 flow and returns PTS and AU. + pts, au, err := formats.ProcessG711(&c, g711Media, g711RTPDec, pkt, videoFormat) + if err != nil { + log.Printf("[%v-%v]: process packet error: %v\n", videoFormat, audioFormat, err) + return + } + + // Convert G711 to AAC. + au, err = formats.ConvertG711ToAAC(au, f.MULaw) + if err != nil { + log.Printf("[%v-%v]: converting to AAC frame error: %v\n", videoFormat, audioFormat, err) + } + + // Encode the access unit into MPEG-TS. + err = currentMpegtsMuxer.WriteAAC([][]byte{au}, pts) + if err != nil { + return + } + } + }) + + // Start playing. + _, err = c.Play(nil) + if err != nil { + return fmt.Errorf("[%v-%v]: sending PLAY request erorr: %w", videoFormat, audioFormat, err) + } + + // Create ticker for rotation files. + ticker := time.NewTicker(time.Duration(period) * time.Second) + defer ticker.Stop() + + // Rotate files. + go func() { + for range ticker.C { + // Logic for rotation files. + currentMpegtsMuxer.Close() + currentMpegtsMuxer.FileName = fn.SetNumNTime() + + err = currentMpegtsMuxer.Initialize() + if err != nil { + log.Printf("[%v-%v]: init muxer error: %v\n", videoFormat, audioFormat, err) + return + } + + log.Printf("[%v-%v]: new file for recording created: %v", + videoFormat, audioFormat, currentMpegtsMuxer.FileName) + } + }() + + panic(c.Wait()) + + case videoFormat == "H264" && audioFormat == "": + // Wait for the next period. + storage.WaitPeriod(period) + log.Printf("[%v]: start recording", videoFormat) + + // Create decoder. + h264RTPDec, err := h264Format.CreateDecoder() + if err != nil { + log.Printf("[%v]: create decoder error: %v\n", videoFormat, err) + } + + // Setup H264 -> RGBA decoder. + h264Dec := &formats.H264Decoder{} + err = h264Dec.Initialize() + if err != nil { + log.Printf("[%v]: init decoder error: %v\n", videoFormat, err) + } + defer h264Dec.Close() + + // if SPS and PPS are present into the SDP, send them to the decoder + if h264Format.SPS != nil { + h264Dec.Decode([][]byte{h264Format.SPS}) + } + if h264Format.PPS != nil { + h264Dec.Decode([][]byte{h264Format.PPS}) + } + + // Setup media. + _, err = c.Setup(desc.BaseURL, h264Media, 0, 0) + if err != nil { + return fmt.Errorf("[%v]: setup media error: %w", videoFormat, err) + } + + firstRandomAccess := false + + // Process input rtp packets. + c.OnPacketRTP(h264Media, h264Format, func(pkt *rtp.Packet) { + // Process H264 flow and return PTS and IMG. + pts, img, err := formats.ProcessH264RGBA( + &c, h264Media, h264RTPDec, h264Dec, pkt, firstRandomAccess, videoFormat) if err != nil { - panic(err) + log.Printf("[%v]: process packet error: %v\n", videoFormat, err) + return } - log.Println("New file for recording created:", currentMpegtsMuxer.FileName) - } - }() + log.Printf("[%v]: decoded frame with PTS %v and size %v\n", videoFormat, pts, img.Bounds().Max) + }) - // Restart program if client gets TEARDOWN request. - c.OnRequest = func(req *base.Request) { - if req.Method == base.Teardown { - log.Printf("got TEARDOWN request from server: %v", req) - - //err = procRTSP(period, URI) - //if err != nil { - // log.Fatalf("restart RTSP error: %s\n", err) - //} + // Start playing. + _, err = c.Play(nil) + if err != nil { + return fmt.Errorf("[%v]: sending PLAY request erorr: %w", videoFormat, err) } + + // Create ticker for rotation files. + ticker := time.NewTicker(time.Duration(period) * time.Second) + defer ticker.Stop() + + // Rotate files. + go func() { + for range ticker.C { + // Logic for rotation files. + /* + currentMpegtsMuxer.close() + currentMpegtsMuxer.fileName = fn.SetNumNTime() + + err = currentMpegtsMuxer.initialize() + if err != nil { + panic(err) + } + + log.Println("New file for recording created:", currentMpegtsMuxer.fileName) + */ + } + }() + + panic(c.Wait()) + + case videoFormat == "H265" && audioFormat == "": + // Wait for the next period. + storage.WaitPeriod(period) + log.Printf("[%v]: start recording", videoFormat) + + // Create decoder. + h265RTPDec, err := h265Format.CreateDecoder() + if err != nil { + log.Printf("[%v]: create decoder error: %v\n", videoFormat, err) + } + + // Setup H264 -> RGBA decoder. + h265Dec := &formats.H265Decoder{} + err = h265Dec.Initialize() + if err != nil { + log.Printf("[%v]: init decoder error: %v\n", videoFormat, err) + } + defer h265Dec.Close() + + // If VPS, SPS and PPS are present into the SDP, send them to the decoder. + if h265Format.VPS != nil { + h265Dec.Decode([][]byte{h265Format.VPS}) + } + if h265Format.SPS != nil { + h265Dec.Decode([][]byte{h265Format.SPS}) + } + if h265Format.PPS != nil { + h265Dec.Decode([][]byte{h265Format.PPS}) + } + + // Setup media. + _, err = c.Setup(desc.BaseURL, h265Media, 0, 0) + if err != nil { + return fmt.Errorf("[%v]: setup media error: %w", videoFormat, err) + } + + firstRandomAccess := false + + // Process input rtp packets. + c.OnPacketRTP(h265Media, h265Format, func(pkt *rtp.Packet) { + // Process H265 flow and return PTS and IMG. + pts, img, err := formats.ProcessH265RGBA( + &c, h265Media, h265RTPDec, h265Dec, pkt, firstRandomAccess, videoFormat) + if err != nil { + log.Printf("[%v]: process packet error: %v\n", videoFormat, err) + return + } + + log.Printf("[%v]: decoded frame with PTS %v and size %v\n", videoFormat, pts, img.Bounds().Max) + }) + + // Start playing. + _, err = c.Play(nil) + if err != nil { + return fmt.Errorf("[%v]: sending PLAY request erorr: %w", videoFormat, err) + } + + // Create ticker for rotation files. + ticker := time.NewTicker(time.Duration(period) * time.Second) + defer ticker.Stop() + + // Rotate files. + go func() { + for range ticker.C { + // Logic for rotation files. + /* + currentMpegtsMuxer.close() + currentMpegtsMuxer.fileName = fn.SetNumNTime() + + err = currentMpegtsMuxer.initialize() + if err != nil { + panic(err) + } + + log.Println("New file for recording created:", currentMpegtsMuxer.fileName) + */ + } + }() + + panic(c.Wait()) + + case videoFormat == "" && audioFormat == "LPCM": + // Wait for the next period. + storage.WaitPeriod(period) + log.Printf("[%v]: start recording", audioFormat) + + // Create decoder. + lpcmRTPDec, err := lpcmFormat.CreateDecoder() + if err != nil { + log.Printf("[%v]: create decoder error: %v\n", audioFormat, err) + } + + // Setup media. + _, err = c.Setup(desc.BaseURL, lpcmMedia, 0, 0) + if err != nil { + log.Printf("[%v]: setup media error: %v\n", audioFormat, err) + } + + // Process input rtp packets. + c.OnPacketRTP(lpcmMedia, lpcmFormat, func(pkt *rtp.Packet) { + // Process LPCM flow and return PTS and SAMPLES. + pts, samples, err := formats.ProcessLPCM(&c, lpcmMedia, lpcmRTPDec, pkt, audioFormat) + if err != nil { + log.Printf("[%v]: process packet error: %v\n", audioFormat, err) + return + } + + log.Printf("[%v]: decoded audio samples with PTS %v and size %d\n", audioFormat, pts, len(samples)) + }) + + // Start playing. + _, err = c.Play(nil) + if err != nil { + return fmt.Errorf("[%v]: sending PLAY request erorr: %w", audioFormat, err) + } + + // Create ticker for rotation files. + ticker := time.NewTicker(time.Duration(period) * time.Second) + defer ticker.Stop() + + // Rotate files. + go func() { + for range ticker.C { + // Logic for rotation files. + /* + currentMpegtsMuxer.close() + currentMpegtsMuxer.fileName = fn.SetNumNTime() + + err = currentMpegtsMuxer.initialize() + if err != nil { + panic(err) + } + + log.Println("New file for recording created:", currentMpegtsMuxer.fileName) + */ + } + }() + + panic(c.Wait()) + + case videoFormat == "MJPEG" && audioFormat == "": + // Wait for the next period. + storage.WaitPeriod(period) + log.Printf("[%v]: start recording", audioFormat) + + // Create decoder. + mjpegRTPDec, err := mjpegFormat.CreateDecoder() + if err != nil { + log.Printf("[%v]: create decoder error: %v\n", videoFormat, err) + } + + // Setup media. + _, err = c.Setup(desc.BaseURL, mjpegMedia, 0, 0) + if err != nil { + return fmt.Errorf("[%v]: setup media error: %w", videoFormat, err) + } + + // Process input rtp packets. + c.OnPacketRTP(mjpegMedia, mjpegFormat, func(pkt *rtp.Packet) { + // Process MJPEG flow and return PTS and IMG. + pts, img, err := formats.ProcessMJPEGRGBA(&c, mjpegMedia, mjpegRTPDec, pkt, videoFormat) + if err != nil { + log.Printf("[%v]: process packet error: %v\n", videoFormat, err) + return + } + + log.Printf("[%v]: decoded image with PTS %v and size %v", videoFormat, pts, img.Bounds().Max) + }) + + // Start playing. + _, err = c.Play(nil) + if err != nil { + return fmt.Errorf("[%v]: sending PLAY request erorr: %w", videoFormat, err) + } + + // Create ticker for rotation files. + ticker := time.NewTicker(time.Duration(period) * time.Second) + defer ticker.Stop() + + // Rotate files. + go func() { + for range ticker.C { + // Logic for rotation files. + /* + currentMpegtsMuxer.close() + currentMpegtsMuxer.fileName = fn.SetNumNTime() + + err = currentMpegtsMuxer.initialize() + if err != nil { + panic(err) + } + + log.Println("New file for recording created:", currentMpegtsMuxer.fileName) + */ + } + }() + + panic(c.Wait()) + + case videoFormat == "" && audioFormat == "AAC": + // Wait for the next period. + storage.WaitPeriod(period) + log.Printf("[%v]: start recording", audioFormat) + + // Create decoder. + aacRTPDec, err := aacFormat.CreateDecoder() + if err != nil { + log.Printf("[%v]: create decoder error: %v\n", audioFormat, err) + } + + // Setup media. + _, err = c.Setup(desc.BaseURL, aacMedia, 0, 0) + if err != nil { + return fmt.Errorf("[%v]: setup media error: %w", audioFormat, err) + } + + // Process input rtp packets. + c.OnPacketRTP(aacMedia, aacFormat, func(pkt *rtp.Packet) { + // Process AAC flow and return PTS and AUS. + pts, aus, err := formats.ProcessAAC(&c, aacMedia, aacRTPDec, pkt, audioFormat) + if err != nil { + log.Printf("[%v]: process packet error: %v\n", audioFormat, err) + return + } + + for _, au := range aus { + log.Printf("[%v]: received access unit with PTS %v size %d\n", audioFormat, pts, len(au)) + } + }) + + // Start playing. + _, err = c.Play(nil) + if err != nil { + return fmt.Errorf("[%v]: sending PLAY request erorr: %w", audioFormat, err) + } + + // Create ticker for rotation files. + ticker := time.NewTicker(time.Duration(period) * time.Second) + defer ticker.Stop() + + // Rotate files. + go func() { + for range ticker.C { + // Logic for rotation files. + /* + currentMpegtsMuxer.close() + currentMpegtsMuxer.fileName = fn.SetNumNTime() + + err = currentMpegtsMuxer.initialize() + if err != nil { + panic(err) + } + + log.Println("New file for recording created:", currentMpegtsMuxer.fileName) + */ + } + }() + + panic(c.Wait()) + + case videoFormat == "" && audioFormat == "OPUS": + // Wait for the next period. + storage.WaitPeriod(period) + log.Printf("[%v]: start recording", audioFormat) + + // Create decoder. + opusRTPDec, err := opusFormat.CreateDecoder() + if err != nil { + log.Printf("[%v]: create decoder error: %v\n", audioFormat, err) + } + + // Setup media. + _, err = c.Setup(desc.BaseURL, opusMedia, 0, 0) + if err != nil { + return fmt.Errorf("[%v]: setup media error: %w", audioFormat, err) + } + + // Process input rtp packets. + c.OnPacketRTP(opusMedia, opusFormat, func(pkt *rtp.Packet) { + // Process OPUS flow and return PTS and OP. + pts, op, err := formats.ProcessOPUS(&c, opusMedia, opusRTPDec, pkt, audioFormat) + if err != nil { + log.Printf("[%v]: process packet error: %v\n", audioFormat, err) + return + } + + log.Printf("[%v]: received OPUS packet with PTS %v size %d\n", audioFormat, pts, len(op)) + }) + + // Start playing. + _, err = c.Play(nil) + if err != nil { + return fmt.Errorf("[%v]: sending PLAY request erorr: %w", audioFormat, err) + } + + // Create ticker for rotation files. + ticker := time.NewTicker(time.Duration(period) * time.Second) + defer ticker.Stop() + + // Rotate files. + go func() { + for range ticker.C { + // Logic for rotation files. + /* + currentMpegtsMuxer.close() + currentMpegtsMuxer.fileName = fn.SetNumNTime() + + err = currentMpegtsMuxer.initialize() + if err != nil { + panic(err) + } + + log.Println("New file for recording created:", currentMpegtsMuxer.fileName) + */ + } + }() + + panic(c.Wait()) + + case videoFormat == "VP8" && audioFormat == "": + // Wait for the next period. + storage.WaitPeriod(period) + log.Printf("[%v]: start recording", videoFormat) + + // Create decoder. + vp8RTPDec, err := vp8Format.CreateDecoder() + if err != nil { + log.Printf("[%v]: create decoder error: %v\n", videoFormat, err) + } + + // Setup VP8 -> RGBA decoder. + vp8Dec := &formats.VPDecoder{} + err = vp8Dec.Initialize() + if err != nil { + log.Printf("[%v]: init decoder error: %v\n", videoFormat, err) + } + defer vp8Dec.Close() + + // Setup media. + _, err = c.Setup(desc.BaseURL, vp8Media, 0, 0) + if err != nil { + return fmt.Errorf("[%v]: setup media error: %w", videoFormat, err) + } + + // Process input rtp packets. + c.OnPacketRTP(vp8Media, vp8Format, func(pkt *rtp.Packet) { + // Process VP8 flow and return PTS and IMG. + pts, img, err := formats.ProcessVP8RGBA(&c, vp8Media, vp8RTPDec, vp8Dec, pkt, videoFormat) + if err != nil { + log.Printf("[%v]: process packet error: %v\n", audioFormat, err) + return + } + + log.Printf("[%v]: decoded frame with PTS %v and size %v", videoFormat, pts, img.Bounds().Max) + }) + + // Start playing. + _, err = c.Play(nil) + if err != nil { + return fmt.Errorf("[%v]: sending PLAY request erorr: %w", videoFormat, err) + } + + // Create ticker for rotation files. + ticker := time.NewTicker(time.Duration(period) * time.Second) + defer ticker.Stop() + + // Rotate files. + go func() { + for range ticker.C { + // Logic for rotation files. + /* + currentMpegtsMuxer.close() + currentMpegtsMuxer.fileName = fn.SetNumNTime() + + err = currentMpegtsMuxer.initialize() + if err != nil { + panic(err) + } + + log.Println("New file for recording created:", currentMpegtsMuxer.fileName) + */ + } + }() + + panic(c.Wait()) + + case videoFormat == "VP9" && audioFormat == "": + // Wait for the next period. + storage.WaitPeriod(period) + log.Printf("[%v]: start recording", videoFormat) + + // Create decoder. + vp9RTPDec, err := vp9Format.CreateDecoder() + if err != nil { + log.Printf("[%v]: create decoder error: %v\n", videoFormat, err) + } + + // Setup VP9 -> RGBA decoder. + vp9Dec := &formats.VPDecoder{} + err = vp9Dec.Initialize() + if err != nil { + log.Printf("[%v]: init decoder error: %v\n", videoFormat, err) + } + defer vp9Dec.Close() + + // Setup media. + _, err = c.Setup(desc.BaseURL, vp9Media, 0, 0) + if err != nil { + return fmt.Errorf("[%v]: setup media error: %w", videoFormat, err) + } + + // Process input rtp packets. + c.OnPacketRTP(vp9Media, vp9Format, func(pkt *rtp.Packet) { + // Process VP9 flow and return PTS and IMG. + pts, img, err := formats.ProcessVP9RGBA(&c, vp9Media, vp9RTPDec, vp9Dec, pkt, videoFormat) + if err != nil { + log.Printf("[%v]: process packet error: %v\n", audioFormat, err) + return + } + + log.Printf("[%v]: decoded frame with PTS %v and size %v", videoFormat, pts, img.Bounds().Max) + }) + + // Start playing. + _, err = c.Play(nil) + if err != nil { + return fmt.Errorf("[%v]: sending PLAY request erorr: %w", videoFormat, err) + } + + // Create ticker for rotation files. + ticker := time.NewTicker(time.Duration(period) * time.Second) + defer ticker.Stop() + + // Rotate files. + go func() { + for range ticker.C { + // Logic for rotation files. + /* + currentMpegtsMuxer.close() + currentMpegtsMuxer.fileName = fn.SetNumNTime() + + err = currentMpegtsMuxer.initialize() + if err != nil { + panic(err) + } + + log.Println("New file for recording created:", currentMpegtsMuxer.fileName) + */ + } + }() + + panic(c.Wait()) } - panic(c.Wait()) + return nil } // changeDomain changes domain if a camera was flipped to another domain. -func changeDomain(directory string, period int, URI string, err error) error { +func changeDomain(dir string, period int, link string, err error) error { err2 := errors.New("404 (Not found)") if errors.As(err, &err2) { - if strings.Contains(URI, "video-1") { - err = RTSP(directory, period, strings.Replace(URI, "video-1", "video-2", 1)) + if strings.Contains(link, "video-1") { + err = RTSP(dir, period, strings.Replace(link, "video-1", "video-2", 1)) if err != nil { return err } } else { - err = RTSP(directory, period, strings.Replace(URI, "video-2", "video-1", 1)) + err = RTSP(dir, period, strings.Replace(link, "video-2", "video-1", 1)) if err != nil { return err } diff --git a/writer/internal/storage/editor.go b/writer/internal/storage/editor.go index b076304..afdd262 100644 --- a/writer/internal/storage/editor.go +++ b/writer/internal/storage/editor.go @@ -38,7 +38,7 @@ func WaitPeriod(period int) { } func FindResolution(Body []byte) string { - split := strings.Split(string(Body), "\n") + split := strings.Split(string(Body), "\r\n") for _, line := range split { if strings.Contains(line, "a=x-dimensions:") { s, _ := strings.CutPrefix(line, "a=x-dimensions:")