Skip to content

Commit c24827f

Browse files
committed
Implement KLV datachanel for WebRTC
This implementation adds comprehensive KLV (Key-Length-Value) metadata support to MediaMTX's WebRTC functionality. KLV metadata is transmitted via WebRTC data channels, allowing real-time delivery of telemetry and metadata alongside video and audio streams.
1 parent 97d2fff commit c24827f

File tree

4 files changed

+465
-1
lines changed

4 files changed

+465
-1
lines changed

internal/protocols/webrtc/from_stream.go

Lines changed: 123 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"fmt"
77
"time"
88

9+
"github.com/bluenviron/gortsplib/v4/pkg/description"
910
"github.com/bluenviron/gortsplib/v4/pkg/format"
1011
"github.com/bluenviron/gortsplib/v4/pkg/format/rtpav1"
1112
"github.com/bluenviron/gortsplib/v4/pkg/format/rtph264"
@@ -638,6 +639,111 @@ func setupAudioTrack(
638639
return nil, nil
639640
}
640641

642+
// setupKLVDataChannel sets up KLV metadata transmission via WebRTC data channel
643+
func setupKLVDataChannel(
644+
stream *stream.Stream,
645+
reader stream.Reader,
646+
pc *PeerConnection,
647+
) (format.Format, error) {
648+
// Look for KLV format in the stream (using Generic format with KLV RTPMap)
649+
var klvFormat format.Format
650+
var klvMedia *description.Media
651+
652+
for _, media := range stream.Desc.Medias {
653+
if media == nil {
654+
continue
655+
}
656+
for _, forma := range media.Formats {
657+
// Check for Generic format with KLV RTPMap
658+
if genericFmt, ok := forma.(*format.Generic); ok {
659+
if genericFmt.RTPMap() == "KLV/90000" {
660+
klvFormat = genericFmt
661+
klvMedia = media
662+
break
663+
}
664+
}
665+
666+
// Check for KLV format (using type assertion)
667+
if _, ok := forma.(*format.KLV); ok {
668+
klvFormat = forma
669+
klvMedia = media
670+
break
671+
}
672+
673+
// Also check for internal KLV format by codec name
674+
if forma.Codec() == "KLV" {
675+
klvFormat = forma
676+
klvMedia = media
677+
break
678+
}
679+
}
680+
if klvFormat != nil {
681+
break
682+
}
683+
}
684+
685+
if klvFormat == nil {
686+
// No KLV format found, return nil without error
687+
return nil, nil
688+
}
689+
690+
if reader != nil {
691+
reader.Log(logger.Info, "setting up KLV metadata transmission via WebRTC data channel")
692+
}
693+
694+
// Setup the actual WebRTC data channel (with error recovery)
695+
err := pc.setupKLVDataChannel()
696+
if err != nil {
697+
if reader != nil {
698+
reader.Log(logger.Debug, "KLV data channel creation failed: %v", err)
699+
}
700+
// Return nil format to indicate KLV is not available, but don't fail the entire setup
701+
return nil, nil
702+
}
703+
704+
// Add reader for KLV data and send via data channel
705+
stream.AddReader(
706+
reader,
707+
klvMedia,
708+
klvFormat,
709+
func(u unit.Unit) error {
710+
// Handle both Generic and KLV units
711+
var klvData []byte
712+
713+
switch tunit := u.(type) {
714+
case *unit.Generic:
715+
// Extract KLV data from Generic unit RTP packets
716+
if tunit.RTPPackets != nil {
717+
for _, pkt := range tunit.RTPPackets {
718+
klvData = append(klvData, pkt.Payload...)
719+
}
720+
}
721+
case *unit.KLV:
722+
// Extract KLV data from KLV unit
723+
if tunit.Unit != nil {
724+
klvData = append(klvData, tunit.Unit...)
725+
}
726+
default:
727+
return nil // Unknown unit type, skip
728+
}
729+
730+
if len(klvData) == 0 {
731+
return nil
732+
}
733+
734+
// Send KLV data through WebRTC data channel
735+
err := pc.SendKLVData(klvData)
736+
if err != nil {
737+
reader.Log(logger.Debug, "failed to send KLV data via data channel: %v", err)
738+
// Don't return error to avoid breaking the stream
739+
}
740+
741+
return nil
742+
})
743+
744+
return klvFormat, nil
745+
}
746+
641747
// FromStream maps a MediaMTX stream to a WebRTC connection
642748
func FromStream(
643749
stream *stream.Stream,
@@ -658,10 +764,26 @@ func FromStream(
658764
return errNoSupportedCodecsFrom
659765
}
660766

767+
// Setup KLV metadata handling via data channel (non-blocking)
768+
klvFormat, err := setupKLVDataChannel(stream, reader, pc)
769+
if err != nil {
770+
if reader != nil {
771+
reader.Log(logger.Debug, "KLV data channel setup skipped: %v", err)
772+
}
773+
// Don't treat KLV setup failure as a fatal error
774+
klvFormat = nil
775+
}
776+
661777
n := 1
662778
for _, media := range stream.Desc.Medias {
779+
if media == nil {
780+
continue
781+
}
663782
for _, forma := range media.Formats {
664-
if forma != videoFormat && forma != audioFormat {
783+
if forma == nil {
784+
continue
785+
}
786+
if forma != videoFormat && forma != audioFormat && forma != klvFormat {
665787
reader.Log(logger.Warn, "skipping track %d (%s)", n, forma.Codec())
666788
}
667789
n++
Lines changed: 201 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,201 @@
1+
package webrtc
2+
3+
import (
4+
"testing"
5+
6+
"github.com/bluenviron/gortsplib/v4/pkg/description"
7+
"github.com/bluenviron/gortsplib/v4/pkg/format"
8+
"github.com/pion/rtp"
9+
"github.com/stretchr/testify/require"
10+
11+
"github.com/bluenviron/mediamtx/internal/logger"
12+
"github.com/bluenviron/mediamtx/internal/stream"
13+
"github.com/bluenviron/mediamtx/internal/unit"
14+
)
15+
16+
// mockLogger implements logger.Writer for testing
17+
type mockLogger struct{}
18+
19+
func (l *mockLogger) Log(_ logger.Level, _ string, _ ...interface{}) {}
20+
21+
// TestKLVFormatDetection tests that KLV formats are properly detected
22+
func TestKLVFormatDetection(t *testing.T) {
23+
tests := []struct {
24+
name string
25+
formats []format.Format
26+
expectsKLV bool
27+
}{
28+
{
29+
name: "KLV format detected",
30+
formats: []format.Format{
31+
&format.KLV{},
32+
},
33+
expectsKLV: true,
34+
},
35+
{
36+
name: "Generic KLV format detected",
37+
formats: []format.Format{
38+
&format.Generic{
39+
PayloadTyp: 96,
40+
RTPMa: "KLV/90000",
41+
},
42+
},
43+
expectsKLV: true,
44+
},
45+
{
46+
name: "No KLV format",
47+
formats: []format.Format{
48+
&format.H264{PayloadTyp: 96},
49+
},
50+
expectsKLV: false,
51+
},
52+
}
53+
54+
for _, tt := range tests {
55+
t.Run(tt.name, func(t *testing.T) {
56+
streamDesc := &description.Session{
57+
Medias: []*description.Media{
58+
{
59+
Type: description.MediaTypeApplication,
60+
Formats: tt.formats,
61+
},
62+
},
63+
}
64+
65+
mockStream := &stream.Stream{
66+
WriteQueueSize: 512,
67+
RTPMaxPayloadSize: 1450,
68+
Desc: streamDesc,
69+
GenerateRTPPackets: false,
70+
Parent: &mockLogger{},
71+
}
72+
73+
err := mockStream.Initialize()
74+
require.NoError(t, err)
75+
defer mockStream.Close()
76+
77+
// Test KLV detection logic (same as in setupKLVDataChannel)
78+
var klvFormat format.Format
79+
for _, media := range mockStream.Desc.Medias {
80+
for _, forma := range media.Formats {
81+
if genericFmt, ok := forma.(*format.Generic); ok {
82+
if genericFmt.RTPMap() == "KLV/90000" {
83+
klvFormat = genericFmt
84+
break
85+
}
86+
}
87+
if _, ok := forma.(*format.KLV); ok {
88+
klvFormat = forma
89+
break
90+
}
91+
if forma.Codec() == "KLV" {
92+
klvFormat = forma
93+
break
94+
}
95+
}
96+
if klvFormat != nil {
97+
break
98+
}
99+
}
100+
101+
if tt.expectsKLV {
102+
require.NotNil(t, klvFormat, "Expected to find KLV format")
103+
} else {
104+
require.Nil(t, klvFormat, "Expected not to find KLV format")
105+
}
106+
})
107+
}
108+
}
109+
110+
// TestKLVUnitHandling tests that different KLV unit types are handled correctly
111+
func TestKLVUnitHandling(t *testing.T) {
112+
tests := []struct {
113+
name string
114+
unit unit.Unit
115+
expectedData []byte
116+
}{
117+
{
118+
name: "KLV unit with data",
119+
unit: &unit.KLV{
120+
Unit: []byte{0x06, 0x0E, 0x2B, 0x34},
121+
},
122+
expectedData: []byte{0x06, 0x0E, 0x2B, 0x34},
123+
},
124+
{
125+
name: "KLV unit without data",
126+
unit: &unit.KLV{
127+
Unit: nil,
128+
},
129+
expectedData: nil,
130+
},
131+
{
132+
name: "Generic unit with RTP packets",
133+
unit: &unit.Generic{
134+
Base: unit.Base{
135+
RTPPackets: []*rtp.Packet{
136+
{Payload: []byte{0x06, 0x0E}},
137+
{Payload: []byte{0x2B, 0x34}},
138+
},
139+
},
140+
},
141+
expectedData: []byte{0x06, 0x0E, 0x2B, 0x34},
142+
},
143+
}
144+
145+
for _, tt := range tests {
146+
t.Run(tt.name, func(t *testing.T) {
147+
// Simulate the unit handling logic from setupKLVDataChannel
148+
var klvData []byte
149+
150+
switch tunit := tt.unit.(type) {
151+
case *unit.Generic:
152+
if tunit.RTPPackets != nil {
153+
for _, pkt := range tunit.RTPPackets {
154+
klvData = append(klvData, pkt.Payload...)
155+
}
156+
}
157+
case *unit.KLV:
158+
if tunit.Unit != nil {
159+
klvData = append(klvData, tunit.Unit...)
160+
}
161+
}
162+
163+
require.Equal(t, tt.expectedData, klvData)
164+
})
165+
}
166+
}
167+
168+
// TestSetupKLVDataChannelIntegration tests that KLV setup doesn't break normal WebRTC flow
169+
func TestSetupKLVDataChannelIntegration(t *testing.T) {
170+
// Test that setupKLVDataChannel can be called without breaking anything
171+
streamDesc := &description.Session{
172+
Medias: []*description.Media{
173+
{
174+
Type: description.MediaTypeApplication,
175+
Formats: []format.Format{
176+
&format.KLV{},
177+
},
178+
},
179+
},
180+
}
181+
182+
mockStream := &stream.Stream{
183+
WriteQueueSize: 512,
184+
RTPMaxPayloadSize: 1450,
185+
Desc: streamDesc,
186+
GenerateRTPPackets: false,
187+
Parent: &mockLogger{},
188+
}
189+
190+
err := mockStream.Initialize()
191+
require.NoError(t, err)
192+
defer mockStream.Close()
193+
194+
// Create a mock peer connection (nil is fine for this test)
195+
pc := &PeerConnection{}
196+
197+
// This should not panic or return an error, just return nil format
198+
klvFormat, err := setupKLVDataChannel(mockStream, &mockLogger{}, pc)
199+
require.NoError(t, err)
200+
require.Nil(t, klvFormat) // Should be nil due to defensive handling
201+
}

0 commit comments

Comments
 (0)