Skip to content

Commit bcde021

Browse files
authored
feat(client): Add withAbortSignal context manager (#1272)
## What was changed Clients and the Connection class get a `withAbortSignal` context manager that can be used to cancel all ongoing connections started within a function's scope. ```ts const ctrl = new AbortController(); setTimeout(() => ctrl.abort(), 10_000); // 👇 throws if incomplete by the timeout. await conn.withAbortSignal(ctrl.signal, () => { /* make client or direct grpc calls */); ``` ## Why? Provide a way to abort client grpc calls.
1 parent 63583b7 commit bcde021

File tree

5 files changed

+70
-12
lines changed

5 files changed

+70
-12
lines changed

packages/activity/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@
6969
* @module
7070
*/
7171

72+
// Keep this around until we drop support for Node 14.
7273
import 'abort-controller/polyfill'; // eslint-disable-line import/no-unassigned-import
7374
import { AsyncLocalStorage } from 'node:async_hooks';
7475
import { Logger, Duration, LogLevel, LogMetadata } from '@temporalio/common';

packages/client/src/base-client.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
// Keep this around until we drop support for Node 14.
2+
import 'abort-controller/polyfill'; // eslint-disable-line import/no-unassigned-import
13
import os from 'node:os';
24
import { DataConverter, LoadedDataConverter } from '@temporalio/common';
35
import { isLoadedDataConverter, loadDataConverter } from '@temporalio/common/lib/internal-non-workflow';
@@ -67,6 +69,18 @@ export class BaseClient {
6769
return await this.connection.withDeadline(deadline, fn);
6870
}
6971

72+
/**
73+
* Set an {@link https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal | `AbortSignal`} that, when aborted,
74+
* cancels any ongoing service requests executed in `fn`'s scope.
75+
*
76+
* @returns value returned from `fn`
77+
*
78+
* @see {@link Connection.withAbortSignal}
79+
*/
80+
async withAbortSignal<R>(abortSignal: AbortSignal, fn: () => Promise<R>): Promise<R> {
81+
return await this.connection.withAbortSignal(abortSignal, fn);
82+
}
83+
7084
/**
7185
* Set metadata for any service requests executed in `fn`'s scope.
7286
*

packages/client/src/connection.ts

Lines changed: 36 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
// Keep this around until we drop support for Node 14.
2+
import 'abort-controller/polyfill'; // eslint-disable-line import/no-unassigned-import
13
import { AsyncLocalStorage } from 'node:async_hooks';
24
import * as grpc from '@grpc/grpc-js';
35
import type { RPCImpl } from 'protobufjs';
@@ -375,7 +377,7 @@ export class Connection {
375377
}: RPCImplOptions): RPCImpl {
376378
return (method: { name: string }, requestData: any, callback: grpc.requestCallback<any>) => {
377379
const metadataContainer = new grpc.Metadata();
378-
const { metadata, deadline } = callContextStorage.getStore() ?? {};
380+
const { metadata, deadline, abortSignal } = callContextStorage.getStore() ?? {};
379381
for (const [k, v] of Object.entries(staticMetadata)) {
380382
metadataContainer.set(k, v);
381383
}
@@ -384,7 +386,7 @@ export class Connection {
384386
metadataContainer.set(k, v);
385387
}
386388
}
387-
return client.makeUnaryRequest(
389+
const call = client.makeUnaryRequest(
388390
`/${serviceName}/${method.name}`,
389391
(arg: any) => arg,
390392
(arg: any) => arg,
@@ -393,6 +395,11 @@ export class Connection {
393395
{ interceptors, deadline },
394396
callback
395397
);
398+
if (abortSignal != null) {
399+
abortSignal.addEventListener('abort', () => call.cancel());
400+
}
401+
402+
return call;
396403
};
397404
}
398405

@@ -403,7 +410,27 @@ export class Connection {
403410
*/
404411
async withDeadline<ReturnType>(deadline: number | Date, fn: () => Promise<ReturnType>): Promise<ReturnType> {
405412
const cc = this.callContextStorage.getStore();
406-
return await this.callContextStorage.run({ deadline, metadata: cc?.metadata }, fn);
413+
return await this.callContextStorage.run({ ...cc, deadline }, fn);
414+
}
415+
416+
/**
417+
* Set an {@link https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal | `AbortSignal`} that, when aborted,
418+
* cancels any ongoing requests executed in `fn`'s scope.
419+
*
420+
* @returns value returned from `fn`
421+
*
422+
* @example
423+
*
424+
* ```ts
425+
* const ctrl = new AbortController();
426+
* setTimeout(() => ctrl.abort(), 10_000);
427+
* // 👇 throws if incomplete by the timeout.
428+
* await conn.withAbortSignal(ctrl.signal, () => client.workflow.execute(myWorkflow, options));
429+
* ```
430+
*/
431+
async withAbortSignal<ReturnType>(abortSignal: AbortSignal, fn: () => Promise<ReturnType>): Promise<ReturnType> {
432+
const cc = this.callContextStorage.getStore();
433+
return await this.callContextStorage.run({ ...cc, abortSignal }, fn);
407434
}
408435

409436
/**
@@ -416,16 +443,15 @@ export class Connection {
416443
*
417444
* @example
418445
*
419-
*```ts
420-
*const workflowHandle = await conn.withMetadata({ apiKey: 'secret' }, () =>
421-
* conn.withMetadata({ otherKey: 'set' }, () => client.start(options)))
422-
*);
423-
*```
446+
* ```ts
447+
* const workflowHandle = await conn.withMetadata({ apiKey: 'secret' }, () =>
448+
* conn.withMetadata({ otherKey: 'set' }, () => client.start(options)))
449+
* );
450+
* ```
424451
*/
425452
async withMetadata<ReturnType>(metadata: Metadata, fn: () => Promise<ReturnType>): Promise<ReturnType> {
426453
const cc = this.callContextStorage.getStore();
427-
metadata = { ...cc?.metadata, ...metadata };
428-
return await this.callContextStorage.run({ metadata, deadline: cc?.deadline }, fn);
454+
return await this.callContextStorage.run({ ...cc, metadata: { ...cc?.metadata, ...metadata } }, fn);
429455
}
430456

431457
/**

packages/client/src/types.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,8 @@ export interface CallContext {
8282
* Metadata to set on gRPC requests
8383
*/
8484
metadata?: Metadata;
85+
86+
abortSignal?: AbortSignal;
8587
}
8688

