Skip to content

Commit ce0532f

Browse files
tharun208Tharun Rajendranmjameswh
authored
feat(workflow): add upsert memo command (#1321)
Co-authored-by: Tharun Rajendran <tharunrajendran@Tharuns-MacBook-Pro.local> Co-authored-by: James Watkins-Harvey <james.watkinsharvey@temporal.io>
1 parent 9d31dbd commit ce0532f

File tree

6 files changed

+184
-27
lines changed

6 files changed

+184
-27
lines changed

packages/test/src/helpers.ts

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import ava, { TestFn } from 'ava';
55
import * as grpc from '@grpc/grpc-js';
66
import asyncRetry from 'async-retry';
77
import { v4 as uuid4 } from 'uuid';
8-
import { inWorkflowContext } from '@temporalio/workflow';
8+
import { inWorkflowContext, WorkflowInfo } from '@temporalio/workflow';
99
import { Payload, PayloadCodec } from '@temporalio/common';
1010
import { Worker as RealWorker, WorkerOptions } from '@temporalio/worker';
1111
import * as worker from '@temporalio/worker';
@@ -16,6 +16,7 @@ import {
1616
TestWorkflowEnvironment as RealTestWorkflowEnvironment,
1717
TimeSkippingTestWorkflowEnvironmentOptions,
1818
} from '@temporalio/testing';
19+
import { LoggerSinksInternal as DefaultLoggerSinks } from '@temporalio/workflow/lib/logs';
1920

2021
export function u8(s: string): Uint8Array {
2122
// TextEncoder requires lib "dom"
@@ -260,3 +261,18 @@ export async function getRandomPort(fn = (_port: number) => Promise.resolve()):
260261
});
261262
});
262263
}
264+
265+
export function asSdkLoggerSink(
266+
fn: (info: WorkflowInfo, message: string, attrs?: Record<string, unknown>) => Promise<void>,
267+
opts?: Omit<worker.InjectedSinkFunction<any>, 'fn'>
268+
): worker.InjectedSinks<DefaultLoggerSinks> {
269+
return {
270+
__temporal_logger: {
271+
trace: { fn, ...opts },
272+
debug: { fn, ...opts },
273+
info: { fn, ...opts },
274+
warn: { fn, ...opts },
275+
error: { fn, ...opts },
276+
},
277+
};
278+
}

packages/test/src/test-integration-workflows.ts

Lines changed: 90 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ import { activityStartedSignal } from './workflows/definitions';
1515
import * as workflows from './workflows';
1616
import { Context, helpers, makeTestFunction } from './helpers-integration';
1717
import { overrideSdkInternalFlag } from './mock-internal-flags';
18-
import { RUN_TIME_SKIPPING_TESTS } from './helpers';
18+
import { RUN_TIME_SKIPPING_TESTS, asSdkLoggerSink } from './helpers';
1919

2020
const test = makeTestFunction({ workflowsPath: __filename, workflowInterceptorModules: [__filename] });
2121

@@ -947,6 +947,95 @@ if (RUN_TIME_SKIPPING_TESTS) {
947947
});
948948
}
949949

