
Commit ad4daa3

Fix run replication payloads/outputs trying to send invalid types like BigInt to clickhouse (#2065)

1 parent: bb7173b

3 files changed: +174 -2 lines changed
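For context (this sketch is not part of the commit): the bug class here is that `JSON.stringify`, and therefore any plain-JSON insert path into ClickHouse, cannot handle values like `BigInt`, `Date`, or `Map`. superjson sidesteps this by splitting a value into a JSON-safe body plus type metadata, and only the body needs to reach ClickHouse. A minimal illustration using the public `superjson` API:

```ts
import superjson from "superjson";

// Plain JSON cannot represent a BigInt at all:
try {
  JSON.stringify({ n: BigInt(1234) });
} catch (err) {
  console.error(err); // TypeError: Do not know how to serialize a BigInt
}

// superjson splits a value into a JSON-safe `json` body plus `meta`
// describing the original types. Only the `json` part is sent on:
const { json, meta } = superjson.serialize({
  foo: "bar",
  bigint: BigInt(1234),
  date: new Date("2024-01-01T00:00:00.000Z"),
  map: new Map([["foo", "bar"]]),
});

console.log(json);
// { foo: "bar", bigint: "1234", date: "2024-01-01T00:00:00.000Z", map: [["foo", "bar"]] }
console.log(meta);
// { values: { bigint: ["bigint"], date: ["Date"], map: ["map"] } }
```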

apps/webapp/app/services/runsReplicationService.server.ts

Lines changed: 2 additions & 2 deletions
@@ -10,7 +10,7 @@ import {
 import { recordSpanError, startSpan, trace, type Tracer } from "@internal/tracing";
 import { Logger, LogLevel } from "@trigger.dev/core/logger";
 import { tryCatch } from "@trigger.dev/core/utils";
-import { parsePacket } from "@trigger.dev/core/v3/utils/ioSerialization";
+import { parsePacketAsJson } from "@trigger.dev/core/v3/utils/ioSerialization";
 import { TaskRun } from "@trigger.dev/database";
 import { nanoid } from "nanoid";
 import EventEmitter from "node:events";
@@ -636,7 +636,7 @@ export class RunsReplicationService {
       dataType,
     };

-    const [parseError, parsedData] = await tryCatch(parsePacket(packet));
+    const [parseError, parsedData] = await tryCatch(parsePacketAsJson(packet));

     if (parseError) {
       this.logger.error("Error parsing packet", {
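Why the one-line swap matters: `parsePacket` fully deserializes a `super+json` packet, reviving native `BigInt`/`Date`/`Map` instances that the ClickHouse JSON insert then rejects, while `parsePacketAsJson` (added below in `ioSerialization.ts`) stops at superjson's JSON-safe body. A rough sketch of the difference:

```ts
import superjson from "superjson";
import {
  parsePacket,
  parsePacketAsJson,
} from "@trigger.dev/core/v3/utils/ioSerialization";

const packet = {
  data: superjson.stringify({ bigint: BigInt(1234) }),
  dataType: "application/super+json",
};

// Old behavior: native types are revived, so a BigInt leaks into the
// replication payload and the ClickHouse insert fails on it.
const revived = await parsePacket(packet); // { bigint: 1234n }

// New behavior: only superjson's JSON-safe body is returned.
const jsonSafe = await parsePacketAsJson(packet); // { bigint: "1234" }
```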

apps/webapp/test/runsReplicationService.test.ts

Lines changed: 142 additions & 0 deletions
@@ -6,6 +6,7 @@ import { z } from "zod";
 import { TaskRunStatus } from "~/database-types";
 import { RunsReplicationService } from "~/services/runsReplicationService.server";
 import { createInMemoryTracing } from "./utils/tracing";
+import superjson from "superjson";

 vi.setConfig({ testTimeout: 60_000 });

@@ -133,6 +134,147 @@ describe("RunsReplicationService", () => {
     }
   );

+  containerTest(
+    "should replicate runs with super json payloads to clickhouse",
+    async ({ clickhouseContainer, redisOptions, postgresContainer, prisma }) => {
+      await prisma.$executeRawUnsafe(`ALTER TABLE public."TaskRun" REPLICA IDENTITY FULL;`);
+
+      const clickhouse = new ClickHouse({
+        url: clickhouseContainer.getConnectionUrl(),
+        name: "runs-replication",
+        compression: {
+          request: true,
+        },
+      });
+
+      const { tracer, exporter } = createInMemoryTracing();
+
+      const runsReplicationService = new RunsReplicationService({
+        clickhouse,
+        pgConnectionUrl: postgresContainer.getConnectionUri(),
+        serviceName: "runs-replication",
+        slotName: "task_runs_to_clickhouse_v1",
+        publicationName: "task_runs_to_clickhouse_v1_publication",
+        redisOptions,
+        maxFlushConcurrency: 1,
+        flushIntervalMs: 100,
+        flushBatchSize: 1,
+        leaderLockTimeoutMs: 5000,
+        leaderLockExtendIntervalMs: 1000,
+        ackIntervalSeconds: 5,
+        tracer,
+      });
+
+      await runsReplicationService.start();
+
+      const organization = await prisma.organization.create({
+        data: {
+          title: "test",
+          slug: "test",
+        },
+      });
+
+      const project = await prisma.project.create({
+        data: {
+          name: "test",
+          slug: "test",
+          organizationId: organization.id,
+          externalRef: "test",
+        },
+      });
+
+      const runtimeEnvironment = await prisma.runtimeEnvironment.create({
+        data: {
+          slug: "test",
+          type: "DEVELOPMENT",
+          projectId: project.id,
+          organizationId: organization.id,
+          apiKey: "test",
+          pkApiKey: "test",
+          shortcode: "test",
+        },
+      });
+
+      const date = new Date();
+
+      // Now we insert a row into the table
+      const taskRun = await prisma.taskRun.create({
+        data: {
+          friendlyId: "run_1234",
+          taskIdentifier: "my-task",
+          payload: superjson.stringify({
+            foo: "bar",
+            bigint: BigInt(1234),
+            date,
+            map: new Map([["foo", "bar"]]),
+          }),
+          payloadType: "application/super+json",
+          traceId: "1234",
+          spanId: "1234",
+          queue: "test",
+          runtimeEnvironmentId: runtimeEnvironment.id,
+          projectId: project.id,
+          organizationId: organization.id,
+          environmentType: "DEVELOPMENT",
+          engine: "V2",
+        },
+      });
+
+      await setTimeout(1000);
+
+      // Check that the row was replicated to clickhouse
+      const queryRuns = clickhouse.reader.query({
+        name: "runs-replication",
+        query: "SELECT * FROM trigger_dev.task_runs_v2",
+        schema: z.any(),
+      });
+
+      const [queryError, result] = await queryRuns({});
+
+      expect(queryError).toBeNull();
+      expect(result?.length).toBe(1);
+      expect(result?.[0]).toEqual(
+        expect.objectContaining({
+          run_id: taskRun.id,
+          friendly_id: taskRun.friendlyId,
+          task_identifier: taskRun.taskIdentifier,
+          environment_id: runtimeEnvironment.id,
+          project_id: project.id,
+          organization_id: organization.id,
+          environment_type: "DEVELOPMENT",
+          engine: "V2",
+        })
+      );
+
+      const queryPayloads = clickhouse.reader.query({
+        name: "runs-replication",
+        query: "SELECT * FROM trigger_dev.raw_task_runs_payload_v1 WHERE run_id = {run_id:String}",
+        schema: z.any(),
+        params: z.object({ run_id: z.string() }),
+      });
+
+      const [payloadQueryError, payloadResult] = await queryPayloads({ run_id: taskRun.id });
+
+      expect(payloadQueryError).toBeNull();
+      expect(payloadResult?.length).toBe(1);
+      expect(payloadResult?.[0]).toEqual(
+        expect.objectContaining({
+          run_id: taskRun.id,
+          payload: {
+            data: expect.objectContaining({
+              foo: "bar",
+              bigint: "1234",
+              date: date.toISOString(),
+              map: [["foo", "bar"]],
+            }),
+          },
+        })
+      );

+      await runsReplicationService.stop();
+    }
+  );
+
   containerTest(
     "should not produce any handle_transaction spans when no TaskRun events are produced",
     async ({ clickhouseContainer, redisOptions, postgresContainer, prisma }) => {

packages/core/src/v3/utils/ioSerialization.ts

Lines changed: 30 additions & 0 deletions
@@ -41,6 +41,36 @@ export async function parsePacket(value: IOPacket, options?: ParsePacketOptions)
   }
 }

+export async function parsePacketAsJson(
+  value: IOPacket,
+  options?: ParsePacketOptions
+): Promise<any> {
+  if (!value.data) {
+    return undefined;
+  }
+
+  switch (value.dataType) {
+    case "application/json":
+      return JSON.parse(value.data, makeSafeReviver(options));
+    case "application/super+json":
+      const { parse, serialize } = await loadSuperJSON();
+
+      const superJsonResult = parse(value.data);
+
+      const { json } = serialize(superJsonResult);
+
+      return json;
+    case "text/plain":
+      return value.data;
+    case "application/store":
+      throw new Error(
+        `Cannot parse an application/store packet (${value.data}). Needs to be imported first.`
+      );
+    default:
+      return value.data;
+  }
+}
+
 export async function conditionallyImportAndParsePacket(
   value: IOPacket,
   client?: ApiClient
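A quick usage sketch covering the other branches of the new function, derived from the switch above (the `application/store` data value below is a made-up placeholder):

```ts
import { parsePacketAsJson } from "@trigger.dev/core/v3/utils/ioSerialization";

// application/json packets are parsed with the safe reviver:
await parsePacketAsJson({ data: '{"foo":"bar"}', dataType: "application/json" });
// => { foo: "bar" }

// text/plain (and unrecognized types) pass through as the raw string:
await parsePacketAsJson({ data: "hello", dataType: "text/plain" });
// => "hello"

// An empty packet resolves to undefined:
await parsePacketAsJson({ data: undefined, dataType: "application/json" });
// => undefined

// application/store packets must be imported before parsing, so this throws
// ("store/path" is a hypothetical value):
await parsePacketAsJson({ data: "store/path", dataType: "application/store" });
// => Error: Cannot parse an application/store packet (store/path). Needs to be imported first.
```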
