Skip to content

Commit 10a4fd8

Browse files
authored
MatrixRTCSession: handle rate limit errors (#4494)
* MatrixRTCSession: handle rate limit errors * Lint * Handle ratelimiting for non-legacy state setting Each request must be retried, as the non-legacy flow involves a sequence of requests that must resolve in order. * Fix broken test * Check for MSC3757 instead of the unmerged MSC3779 * Move helper out of beforeEach * Test ratelimit errors
1 parent 98f7637 commit 10a4fd8

File tree

2 files changed

+110
-35
lines changed

2 files changed

+110
-35
lines changed

spec/unit/matrixrtc/MatrixRTCSession.spec.ts

Lines changed: 46 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -472,14 +472,54 @@ describe("MatrixRTCSession", () => {
472472
const activeFocus = { type: "livekit", focus_selection: "oldest_membership" };
473473

474474
async function testJoin(useOwnedStateEvents: boolean): Promise<void> {
475-
const realSetTimeout = setTimeout;
476475
if (useOwnedStateEvents) {
477-
mockRoom.getVersion = jest.fn().mockReturnValue("org.matrix.msc3779.default");
476+
mockRoom.getVersion = jest.fn().mockReturnValue("org.matrix.msc3757.default");
478477
}
479478

480479
jest.useFakeTimers();
480+
481+
// preparing the delayed disconnect should handle ratelimiting
482+
const sendDelayedStateAttempt = new Promise<void>((resolve) => {
483+
const error = new MatrixError({ errcode: "M_LIMIT_EXCEEDED" });
484+
sendDelayedStateMock.mockImplementationOnce(() => {
485+
resolve();
486+
return Promise.reject(error);
487+
});
488+
});
489+
490+
// setting the membership state should handle ratelimiting (also with a retry-after value)
491+
const sendStateEventAttempt = new Promise<void>((resolve) => {
492+
const error = new MatrixError(
493+
{ errcode: "M_LIMIT_EXCEEDED" },
494+
429,
495+
undefined,
496+
undefined,
497+
new Headers({ "Retry-After": "1" }),
498+
);
499+
sendStateEventMock.mockImplementationOnce(() => {
500+
resolve();
501+
return Promise.reject(error);
502+
});
503+
});
504+
505+
// needed to advance the mock timers properly
506+
const scheduledDelayDisconnection = new Promise<void>((resolve) => {
507+
const originalFn: () => void = (sess as any).scheduleDelayDisconnection;
508+
(sess as any).scheduleDelayDisconnection = jest.fn(() => {
509+
originalFn.call(sess);
510+
resolve();
511+
});
512+
});
513+
481514
sess!.joinRoomSession([activeFocusConfig], activeFocus, { useLegacyMemberEvents: false });
482-
await Promise.race([sentStateEvent, new Promise((resolve) => realSetTimeout(resolve, 500))]);
515+
516+
await sendDelayedStateAttempt;
517+
jest.advanceTimersByTime(5000);
518+
519+
await sendStateEventAttempt.then(); // needed to resolve after resendIfRateLimited catches
520+
jest.advanceTimersByTime(1000);
521+
522+
await sentStateEvent;
483523
expect(client.sendStateEvent).toHaveBeenCalledWith(
484524
mockRoom!.roomId,
485525
EventType.GroupCallMemberPrefix,
@@ -493,9 +533,10 @@ describe("MatrixRTCSession", () => {
493533
} satisfies SessionMembershipData,
494534
`${!useOwnedStateEvents ? "_" : ""}@alice:example.org_AAAAAAA`,
495535
);
496-
await Promise.race([sentDelayedState, new Promise((resolve) => realSetTimeout(resolve, 500))]);
497-
expect(client._unstable_sendDelayedStateEvent).toHaveBeenCalledTimes(1);
536+
await sentDelayedState;
498537

538+
// should have prepared the heartbeat to keep delaying the leave event while still connected
539+
await scheduledDelayDisconnection;
499540
// should have tried updating the delayed leave to test that it wasn't replaced by own state
500541
expect(client._unstable_updateDelayedEvent).toHaveBeenCalledTimes(1);
501542
// should update delayed disconnect

src/matrixrtc/MatrixRTCSession.ts

Lines changed: 64 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ import { randomString, secureRandomBase64Url } from "../randomstring.ts";
3434
import { EncryptionKeysEventContent } from "./types.ts";
3535
import { decodeBase64, encodeUnpaddedBase64 } from "../base64.ts";
3636
import { KnownMembership } from "../@types/membership.ts";
37-
import { MatrixError, safeGetRetryAfterMs } from "../http-api/errors.ts";
37+
import { HTTPError, MatrixError, safeGetRetryAfterMs } from "../http-api/errors.ts";
3838
import { MatrixEvent } from "../models/event.ts";
3939
import { isLivekitFocusActive } from "./LivekitFocus.ts";
4040
import { ExperimentalGroupCallRoomMemberState } from "../webrtc/groupCall.ts";
@@ -1031,39 +1031,39 @@ export class MatrixRTCSession extends TypedEventEmitter<MatrixRTCSessionEvent, M
10311031
const prepareDelayedDisconnection = async (): Promise<void> => {
10321032
try {
10331033
// TODO: If delayed event times out, re-join!
1034-
const res = await this.client._unstable_sendDelayedStateEvent(
1035-
this.room.roomId,
1036-
{
1037-
delay: 8000,
1038-
},
1039-
EventType.GroupCallMemberPrefix,
1040-
{}, // leave event
1041-
stateKey,
1034+
const res = await resendIfRateLimited(() =>
1035+
this.client._unstable_sendDelayedStateEvent(
1036+
this.room.roomId,
1037+
{
1038+
delay: 8000,
1039+
},
1040+
EventType.GroupCallMemberPrefix,
1041+
{}, // leave event
1042+
stateKey,
1043+
),
10421044
);
10431045
this.disconnectDelayId = res.delay_id;
10441046
} catch (e) {
1045-
// TODO: Retry if rate-limited
10461047
logger.error("Failed to prepare delayed disconnection event:", e);
10471048
}
10481049
};
10491050
await prepareDelayedDisconnection();
10501051
// Send join event _after_ preparing the delayed disconnection event
1051-
await this.client.sendStateEvent(
1052-
this.room.roomId,
1053-
EventType.GroupCallMemberPrefix,
1054-
newContent,
1055-
stateKey,
1052+
await resendIfRateLimited(() =>
1053+
this.client.sendStateEvent(this.room.roomId, EventType.GroupCallMemberPrefix, newContent, stateKey),
10561054
);
10571055
// If sending state cancels your own delayed state, prepare another delayed state
10581056
// TODO: Remove this once MSC4140 is stable & doesn't cancel own delayed state
10591057
if (this.disconnectDelayId !== undefined) {
10601058
try {
1061-
await this.client._unstable_updateDelayedEvent(
1062-
this.disconnectDelayId,
1063-
UpdateDelayedEventAction.Restart,
1059+
const knownDisconnectDelayId = this.disconnectDelayId;
1060+
await resendIfRateLimited(() =>
1061+
this.client._unstable_updateDelayedEvent(
1062+
knownDisconnectDelayId,
1063+
UpdateDelayedEventAction.Restart,
1064+
),
10641065
);
10651066
} catch (e) {
1066-
// TODO: Make embedded client include errcode, and retry only if not M_NOT_FOUND (or rate-limited)
10671067
logger.warn("Failed to update delayed disconnection event, prepare it again:", e);
10681068
this.disconnectDelayId = undefined;
10691069
await prepareDelayedDisconnection();
@@ -1076,23 +1076,27 @@ export class MatrixRTCSession extends TypedEventEmitter<MatrixRTCSessionEvent, M
10761076
let sentDelayedDisconnect = false;
10771077
if (this.disconnectDelayId !== undefined) {
10781078
try {
1079-
await this.client._unstable_updateDelayedEvent(
1080-
this.disconnectDelayId,
1081-
UpdateDelayedEventAction.Send,
1079+
const knownDisconnectDelayId = this.disconnectDelayId;
1080+
await resendIfRateLimited(() =>
1081+
this.client._unstable_updateDelayedEvent(
1082+
knownDisconnectDelayId,
1083+
UpdateDelayedEventAction.Send,
1084+
),
10821085
);
10831086
sentDelayedDisconnect = true;
10841087
} catch (e) {
1085-
// TODO: Retry if rate-limited
10861088
logger.error("Failed to send our delayed disconnection event:", e);
10871089
}
10881090
this.disconnectDelayId = undefined;
10891091
}
10901092
if (!sentDelayedDisconnect) {
1091-
await this.client.sendStateEvent(
1092-
this.room.roomId,
1093-
EventType.GroupCallMemberPrefix,
1094-
{},
1095-
this.makeMembershipStateKey(localUserId, localDeviceId),
1093+
await resendIfRateLimited(() =>
1094+
this.client.sendStateEvent(
1095+
this.room.roomId,
1096+
EventType.GroupCallMemberPrefix,
1097+
{},
1098+
this.makeMembershipStateKey(localUserId, localDeviceId),
1099+
),
10961100
);
10971101
}
10981102
}
@@ -1111,10 +1115,12 @@ export class MatrixRTCSession extends TypedEventEmitter<MatrixRTCSessionEvent, M
11111115

11121116
private readonly delayDisconnection = async (): Promise<void> => {
11131117
try {
1114-
await this.client._unstable_updateDelayedEvent(this.disconnectDelayId!, UpdateDelayedEventAction.Restart);
1118+
const knownDisconnectDelayId = this.disconnectDelayId!;
1119+
await resendIfRateLimited(() =>
1120+
this.client._unstable_updateDelayedEvent(knownDisconnectDelayId, UpdateDelayedEventAction.Restart),
1121+
);
11151122
this.scheduleDelayDisconnection();
11161123
} catch (e) {
1117-
// TODO: Retry if rate-limited
11181124
logger.error("Failed to delay our disconnection event:", e);
11191125
}
11201126
};
@@ -1162,3 +1168,31 @@ export class MatrixRTCSession extends TypedEventEmitter<MatrixRTCSessionEvent, M
11621168
this.sendEncryptionKeysEvent(newKeyIndex);
11631169
};
11641170
}
1171+
1172+
async function resendIfRateLimited<T>(func: () => Promise<T>, numRetriesAllowed: number = 1): Promise<T> {
1173+
// eslint-disable-next-line no-constant-condition
1174+
while (true) {
1175+
try {
1176+
return await func();
1177+
} catch (e) {
1178+
if (numRetriesAllowed > 0 && e instanceof HTTPError && e.isRateLimitError()) {
1179+
numRetriesAllowed--;
1180+
let resendDelay: number;
1181+
const defaultMs = 5000;
1182+
try {
1183+
resendDelay = e.getRetryAfterMs() ?? defaultMs;
1184+
logger.info(`Rate limited by server, retrying in ${resendDelay}ms`);
1185+
} catch (e) {
1186+
logger.warn(
1187+
`Error while retrieving a rate-limit retry delay, retrying after default delay of ${defaultMs}`,
1188+
e,
1189+
);
1190+
resendDelay = defaultMs;
1191+
}
1192+
await sleep(resendDelay);
1193+
} else {
1194+
throw e;
1195+
}
1196+
}
1197+
}
1198+
}

0 commit comments

Comments
 (0)