Skip to content

Commit 0490e7b

Browse files
committed
Use the Nexus SDK ServiceRegistry
1 parent 01502c7 commit 0490e7b

File tree

3 files changed

+48
-76
lines changed

3 files changed

+48
-76
lines changed

packages/worker/src/nexus.ts

Lines changed: 40 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,12 @@ import {
1010
IllegalStateError,
1111
LoadedDataConverter,
1212
Payload,
13+
PayloadConverter,
1314
SdkComponent,
1415
} from '@temporalio/common';
1516
import { temporal, coresdk } from '@temporalio/proto';
1617
import { HandlerContext } from '@temporalio/nexus/lib/context';
17-
import { decodeFromPayload, encodeToPayload, encodeErrorToFailure } from '@temporalio/common/lib/internal-non-workflow';
18+
import { encodeToPayload, encodeErrorToFailure, decodeOptionalSingle } from '@temporalio/common/lib/internal-non-workflow';
1819
import { fixBuffers } from '@temporalio/common/lib/proto-utils';
1920
import { isAbortError } from '@temporalio/common/lib/type-helpers';
2021
import { isGrpcServiceError, ServiceError } from '@temporalio/client';
@@ -76,10 +77,7 @@ export class NexusHandler {
7677
public readonly taskToken: Uint8Array,
7778
public readonly info: nexus.HandlerInfo,
7879
public readonly abortController: AbortController,
79-
public readonly handler:
80-
| nexus.OperationHandler<unknown, unknown>
81-
| nexus.SyncOperationHandler<unknown, unknown>
82-
| undefined,
80+
public readonly serviceRegistry: nexus.ServiceRegistry,
8381
public readonly dataConverter: LoadedDataConverter,
8482
/**
8583
* Logger bound to `sdkComponent: worker`, with metadata from this Nexus task.
@@ -118,24 +116,15 @@ export class NexusHandler {
118116
options: nexus.StartOperationOptions
119117
): Promise<coresdk.nexus.INexusTaskCompletion> {
120118
try {
121-
const input = await decodeFromPayload(this.dataConverter, payload);
122-
if (typeof this.handler === 'function') {
123-
const handler = this.handler as nexus.SyncOperationHandler<unknown, unknown>;
124-
const output = await this.invokeUserCode('startOperation', handler.bind(undefined, input, options));
125-
return {
126-
taskToken: this.taskToken,
127-
completed: {
128-
startOperation: {
129-
syncSuccess: {
130-
payload: await encodeToPayload(this.dataConverter, output),
131-
links: nexus.handlerLinks().map(nexusLinkToProtoLink),
132-
},
133-
},
134-
},
135-
};
136-
}
137-
const handler = this.handler as nexus.OperationHandler<unknown, unknown>;
138-
const result = await this.invokeUserCode('startOperation', handler.start.bind(handler, input, options));
119+
const decoded = await decodeOptionalSingle(this.dataConverter.payloadCodecs, payload);
120+
// Nexus headers have string values and Temporal Payloads have binary values. Instead of converting Payload
121+
// instances into Content instances, we embed the Payload in the serializer and pretend we are deserializing an
122+
// empty Content.
123+
const input = new nexus.LazyValue(
124+
new PayloadSerializer(this.dataConverter.payloadConverter, decoded ?? undefined),
125+
{},
126+
);
127+
const result = await this.invokeUserCode('startOperation', this.serviceRegistry.start.bind(this.serviceRegistry, this.info.service, this.info.operation, input, options));
139128
if (isAsyncResult(result)) {
140129
return {
141130
taskToken: this.taskToken,
@@ -191,14 +180,7 @@ export class NexusHandler {
191180
options: nexus.CancelOperationOptions
192181
): Promise<coresdk.nexus.INexusTaskCompletion> {
193182
try {
194-
if (typeof this.handler === 'function') {
195-
throw new nexus.HandlerError({
196-
type: 'NOT_IMPLEMENTED',
197-
message: 'cancel not implemented for this operation',
198-
});
199-
}
200-
const handler = this.handler as nexus.OperationHandler<unknown, unknown>;
201-
await this.invokeUserCode('cancelOperation', handler.cancel.bind(handler, token, options));
183+
await this.invokeUserCode('cancelOperation', this.serviceRegistry.cancel.bind(this.serviceRegistry, this.info.service, this.info.operation, token, options));
202184
return {
203185
taskToken: this.taskToken,
204186
completed: {
@@ -358,11 +340,11 @@ function convertKnownErrors(err: unknown): nexus.HandlerError {
358340
case (status.ABORTED, status.UNAVAILABLE):
359341
return new nexus.HandlerError({ type: 'UNAVAILABLE', cause: err });
360342
case (status.CANCELLED,
361-
status.DATA_LOSS,
362-
status.INTERNAL,
363-
status.UNKNOWN,
364-
status.UNAUTHENTICATED,
365-
status.PERMISSION_DENIED):
343+
status.DATA_LOSS,
344+
status.INTERNAL,
345+
status.UNKNOWN,
346+
status.UNAUTHENTICATED,
347+
status.PERMISSION_DENIED):
366348
// Note that UNAUTHENTICATED and PERMISSION_DENIED have Nexus error types but we convert to internal because
367349
// this is not a client auth error and happens when the handler fails to auth with Temporal and should be
368350
// considered retryable.
@@ -431,3 +413,25 @@ function nexusLinkToProtoLink(nlink: nexus.Link): temporal.api.nexus.v1.ILink {
431413
type: nlink.type,
432414
};
433415
}
416+
417+
/**
418+
* An adapter from a Temporal PayloadConverer and a Nexus Serializer.
419+
*/
420+
class PayloadSerializer implements nexus.Serializer {
421+
constructor(
422+
readonly payloadConverter: PayloadConverter,
423+
readonly payload?: Payload,
424+
) { }
425+
426+
deserialize<T>(): T {
427+
if (this.payload == null) {
428+
return undefined as T;
429+
}
430+
return this.payloadConverter.fromPayload(this.payload);
431+
}
432+
433+
/** Not used in this path */
434+
serialize(): nexus.Content {
435+
throw new Error("not implemented");
436+
}
437+
}

packages/worker/src/worker-options.ts

Lines changed: 6 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -794,7 +794,7 @@ export interface CompiledWorkerOptions
794794
defaultHeartbeatThrottleIntervalMs: number;
795795
loadedDataConverter: LoadedDataConverter;
796796
activities: Map<string, ActivityFunction>;
797-
nexusServices: Map<string, Map<string, nexus.OperationHandler<any, any> | nexus.SyncOperationHandler<any, any>>>;
797+
nexusServiceRegistry?: nexus.ServiceRegistry;
798798
tuner: native.WorkerTunerOptions;
799799
}
800800

