diff --git a/.gitignore b/.gitignore index cf82266..fa93391 100644 --- a/.gitignore +++ b/.gitignore @@ -6,4 +6,4 @@ *.avi /archived_writer/ /tester/ -/reader/html/ +/reader/html/ \ No newline at end of file diff --git a/writer/cmd/main.go b/writer/cmd/main.go index 9670935..59b228c 100644 --- a/writer/cmd/main.go +++ b/writer/cmd/main.go @@ -1,12 +1,29 @@ package main import ( + "log" + "writer/internal/config" "writer/internal/procRTSP" ) func main() { - err := procRTSP.StartWriter(1, "rtsp://intercom-video-2.insit.ru/dp-ohusuxzcvzsnpzzvkpyhddnwxuyeyc") + // Parse camera links from YAML file into struct Cameras. + c, err := config.ParseCamerasYAML() if err != nil { panic(err) } + + // Connect to each camera. + for _, link := range c { + log.Printf("start recording on camera: %s\n", link) + + go func() { + err = procRTSP.ProcRTSP(1, link) + if err != nil { + panic(err) + } + }() + } + + select {} } diff --git a/writer/go.mod b/writer/go.mod index 7652f31..f0c63a5 100644 --- a/writer/go.mod +++ b/writer/go.mod @@ -24,4 +24,5 @@ require ( github.com/pion/sdp/v3 v3.0.10 // indirect golang.org/x/net v0.37.0 // indirect golang.org/x/sys v0.31.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/writer/internal/config/config.go b/writer/internal/config/config.go index e4ea6a1..ec514e8 100644 --- a/writer/internal/config/config.go +++ b/writer/internal/config/config.go @@ -2,14 +2,40 @@ package config import ( "git.insit.tech/sas/rtsp_proxy/protos" + "gopkg.in/yaml.v3" "log" + "os" + "strings" "time" ) +// ParseCamerasYAML parses camera links from YAML file into struct Cameras. +func ParseCamerasYAML() (map[string]string, error) { + var CamerasYAML map[string]string + + data, err := os.ReadFile("/home/psa/GoRepository/rtsp_reader-writer/writer/internal/config/source.yaml") + if err != nil { + return CamerasYAML, err + } + + err = yaml.Unmarshal(data, &CamerasYAML) + if err != nil { + return CamerasYAML, err + } + + return CamerasYAML, nil +} + +// CutURI returns the last part of the URI after "/". +func CutURI(URI string) (CutterURI string) { + splitted := strings.Split(URI, "/") + return splitted[len(splitted)-1] +} + // CreateFileName creates FileName structure. -func CreateFileName(resolutions []string, period int) *protos.FileName { +func CreateFileName(resolutions []string, URI string, period int) *protos.FileName { fn := protos.FileName{ - Path: "../../data/" + resolutions[0] + "/", + Path: "/home/psa/GoRepository/data/" + URI + "/" + resolutions[0], TimeNow: time.Now().Format("15-04-05_02-01-2006"), Name: "videoFragment", Number: -1, diff --git a/writer/internal/config/source.yaml b/writer/internal/config/source.yaml new file mode 100644 index 0000000..4073d1b --- /dev/null +++ b/writer/internal/config/source.yaml @@ -0,0 +1,16 @@ +#camera_1: rtsp://intercom-video-1.insit.ru/camera28-oktyabrskiy-severnaya23a +#camera_2: rtsp://intercom-video-1.insit.ru/dp-qamumnrlkizuypnetljzzkjqamdoti +#camera_3: rtsp://intercom-video-1.insit.ru/dp-ohusuxzcvzsnpzzvkpyhddnwxuyeyc +#camera_4: rtsp://intercom-video-1.insit.ru/dp-bflbwjulvgfzurmcpejklrfvqairns +#camera_5: rtsp://intercom-video-1.insit.ru/dp-ajiymmjyytokybrpganfcxlfyjcdbgezphn +#camera_6: rtsp://intercom-video-2.insit.ru/dp-pobedi6a-ii-2125126423 +#camera_7: rtsp://intercom-video-1.insit.ru/dp-pobedi11-ii-2108117729 +#camera_8: rtsp://intercom-video-2.insit.ru/dp-pobedi11-i-2108117197 +#camera_9: rtsp://intercom-video-2.insit.ru/dp-nfhapwbfjpqkmaymfeipraxtzcpedk +#camera_10: rtsp://intercom-video-1.insit.ru/dp-swcixufwlheiwwrcvsrbkmhqzqvbxz +#camera_11: rtsp://intercom-video-1.insit.ru/dp-nerrjszqrbhjvqmfxskunejafdiihj +#camera_12: rtsp://intercom-video-1.insit.ru/dp-aiwukyujwonohpjyzeniispqqullyr +#camera_13: rtsp://intercom-video-1.insit.ru/dp-woxvkbynctgfbuztsalttgburbpvjf +camera_14: rtsp://intercom-video-1.insit.ru/dp-fdzbasqehtptsuhxnjeqqnlrixfahcgvlcr +camera_15: rtsp://intercom-video-1.insit.ru/dp-exyeqscyamrbkwkjifagouyprtsdoe + diff --git a/writer/internal/procRTSP/client.go b/writer/internal/procRTSP/client.go index 05e3c8c..319b388 100644 --- a/writer/internal/procRTSP/client.go +++ b/writer/internal/procRTSP/client.go @@ -13,6 +13,7 @@ import ( "github.com/pion/rtp" _ "github.com/zaf/g711" "log" + "os" "strings" "time" "writer/internal/config" @@ -28,26 +29,39 @@ var ( // StartWriter starts the program. func StartWriter(period int, URI string) error { - err := procRTSP(period, URI) + err := ProcRTSP(period, URI) if err != nil { + + // Temporary solution for inner cameras. + // // Change domain if a camera was flipped to another domain. err = changeDomain(period, URI, err) if err != nil { return fmt.Errorf("change domain error: %w", err) } + } return nil } -// procRTSP process RTSP protocol and writes H264 and PCM flows into TS container. -func procRTSP(period int, URI string) error { +// ProcRTSP process RTSP protocol and writes H264 and PCM flows into TS container. +func ProcRTSP(period int, URI string) error { + // Return the last part of the URI after "/". + cuttedURI := config.CutURI(URI) + // Create FileName structure - fn := config.CreateFileName(resolutions, period) + fn := config.CreateFileName(resolutions, cuttedURI, period) //////////////////////////////////////////////////////////////////////////////////////// + // Create directory for files according to resolutions. + err := os.MkdirAll(fmt.Sprintf("%s", fn.Path), 0755) + if err != nil { + return fmt.Errorf("mkdirall error: %w", err) + } + // Create M3U8 playlist. - go gens.MediaPlaylistGenerator("/home/psa/GoRepository/data", "", fn.Duration, resolutions) + go gens.MediaPlaylistGenerator("/home/psa/GoRepository/data/"+cuttedURI, "", fn.Duration, resolutions) //////////////////////////////////////////////////////////////////////////////////////// @@ -84,7 +98,7 @@ func procRTSP(period int, URI string) error { g711Format, g711Media, err := media.CheckG711Format(desc) // Initialising variable for AAC. - var mpeg4AudioFormat *format.MPEG4Audio + // var mpeg4AudioFormat *format.MPEG4Audio //////////////////////////////////////////////////////////////////////////////////////// @@ -114,11 +128,14 @@ func procRTSP(period int, URI string) error { // Setup MPEG-TS muxer. currentMpegtsMuxer := &converter.MpegtsMuxer{ - FileName: fn.SetNumNTime(), - H264Format: h264Format, - Mpeg4AudioFormat: mpeg4AudioFormat, + FileName: fn.SetNumNTime(), + H264Format: h264Format, + // Mpeg4AudioFormat: mpeg4AudioFormat, } - err = currentMpegtsMuxer.Initialize(resolutions) + + fmt.Println(currentMpegtsMuxer.FileName) + + err = currentMpegtsMuxer.Initialize() if err != nil { panic(err) } @@ -141,26 +158,26 @@ func procRTSP(period int, URI string) error { // Process H264 flow and return PTS and AU. pts, au, err := media.ProcH264(&c, h264Media, h264RTPDec, pkt) if err != nil { - fmt.Printf("process G711 error: %s\n", err) + log.Printf("%s: process H264 error: %s\n", cuttedURI, err) } // Encode the access unit into MPEG-TS. err = currentMpegtsMuxer.WriteH264(au, pts) if err != nil { - log.Printf("writing H264 packet: %s\n", err) + log.Printf("%s: writing H264 packet: %s\n", cuttedURI, err) } case *format.G711: // Process G711 flow and returns PTS and AU. _, au, err := media.ProcG711(&c, g711Media, g711RTPDec, pkt) if err != nil { - fmt.Printf("process G711 error: %s\n", err) + log.Printf("%s: process G711 error: %s\n", cuttedURI, err) } // Convert G711 to AAC. _, err = converter.ConvertG711ToAAC(au, f.MULaw) // take aacAu if err != nil { - log.Printf("converting G711 to AAC frame: %s\n", err) + log.Printf("%s: converting G711 to AAC frame: %s\n", cuttedURI, err) } /* @@ -192,7 +209,7 @@ func procRTSP(period int, URI string) error { currentMpegtsMuxer.Close() currentMpegtsMuxer.FileName = fn.SetNumNTime() - err = currentMpegtsMuxer.Initialize(resolutions) + err = currentMpegtsMuxer.Initialize() if err != nil { panic(err) } @@ -205,18 +222,15 @@ func procRTSP(period int, URI string) error { c.OnRequest = func(req *base.Request) { if req.Method == base.Teardown { log.Printf("got TEARDOWN request from server: %v", req) - } - go func() { - err = procRTSP(period, URI) - if err != nil { - log.Fatalf("restart RTSP error: %s\n", err) - } - }() + //err = procRTSP(period, URI) + //if err != nil { + // log.Fatalf("restart RTSP error: %s\n", err) + //} + } } panic(c.Wait()) - } // changeDomain changes domain if a camera was flipped to another domain. @@ -224,12 +238,12 @@ func changeDomain(period int, URI string, err error) error { err2 := errors.New("404 (Not found)") if errors.As(err, &err2) { if strings.Contains(URI, "video-1") { - err = procRTSP(period, strings.Replace(URI, "video-1", "video-2", 1)) + err = ProcRTSP(period, strings.Replace(URI, "video-1", "video-2", 1)) if err != nil { return err } } else { - err = procRTSP(period, strings.Replace(URI, "video-2", "video-1", 1)) + err = ProcRTSP(period, strings.Replace(URI, "video-2", "video-1", 1)) if err != nil { return err } diff --git a/writer/pkg/converter/mpegts_muxer.go b/writer/pkg/converter/mpegts_muxer.go index e161cd9..679163a 100644 --- a/writer/pkg/converter/mpegts_muxer.go +++ b/writer/pkg/converter/mpegts_muxer.go @@ -22,23 +22,20 @@ type MpegtsMuxer struct { H264Format *format.H264 Mpeg4AudioFormat *format.MPEG4Audio - f *os.File - b *bufio.Writer - w *mpegts.Writer - h264Track *mpegts.Track - mpeg4AudioTrack *mpegts.Track - dtsExtractor *h264.DTSExtractor - mutex sync.Mutex + f *os.File + b *bufio.Writer + w *mpegts.Writer + h264Track *mpegts.Track + // mpeg4AudioTrack *mpegts.Track + dtsExtractor *h264.DTSExtractor + mutex sync.Mutex } // Initialize initializes a MpegtsMuxer. -func (e *MpegtsMuxer) Initialize(resolutions []string) error { +func (e *MpegtsMuxer) Initialize() error { var err error - if err = os.MkdirAll("../../data/"+resolutions[0]+"/", 0755); err != nil { - return err - } - e.f, err = os.Create("../../data/" + resolutions[0] + "/" + e.FileName) + e.f, err = os.Create(e.FileName) if err != nil { return err } @@ -48,13 +45,13 @@ func (e *MpegtsMuxer) Initialize(resolutions []string) error { Codec: &mpegts.CodecH264{}, } - e.mpeg4AudioTrack = &mpegts.Track{ - Codec: &mpegts.CodecMPEG4Audio{ - // Config: *e.mpeg4AudioFormat.Config, - }, - } + //e.mpeg4AudioTrack = &mpegts.Track{ + // Codec: &mpegts.CodecMPEG4Audio{ + // // Config: *e.mpeg4AudioFormat.Config, + // }, + //} - e.w = mpegts.NewWriter(e.b, []*mpegts.Track{e.h264Track, e.mpeg4AudioTrack}) + e.w = mpegts.NewWriter(e.b, []*mpegts.Track{e.h264Track}) // add e.mpeg4AudioTrack return nil } @@ -134,10 +131,10 @@ func (e *MpegtsMuxer) WriteH264(au [][]byte, pts int64) error { } // writeMPEG4Audio writes MPEG-4 audio access units into MPEG-TS. -func (e *MpegtsMuxer) writeMPEG4Audio(aus [][]byte, pts int64) error { - e.mutex.Lock() - defer e.mutex.Unlock() - - return e.w.WriteMPEG4Audio( - e.mpeg4AudioTrack, multiplyAndDivide(pts, 90000, int64(e.Mpeg4AudioFormat.ClockRate())), aus) -} +//func (e *MpegtsMuxer) writeMPEG4Audio(aus [][]byte, pts int64) error { +// e.mutex.Lock() +// defer e.mutex.Unlock() +// +// return e.w.WriteMPEG4Audio( +// e.mpeg4AudioTrack, multiplyAndDivide(pts, 90000, int64(e.Mpeg4AudioFormat.ClockRate())), aus) +//} diff --git a/writer/pkg/parser/parser.go b/writer/pkg/parser/parser.go new file mode 100644 index 0000000..0bfe2c2 --- /dev/null +++ b/writer/pkg/parser/parser.go @@ -0,0 +1 @@ +package parser