Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ import javax.inject.Named
import javax.inject.Singleton
import kotlin.coroutines.Continuation
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.suspendCoroutine

/**
Expand Down Expand Up @@ -324,12 +325,17 @@ internal constructor(
stream: String?,
builder: LivekitRtc.AddTrackRequest.Builder = LivekitRtc.AddTrackRequest.newBuilder(),
): LivekitModels.TrackInfo {
if (pendingTrackResolvers[cid] != null) {
throw TrackException.DuplicateTrackException("Track with same ID $cid has already been published!")
synchronized(pendingTrackResolvers) {
if (pendingTrackResolvers[cid] != null) {
throw TrackException.DuplicateTrackException("Track with same ID $cid has already been published!")
}
}

// Suspend until signal client receives message confirming track publication.
return suspendCoroutine { cont ->
pendingTrackResolvers[cid] = cont
synchronized(pendingTrackResolvers) {
pendingTrackResolvers[cid] = cont
}
client.sendAddTrack(
cid = cid,
name = name,
Expand Down Expand Up @@ -380,6 +386,7 @@ internal constructor(
lastRoomOptions = null
participantSid = null
regionUrlProvider = null
abortPendingPublishTracks()
closeResources(reason)
connectionState = ConnectionState.DISCONNECTED
}
Expand Down Expand Up @@ -416,6 +423,15 @@ internal constructor(
client.close(reason = reason)
}

private fun abortPendingPublishTracks() {
synchronized(pendingTrackResolvers) {
pendingTrackResolvers.values.forEach {
it.resumeWithException(TrackException.PublishException("pending track aborted"))
}
pendingTrackResolvers.clear()
}
}

/**
* reconnect Signal and PeerConnections
*/
Expand Down Expand Up @@ -907,7 +923,9 @@ internal constructor(
}

LKLog.v { "local track published $cid" }
val cont = pendingTrackResolvers.remove(cid)
val cont = synchronized(pendingTrackResolvers) {
pendingTrackResolvers.remove(cid)
}
if (cont == null) {
LKLog.d { "missing track resolver for: $cid" }
return
Expand All @@ -929,6 +947,7 @@ internal constructor(

override fun onClose(reason: String, code: Int) {
LKLog.i { "received close event: $reason, code: $code" }
abortPendingPublishTracks()
reconnect()
}

Expand All @@ -946,6 +965,9 @@ internal constructor(

override fun onLeave(leave: LeaveRequest) {
LKLog.d { "leave request received: reason = ${leave.reason.name}" }

abortPendingPublishTracks()

if (leave.hasRegions()) {
regionUrlProvider?.let {
it.setServerReportedRegions(RegionSettings.fromProto(leave.regions))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -481,13 +481,19 @@ internal constructor(
val builder = AddTrackRequest.newBuilder().apply {
this.requestConfig()
}
val trackInfo = engine.addTrack(
cid = cid,
name = options.name ?: track.name,
kind = track.kind.toProto(),
stream = options.stream,
builder = builder,
)

val trackInfo = try {
engine.addTrack(
cid = cid,
name = options.name ?: track.name,
kind = track.kind.toProto(),
stream = options.stream,
builder = builder,
)
} catch (e: Exception) {
publishListener?.onPublishFailure(TrackException.PublishException("Failed to publish track", e))
return null
}

if (options is VideoTrackPublishOptions) {
// server might not support the codec the client has requested, in that case, fallback
Expand Down
Loading