diff --git a/apps/webapp/app/components/runs/v3/RunInspector.tsx b/apps/webapp/app/components/runs/v3/RunInspector.tsx index 540083fff9..67282f123a 100644 --- a/apps/webapp/app/components/runs/v3/RunInspector.tsx +++ b/apps/webapp/app/components/runs/v3/RunInspector.tsx @@ -32,14 +32,11 @@ import { cn } from "~/utils/cn"; import { formatCurrencyAccurate } from "~/utils/numberFormatter"; import { v3RunDownloadLogsPath, - v3RunPath, v3RunSpanPath, v3RunsPath, v3SchedulePath, - v3TraceSpanPath, } from "~/utils/pathBuilder"; import { TraceSpan } from "~/utils/taskEvent"; -import { SpanLink } from "~/v3/eventRepository.server"; import { isFailedRunStatus, isFinalRunStatus } from "~/v3/taskStatus"; import { RunTimelineEvent, RunTimelineLine } from "./InspectorTimeline"; import { RunTag } from "./RunTag"; @@ -317,18 +314,6 @@ export function RunInspector({ )} - {span?.links && span.links.length > 0 && ( - - Links - -
- {span.links.map((link, index) => ( - - ))} -
-
-
- )} Max duration @@ -647,27 +632,3 @@ function PacketDisplay({ } } } - -function SpanLinkElement({ link }: { link: SpanLink }) { - const organization = useOrganization(); - const project = useProject(); - - switch (link.type) { - case "run": { - return ( - - {link.title} - - ); - } - case "span": { - return ( - - {link.title} - - ); - } - } - - return null; -} diff --git a/apps/webapp/app/components/runs/v3/SpanInspector.tsx b/apps/webapp/app/components/runs/v3/SpanInspector.tsx index 0614efaff7..1e7efe63e6 100644 --- a/apps/webapp/app/components/runs/v3/SpanInspector.tsx +++ b/apps/webapp/app/components/runs/v3/SpanInspector.tsx @@ -1,9 +1,11 @@ +import { formatDuration, nanosecondsToMilliseconds } from "@trigger.dev/core/v3"; import { ExitIcon } from "~/assets/icons/ExitIcon"; import { CodeBlock } from "~/components/code/CodeBlock"; import { Button } from "~/components/primitives/Buttons"; import { DateTimeAccurate } from "~/components/primitives/DateTime"; import { Header2 } from "~/components/primitives/Headers"; import * as Property from "~/components/primitives/PropertyTable"; +import { Spinner } from "~/components/primitives/Spinner"; import { TabButton, TabContainer } from "~/components/primitives/Tabs"; import { TextLink } from "~/components/primitives/TextLink"; import { InfoIconTooltip, SimpleTooltip } from "~/components/primitives/Tooltip"; @@ -15,13 +17,10 @@ import { useOrganization } from "~/hooks/useOrganizations"; import { useProject } from "~/hooks/useProject"; import { useSearchParams } from "~/hooks/useSearchParam"; import { cn } from "~/utils/cn"; -import { v3RunPath, v3RunsPath, v3TraceSpanPath } from "~/utils/pathBuilder"; +import { v3RunsPath } from "~/utils/pathBuilder"; import { TraceSpan } from "~/utils/taskEvent"; -import { SpanLink } from "~/v3/eventRepository.server"; import { RunTimelineEvent, RunTimelineLine } from "./InspectorTimeline"; -import { Spinner } from "~/components/primitives/Spinner"; import { LiveTimer } from "./LiveTimer"; -import { formatDuration, nanosecondsToMilliseconds } from "@trigger.dev/core/v3"; export function SpanInspector({ span, @@ -150,18 +149,6 @@ export function SpanInspector({ )} - {span.links && span.links.length > 0 && ( - - Links - -
- {span.links.map((link, index) => ( - - ))} -
-
-
- )} ) : ( @@ -203,18 +190,6 @@ export function SpanInspector({ Message {span.message} - {span.links && span.links.length > 0 && ( - - Links - -
- {span.links.map((link, index) => ( - - ))} -
-
-
- )} {span.events !== undefined && } @@ -287,27 +262,3 @@ export function SpanTimeline({ startTime, duration, inProgress, isError }: Timel ); } - -function SpanLinkElement({ link }: { link: SpanLink }) { - const organization = useOrganization(); - const project = useProject(); - - switch (link.type) { - case "run": { - return ( - - {link.title} - - ); - } - case "span": { - return ( - - {link.title} - - ); - } - } - - return null; -} diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index e888ff5b2e..310732470e 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -481,6 +481,9 @@ const EnvironmentSchema = z.object({ .transform((v) => v ?? process.env.REDIS_PASSWORD), COMMON_WORKER_REDIS_TLS_DISABLED: z.string().default(process.env.REDIS_TLS_DISABLED ?? "false"), COMMON_WORKER_REDIS_CLUSTER_MODE_ENABLED: z.string().default("0"), + + TASK_EVENT_PARTITIONING_ENABLED: z.string().default("0"), + TASK_EVENT_PARTITIONED_WINDOW_IN_SECONDS: z.coerce.number().int().default(60), // 1 minute }); export type Environment = z.infer; diff --git a/apps/webapp/app/presenters/v3/RunPresenter.server.ts b/apps/webapp/app/presenters/v3/RunPresenter.server.ts index 2903d64780..6b8d7a432c 100644 --- a/apps/webapp/app/presenters/v3/RunPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/RunPresenter.server.ts @@ -3,6 +3,7 @@ import { createTreeFromFlatItems, flattenTree } from "~/components/primitives/Tr import { PrismaClient, prisma } from "~/db.server"; import { getUsername } from "~/utils/username"; import { eventRepository } from "~/v3/eventRepository.server"; +import { getTaskEventStoreTableForRun } from "~/v3/taskEventStore.server"; import { isFinalRunStatus } from "~/v3/taskStatus"; type Result = Awaited>; @@ -32,6 +33,8 @@ export class RunPresenter { const run = await this.#prismaClient.taskRun.findFirstOrThrow({ select: { id: true, + createdAt: true, + taskEventStore: true, number: true, traceId: true, spanId: true, @@ -105,7 +108,12 @@ export class RunPresenter { } // get the events - const traceSummary = await eventRepository.getTraceSummary(run.traceId); + const traceSummary = await eventRepository.getTraceSummary( + getTaskEventStoreTableForRun(run), + run.traceId, + run.createdAt, + run.completedAt ?? undefined + ); if (!traceSummary) { return { run: runData, diff --git a/apps/webapp/app/presenters/v3/SpanPresenter.server.ts b/apps/webapp/app/presenters/v3/SpanPresenter.server.ts index 30bf6683db..7266516950 100644 --- a/apps/webapp/app/presenters/v3/SpanPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/SpanPresenter.server.ts @@ -10,6 +10,7 @@ import { machinePresetFromName } from "~/v3/machinePresets.server"; import { FINAL_ATTEMPT_STATUSES, isFailedRunStatus, isFinalRunStatus } from "~/v3/taskStatus"; import { BasePresenter } from "./basePresenter.server"; import { getMaxDuration } from "~/v3/utils/maxDuration"; +import { getTaskEventStoreTableForRun } from "~/v3/taskEventStore.server"; type Result = Awaited>; export type Span = NonNullable["span"]>; @@ -71,6 +72,7 @@ export class SpanPresenter extends BasePresenter { friendlyId: true, isTest: true, maxDurationInSeconds: true, + taskEventStore: true, tags: { select: { name: true, @@ -205,7 +207,13 @@ export class SpanPresenter extends BasePresenter { } } - const span = await eventRepository.getSpan(spanId, run.traceId); + const span = await eventRepository.getSpan( + getTaskEventStoreTableForRun(run), + spanId, + run.traceId, + run.createdAt, + run.completedAt ?? undefined + ); const metadata = run.metadata ? await prettyPrintPacket(run.metadata, run.metadataType, { @@ -342,6 +350,9 @@ export class SpanPresenter extends BasePresenter { const run = await this._prisma.taskRun.findFirst({ select: { traceId: true, + createdAt: true, + completedAt: true, + taskEventStore: true, }, where: { friendlyId: runFriendlyId, @@ -352,7 +363,13 @@ export class SpanPresenter extends BasePresenter { return; } - const span = await eventRepository.getSpan(spanId, run.traceId); + const span = await eventRepository.getSpan( + getTaskEventStoreTableForRun(run), + spanId, + run.traceId, + run.createdAt, + run.completedAt ?? undefined + ); if (!span) { return; diff --git a/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.v3.$projectParam.traces.$traceId.spans.$spanId/route.tsx b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.v3.$projectParam.traces.$traceId.spans.$spanId/route.tsx deleted file mode 100644 index 8ca0b967c2..0000000000 --- a/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.v3.$projectParam.traces.$traceId.spans.$spanId/route.tsx +++ /dev/null @@ -1,35 +0,0 @@ -import { LoaderFunctionArgs, redirect } from "@remix-run/server-runtime"; -import { z } from "zod"; -import { prisma } from "~/db.server"; -import { requireUserId } from "~/services/session.server"; -import { v3RunSpanPath } from "~/utils/pathBuilder"; -import { eventRepository } from "~/v3/eventRepository.server"; - -const ParamsSchema = z.object({ - organizationSlug: z.string(), - projectParam: z.string(), - traceId: z.string(), - spanId: z.string(), -}); - -export async function loader({ params, request }: LoaderFunctionArgs) { - const userId = await requireUserId(request); - - const validatedParams = ParamsSchema.parse(params); - - const trace = await eventRepository.getTraceSummary(validatedParams.traceId); - - if (!trace) { - return new Response("Not found", { status: 404 }); - } - - // Redirect to the project's runs page - return redirect( - v3RunSpanPath( - { slug: validatedParams.organizationSlug }, - { slug: validatedParams.projectParam }, - { friendlyId: trace.rootSpan.runId }, - { spanId: validatedParams.spanId } - ) - ); -} diff --git a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.v3.$projectParam.runs.$runParam.spans.$spanParam/route.tsx b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.v3.$projectParam.runs.$runParam.spans.$spanParam/route.tsx index 55c9dd38dd..6b23bb2fe5 100644 --- a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.v3.$projectParam.runs.$runParam.spans.$spanParam/route.tsx +++ b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.v3.$projectParam.runs.$runParam.spans.$spanParam/route.tsx @@ -44,7 +44,6 @@ import { RunTag } from "~/components/runs/v3/RunTag"; import { SpanEvents } from "~/components/runs/v3/SpanEvents"; import { SpanTitle } from "~/components/runs/v3/SpanTitle"; import { TaskRunAttemptStatusCombo } from "~/components/runs/v3/TaskRunAttemptStatus"; -import { TaskRunsTable } from "~/components/runs/v3/TaskRunsTable"; import { TaskRunStatusCombo } from "~/components/runs/v3/TaskRunStatus"; import { useOrganization } from "~/hooks/useOrganizations"; import { useProject } from "~/hooks/useProject"; @@ -57,16 +56,13 @@ import { cn } from "~/utils/cn"; import { formatCurrencyAccurate } from "~/utils/numberFormatter"; import { v3BatchPath, - v3BatchRunsPath, v3RunDownloadLogsPath, v3RunPath, v3RunSpanPath, v3RunsPath, v3SchedulePath, v3SpanParamsSchema, - v3TraceSpanPath, } from "~/utils/pathBuilder"; -import { SpanLink } from "~/v3/eventRepository.server"; export const loader = async ({ request, params }: LoaderFunctionArgs) => { const userId = await requireUserId(request); @@ -1152,27 +1148,3 @@ function classNameForState(state: TimelineState) { } } } - -function SpanLinkElement({ link }: { link: SpanLink }) { - const organization = useOrganization(); - const project = useProject(); - - switch (link.type) { - case "run": { - return ( - - {link.title} - - ); - } - case "span": { - return ( - - {link.title} - - ); - } - } - - return null; -} diff --git a/apps/webapp/app/routes/resources.runs.$runParam.logs.download.ts b/apps/webapp/app/routes/resources.runs.$runParam.logs.download.ts index 3b9dc5912b..bbc58d3de3 100644 --- a/apps/webapp/app/routes/resources.runs.$runParam.logs.download.ts +++ b/apps/webapp/app/routes/resources.runs.$runParam.logs.download.ts @@ -7,6 +7,7 @@ import { createGzip } from "zlib"; import { Readable } from "stream"; import { formatDurationMilliseconds } from "@trigger.dev/core/v3/utils/durations"; import { getDateFromNanoseconds } from "~/utils/taskEvent"; +import { getTaskEventStoreTableForRun } from "~/v3/taskEventStore.server"; export async function loader({ params, request }: LoaderFunctionArgs) { const userId = await requireUserId(request); @@ -31,7 +32,12 @@ export async function loader({ params, request }: LoaderFunctionArgs) { return new Response("Not found", { status: 404 }); } - const runEvents = await eventRepository.getRunEvents(run.friendlyId); + const runEvents = await eventRepository.getRunEvents( + getTaskEventStoreTableForRun(run), + run.friendlyId, + run.createdAt, + run.completedAt ?? undefined + ); // Create a Readable stream from the runEvents array const readable = new Readable({ diff --git a/apps/webapp/app/utils/pathBuilder.ts b/apps/webapp/app/utils/pathBuilder.ts index aa5a9915b3..6a0cecc093 100644 --- a/apps/webapp/app/utils/pathBuilder.ts +++ b/apps/webapp/app/utils/pathBuilder.ts @@ -396,15 +396,6 @@ export function v3RunSpanPath( return `${v3RunPath(organization, project, run)}?span=${span.spanId}`; } -export function v3TraceSpanPath( - organization: OrgForPath, - project: ProjectForPath, - traceId: string, - spanId: string -) { - return `${v3ProjectPath(organization, project)}/traces/${traceId}/spans/${spanId}`; -} - export function v3RunStreamingPath( organization: OrgForPath, project: ProjectForPath, diff --git a/apps/webapp/app/utils/taskEvent.ts b/apps/webapp/app/utils/taskEvent.ts index 41a1703c25..9e3ebfb90c 100644 --- a/apps/webapp/app/utils/taskEvent.ts +++ b/apps/webapp/app/utils/taskEvent.ts @@ -62,7 +62,6 @@ export function prepareTrace(events: TaskEvent[]): TraceSummary | undefined { ); const span = { - recordId: event.id, id: event.spanId, parentId: event.parentId ?? undefined, runId: event.runId, @@ -247,7 +246,6 @@ export function createSpanFromEvent(events: TaskEvent[], event: PreparedEvent) { } const span = { - recordId: event.id, id: event.spanId, parentId: event.parentId ?? undefined, runId: event.runId, diff --git a/apps/webapp/app/v3/eventRepository.server.ts b/apps/webapp/app/v3/eventRepository.server.ts index 4cfa215844..bed3871fc5 100644 --- a/apps/webapp/app/v3/eventRepository.server.ts +++ b/apps/webapp/app/v3/eventRepository.server.ts @@ -34,6 +34,7 @@ import { startActiveSpan } from "./tracer.server"; import { createRedisClient, RedisClient, RedisWithClusterOptions } from "~/redis.server"; import { startSpan } from "./tracing.server"; import { nanoid } from "nanoid"; +import { TaskEventStore, TaskEventStoreTable } from "./taskEventStore.server"; const MAX_FLUSH_DEPTH = 5; @@ -101,6 +102,7 @@ export type EventRepoConfig = { batchInterval: number; redis: RedisWithClusterOptions; retentionInDays: number; + partitioningEnabled: boolean; tracer?: Tracer; }; @@ -110,7 +112,6 @@ export type TaskEventRecord = TaskEvent; export type QueriedEvent = Prisma.TaskEventGetPayload<{ select: { - id: true; spanId: true; parentId: true; runId: true; @@ -154,7 +155,6 @@ export type SpanLink = }; export type SpanSummary = { - recordId: string; id: string; parentId: string | undefined; runId: string; @@ -181,25 +181,6 @@ export type UpdateEventOptions = { events?: SpanEvents; }; -type TaskEventSummary = Pick< - TaskEvent, - | "id" - | "spanId" - | "parentId" - | "runId" - | "idempotencyKey" - | "message" - | "style" - | "startTime" - | "duration" - | "isError" - | "isPartial" - | "isCancelled" - | "level" - | "events" - | "environmentType" ->; - export class EventRepository { private readonly _flushScheduler: DynamicFlushScheduler; private _randomIdGenerator = new RandomIdGenerator(); @@ -207,14 +188,15 @@ export class EventRepository { private _subscriberCount = 0; private _tracer: Tracer; private _lastFlushedAt: Date | undefined; + private taskEventStore: TaskEventStore; get subscriberCount() { return this._subscriberCount; } constructor( - private db: PrismaClient = prisma, - private readReplica: PrismaReplicaClient = $replica, + db: PrismaClient = prisma, + readReplica: PrismaReplicaClient = $replica, private readonly _config: EventRepoConfig ) { this._flushScheduler = new DynamicFlushScheduler({ @@ -225,6 +207,9 @@ export class EventRepository { this._redisPublishClient = createRedisClient("trigger:eventRepoPublisher", this._config.redis); this._tracer = _config.tracer ?? trace.getTracer("eventRepo", "0.0.1"); + + // Instantiate the store using the partitioning flag. + this.taskEventStore = new TaskEventStore(db, readReplica); } async insert(event: CreatableEvent) { @@ -243,8 +228,19 @@ export class EventRepository { return await this.#flushBatch(nanoid(), events); } - async completeEvent(spanId: string, options?: UpdateEventOptions) { - const events = await this.queryIncompleteEvents({ spanId }); + async completeEvent( + storeTable: TaskEventStoreTable, + spanId: string, + startCreatedAt: Date, + endCreatedAt?: Date, + options?: UpdateEventOptions + ) { + const events = await this.queryIncompleteEvents( + storeTable, + { spanId }, + startCreatedAt, + endCreatedAt + ); if (events.length === 0) { logger.warn("No incomplete events found for spanId", { spanId, options }); @@ -362,22 +358,35 @@ export class EventRepository { }); } - async queryEvents(queryOptions: QueryOptions): Promise { - return await this.readReplica.taskEvent.findMany({ - where: queryOptions, - }); + async #queryEvents( + storeTable: TaskEventStoreTable, + queryOptions: QueryOptions, + startCreatedAt: Date, + endCreatedAt?: Date + ): Promise { + return await this.taskEventStore.findMany( + storeTable, + queryOptions, + startCreatedAt, + endCreatedAt + ); } - async queryIncompleteEvents(queryOptions: QueryOptions, allowCompleteDuplicate = false) { + async queryIncompleteEvents( + storeTable: TaskEventStoreTable, + queryOptions: QueryOptions, + startCreatedAt: Date, + endCreatedAt?: Date, + allowCompleteDuplicate = false + ) { // First we will find all the events that match the query options (selecting minimal data). - const taskEvents = await this.readReplica.taskEvent.findMany({ - where: queryOptions, - select: { - spanId: true, - isPartial: true, - isCancelled: true, - }, - }); + const taskEvents = await this.taskEventStore.findMany( + storeTable, + queryOptions, + startCreatedAt, + endCreatedAt, + { spanId: true, isPartial: true, isCancelled: true } + ); const filteredTaskEvents = taskEvents.filter((event) => { // Event must be partial @@ -399,37 +408,31 @@ export class EventRepository { return !hasCompleteDuplicate; }); - return this.queryEvents({ - spanId: { - in: filteredTaskEvents.map((event) => event.spanId), + return this.#queryEvents( + storeTable, + { + spanId: { + in: filteredTaskEvents.map((event) => event.spanId), + }, }, - }); + startCreatedAt, + endCreatedAt + ); } - public async getTraceSummary(traceId: string): Promise { + public async getTraceSummary( + storeTable: TaskEventStoreTable, + traceId: string, + startCreatedAt: Date, + endCreatedAt?: Date + ): Promise { return await startActiveSpan("getTraceSummary", async (span) => { - const events = await this.readReplica.$queryRaw` - SELECT - id, - "spanId", - "parentId", - "runId", - "idempotencyKey", - LEFT(message, 256) as message, - style, - "startTime", - duration, - "isError", - "isPartial", - "isCancelled", - level, - events, - "environmentType" - FROM "TaskEvent" - WHERE "traceId" = ${traceId} - ORDER BY "startTime" ASC - LIMIT ${env.MAXIMUM_TRACE_SUMMARY_VIEW_COUNT} - `; + const events = await this.taskEventStore.findTraceEvents( + storeTable, + traceId, + startCreatedAt, + endCreatedAt + ); let preparedEvents: Array = []; let rootSpanId: string | undefined; @@ -469,7 +472,6 @@ export class EventRepository { ); const span = { - recordId: event.id, id: event.spanId, parentId: event.parentId ?? undefined, runId: event.runId, @@ -510,11 +512,22 @@ export class EventRepository { }); } - public async getRunEvents(runId: string): Promise { + public async getRunEvents( + storeTable: TaskEventStoreTable, + runId: string, + startCreatedAt: Date, + endCreatedAt?: Date + ): Promise { return await startActiveSpan("getRunEvents", async (span) => { - const events = await this.readReplica.taskEvent.findMany({ - select: { - id: true, + const events = await this.taskEventStore.findMany( + storeTable, + { + runId, + isPartial: false, + }, + startCreatedAt, + endCreatedAt, + { spanId: true, parentId: true, runId: true, @@ -530,15 +543,8 @@ export class EventRepository { events: true, environmentType: true, taskSlug: true, - }, - where: { - runId, - isPartial: false, - }, - orderBy: { - startTime: "asc", - }, - }); + } + ); let preparedEvents: Array = []; @@ -552,9 +558,15 @@ export class EventRepository { // A Span can be cancelled if it is partial and has a parent that is cancelled // And a span's duration, if it is partial and has a cancelled parent, is the time between the start of the span and the time of the cancellation event of the parent - public async getSpan(spanId: string, traceId: string) { + public async getSpan( + storeTable: TaskEventStoreTable, + spanId: string, + traceId: string, + startCreatedAt: Date, + endCreatedAt?: Date + ) { return await startActiveSpan("getSpan", async (s) => { - const spanEvent = await this.#getSpanEvent(spanId); + const spanEvent = await this.#getSpanEvent(storeTable, spanId, startCreatedAt, endCreatedAt); if (!spanEvent) { return; @@ -562,7 +574,12 @@ export class EventRepository { const preparedEvent = prepareEvent(spanEvent); - const span = await this.#createSpanFromEvent(preparedEvent); + const span = await this.#createSpanFromEvent( + storeTable, + preparedEvent, + startCreatedAt, + endCreatedAt + ); const output = rehydrateJson(spanEvent.output); const payload = rehydrateJson(spanEvent.payload); @@ -627,38 +644,48 @@ export class EventRepository { }); } - async #createSpanFromEvent(event: PreparedEvent) { + async #createSpanFromEvent( + storeTable: TaskEventStoreTable, + event: PreparedEvent, + startCreatedAt: Date, + endCreatedAt?: Date + ) { return await startActiveSpan("createSpanFromEvent", async (s) => { let ancestorCancelled = false; let duration = event.duration; if (!event.isCancelled && event.isPartial) { - await this.#walkSpanAncestors(event, (ancestorEvent, level) => { - if (level >= 8) { - return { stop: true }; - } + await this.#walkSpanAncestors( + storeTable, + event, + startCreatedAt, + endCreatedAt, + (ancestorEvent, level) => { + if (level >= 8) { + return { stop: true }; + } + + if (ancestorEvent.isCancelled) { + ancestorCancelled = true; - if (ancestorEvent.isCancelled) { - ancestorCancelled = true; + // We need to get the cancellation time from the cancellation span event + const cancellationEvent = ancestorEvent.events.find( + (event) => event.name === "cancellation" + ); - // We need to get the cancellation time from the cancellation span event - const cancellationEvent = ancestorEvent.events.find( - (event) => event.name === "cancellation" - ); + if (cancellationEvent) { + duration = calculateDurationFromStart(event.startTime, cancellationEvent.time); + } - if (cancellationEvent) { - duration = calculateDurationFromStart(event.startTime, cancellationEvent.time); + return { stop: true }; } - return { stop: true }; + return { stop: false }; } - - return { stop: false }; - }); + ); } const span = { - recordId: event.id, id: event.spanId, parentId: event.parentId ?? undefined, runId: event.runId, @@ -682,7 +709,10 @@ export class EventRepository { } async #walkSpanAncestors( + storeTable: TaskEventStoreTable, event: PreparedEvent, + startCreatedAt: Date, + endCreatedAt: Date | undefined, callback: (event: PreparedEvent, level: number) => { stop: boolean } ) { const parentId = event.parentId; @@ -691,7 +721,12 @@ export class EventRepository { } await startActiveSpan("walkSpanAncestors", async (s) => { - let parentEvent = await this.#getSpanEvent(parentId); + let parentEvent = await this.#getSpanEvent( + storeTable, + parentId, + startCreatedAt, + endCreatedAt + ); let level = 1; while (parentEvent) { @@ -707,49 +742,35 @@ export class EventRepository { return; } - parentEvent = await this.#getSpanEvent(preparedParentEvent.parentId); + parentEvent = await this.#getSpanEvent( + storeTable, + preparedParentEvent.parentId, + startCreatedAt, + endCreatedAt + ); level++; } }); } - async #getSpanAncestors(event: PreparedEvent, levels = 1): Promise> { - if (levels >= 8) { - return []; - } - - if (!event.parentId) { - return []; - } - - const parentEvent = await this.#getSpanEvent(event.parentId); - - if (!parentEvent) { - return []; - } - - const preparedParentEvent = prepareEvent(parentEvent); - - if (!preparedParentEvent.parentId) { - return [preparedParentEvent]; - } - - const moreAncestors = await this.#getSpanAncestors(preparedParentEvent, levels + 1); - - return [preparedParentEvent, ...moreAncestors]; - } - - async #getSpanEvent(spanId: string) { + async #getSpanEvent( + storeTable: TaskEventStoreTable, + spanId: string, + startCreatedAt: Date, + endCreatedAt?: Date + ) { return await startActiveSpan("getSpanEvent", async (s) => { - const events = await this.readReplica.taskEvent.findMany({ - where: { - spanId, - }, - orderBy: { + const events = await this.taskEventStore.findMany( + storeTable, + { spanId }, + startCreatedAt, + endCreatedAt, + undefined, + { startTime: "asc", - }, - }); + } + ); let finalEvent: TaskEvent | undefined; @@ -1054,6 +1075,10 @@ export class EventRepository { }); } + private get taskEventStoreTable(): TaskEventStoreTable { + return this._config.partitioningEnabled ? "taskEventPartitioned" : "taskEvent"; + } + async #doFlushBatch( flushId: string, events: CreatableEvent[], @@ -1065,9 +1090,10 @@ export class EventRepository { span.setAttribute("depth", depth); span.setAttribute("flush_id", flushId); - await this.db.taskEvent.createMany({ - data: events as Prisma.TaskEventCreateManyInput[], - }); + await this.taskEventStore.createMany( + this.taskEventStoreTable, + events as Prisma.TaskEventCreateManyInput[] + ); span.setAttribute("inserted_event_count", events.length); @@ -1186,6 +1212,7 @@ function initializeEventRepo() { batchSize: env.EVENTS_BATCH_SIZE, batchInterval: env.EVENTS_BATCH_INTERVAL, retentionInDays: env.EVENTS_DEFAULT_LOG_RETENTION, + partitioningEnabled: env.TASK_EVENT_PARTITIONING_ENABLED === "1", redis: { port: env.PUBSUB_REDIS_PORT, host: env.PUBSUB_REDIS_HOST, @@ -1411,35 +1438,6 @@ function findFirstCancelledAncestor(events: Map, spanId: return; } -// Prioritize spans with the same id, keeping the completed spans over partial spans -// Completed spans are either !isPartial or isCancelled -function removeDuplicateEvents(events: PreparedEvent[]) { - const dedupedEvents = new Map(); - - for (const event of events) { - const existingEvent = dedupedEvents.get(event.spanId); - - if (!existingEvent) { - dedupedEvents.set(event.spanId, event); - continue; - } - - if (event.isCancelled || !event.isPartial) { - dedupedEvents.set(event.spanId, event); - } - } - - return Array.from(dedupedEvents.values()); -} - -function isEmptyJson(json: Prisma.JsonValue) { - if (json === null) { - return true; - } - - return false; -} - function sanitizedAttributes(json: Prisma.JsonValue) { if (json === null || json === undefined) { return; @@ -1516,18 +1514,6 @@ function transformException( }; } -function filteredAttributes(attributes: Attributes, prefix: string): Attributes { - const result: Attributes = {}; - - for (const [key, value] of Object.entries(attributes)) { - if (key.startsWith(prefix)) { - result[key] = value; - } - } - - return result; -} - function calculateDurationFromStart(startTime: bigint, endTime: Date = new Date()) { const $endtime = typeof endTime === "string" ? new Date(endTime) : endTime; diff --git a/apps/webapp/app/v3/services/cancelAttempt.server.ts b/apps/webapp/app/v3/services/cancelAttempt.server.ts index 20f469c81f..b889705e1e 100644 --- a/apps/webapp/app/v3/services/cancelAttempt.server.ts +++ b/apps/webapp/app/v3/services/cancelAttempt.server.ts @@ -5,6 +5,7 @@ import { eventRepository } from "../eventRepository.server"; import { isCancellableRunStatus } from "../taskStatus"; import { BaseService } from "./baseService.server"; import { FinalizeTaskRunService } from "./finalizeTaskRun.server"; +import { getTaskEventStoreTableForRun } from "../taskEventStore.server"; export class CancelAttemptService extends BaseService { public async call( @@ -72,9 +73,14 @@ export class CancelAttemptService extends BaseService { }); }); - const inProgressEvents = await eventRepository.queryIncompleteEvents({ - runId: taskRunAttempt.taskRun.friendlyId, - }); + const inProgressEvents = await eventRepository.queryIncompleteEvents( + getTaskEventStoreTableForRun(taskRunAttempt.taskRun), + { + runId: taskRunAttempt.taskRun.friendlyId, + }, + taskRunAttempt.taskRun.createdAt, + taskRunAttempt.taskRun.completedAt ?? undefined + ); logger.debug("Cancelling in-progress events", { inProgressEvents: inProgressEvents.map((event) => event.id), diff --git a/apps/webapp/app/v3/services/cancelTaskRun.server.ts b/apps/webapp/app/v3/services/cancelTaskRun.server.ts index a0f37ab23b..b78e75264b 100644 --- a/apps/webapp/app/v3/services/cancelTaskRun.server.ts +++ b/apps/webapp/app/v3/services/cancelTaskRun.server.ts @@ -9,6 +9,7 @@ import { BaseService } from "./baseService.server"; import { CancelAttemptService } from "./cancelAttempt.server"; import { CancelTaskAttemptDependenciesService } from "./cancelTaskAttemptDependencies.server"; import { FinalizeTaskRunService } from "./finalizeTaskRun.server"; +import { getTaskEventStoreTableForRun } from "../taskEventStore.server"; type ExtendedTaskRun = Prisma.TaskRunGetPayload<{ include: { @@ -83,9 +84,14 @@ export class CancelTaskRunService extends BaseService { }, }); - const inProgressEvents = await eventRepository.queryIncompleteEvents({ - runId: taskRun.friendlyId, - }); + const inProgressEvents = await eventRepository.queryIncompleteEvents( + getTaskEventStoreTableForRun(taskRun), + { + runId: taskRun.friendlyId, + }, + taskRun.createdAt, + taskRun.completedAt ?? undefined + ); logger.debug("Cancelling in-progress events", { inProgressEvents: inProgressEvents.map((event) => event.id), diff --git a/apps/webapp/app/v3/services/completeAttempt.server.ts b/apps/webapp/app/v3/services/completeAttempt.server.ts index fe18c5c29c..74a477839c 100644 --- a/apps/webapp/app/v3/services/completeAttempt.server.ts +++ b/apps/webapp/app/v3/services/completeAttempt.server.ts @@ -32,6 +32,7 @@ import { CreateCheckpointService } from "./createCheckpoint.server"; import { FinalizeTaskRunService } from "./finalizeTaskRun.server"; import { RetryAttemptService } from "./retryAttempt.server"; import { updateMetadataService } from "~/services/metadata/updateMetadata.server"; +import { getTaskEventStoreTableForRun } from "../taskEventStore.server"; type FoundAttempt = Awaited>; @@ -164,19 +165,25 @@ export class CompleteAttemptService extends BaseService { }); // Now we need to "complete" the task run event/span - await eventRepository.completeEvent(taskRunAttempt.taskRun.spanId, { - endTime: new Date(), - attributes: { - isError: false, - output: - completion.outputType === "application/store" || completion.outputType === "text/plain" - ? completion.output - : completion.output - ? (safeJsonParse(completion.output) as Attributes) - : undefined, - outputType: completion.outputType, - }, - }); + await eventRepository.completeEvent( + getTaskEventStoreTableForRun(taskRunAttempt.taskRun), + taskRunAttempt.taskRun.spanId, + taskRunAttempt.taskRun.createdAt, + taskRunAttempt.taskRun.completedAt ?? undefined, + { + endTime: new Date(), + attributes: { + isError: false, + output: + completion.outputType === "application/store" || completion.outputType === "text/plain" + ? completion.output + : completion.output + ? (safeJsonParse(completion.output) as Attributes) + : undefined, + outputType: completion.outputType, + }, + } + ); return "COMPLETED"; } @@ -307,21 +314,27 @@ export class CompleteAttemptService extends BaseService { // The attempt has failed and we won't retry // Now we need to "complete" the task run event/span - await eventRepository.completeEvent(taskRunAttempt.taskRun.spanId, { - endTime: failedAt, - attributes: { - isError: true, - }, - events: [ - { - name: "exception", - time: failedAt, - properties: { - exception: createExceptionPropertiesFromError(sanitizedError), - }, + await eventRepository.completeEvent( + getTaskEventStoreTableForRun(taskRunAttempt.taskRun), + taskRunAttempt.taskRun.spanId, + taskRunAttempt.taskRun.createdAt, + taskRunAttempt.taskRun.completedAt ?? undefined, + { + endTime: failedAt, + attributes: { + isError: true, }, - ], - }); + events: [ + { + name: "exception", + time: failedAt, + properties: { + exception: createExceptionPropertiesFromError(sanitizedError), + }, + }, + ], + } + ); await this._prisma.taskRun.update({ where: { @@ -363,9 +376,14 @@ export class CompleteAttemptService extends BaseService { return "COMPLETED"; } - const inProgressEvents = await eventRepository.queryIncompleteEvents({ - runId: taskRunAttempt.taskRun.friendlyId, - }); + const inProgressEvents = await eventRepository.queryIncompleteEvents( + getTaskEventStoreTableForRun(taskRunAttempt.taskRun), + { + runId: taskRunAttempt.taskRun.friendlyId, + }, + taskRunAttempt.taskRun.createdAt, + taskRunAttempt.taskRun.completedAt ?? undefined + ); // Handle in-progress events switch (status) { @@ -393,21 +411,27 @@ export class CompleteAttemptService extends BaseService { await Promise.all( inProgressEvents.map((event) => { - return eventRepository.completeEvent(event.spanId, { - endTime: failedAt, - attributes: { - isError: true, - }, - events: [ - { - name: "exception", - time: failedAt, - properties: { - exception: createExceptionPropertiesFromError(sanitizedError), - }, + return eventRepository.completeEvent( + getTaskEventStoreTableForRun(taskRunAttempt.taskRun), + event.spanId, + taskRunAttempt.taskRun.createdAt, + taskRunAttempt.taskRun.completedAt ?? undefined, + { + endTime: failedAt, + attributes: { + isError: true, }, - ], - }); + events: [ + { + name: "exception", + time: failedAt, + properties: { + exception: createExceptionPropertiesFromError(sanitizedError), + }, + }, + ], + } + ); }) ); } diff --git a/apps/webapp/app/v3/services/crashTaskRun.server.ts b/apps/webapp/app/v3/services/crashTaskRun.server.ts index 8f9aaa1901..b6c463a53c 100644 --- a/apps/webapp/app/v3/services/crashTaskRun.server.ts +++ b/apps/webapp/app/v3/services/crashTaskRun.server.ts @@ -7,6 +7,7 @@ import { CRASHABLE_ATTEMPT_STATUSES, isCrashableRunStatus } from "../taskStatus" import { sanitizeError, TaskRunErrorCodes, TaskRunInternalError } from "@trigger.dev/core/v3"; import { FinalizeTaskRunService } from "./finalizeTaskRun.server"; import { FailedTaskRunRetryHelper } from "../failedTaskRun.server"; +import { getTaskEventStoreTableForRun } from "../taskEventStore.server"; export type CrashTaskRunServiceOptions = { reason?: string; @@ -120,9 +121,12 @@ export class CrashTaskRunService extends BaseService { }); const inProgressEvents = await eventRepository.queryIncompleteEvents( + getTaskEventStoreTableForRun(taskRun), { runId: taskRun.friendlyId, }, + taskRun.createdAt, + taskRun.completedAt ?? undefined, options?.overrideCompletion ); diff --git a/apps/webapp/app/v3/services/expireEnqueuedRun.server.ts b/apps/webapp/app/v3/services/expireEnqueuedRun.server.ts index a55063c124..9e97795eaa 100644 --- a/apps/webapp/app/v3/services/expireEnqueuedRun.server.ts +++ b/apps/webapp/app/v3/services/expireEnqueuedRun.server.ts @@ -4,6 +4,7 @@ import { commonWorker } from "../commonWorker.server"; import { eventRepository } from "../eventRepository.server"; import { BaseService } from "./baseService.server"; import { FinalizeTaskRunService } from "./finalizeTaskRun.server"; +import { getTaskEventStoreTableForRun } from "../taskEventStore.server"; export class ExpireEnqueuedRunService extends BaseService { public static async ack(runId: string, tx?: PrismaClientOrTransaction) { @@ -77,22 +78,28 @@ export class ExpireEnqueuedRunService extends BaseService { }, }); - await eventRepository.completeEvent(run.spanId, { - endTime: new Date(), - attributes: { - isError: true, - }, - events: [ - { - name: "exception", - time: new Date(), - properties: { - exception: { - message: `Run expired because the TTL (${run.ttl}) was reached`, + await eventRepository.completeEvent( + getTaskEventStoreTableForRun(run), + run.spanId, + run.createdAt, + run.completedAt ?? undefined, + { + endTime: new Date(), + attributes: { + isError: true, + }, + events: [ + { + name: "exception", + time: new Date(), + properties: { + exception: { + message: `Run expired because the TTL (${run.ttl}) was reached`, + }, }, }, - }, - ], - }); + ], + } + ); } } diff --git a/apps/webapp/app/v3/services/triggerTask.server.ts b/apps/webapp/app/v3/services/triggerTask.server.ts index 47c96a0f19..b7df21cde5 100644 --- a/apps/webapp/app/v3/services/triggerTask.server.ts +++ b/apps/webapp/app/v3/services/triggerTask.server.ts @@ -29,6 +29,7 @@ import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server"; import { Prisma, TaskRun } from "@trigger.dev/database"; import { sanitizeQueueName } from "~/models/taskQueue.server"; import { EnqueueDelayedRunService } from "./enqueueDelayedRun.server"; +import { getTaskEventStore } from "../taskEventStore.server"; export type TriggerTaskServiceOptions = { idempotencyKey?: string; @@ -394,6 +395,7 @@ export class TriggerTaskService extends BaseService { delayUntil, queuedAt: delayUntil ? undefined : new Date(), maxAttempts: body.options?.maxAttempts, + taskEventStore: getTaskEventStore(), ttl, tags: tagIds.length === 0 diff --git a/apps/webapp/app/v3/taskEventStore.server.ts b/apps/webapp/app/v3/taskEventStore.server.ts new file mode 100644 index 0000000000..2a677101c2 --- /dev/null +++ b/apps/webapp/app/v3/taskEventStore.server.ts @@ -0,0 +1,176 @@ +// TaskEventStore.ts +import type { Prisma, TaskEvent } from "@trigger.dev/database"; +import type { PrismaClient, PrismaReplicaClient } from "~/db.server"; +import { env } from "~/env.server"; + +export type CommonTaskEvent = Omit; +export type TraceEvent = Pick< + TaskEvent, + | "spanId" + | "parentId" + | "runId" + | "idempotencyKey" + | "message" + | "style" + | "startTime" + | "duration" + | "isError" + | "isPartial" + | "isCancelled" + | "level" + | "events" + | "environmentType" +>; + +export type TaskEventStoreTable = "taskEvent" | "taskEventPartitioned"; + +export function getTaskEventStoreTableForRun(run: { + taskEventStore?: string; +}): TaskEventStoreTable { + return run.taskEventStore === "taskEventPartitioned" ? "taskEventPartitioned" : "taskEvent"; +} + +export function getTaskEventStore(): TaskEventStoreTable { + return env.TASK_EVENT_PARTITIONING_ENABLED === "1" ? "taskEventPartitioned" : "taskEvent"; +} + +export class TaskEventStore { + constructor(private db: PrismaClient, private readReplica: PrismaReplicaClient) {} + + /** + * Insert one record. + */ + async create(table: TaskEventStoreTable, data: Prisma.TaskEventCreateInput) { + if (table === "taskEventPartitioned") { + return await this.db.taskEventPartitioned.create({ data }); + } else { + return await this.db.taskEvent.create({ data }); + } + } + + /** + * Insert many records. + */ + async createMany(table: TaskEventStoreTable, data: Prisma.TaskEventCreateManyInput[]) { + if (table === "taskEventPartitioned") { + return await this.db.taskEventPartitioned.createMany({ data }); + } else { + return await this.db.taskEvent.createMany({ data }); + } + } + + /** + * Query records. When partitioning is enabled and a startCreatedAt is provided, + * the store will add a condition on createdAt (from startCreatedAt up to endCreatedAt, + * which defaults to now). + * + * @param where The base Prisma where filter. + * @param startCreatedAt The start of the createdAt range. + * @param endCreatedAt Optional end of the createdAt range (defaults to now). + * @param select Optional select clause. + */ + async findMany( + table: TaskEventStoreTable, + where: Prisma.TaskEventWhereInput, + startCreatedAt: Date, + endCreatedAt?: Date, + select?: TSelect, + orderBy?: Prisma.TaskEventOrderByWithRelationInput + ): Promise[]> { + let finalWhere: Prisma.TaskEventWhereInput = where; + + if (table === "taskEventPartitioned") { + // Add 1 minute to endCreatedAt to make sure we include all events in the range. + const end = endCreatedAt + ? new Date(endCreatedAt.getTime() + env.TASK_EVENT_PARTITIONED_WINDOW_IN_SECONDS * 1000) + : new Date(); + + finalWhere = { + AND: [ + where, + { + createdAt: { + gte: startCreatedAt, + lt: end, + }, + }, + ], + }; + } + + if (table === "taskEventPartitioned") { + return (await this.readReplica.taskEventPartitioned.findMany({ + where: finalWhere as Prisma.TaskEventPartitionedWhereInput, + select, + orderBy, + })) as Prisma.TaskEventGetPayload<{ select: TSelect }>[]; + } else { + // When partitioning is not enabled, we ignore the createdAt range. + return (await this.readReplica.taskEvent.findMany({ + where, + select, + orderBy, + })) as Prisma.TaskEventGetPayload<{ select: TSelect }>[]; + } + } + + async findTraceEvents( + table: TaskEventStoreTable, + traceId: string, + startCreatedAt: Date, + endCreatedAt?: Date + ) { + if (table === "taskEventPartitioned") { + return await this.readReplica.$queryRaw` + SELECT + "spanId", + "parentId", + "runId", + "idempotencyKey", + LEFT(message, 256) as message, + style, + "startTime", + duration, + "isError", + "isPartial", + "isCancelled", + level, + events, + "environmentType" + FROM "TaskEventPartitioned" + WHERE + "traceId" = ${traceId} + AND "createdAt" >= ${startCreatedAt.toISOString()}::timestamp + AND "createdAt" < ${(endCreatedAt + ? new Date(endCreatedAt.getTime() + env.TASK_EVENT_PARTITIONED_WINDOW_IN_SECONDS * 1000) + : new Date() + ).toISOString()}::timestamp + ORDER BY "startTime" ASC + LIMIT ${env.MAXIMUM_TRACE_SUMMARY_VIEW_COUNT} + `; + } else { + return await this.readReplica.$queryRaw` + SELECT + id, + "spanId", + "parentId", + "runId", + "idempotencyKey", + LEFT(message, 256) as message, + style, + "startTime", + duration, + "isError", + "isPartial", + "isCancelled", + level, + events, + "environmentType" + FROM "TaskEvent" + WHERE "traceId" = ${traceId} + ORDER BY "startTime" ASC + LIMIT ${env.MAXIMUM_TRACE_SUMMARY_VIEW_COUNT} + `; + } + } +} diff --git a/internal-packages/database/prisma/migrations/20250212053026_create_task_event_partitioned_table/migration.sql b/internal-packages/database/prisma/migrations/20250212053026_create_task_event_partitioned_table/migration.sql new file mode 100644 index 0000000000..b5abf4a68a --- /dev/null +++ b/internal-packages/database/prisma/migrations/20250212053026_create_task_event_partitioned_table/migration.sql @@ -0,0 +1,63 @@ +-- CreateTable +CREATE TABLE "TaskEventPartitioned" ( + "id" text NOT NULL, + "message" TEXT NOT NULL, + "traceId" TEXT NOT NULL, + "spanId" TEXT NOT NULL, + "parentId" TEXT, + "tracestate" TEXT, + "isError" BOOLEAN NOT NULL DEFAULT false, + "isPartial" BOOLEAN NOT NULL DEFAULT false, + "isCancelled" BOOLEAN NOT NULL DEFAULT false, + "serviceName" TEXT NOT NULL, + "serviceNamespace" TEXT NOT NULL, + "level" "TaskEventLevel" NOT NULL DEFAULT 'TRACE', + "kind" "TaskEventKind" NOT NULL DEFAULT 'INTERNAL', + "status" "TaskEventStatus" NOT NULL DEFAULT 'UNSET', + "links" JSONB, + "events" JSONB, + "startTime" BIGINT NOT NULL, + "duration" BIGINT NOT NULL DEFAULT 0, + "attemptId" TEXT, + "attemptNumber" INTEGER, + "environmentId" TEXT NOT NULL, + "environmentType" "RuntimeEnvironmentType" NOT NULL, + "organizationId" TEXT NOT NULL, + "projectId" TEXT NOT NULL, + "projectRef" TEXT NOT NULL, + "runId" TEXT NOT NULL, + "runIsTest" BOOLEAN NOT NULL DEFAULT false, + "idempotencyKey" TEXT, + "taskSlug" TEXT NOT NULL, + "taskPath" TEXT, + "taskExportName" TEXT, + "workerId" TEXT, + "workerVersion" TEXT, + "queueId" TEXT, + "queueName" TEXT, + "batchId" TEXT, + "properties" JSONB NOT NULL, + "metadata" JSONB, + "style" JSONB, + "output" JSONB, + "outputType" TEXT, + "payload" JSONB, + "payloadType" TEXT, + "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "usageDurationMs" INTEGER NOT NULL DEFAULT 0, + "usageCostInCents" DOUBLE PRECISION NOT NULL DEFAULT 0, + "machinePreset" TEXT, + "machinePresetCpu" DOUBLE PRECISION, + "machinePresetMemory" DOUBLE PRECISION, + "machinePresetCentsPerMs" DOUBLE PRECISION, + CONSTRAINT "TaskEventPartitioned_pkey" PRIMARY KEY ("id", "createdAt") +) PARTITION BY RANGE ("createdAt"); + +-- CreateIndex +CREATE INDEX "TaskEventPartitioned_traceId_idx" ON "TaskEventPartitioned"("traceId"); + +-- CreateIndex +CREATE INDEX "TaskEventPartitioned_spanId_idx" ON "TaskEventPartitioned"("spanId"); + +-- CreateIndex +CREATE INDEX "TaskEventPartitioned_runId_idx" ON "TaskEventPartitioned"("runId"); \ No newline at end of file diff --git a/internal-packages/database/prisma/migrations/20250212075957_add_task_event_store_to_task_run/migration.sql b/internal-packages/database/prisma/migrations/20250212075957_add_task_event_store_to_task_run/migration.sql new file mode 100644 index 0000000000..bee6739169 --- /dev/null +++ b/internal-packages/database/prisma/migrations/20250212075957_add_task_event_store_to_task_run/migration.sql @@ -0,0 +1,5 @@ +-- AlterTable +ALTER TABLE + "TaskRun" +ADD + COLUMN "taskEventStore" TEXT NOT NULL DEFAULT 'taskEvent'; \ No newline at end of file diff --git a/internal-packages/database/prisma/schema.prisma b/internal-packages/database/prisma/schema.prisma index 577c898d4c..d676484d9b 100644 --- a/internal-packages/database/prisma/schema.prisma +++ b/internal-packages/database/prisma/schema.prisma @@ -1729,6 +1729,8 @@ model TaskRun { /// optional token that can be used to authenticate the task run oneTimeUseToken String? + taskEventStore String @default("taskEvent") + batchItems BatchTaskRunItem[] dependency TaskRunDependency? CheckpointRestoreEvent CheckpointRestoreEvent[] @@ -2704,3 +2706,100 @@ model RealtimeStreamChunk { @@index([runId]) @@index([createdAt]) } + +/// This is the unified otel span/log model that will eventually be replaced by clickhouse +model TaskEventPartitioned { + id String @default(cuid()) + /// This matches the span name for a trace event, or the log body for a log event + message String + + traceId String + spanId String + parentId String? + tracestate String? + + isError Boolean @default(false) + isPartial Boolean @default(false) + isCancelled Boolean @default(false) + + serviceName String + serviceNamespace String + + level TaskEventLevel @default(TRACE) + kind TaskEventKind @default(INTERNAL) + status TaskEventStatus @default(UNSET) + + links Json? + events Json? + + /// This is the time the event started in nanoseconds since the epoch + startTime BigInt + + /// This is the duration of the event in nanoseconds + duration BigInt @default(0) + + attemptId String? + attemptNumber Int? + + environmentId String + environmentType RuntimeEnvironmentType + + organizationId String + + projectId String + projectRef String + + runId String + runIsTest Boolean @default(false) + + idempotencyKey String? + + taskSlug String + taskPath String? + taskExportName String? + + workerId String? + workerVersion String? + + queueId String? + queueName String? + + batchId String? + + /// This represents all the span attributes available, like http.status_code, and special attributes like $style.icon, $output, $metadata.payload.userId, as it's used for searching and filtering + properties Json + + /// This represents all span attributes in the $metadata namespace, like $metadata.payload + metadata Json? + + /// This represents all span attributes in the $style namespace, like $style + style Json? + + /// This represents all span attributes in the $output namespace, like $output + output Json? + + /// This represents the mimetype of the output, such as application/json or application/super+json + outputType String? + + payload Json? + payloadType String? + + createdAt DateTime @default(now()) + + // This represents the amount of "usage time" the event took, e.g. the CPU time + usageDurationMs Int @default(0) + usageCostInCents Float @default(0) + + machinePreset String? + machinePresetCpu Float? + machinePresetMemory Float? + machinePresetCentsPerMs Float? + + @@id([id, createdAt]) + /// Used on the run page + @@index([traceId]) + /// Used when looking up span events to complete when a run completes + @@index([spanId]) + // Used for getting all logs for a run + @@index([runId]) +}