
Commit 5210d3a

Add internal spans to event repo (#1678)
1 parent b586762 commit 5210d3a

File tree

5 files changed: +149 additions, −85 deletions


apps/webapp/app/v3/dynamicFlushScheduler.server.ts

Lines changed: 5 additions & 3 deletions
@@ -1,7 +1,9 @@
+import { nanoid } from "nanoid";
+
 export type DynamicFlushSchedulerConfig<T> = {
   batchSize: number;
   flushInterval: number;
-  callback: (batch: T[]) => Promise<void>;
+  callback: (flushId: string, batch: T[]) => Promise<void>;
 };

 export class DynamicFlushScheduler<T> {
@@ -10,7 +12,7 @@ export class DynamicFlushScheduler<T> {
   private readonly BATCH_SIZE: number;
   private readonly FLUSH_INTERVAL: number;
   private flushTimer: NodeJS.Timeout | null;
-  private readonly callback: (batch: T[]) => Promise<void>;
+  private readonly callback: (flushId: string, batch: T[]) => Promise<void>;

   constructor(config: DynamicFlushSchedulerConfig<T>) {
     this.batchQueue = [];
@@ -57,7 +59,7 @@ export class DynamicFlushScheduler<T> {

     const batchToFlush = this.batchQueue.shift();
     try {
-      await this.callback(batchToFlush!);
+      await this.callback(nanoid(), batchToFlush!);
       if (this.batchQueue.length > 0) {
         this.flushNextBatch();
       }
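
The scheduler now stamps every flush with an id from nanoid() before invoking the callback, so downstream logs and spans can be correlated to a single flush. A minimal usage sketch of the new callback shape (the event type, sizes, and console logging are illustrative, not part of this commit):

```ts
import { DynamicFlushScheduler } from "./dynamicFlushScheduler.server";

// Hypothetical event type, only for illustration.
type ExampleEvent = { name: string };

const scheduler = new DynamicFlushScheduler<ExampleEvent>({
  batchSize: 100,
  flushInterval: 5_000,
  // The scheduler generates flushId internally via nanoid() for each batch it flushes.
  callback: async (flushId, batch) => {
    console.log(`flush ${flushId}: inserting ${batch.length} events`);
  },
});

scheduler.addToBatch([{ name: "example.event" }]);
```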

apps/webapp/app/v3/eventRepository.server.ts

Lines changed: 101 additions & 57 deletions
@@ -1,4 +1,4 @@
-import { Attributes, Link, TraceFlags } from "@opentelemetry/api";
+import { Attributes, Link, trace, TraceFlags, Tracer } from "@opentelemetry/api";
 import { RandomIdGenerator } from "@opentelemetry/sdk-trace-base";
 import { SemanticResourceAttributes } from "@opentelemetry/semantic-conventions";
 import {
@@ -32,6 +32,8 @@ import { singleton } from "~/utils/singleton";
 import { DynamicFlushScheduler } from "./dynamicFlushScheduler.server";
 import { startActiveSpan } from "./tracer.server";
 import { createRedisClient, RedisClient, RedisWithClusterOptions } from "~/redis.server";
+import { startSpan } from "./tracing.server";
+import { nanoid } from "nanoid";

 const MAX_FLUSH_DEPTH = 5;

@@ -99,6 +101,7 @@ export type EventRepoConfig = {
   batchInterval: number;
   redis: RedisWithClusterOptions;
   retentionInDays: number;
+  tracer?: Tracer;
 };

 export type QueryOptions = Prisma.TaskEventWhereInput;
@@ -202,6 +205,8 @@ export class EventRepository {
   private _randomIdGenerator = new RandomIdGenerator();
   private _redisPublishClient: RedisClient;
   private _subscriberCount = 0;
+  private _tracer: Tracer;
+  private _lastFlushedAt: Date | undefined;

   get subscriberCount() {
     return this._subscriberCount;
@@ -219,22 +224,23 @@ export class EventRepository {
     });

     this._redisPublishClient = createRedisClient("trigger:eventRepoPublisher", this._config.redis);
+    this._tracer = _config.tracer ?? trace.getTracer("eventRepo", "0.0.1");
   }

   async insert(event: CreatableEvent) {
     this._flushScheduler.addToBatch([event]);
   }

   async insertImmediate(event: CreatableEvent) {
-    await this.#flushBatch([event]);
+    await this.#flushBatch(nanoid(), [event]);
   }

   async insertMany(events: CreatableEvent[]) {
     this._flushScheduler.addToBatch(events);
   }

   async insertManyImmediate(events: CreatableEvent[]) {
-    return await this.#flushBatch(events);
+    return await this.#flushBatch(nanoid(), events);
   }

   async completeEvent(spanId: string, options?: UpdateEventOptions) {
@@ -1019,42 +1025,56 @@ export class EventRepository {
     };
   }

-  async #flushBatch(batch: CreatableEvent[]) {
-    const events = excludePartialEventsWithCorrespondingFullEvent(batch);
+  async #flushBatch(flushId: string, batch: CreatableEvent[]) {
+    return await startSpan(this._tracer, "flushBatch", async (span) => {
+      const events = excludePartialEventsWithCorrespondingFullEvent(batch);

-    const flushedEvents = await this.#doFlushBatch(events);
+      span.setAttribute("flush_id", flushId);
+      span.setAttribute("event_count", events.length);
+      span.setAttribute("partial_event_count", batch.length - events.length);
+      span.setAttribute(
+        "last_flush_in_ms",
+        this._lastFlushedAt ? new Date().getTime() - this._lastFlushedAt.getTime() : 0
+      );

-    if (flushedEvents.length !== events.length) {
-      logger.debug("[EventRepository][flushBatch] Failed to insert all events", {
-        attemptCount: events.length,
-        successCount: flushedEvents.length,
-      });
-    }
+      const flushedEvents = await this.#doFlushBatch(flushId, events);
+
+      this._lastFlushedAt = new Date();
+
+      if (flushedEvents.length !== events.length) {
+        logger.debug("[EventRepository][flushBatch] Failed to insert all events", {
+          attemptCount: events.length,
+          successCount: flushedEvents.length,
+        });
+
+        span.setAttribute("failed_event_count", events.length - flushedEvents.length);
+      }

-    this.#publishToRedis(flushedEvents);
+      this.#publishToRedis(flushedEvents);
+    });
   }

-  async #doFlushBatch(events: CreatableEvent[], depth: number = 1): Promise<CreatableEvent[]> {
-    try {
-      await this.db.taskEvent.createMany({
-        data: events as Prisma.TaskEventCreateManyInput[],
-      });
+  async #doFlushBatch(
+    flushId: string,
+    events: CreatableEvent[],
+    depth: number = 1
+  ): Promise<CreatableEvent[]> {
+    return await startSpan(this._tracer, "doFlushBatch", async (span) => {
+      try {
+        span.setAttribute("event_count", events.length);
+        span.setAttribute("depth", depth);
+        span.setAttribute("flush_id", flushId);

-      return events;
-    } catch (error) {
-      if (error instanceof Prisma.PrismaClientUnknownRequestError) {
-        logger.error("Failed to insert events, most likely because of null characters", {
-          error: {
-            name: error.name,
-            message: error.message,
-            stack: error.stack,
-            clientVersion: error.clientVersion,
-          },
+        await this.db.taskEvent.createMany({
+          data: events as Prisma.TaskEventCreateManyInput[],
         });

-        if (events.length === 1) {
-          logger.debug("Attempting to insert event individually and it failed", {
-            event: events[0],
+        span.setAttribute("inserted_event_count", events.length);
+
+        return events;
+      } catch (error) {
+        if (error instanceof Prisma.PrismaClientUnknownRequestError) {
+          logger.error("Failed to insert events, most likely because of null characters", {
             error: {
               name: error.name,
               message: error.message,
@@ -1063,38 +1083,62 @@ export class EventRepository {
             },
           });

-        return [];
-      }
+          if (events.length === 1) {
+            logger.debug("Attempting to insert event individually and it failed", {
+              event: events[0],
+              error: {
+                name: error.name,
+                message: error.message,
+                stack: error.stack,
+                clientVersion: error.clientVersion,
+              },
+            });

-      if (depth > MAX_FLUSH_DEPTH) {
-        logger.error("Failed to insert events, reached maximum depth", {
-          error: {
-            name: error.name,
-            message: error.message,
-            stack: error.stack,
-            clientVersion: error.clientVersion,
-          },
-          depth,
-          eventsCount: events.length,
-        });
+            span.setAttribute("failed_event_count", 1);

-        return [];
-      }
+            return [];
+          }

-      // Split the events into two batches, and recursively try to insert them.
-      const middle = Math.floor(events.length / 2);
-      const [firstHalf, secondHalf] = [events.slice(0, middle), events.slice(middle)];
+          if (depth > MAX_FLUSH_DEPTH) {
+            logger.error("Failed to insert events, reached maximum depth", {
+              error: {
+                name: error.name,
+                message: error.message,
+                stack: error.stack,
+                clientVersion: error.clientVersion,
+              },
+              depth,
+              eventsCount: events.length,
+            });

-      const [firstHalfEvents, secondHalfEvents] = await Promise.all([
-        this.#doFlushBatch(firstHalf, depth + 1),
-        this.#doFlushBatch(secondHalf, depth + 1),
-      ]);
+            span.setAttribute("reached_max_flush_depth", true);
+            span.setAttribute("failed_event_count", events.length);

-      return firstHalfEvents.concat(secondHalfEvents);
-    }
+            return [];
+          }

-      throw error;
-    }
+          // Split the events into two batches, and recursively try to insert them.
+          const middle = Math.floor(events.length / 2);
+          const [firstHalf, secondHalf] = [events.slice(0, middle), events.slice(middle)];
+
+          return await startSpan(this._tracer, "bisectBatch", async (span) => {
+            span.setAttribute("first_half_count", firstHalf.length);
+            span.setAttribute("second_half_count", secondHalf.length);
+            span.setAttribute("depth", depth);
+            span.setAttribute("flush_id", flushId);
+
+            const [firstHalfEvents, secondHalfEvents] = await Promise.all([
+              this.#doFlushBatch(flushId, firstHalf, depth + 1),
+              this.#doFlushBatch(flushId, secondHalf, depth + 1),
+            ]);
+
+            return firstHalfEvents.concat(secondHalfEvents);
+          });
+        }
+
+        throw error;
+      }
+    });
   }

   async #publishToRedis(events: CreatableEvent[]) {
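
Both #flushBatch and #doFlushBatch are now wrapped with a startSpan helper imported from ./tracing.server, which is not shown in this diff. A hedged sketch of the shape those call sites assume — run the callback inside an active span, record failures, and always end the span (the real helper may differ):

```ts
import { Span, SpanStatusCode, Tracer } from "@opentelemetry/api";

// Hypothetical sketch of the startSpan helper assumed by the calls above;
// the actual implementation lives in ./tracing.server and is not part of this diff.
export async function startSpan<T>(
  tracer: Tracer,
  name: string,
  fn: (span: Span) => Promise<T>
): Promise<T> {
  return await tracer.startActiveSpan(name, async (span) => {
    try {
      return await fn(span);
    } catch (error) {
      span.recordException(error instanceof Error ? error : new Error(String(error)));
      span.setStatus({ code: SpanStatusCode.ERROR });
      throw error;
    } finally {
      span.end();
    }
  });
}
```

With a helper of this shape, the bisectBatch span opened inside #doFlushBatch nests under the doFlushBatch span, and all of them carry the flush_id attribute generated by the caller.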

apps/webapp/app/v3/otlpExporter.server.ts

Lines changed: 37 additions & 23 deletions
@@ -25,58 +25,72 @@ import {
   CreatableEventEnvironmentType,
 } from "./eventRepository.server";
 import { logger } from "~/services/logger.server";
+import { trace, Tracer } from "@opentelemetry/api";
+import { startSpan } from "./tracing.server";

 export type OTLPExporterConfig = {
   batchSize: number;
   batchInterval: number;
 };

 class OTLPExporter {
+  private _tracer: Tracer;
+
   constructor(
     private readonly _eventRepository: EventRepository,
     private readonly _verbose: boolean
-  ) {}
+  ) {
+    this._tracer = trace.getTracer("otlp-exporter");
+  }

   async exportTraces(
     request: ExportTraceServiceRequest,
     immediate: boolean = false
   ): Promise<ExportTraceServiceResponse> {
-    this.#logExportTracesVerbose(request);
+    return await startSpan(this._tracer, "exportTraces", async (span) => {
+      this.#logExportTracesVerbose(request);

-    const events = this.#filterResourceSpans(request.resourceSpans).flatMap((resourceSpan) => {
-      return convertSpansToCreateableEvents(resourceSpan);
-    });
+      const events = this.#filterResourceSpans(request.resourceSpans).flatMap((resourceSpan) => {
+        return convertSpansToCreateableEvents(resourceSpan);
+      });

-    this.#logEventsVerbose(events);
+      this.#logEventsVerbose(events);

-    if (immediate) {
-      await this._eventRepository.insertManyImmediate(events);
-    } else {
-      await this._eventRepository.insertMany(events);
-    }
+      span.setAttribute("event_count", events.length);
+
+      if (immediate) {
+        await this._eventRepository.insertManyImmediate(events);
+      } else {
+        await this._eventRepository.insertMany(events);
+      }

-    return ExportTraceServiceResponse.create();
+      return ExportTraceServiceResponse.create();
+    });
   }

   async exportLogs(
     request: ExportLogsServiceRequest,
     immediate: boolean = false
   ): Promise<ExportLogsServiceResponse> {
-    this.#logExportLogsVerbose(request);
+    return await startSpan(this._tracer, "exportLogs", async (span) => {
+      this.#logExportLogsVerbose(request);

-    const events = this.#filterResourceLogs(request.resourceLogs).flatMap((resourceLog) => {
-      return convertLogsToCreateableEvents(resourceLog);
-    });
+      const events = this.#filterResourceLogs(request.resourceLogs).flatMap((resourceLog) => {
+        return convertLogsToCreateableEvents(resourceLog);
+      });

-    this.#logEventsVerbose(events);
+      this.#logEventsVerbose(events);

-    if (immediate) {
-      await this._eventRepository.insertManyImmediate(events);
-    } else {
-      await this._eventRepository.insertMany(events);
-    }
+      span.setAttribute("event_count", events.length);

-    return ExportLogsServiceResponse.create();
+      if (immediate) {
+        await this._eventRepository.insertManyImmediate(events);
+      } else {
+        await this._eventRepository.insertMany(events);
+      }
+
+      return ExportLogsServiceResponse.create();
+    });
   }

   #logEventsVerbose(events: CreatableEvent[]) {
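
The exporter obtains its tracer with trace.getTracer("otlp-exporter"), i.e. from the global provider, so the new exportTraces and exportLogs spans are only recorded if a tracer provider is registered at process startup. A hedged sketch of such a registration — the packages and processor wiring here are assumptions, not part of this commit:

```ts
import { NodeTracerProvider } from "@opentelemetry/sdk-trace-node";
import { ConsoleSpanExporter, SimpleSpanProcessor } from "@opentelemetry/sdk-trace-base";

// Register a global provider so trace.getTracer("otlp-exporter") returns a
// recording tracer; without a registered provider, the new spans are no-ops.
const provider = new NodeTracerProvider();
provider.addSpanProcessor(new SimpleSpanProcessor(new ConsoleSpanExporter()));
provider.register();
```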

apps/webapp/app/v3/services/finalizeTaskRun.server.ts

Lines changed: 5 additions & 2 deletions
@@ -122,9 +122,12 @@ export class FinalizeTaskRunService extends BaseService {
     const result = await resumeService.call({ id: run.id });

     if (result.success) {
-      logger.log("FinalizeTaskRunService: Resumed dependent parents", { result, run });
+      logger.log("FinalizeTaskRunService: Resumed dependent parents", { result, run: run.id });
     } else {
-      logger.error("FinalizeTaskRunService: Failed to resume dependent parents", { result, run });
+      logger.error("FinalizeTaskRunService: Failed to resume dependent parents", {
+        result,
+        run: run.id,
+      });
     }

     //enqueue alert

apps/webapp/app/v3/tracer.server.ts

Lines changed: 1 addition & 0 deletions
@@ -8,6 +8,7 @@ import {
   SpanKind,
   SpanOptions,
   SpanStatusCode,
+  Tracer,
   diag,
   trace,
 } from "@opentelemetry/api";
