Skip to content

feat(cloudflare): Flush after waitUntil #16681

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Jun 26, 2025
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 23 additions & 3 deletions packages/cloudflare/src/client.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import type { ClientOptions, Options, ServerRuntimeClientOptions } from '@sentry/core';
import { applySdkMetadata, ServerRuntimeClient } from '@sentry/core';
import type { makeFlushLock } from './flush';
import type { CloudflareTransportOptions } from './transport';

/**
Expand All @@ -8,24 +9,41 @@ import type { CloudflareTransportOptions } from './transport';
* @see CloudflareClientOptions for documentation on configuration options.
* @see ServerRuntimeClient for usage documentation.
*/
export class CloudflareClient extends ServerRuntimeClient<CloudflareClientOptions> {
export class CloudflareClient extends ServerRuntimeClient {
private readonly _flushLock: ReturnType<typeof makeFlushLock> | void;
/**
* Creates a new Cloudflare SDK instance.
* @param options Configuration options for this SDK.
*/
public constructor(options: CloudflareClientOptions) {
applySdkMetadata(options, 'cloudflare');
options._metadata = options._metadata || {};
const { flushLock, ...serverOptions } = options;

const clientOptions: ServerRuntimeClientOptions = {
...options,
...serverOptions,
platform: 'javascript',
// TODO: Grab version information
runtime: { name: 'cloudflare' },
// TODO: Add server name
};

super(clientOptions);
this._flushLock = flushLock;
}

/**
* Flushes pending operations and ensures all data is processed.
* If a timeout is provided, the operation will be completed within the specified time limit.
*
* @param {number} [timeout] - Optional timeout in milliseconds to force the completion of the flush operation.
* @return {Promise<boolean>} A promise that resolves to a boolean indicating whether the flush operation was successful.
*/
public async flush(timeout?: number): Promise<boolean> {
if (this._flushLock) {
await this._flushLock.finalize();
}
return super.flush(timeout);
}
}

Expand All @@ -44,4 +62,6 @@ export interface CloudflareOptions extends Options<CloudflareTransportOptions>,
*
* @see CloudflareClient for more information.
*/
export interface CloudflareClientOptions extends ClientOptions<CloudflareTransportOptions>, BaseCloudflareOptions {}
export interface CloudflareClientOptions extends ClientOptions<CloudflareTransportOptions>, BaseCloudflareOptions {
flushLock?: ReturnType<typeof makeFlushLock>;
}
25 changes: 14 additions & 11 deletions packages/cloudflare/src/flush.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
import type { ExecutionContext } from '@cloudflare/workers-types';
import { flush } from '@sentry/core';

type Flusher = (...params: Parameters<typeof flush>) => void;
type FlushLock = {
readonly ready: Promise<void>;
readonly finalize: () => Promise<void>;
};

/**
* Enhances the given execution context by wrapping its `waitUntil` method with a proxy
* to monitor pending tasks, and provides a flusher function to ensure all tasks
* have been completed before executing any subsequent logic.
*
* @param {ExecutionContext | void} context - The execution context to be enhanced. If no context is provided, the function returns undefined.
* @return {Flusher | void} Returns a flusher function if a valid context is provided, otherwise undefined.
* @param {ExecutionContext} context - The execution context to be enhanced. If no context is provided, the function returns undefined.
* @return {FlushLock} Returns a flusher function if a valid context is provided, otherwise undefined.
*/
export function makeFlushAfterAll(context: ExecutionContext): Flusher {
export function makeFlushLock(context: ExecutionContext): FlushLock {
let resolveAllDone: () => void = () => undefined;
const allDone = new Promise<void>(res => {
resolveAllDone = res;
Expand All @@ -26,10 +28,11 @@ export function makeFlushAfterAll(context: ExecutionContext): Flusher {
}),
);
};
return (...params: Parameters<typeof flush>) => {
if (pending === 0) {
return originalWaitUntil(flush(...params));
}
originalWaitUntil(allDone.finally(() => flush(...params)));
};
return Object.freeze({
ready: allDone,
finalize: () => {
if (pending === 0) resolveAllDone();
return allDone;
},
});
}
26 changes: 13 additions & 13 deletions packages/cloudflare/src/handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import {
} from '@sentry/core';
import { setAsyncLocalStorageAsyncContextStrategy } from './async';
import type { CloudflareOptions } from './client';
import { makeFlushAfterAll } from './flush';
import { isInstrumented, markAsInstrumented } from './instrument';
import { getFinalOptions } from './options';
import { wrapRequestHandler } from './request';
Expand Down Expand Up @@ -72,11 +71,11 @@ export function withSentry<Env = unknown, QueueHandlerMessage = unknown, CfHostM
handler.scheduled = new Proxy(handler.scheduled, {
apply(target, thisArg, args: Parameters<ExportedHandlerScheduledHandler<Env>>) {
const [event, env, context] = args;
const flushAfterAll = makeFlushAfterAll(context);
return withIsolationScope(isolationScope => {
const options = getFinalOptions(optionsCallback(env), env);
const waitUntil = context.waitUntil.bind(context);

const client = init(options);
const client = init(options, context);
isolationScope.setClient(client);

addCloudResourceContext(isolationScope);
Expand All @@ -100,7 +99,7 @@ export function withSentry<Env = unknown, QueueHandlerMessage = unknown, CfHostM
captureException(e, { mechanism: { handled: false, type: 'cloudflare' } });
throw e;
} finally {
flushAfterAll(2000);
waitUntil(client?.flush?.(2000));
}
},
);
Expand All @@ -115,11 +114,11 @@ export function withSentry<Env = unknown, QueueHandlerMessage = unknown, CfHostM
handler.email = new Proxy(handler.email, {
apply(target, thisArg, args: Parameters<EmailExportedHandler<Env>>) {
const [emailMessage, env, context] = args;
const flushAfterAll = makeFlushAfterAll(context);
return withIsolationScope(isolationScope => {
const options = getFinalOptions(optionsCallback(env), env);
const waitUntil = context.waitUntil.bind(context);

const client = init(options);
const client = init(options, context);
isolationScope.setClient(client);

addCloudResourceContext(isolationScope);
Expand All @@ -141,7 +140,7 @@ export function withSentry<Env = unknown, QueueHandlerMessage = unknown, CfHostM
captureException(e, { mechanism: { handled: false, type: 'cloudflare' } });
throw e;
} finally {
flushAfterAll(2000);
waitUntil(client?.flush?.(2000));
}
},
);
Expand All @@ -156,12 +155,12 @@ export function withSentry<Env = unknown, QueueHandlerMessage = unknown, CfHostM
handler.queue = new Proxy(handler.queue, {
apply(target, thisArg, args: Parameters<ExportedHandlerQueueHandler<Env, QueueHandlerMessage>>) {
const [batch, env, context] = args;
const flushAfterAll = makeFlushAfterAll(context);

return withIsolationScope(isolationScope => {
const options = getFinalOptions(optionsCallback(env), env);
const waitUntil = context.waitUntil.bind(context);

const client = init(options);
const client = init(options, context);
isolationScope.setClient(client);

addCloudResourceContext(isolationScope);
Expand All @@ -188,7 +187,7 @@ export function withSentry<Env = unknown, QueueHandlerMessage = unknown, CfHostM
captureException(e, { mechanism: { handled: false, type: 'cloudflare' } });
throw e;
} finally {
flushAfterAll(2000);
waitUntil(client?.flush?.(2000));
}
},
);
Expand All @@ -203,12 +202,13 @@ export function withSentry<Env = unknown, QueueHandlerMessage = unknown, CfHostM
handler.tail = new Proxy(handler.tail, {
apply(target, thisArg, args: Parameters<ExportedHandlerTailHandler<Env>>) {
const [, env, context] = args;
const flushAfterAll = makeFlushAfterAll(context);

return withIsolationScope(async isolationScope => {
const options = getFinalOptions(optionsCallback(env), env);

const client = init(options);
const waitUntil = context.waitUntil.bind(context);

const client = init(options, context);
isolationScope.setClient(client);

addCloudResourceContext(isolationScope);
Expand All @@ -219,7 +219,7 @@ export function withSentry<Env = unknown, QueueHandlerMessage = unknown, CfHostM
captureException(e, { mechanism: { handled: false, type: 'cloudflare' } });
throw e;
} finally {
flushAfterAll(2000);
waitUntil(client?.flush?.(2000));
}
});
},
Expand Down
10 changes: 5 additions & 5 deletions packages/cloudflare/src/request.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import {
withIsolationScope,
} from '@sentry/core';
import type { CloudflareOptions } from './client';
import { makeFlushAfterAll } from './flush';
import { addCloudResourceContext, addCultureContext, addRequest } from './scope-utils';
import { init } from './sdk';

Expand All @@ -34,9 +33,10 @@ export function wrapRequestHandler(
// For example, for Astro while prerendering pages at build time.
// see: https://github.com/getsentry/sentry-javascript/issues/13217
const context = wrapperOptions.context as ExecutionContext | undefined;
const afterAllFlusher = context ? makeFlushAfterAll(context) : undefined;

const client = init(options);
const waitUntil = context?.waitUntil?.bind?.(context);

const client = init(options, context);
isolationScope.setClient(client);

const urlObject = parseStringToURLObject(request.url);
Expand Down Expand Up @@ -66,7 +66,7 @@ export function wrapRequestHandler(
captureException(e, { mechanism: { handled: false, type: 'cloudflare' } });
throw e;
} finally {
afterAllFlusher?.(2000);
waitUntil?.(client?.flush?.(2000));
}
}

Expand All @@ -90,7 +90,7 @@ export function wrapRequestHandler(
captureException(e, { mechanism: { handled: false, type: 'cloudflare' } });
throw e;
} finally {
afterAllFlusher?.(2000);
waitUntil?.(client?.flush?.(2000));
}
},
);
Expand Down
7 changes: 6 additions & 1 deletion packages/cloudflare/src/sdk.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { type ExecutionContext } from '@cloudflare/workers-types';
import type { Integration } from '@sentry/core';
import {
consoleIntegration,
Expand All @@ -12,6 +13,7 @@ import {
} from '@sentry/core';
import type { CloudflareClientOptions, CloudflareOptions } from './client';
import { CloudflareClient } from './client';
import { makeFlushLock } from './flush';
import { fetchIntegration } from './integrations/fetch';
import { makeCloudflareTransport } from './transport';
import { defaultStackParser } from './vendor/stacktrace';
Expand All @@ -36,16 +38,19 @@ export function getDefaultIntegrations(options: CloudflareOptions): Integration[
/**
* Initializes the cloudflare SDK.
*/
export function init(options: CloudflareOptions): CloudflareClient | undefined {
export function init(options: CloudflareOptions, ctx: ExecutionContext | void): CloudflareClient | undefined {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's make ctx: ExecutionContext | void part of CloudflareOptions instead of a separate arg.

void probably be replaced with ctx?: ExecutionContext as well.

if (options.defaultIntegrations === undefined) {
options.defaultIntegrations = getDefaultIntegrations(options);
}

const flushLock = ctx ? makeFlushLock(ctx) : undefined;

const clientOptions: CloudflareClientOptions = {
...options,
stackParser: stackParserFromStackParserOptions(options.stackParser || defaultStackParser),
integrations: getIntegrationsToSetup(options),
transport: options.transport || makeCloudflareTransport,
flushLock,
};

return initAndBind(CloudflareClient, clientOptions) as CloudflareClient;
Expand Down
19 changes: 7 additions & 12 deletions packages/cloudflare/test/flush.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { type ExecutionContext } from '@cloudflare/workers-types';
import * as SentryCore from '@sentry/core';
import { describe, expect, it, onTestFinished, vi } from 'vitest';
import { makeFlushAfterAll } from '../src/flush';
import { makeFlushLock } from '../src/flush';

describe('Flush buffer test', () => {
const waitUntilPromises: Promise<void>[] = [];
Expand All @@ -11,25 +10,21 @@ describe('Flush buffer test', () => {
}),
passThroughOnException: vi.fn(),
};
it('should flush buffer immediately if no waitUntil were called', () => {
const coreFlush = vi.spyOn(SentryCore, 'flush');
const flush = makeFlushAfterAll(mockExecutionContext);
flush();
expect(coreFlush).toBeCalled();
it('should flush buffer immediately if no waitUntil were called', async () => {
const { finalize } = makeFlushLock(mockExecutionContext);
await expect(finalize()).resolves.toBeUndefined();
});
it('should flush buffer only after all waitUntil were finished', async () => {
vi.useFakeTimers();
onTestFinished(() => {
vi.useRealTimers();
});
const task = new Promise(resolve => setTimeout(resolve, 100));
const coreFlush = vi.spyOn(SentryCore, 'flush');
const flush = makeFlushAfterAll(mockExecutionContext);
const lock = makeFlushLock(mockExecutionContext);
mockExecutionContext.waitUntil(task);
flush();
expect(coreFlush).not.toBeCalled();
void lock.finalize();
vi.advanceTimersToNextTimer();
await Promise.all(waitUntilPromises);
expect(coreFlush).toBeCalled();
await expect(lock.ready).resolves.toBeUndefined();
});
});
Loading