Skip to content

Commit 37659e3

Browse files
authored
feat(uptime): Write converter function to convert uptime result data into an EAP TraceItems (#94146)
This takes uptime result data and converts it into EAP trace items. We will have one `TraceItem` per request made in an uptime check. Usually, there will only be one request, but when we have redirects there can be multiple. It turned out to be simpler for querying purposes to denormalize the data on each row. Querying this data will look something like `select trace_id from eap where <filters>`, then `select * from eap where trace_id = <trace_id>`. This will allow us to construct the whole uptime result when we query. <!-- Describe your PR here. -->
1 parent d48435f commit 37659e3

File tree

2 files changed

+534
-0
lines changed

2 files changed

+534
-0
lines changed
Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
"""
2+
Converts uptime results to EAP TraceItem format.
3+
4+
This module handles the conversion of uptime check results into denormalized
5+
TraceItem format for the Events Analytics Platform (EAP). Each TraceItem
6+
represents one HTTP request with check-level metadata duplicated across
7+
all requests in a redirect chain.
8+
"""
9+
10+
import logging
11+
from collections.abc import MutableMapping
12+
13+
from google.protobuf.timestamp_pb2 import Timestamp
14+
from sentry_kafka_schemas.schema_types.uptime_results_v1 import (
15+
CheckResult,
16+
RequestDurations,
17+
RequestInfo,
18+
)
19+
from sentry_protos.snuba.v1.request_common_pb2 import TraceItemType
20+
from sentry_protos.snuba.v1.trace_item_pb2 import AnyValue, TraceItem
21+
22+
from sentry import quotas
23+
from sentry.models.project import Project
24+
25+
logger = logging.getLogger(__name__)
26+
27+
28+
def _anyvalue(value: bool | str | int | float) -> AnyValue:
29+
if isinstance(value, bool):
30+
return AnyValue(bool_value=value)
31+
elif isinstance(value, str):
32+
return AnyValue(string_value=value)
33+
elif isinstance(value, int):
34+
return AnyValue(int_value=value)
35+
elif isinstance(value, float):
36+
return AnyValue(double_value=value)
37+
else:
38+
raise ValueError(f"Invalid value type for AnyValue: {type(value)}")
39+
40+
41+
def ms_to_us(milliseconds: float | int) -> int:
42+
"""Convert milliseconds to microseconds."""
43+
return int(milliseconds * 1000)
44+
45+
46+
def _timestamp(timestamp_ms: float) -> Timestamp:
47+
"""Convert timestamp in milliseconds to protobuf Timestamp."""
48+
timestamp = Timestamp()
49+
timestamp.FromMilliseconds(int(timestamp_ms))
50+
return timestamp
51+
52+
53+
def convert_uptime_request_to_trace_item(
54+
project: Project,
55+
result: CheckResult,
56+
request_info: RequestInfo | None,
57+
request_sequence: int,
58+
item_id: bytes,
59+
) -> TraceItem:
60+
"""
61+
Convert an individual request to a denormalized UptimeResult TraceItem.
62+
63+
This creates a TraceItem that includes both check-level metadata, which is duplicated
64+
and request-specific data for unified querying.
65+
66+
In the case of misses, we'll have one row and the request_info will be empty.
67+
"""
68+
attributes: MutableMapping[str, AnyValue] = {}
69+
70+
attributes["guid"] = _anyvalue(result["guid"])
71+
attributes["subscription_id"] = _anyvalue(result["subscription_id"])
72+
attributes["check_status"] = _anyvalue(result["status"])
73+
if "region" in result:
74+
attributes["region"] = _anyvalue(result["region"])
75+
76+
attributes["scheduled_check_time_us"] = _anyvalue(ms_to_us(result["scheduled_check_time_ms"]))
77+
attributes["actual_check_time_us"] = _anyvalue(ms_to_us(result["actual_check_time_ms"]))
78+
79+
duration_ms = result["duration_ms"]
80+
if duration_ms is not None:
81+
attributes["check_duration_us"] = _anyvalue(ms_to_us(duration_ms))
82+
83+
status_reason = result["status_reason"]
84+
if status_reason is not None:
85+
attributes["status_reason_type"] = _anyvalue(status_reason["type"])
86+
attributes["status_reason_description"] = _anyvalue(status_reason["description"])
87+
88+
if "request_info_list" in result and result["request_info_list"]:
89+
first_request = result["request_info_list"][0]
90+
attributes["method"] = _anyvalue(first_request["request_type"])
91+
if "url" in first_request:
92+
# This should always be here once we start passing url, but for backwards compat
93+
# we should be cautious here
94+
attributes["original_url"] = _anyvalue(first_request["url"])
95+
96+
attributes["check_id"] = _anyvalue(result["guid"])
97+
attributes["request_sequence"] = _anyvalue(request_sequence)
98+
99+
if request_info is not None:
100+
attributes["request_type"] = _anyvalue(request_info["request_type"])
101+
http_status_code = request_info["http_status_code"]
102+
if http_status_code is not None:
103+
attributes["http_status_code"] = _anyvalue(http_status_code)
104+
105+
if "url" in request_info:
106+
attributes["request_url"] = _anyvalue(request_info["url"])
107+
if "request_body_size_bytes" in request_info:
108+
attributes["request_body_size_bytes"] = _anyvalue(
109+
request_info["request_body_size_bytes"]
110+
)
111+
if "response_body_size_bytes" in request_info:
112+
attributes["response_body_size_bytes"] = _anyvalue(
113+
request_info["response_body_size_bytes"]
114+
)
115+
if "request_duration_us" in request_info:
116+
attributes["request_duration_us"] = _anyvalue(request_info["request_duration_us"])
117+
118+
if "durations" in request_info:
119+
durations = request_info["durations"]
120+
for phase_name in RequestDurations.__annotations__.keys():
121+
if phase_name in durations:
122+
timing = durations[phase_name] # type: ignore[literal-required]
123+
attributes[f"{phase_name}_start_us"] = _anyvalue(timing["start_us"])
124+
attributes[f"{phase_name}_duration_us"] = _anyvalue(timing["duration_us"])
125+
126+
return TraceItem(
127+
organization_id=project.organization_id,
128+
project_id=project.id,
129+
trace_id=result["trace_id"],
130+
item_id=item_id,
131+
item_type=TraceItemType.TRACE_ITEM_TYPE_UPTIME_RESULT,
132+
timestamp=_timestamp(result["scheduled_check_time_ms"]),
133+
attributes=attributes,
134+
client_sample_rate=1.0,
135+
server_sample_rate=1.0,
136+
retention_days=quotas.backend.get_event_retention(organization=project.organization) or 90,
137+
received=_timestamp(result["actual_check_time_ms"]),
138+
)
139+
140+
141+
def convert_uptime_result_to_trace_items(
142+
project: Project,
143+
result: CheckResult,
144+
) -> list[TraceItem]:
145+
"""
146+
Convert a complete uptime result to a list of denormalized TraceItems.
147+
148+
Returns one TraceItem per HTTP request in the redirect chain, each containing
149+
both check-level metadata (duplicated) and request-specific data for unified querying.
150+
"""
151+
trace_items = []
152+
153+
request_info_list = result.get("request_info_list", []) # Optional field
154+
if not request_info_list:
155+
request_info = result["request_info"]
156+
if request_info is not None:
157+
request_info_list = [request_info]
158+
159+
for sequence, request_info in enumerate(request_info_list):
160+
if sequence == 0:
161+
# First request uses the span_id directly
162+
item_id = result["span_id"].encode("utf-8")[:16].ljust(16, b"\x00")
163+
else:
164+
request_id = f"{result['span_id']}_req_{sequence}"
165+
item_id = request_id.encode("utf-8")[:16].ljust(16, b"\x00")
166+
167+
request_item = convert_uptime_request_to_trace_item(
168+
project, result, request_info, sequence, item_id
169+
)
170+
trace_items.append(request_item)
171+
172+
return trace_items

0 commit comments

Comments
 (0)