Skip to content

Commit e06d10c

Browse files
authored
fix(worker): Avoid rxjs error while flushing logs on worker shutdown (#1238)
1 parent 96be791 commit e06d10c

File tree

1 file changed

+32
-27
lines changed

1 file changed

+32
-27
lines changed

packages/worker/src/runtime.ts

Lines changed: 32 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,7 @@ import * as v8 from 'node:v8';
33
import * as fs from 'node:fs';
44
import * as os from 'node:os';
55
import { Heap } from 'heap-js';
6-
import { BehaviorSubject, lastValueFrom, of } from 'rxjs';
7-
import { concatMap, delay, map, repeat, takeWhile } from 'rxjs/operators';
6+
import { Subject, firstValueFrom } from 'rxjs';
87
import * as native from '@temporalio/core-bridge';
98
import {
109
pollLogs,
@@ -107,7 +106,7 @@ export function makeTelemetryFilterString(options: MakeTelemetryFilterStringOpti
107106
}
108107

109108
/** A logger that buffers logs from both Node.js and Rust Core and emits logs in the right order */
110-
export class CoreLogger extends DefaultLogger {
109+
class BufferedLogger extends DefaultLogger {
111110
protected buffer = new Heap<LogEntry>((a, b) => Number(a.timestampNanos - b.timestampNanos));
112111

113112
constructor(protected readonly next: Logger) {
@@ -135,7 +134,7 @@ export class Runtime {
135134
protected pendingCreations = 0;
136135
/** Track the registered native objects to automatically shutdown when all have been deregistered */
137136
protected readonly backRefs = new Set<TrackedNativeObject>();
138-
protected readonly shouldPollForLogs = new BehaviorSubject<boolean>(false);
137+
protected readonly stopPollingForLogs = new Subject<void>();
139138
protected readonly logPollPromise: Promise<void>;
140139
public readonly logger: Logger;
141140
protected readonly shutdownSignalCallbacks = new Set<() => void>();
@@ -152,7 +151,7 @@ export class Runtime {
152151

153152
protected constructor(public readonly native: native.Runtime, public readonly options: CompiledRuntimeOptions) {
154153
if (this.isForwardingLogs()) {
155-
const logger = (this.logger = new CoreLogger(this.options.logger));
154+
const logger = (this.logger = new BufferedLogger(this.options.logger));
156155
this.logPollPromise = this.initLogPolling(logger);
157156
} else {
158157
this.logger = this.options.logger;
@@ -265,31 +264,37 @@ export class Runtime {
265264
};
266265
}
267266

268-
protected async initLogPolling(logger: CoreLogger): Promise<void> {
269-
this.shouldPollForLogs.next(true);
270-
267+
protected async initLogPolling(logger: BufferedLogger): Promise<void> {
271268
if (!this.isForwardingLogs()) {
272269
return;
273270
}
271+
272+
const stopPollingForLogs = firstValueFrom(this.stopPollingForLogs);
273+
274274
const poll = promisify(pollLogs);
275+
const doPoll = async () => {
276+
const logs = await poll(this.native);
277+
for (const log of logs) {
278+
const meta: Record<string | symbol, unknown> = {
279+
[LogTimestamp]: timeOfDayToBigint(log.timestamp),
280+
};
281+
logger.log(log.level, log.message, meta);
282+
}
283+
};
284+
275285
try {
276-
await lastValueFrom(
277-
of(this.shouldPollForLogs).pipe(
278-
map((subject) => subject.getValue()),
279-
takeWhile((shouldPoll) => shouldPoll),
280-
concatMap(() => poll(this.native)),
281-
map((logs) => {
282-
for (const log of logs) {
283-
logger.log(log.level, log.message, {
284-
[LogTimestamp]: timeOfDayToBigint(log.timestamp),
285-
});
286-
}
287-
logger.flush();
288-
}),
289-
delay(3), // Don't go wild polling as fast as possible
290-
repeat()
291-
)
292-
);
286+
for (;;) {
287+
await doPoll();
288+
logger.flush();
289+
const stop = await Promise.race([
290+
stopPollingForLogs.then(() => true),
291+
new Promise<boolean>((resolve) => setTimeout(() => resolve(false), 3)),
292+
]);
293+
if (stop) {
294+
await doPoll();
295+
break;
296+
}
297+
}
293298
} catch (error) {
294299
// Log using the original logger instead of buffering
295300
this.options.logger.warn('Error gathering forwarded logs from core', { error });
@@ -311,7 +316,7 @@ export class Runtime {
311316
*/
312317
flushLogs(): void {
313318
if (this.isForwardingLogs()) {
314-
const logger = this.logger as CoreLogger;
319+
const logger = this.logger as BufferedLogger;
315320
logger.flush();
316321
}
317322
}
@@ -474,7 +479,7 @@ export class Runtime {
474479
public async shutdown(): Promise<void> {
475480
delete Runtime._instance;
476481
this.teardownShutdownHook();
477-
this.shouldPollForLogs.next(false);
482+
this.stopPollingForLogs.next();
478483
// This will effectively drain all logs
479484
await this.logPollPromise;
480485
await promisify(runtimeShutdown)(this.native);

0 commit comments

Comments
 (0)