Skip to content

Commit d29dee1

Browse files
committed
getClient() and fix versioning strategy
1 parent 5e4a5c3 commit d29dee1

File tree

7 files changed

+105
-33
lines changed

7 files changed

+105
-33
lines changed

packages/common/src/converter/failure-converter.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ const CUTOFF_STACK_PATTERNS = combineRegExp(
5858
/** Activity execution */
5959
/\s+at Activity\.execute \(.*[\\/]worker[\\/](?:src|lib)[\\/]activity\.[jt]s:\d+:\d+\)/,
6060
/** Nexus execution */
61-
/\s+at NexusHandler\.invokeUserCode \(.*[\\/]worker[\\/](?:src|lib)[\\/]nexus\.[jt]s:\d+:\d+\)/,
61+
/\s+at( async)? NexusHandler\.invokeUserCode \(.*[\\/]worker[\\/](?:src|lib)[\\/]nexus\/index\.[jt]s:\d+:\d+\)/,
6262
/** Workflow activation */
6363
/\s+at Activator\.\S+NextHandler \(.*[\\/]workflow[\\/](?:src|lib)[\\/]internals\.[jt]s:\d+:\d+\)/,
6464
/** Workflow run anything in context */

packages/core-bridge/src/worker.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -465,7 +465,7 @@ mod config {
465465

466466
use temporal_sdk_core::{
467467
api::worker::{
468-
ActivitySlotKind, LocalActivitySlotKind, NexusSlotKind, PollerBehavior as CorePollerBehavior, SlotKind, WorkerConfig, WorkerConfigBuilder, WorkerConfigBuilderError, WorkflowSlotKind
468+
ActivitySlotKind, LocalActivitySlotKind, NexusSlotKind, PollerBehavior as CorePollerBehavior, SlotKind, WorkerConfig, WorkerConfigBuilder, WorkerConfigBuilderError, WorkerVersioningStrategy, WorkflowSlotKind
469469
}, ResourceBasedSlotsOptions, ResourceBasedSlotsOptionsBuilder, ResourceSlotOptions, SlotSupplierOptions as CoreSlotSupplierOptions, TunerHolder, TunerHolderOptionsBuilder
470470
};
471471

@@ -510,6 +510,7 @@ mod config {
510510
// Set all other options
511511
let mut builder = WorkerConfigBuilder::default();
512512
builder
513+
.versioning_strategy(WorkerVersioningStrategy::None{build_id: "".to_owned()})
513514
.client_identity_override(Some(self.identity))
514515
.task_queue(self.task_queue)
515516
.namespace(self.namespace)

packages/nexus/src/context.ts

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,14 @@
11
import { HandlerContext as BaseHandlerContext, getHandlerContext } from 'nexus-rpc/lib/handler';
22
import { Logger, LogLevel, LogMetadata } from '@temporalio/common';
3+
import { Client } from '@temporalio/client';
34

45
export interface HandlerContext extends BaseHandlerContext {
56
log: Logger;
7+
client: Client;
68
}
79

810
function getLogger() {
9-
const ctx = getHandlerContext<HandlerContext>();
10-
if (ctx == null) {
11-
throw new ReferenceError('HandlerContext uninitialized');
12-
}
13-
return ctx.log;
11+
return getHandlerContext<HandlerContext>().log;
1412
}
1513

1614
/**
@@ -36,3 +34,13 @@ export const log: Logger = {
3634
return getLogger().error(message, meta);
3735
},
3836
};
37+
38+
/**
39+
* Returns a client to be used in a Nexus Operation's context, this Client is powered by the same Connection that the
40+
* worker was created with.
41+
*/
42+
export function getClient(): Client {
43+
return getHandlerContext<HandlerContext>().client;
44+
}
45+
46+
// TODO: also support getting a metrics handler.

packages/nexus/src/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
export { log } from './context';
1+
export { log, getClient } from './context';

packages/test/src/test-nexus-handler.ts

Lines changed: 47 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
// TODO: Remove /Users/bergundy/... for stack traces, requires publishing nexus-rpc.
2+
13
import { randomUUID } from 'node:crypto';
24
import anyTest, { TestFn } from 'ava';
35
import getPort from 'get-port';
@@ -313,7 +315,8 @@ test('start operation handler errors', async (t) => {
313315
cleanStackTrace(err.stack!),
314316
`ApplicationFailure: deliberate failure
315317
at Function.create (common/src/failure.ts)
316-
at op (test/src/test-nexus-handler.ts)`
318+
at op (test/src/test-nexus-handler.ts)
319+
at ServiceRegistry.start (/Users/bergundy/temporal/nexus-sdk-typescript/src/operation.ts)`
317320
);
318321
t.deepEqual((err as ApplicationFailure).details, ['details']);
319322
t.is((err as ApplicationFailure).failure?.source, 'TypeScriptSDK');
@@ -338,7 +341,8 @@ test('start operation handler errors', async (t) => {
338341
t.deepEqual(
339342
cleanStackTrace(err.stack!),
340343
`HandlerError: deliberate error
341-
at op (test/src/test-nexus-handler.ts)`
344+
at op (test/src/test-nexus-handler.ts)
345+
at ServiceRegistry.start (/Users/bergundy/temporal/nexus-sdk-typescript/src/operation.ts)`
342346
);
343347
}
344348
{
@@ -361,7 +365,8 @@ test('start operation handler errors', async (t) => {
361365
t.deepEqual(
362366
cleanStackTrace(err.stack!),
363367
`OperationError: deliberate error
364-
at op (test/src/test-nexus-handler.ts)`
368+
at op (test/src/test-nexus-handler.ts)
369+
at ServiceRegistry.start (/Users/bergundy/temporal/nexus-sdk-typescript/src/operation.ts)`
365370
);
366371
}
367372
});
@@ -447,7 +452,8 @@ test('cancel operation handler errors', async (t) => {
447452
cleanStackTrace(err.stack!),
448453
`ApplicationFailure: deliberate failure
449454
at Function.create (common/src/failure.ts)
450-
at Object.cancel (test/src/test-nexus-handler.ts)`
455+
at Object.cancel (test/src/test-nexus-handler.ts)
456+
at ServiceRegistry.cancel (/Users/bergundy/temporal/nexus-sdk-typescript/src/operation.ts)`
451457
);
452458
t.deepEqual((err as ApplicationFailure).details, ['details']);
453459
t.is((err as ApplicationFailure).failure?.source, 'TypeScriptSDK');
@@ -475,7 +481,8 @@ test('cancel operation handler errors', async (t) => {
475481
t.deepEqual(
476482
cleanStackTrace(err.stack!),
477483
`HandlerError: deliberate error
478-
at Object.cancel (test/src/test-nexus-handler.ts)`
484+
at Object.cancel (test/src/test-nexus-handler.ts)
485+
at ServiceRegistry.cancel (/Users/bergundy/temporal/nexus-sdk-typescript/src/operation.ts)`
479486
);
480487
}
481488
});
@@ -528,3 +535,38 @@ test('logger is available in handler context', async (t) => {
528535
input: 'hello',
529536
});
530537
});
538+
539+
test('getClient is available in handler context', async (t) => {
540+
const { env, taskQueue, httpPort, endpointId } = t.context;
541+
542+
const w = await Worker.create({
543+
connection: env.nativeConnection,
544+
namespace: env.namespace,
545+
taskQueue,
546+
nexusServices: [
547+
nexus.serviceHandler(
548+
nexus.service('testService', {
549+
testSyncOp: nexus.operation<void, boolean>(),
550+
}),
551+
{
552+
async testSyncOp() {
553+
const systemInfo = await temporalnexus.getClient().connection.workflowService.getSystemInfo({ namespace: 'default' });
554+
return systemInfo.capabilities?.nexus ?? false;
555+
},
556+
}
557+
),
558+
],
559+
});
560+
561+
await w.runUntil(async () => {
562+
const res = await fetch(
563+
`http://localhost:${httpPort}/nexus/endpoints/${endpointId}/services/testService/testSyncOp`,
564+
{
565+
method: 'POST',
566+
}
567+
);
568+
t.true(res.ok);
569+
const output = await res.json();
570+
t.is(output, true);
571+
});
572+
});

packages/worker/src/nexus/index.ts

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import {
2222
} from '@temporalio/common/lib/internal-non-workflow';
2323
import { fixBuffers } from '@temporalio/common/lib/proto-utils';
2424
import { isAbortError } from '@temporalio/common/lib/type-helpers';
25-
import { isGrpcServiceError, ServiceError } from '@temporalio/client';
25+
import { Client, isGrpcServiceError, ServiceError } from '@temporalio/client';
2626
import { Logger, withMetadata } from '../logger';
2727

2828
const UNINITIALIZED = Symbol();
@@ -80,6 +80,7 @@ export class NexusHandler {
8080
constructor(
8181
public readonly taskToken: Uint8Array,
8282
public readonly info: nexus.HandlerInfo,
83+
public readonly client: Client,
8384
public readonly abortController: AbortController,
8485
public readonly serviceRegistry: nexus.ServiceRegistry,
8586
public readonly dataConverter: LoadedDataConverter,
@@ -276,10 +277,17 @@ export class NexusHandler {
276277
): Promise<coresdk.nexus.INexusTaskCompletion> {
277278
const context: HandlerContext = {
278279
info: this.info,
280+
client: this.client,
279281
links: [],
280282
log: withMetadata(this.workerLogger, { sdkComponent: SdkComponent.nexus }),
281283
};
282-
return await withContext(context, this.execute.bind(this, task));
284+
let execute = this.execute.bind(this, task);
285+
// Ensure that client calls made with the worker's client in this handler's context are tied to the abort signal.
286+
// TODO: Actually support canceling requests backed by NativeConnection. Once it does, this functionality should be
287+
// tested.
288+
// TS can't infer this and typing out the types is redundant and hard to read.
289+
execute = this.client.withAbortSignal.bind(this.client, this.info.abortSignal, execute) as any;
290+
return await withContext(context, execute);
283291
}
284292
}
285293

packages/worker/src/worker.ts

Lines changed: 31 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ import {
5656
import { errorMessage, NonNullableObject, OmitFirstParam } from '@temporalio/common/lib/type-helpers';
5757
import { workflowLogAttributes } from '@temporalio/workflow/lib/logs';
5858
import { native } from '@temporalio/core-bridge';
59+
import { Client } from '@temporalio/client';
5960
import { coresdk, temporal } from '@temporalio/proto';
6061
import { type SinkCall, type WorkflowInfo } from '@temporalio/workflow';
6162
import { Activity, CancelReason, activityLogAttributes } from './activity';
@@ -455,6 +456,8 @@ export class Worker {
455456
*/
456457
protected hasOutstandingNexusPoll = false;
457458

459+
private client?: Client;
460+
458461
protected readonly numInFlightActivationsSubject = new BehaviorSubject<number>(0);
459462
protected readonly numInFlightActivitiesSubject = new BehaviorSubject<number>(0);
460463
protected readonly numInFlightNonLocalActivitiesSubject = new BehaviorSubject<number>(0);
@@ -492,11 +495,11 @@ export class Worker {
492495
...compiledOptions,
493496
...(compiledOptions.workflowBundle && isCodeBundleOption(compiledOptions.workflowBundle)
494497
? {
495-
// Avoid dumping workflow bundle code to the console
496-
workflowBundle: <WorkflowBundle>{
497-
code: `<string of length ${compiledOptions.workflowBundle.code.length}>`,
498-
},
499-
}
498+
// Avoid dumping workflow bundle code to the console
499+
workflowBundle: <WorkflowBundle>{
500+
code: `<string of length ${compiledOptions.workflowBundle.code.length}>`,
501+
},
502+
}
500503
: {}),
501504
},
502505
});
@@ -710,7 +713,7 @@ export class Worker {
710713
) {
711714
logger.warn(
712715
'Ignoring WorkerOptions.interceptors.workflowModules because WorkerOptions.workflowBundle is set.\n' +
713-
'To use workflow interceptors with a workflowBundle, pass them in the call to bundleWorkflowCode.'
716+
'To use workflow interceptors with a workflowBundle, pass them in the call to bundleWorkflowCode.'
714717
);
715718
}
716719

@@ -756,6 +759,15 @@ export class Worker {
756759
protected readonly isReplayWorker: boolean = false
757760
) {
758761
this.workflowCodecRunner = new WorkflowCodecRunner(options.loadedDataConverter.payloadCodecs);
762+
if (connection !== null) {
763+
this.client = new Client({
764+
namespace: options.namespace,
765+
connection,
766+
identity: options.identity,
767+
dataConverter: options.dataConverter,
768+
// TODO: support interceptors.
769+
});
770+
}
759771
}
760772

761773
/**
@@ -883,8 +895,8 @@ export class Worker {
883895
*/
884896
protected pollLoop$<T>(pollFn: () => Promise<T>): Observable<T> {
885897
return from(
886-
(async function* () {
887-
for (;;) {
898+
(async function*() {
899+
for (; ;) {
888900
try {
889901
yield await pollFn();
890902
} catch (err) {
@@ -919,14 +931,14 @@ export class Worker {
919931
// so it can be cancelled if requested
920932
let output:
921933
| {
922-
type: 'result';
923-
result: coresdk.activity_result.IActivityExecutionResult;
924-
}
934+
type: 'result';
935+
result: coresdk.activity_result.IActivityExecutionResult;
936+
}
925937
| {
926-
type: 'run';
927-
activity: Activity;
928-
input: ActivityExecuteInput;
929-
}
938+
type: 'run';
939+
activity: Activity;
940+
input: ActivityExecuteInput;
941+
}
930942
| { type: 'ignore' };
931943
switch (variant) {
932944
case 'start': {
@@ -1176,6 +1188,7 @@ export class Worker {
11761188
const nexusHandler = new NexusHandler(
11771189
taskToken,
11781190
info,
1191+
this.client!, // Must be defined if we are handling Nexus tasks.
11791192
ctrl,
11801193
this.options.nexusServiceRegistry!, // Must be defined if we are handling Nexus tasks.
11811194
this.options.loadedDataConverter,
@@ -1284,9 +1297,9 @@ export class Worker {
12841297
const completion = synthetic
12851298
? undefined
12861299
: coresdk.workflow_completion.WorkflowActivationCompletion.encodeDelimited({
1287-
runId: activation.runId,
1288-
successful: {},
1289-
}).finish();
1300+
runId: activation.runId,
1301+
successful: {},
1302+
}).finish();
12901303
return { state: undefined, output: { close, completion } };
12911304
}
12921305

0 commit comments

Comments
 (0)