Skip to content

Commit d89d7b9

Browse files
committed
Implement data streams
1 parent ac6b7a6 commit d89d7b9

File tree

26 files changed

+1685
-49
lines changed

26 files changed

+1685
-49
lines changed

.changeset/wise-cherries-speak.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"client-sdk-android": minor
3+
---
4+
5+
Implement data streams feature

.idea/dictionaries/davidliu.xml

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Copyright 2025 LiveKit, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.livekit.android.coroutines
18+
19+
import kotlinx.coroutines.cancel
20+
import kotlinx.coroutines.coroutineScope
21+
import kotlinx.coroutines.currentCoroutineContext
22+
import kotlinx.coroutines.flow.Flow
23+
import kotlinx.coroutines.flow.collect
24+
import kotlinx.coroutines.flow.flow
25+
import kotlinx.coroutines.flow.takeWhile
26+
import kotlinx.coroutines.launch
27+
import kotlin.coroutines.cancellation.CancellationException
28+
29+
fun <T> Flow<T>.takeUntilSignal(signal: Flow<Unit?>): Flow<T> = flow {
30+
try {
31+
coroutineScope {
32+
launch {
33+
signal.takeWhile { it == null }.collect()
34+
this@coroutineScope.cancel()
35+
}
36+
37+
collect {
38+
emit(it)
39+
}
40+
}
41+
} catch (e: CancellationException) {
42+
// ignore
43+
}
44+
}
45+
46+
fun <T> Flow<T>.cancelOnSignal(signal: Flow<Unit?>): Flow<T> = flow {
47+
coroutineScope {
48+
launch {
49+
signal.takeWhile { it == null }.collect()
50+
currentCoroutineContext().cancel()
51+
}
52+
53+
collect {
54+
emit(it)
55+
}
56+
currentCoroutineContext().cancel()
57+
}
58+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Copyright 2025 LiveKit, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.livekit.android.dagger
18+
19+
import dagger.Binds
20+
import dagger.Module
21+
import io.livekit.android.room.datastream.incoming.IncomingDataStreamManager
22+
import io.livekit.android.room.datastream.incoming.IncomingDataStreamManagerImpl
23+
import io.livekit.android.room.datastream.outgoing.OutgoingDataStreamManager
24+
import io.livekit.android.room.datastream.outgoing.OutgoingDataStreamManagerImpl
25+
26+
/**
27+
* @suppress
28+
*/
29+
@Module
30+
abstract class InternalBindsModule {
31+
@Binds
32+
abstract fun incomingDataStreamManager(manager: IncomingDataStreamManagerImpl): IncomingDataStreamManager
33+
34+
@Binds
35+
abstract fun outgoingDataStreamManager(manager: OutgoingDataStreamManagerImpl): OutgoingDataStreamManager
36+
}

livekit-android-sdk/src/main/java/io/livekit/android/dagger/LiveKitComponent.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2023-2024 LiveKit, Inc.
2+
* Copyright 2023-2025 LiveKit, Inc.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -38,6 +38,7 @@ import javax.inject.Singleton
3838
OverridesModule::class,
3939
AudioHandlerModule::class,
4040
MemoryModule::class,
41+
InternalBindsModule::class,
4142
],
4243
)
4344
interface LiveKitComponent {

livekit-android-sdk/src/main/java/io/livekit/android/room/RTCEngine.kt

Lines changed: 38 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ import io.livekit.android.util.LKLog
3838
import io.livekit.android.util.flowDelegate
3939
import io.livekit.android.util.nullSafe
4040
import io.livekit.android.util.withCheckLock
41+
import io.livekit.android.webrtc.DataChannelManager
4142
import io.livekit.android.webrtc.RTCStatsGetter
4243
import io.livekit.android.webrtc.copy
4344
import io.livekit.android.webrtc.isConnected
@@ -163,6 +164,10 @@ internal constructor(
163164
private var reliableDataChannelSub: DataChannel? = null
164165
private var lossyDataChannel: DataChannel? = null
165166
private var lossyDataChannelSub: DataChannel? = null
167+
private var reliableDataChannelManager: DataChannelManager? = null
168+
private var reliableDataChannelSubManager: DataChannelManager? = null
169+
private var lossyDataChannelManager: DataChannelManager? = null
170+
private var lossyDataChannelSubManager: DataChannelManager? = null
166171

167172
private var isSubscriberPrimary = false
168173
private var isClosed = true
@@ -402,19 +407,17 @@ internal constructor(
402407
subscriber?.closeBlocking()
403408
subscriber = null
404409

405-
fun DataChannel?.completeDispose() {
406-
this?.unregisterObserver()
407-
this?.close()
408-
this?.dispose()
409-
}
410-
411-
reliableDataChannel?.completeDispose()
410+
reliableDataChannelManager?.dispose()
411+
reliableDataChannelManager = null
412412
reliableDataChannel = null
413-
reliableDataChannelSub?.completeDispose()
413+
reliableDataChannelSubManager?.dispose()
414+
reliableDataChannelSubManager = null
414415
reliableDataChannelSub = null
415-
lossyDataChannel?.completeDispose()
416+
lossyDataChannelManager?.dispose()
417+
lossyDataChannelManager = null
416418
lossyDataChannel = null
417-
lossyDataChannelSub?.completeDispose()
419+
lossyDataChannelSubManager?.dispose()
420+
lossyDataChannelSubManager = null
418421
lossyDataChannelSub = null
419422
isSubscriberPrimary = false
420423
}
@@ -630,6 +633,22 @@ internal constructor(
630633
channel.send(buf)
631634
}
632635

636+
internal suspend fun waitForBufferStatusLow(kind: LivekitModels.DataPacket.Kind) {
637+
ensurePublisherConnected(kind)
638+
val manager = when (kind) {
639+
LivekitModels.DataPacket.Kind.RELIABLE -> reliableDataChannelManager
640+
LivekitModels.DataPacket.Kind.LOSSY -> lossyDataChannelManager
641+
LivekitModels.DataPacket.Kind.UNRECOGNIZED -> {
642+
throw IllegalArgumentException()
643+
}
644+
}
645+
646+
if (manager == null) {
647+
throw IllegalStateException("Not connected!")
648+
}
649+
manager.waitForBufferedAmountLow(DATA_CHANNEL_LOW_THRESHOLD.toLong())
650+
}
651+
633652
private suspend fun ensurePublisherConnected(kind: LivekitModels.DataPacket.Kind) {
634653
if (!isSubscriberPrimary) {
635654
return
@@ -798,6 +817,7 @@ internal constructor(
798817
fun onTranscriptionReceived(transcription: LivekitModels.Transcription)
799818
fun onLocalTrackSubscribed(trackSubscribed: LivekitRtc.TrackSubscribed)
800819
fun onRpcPacketReceived(dp: LivekitModels.DataPacket)
820+
fun onDataStreamPacket(dp: LivekitModels.DataPacket)
801821
}
802822

803823
companion object {
@@ -813,11 +833,13 @@ internal constructor(
813833
*/
814834
@VisibleForTesting
815835
const val LOSSY_DATA_CHANNEL_LABEL = "_lossy"
816-
internal const val MAX_DATA_PACKET_SIZE = 15360 // 15 KB
836+
internal const val MAX_DATA_PACKET_SIZE = 15 * 1024 // 15 KB
817837
private const val MAX_RECONNECT_RETRIES = 10
818838
private const val MAX_RECONNECT_TIMEOUT = 60 * 1000
819839
private const val MAX_ICE_CONNECT_TIMEOUT_MS = 20000
820840

841+
private const val DATA_CHANNEL_LOW_THRESHOLD = 64 * 1024 // 64 KB
842+
821843
internal val CONN_CONSTRAINTS = MediaConstraints().apply {
822844
with(optional) {
823845
add(MediaConstraints.KeyValuePair("DtlsSrtpKeyAgreement", "true"))
@@ -1075,16 +1097,11 @@ internal constructor(
10751097
LKLog.v { "invalid value for data packet" }
10761098
}
10771099

1078-
LivekitModels.DataPacket.ValueCase.STREAM_HEADER -> {
1079-
// TODO
1080-
}
1081-
1082-
LivekitModels.DataPacket.ValueCase.STREAM_CHUNK -> {
1083-
// TODO
1084-
}
1085-
1086-
LivekitModels.DataPacket.ValueCase.STREAM_TRAILER -> {
1087-
// TODO
1100+
LivekitModels.DataPacket.ValueCase.STREAM_HEADER,
1101+
LivekitModels.DataPacket.ValueCase.STREAM_CHUNK,
1102+
LivekitModels.DataPacket.ValueCase.STREAM_TRAILER,
1103+
-> {
1104+
listener?.onDataStreamPacket(dp)
10881105
}
10891106
}
10901107
}

livekit-android-sdk/src/main/java/io/livekit/android/room/Room.kt

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ import io.livekit.android.e2ee.E2EEOptions
4242
import io.livekit.android.events.*
4343
import io.livekit.android.memory.CloseableManager
4444
import io.livekit.android.renderer.TextureViewRenderer
45+
import io.livekit.android.room.datastream.incoming.IncomingDataStreamManager
4546
import io.livekit.android.room.metrics.collectMetrics
4647
import io.livekit.android.room.network.NetworkCallbackManagerFactory
4748
import io.livekit.android.room.participant.*
@@ -106,7 +107,8 @@ constructor(
106107
private val regionUrlProviderFactory: RegionUrlProvider.Factory,
107108
private val connectionWarmer: ConnectionWarmer,
108109
private val audioRecordPrewarmer: AudioRecordPrewarmer,
109-
) : RTCEngine.Listener, ParticipantListener {
110+
private val incomingDataStreamManager: IncomingDataStreamManager,
111+
) : RTCEngine.Listener, ParticipantListener, IncomingDataStreamManager by incomingDataStreamManager {
110112

111113
private lateinit var coroutineScope: CoroutineScope
112114
private val eventBus = BroadcastEventBus<RoomEvent>()
@@ -907,6 +909,7 @@ constructor(
907909
name = null
908910
isRecording = false
909911
sidToIdentity.clear()
912+
incomingDataStreamManager.clearOpenStreams()
910913
}
911914

912915
private fun sendSyncState() {
@@ -1190,6 +1193,28 @@ constructor(
11901193
participant?.onDataReceived(data, topic)
11911194
}
11921195

1196+
/**
1197+
* @suppress
1198+
*/
1199+
override fun onDataStreamPacket(dp: LivekitModels.DataPacket) {
1200+
when (dp.valueCase) {
1201+
LivekitModels.DataPacket.ValueCase.STREAM_HEADER -> {
1202+
incomingDataStreamManager.handleStreamHeader(dp.streamHeader, Participant.Identity(dp.participantIdentity))
1203+
}
1204+
1205+
LivekitModels.DataPacket.ValueCase.STREAM_CHUNK -> {
1206+
incomingDataStreamManager.handleDataChunk(dp.streamChunk)
1207+
}
1208+
1209+
LivekitModels.DataPacket.ValueCase.STREAM_TRAILER -> {
1210+
incomingDataStreamManager.handleStreamTrailer(dp.streamTrailer)
1211+
}
1212+
1213+
// Ignore other cases.
1214+
else -> {}
1215+
}
1216+
}
1217+
11931218
/**
11941219
* @suppress
11951220
*/
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Copyright 2025 LiveKit, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.livekit.android.room.datastream
18+
19+
sealed class StreamException(message: String? = null) : Exception(message) {
20+
class AlreadyOpenedException : StreamException()
21+
class AbnormalEndException(message: String?) : StreamException(message)
22+
class DecodeFailedException : StreamException()
23+
class LengthExceededException : StreamException()
24+
class IncompleteException : StreamException()
25+
class TerminatedException : StreamException()
26+
class UnknownStreamException : StreamException()
27+
class NotDirectoryException : StreamException()
28+
class FileInfoUnavailableException : StreamException()
29+
}

0 commit comments

Comments
 (0)