@@ -968,41 +968,17 @@ export function compileWorkerOptions(rawOpts: WorkerOptions, logger: Logger): Co
968968
defaultHeartbeatThrottleIntervalMs: msToNumber(opts.defaultHeartbeatThrottleInterval),
969969
loadedDataConverter: loadDataConverter(opts.dataConverter),
970970
activities,
971-
nexusServices: compileNexusServices(opts),
971+
nexusServiceRegistry: nexusServiceRegistryFromOptions(opts),
972972
enableNonLocalActivities: opts.enableNonLocalActivities && activities.size > 0,
973973
tuner,
974974
};
975975
}
976976

977-
function compileNexusServices(opts: WorkerOptions) {
978-
const nexusServices = new Map<
979-
string,
980-
Map<string, nexus.OperationHandler<any, any> | nexus.SyncOperationHandler<any, any>>
981-
>();
982-
for (const s of opts.nexusServices ?? []) {
983-
if (!s.name) {
984-
throw new TypeError('Tried to register a Nexus service with no name');
985-
}
986-
if (nexusServices.has(s.name)) {
987-
throw new TypeError(`Duplicate registration of nexus service ${s.name}`);
988-
}
989-
const ops = new Map<string, nexus.OperationHandler<any, any> | nexus.SyncOperationHandler<any, any>>();
990-
for (const [k, op] of Object.entries(s.operations)) {
991-
if (!op.name) {
992-
throw new TypeError(`Tried to register a Nexus operation with no name for service ${s.name} with key ${k}`);
993-
}
994-
if (ops.has(op.name)) {
995-
throw new TypeError(`Operation with name ${op.name} already registered for service ${s.name}`);
996-
}
997-
const handler = s.handlers[k];
998-
if (!handler) {
999-
throw new TypeError(`No handler registred for ${k} on service ${s.name}`);
1000-
}
1001-
ops.set(op.name, handler);
1002-
}
1003-
nexusServices.set(s.name, ops);
977+
function nexusServiceRegistryFromOptions(opts: WorkerOptions): nexus.ServiceRegistry | undefined {
978+
if (opts.nexusServices == null || opts.nexusServices.length === 0) {
979+
return undefined;
1004980
}
1005-
return nexusServices;
981+
return new nexus.ServiceRegistry(opts.nexusServices);
1006982
}
1007983

1008984
export function toNativeWorkerOptions(opts: CompiledWorkerOptionsWithBuildId): native.WorkerOptions {

packages/worker/src/worker.ts

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1167,19 +1167,11 @@ export class Worker {
11671167

11681168
info = constructNexusHandlerInfo(task.request, ctrl.signal);
11691169

1170-
const service = this.options.nexusServices.get(info.service);
1171-
const handler = service?.get(info.operation);
1172-
if (typeof handler !== 'function' && typeof handler !== 'object') {
1173-
throw new nexus.HandlerError({
1174-
type: 'NOT_FOUND',
1175-
message: `Nexus handler not registered for service: ${info.service}, operation: ${info.operation}`,
1176-
});
1177-
}
11781170
const nexusHandler = new NexusHandler(
11791171
taskToken,
11801172
info,
11811173
ctrl,
1182-
handler,
1174+
this.options.nexusServiceRegistry!, // Must be defined if we are handling Nexus tasks.
11831175
this.options.loadedDataConverter,
11841176
this.logger
11851177
);
@@ -1791,7 +1783,7 @@ export class Worker {
17911783

17921784
protected nexus$(): Observable<void> {
17931785
// This Worker did not register any nexus services, return early.
1794-
if (this.options.nexusServices?.size === 0) {
1786+
if (this.options.nexusServiceRegistry == null) {
17951787
if (!this.isReplayWorker) this.logger.info('No nexus services registered, not polling for nexus tasks');
17961788
this.nexusPollerStateSubject.next('SHUTDOWN');
17971789
return EMPTY;

0 commit comments

Comments
 (0)