Skip to content

Commit 36c4f5d

Browse files
Fix ProtobufJSONPayloadConverter to not require Buffer in the workflo… (#1170)
## What was changed <!-- Describe what has changed in this PR --> This is a fix to #1169 The ProtobufJsonPayloadConverter ensures that Buffer is globally available, and if it is not, it just adds it when needed, but leaving the global space untouched after that. It also needs to ensure that no Buffer objects are leaked, replacing them with Uint8Array objects. ## Why? Protobufs containing binary fields cannot be properly encoded/decoded with this converter without the Buffer class, and this class is not available by default in the workflow context. ## Checklist 1. Closes <!-- add issue number here --> #1169 2. How was this tested: <!--- Please describe how you tested your changes/how we can test them --> Added a system test
1 parent b6e8397 commit 36c4f5d

File tree

3 files changed

+97
-9
lines changed

3 files changed

+97
-9
lines changed

packages/common/src/converter/protobuf-payload-converters.ts

Lines changed: 62 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ import {
1414

1515
import { encodingTypes, METADATA_ENCODING_KEY, METADATA_MESSAGE_TYPE_KEY } from './types';
1616

17+
const GLOBAL_BUFFER = globalThis.constructor.constructor('return globalThis.Buffer')();
18+
1719
abstract class ProtobufPayloadConverter implements PayloadConverterWithEncoding {
1820
protected readonly root: Root | undefined;
1921
public abstract encodingType: string;
@@ -121,17 +123,69 @@ export class ProtobufJsonPayloadConverter extends ProtobufPayloadConverter {
121123
return undefined;
122124
}
123125

124-
const jsonValue = protoJsonSerializer.toProto3JSON(value);
125-
126-
return this.constructPayload({
127-
messageTypeName: getNamespacedTypeName(value.$type),
128-
message: encode(JSON.stringify(jsonValue)),
129-
});
126+
const hasBufferChanged = setBufferInGlobal();
127+
try {
128+
const jsonValue = protoJsonSerializer.toProto3JSON(value);
129+
130+
return this.constructPayload({
131+
messageTypeName: getNamespacedTypeName(value.$type),
132+
message: encode(JSON.stringify(jsonValue)),
133+
});
134+
} finally {
135+
resetBufferInGlobal(hasBufferChanged);
136+
}
130137
}
131138

132139
public fromPayload<T>(content: Payload): T {
133-
const { messageType, data } = this.validatePayload(content);
134-
return protoJsonSerializer.fromProto3JSON(messageType, JSON.parse(decode(data))) as unknown as T;
140+
const hasBufferChanged = setBufferInGlobal();
141+
try {
142+
const { messageType, data } = this.validatePayload(content);
143+
const res = protoJsonSerializer.fromProto3JSON(messageType, JSON.parse(decode(data))) as unknown as T;
144+
if (Buffer.isBuffer(res)) {
145+
return new Uint8Array(res) as any;
146+
}
147+
replaceBuffers(res);
148+
return res;
149+
} finally {
150+
resetBufferInGlobal(hasBufferChanged);
151+
}
152+
}
153+
}
154+
155+
function replaceBuffers<X>(obj: X) {
156+
const replaceBuffersImpl = <Y>(value: any, key: string | number, target: Y) => {
157+
if (Buffer.isBuffer(value)) {
158+
// Need to copy. `Buffer` manages a pool slab, internally reused when Buffer objects are GC.
159+
type T = keyof typeof target;
160+
target[key as T] = new Uint8Array(value) as any;
161+
} else {
162+
replaceBuffers(value);
163+
}
164+
};
165+
166+
if (obj != null && typeof obj === 'object') {
167+
// Performance optimization for large arrays
168+
if (Array.isArray(obj)) {
169+
obj.forEach(replaceBuffersImpl);
170+
} else {
171+
for (const [key, value] of Object.entries(obj)) {
172+
replaceBuffersImpl(value, key, obj);
173+
}
174+
}
175+
}
176+
}
177+
178+
function setBufferInGlobal(): boolean {
179+
if (typeof globalThis.Buffer === 'undefined') {
180+
globalThis.Buffer = GLOBAL_BUFFER;
181+
return true;
182+
}
183+
return false;
184+
}
185+
186+
function resetBufferInGlobal(hasChanged: boolean): void {
187+
if (hasChanged) {
188+
delete (globalThis as any).Buffer;
135189
}
136190
}
137191

packages/test/src/test-payload-converter.ts

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import { RUN_INTEGRATION_TESTS, Worker } from './helpers';
2525
import { defaultOptions } from './mock-native-worker';
2626
import { messageInstance } from './payload-converters/proto-payload-converter';
2727
import { protobufWorkflow } from './workflows/protobufs';
28+
import { echoBinaryProtobuf } from './workflows/echo-binary-protobuf';
2829

2930
test('UndefinedPayloadConverter converts from undefined only', (t) => {
3031
const converter = new UndefinedPayloadConverter();
@@ -167,7 +168,7 @@ test('ProtobufJSONPayloadConverter converts binary', (t) => {
167168
});
168169

169170
const testInstance = converter.fromPayload<root.BinaryMessage>(encoded!);
170-
t.deepEqual(testInstance.data, Buffer.from(instance.data));
171+
t.deepEqual(testInstance.data, instance.data);
171172
});
172173

173174
if (RUN_INTEGRATION_TESTS) {
@@ -208,6 +209,30 @@ if (RUN_INTEGRATION_TESTS) {
208209
});
209210
t.pass();
210211
});
212+
213+
test('Worker encodes/decodes a protobuf containing a binary array', async (t) => {
214+
const binaryInstance = root.BinaryMessage.create({ data: encode('abc') });
215+
const dataConverter = { payloadConverterPath: require.resolve('./payload-converters/proto-payload-converter') };
216+
const taskQueue = `${__filename}/${t.title}`;
217+
218+
const worker = await Worker.create({
219+
...defaultOptions,
220+
workflowsPath: require.resolve('./workflows/echo-binary-protobuf'),
221+
taskQueue,
222+
dataConverter,
223+
});
224+
225+
const client = new WorkflowClient({ dataConverter });
226+
227+
await worker.runUntil(async () => {
228+
const result = await client.execute(echoBinaryProtobuf, {
229+
args: [binaryInstance],
230+
workflowId: uuid4(),
231+
taskQueue,
232+
});
233+
t.deepEqual(result, binaryInstance);
234+
});
235+
});
211236
}
212237

213238
test('DefaultPayloadConverterWithProtobufs converts protobufs', (t) => {
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
import { ApplicationFailure } from '@temporalio/common';
2+
import type { BinaryMessage } from '../../protos/root';
3+
4+
export async function echoBinaryProtobuf(input: BinaryMessage): Promise<BinaryMessage> {
5+
if (input.data instanceof Uint8Array) {
6+
return input;
7+
}
8+
throw ApplicationFailure.nonRetryable('input.data is not a Uint8Array');
9+
}

0 commit comments

Comments
 (0)