8789
/**
@@ -105,4 +107,12 @@ export interface ConnectionLike {
105107
* @returns returned value of `fn`
106108
*/
107109
withMetadata<R>(metadata: Metadata, fn: () => Promise<R>): Promise<R>;
110+
111+
/**
112+
* Set an {@link https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal | `AbortSignal`} that, when aborted,
113+
* cancels any ongoing requests executed in `fn`'s scope.
114+
*
115+
* @returns value returned from `fn`
116+
*/
117+
withAbortSignal<R>(abortSignal: AbortSignal, fn: () => Promise<R>): Promise<R>;
108118
}

packages/test/src/test-client-connection.ts

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import fs from 'node:fs/promises';
44
import test from 'ava';
55
import * as grpc from '@grpc/grpc-js';
66
import * as protoLoader from '@grpc/proto-loader';
7-
import { Connection, defaultGrpcRetryOptions, makeGrpcRetryInterceptor } from '@temporalio/client';
7+
import { Connection, defaultGrpcRetryOptions, isGrpcServiceError, makeGrpcRetryInterceptor } from '@temporalio/client';
88
import pkg from '@temporalio/client/lib/pkg';
99
import { temporal, grpc as grpcProto } from '@temporalio/proto';
1010

@@ -38,7 +38,7 @@ async function bindLocalhostTls(server: grpc.Server): Promise<number> {
3838
return await util.promisify(server.bindAsync.bind(server))('localhost:0', credentials);
3939
}
4040

41-
test('withMetadata / withDeadline set the CallContext for RPC call', async (t) => {
41+
test('withMetadata / withDeadline / withAbortSignal set the CallContext for RPC call', async (t) => {
4242
const server = new grpc.Server();
4343
let gotTestHeaders = false;
4444
let gotDeadline = false;
@@ -73,6 +73,9 @@ test('withMetadata / withDeadline set the CallContext for RPC call', async (t) =
7373
}
7474
callback(null, {});
7575
},
76+
updateNamespace() {
77+
// Simulate a hanging call to test abort signal support.
78+
},
7679
});
7780
const port = await bindLocalhost(server);
7881
server.start();
@@ -84,6 +87,10 @@ test('withMetadata / withDeadline set the CallContext for RPC call', async (t) =
8487
);
8588
t.true(gotTestHeaders);
8689
t.true(gotDeadline);
90+
const ctrl = new AbortController();
91+
setTimeout(() => ctrl.abort(), 10);
92+
const err = await t.throwsAsync(conn.withAbortSignal(ctrl.signal, () => conn.workflowService.updateNamespace({})));
93+
t.true(isGrpcServiceError(err) && err.code === grpc.status.CANCELLED);
8794
});
8895

8996
test('healthService works', async (t) => {

0 commit comments

Comments
 (0)