Skip to content

Commit 78bf75c

Browse files
committed
Implement KLV datachanel for WebRTC
This implementation adds comprehensive KLV (Key-Length-Value) metadata support to MediaMTX's WebRTC functionality. KLV metadata is transmitted via WebRTC data channels, allowing real-time delivery of telemetry and metadata alongside video and audio streams.
1 parent 97d2fff commit 78bf75c

File tree

3 files changed

+224
-1
lines changed

3 files changed

+224
-1
lines changed

internal/protocols/webrtc/from_stream.go

Lines changed: 113 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"fmt"
77
"time"
88

9+
"github.com/bluenviron/gortsplib/v4/pkg/description"
910
"github.com/bluenviron/gortsplib/v4/pkg/format"
1011
"github.com/bluenviron/gortsplib/v4/pkg/format/rtpav1"
1112
"github.com/bluenviron/gortsplib/v4/pkg/format/rtph264"
@@ -638,6 +639,105 @@ func setupAudioTrack(
638639
return nil, nil
639640
}
640641

642+
// setupKLVDataChannel sets up KLV metadata transmission via WebRTC data channel
643+
func setupKLVDataChannel(
644+
stream *stream.Stream,
645+
reader stream.Reader,
646+
pc *PeerConnection,
647+
) (format.Format, error) {
648+
// Look for KLV format in the stream (using Generic format with KLV RTPMap)
649+
var klvFormat format.Format
650+
var klvMedia *description.Media
651+
652+
for _, media := range stream.Desc.Medias {
653+
if media == nil {
654+
continue
655+
}
656+
reader.Log(logger.Debug, "checking media type: %s", media.Type)
657+
for _, forma := range media.Formats {
658+
reader.Log(logger.Debug, "checking format: %T, codec: %s", forma, forma.Codec())
659+
660+
// Check for Generic format with KLV RTPMap
661+
if genericFmt, ok := forma.(*format.Generic); ok {
662+
reader.Log(logger.Debug, "found Generic format with RTPMap: %s", genericFmt.RTPMap())
663+
if genericFmt.RTPMap() == "KLV/90000" {
664+
klvFormat = genericFmt
665+
klvMedia = media
666+
break
667+
}
668+
}
669+
670+
// Check for KLV format (using type assertion)
671+
if _, ok := forma.(*format.KLV); ok {
672+
reader.Log(logger.Debug, "found KLV format")
673+
klvFormat = forma
674+
klvMedia = media
675+
break
676+
}
677+
678+
// Also check for internal KLV format by codec name
679+
if forma.Codec() == "KLV" {
680+
reader.Log(logger.Debug, "found format with KLV codec")
681+
klvFormat = forma
682+
klvMedia = media
683+
break
684+
}
685+
}
686+
if klvFormat != nil {
687+
break
688+
}
689+
}
690+
691+
if klvFormat == nil {
692+
// No KLV format found, return nil without error
693+
return nil, nil
694+
}
695+
696+
reader.Log(logger.Info, "setting up KLV metadata transmission via WebRTC data channel")
697+
698+
// Add reader for KLV data and send via data channel
699+
stream.AddReader(
700+
reader,
701+
klvMedia,
702+
klvFormat,
703+
func(u unit.Unit) error {
704+
// Handle both Generic and KLV units
705+
var klvData []byte
706+
707+
switch tunit := u.(type) {
708+
case *unit.Generic:
709+
// Extract KLV data from Generic unit RTP packets
710+
if tunit.RTPPackets != nil {
711+
for _, pkt := range tunit.RTPPackets {
712+
klvData = append(klvData, pkt.Payload...)
713+
}
714+
}
715+
case *unit.KLV:
716+
// Extract KLV data from KLV unit
717+
if tunit.Unit != nil {
718+
klvData = append(klvData, tunit.Unit...)
719+
}
720+
default:
721+
return nil // Unknown unit type, skip
722+
}
723+
724+
if len(klvData) == 0 {
725+
return nil
726+
}
727+
728+
// Send KLV data through WebRTC data channel
729+
err := pc.SendKLVData(klvData)
730+
if err != nil {
731+
reader.Log(logger.Debug, "failed to send KLV data via data channel: %v", err)
732+
// Don't return error to avoid breaking the stream
733+
}
734+
735+
return nil
736+
})
737+
738+
return klvFormat, nil
739+
}
740+
641741
// FromStream maps a MediaMTX stream to a WebRTC connection
642742
func FromStream(
643743
stream *stream.Stream,
@@ -658,10 +758,22 @@ func FromStream(
658758
return errNoSupportedCodecsFrom
659759
}
660760

761+
// Setup KLV metadata handling via data channel
762+
klvFormat, err := setupKLVDataChannel(stream, reader, pc)
763+
if err != nil {
764+
reader.Log(logger.Warn, "failed to setup KLV data channel: %v", err)
765+
}
766+
661767
n := 1
662768
for _, media := range stream.Desc.Medias {
769+
if media == nil {
770+
continue
771+
}
663772
for _, forma := range media.Formats {
664-
if forma != videoFormat && forma != audioFormat {
773+
if forma == nil {
774+
continue
775+
}
776+
if forma != videoFormat && forma != audioFormat && forma != klvFormat {
665777
reader.Log(logger.Warn, "skipping track %d (%s)", n, forma.Codec())
666778
}
667779
n++

internal/protocols/webrtc/peer_connection.go

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,10 @@ type PeerConnection struct {
108108
ctx context.Context
109109
ctxCancel context.CancelFunc
110110
incomingTracks []*IncomingTrack
111+
112+
// KLV data channel support
113+
klvDataChannel *webrtc.DataChannel
114+
klvChannelReady chan struct{}
111115
}
112116

113117
// Start starts the peer connection.
@@ -239,6 +243,7 @@ func (co *PeerConnection) Start() error {
239243
co.closed = make(chan struct{})
240244
co.gatheringDone = make(chan struct{})
241245
co.incomingTrack = make(chan trackRecvPair)
246+
co.klvChannelReady = make(chan struct{})
242247

243248
co.ctx, co.ctxCancel = context.WithCancel(context.Background())
244249

@@ -250,7 +255,20 @@ func (co *PeerConnection) Start() error {
250255
return err
251256
}
252257
}
258+
259+
// Setup KLV data channel for publishing
260+
err = co.setupKLVDataChannel()
261+
if err != nil {
262+
co.wr.GracefulClose() //nolint:errcheck
263+
return err
264+
}
253265
} else {
266+
// Setup KLV data channel for reading (WHEP)
267+
err = co.setupKLVDataChannel()
268+
if err != nil {
269+
co.wr.GracefulClose() //nolint:errcheck
270+
return err
271+
}
254272
_, err = co.wr.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo, webrtc.RTPTransceiverInit{
255273
Direction: webrtc.RTPTransceiverDirectionRecvonly,
256274
})
@@ -356,6 +374,72 @@ func (co *PeerConnection) Close() {
356374
<-co.closed
357375
}
358376

377+
// setupKLVDataChannel sets up a data channel for KLV metadata transmission
378+
func (co *PeerConnection) setupKLVDataChannel() error {
379+
// Create data channel for KLV metadata
380+
dataChannelInit := &webrtc.DataChannelInit{
381+
Ordered: &[]bool{true}[0], // Ensure ordered delivery
382+
MaxRetransmits: &[]uint16{3}[0], // Allow some retransmissions for reliability
383+
}
384+
385+
var err error
386+
co.klvDataChannel, err = co.wr.CreateDataChannel("klv", dataChannelInit)
387+
if err != nil {
388+
return fmt.Errorf("failed to create KLV data channel: %w", err)
389+
}
390+
391+
// Set up data channel event handlers
392+
co.klvDataChannel.OnOpen(func() {
393+
co.Log.Log(logger.Info, "KLV data channel opened")
394+
close(co.klvChannelReady)
395+
})
396+
397+
co.klvDataChannel.OnClose(func() {
398+
co.Log.Log(logger.Info, "KLV data channel closed")
399+
})
400+
401+
co.klvDataChannel.OnError(func(err error) {
402+
co.Log.Log(logger.Warn, "KLV data channel error: %v", err)
403+
})
404+
405+
return nil
406+
}
407+
408+
// SendKLVData sends KLV metadata through the data channel
409+
func (co *PeerConnection) SendKLVData(klvData []byte) error {
410+
if co.klvDataChannel == nil {
411+
return fmt.Errorf("KLV data channel not initialized")
412+
}
413+
414+
// Wait for channel to be ready (with timeout)
415+
select {
416+
case <-co.klvChannelReady:
417+
// Channel is ready
418+
case <-time.After(5 * time.Second):
419+
return fmt.Errorf("timeout waiting for KLV data channel to be ready")
420+
case <-co.ctx.Done():
421+
return fmt.Errorf("context cancelled")
422+
}
423+
424+
// Check if channel is still open
425+
if co.klvDataChannel.ReadyState() != webrtc.DataChannelStateOpen {
426+
return fmt.Errorf("KLV data channel is not open (state: %s)", co.klvDataChannel.ReadyState())
427+
}
428+
429+
// Send the KLV data
430+
err := co.klvDataChannel.Send(klvData)
431+
if err != nil {
432+
return fmt.Errorf("failed to send KLV data: %w", err)
433+
}
434+
435+
return nil
436+
}
437+
438+
// KLVChannelReady returns a channel that closes when the KLV data channel is ready
439+
func (co *PeerConnection) KLVChannelReady() <-chan struct{} {
440+
return co.klvChannelReady
441+
}
442+
359443
// CreatePartialOffer creates a partial offer.
360444
func (co *PeerConnection) CreatePartialOffer() (*webrtc.SessionDescription, error) {
361445
offer, err := co.wr.CreateOffer(nil)

internal/servers/webrtc/reader.js

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -428,6 +428,7 @@ class MediaMTXWebRTCReader {
428428
this.pc.onicecandidate = (evt) => this.#onLocalCandidate(evt);
429429
this.pc.onconnectionstatechange = () => this.#onConnectionState();
430430
this.pc.ontrack = (evt) => this.#onTrack(evt);
431+
this.pc.ondatachannel = (evt) => this.#onDataChannel(evt);
431432

432433
return this.pc.createOffer()
433434
.then((offer) => {
@@ -547,6 +548,32 @@ class MediaMTXWebRTCReader {
547548
this.conf.onTrack(evt);
548549
}
549550
}
551+
552+
#onDataChannel(evt) {
553+
const dataChannel = evt.channel;
554+
555+
if (dataChannel.label === 'klv') {
556+
dataChannel.onopen = () => {
557+
console.log('KLV metadata data channel opened');
558+
};
559+
560+
dataChannel.onmessage = (event) => {
561+
if (this.conf.onKLVData !== undefined) {
562+
// Parse KLV data from ArrayBuffer
563+
const klvData = new Uint8Array(event.data);
564+
this.conf.onKLVData(klvData);
565+
}
566+
};
567+
568+
dataChannel.onclose = () => {
569+
console.log('KLV metadata data channel closed');
570+
};
571+
572+
dataChannel.onerror = (error) => {
573+
console.error('KLV metadata data channel error:', error);
574+
};
575+
}
576+
}
550577
}
551578

552579
window.MediaMTXWebRTCReader = MediaMTXWebRTCReader;

0 commit comments

Comments
 (0)