950+
export async function upsertAndReadMemo(memo: Record<string, unknown>): Promise<Record<string, unknown> | undefined> {
951+
workflow.upsertMemo(memo);
952+
return workflow.workflowInfo().memo;
953+
}
954+
955+
test('Workflow can upsert memo', async (t) => {
956+
const { createWorker, startWorkflow } = helpers(t);
957+
const worker = await createWorker();
958+
await worker.runUntil(async () => {
959+
const handle = await startWorkflow(upsertAndReadMemo, {
960+
memo: {
961+
alpha: 'bar1',
962+
bravo: 'bar3',
963+
charlie: { delta: 'bar2', echo: 12 },
964+
foxtrot: 'bar4',
965+
},
966+
args: [
967+
{
968+
alpha: 'bar11',
969+
bravo: null,
970+
charlie: { echo: 34, golf: 'bar5' },
971+
hotel: 'bar6',
972+
},
973+
],
974+
});
975+
const result = await handle.result();
976+
t.deepEqual(result, {
977+
alpha: 'bar11',
978+
charlie: { echo: 34, golf: 'bar5' },
979+
foxtrot: 'bar4',
980+
hotel: 'bar6',
981+
});
982+
const { memo } = await handle.describe();
983+
t.deepEqual(memo, {
984+
alpha: 'bar11',
985+
charlie: { echo: 34, golf: 'bar5' },
986+
foxtrot: 'bar4',
987+
hotel: 'bar6',
988+
});
989+
});
990+
});
991+
992+
test('Sink functions contains upserted memo', async (t) => {
993+
const { createWorker, executeWorkflow } = helpers(t);
994+
const recordedMessages = Array<{ message: string; memo: Record<string, unknown> | undefined }>();
995+
const sinks = asSdkLoggerSink(async (info, message, _attrs) => {
996+
recordedMessages.push({
997+
message,
998+
memo: info.memo,
999+
});
1000+
});
1001+
const worker = await createWorker({ sinks });
1002+
await worker.runUntil(async () => {
1003+
await executeWorkflow(upsertAndReadMemo, {
1004+
memo: {
1005+
note1: 'aaa',
1006+
note2: 'bbb',
1007+
note4: 'eee',
1008+
},
1009+
args: [
1010+
{
1011+
note2: 'ccc',
1012+
note3: 'ddd',
1013+
note4: null,
1014+
},
1015+
],
1016+
});
1017+
});
1018+
1019+
t.deepEqual(recordedMessages, [
1020+
{
1021+
message: 'Workflow started',
1022+
memo: {
1023+
note1: 'aaa',
1024+
note2: 'bbb',
1025+
note4: 'eee',
1026+
},
1027+
},
1028+
{
1029+
message: 'Workflow completed',
1030+
memo: {
1031+
note1: 'aaa',
1032+
note2: 'ccc',
1033+
note3: 'ddd',
1034+
},
1035+
},
1036+
]);
1037+
});
1038+
9501039
export const interceptors: workflow.WorkflowInterceptorsFactory = () => {
9511040
const interceptorsFactoryFunc = module.exports[`${workflow.workflowInfo().workflowType}Interceptors`];
9521041
if (typeof interceptorsFactoryFunc === 'function') {

packages/test/src/test-sinks.ts

Lines changed: 2 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -2,19 +2,11 @@
22
import test from 'ava';
33
import { v4 as uuid4 } from 'uuid';
44
import { Connection, WorkflowClient } from '@temporalio/client';
5-
import {
6-
DefaultLogger,
7-
InjectedSinks,
8-
Runtime,
9-
InjectedSinkFunction,
10-
WorkerOptions,
11-
LogEntry,
12-
} from '@temporalio/worker';
13-
import { LoggerSinksInternal as DefaultLoggerSinks } from '@temporalio/workflow/lib/logs';
5+
import { DefaultLogger, InjectedSinks, Runtime, WorkerOptions, LogEntry } from '@temporalio/worker';
146
import { SearchAttributes, WorkflowInfo } from '@temporalio/workflow';
157
import { UnsafeWorkflowInfo } from '@temporalio/workflow/src/interfaces';
168
import { SdkComponent } from '@temporalio/common';
17-
import { RUN_INTEGRATION_TESTS, Worker, registerDefaultCustomSearchAttributes } from './helpers';
9+
import { RUN_INTEGRATION_TESTS, Worker, asSdkLoggerSink, registerDefaultCustomSearchAttributes } from './helpers';
1810
import { defaultOptions } from './mock-native-worker';
1911
import * as workflows from './workflows';
2012

@@ -27,21 +19,6 @@ class DependencyError extends Error {
2719
}
2820
}
2921

30-
function asSdkLoggerSink(
31-
fn: (info: WorkflowInfo, message: string, attrs?: Record<string, unknown>) => Promise<void>,
32-
opts?: Omit<InjectedSinkFunction<any>, 'fn'>
33-
): InjectedSinks<DefaultLoggerSinks> {
34-
return {
35-
__temporal_logger: {
36-
trace: { fn, ...opts },
37-
debug: { fn, ...opts },
38-
info: { fn, ...opts },
39-
warn: { fn, ...opts },
40-
error: { fn, ...opts },
41-
},
42-
};
43-
}
44-
4522
if (RUN_INTEGRATION_TESTS) {
4623
const recordedLogs: { [workflowId: string]: LogEntry[] } = {};
4724

packages/test/src/workflows/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,3 +96,4 @@ export * from './upsert-and-read-search-attributes';
9696
export * from './url-whatwg';
9797
export * from './wait-on-user';
9898
export * from './workflow-cancellation-scenarios';
99+
export * from './upsert-and-read-memo';
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
import { upsertMemo, workflowInfo } from '@temporalio/workflow';
2+
3+
export async function upsertAndReadMemo(memo: Record<string, unknown>): Promise<Record<string, unknown> | undefined> {
4+
upsertMemo(memo);
5+
return workflowInfo().memo;
6+
}

packages/workflow/src/workflow.ts

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1366,6 +1366,74 @@ export function upsertSearchAttributes(searchAttributes: SearchAttributes): void
13661366
});
13671367
}
13681368

