Skip to content

Commit 2e52f01

Browse files
authored
fix(worker): Remove rx subject in Runtime (#1242)
1 parent c47f383 commit 2e52f01

File tree

1 file changed

+9
-11
lines changed

1 file changed

+9
-11
lines changed

packages/worker/src/runtime.ts

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ import * as v8 from 'node:v8';
33
import * as fs from 'node:fs';
44
import * as os from 'node:os';
55
import { Heap } from 'heap-js';
6-
import { Subject, firstValueFrom } from 'rxjs';
76
import * as native from '@temporalio/core-bridge';
87
import {
98
pollLogs,
@@ -134,7 +133,8 @@ export class Runtime {
134133
protected pendingCreations = 0;
135134
/** Track the registered native objects to automatically shutdown when all have been deregistered */
136135
protected readonly backRefs = new Set<TrackedNativeObject>();
137-
protected readonly stopPollingForLogs = new Subject<void>();
136+
protected stopPollingForLogs = false;
137+
protected stopPollingForLogsCallback?: () => void;
138138
protected readonly logPollPromise: Promise<void>;
139139
public readonly logger: Logger;
140140
protected readonly shutdownSignalCallbacks = new Set<() => void>();
@@ -269,8 +269,6 @@ export class Runtime {
269269
return;
270270
}
271271

272-
const stopPollingForLogs = firstValueFrom(this.stopPollingForLogs);
273-
274272
const poll = promisify(pollLogs);
275273
const doPoll = async () => {
276274
const logs = await poll(this.native);
@@ -286,14 +284,13 @@ export class Runtime {
286284
for (;;) {
287285
await doPoll();
288286
logger.flush();
289-
const stop = await Promise.race([
290-
stopPollingForLogs.then(() => true),
291-
new Promise<boolean>((resolve) => setTimeout(() => resolve(false), 3)),
292-
]);
293-
if (stop) {
294-
await doPoll();
287+
if (this.stopPollingForLogs) {
295288
break;
296289
}
290+
await new Promise<void>((resolve) => {
291+
setTimeout(resolve, 3);
292+
this.stopPollingForLogsCallback = resolve;
293+
});
297294
}
298295
} catch (error) {
299296
// Log using the original logger instead of buffering
@@ -479,7 +476,8 @@ export class Runtime {
479476
public async shutdown(): Promise<void> {
480477
delete Runtime._instance;
481478
this.teardownShutdownHook();
482-
this.stopPollingForLogs.next();
479+
this.stopPollingForLogs = true;
480+
this.stopPollingForLogsCallback?.();
483481
// This will effectively drain all logs
484482
await this.logPollPromise;
485483
await promisify(runtimeShutdown)(this.native);

0 commit comments

Comments
 (0)