Skip to content

Commit 0b0b60e

Browse files
committed
Support token refresh
1 parent da82faf commit 0b0b60e

File tree

2 files changed

+86
-87
lines changed

2 files changed

+86
-87
lines changed

packages/common/src/client/sync/stream/AbstractRemote.ts

Lines changed: 54 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -269,23 +269,21 @@ export abstract class AbstractRemote {
269269
*/
270270
async socketStream(options: SocketSyncStreamOptions): Promise<DataStream<StreamingSyncLine>> {
271271
const bson = await this.getBSON();
272-
return await this.socketStreamInternal(options, bson);
273-
}
274-
275-
/**
276-
* Connects to the sync/stream websocket endpoint without decoding BSON in JavaScript.
277-
*/
278-
async socketStreamRaw(options: SocketSyncStreamOptions): Promise<DataStream<Buffer<ArrayBuffer>>> {
279-
return this.socketStreamInternal(options);
272+
return await this.socketStreamRaw(options, (data) => bson.deserialize(data), bson);
280273
}
281274

282275
/**
283276
* Returns a data stream of sync line data.
284277
*
285-
* @param bson A BSON encoder and decoder. When set, the data stream will emit parsed instances of
286-
* {@link StreamingSyncLine}. Otherwise, unparsed buffers will be emitted instead.
278+
* @param map Maps received payload frames to the typed event value.
279+
* @param bson A BSON encoder and decoder. When set, the data stream will be requested with a BSON payload
280+
* (required for compatibility with older sync services).
287281
*/
288-
private async socketStreamInternal(options: SocketSyncStreamOptions, bson?: typeof BSON): Promise<DataStream> {
282+
async socketStreamRaw<T>(
283+
options: SocketSyncStreamOptions,
284+
map: (buffer: Buffer) => T,
285+
bson?: typeof BSON
286+
): Promise<DataStream> {
289287
const { path, fetchStrategy = FetchStrategy.Buffered } = options;
290288
const mimeType = bson == null ? 'application/json' : 'application/bson';
291289

@@ -415,7 +413,7 @@ export abstract class AbstractRemote {
415413
return;
416414
}
417415

418-
stream.enqueueData(bson != null ? bson.deserialize(data) : data);
416+
stream.enqueueData(map(data));
419417
},
420418
onComplete: () => {
421419
stream.close();
@@ -455,43 +453,18 @@ export abstract class AbstractRemote {
455453
}
456454

457455
/**
458-
* Connects to the sync/stream http endpoint
456+
* Connects to the sync/stream http endpoint, parsing lines as JSON.
459457
*/
460458
async postStream(options: SyncStreamOptions): Promise<DataStream<StreamingSyncLine>> {
461-
const jsonS = await this.postStreamRaw(options);
462-
463-
const stream = new DataStream({
464-
logger: this.logger
465-
});
466-
467-
const r = jsonS.getReader();
468-
469-
const l = stream.registerListener({
470-
lowWater: async () => {
471-
try {
472-
const { done, value } = await r.read();
473-
// Exit if we're done
474-
if (done) {
475-
stream.close();
476-
l?.();
477-
return;
478-
}
479-
stream.enqueueData(JSON.parse(value));
480-
} catch (ex) {
481-
stream.close();
482-
throw ex;
483-
}
484-
},
485-
closed: () => {
486-
r.cancel();
487-
l?.();
488-
}
459+
return await this.postStreamRaw(options, (line) => {
460+
return JSON.parse(line) as StreamingSyncLine;
489461
});
490-
491-
return stream;
492462
}
493463

494-
async postStreamRaw(options: SyncStreamOptions): Promise<ReadableStream<string>> {
464+
/**
465+
* Connects to the sync/stream http endpoint, mapping and emitting each received string line.
466+
*/
467+
async postStreamRaw<T>(options: SyncStreamOptions, mapLine: (line: string) => T): Promise<DataStream<T>> {
495468
const { data, path, headers, abortSignal } = options;
496469

497470
const request = await this.buildRequest(path);
@@ -566,40 +539,52 @@ export abstract class AbstractRemote {
566539
const decoder = new TextDecoder();
567540
let buffer = '';
568541

569-
const outputStream = new ReadableStream<string>({
570-
pull: async (controller) => {
571-
let didCompleteLine = false;
542+
const stream = new DataStream<T>({
543+
logger: this.logger
544+
});
572545

573-
while (!didCompleteLine) {
574-
const { done, value } = await reader.read();
575-
if (done) {
576-
const remaining = buffer.trim();
577-
if (remaining.length != 0) {
578-
controller.enqueue(remaining);
579-
}
546+
const l = stream.registerListener({
547+
lowWater: async () => {
548+
try {
549+
let didCompleteLine = false;
550+
while (!didCompleteLine) {
551+
const { done, value } = await reader.read();
552+
if (done) {
553+
const remaining = buffer.trim();
554+
if (remaining.length != 0) {
555+
stream.enqueueData(mapLine(remaining));
556+
}
580557

581-
controller.close();
582-
await closeReader();
583-
return;
584-
}
558+
stream.close();
559+
await closeReader();
560+
return;
561+
}
585562

586-
const data = decoder.decode(value, { stream: true });
587-
buffer += data;
563+
const data = decoder.decode(value, { stream: true });
564+
buffer += data;
588565

589-
const lines = buffer.split('\n');
590-
for (var i = 0; i < lines.length - 1; i++) {
591-
var l = lines[i].trim();
592-
if (l.length > 0) {
593-
controller.enqueue(l);
594-
didCompleteLine = true;
566+
const lines = buffer.split('\n');
567+
for (var i = 0; i < lines.length - 1; i++) {
568+
var l = lines[i].trim();
569+
if (l.length > 0) {
570+
stream.enqueueData(mapLine(l));
571+
didCompleteLine = true;
572+
}
595573
}
596-
}
597574

598-
buffer = lines[lines.length - 1];
575+
buffer = lines[lines.length - 1];
576+
}
577+
} catch (ex) {
578+
stream.close();
579+
throw ex;
599580
}
581+
},
582+
closed: () => {
583+
closeReader();
584+
l?.();
600585
}
601586
});
602587

603-
return outputStream;
588+
return stream;
604589
}
605590
}

packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts

Lines changed: 32 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -837,6 +837,7 @@ The next upload iteration will be delayed.`);
837837

838838
const abortController = new AbortController();
839839
signal.addEventListener('abort', () => abortController.abort());
840+
let controlInvocations: DataStream<[string, ArrayBuffer | string | undefined]> | null = null;
840841

841842
async function connect(instr: EstablishSyncStream) {
842843
const syncOptions: SyncStreamOptions = {
@@ -846,26 +847,29 @@ The next upload iteration will be delayed.`);
846847
};
847848

848849
if (resolvedOptions.connectionMethod == SyncStreamConnectionMethod.HTTP) {
849-
const lines = await remote.postStreamRaw(syncOptions);
850-
for await (const syncLine of lines as any) {
851-
await control('line_text', syncLine);
852-
}
850+
controlInvocations = await remote.postStreamRaw(syncOptions, (line) => ['line_text', line]);
853851
} else {
854-
const stream = await remote.socketStreamRaw({ ...syncOptions, fetchStrategy: resolvedOptions.fetchStrategy });
855-
856-
try {
857-
while (!stream.closed) {
858-
const line = await stream.read();
859-
if (line == null) {
860-
return;
861-
}
852+
controlInvocations = await remote.socketStreamRaw(
853+
{
854+
...syncOptions,
855+
fetchStrategy: resolvedOptions.fetchStrategy
856+
},
857+
// TODO: Can we avoid the copy here?
858+
(buffer) => ['line_binary', new Uint8Array(buffer).buffer]
859+
);
860+
}
862861

863-
const copy = new Uint8Array(line);
864-
await control('line_binary', copy.buffer);
862+
try {
863+
while (!controlInvocations.closed) {
864+
const line = await controlInvocations.read();
865+
if (line == null) {
866+
return;
865867
}
866-
} finally {
867-
await stream.close();
868+
869+
await control(line[0], line[1]);
868870
}
871+
} finally {
872+
await controlInvocations.close();
869873
}
870874
}
871875

@@ -926,7 +930,17 @@ The next upload iteration will be delayed.`);
926930
if (instruction.FetchCredentials.did_expire) {
927931
remote.invalidateCredentials();
928932
} else {
929-
// TODO: Async prefetch
933+
remote.invalidateCredentials();
934+
935+
// Restart iteration after the credentials have been refreshed.
936+
remote.fetchCredentials().then(
937+
(_) => {
938+
controlInvocations?.enqueueData(['refreshed_token', undefined]);
939+
},
940+
(err) => {
941+
syncImplementation.logger.warn('Could not prefetch credentials', err);
942+
}
943+
);
930944
}
931945
} else if ('CloseSyncStream' in instruction) {
932946
abortController.abort();
@@ -949,7 +963,7 @@ The next upload iteration will be delayed.`);
949963

950964
try {
951965
this.notifyCompletedUploads = () => {
952-
control('completed_upload');
966+
controlInvocations?.enqueueData(['completed_upload', undefined]);
953967
};
954968

955969
await control('start', JSON.stringify(resolvedOptions.params));

0 commit comments

Comments
 (0)