Skip to content

Commit 7751b57

Browse files
mjameswhbergundy
andauthored
feat(client): Throw more specific errors from Client APIs (#1147)
## What changed - All Client APIs now throw `NamespaceNotFoundError` when appropriate, rather than `WorkflowNotFound`, `ScheduleNotFound` or `ActivityNotFound` (fix #1145, fix #976) - Errors that happen while preparing API requests from a `Client`, and particularly errors happening while converting input objects to payloads, now results in throwing an appropriate exception, rather than an opaque ServiceError. --------- Co-authored-by: Roey Berman <roey.berman@gmail.com>
1 parent 5c99e89 commit 7751b57

File tree

16 files changed

+518
-201
lines changed

16 files changed

+518
-201
lines changed

packages/client/src/async-completion-client.ts

Lines changed: 36 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1-
import { Status } from '@grpc/grpc-js/build/src/constants';
1+
import { status as grpcStatus } from '@grpc/grpc-js';
22
import { ensureTemporalFailure } from '@temporalio/common';
3+
import type { temporal } from '@temporalio/proto';
34
import {
45
encodeErrorToFailure,
56
encodeToPayloads,
@@ -12,8 +13,9 @@ import {
1213
LoadedWithDefaults,
1314
WithDefaults,
1415
} from './base-client';
15-
import { isServerErrorResponse } from './errors';
16+
import { isGrpcServiceError } from './errors';
1617
import { WorkflowService } from './types';
18+
import { rethrowKnownErrorTypes } from './helpers';
1719

1820
/**
1921
* Thrown by {@link AsyncCompletionClient} when trying to complete or heartbeat an Activity that does not exist in the
@@ -95,10 +97,13 @@ export class AsyncCompletionClient extends BaseClient {
9597
* Transforms grpc errors into well defined TS errors.
9698
*/
9799
protected handleError(err: unknown): never {
98-
if (isServerErrorResponse(err)) {
99-
if (err.code === Status.NOT_FOUND) {
100+
if (isGrpcServiceError(err)) {
101+
rethrowKnownErrorTypes(err);
102+
103+
if (err.code === grpcStatus.NOT_FOUND) {
100104
throw new ActivityNotFoundError('Not found');
101105
}
106+
102107
throw new ActivityCompletionError(err.details || err.message);
103108
}
104109
throw new ActivityCompletionError('Unexpected failure');
@@ -114,20 +119,21 @@ export class AsyncCompletionClient extends BaseClient {
114119
async complete(fullActivityId: FullActivityId, result: unknown): Promise<void>;
115120

116121
async complete(taskTokenOrFullActivityId: Uint8Array | FullActivityId, result: unknown): Promise<void> {
122+
const payloads = await encodeToPayloads(this.dataConverter, result);
117123
try {
118124
if (taskTokenOrFullActivityId instanceof Uint8Array) {
119125
await this.workflowService.respondActivityTaskCompleted({
120126
identity: this.options.identity,
121127
namespace: this.options.namespace,
122128
taskToken: taskTokenOrFullActivityId,
123-
result: { payloads: await encodeToPayloads(this.dataConverter, result) },
129+
result: { payloads },
124130
});
125131
} else {
126132
await this.workflowService.respondActivityTaskCompletedById({
127133
identity: this.options.identity,
128134
namespace: this.options.namespace,
129135
...taskTokenOrFullActivityId,
130-
result: { payloads: await encodeToPayloads(this.dataConverter, result) },
136+
result: { payloads },
131137
});
132138
}
133139
} catch (err) {
@@ -145,20 +151,21 @@ export class AsyncCompletionClient extends BaseClient {
145151
async fail(fullActivityId: FullActivityId, err: unknown): Promise<void>;
146152

147153
async fail(taskTokenOrFullActivityId: Uint8Array | FullActivityId, err: unknown): Promise<void> {
154+
const failure = await encodeErrorToFailure(this.dataConverter, ensureTemporalFailure(err));
148155
try {
149156
if (taskTokenOrFullActivityId instanceof Uint8Array) {
150157
await this.workflowService.respondActivityTaskFailed({
151158
identity: this.options.identity,
152159
namespace: this.options.namespace,
153160
taskToken: taskTokenOrFullActivityId,
154-
failure: await encodeErrorToFailure(this.dataConverter, ensureTemporalFailure(err)),
161+
failure,
155162
});
156163
} else {
157164
await this.workflowService.respondActivityTaskFailedById({
158165
identity: this.options.identity,
159166
namespace: this.options.namespace,
160167
...taskTokenOrFullActivityId,
161-
failure: await encodeErrorToFailure(this.dataConverter, err),
168+
failure,
162169
});
163170
}
164171
} catch (err) {
@@ -176,20 +183,21 @@ export class AsyncCompletionClient extends BaseClient {
176183
reportCancellation(fullActivityId: FullActivityId, details?: unknown): Promise<void>;
177184

178185
async reportCancellation(taskTokenOrFullActivityId: Uint8Array | FullActivityId, details?: unknown): Promise<void> {
186+
const payloads = await encodeToPayloads(this.dataConverter, details);
179187
try {
180188
if (taskTokenOrFullActivityId instanceof Uint8Array) {
181189
await this.workflowService.respondActivityTaskCanceled({
182190
identity: this.options.identity,
183191
namespace: this.options.namespace,
184192
taskToken: taskTokenOrFullActivityId,
185-
details: { payloads: await encodeToPayloads(this.dataConverter, details) },
193+
details: { payloads },
186194
});
187195
} else {
188196
await this.workflowService.respondActivityTaskCanceledById({
189197
identity: this.options.identity,
190198
namespace: this.options.namespace,
191199
...taskTokenOrFullActivityId,
192-
details: { payloads: await encodeToPayloads(this.dataConverter, details) },
200+
details: { payloads },
193201
});
194202
}
195203
} catch (err) {
@@ -207,36 +215,31 @@ export class AsyncCompletionClient extends BaseClient {
207215
heartbeat(fullActivityId: FullActivityId, details?: unknown): Promise<void>;
208216

209217
async heartbeat(taskTokenOrFullActivityId: Uint8Array | FullActivityId, details?: unknown): Promise<void> {
218+
const payloads = await encodeToPayloads(this.dataConverter, details);
210219
let cancelRequested = false;
211220
try {
212-
const response = await this._sendHeartbeat(taskTokenOrFullActivityId, details);
213-
cancelRequested = response.cancelRequested;
221+
let response: temporal.api.workflowservice.v1.RecordActivityTaskHeartbeatResponse;
222+
if (taskTokenOrFullActivityId instanceof Uint8Array) {
223+
response = await this.workflowService.recordActivityTaskHeartbeat({
224+
identity: this.options.identity,
225+
namespace: this.options.namespace,
226+
taskToken: taskTokenOrFullActivityId,
227+
details: { payloads },
228+
});
229+
} else {
230+
response = await this.workflowService.recordActivityTaskHeartbeatById({
231+
identity: this.options.identity,
232+
namespace: this.options.namespace,
233+
...taskTokenOrFullActivityId,
234+
details: { payloads },
235+
});
236+
}
237+
cancelRequested = !!response.cancelRequested;
214238
} catch (err) {
215239
this.handleError(err);
216240
}
217241
if (cancelRequested) {
218242
throw new ActivityCancelledError('cancelled');
219243
}
220244
}
221-
222-
private async _sendHeartbeat(
223-
taskTokenOrFullActivityId: Uint8Array | FullActivityId,
224-
details?: unknown
225-
): Promise<{ cancelRequested: boolean }> {
226-
if (taskTokenOrFullActivityId instanceof Uint8Array) {
227-
return await this.workflowService.recordActivityTaskHeartbeat({
228-
identity: this.options.identity,
229-
namespace: this.options.namespace,
230-
taskToken: taskTokenOrFullActivityId,
231-
details: { payloads: await encodeToPayloads(this.dataConverter, details) },
232-
});
233-
} else {
234-
return await this.workflowService.recordActivityTaskHeartbeatById({
235-
identity: this.options.identity,
236-
namespace: this.options.namespace,
237-
...taskTokenOrFullActivityId,
238-
details: { payloads: await encodeToPayloads(this.dataConverter, details) },
239-
});
240-
}
241-
}
242245
}

packages/client/src/connection.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import * as grpc from '@grpc/grpc-js';
33
import type { RPCImpl } from 'protobufjs';
44
import { filterNullAndUndefined, normalizeTlsConfig, TLSConfig } from '@temporalio/common/lib/internal-non-workflow';
55
import { Duration, msOptionalToNumber } from '@temporalio/common/lib/time';
6-
import { isServerErrorResponse, ServiceError } from './errors';
6+
import { isGrpcServiceError, ServiceError } from './errors';
77
import { defaultGrpcRetryOptions, makeGrpcRetryInterceptor } from './grpc-retry';
88
import pkg from './pkg';
99
import { CallContext, HealthService, Metadata, OperatorService, WorkflowService } from './types';
@@ -276,7 +276,7 @@ export class Connection {
276276
try {
277277
await this.withDeadline(deadline, () => this.workflowService.getSystemInfo({}));
278278
} catch (err) {
279-
if (isServerErrorResponse(err)) {
279+
if (isGrpcServiceError(err)) {
280280
// Ignore old servers
281281
if (err.code !== grpc.status.UNIMPLEMENTED) {
282282
throw new ServiceError('Failed to connect to Temporal server', { cause: err });

packages/client/src/errors.ts

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { ServerErrorResponse } from '@grpc/grpc-js';
1+
import { ServiceError as GrpcServiceError } from '@grpc/grpc-js';
22
import { RetryState, TemporalFailure } from '@temporalio/common';
33

44
/**
@@ -47,10 +47,11 @@ export class WorkflowContinuedAsNewError extends Error {
4747
}
4848
}
4949

50+
export function isGrpcServiceError(err: unknown): err is GrpcServiceError {
51+
return err instanceof Error && (err as any).details !== undefined && (err as any).metadata !== undefined;
52+
}
53+
5054
/**
51-
* Type assertion helper, assertion is mostly empty because any additional
52-
* properties are optional.
55+
* @deprecated Use `isGrpcServiceError` instead
5356
*/
54-
export function isServerErrorResponse(err: unknown): err is ServerErrorResponse {
55-
return err instanceof Error;
56-
}
57+
export const isServerErrorResponse = isGrpcServiceError;

packages/client/src/helpers.ts

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
1+
import { ServiceError as GrpcServiceError } from '@grpc/grpc-js';
12
import {
23
LoadedDataConverter,
34
mapFromPayloads,
5+
NamespaceNotFoundError,
46
searchAttributePayloadConverter,
57
SearchAttributes,
68
} from '@temporalio/common';
79
import { Replace } from '@temporalio/common/lib/type-helpers';
810
import { optionalTsToDate, tsToDate } from '@temporalio/common/lib/time';
911
import { decodeMapFromPayloads } from '@temporalio/common/lib/internal-non-workflow/codec-helpers';
10-
import { temporal } from '@temporalio/proto';
12+
import { temporal, google } from '@temporalio/proto';
1113
import { RawWorkflowExecutionInfo, WorkflowExecutionInfo, WorkflowExecutionStatusName } from './types';
1214

1315
function workflowStatusCodeToName(code: temporal.api.enums.v1.WorkflowExecutionStatus): WorkflowExecutionStatusName {
@@ -75,3 +77,36 @@ export async function executionInfoFromRaw<T>(
7577
raw: rawDataToEmbed,
7678
};
7779
}
80+
81+
type ErrorDetailsName = `temporal.api.errordetails.v1.${keyof typeof temporal.api.errordetails.v1}`;
82+
83+
/**
84+
* If the error type can be determined based on embedded grpc error details,
85+
* then rethrow the appropriate TypeScript error. Otherwise do nothing.
86+
*
87+
* This function should be used before falling back to generic error handling
88+
* based on grpc error code. Very few error types are currently supported, but
89+
* this function will be expanded over time as more server error types are added.
90+
*/
91+
export function rethrowKnownErrorTypes(err: GrpcServiceError): void {
92+
// We really don't expect multiple error details, but this really is an array, so just in case...
93+
for (const entry of getGrpcStatusDetails(err) ?? []) {
94+
if (!entry.type_url || !entry.value) continue;
95+
const type = entry.type_url.replace(/^type.googleapis.com\//, '') as ErrorDetailsName;
96+
97+
switch (type) {
98+
case 'temporal.api.errordetails.v1.NamespaceNotFoundFailure': {
99+
const { namespace } = temporal.api.errordetails.v1.NamespaceNotFoundFailure.decode(entry.value);
100+
throw new NamespaceNotFoundError(namespace);
101+
}
102+
}
103+
}
104+
}
105+
106+
function getGrpcStatusDetails(err: GrpcServiceError): google.rpc.Status['details'] | undefined {
107+
const statusBuffer = err.metadata.get('grpc-status-details-bin')?.[0];
108+
if (!statusBuffer || typeof statusBuffer === 'string') {
109+
return undefined;
110+
}
111+
return google.rpc.Status.decode(statusBuffer).details;
112+
}

packages/client/src/schedule-client.ts

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import { temporal } from '@temporalio/proto';
1111
import { optionalDateToTs, optionalTsToDate, optionalTsToMs, tsToDate } from '@temporalio/common/lib/time';
1212
import { CreateScheduleInput, CreateScheduleOutput, ScheduleClientInterceptor } from './interceptors';
1313
import { WorkflowService } from './types';
14-
import { isServerErrorResponse, ServiceError } from './errors';
14+
import { isGrpcServiceError, ServiceError } from './errors';
1515
import {
1616
Backfill,
1717
CompiledScheduleUpdateOptions,
@@ -45,6 +45,7 @@ import {
4545
LoadedWithDefaults,
4646
WithDefaults,
4747
} from './base-client';
48+
import { rethrowKnownErrorTypes } from './helpers';
4849

4950
/**
5051
* Handle to a single Schedule
@@ -263,7 +264,7 @@ export class ScheduleClient extends BaseClient {
263264
if (err.code === grpcStatus.ALREADY_EXISTS) {
264265
throw new ScheduleAlreadyRunning('Schedule already exists and is running', opts.scheduleId);
265266
}
266-
this.rethrowGrpcError(err, opts.scheduleId, 'Failed to create schedule');
267+
this.rethrowGrpcError(err, 'Failed to create schedule', opts.scheduleId);
267268
}
268269
}
269270

@@ -279,7 +280,7 @@ export class ScheduleClient extends BaseClient {
279280
scheduleId,
280281
});
281282
} catch (err: any) {
282-
this.rethrowGrpcError(err, scheduleId, 'Failed to describe schedule');
283+
this.rethrowGrpcError(err, 'Failed to describe schedule', scheduleId);
283284
}
284285
}
285286

@@ -306,7 +307,7 @@ export class ScheduleClient extends BaseClient {
306307
try {
307308
return await this.workflowService.updateSchedule(req);
308309
} catch (err: any) {
309-
this.rethrowGrpcError(err, scheduleId, 'Failed to update schedule');
310+
this.rethrowGrpcError(err, 'Failed to update schedule', scheduleId);
310311
}
311312
}
312313

@@ -326,7 +327,7 @@ export class ScheduleClient extends BaseClient {
326327
patch,
327328
});
328329
} catch (err: any) {
329-
this.rethrowGrpcError(err, scheduleId, 'Failed to patch schedule');
330+
this.rethrowGrpcError(err, 'Failed to patch schedule', scheduleId);
330331
}
331332
}
332333

@@ -343,7 +344,7 @@ export class ScheduleClient extends BaseClient {
343344
scheduleId,
344345
});
345346
} catch (err: any) {
346-
this.rethrowGrpcError(err, scheduleId, 'Failed to delete schedule');
347+
this.rethrowGrpcError(err, 'Failed to delete schedule', scheduleId);
347348
}
348349
}
349350

@@ -366,13 +367,16 @@ export class ScheduleClient extends BaseClient {
366367
public async *list(options?: ListScheduleOptions): AsyncIterable<ScheduleSummary> {
367368
let nextPageToken: Uint8Array | undefined = undefined;
368369
for (;;) {
369-
const response: temporal.api.workflowservice.v1.IListSchedulesResponse = await this.workflowService.listSchedules(
370-
{
370+
let response: temporal.api.workflowservice.v1.ListSchedulesResponse;
371+
try {
372+
response = await this.workflowService.listSchedules({
371373
nextPageToken,
372374
namespace: this.options.namespace,
373375
maximumPageSize: options?.pageSize,
374-
}
375-
);
376+
});
377+
} catch (e) {
378+
this.rethrowGrpcError(e, 'Failed to list schedules', undefined);
379+
}
376380

377381
for (const raw of response.schedules ?? []) {
378382
yield <ScheduleSummary>{
@@ -497,20 +501,23 @@ export class ScheduleClient extends BaseClient {
497501
};
498502
}
499503

500-
protected rethrowGrpcError(err: unknown, scheduleId: string, fallbackMessage: string): never {
501-
if (isServerErrorResponse(err)) {
504+
protected rethrowGrpcError(err: unknown, fallbackMessage: string, scheduleId?: string): never {
505+
if (isGrpcServiceError(err)) {
506+
rethrowKnownErrorTypes(err);
507+
502508
if (err.code === grpcStatus.NOT_FOUND) {
503-
throw new ScheduleNotFoundError(err.details ?? 'Schedule not found', scheduleId);
509+
throw new ScheduleNotFoundError(err.details ?? 'Schedule not found', scheduleId ?? '');
504510
}
505511
if (
506512
err.code === grpcStatus.INVALID_ARGUMENT &&
507513
err.message.match(/^3 INVALID_ARGUMENT: Invalid schedule spec: /)
508514
) {
509515
throw new TypeError(err.message.replace(/^3 INVALID_ARGUMENT: Invalid schedule spec: /, ''));
510516
}
517+
511518
throw new ServiceError(fallbackMessage, { cause: err });
512519
}
513-
throw new ServiceError('Unexpected error while making gRPC request');
520+
throw new ServiceError('Unexpected error while making gRPC request', { cause: err as Error });
514521
}
515522
}
516523

0 commit comments

Comments
 (0)