Skip to content

Commit 31e3883

Browse files
authored
Only reject connection promise if it was triggered by a call to connect() (#359)
* Only reject connection promise if it was triggered by a call to connect() * changeset * immediately return promise from connect function, clean up failed connection attempts * address comments * only reject promise in disconnect if it's a reconnection
1 parent 72f7abb commit 31e3883

File tree

3 files changed

+155
-150
lines changed

3 files changed

+155
-150
lines changed

.changeset/breezy-chefs-know.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'livekit-client': patch
3+
---
4+
5+
Only reject connection promise if it was triggered by a call to connect()

src/room/Room.ts

Lines changed: 143 additions & 145 deletions
Original file line numberDiff line numberDiff line change
@@ -99,8 +99,12 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
9999
/** used for aborting pending connections to a LiveKit server */
100100
private abortController?: AbortController;
101101

102+
/** future holding client initiated connection attempt */
102103
private connectFuture?: Future<void>;
103104

105+
/** future holding sdk initiated reconnection attempt */
106+
private reconnectFuture?: Future<void>;
107+
104108
/**
105109
* Creates a new Room, the primary construct for a LiveKit session.
106110
* @param options
@@ -156,12 +160,17 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
156160
.on(EngineEvent.ActiveSpeakersUpdate, this.handleActiveSpeakersUpdate)
157161
.on(EngineEvent.DataPacketReceived, this.handleDataPacket)
158162
.on(EngineEvent.Resuming, () => {
163+
if (!this.reconnectFuture) {
164+
this.reconnectFuture = new Future();
165+
}
159166
if (this.setAndEmitConnectionState(ConnectionState.Reconnecting)) {
160167
this.emit(RoomEvent.Reconnecting);
161168
}
162169
})
163170
.on(EngineEvent.Resumed, () => {
164171
this.setAndEmitConnectionState(ConnectionState.Connected);
172+
this.reconnectFuture?.resolve();
173+
this.reconnectFuture = undefined;
165174
this.emit(RoomEvent.Reconnected);
166175
this.updateSubscriptions();
167176
})
@@ -193,157 +202,159 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
193202
return DeviceManager.getInstance().getDevices(kind, requestPermissions);
194203
}
195204

196-
connect = async (url: string, token: string, opts?: RoomConnectOptions): Promise<void> => {
205+
connect = (url: string, token: string, opts?: RoomConnectOptions): Promise<void> => {
197206
if (this.state === ConnectionState.Connected) {
198207
// when the state is reconnecting or connected, this function returns immediately
199208
log.warn(`already connected to room ${this.name}`);
200-
return;
209+
return Promise.resolve();
201210
}
202211

203212
if (this.connectFuture) {
204213
return this.connectFuture.promise;
214+
} else if (this.reconnectFuture) {
215+
this.connectFuture = this.reconnectFuture;
216+
return this.connectFuture.promise;
205217
}
206-
this.setAndEmitConnectionState(ConnectionState.Connecting);
218+
const connectPromise = new Promise<void>(async (resolve, reject) => {
219+
this.setAndEmitConnectionState(ConnectionState.Connecting);
220+
if (!this.abortController || this.abortController.signal.aborted) {
221+
this.abortController = new AbortController();
222+
}
207223

208-
if (!this.abortController || this.abortController.signal.aborted) {
209-
this.abortController = new AbortController();
210-
}
224+
// recreate engine if previously disconnected
225+
this.createEngine();
211226

212-
// recreate engine if previously disconnected
213-
this.createEngine();
227+
this.acquireAudioContext();
214228

215-
this.acquireAudioContext();
229+
if (opts?.rtcConfig) {
230+
this.engine.rtcConfig = opts.rtcConfig;
231+
}
216232

217-
if (opts?.rtcConfig) {
218-
this.engine.rtcConfig = opts.rtcConfig;
219-
}
233+
this.connOptions = opts;
234+
235+
try {
236+
const joinResponse = await this.engine.join(
237+
url,
238+
token,
239+
{
240+
autoSubscribe: opts?.autoSubscribe,
241+
publishOnly: opts?.publishOnly,
242+
adaptiveStream:
243+
typeof this.options?.adaptiveStream === 'object'
244+
? true
245+
: this.options?.adaptiveStream,
246+
},
247+
this.abortController.signal,
248+
);
249+
log.debug(
250+
`connected to Livekit Server version: ${joinResponse.serverVersion}, region: ${joinResponse.serverRegion}`,
251+
);
220252

221-
this.connOptions = opts;
253+
if (!joinResponse.serverVersion) {
254+
throw new UnsupportedServer('unknown server version');
255+
}
222256

223-
try {
224-
const joinResponse = await this.engine.join(
225-
url,
226-
token,
227-
{
228-
autoSubscribe: opts?.autoSubscribe,
229-
publishOnly: opts?.publishOnly,
230-
adaptiveStream:
231-
typeof this.options?.adaptiveStream === 'object' ? true : this.options?.adaptiveStream,
232-
},
233-
this.abortController.signal,
234-
);
235-
log.debug(
236-
`connected to Livekit Server version: ${joinResponse.serverVersion}, region: ${joinResponse.serverRegion}`,
237-
);
257+
if (joinResponse.serverVersion === '0.15.1' && this.options.dynacast) {
258+
log.debug('disabling dynacast due to server version');
259+
// dynacast has a bug in 0.15.1, so we cannot use it then
260+
this.options.dynacast = false;
261+
}
238262

239-
if (!joinResponse.serverVersion) {
240-
throw new UnsupportedServer('unknown server version');
241-
}
263+
const pi = joinResponse.participant!;
264+
265+
this.localParticipant.sid = pi.sid;
266+
this.localParticipant.identity = pi.identity;
267+
268+
this.localParticipant.updateInfo(pi);
269+
// forward metadata changed for the local participant
270+
this.localParticipant
271+
.on(ParticipantEvent.ParticipantMetadataChanged, this.onLocalParticipantMetadataChanged)
272+
.on(ParticipantEvent.TrackMuted, this.onLocalTrackMuted)
273+
.on(ParticipantEvent.TrackUnmuted, this.onLocalTrackUnmuted)
274+
.on(ParticipantEvent.LocalTrackPublished, this.onLocalTrackPublished)
275+
.on(ParticipantEvent.LocalTrackUnpublished, this.onLocalTrackUnpublished)
276+
.on(ParticipantEvent.ConnectionQualityChanged, this.onLocalConnectionQualityChanged)
277+
.on(ParticipantEvent.MediaDevicesError, this.onMediaDevicesError)
278+
.on(
279+
ParticipantEvent.ParticipantPermissionsChanged,
280+
this.onLocalParticipantPermissionsChanged,
281+
);
242282

243-
if (joinResponse.serverVersion === '0.15.1' && this.options.dynacast) {
244-
log.debug('disabling dynacast due to server version');
245-
// dynacast has a bug in 0.15.1, so we cannot use it then
246-
this.options.dynacast = false;
247-
}
283+
// populate remote participants, these should not trigger new events
284+
joinResponse.otherParticipants.forEach((info) => {
285+
if (
286+
info.sid !== this.localParticipant.sid &&
287+
info.identity !== this.localParticipant.identity
288+
) {
289+
this.getOrCreateParticipant(info.sid, info);
290+
} else {
291+
log.warn('received info to create local participant as remote participant', {
292+
info,
293+
localParticipant: this.localParticipant,
294+
});
295+
}
296+
});
248297

249-
const pi = joinResponse.participant!;
250-
251-
this.localParticipant.sid = pi.sid;
252-
this.localParticipant.identity = pi.identity;
253-
254-
this.localParticipant.updateInfo(pi);
255-
// forward metadata changed for the local participant
256-
this.localParticipant
257-
.on(ParticipantEvent.ParticipantMetadataChanged, this.onLocalParticipantMetadataChanged)
258-
.on(ParticipantEvent.TrackMuted, this.onLocalTrackMuted)
259-
.on(ParticipantEvent.TrackUnmuted, this.onLocalTrackUnmuted)
260-
.on(ParticipantEvent.LocalTrackPublished, this.onLocalTrackPublished)
261-
.on(ParticipantEvent.LocalTrackUnpublished, this.onLocalTrackUnpublished)
262-
.on(ParticipantEvent.ConnectionQualityChanged, this.onLocalConnectionQualityChanged)
263-
.on(ParticipantEvent.MediaDevicesError, this.onMediaDevicesError)
264-
.on(
265-
ParticipantEvent.ParticipantPermissionsChanged,
266-
this.onLocalParticipantPermissionsChanged,
267-
);
298+
this.name = joinResponse.room!.name;
299+
this.sid = joinResponse.room!.sid;
300+
this.metadata = joinResponse.room!.metadata;
301+
this.emit(RoomEvent.SignalConnected);
302+
} catch (err) {
303+
this.recreateEngine();
304+
this.handleDisconnect(this.options.stopLocalTrackOnUnpublish);
305+
reject(new ConnectionError('could not establish signal connection'));
306+
}
268307

269-
// populate remote participants, these should not trigger new events
270-
joinResponse.otherParticipants.forEach((info) => {
271-
if (
272-
info.sid !== this.localParticipant.sid &&
273-
info.identity !== this.localParticipant.identity
274-
) {
275-
this.getOrCreateParticipant(info.sid, info);
276-
} else {
277-
log.warn('received info to create local participant as remote participant', {
278-
info,
279-
localParticipant: this.localParticipant,
280-
});
308+
// don't return until ICE connected
309+
const connectTimeout = setTimeout(() => {
310+
// timeout
311+
this.recreateEngine();
312+
this.handleDisconnect(this.options.stopLocalTrackOnUnpublish);
313+
reject(new ConnectionError('could not connect PeerConnection after timeout'));
314+
}, maxICEConnectTimeout);
315+
const abortHandler = () => {
316+
log.warn('closing engine');
317+
clearTimeout(connectTimeout);
318+
this.recreateEngine();
319+
this.handleDisconnect(this.options.stopLocalTrackOnUnpublish);
320+
reject(new ConnectionError('room connection has been cancelled'));
321+
};
322+
if (this.abortController?.signal.aborted) {
323+
abortHandler();
324+
}
325+
this.abortController?.signal.addEventListener('abort', abortHandler);
326+
327+
this.engine.once(EngineEvent.Connected, () => {
328+
clearTimeout(connectTimeout);
329+
this.abortController?.signal.removeEventListener('abort', abortHandler);
330+
// also hook unload event
331+
if (isWeb()) {
332+
window.addEventListener('beforeunload', this.onBeforeUnload);
333+
navigator.mediaDevices?.addEventListener('devicechange', this.handleDeviceChange);
281334
}
335+
this.setAndEmitConnectionState(ConnectionState.Connected);
336+
resolve();
282337
});
283-
284-
this.name = joinResponse.room!.name;
285-
this.sid = joinResponse.room!.sid;
286-
this.metadata = joinResponse.room!.metadata;
287-
this.emit(RoomEvent.SignalConnected);
288-
} catch (err) {
289-
this.recreateEngine();
290-
this.setAndEmitConnectionState(
291-
ConnectionState.Disconnected,
292-
new ConnectionError('could not establish signal connection'),
293-
);
294-
throw err;
295-
}
296-
297-
// don't return until ICE connected
298-
const connectTimeout = setTimeout(() => {
299-
// timeout
300-
this.recreateEngine();
301-
this.setAndEmitConnectionState(
302-
ConnectionState.Disconnected,
303-
new ConnectionError('could not connect PeerConnection after timeout'),
304-
);
305-
}, maxICEConnectTimeout);
306-
const abortHandler = () => {
307-
log.warn('closing engine');
308-
clearTimeout(connectTimeout);
309-
this.recreateEngine();
310-
this.setAndEmitConnectionState(
311-
ConnectionState.Disconnected,
312-
new ConnectionError('room connection has been cancelled'),
313-
);
314-
};
315-
if (this.abortController?.signal.aborted) {
316-
abortHandler();
317-
}
318-
this.abortController?.signal.addEventListener('abort', abortHandler);
319-
320-
this.engine.once(EngineEvent.Connected, () => {
321-
clearTimeout(connectTimeout);
322-
this.abortController?.signal.removeEventListener('abort', abortHandler);
323-
// also hook unload event
324-
if (isWeb()) {
325-
window.addEventListener('beforeunload', this.onBeforeUnload);
326-
navigator.mediaDevices?.addEventListener('devicechange', this.handleDeviceChange);
327-
}
328-
this.setAndEmitConnectionState(ConnectionState.Connected);
329338
});
339+
this.connectFuture = new Future(connectPromise);
330340

331-
if (this.connectFuture) {
332-
/** @ts-ignore */
333-
return this.connectFuture.promise;
334-
}
341+
this.connectFuture.promise.finally(() => (this.connectFuture = undefined));
342+
343+
return this.connectFuture.promise;
335344
};
336345

