Skip to content

Commit 021d6d8

Browse files
authored
Improve efficiency of cancelling in progress TaskEvent records in v3 and v4 (#2112)
1 parent 068c024 commit 021d6d8

File tree

4 files changed

+69
-29
lines changed

4 files changed

+69
-29
lines changed

apps/webapp/app/v3/eventRepository.server.ts

Lines changed: 61 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -326,6 +326,43 @@ export class EventRepository {
326326
});
327327
}
328328

329+
async cancelEvents(events: TaskEventRecord[], cancelledAt: Date, reason: string) {
330+
const eventsToCancel = events.filter((event) => event.isPartial);
331+
332+
if (eventsToCancel.length === 0) {
333+
return;
334+
}
335+
336+
await this.insertMany(
337+
eventsToCancel.map((event) => ({
338+
...omit(event, "id"),
339+
isPartial: false,
340+
isError: false,
341+
isCancelled: true,
342+
status: "ERROR",
343+
links: event.links ?? [],
344+
events: [
345+
{
346+
name: "cancellation",
347+
time: cancelledAt,
348+
properties: {
349+
reason,
350+
},
351+
},
352+
...((event.events as any[]) ?? []),
353+
],
354+
duration: calculateDurationFromStart(event.startTime, cancelledAt),
355+
properties: event.properties as Attributes,
356+
metadata: event.metadata as Attributes,
357+
style: event.style as Attributes,
358+
output: event.output as Attributes,
359+
outputType: event.outputType,
360+
payload: event.payload as Attributes,
361+
payloadType: event.payloadType,
362+
}))
363+
);
364+
}
365+
329366
async crashEvent({
330367
event,
331368
crashedAt,
@@ -394,28 +431,35 @@ export class EventRepository {
394431
queryOptions,
395432
startCreatedAt,
396433
endCreatedAt,
397-
{ spanId: true, isPartial: true, isCancelled: true }
434+
{ spanId: true, isPartial: true, isCancelled: true },
435+
undefined,
436+
{ limit: 500 }
398437
);
399438

400-
const filteredTaskEvents = taskEvents.filter((event) => {
401-
// Event must be partial
402-
if (!event.isPartial) return false;
403-
404-
// If the event is cancelled, it is not incomplete
405-
if (event.isCancelled) return false;
406-
407-
if (allowCompleteDuplicate) {
408-
return true;
439+
// Optimize the filtering by pre-processing the data
440+
const completeEventSpanIds = new Set<string>();
441+
const incompleteEvents: Array<{ spanId: string }> = [];
442+
443+
// Single pass to categorize events and build lookup structures
444+
for (const event of taskEvents) {
445+
if (!event.isPartial && !event.isCancelled) {
446+
// This is a complete event
447+
completeEventSpanIds.add(event.spanId);
448+
} else if (event.isPartial && !event.isCancelled) {
449+
// This is a potentially incomplete event
450+
incompleteEvents.push(event);
409451
}
452+
// Skip cancelled events as they are not incomplete
453+
}
410454

411-
// There must not be another complete event with the same spanId
412-
const hasCompleteDuplicate = taskEvents.some(
413-
(otherEvent) =>
414-
otherEvent.spanId === event.spanId && !otherEvent.isPartial && !otherEvent.isCancelled
415-
);
455+
// Filter incomplete events, excluding those with complete duplicates
456+
const filteredTaskEvents = allowCompleteDuplicate
457+
? incompleteEvents
458+
: incompleteEvents.filter((event) => !completeEventSpanIds.has(event.spanId));
416459

417-
return !hasCompleteDuplicate;
418-
});
460+
if (filteredTaskEvents.length === 0) {
461+
return [];
462+
}
419463

420464
return this.#queryEvents(
421465
storeTable,

apps/webapp/app/v3/runEngineHandlers.server.ts

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -309,12 +309,9 @@ export function registerRunEngineEventBusHandlers() {
309309
run.completedAt ?? undefined
310310
);
311311

312-
await Promise.all(
313-
inProgressEvents.map((event) => {
314-
const error = createJsonErrorObject(run.error);
315-
return eventRepository.cancelEvent(event, time, error.message);
316-
})
317-
);
312+
const error = createJsonErrorObject(run.error);
313+
314+
await eventRepository.cancelEvents(inProgressEvents, time, error.message);
318315
} catch (error) {
319316
logger.error("[runCancelled] Failed to cancel event", {
320317
error: error instanceof Error ? error.message : error,

apps/webapp/app/v3/services/cancelTaskRunV1.server.ts

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -95,13 +95,10 @@ export class CancelTaskRunServiceV1 extends BaseService {
9595

9696
logger.debug("Cancelling in-progress events", {
9797
inProgressEvents: inProgressEvents.map((event) => event.id),
98+
eventCount: inProgressEvents.length,
9899
});
99100

100-
await Promise.all(
101-
inProgressEvents.map((event) => {
102-
return eventRepository.cancelEvent(event, opts.cancelledAt, opts.reason);
103-
})
104-
);
101+
await eventRepository.cancelEvents(inProgressEvents, opts.cancelledAt, opts.reason);
105102

106103
// Cancel any in progress attempts
107104
if (opts.cancelAttempts) {

apps/webapp/app/v3/taskEventStore.server.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ export class TaskEventStore {
7777
endCreatedAt?: Date,
7878
select?: TSelect,
7979
orderBy?: Prisma.TaskEventOrderByWithRelationInput,
80-
options?: { includeDebugLogs?: boolean }
80+
options?: { includeDebugLogs?: boolean; limit?: number }
8181
): Promise<Prisma.TaskEventGetPayload<{ select: TSelect }>[]> {
8282
let finalWhere: Prisma.TaskEventWhereInput = where;
8383

@@ -111,6 +111,7 @@ export class TaskEventStore {
111111
},
112112
select,
113113
orderBy,
114+
take: options?.limit,
114115
})) as Prisma.TaskEventGetPayload<{ select: TSelect }>[];
115116
} else {
116117
// When partitioning is not enabled, we ignore the createdAt range.
@@ -121,6 +122,7 @@ export class TaskEventStore {
121122
},
122123
select,
123124
orderBy,
125+
take: options?.limit,
124126
})) as Prisma.TaskEventGetPayload<{ select: TSelect }>[];
125127
}
126128
}

0 commit comments

Comments
 (0)