feat(profiling): add _process_vroomrs_transaction_profile #95223

Open · wants to merge 4 commits into base: master

2 changes: 1 addition & 1 deletion requirements-base.txt
@@ -84,7 +84,7 @@ typing-extensions>=4.9.0
ua-parser>=0.10.0
unidiff>=0.7.4
urllib3[brotli]>=2.2.2
-vroomrs==0.1.7
+vroomrs==0.1.11
pyuwsgi==2.0.28.post1
zstandard>=0.18.0
sentry-usage-accountant==0.0.10
4 changes: 2 additions & 2 deletions requirements-dev-frozen.txt
@@ -188,7 +188,7 @@ sentry-forked-djangorestframework-stubs==3.16.0.post1
sentry-forked-email-reply-parser==0.5.12.post1
sentry-kafka-schemas==1.3.14
sentry-ophio==1.1.3
-sentry-protos==0.2.1
+sentry-protos==0.3.1
sentry-redis-tools==0.5.0
sentry-relay==0.9.10
sentry-sdk==2.29.1
@@ -240,7 +240,7 @@ uritemplate==4.1.1
urllib3==2.2.2
vine==5.1.0
virtualenv==20.26.6
-vroomrs==0.1.7
+vroomrs==0.1.11
wcwidth==0.2.13
werkzeug==3.0.6
wheel==0.38.4
4 changes: 2 additions & 2 deletions requirements-frozen.txt
@@ -125,7 +125,7 @@ sentry-arroyo==2.26.0
sentry-forked-email-reply-parser==0.5.12.post1
sentry-kafka-schemas==1.3.14
sentry-ophio==1.1.3
-sentry-protos==0.2.1
+sentry-protos==0.3.1
sentry-redis-tools==0.5.0
sentry-relay==0.9.10
sentry-sdk==2.29.1
@@ -152,7 +152,7 @@ unidiff==0.7.4
uritemplate==4.1.1
urllib3==2.2.2
vine==5.1.0
-vroomrs==0.1.7
+vroomrs==0.1.11
wcwidth==0.2.13
xmlsec==1.3.14
zstandard==0.18.0
1 change: 1 addition & 0 deletions src/sentry/conf/server.py
@@ -3696,6 +3696,7 @@ def custom_parameter_sort(parameter: dict) -> tuple[str, int]:
SENTRY_PROCESSED_PROFILES_FUTURES_MAX_LIMIT = 10000
SENTRY_PROFILE_FUNCTIONS_FUTURES_MAX_LIMIT = 10000
SENTRY_PROFILE_CHUNKS_FUTURES_MAX_LIMIT = 10000
SENTRY_PROFILE_OCCURRENCES_FUTURES_MAX_LIMIT = 10000

SENTRY_PREPROD_ARTIFACT_EVENTS_FUTURES_MAX_LIMIT = 10000

154 changes: 152 additions & 2 deletions src/sentry/profiles/task.py
@@ -65,6 +65,8 @@

UI_PROFILE_PLATFORMS = {"cocoa", "android", "javascript"}

UNSAMPLED_PROFILE_ID = "00000000000000000000000000000000"


def _get_profiles_producer_from_topic(topic: Topic) -> KafkaProducer:
    cluster_name = get_topic_definition(topic)["cluster"]
@@ -89,6 +91,11 @@ def _get_profiles_producer_from_topic(topic: Topic) -> KafkaProducer:
    max_futures=settings.SENTRY_PROFILE_CHUNKS_FUTURES_MAX_LIMIT,
)

profile_occurrences_producer = SingletonProducer(
    lambda: _get_profiles_producer_from_topic(Topic.INGEST_OCCURRENCES),
    max_futures=settings.SENTRY_PROFILE_OCCURRENCES_FUTURES_MAX_LIMIT,
)

logger = logging.getLogger(__name__)


@@ -245,6 +252,9 @@ def process_profile_task(
    if (
        features.has("projects:continuous-profiling-vroomrs-processing", project)
        and "profiler_id" in profile
    ) or (
        features.has("projects:transaction-profiling-vroomrs-processing", project)
        and "profiler_id" not in profile
    ):
        if not _process_vroomrs_profile(profile, project):
            return
@@ -1306,12 +1316,84 @@ def is_sdk_deprecated(event_type: EventType, sdk_name: str, sdk_version: str) ->

@metrics.wraps("process_profile.process_vroomrs_profile")
def _process_vroomrs_profile(profile: Profile, project: Project) -> bool:
if "profiler_id" in profile and _process_vroomrs_chunk_profile(profile=profile):
return True
if "profiler_id" in profile:
if _process_vroomrs_chunk_profile(profile):
return True
else:
Review comment (Member): can we be explicit here and check that this is a transaction profile? (A hedged sketch of such a check follows this function.)

+        if _process_vroomrs_transaction_profile(profile):
+            return True
    _track_failed_outcome(profile, project, "profiling_failed_vroomrs_processing")
    return False
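A minimal sketch of the explicit check suggested in the review comment above. It assumes a transaction profile can be recognized by a "transaction"/"transactions" key in the payload; the helper name _is_transaction_profile is hypothetical, and the real discriminator would need to match the ingest schema:

def _is_transaction_profile(profile: Profile) -> bool:
    # Assumption: continuous (chunk) profiles carry a "profiler_id",
    # while transaction profiles carry a "transaction"/"transactions" key.
    return "profiler_id" not in profile and (
        "transaction" in profile or "transactions" in profile
    )


@metrics.wraps("process_profile.process_vroomrs_profile")
def _process_vroomrs_profile(profile: Profile, project: Project) -> bool:
    if "profiler_id" in profile:
        if _process_vroomrs_chunk_profile(profile):
            return True
    elif _is_transaction_profile(profile):
        if _process_vroomrs_transaction_profile(profile):
            return True
    _track_failed_outcome(profile, project, "profiling_failed_vroomrs_processing")
    return False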


def _process_vroomrs_transaction_profile(profile: Profile) -> bool:
    with sentry_sdk.start_span(op="task.profiling.process_vroomrs_transaction_profile"):
        try:
            # todo (improvement): check the feasibility of passing the profile
            # dict directly to the PyO3 module to avoid json serialization/deserialization
            with sentry_sdk.start_span(op="json.dumps"):
                json_profile = json.dumps(profile)
                metrics.distribution(
                    "profiling.profile.payload.size",
                    len(json_profile),
                    tags={"type": "profile", "platform": profile["platform"]},
                )
            with sentry_sdk.start_span(op="json.unmarshal"):
                prof = vroomrs.profile_from_json_str(json_profile, profile["platform"])
            prof.normalize()
            if not prof.is_sampled():
                # if we're dealing with an unsampled profile
                # we'll assign the special "000....00" profile ID
                # so that we can handle it accordingly in
                # either of snuba/sentry/front-end
                prof.set_profile_id(UNSAMPLED_PROFILE_ID)
            if prof.is_sampled():
                with sentry_sdk.start_span(op="gcs.write", name="compress and write"):
                    storage = get_profiles_storage()
                    compressed_profile = prof.compress()
                    storage.save(prof.storage_path(), io.BytesIO(compressed_profile))
                # we only run find_occurrences for sampled profiles; unsampled profiles
                # are skipped
                with sentry_sdk.start_span(op="processing", name="find occurrences"):
                    occurrences = prof.find_occurrences()
                    occurrences.filter_none_type_issues()
                    occs = occurrences.occurrences
                    if occs is not None and len(occs) > 0:
                        payload = KafkaPayload(None, occurrences.to_json_str().encode("utf-8"), [])
                        topic = ArroyoTopic(
                            get_topic_definition(Topic.INGEST_OCCURRENCES)["real_topic_name"]
                        )
                        profile_occurrences_producer.produce(topic, payload)
            # function metrics are extracted for both sampled and unsampled profiles
            with sentry_sdk.start_span(op="processing", name="extract functions metrics"):
                functions = prof.extract_functions_metrics(
                    min_depth=1, filter_system_frames=True, max_unique_functions=100
                )
                if functions is not None and len(functions) > 0:
                    payload = build_profile_functions_kafka_message(prof, functions)
                    topic = ArroyoTopic(
                        get_topic_definition(Topic.PROFILES_CALL_TREE)["real_topic_name"]
                    )
                    profile_functions_producer.produce(topic, payload)
            if prof.is_sampled():
                # Send profile metadata to Kafka
                with sentry_sdk.start_span(op="processing", name="send profile kafka message"):
                    payload = build_profile_kafka_message(prof)
                    topic = ArroyoTopic(
                        get_topic_definition(Topic.PROCESSED_PROFILES)["real_topic_name"]
                    )
                    processed_profiles_producer.produce(topic, payload)
            return True
        except Exception as e:
            sentry_sdk.capture_exception(e)
            metrics.incr(
                "process_profile.process_vroomrs_profile.error",
                tags={"platform": profile["platform"], "reason": "encountered error"},
                sample_rate=1.0,
            )
            return False


def _process_vroomrs_chunk_profile(profile: Profile) -> bool:
    with sentry_sdk.start_span(op="task.profiling.process_vroomrs_chunk_profile"):
        try:
@@ -1405,3 +1487,71 @@ def build_chunk_functions_kafka_message(
        "materialization_version": 1,
    }
    return KafkaPayload(None, json.dumps(data).encode("utf-8"), [])


def build_profile_functions_kafka_message(
    profile: vroomrs.Profile, functions: list[vroomrs.CallTreeFunction]
) -> KafkaPayload:
    data = {
        "environment": profile.get_environment() or "",
        "functions": [
            {
                "fingerprint": f.get_fingerprint(),
                "function": f.get_function(),
                "package": f.get_package(),
                "in_app": f.get_in_app(),
                "self_times_ns": f.get_self_times_ns(),
                "thread_id": f.get_thread_id(),
            }
            for f in functions
        ],
        "profile_id": profile.get_profile_id(),
        "platform": profile.get_platform(),
        "project_id": profile.get_project_id(),
        "received": int(profile.get_received()),
        "release": profile.get_release() or "",
        "retention_days": profile.get_retention_days(),
        "timestamp": int(profile.get_timestamp()),
        "transaction_name": profile.get_transaction().name,
        "materialization_version": 1,
    }
    return KafkaPayload(None, json.dumps(data).encode("utf-8"), [])


def build_profile_kafka_message(profile: vroomrs.Profile) -> KafkaPayload:
    t = profile.get_transaction()
    m = profile.get_metadata()
    data = {
        "device_locale": m.device_locale or "",
        "device_manufacturer": m.device_manufacturer or "",
        "device_model": m.device_model,
        "device_os_name": m.device_os_name,
        "device_os_version": m.device_os_version,
        "duration_ns": profile.duration_ns(),
        "profile_id": profile.get_profile_id(),
        "organization_id": profile.get_organization_id(),
        "platform": profile.get_platform(),
        "project_id": profile.get_project_id(),
        "received": int(profile.get_received()),
        "retention_days": profile.get_retention_days(),
        "trace_id": t.trace_id,
        "transaction_id": t.id,
        "transaction_name": t.name,
        "version_code": m.version_code or "",
        "version_name": m.version_name or "",
    }
    if (android_api_level := m.android_api_level) is not None:
        data["android_api_level"] = android_api_level
    if (architecture := m.architecture) is not None:
        data["architecture"] = architecture
    if (device_classification := m.device_classification) is not None:
        data["device_classification"] = device_classification
    if (environment := profile.get_environment()) is not None:
        data["environment"] = environment
    if (device_os_build_number := m.device_os_build_number) is not None:
        data["device_os_build_number"] = device_os_build_number
    if (sdk_name := m.sdk_name) is not None:
        data["sdk_name"] = sdk_name
    if (sdk_version := m.sdk_version) is not None:
        data["sdk_version"] = sdk_version
    return KafkaPayload(None, json.dumps(data).encode("utf-8"), [])
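For illustration only, the message built by build_profile_kafka_message might look roughly like the dict below before json.dumps; every value is a made-up placeholder, and the keys after the inline comment only appear when the corresponding metadata or environment is present:

# Hypothetical example of a message produced to the PROCESSED_PROFILES topic
# (placeholder values only; not taken from real data).
example_processed_profile_message = {
    "device_locale": "en_US",
    "device_manufacturer": "Apple",
    "device_model": "iPhone15,2",
    "device_os_name": "iOS",
    "device_os_version": "17.4",
    "duration_ns": 1500000000,
    "profile_id": "4d229f1d3807421ba62a5f8bc295d836",
    "organization_id": 1,
    "platform": "cocoa",
    "project_id": 42,
    "received": 1718000000,
    "retention_days": 90,
    "trace_id": "771a43a4192642f0b136d5159a501701",
    "transaction_id": "4255a6a83b724870a59368945a9a7a79",
    "transaction_name": "MainViewController.viewDidLoad",
    "version_code": "1337",
    "version_name": "7.0.0",
    # optional keys, set only when the metadata provides them:
    "device_classification": "high",
    "environment": "production",
    "device_os_build_number": "21E236",
    "sdk_name": "sentry.cocoa",
    "sdk_version": "8.25.0",
}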