Skip to content

Commit ea1ce78

Browse files
authored
Add maxPollSeconds and pollIntervalMs options to flow worker (#90)
* feat: add configurable polling intervals and batch size options to EdgeWorker and related components - Introduced maxPollSeconds and pollIntervalMs options in EdgeWorker configuration - Updated StepTaskPoller to accept and utilize maxPollSeconds and pollIntervalMs - Extended createFlowWorker to support polling interval and max poll seconds - Modified tests to include new polling configuration options for better control and testing - Ensured default values are set when options are not provided - Updated helper functions to pass new configuration parameters for testing purposes * fix(core): move visibilityTimeout as last opt in pollForTasks * refactor(worker): add promise tracking for main loop to improve shutdown handling - Introduced mainLoopPromise to track the worker's main loop execution - Updated startOnlyOnce to prevent multiple concurrent starts - Enhanced stop method to await main loop completion and handle errors - Improved worker shutdown to ensure all tasks are finalized before stopping * fix: improve worker shutdown and task polling behavior - Make worker.stop() wait for the main loop promise to ensure proper shutdown - Adjust pollForTasks to make visibilityTimeout the last option, allowing it to be skipped if not needed * fix: remove unused import in integration test helpers - Deleted the import of createFakeLogger from _helpers.ts to clean up unused code - Minor cleanup to improve code clarity and maintainability
1 parent da63d9c commit ea1ce78

File tree

9 files changed

+68
-8
lines changed

9 files changed

+68
-8
lines changed

.changeset/bumpy-words-scream.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@pgflow/edge-worker': patch
3+
---
4+
5+
Make worker.stop() wait for the main loop promise

.changeset/dirty-moose-reply.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@pgflow/core': patch
3+
---
4+
5+
Make visibilityTimeout the last option to pollForTasks so it can be skipped

pkgs/core/src/PgflowSqlClient.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,9 @@ export class PgflowSqlClient<TFlow extends AnyFlow>
1919
async pollForTasks(
2020
queueName: string,
2121
batchSize = 20,
22-
visibilityTimeout = 2,
2322
maxPollSeconds = 5,
24-
pollIntervalMs = 200
23+
pollIntervalMs = 200,
24+
visibilityTimeout = 2
2525
): Promise<StepTaskRecord<TFlow>[]> {
2626
return await this.sql<StepTaskRecord<TFlow>[]>`
2727
SELECT *

pkgs/edge-worker/src/EdgeWorker.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,12 @@ export class EdgeWorker {
186186
*
187187
* // batch size for polling messages
188188
* batchSize: 10,
189+
*
190+
* // in-worker polling interval
191+
* maxPollSeconds: 2,
192+
*
193+
* // in-database polling interval
194+
* pollIntervalMs: 100,
189195
* });
190196
* ```
191197
*/
@@ -204,6 +210,8 @@ export class EdgeWorker {
204210
maxConcurrent: config.maxConcurrent ?? 10,
205211
maxPgConnections: config.maxPgConnections ?? 4,
206212
batchSize: config.batchSize ?? 10,
213+
maxPollSeconds: config.maxPollSeconds ?? 2,
214+
pollIntervalMs: config.pollIntervalMs ?? 100,
207215
connectionString:
208216
config.connectionString || this.platform.getConnectionString(),
209217
};

pkgs/edge-worker/src/core/Worker.ts

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ export class Worker {
99

1010
private batchProcessor: IBatchProcessor;
1111
private sql: postgres.Sql;
12+
private mainLoopPromise: Promise<void> | undefined;
1213

1314
constructor(
1415
batchProcessor: IBatchProcessor,
@@ -22,13 +23,15 @@ export class Worker {
2223
this.logger = logger;
2324
}
2425

25-
async startOnlyOnce(workerBootstrap: WorkerBootstrap) {
26+
startOnlyOnce(workerBootstrap: WorkerBootstrap) {
2627
if (this.lifecycle.isRunning) {
2728
this.logger.debug('Worker already running, ignoring start request');
2829
return;
2930
}
3031

31-
await this.start(workerBootstrap);
32+
if (!this.mainLoopPromise) {
33+
this.mainLoopPromise = this.start(workerBootstrap);
34+
}
3235
}
3336

3437
private async start(workerBootstrap: WorkerBootstrap) {
@@ -68,6 +71,16 @@ export class Worker {
6871
this.logger.info('-> Stopped accepting new messages');
6972
this.abortController.abort();
7073

74+
try {
75+
this.logger.debug('-> Waiting for main loop to complete');
76+
await this.mainLoopPromise;
77+
} catch (error) {
78+
this.logger.error(
79+
`Error in main loop: ${error}. Continuing to stop worker`
80+
);
81+
throw error;
82+
}
83+
7184
this.logger.info('-> Waiting for pending tasks to complete...');
7285
await this.batchProcessor.awaitCompletion();
7386
this.logger.info('-> Pending tasks completed!');

pkgs/edge-worker/src/flow/StepTaskPoller.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ import type { AnyFlow } from '@pgflow/dsl';
66
export interface StepTaskPollerConfig {
77
batchSize: number;
88
queueName: string;
9+
maxPollSeconds?: number;
10+
pollIntervalMs?: number;
911
}
1012

1113
/**
@@ -32,13 +34,16 @@ export class StepTaskPoller<TFlow extends AnyFlow>
3234
}
3335

3436
this.logger.debug(
35-
`Polling for flow tasks with batch size ${this.config.batchSize}`
37+
`Polling for flow tasks with batch size ${this.config.batchSize}, maxPollSeconds: ${this.config.maxPollSeconds}, pollIntervalMs: ${this.config.pollIntervalMs}`
3638
);
3739

3840
try {
41+
// Pass polling configuration to the adapter if they're provided
3942
const tasks = await this.adapter.pollForTasks(
4043
this.config.queueName,
41-
this.config.batchSize
44+
this.config.batchSize,
45+
this.config.maxPollSeconds,
46+
this.config.pollIntervalMs
4247
);
4348
this.logger.debug(`Retrieved ${tasks.length} flow tasks`);
4449
return tasks;

pkgs/edge-worker/src/flow/createFlowWorker.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,18 @@ export type FlowWorkerConfig = {
4444
* @default 10
4545
*/
4646
batchSize?: number;
47+
48+
/**
49+
* In-worker polling interval in seconds
50+
* @default 2
51+
*/
52+
maxPollSeconds?: number;
53+
54+
/**
55+
* In-database polling interval in milliseconds
56+
* @default 100
57+
*/
58+
pollIntervalMs?: number;
4759
};
4860

4961
/**
@@ -96,6 +108,8 @@ export function createFlowWorker<TFlow extends AnyFlow>(
96108
const pollerConfig: StepTaskPollerConfig = {
97109
batchSize: config.batchSize || 10,
98110
queueName: flow.slug,
111+
maxPollSeconds: config.maxPollSeconds || 2,
112+
pollIntervalMs: config.pollIntervalMs || 100,
99113
};
100114
const poller = new StepTaskPoller<TFlow>(
101115
pgflowAdapter,

pkgs/edge-worker/tests/integration/_helpers.ts

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import {
44
type FlowWorkerConfig,
55
} from '../../src/flow/createFlowWorker.ts';
66
import type { postgres } from '../sql.ts';
7-
import { createFakeLogger } from '../fakes.ts';
87
import { PgflowSqlClient } from '@pgflow/core';
98

109
export async function startFlow<TFlow extends AnyFlow>(
@@ -33,7 +32,14 @@ export function startWorker<
3332
...options,
3433
};
3534

36-
const worker = createFlowWorker(flow, mergedOptions, createFakeLogger);
35+
const consoleLogger = {
36+
debug: console.log,
37+
info: console.log,
38+
warn: console.warn,
39+
error: console.error,
40+
};
41+
42+
const worker = createFlowWorker(flow, mergedOptions, () => consoleLogger);
3743

3844
worker.startOnlyOnce({
3945
edgeFunctionName: 'test_flow',

pkgs/edge-worker/tests/integration/flow/minimalFlow.test.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ Deno.test(
3030
const worker = startWorker(sql, MinimalFlow, {
3131
maxConcurrent: 1,
3232
batchSize: 10,
33+
maxPollSeconds: 1,
34+
pollIntervalMs: 200,
3335
});
3436

3537
try {
@@ -114,8 +116,10 @@ Deno.test(
114116
'Run output should match expected value'
115117
);
116118
} finally {
119+
console.log('Stopping worker');
117120
// Stop the worker
118121
await worker.stop();
122+
console.log('Worker stopped');
119123
}
120124
})
121125
);

0 commit comments

Comments
 (0)