Skip to content

Commit 6aa6b1f

Browse files
committed
refactor: Simplify processSegment and processSpan functions
Move transaction down the stack and job enqueue to side effect callback
1 parent 06d56eb commit 6aa6b1f

File tree

3 files changed

+232
-237
lines changed

3 files changed

+232
-237
lines changed

packages/core/src/events/events.d.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@ import type {
2121
ProviderLog,
2222
ProviderLogDto,
2323
Providers,
24-
Segment,
25-
Span,
2624
User,
2725
Workspace,
2826
} from '../browser'

packages/core/src/services/tracing/segments/process.ts

Lines changed: 134 additions & 145 deletions
Original file line numberDiff line numberDiff line change
@@ -61,82 +61,82 @@ export async function processSegment(
6161
transaction = new Transaction(),
6262
disk: DiskWrapper = diskFactory('private'),
6363
) {
64-
return await transaction.call(async (tx) => {
65-
const validating = validateSegmentChain(args.segment, args.chain)
66-
if (validating.error) return Result.error(validating.error)
64+
const validating = validateSegmentChain(args.segment, args.chain)
65+
if (validating.error) return Result.error(validating.error)
6766

68-
// const getting = await getState(args, transaction, disk)
69-
// if (getting.error) return Result.error(getting.error)
70-
// const state = getting.value
67+
const getting = await getState(args, disk)
68+
if (getting.error) return Result.error(getting.error)
69+
const state = getting.value
7170

72-
const id = state.current?.id ?? state.segment.id
71+
const id = state.current?.id ?? state.segment.id
7372

74-
const traceId = state.current?.traceId ?? state.traceId
73+
const traceId = state.current?.traceId ?? state.traceId
7574

76-
const parentId = state.current?.parentId ?? state.chain.at(-1)?.id
75+
const parentId = state.current?.parentId ?? state.chain.at(-1)?.id
7776

78-
const type = state.current?.type ?? state.segment.type
77+
const type = state.current?.type ?? state.segment.type
7978

80-
// const specification = SEGMENT_SPECIFICATIONS[type]
81-
// if (!specification) {
82-
// return Result.error(new UnprocessableEntityError('Invalid segment type'))
83-
// }
79+
// const specification = SEGMENT_SPECIFICATIONS[type]
80+
// if (!specification) {
81+
// return Result.error(new UnprocessableEntityError('Invalid segment type'))
82+
// }
8483

85-
// let metadata = {
86-
// ...({
87-
// traceId: traceId,
88-
// segmentId: id,
89-
// type: type,
90-
// } satisfies BaseSegmentMetadata),
91-
// } as SegmentMetadata
84+
// let metadata = {
85+
// ...({
86+
// traceId: traceId,
87+
// segmentId: id,
88+
// type: type,
89+
// } satisfies BaseSegmentMetadata),
90+
// } as SegmentMetadata
9291

93-
// // @ts-expect-error seems typescript cannot infer that state types are the same
94-
// const processing = await specification.process(state, tx)
95-
// if (processing.error) return Result.error(processing.error)
96-
// metadata = { ...metadata, ...processing.value }
92+
// // @ts-expect-error seems typescript cannot infer that state types are the same
93+
// const processing = await specification.process(state, tx)
94+
// if (processing.error) return Result.error(processing.error)
95+
// metadata = { ...metadata, ...processing.value }
9796

98-
// // Note: edge case when the global document segment is being processed right now for the first time
99-
// if (metadata.type === SegmentType.Document) {
100-
// state.run = { metadata } as SegmentWithDetails<SegmentType.Document>
101-
// }
97+
// // Note: edge case when the global document segment is being processed right now for the first time
98+
// if (metadata.type === SegmentType.Document) {
99+
// state.run = { metadata } as SegmentWithDetails<SegmentType.Document>
100+
// }
102101

103-
// const computingei = computeExternalId(state)
104-
// if (computingei.error) return Result.error(computingei.error)
105-
// const externalId = computingei.value
102+
// const computingei = computeExternalId(state)
103+
// if (computingei.error) return Result.error(computingei.error)
104+
// const externalId = computingei.value
106105

107-
// const enrichingnm = enrichName(state)
108-
// if (enrichingnm.error) return Result.error(enrichingnm.error)
109-
// const name = enrichingnm.value
106+
// const enrichingnm = enrichName(state)
107+
// if (enrichingnm.error) return Result.error(enrichingnm.error)
108+
// const name = enrichingnm.value
110109

111-
// const computingsc = computeSource(state)
112-
// if (computingsc.error) return Result.error(computingsc.error)
113-
// const source = computingsc.value
110+
// const computingsc = computeSource(state)
111+
// if (computingsc.error) return Result.error(computingsc.error)
112+
// const source = computingsc.value
114113

115-
// const computingst = computeStatus(state)
116-
// if (computingst.error) return Result.error(computingst.error)
117-
// const { status, message } = computingst.value
114+
// const computingst = computeStatus(state)
115+
// if (computingst.error) return Result.error(computingst.error)
116+
// const { status, message } = computingst.value
118117

119-
const computinglu = computeLogUuid(state)
120-
if (computinglu.error) return Result.error(computinglu.error)
121-
const logUuid = computinglu.value
118+
const computinglu = computeLogUuid(state)
119+
if (computinglu.error) return Result.error(computinglu.error)
120+
const logUuid = computinglu.value
122121

123-
// const computingdc = computeDocument(state)
124-
// if (computingdc.error) return Result.error(computingdc.error)
125-
// const { commitUuid, documentUuid, documentHash,
126-
// documentType, provider, model } = computingdc.value // prettier-ignore
122+
// const computingdc = computeDocument(state)
123+
// if (computingdc.error) return Result.error(computingdc.error)
124+
// const { commitUuid, documentUuid, documentHash,
125+
// documentType, provider, model } = computingdc.value // prettier-ignore
127126

128-
// const computingeu = computeExperimentUuid(state)
129-
// if (computingeu.error) return Result.error(computingeu.error)
130-
// const experimentUuid = computingeu.value
127+
// const computingeu = computeExperimentUuid(state)
128+
// if (computingeu.error) return Result.error(computingeu.error)
129+
// const experimentUuid = computingeu.value
131130

132-
// const computingsa = computeStatistics(state)
133-
// if (computingsa.error) return Result.error(computingsa.error)
134-
// const { tokens, cost, duration } = computingsa.value
131+
// const computingsa = computeStatistics(state)
132+
// if (computingsa.error) return Result.error(computingsa.error)
133+
// const { tokens, cost, duration } = computingsa.value
135134

136-
// const computingts = computeTimestamps(state)
137-
// if (computingts.error) return Result.error(computingts.error)
138-
// const { startedAt, endedAt } = computingts.value
135+
// const computingts = computeTimestamps(state)
136+
// if (computingts.error) return Result.error(computingts.error)
137+
// const { startedAt, endedAt } = computingts.value
139138

139+
return await transaction.call(async (tx) => {
140140
const repository = new SegmentsRepository(args.workspace.id, tx)
141141
const locking = await repository.lock({ segmentId: id, traceId })
142142
if (locking.error) {
@@ -380,106 +380,95 @@ async function getState(
380380
apiKey: ApiKey
381381
workspace: Workspace
382382
},
383-
transaction = new Transaction(),
384383
disk: DiskWrapper,
384+
db = database,
385385
): Promise<TypedResult<SegmentProcessArgs>> {
386-
return await transaction.call(async (tx) => {
387-
const gettingcs = await getCurrentState(
388-
{ segment, traceId, workspace },
389-
tx,
390-
disk,
391-
)
392-
if (gettingcs.error) return Result.error(gettingcs.error)
393-
const current = gettingcs.value
386+
const gettingcs = await getCurrentState(
387+
{ segment, traceId, workspace },
388+
db,
389+
disk,
390+
)
391+
if (gettingcs.error) return Result.error(gettingcs.error)
392+
const current = gettingcs.value
394393

395-
const gettingrs = await getRunState(
396-
{ segment, traceId, workspace },
397-
tx,
398-
disk,
399-
)
400-
if (gettingrs.error) return Result.error(gettingrs.error)
401-
const run = gettingrs.value
394+
const gettingrs = await getRunState({ segment, traceId, workspace }, db, disk)
395+
if (gettingrs.error) return Result.error(gettingrs.error)
396+
const run = gettingrs.value
402397

403-
const gettinghs = await getChildState(
404-
{ childId, childType, traceId, workspace },
405-
tx,
406-
disk,
398+
const gettinghs = await getChildState(
399+
{ childId, childType, traceId, workspace },
400+
db,
401+
disk,
402+
)
403+
if (gettinghs.error) return Result.error(gettinghs.error)
404+
const child = gettinghs.value
405+
406+
let source = segment.source as SegmentSource | undefined
407+
if (!source) source = inheritField<SegmentSource>('source', chain)
408+
if (!source) source = run?.source
409+
if (!source) source = current?.source
410+
if (!source) {
411+
return Result.error(
412+
new UnprocessableEntityError('Segment source is required'),
407413
)
408-
if (gettinghs.error) return Result.error(gettinghs.error)
409-
const child = gettinghs.value
410-
411-
let source = segment.source as SegmentSource | undefined
412-
if (!source) source = inheritField<SegmentSource>('source', chain)
413-
if (!source) source = run?.source
414-
if (!source) source = current?.source
415-
if (!source) {
416-
return Result.error(
417-
new UnprocessableEntityError('Segment source is required'),
418-
)
419-
}
414+
}
420415

421-
let commitUuid = segment.data?.commitUuid
422-
if (!commitUuid) commitUuid = inheritField<string>('commitUuid', chain) // prettier-ignore
423-
if (!commitUuid) commitUuid = run?.commitUuid
424-
if (!commitUuid) commitUuid = current?.commitUuid
425-
if (!commitUuid) {
426-
return Result.error(
427-
new UnprocessableEntityError('Commit uuid is required'),
428-
)
429-
}
416+
let commitUuid = segment.data?.commitUuid
417+
if (!commitUuid) commitUuid = inheritField<string>('commitUuid', chain) // prettier-ignore
418+
if (!commitUuid) commitUuid = run?.commitUuid
419+
if (!commitUuid) commitUuid = current?.commitUuid
420+
if (!commitUuid) {
421+
return Result.error(new UnprocessableEntityError('Commit uuid is required'))
422+
}
430423

431-
let documentUuid = segment.data?.documentUuid
432-
if (!documentUuid) documentUuid = inheritField<string>('documentUuid', chain) // prettier-ignore
433-
if (!documentUuid) documentUuid = run?.documentUuid
434-
if (!documentUuid) documentUuid = current?.documentUuid
435-
if (!documentUuid) {
436-
return Result.error(
437-
new UnprocessableEntityError('Document uuid is required'),
438-
)
439-
}
424+
let documentUuid = segment.data?.documentUuid
425+
if (!documentUuid) documentUuid = inheritField<string>('documentUuid', chain) // prettier-ignore
426+
if (!documentUuid) documentUuid = run?.documentUuid
427+
if (!documentUuid) documentUuid = current?.documentUuid
428+
if (!documentUuid) {
429+
return Result.error(
430+
new UnprocessableEntityError('Document uuid is required'),
431+
)
432+
}
440433

441-
const commitsRepository = new CommitsRepository(workspace.id, tx)
442-
const gettingco = await commitsRepository.getCommitByUuid({
443-
uuid: commitUuid,
434+
const commitsRepository = new CommitsRepository(workspace.id, db)
435+
const gettingco = await commitsRepository.getCommitByUuid({
436+
uuid: commitUuid,
437+
})
438+
if (gettingco.error) return Result.error(gettingco.error)
439+
const commit = gettingco.value
440+
441+
let document
442+
let evaluation
443+
if (source === SegmentSource.Evaluation) {
444+
// TODO(tracing): we actually don't have the a repository method
445+
// to get the versioned evaluation without the document uuid
446+
evaluation = undefined as unknown as EvaluationV2<EvaluationType.Llm>
447+
} else {
448+
const documentsRepository = new DocumentVersionsRepository(workspace.id, db)
449+
const getting = await documentsRepository.getDocumentAtCommit({
450+
commitUuid: commitUuid,
451+
documentUuid: documentUuid,
444452
})
445-
if (gettingco.error) return Result.error(gettingco.error)
446-
const commit = gettingco.value
447-
448-
let document
449-
let evaluation
450-
if (source === SegmentSource.Evaluation) {
451-
// TODO(tracing): we actually don't have the a repository method
452-
// to get the versioned evaluation without the document uuid
453-
evaluation = undefined as unknown as EvaluationV2<EvaluationType.Llm>
454-
} else {
455-
const documentsRepository = new DocumentVersionsRepository(
456-
workspace.id,
457-
tx,
458-
)
459-
const getting = await documentsRepository.getDocumentAtCommit({
460-
commitUuid: commitUuid,
461-
documentUuid: documentUuid,
462-
})
463-
if (getting.error) return Result.error(getting.error)
453+
if (getting.error) return Result.error(getting.error)
464454

465-
const scanning = await scan({ prompt: getting.value.content })
455+
const scanning = await scan({ prompt: getting.value.content })
466456

467-
document = { ...getting.value, config: scanning.config }
468-
}
457+
document = { ...getting.value, config: scanning.config }
458+
}
469459

470-
return Result.ok({
471-
segment: segment,
472-
chain: chain,
473-
child: child,
474-
traceId: traceId,
475-
current: current,
476-
run: run,
477-
document: document,
478-
evaluation: evaluation,
479-
commit: commit,
480-
apiKey: apiKey,
481-
workspace: workspace,
482-
})
460+
return Result.ok({
461+
segment: segment,
462+
chain: chain,
463+
child: child,
464+
traceId: traceId,
465+
current: current,
466+
run: run,
467+
document: document,
468+
evaluation: evaluation,
469+
commit: commit,
470+
apiKey: apiKey,
471+
workspace: workspace,
483472
})
484473
}
485474

0 commit comments

Comments
 (0)