diff --git a/internal/recorder/format_mpegts.go b/internal/recorder/format_mpegts.go index e5000ceb7b9..a32cafe5782 100644 --- a/internal/recorder/format_mpegts.go +++ b/internal/recorder/format_mpegts.go @@ -5,6 +5,7 @@ import ( "bytes" "fmt" "io" + "sync" "time" rtspformat "github.com/bluenviron/gortsplib/v4/pkg/format" @@ -60,9 +61,21 @@ type formatMPEGTS struct { mw *mpegts.Writer hasVideo bool currentSegment *formatMPEGTSSegment + mu sync.Mutex // protects all fields below } func (f *formatMPEGTS) initialize() bool { + // Check if this is an MPEG-TS passthrough stream + for _, media := range f.ri.stream.Desc.Medias { + for _, forma := range media.Formats { + if _, ok := forma.(*rtspformat.MPEGTS); ok { + f.ri.stream.AddReader(f.ri, media, forma, f.writePassthrough) + return true + } + } + } + + // Normal MPEG-TS muxing for non-passthrough streams var tracks []*mpegts.Track var setuppedFormats []rtspformat.Format setuppedFormatsMap := make(map[rtspformat.Format]struct{}) @@ -480,11 +493,83 @@ func (f *formatMPEGTS) initialize() bool { } func (f *formatMPEGTS) close() { + f.mu.Lock() + defer f.mu.Unlock() + + // Close the current segment if it exists if f.currentSegment != nil { f.currentSegment.close() //nolint:errcheck + f.currentSegment = nil + } + + // Flush any remaining data in the buffer + if f.bw != nil { + _ = f.bw.Flush() } } +func (f *formatMPEGTS) writePassthrough(u unit.Unit) error { + tunit := u.(*unit.Generic) + if len(tunit.RTPPackets) == 0 { + return nil + } + + f.mu.Lock() + defer f.mu.Unlock() + + // Initialize writers if needed + if f.dw == nil { + f.dw = &dynamicWriter{} + } + if f.bw == nil { + f.bw = bufio.NewWriterSize(f.dw, mpegtsMaxBufferSize) + } + + for _, pkt := range tunit.RTPPackets { + if pkt.Payload == nil { + continue + } + + // Create new segment if needed + if f.currentSegment == nil { + f.currentSegment = &formatMPEGTSSegment{ + f: f, + startDTS: timestampToDuration(int64(pkt.Timestamp), 90000), + startNTP: tunit.NTP, + } + f.currentSegment.initialize() + } + + // Write to current segment + _, err := f.currentSegment.Write(pkt.Payload) + if err != nil { + f.ri.Log(logger.Warn, "error writing to segment: %v", err) + continue + } + + // Update segment timing + f.currentSegment.lastDTS = timestampToDuration(int64(pkt.Timestamp), 90000) + + // Check if we need to rotate the segment + if (f.currentSegment.lastDTS - f.currentSegment.startDTS) >= f.ri.segmentDuration { + // Close the current segment + if err := f.currentSegment.close(); err != nil { + f.ri.Log(logger.Error, "error closing segment: %v", err) + } + f.currentSegment = nil + + // Flush the buffer writer + if f.bw != nil { + if err := f.bw.Flush(); err != nil { + f.ri.Log(logger.Error, "error flushing buffer: %v", err) + } + } + } + } + + return nil +} + func (f *formatMPEGTS) write( dts time.Duration, ntp time.Time, diff --git a/internal/recorder/recorder_test.go b/internal/recorder/recorder_test.go index f80899b57f5..147d5e313b5 100644 --- a/internal/recorder/recorder_test.go +++ b/internal/recorder/recorder_test.go @@ -1,12 +1,15 @@ package recorder import ( + "bytes" "fmt" "os" "path/filepath" "testing" "time" + "github.com/pion/rtp" + "github.com/bluenviron/gortsplib/v4/pkg/description" rtspformat "github.com/bluenviron/gortsplib/v4/pkg/format" "github.com/bluenviron/mediacommon/v2/pkg/codecs/h265" @@ -543,6 +546,164 @@ func TestRecorderSkipTracksFull(t *testing.T) { } } +func TestRecorderMPEGTSPassthrough(t *testing.T) { + // Create a test MPEG-TS packet + testData := []byte{ + 0x47, 0x40, 0x00, 0x10, 0x00, // TS header with PID 0x1000 (video) + // Add some dummy payload - this is a PAT (Program Association Table) + 0x00, 0x00, 0xB0, 0x0D, 0x00, 0x00, 0xC1, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + } + + t.Run("basic", func(t *testing.T) { + // Create a temporary directory for the test + dir, err := os.MkdirTemp("", "mediamtx-agent") + require.NoError(t, err) + defer os.RemoveAll(dir) + + // Create the output directory + recordPath := filepath.Join(dir, "%path/%Y-%m-%d_%H-%M-%S-%f") + + // Create a stream with MPEG-TS format + mpegtsFormat := &rtspformat.MPEGTS{} + + desc := &description.Session{Medias: []*description.Media{ + { + Type: description.MediaTypeVideo, + Formats: []rtspformat.Format{mpegtsFormat}, + }, + }} + + strm := &stream.Stream{ + WriteQueueSize: 512, + UDPMaxPayloadSize: 1472, + Desc: desc, + GenerateRTPPackets: false, // Disable RTP packet generation + Parent: test.NilLogger, + } + err = strm.Initialize() + require.NoError(t, err) + + // Create a recorder with MPEG-TS format + r := &Recorder{ + PathFormat: recordPath, + Format: conf.RecordFormatMPEGTS, + PartDuration: 1 * time.Second, + SegmentDuration: 2 * time.Second, + PathName: "mypath", + Stream: strm, + Parent: test.NilLogger, + } + + // Create a buffered channel to receive data from the stream + + // Buffer channel to prevent blocking on data send + dataChan := make(chan []byte, 100) + + // Add reader before initializing to avoid race with recorder's own reader + rtpReader := func(u unit.Unit) error { + rtpPackets := u.GetRTPPackets() + + for _, pkt := range rtpPackets { + if pkt == nil { + continue + } + + // Send the payload to the channel + if len(pkt.Payload) > 0 { + select { + case dataChan <- pkt.Payload: + default: + // Drop if channel is full + } + } + } + + return nil + } + + strm.AddReader(r, desc.Medias[0], mpegtsFormat, rtpReader) + + // Initialize the recorder + r.Initialize() + + // Verify the recorder instance was created + require.NotNil(t, r.currentInstance, "Recorder instance is nil") + + // Start the reader after the recorder is fully initialized + strm.StartReader(r) + strm.WaitRunningReader() + + // Ensure cleanup happens in the correct order + t.Cleanup(func() { + time.Sleep(100 * time.Millisecond) // Allow time for writes to complete + r.Close() + strm.Close() + }) + + // Write multiple RTP packets to ensure we have enough data + for i := 0; i < 10; i++ { + // Create a new RTP packet for each iteration with updated timestamp and sequence number + pkt := &rtp.Packet{ + Header: rtp.Header{ + Version: 2, + PayloadType: 33, // Standard MPEG-TS payload type (RFC 2250) + SequenceNumber: 100 + uint16(i), + Timestamp: 123456 + uint32(i*90000), // 1 second apart + SSRC: 0x9D8F, + }, + Payload: testData, + } + + // Use the MPEG-TS format directly + internalFormat := mpegtsFormat + + // Write the RTP packet to the stream + strm.WriteRTPPacket(desc.Medias[0], internalFormat, pkt, time.Now(), 0) + } + + // Check if the recorder instance was properly initialized + if r.currentInstance == nil { + t.Fatal("Recorder currentInstance is nil") + } + + // Give some time for all writes to complete + time.Sleep(200 * time.Millisecond) + + // Verify the recording was created + entries, err := os.ReadDir(filepath.Join(dir, "mypath")) + require.NoError(t, err) + + // Find the .ts file + var tsFile string + for _, entry := range entries { + if filepath.Ext(entry.Name()) == ".ts" { + tsFile = filepath.Join(dir, "mypath", entry.Name()) + break + } + } + + if tsFile == "" { + t.Fatalf("No .ts file found in %s. Directory contents: %v", filepath.Join(dir, "mypath"), entries) + } + + // Verify the file contains our test data + data, err := os.ReadFile(tsFile) + require.NoError(t, err) + require.Greater(t, len(data), 0, "Recorded file is empty") + require.True(t, bytes.Contains(data, testData), "Test data not found in output") + + // Give some time for all writes to complete before test ends + time.Sleep(200 * time.Millisecond) + }) +} + func TestRecorderFMP4SegmentSwitch(t *testing.T) { desc := &description.Session{Medias: []*description.Media{ {