337346
/**
338347
* disconnects the room, emits [[RoomEvent.Disconnected]]
339348
*/
340349
disconnect = async (stopTracks = true) => {
341350
log.info('disconnect from room', { identity: this.localParticipant.identity });
342-
if (this.state === ConnectionState.Connecting) {
351+
if (this.state === ConnectionState.Connecting || this.state === ConnectionState.Reconnecting) {
343352
// try aborting pending connection attempt
344353
log.warn('abort connection attempt');
345354
this.abortController?.abort();
346-
return;
355+
// in case the abort controller didn't manage to cancel the connection attempt, reject the connect promise explicitly
356+
this.connectFuture?.reject(new ConnectionError('Client initiated disconnect'));
357+
this.connectFuture = undefined;
347358
}
348359
// send leave
349360
if (this.engine?.client.isConnected) {
@@ -353,7 +364,6 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
353364
if (this.engine) {
354365
this.engine.close();
355366
}
356-
357367
this.handleDisconnect(stopTracks, DisconnectReason.CLIENT_INITIATED);
358368
/* @ts-ignore */
359369
this.engine = undefined;
@@ -571,6 +581,9 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
571581
}
572582

573583
private handleRestarting = () => {
584+
if (!this.reconnectFuture) {
585+
this.reconnectFuture = new Future();
586+
}
574587
// also unwind existing participants & existing subscriptions
575588
for (const p of this.participants.values()) {
576589
this.handleParticipantDisconnected(p.sid, p);
@@ -587,6 +600,8 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
587600
});
588601
this.setAndEmitConnectionState(ConnectionState.Connected);
589602
this.emit(RoomEvent.Reconnected);
603+
this.reconnectFuture?.resolve();
604+
this.reconnectFuture = undefined;
590605

591606
// rehydrate participants
592607
if (joinResponse.participant) {
@@ -630,6 +645,13 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
630645
if (this.state === ConnectionState.Disconnected) {
631646
return;
632647
}
648+
// reject potentially ongoing reconnection attempt
649+
if (this.connectFuture === this.reconnectFuture) {
650+
this.connectFuture?.reject(undefined);
651+
this.connectFuture = undefined;
652+
this.reconnectFuture = undefined;
653+
}
654+
633655
this.participants.forEach((p) => {
634656
p.tracks.forEach((pub) => {
635657
p.unpublishTrack(pub.trackSid);
@@ -1030,35 +1052,11 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
10301052
}
10311053
}
10321054

1033-
private setAndEmitConnectionState(state: ConnectionState, error?: Error): boolean {
1055+
private setAndEmitConnectionState(state: ConnectionState): boolean {
10341056
if (state === this.state) {
10351057
// unchanged
10361058
return false;
10371059
}
1038-
switch (state) {
1039-
case ConnectionState.Connecting:
1040-
case ConnectionState.Reconnecting:
1041-
if (!this.connectFuture) {
1042-
// reuse existing connect future if possible
1043-
this.connectFuture = new Future<void>();
1044-
}
1045-
break;
1046-
case ConnectionState.Connected:
1047-
if (this.connectFuture) {
1048-
this.connectFuture.resolve();
1049-
this.connectFuture = undefined;
1050-
}
1051-
break;
1052-
case ConnectionState.Disconnected:
1053-
if (this.connectFuture) {
1054-
error ??= new Error('disconnected from Room');
1055-
this.connectFuture.reject(error);
1056-
this.connectFuture = undefined;
1057-
}
1058-
break;
1059-
default:
1060-
// nothing
1061-
}
10621060
this.state = state;
10631061
this.emit(RoomEvent.ConnectionStateChanged, this.state);
10641062
return true;

0 commit comments

Comments
 (0)