Skip to content

Commit beb1c9c

Browse files
axiomofjoymikeldking
authored andcommitted
fix(annotations): revert bulk insert (#7456)
1 parent 11656a1 commit beb1c9c

File tree

10 files changed

+1111
-723
lines changed

10 files changed

+1111
-723
lines changed

.github/workflows/python-CI.yml

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -266,7 +266,7 @@ jobs:
266266
strategy:
267267
fail-fast: false
268268
matrix:
269-
py: [3.12, 3.13]
269+
py: [3.13]
270270
os: [ubuntu-latest]
271271
steps:
272272
- name: Checkout repository
@@ -291,7 +291,7 @@ jobs:
291291
strategy:
292292
fail-fast: false
293293
matrix:
294-
py: [3.9, 3.12, 3.13]
294+
py: [3.9, 3.13]
295295
os: [ubuntu-latest]
296296
steps:
297297
- name: Checkout repository
@@ -329,7 +329,7 @@ jobs:
329329
strategy:
330330
fail-fast: false
331331
matrix:
332-
py: [3.9, 3.12, 3.13]
332+
py: [3.9, 3.13]
333333
os: [ubuntu-latest]
334334
steps:
335335
- name: Checkout repository
@@ -368,7 +368,7 @@ jobs:
368368
strategy:
369369
fail-fast: false
370370
matrix:
371-
py: [3.9, 3.12, 3.13]
371+
py: [3.9, 3.13]
372372
os: [ubuntu-latest, windows-latest, macos-latest, macos-13]
373373
exclude:
374374
- py: 3.13
@@ -416,7 +416,7 @@ jobs:
416416
strategy:
417417
fail-fast: false
418418
matrix:
419-
py: [3.9, 3.12, 3.13]
419+
py: [3.9, 3.13]
420420
os: [ubuntu-latest]
421421
steps:
422422
- name: Checkout repository
@@ -454,7 +454,7 @@ jobs:
454454
strategy:
455455
fail-fast: false
456456
matrix:
457-
py: [3.9, 3.12, 3.13]
457+
py: [3.9, 3.13]
458458
db: [sqlite, postgresql]
459459
os: [ubuntu-latest, windows-latest, macos-latest, macos-13]
460460
exclude:

src/phoenix/db/insertion/document_annotation.py

Lines changed: 91 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
from collections.abc import Mapping
22
from datetime import datetime
3-
from typing import NamedTuple, Optional
3+
from typing import Any, NamedTuple, Optional
44

5-
from sqlalchemy import insert, select
5+
from sqlalchemy import Row, Select, and_, select, tuple_
66
from sqlalchemy.ext.asyncio import AsyncSession
77
from typing_extensions import TypeAlias
88

99
from phoenix.db import models
10-
from phoenix.db.helpers import num_docs_col
10+
from phoenix.db.helpers import dedup, num_docs_col
1111
from phoenix.db.insertion.helpers import as_kv
1212
from phoenix.db.insertion.types import (
1313
Insertables,
@@ -24,16 +24,25 @@
2424
_DocumentPosition: TypeAlias = int
2525
_AnnoRowId: TypeAlias = int
2626
_NumDocs: TypeAlias = int
27+
_Identifier: TypeAlias = str
2728

28-
_Key: TypeAlias = tuple[_Name, _SpanId, _DocumentPosition]
29-
_UniqueBy: TypeAlias = tuple[_Name, _SpanRowId, _DocumentPosition]
29+
30+
class _Key(NamedTuple):
31+
annotation_name: _Name
32+
annotation_identifier: _Identifier
33+
span_id: _SpanId
34+
document_position: _DocumentPosition
35+
36+
37+
_UniqueBy: TypeAlias = tuple[_Name, _SpanRowId, _DocumentPosition, _Identifier]
3038
_Existing: TypeAlias = tuple[
3139
_SpanRowId,
3240
_SpanId,
3341
_NumDocs,
3442
Optional[_AnnoRowId],
3543
Optional[_Name],
3644
Optional[_DocumentPosition],
45+
Optional[_Identifier],
3746
Optional[datetime],
3847
]
3948

@@ -46,15 +55,16 @@ class DocumentAnnotationQueueInserter(
4655
DocumentAnnotationDmlEvent,
4756
],
4857
table=models.DocumentAnnotation,
49-
unique_by=(),
58+
unique_by=("name", "span_rowid", "document_position", "identifier"),
59+
constraint_name="uq_document_annotations_name_span_rowid_document_pos_identifier",
5060
):
5161
async def _events(
5262
self,
5363
session: AsyncSession,
5464
*insertions: Insertables.DocumentAnnotation,
5565
) -> list[DocumentAnnotationDmlEvent]:
5666
records = [dict(as_kv(ins.row)) for ins in insertions]
57-
stmt = insert(self.table).values(records).returning(self.table.id)
67+
stmt = self._insert_on_conflict(*records).returning(self.table.id)
5868
ids = tuple([_ async for _ in await session.stream_scalars(stmt)])
5969
return [DocumentAnnotationDmlEvent(ids)]
6070

@@ -71,19 +81,37 @@ async def _partition(
7181
to_postpone: list[Postponed[Precursors.DocumentAnnotation]] = []
7282
to_discard: list[Received[Precursors.DocumentAnnotation]] = []
7383

74-
span_ids = {p.item.span_id for p in parcels}
75-
stmt = select(models.Span.id, models.Span.span_id, num_docs_col(self._db.dialect)).where(
76-
models.Span.span_id.in_(span_ids)
77-
)
78-
result = await session.execute(stmt)
79-
spans = result.all()
84+
stmt = self._select_existing(*map(_key, parcels))
85+
existing: list[Row[_Existing]] = [_ async for _ in await session.stream(stmt)]
8086
existing_spans: Mapping[str, _SpanAttr] = {
81-
row.span_id: _SpanAttr(row.id, row.num_docs) for row in spans
87+
e.span_id: _SpanAttr(e.span_rowid, e.num_docs) for e in existing
88+
}
89+
existing_annos: Mapping[_Key, _AnnoAttr] = {
90+
_Key(
91+
annotation_name=e.name,
92+
annotation_identifier=e.identifier,
93+
span_id=e.span_id,
94+
document_position=e.document_position,
95+
): _AnnoAttr(e.span_rowid, e.id, e.updated_at)
96+
for e in existing
97+
if e.id is not None and e.name is not None and e.updated_at is not None
8298
}
8399

84100
for p in parcels:
85-
if p.item.span_id in existing_spans:
86-
span = existing_spans[p.item.span_id]
101+
if (anno := existing_annos.get(_key(p))) is not None:
102+
if p.received_at <= anno.updated_at:
103+
to_discard.append(p)
104+
else:
105+
to_insert.append(
106+
Received(
107+
received_at=p.received_at,
108+
item=p.item.as_insertable(
109+
span_rowid=anno.span_rowid,
110+
id_=anno.id_,
111+
),
112+
)
113+
)
114+
elif (span := existing_spans.get(p.item.span_id)) is not None:
87115
if 0 <= p.item.document_position < span.num_docs:
88116
to_insert.append(
89117
Received(
@@ -106,9 +134,56 @@ async def _partition(
106134
to_discard.append(p)
107135

108136
assert len(to_insert) + len(to_postpone) + len(to_discard) == len(parcels)
137+
to_insert = dedup(sorted(to_insert, key=_time, reverse=True), _unique_by)[::-1]
109138
return to_insert, to_postpone, to_discard
110139

140+
def _select_existing(self, *keys: _Key) -> Select[_Existing]:
141+
anno = self.table
142+
span = (
143+
select(models.Span.id, models.Span.span_id, num_docs_col(self._db.dialect))
144+
.where(models.Span.span_id.in_({k.span_id for k in keys}))
145+
.cte()
146+
)
147+
onclause = and_(
148+
span.c.id == anno.span_rowid,
149+
anno.name.in_({k.annotation_name for k in keys}),
150+
tuple_(anno.name, anno.identifier, span.c.span_id, anno.document_position).in_(keys),
151+
)
152+
return select(
153+
span.c.id.label("span_rowid"),
154+
span.c.span_id,
155+
span.c.num_docs,
156+
anno.id,
157+
anno.name,
158+
anno.document_position,
159+
anno.identifier,
160+
anno.updated_at,
161+
).outerjoin_from(span, anno, onclause)
162+
111163

112164
class _SpanAttr(NamedTuple):
113165
span_rowid: _SpanRowId
114166
num_docs: _NumDocs
167+
168+
169+
class _AnnoAttr(NamedTuple):
170+
span_rowid: _SpanRowId
171+
id_: _AnnoRowId
172+
updated_at: datetime
173+
174+
175+
def _key(p: Received[Precursors.DocumentAnnotation]) -> _Key:
176+
return _Key(
177+
annotation_name=p.item.obj.name,
178+
annotation_identifier=p.item.obj.identifier,
179+
span_id=p.item.span_id,
180+
document_position=p.item.document_position,
181+
)
182+
183+
184+
def _unique_by(p: Received[Insertables.DocumentAnnotation]) -> _UniqueBy:
185+
return p.item.obj.name, p.item.span_rowid, p.item.document_position, p.item.identifier
186+
187+
188+
def _time(p: Received[Any]) -> datetime:
189+
return p.received_at

src/phoenix/db/insertion/span_annotation.py

Lines changed: 94 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
from collections.abc import Mapping
22
from datetime import datetime
3-
from typing import Optional
3+
from typing import Any, NamedTuple, Optional
44

5-
from sqlalchemy import insert, select
5+
from sqlalchemy import Row, Select, and_, select, tuple_
66
from sqlalchemy.ext.asyncio import AsyncSession
77
from typing_extensions import TypeAlias
88

99
from phoenix.db import models
10+
from phoenix.db.helpers import dedup
1011
from phoenix.db.insertion.helpers import as_kv
1112
from phoenix.db.insertion.types import (
1213
Insertables,
@@ -20,15 +21,23 @@
2021
_Name: TypeAlias = str
2122
_SpanId: TypeAlias = str
2223
_SpanRowId: TypeAlias = int
24+
_Identifier: TypeAlias = str
2325
_AnnoRowId: TypeAlias = int
2426

25-
_Key: TypeAlias = tuple[_Name, _SpanId]
26-
_UniqueBy: TypeAlias = tuple[_Name, _SpanRowId]
27+
28+
class _Key(NamedTuple):
29+
annotation_name: _Name
30+
annotation_identifier: _Identifier
31+
span_id: _SpanId
32+
33+
34+
_UniqueBy: TypeAlias = tuple[_Name, _SpanRowId, _Identifier]
2735
_Existing: TypeAlias = tuple[
2836
_SpanRowId,
2937
_SpanId,
3038
Optional[_AnnoRowId],
3139
Optional[_Name],
40+
Optional[_Identifier],
3241
Optional[datetime],
3342
]
3443

@@ -41,15 +50,15 @@ class SpanAnnotationQueueInserter(
4150
SpanAnnotationDmlEvent,
4251
],
4352
table=models.SpanAnnotation,
44-
unique_by=(),
53+
unique_by=("name", "span_rowid", "identifier"),
4554
):
4655
async def _events(
4756
self,
4857
session: AsyncSession,
4958
*insertions: Insertables.SpanAnnotation,
5059
) -> list[SpanAnnotationDmlEvent]:
5160
records = [dict(as_kv(ins.row)) for ins in insertions]
52-
stmt = insert(self.table).values(records).returning(self.table.id)
61+
stmt = self._insert_on_conflict(*records).returning(self.table.id)
5362
ids = tuple([_ async for _ in await session.stream_scalars(stmt)])
5463
return [SpanAnnotationDmlEvent(ids)]
5564

@@ -66,18 +75,42 @@ async def _partition(
6675
to_postpone: list[Postponed[Precursors.SpanAnnotation]] = []
6776
to_discard: list[Received[Precursors.SpanAnnotation]] = []
6877

69-
span_ids = {p.item.span_id for p in parcels}
70-
stmt = select(models.Span.id, models.Span.span_id).where(models.Span.span_id.in_(span_ids))
71-
result = await session.execute(stmt)
72-
spans = result.all()
73-
existing_spans: Mapping[str, int] = {row.span_id: row.id for row in spans}
78+
stmt = self._select_existing(*map(_key, parcels))
79+
existing: list[Row[_Existing]] = [_ async for _ in await session.stream(stmt)]
80+
existing_spans: Mapping[str, _SpanAttr] = {
81+
e.span_id: _SpanAttr(e.span_rowid) for e in existing
82+
}
83+
existing_annos: Mapping[_Key, _AnnoAttr] = {
84+
_Key(
85+
annotation_name=e.name,
86+
annotation_identifier=e.identifier,
87+
span_id=e.span_id,
88+
): _AnnoAttr(e.span_rowid, e.id, e.updated_at)
89+
for e in existing
90+
if e.id is not None and e.name is not None and e.updated_at is not None
91+
}
7492

7593
for p in parcels:
76-
if p.item.span_id in existing_spans:
94+
if (anno := existing_annos.get(_key(p))) is not None:
95+
if p.received_at <= anno.updated_at:
96+
to_discard.append(p)
97+
else:
98+
to_insert.append(
99+
Received(
100+
received_at=p.received_at,
101+
item=p.item.as_insertable(
102+
span_rowid=anno.span_rowid,
103+
id_=anno.id_,
104+
),
105+
)
106+
)
107+
elif (span := existing_spans.get(p.item.span_id)) is not None:
77108
to_insert.append(
78109
Received(
79110
received_at=p.received_at,
80-
item=p.item.as_insertable(span_rowid=existing_spans[p.item.span_id]),
111+
item=p.item.as_insertable(
112+
span_rowid=span.span_rowid,
113+
),
81114
)
82115
)
83116
elif isinstance(p, Postponed):
@@ -91,4 +124,52 @@ async def _partition(
91124
to_discard.append(p)
92125

93126
assert len(to_insert) + len(to_postpone) + len(to_discard) == len(parcels)
127+
to_insert = dedup(sorted(to_insert, key=_time, reverse=True), _unique_by)[::-1]
94128
return to_insert, to_postpone, to_discard
129+
130+
def _select_existing(self, *keys: _Key) -> Select[_Existing]:
131+
anno = self.table
132+
span = (
133+
select(models.Span.id, models.Span.span_id)
134+
.where(models.Span.span_id.in_({k.span_id for k in keys}))
135+
.cte()
136+
)
137+
onclause = and_(
138+
span.c.id == anno.span_rowid,
139+
anno.name.in_({k.annotation_name for k in keys}),
140+
tuple_(anno.name, anno.identifier, span.c.span_id).in_(keys),
141+
)
142+
return select(
143+
span.c.id.label("span_rowid"),
144+
span.c.span_id,
145+
anno.id,
146+
anno.name,
147+
anno.identifier,
148+
anno.updated_at,
149+
).outerjoin_from(span, anno, onclause)
150+
151+
152+
class _SpanAttr(NamedTuple):
153+
span_rowid: _SpanRowId
154+
155+
156+
class _AnnoAttr(NamedTuple):
157+
span_rowid: _SpanRowId
158+
id_: _AnnoRowId
159+
updated_at: datetime
160+
161+
162+
def _key(p: Received[Precursors.SpanAnnotation]) -> _Key:
163+
return _Key(
164+
annotation_name=p.item.obj.name,
165+
annotation_identifier=p.item.obj.identifier,
166+
span_id=p.item.span_id,
167+
)
168+
169+
170+
def _unique_by(p: Received[Insertables.SpanAnnotation]) -> _UniqueBy:
171+
return p.item.obj.name, p.item.span_rowid, p.item.identifier
172+
173+
174+
def _time(p: Received[Any]) -> datetime:
175+
return p.received_at

0 commit comments

Comments
 (0)