1369+
/**
1370+
* Updates this Workflow's Memos by merging the provided `memo` with existing
1371+
* Memos (as returned by `workflowInfo().memo`).
1372+
*
1373+
* New memo is merged by replacing properties of the same name _at the first
1374+
* level only_. Setting a property to value `undefined` or `null` clears that
1375+
* key from the Memo.
1376+
*
1377+
* For example:
1378+
*
1379+
* ```ts
1380+
* upsertMemo({
1381+
* key1: value,
1382+
* key3: { subkey1: value }
1383+
* key4: value,
1384+
* });
1385+
* upsertMemo({
1386+
* key2: value
1387+
* key3: { subkey2: value }
1388+
* key4: undefined,
1389+
* });
1390+
* ```
1391+
*
1392+
* would result in the Workflow having these Memo:
1393+
*
1394+
* ```ts
1395+
* {
1396+
* key1: value,
1397+
* key2: value,
1398+
* key3: { subkey2: value } // Note this object was completely replaced
1399+
* // Note that key4 was completely removed
1400+
* }
1401+
* ```
1402+
*
1403+
* @param memo The Record to merge.
1404+
*/
1405+
export function upsertMemo(memo: Record<string, unknown>): void {
1406+
const activator = assertInWorkflowContext('Workflow.upsertMemo(...) may only be used from a Workflow Execution.');
1407+
1408+
if (memo == null) {
1409+
throw new Error('memo must be a non-null Record');
1410+
}
1411+
1412+
activator.pushCommand({
1413+
modifyWorkflowProperties: {
1414+
upsertedMemo: {
1415+
fields: mapToPayloads(
1416+
activator.payloadConverter,
1417+
// Convert null to undefined
1418+
Object.fromEntries(Object.entries(memo).map(([k, v]) => [k, v ?? undefined]))
1419+
),
1420+
},
1421+
},
1422+
});
1423+
1424+
activator.mutateWorkflowInfo((info: WorkflowInfo): WorkflowInfo => {
1425+
return {
1426+
...info,
1427+
memo: Object.fromEntries(
1428+
Object.entries({
1429+
...info.memo,
1430+
...memo,
1431+
}).filter(([_, v]) => v != null)
1432+
),
1433+
};
1434+
});
1435+
}
1436+
13691437
export const stackTraceQuery = defineQuery<string>('__stack_trace');
13701438
export const enhancedStackTraceQuery = defineQuery<EnhancedStackTrace>('__enhanced_stack_trace');
13711439
export const workflowMetadataQuery = defineQuery<temporal.api.sdk.v1.IWorkflowMetadata>('__temporal_workflow_metadata');

0 commit comments

Comments
 (0)