@@ -1,7 +1,7 @@
 from __future__ import annotations
 
 import operator
-from datetime import datetime
+from datetime import datetime, timedelta
 from typing import TYPE_CHECKING, Annotated, Any, ClassVar, Optional
 
 import strawberry
@@ -40,6 +40,7 @@
 from phoenix.server.api.types.ProjectSession import ProjectSession, to_gql_project_session
 from phoenix.server.api.types.SortDir import SortDir
 from phoenix.server.api.types.Span import Span
+from phoenix.server.api.types.TimeSeries import TimeSeries, TimeSeriesDataPoint
 from phoenix.server.api.types.Trace import Trace
 from phoenix.server.api.types.ValidationResult import ValidationResult
 from phoenix.trace.dsl import SpanFilter
@@ -639,6 +640,97 @@ async def updated_at(
         )
         return updated_at
 
+    @strawberry.field(
+        description="Hourly span count for the project.",
+    )  # type: ignore
+    async def span_count_time_series(
+        self,
+        info: Info[Context, None],
+        time_range: Optional[TimeRange] = UNSET,
+    ) -> SpanCountTimeSeries:
+        """Returns a time series of span counts grouped by hour for the project.
+
+        This field provides hourly aggregated span counts, which can be useful for
+        visualizing span activity over time. The data points represent the number
+        of spans that started in each hour.
+
+        Args:
+            info: The GraphQL info object containing context information.
+            time_range: Optional time range to filter the spans. If provided, only
+                spans that started within this range will be counted.
+
+        Returns:
+            A SpanCountTimeSeries object containing data points with timestamps
+            (rounded to the nearest hour) and corresponding span counts.
+
+        Notes:
+            - The timestamps are rounded down to the nearest hour.
+            - If a time range is provided, the start time is rounded down to the
+              nearest hour, and the end time is rounded up to the nearest hour.
+            - The SQL query is optimized for both PostgreSQL and SQLite databases.
+        """
+        # Determine the appropriate SQL function to truncate timestamps to hours
+        # based on the database dialect
+        if info.context.db.dialect is SupportedSQLDialect.POSTGRESQL:
+            # PostgreSQL uses date_trunc for timestamp truncation
+            hour = func.date_trunc("hour", models.Span.start_time)
+        elif info.context.db.dialect is SupportedSQLDialect.SQLITE:
+            # SQLite uses strftime for timestamp formatting
+            hour = func.strftime("%Y-%m-%dT%H:00:00.000+00:00", models.Span.start_time)
+        else:
+            assert_never(info.context.db.dialect)
+
+        # Build the base query to count spans grouped by hour
+        stmt = (
+            select(hour, func.count())
+            .join(models.Trace)
+            .where(models.Trace.project_rowid == self.project_rowid)
+            .group_by(hour)
+            .order_by(hour)
+        )
+
+        # Apply time range filtering if provided
+        if time_range:
+            if t := time_range.start:
+                # Round down to nearest hour for the start time
+                start = t.replace(minute=0, second=0, microsecond=0)
+                stmt = stmt.where(start <= models.Span.start_time)
+            if t := time_range.end:
+                # Round up to nearest hour for the end time
+                # If the time is already at the start of an hour, use it as is
+                if t.minute == 0 and t.second == 0 and t.microsecond == 0:
+                    end = t
+                else:
+                    # Otherwise, round up to the next hour
+                    end = t.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1)
+                stmt = stmt.where(models.Span.start_time < end)
+
+        # Execute the query and convert the results to a time series
+        async with info.context.db() as session:
+            data = await session.stream(stmt)
+            return SpanCountTimeSeries(
+                data=[
+                    TimeSeriesDataPoint(
+                        timestamp=_as_datetime(t),
+                        value=v,
+                    )
+                    async for t, v in data
+                ]
+            )
+
+
+@strawberry.type
+class SpanCountTimeSeries(TimeSeries):
+    """A time series of span count"""
+
 
 INPUT_VALUE = SpanAttributes.INPUT_VALUE.split(".")
 OUTPUT_VALUE = SpanAttributes.OUTPUT_VALUE.split(".")
+
+
+def _as_datetime(value: Any) -> datetime:
+    if isinstance(value, datetime):
+        return value
+    if isinstance(value, str):
+        return datetime.fromisoformat(value)
+    raise ValueError(f"Cannot convert {value} to datetime")
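The time-range filter in the resolver above floors the start of the range to the hour and, unless the end already falls on an hour boundary, rolls it forward to the start of the next hour, presumably so the filter edges line up with the hourly buckets and each returned bucket covers a whole hour rather than a partial one. A standalone sketch of that rounding rule (the helper names are mine for illustration, not part of this diff):

```python
from datetime import datetime, timedelta, timezone

# Hypothetical helpers mirroring the rounding done inline in span_count_time_series.
def floor_to_hour(t: datetime) -> datetime:
    # Drop minutes, seconds, and microseconds; tzinfo is preserved by replace().
    return t.replace(minute=0, second=0, microsecond=0)

def ceil_to_hour(t: datetime) -> datetime:
    # A time already on an hour boundary is kept as is; anything else rolls
    # forward to the start of the next hour.
    if t.minute == 0 and t.second == 0 and t.microsecond == 0:
        return t
    return floor_to_hour(t) + timedelta(hours=1)

start = datetime(2024, 1, 15, 13, 47, 22, tzinfo=timezone.utc)
print(floor_to_hour(start))  # 2024-01-15 13:00:00+00:00  -> lower bound of the filter
print(ceil_to_hour(start))   # 2024-01-15 14:00:00+00:00  -> upper bound of the filter
on_the_hour = datetime(2024, 1, 15, 16, 0, tzinfo=timezone.utc)
print(ceil_to_hour(on_the_hour))  # 2024-01-15 16:00:00+00:00, unchanged
```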
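The `_as_datetime` helper at the bottom of the diff exists because the two dialects return the hour bucket as different types: PostgreSQL's `date_trunc` typically arrives from the driver as a `datetime`, while SQLite's `strftime` arrives as a string. A quick, illustrative check (not part of this change) that the SQLite format string round-trips through `datetime.fromisoformat`:

```python
import sqlite3
from datetime import datetime

# The SQLite branch formats each hourly bucket as an ISO-8601 string.
conn = sqlite3.connect(":memory:")
(bucket,) = conn.execute(
    "SELECT strftime('%Y-%m-%dT%H:00:00.000+00:00', '2024-01-15 13:47:22')"
).fetchone()
print(bucket)                          # 2024-01-15T13:00:00.000+00:00
print(datetime.fromisoformat(bucket))  # 2024-01-15 13:00:00+00:00

# A datetime from the PostgreSQL branch would hit the isinstance(value, datetime)
# check in _as_datetime and be returned unchanged.
```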