diff --git a/requirements-base.txt b/requirements-base.txt index c1c6d693332814..9f808b3f108aac 100644 --- a/requirements-base.txt +++ b/requirements-base.txt @@ -84,7 +84,7 @@ typing-extensions>=4.9.0 ua-parser>=0.10.0 unidiff>=0.7.4 urllib3[brotli]>=2.2.2 -vroomrs==0.1.7 +vroomrs==0.1.11 pyuwsgi==2.0.28.post1 zstandard>=0.18.0 sentry-usage-accountant==0.0.10 diff --git a/requirements-dev-frozen.txt b/requirements-dev-frozen.txt index eeba408f84bf07..66354cf2c432d4 100644 --- a/requirements-dev-frozen.txt +++ b/requirements-dev-frozen.txt @@ -188,7 +188,7 @@ sentry-forked-djangorestframework-stubs==3.16.0.post1 sentry-forked-email-reply-parser==0.5.12.post1 sentry-kafka-schemas==1.3.14 sentry-ophio==1.1.3 -sentry-protos==0.2.1 +sentry-protos==0.3.1 sentry-redis-tools==0.5.0 sentry-relay==0.9.10 sentry-sdk==2.29.1 @@ -240,7 +240,7 @@ uritemplate==4.1.1 urllib3==2.2.2 vine==5.1.0 virtualenv==20.26.6 -vroomrs==0.1.7 +vroomrs==0.1.11 wcwidth==0.2.13 werkzeug==3.0.6 wheel==0.38.4 diff --git a/requirements-frozen.txt b/requirements-frozen.txt index 99f403f6869cee..c9da052807c5df 100644 --- a/requirements-frozen.txt +++ b/requirements-frozen.txt @@ -125,7 +125,7 @@ sentry-arroyo==2.26.0 sentry-forked-email-reply-parser==0.5.12.post1 sentry-kafka-schemas==1.3.14 sentry-ophio==1.1.3 -sentry-protos==0.2.1 +sentry-protos==0.3.1 sentry-redis-tools==0.5.0 sentry-relay==0.9.10 sentry-sdk==2.29.1 @@ -152,7 +152,7 @@ unidiff==0.7.4 uritemplate==4.1.1 urllib3==2.2.2 vine==5.1.0 -vroomrs==0.1.7 +vroomrs==0.1.11 wcwidth==0.2.13 xmlsec==1.3.14 zstandard==0.18.0 diff --git a/src/sentry/conf/server.py b/src/sentry/conf/server.py index 6d43b307a6ccc8..c40d724889f084 100644 --- a/src/sentry/conf/server.py +++ b/src/sentry/conf/server.py @@ -3696,6 +3696,7 @@ def custom_parameter_sort(parameter: dict) -> tuple[str, int]: SENTRY_PROCESSED_PROFILES_FUTURES_MAX_LIMIT = 10000 SENTRY_PROFILE_FUNCTIONS_FUTURES_MAX_LIMIT = 10000 SENTRY_PROFILE_CHUNKS_FUTURES_MAX_LIMIT = 10000 +SENTRY_PROFILE_OCCURRENCES_FUTURES_MAX_LIMIT = 10000 SENTRY_PREPROD_ARTIFACT_EVENTS_FUTURES_MAX_LIMIT = 10000 diff --git a/src/sentry/profiles/task.py b/src/sentry/profiles/task.py index ae301c9c4e364c..79a27afc64b8f2 100644 --- a/src/sentry/profiles/task.py +++ b/src/sentry/profiles/task.py @@ -65,6 +65,8 @@ UI_PROFILE_PLATFORMS = {"cocoa", "android", "javascript"} +UNSAMPLED_PROFILE_ID = "00000000000000000000000000000000" + def _get_profiles_producer_from_topic(topic: Topic) -> KafkaProducer: cluster_name = get_topic_definition(topic)["cluster"] @@ -89,6 +91,11 @@ def _get_profiles_producer_from_topic(topic: Topic) -> KafkaProducer: max_futures=settings.SENTRY_PROFILE_CHUNKS_FUTURES_MAX_LIMIT, ) +profile_occurrences_producer = SingletonProducer( + lambda: _get_profiles_producer_from_topic(Topic.INGEST_OCCURRENCES), + max_futures=settings.SENTRY_PROFILE_OCCURRENCES_FUTURES_MAX_LIMIT, +) + logger = logging.getLogger(__name__) @@ -245,6 +252,9 @@ def process_profile_task( if ( features.has("projects:continuous-profiling-vroomrs-processing", project) and "profiler_id" in profile + ) or ( + features.has("projects:transaction-profiling-vroomrs-processing", project) + and "profiler_id" not in profile ): if not _process_vroomrs_profile(profile, project): return @@ -1306,12 +1316,84 @@ def is_sdk_deprecated(event_type: EventType, sdk_name: str, sdk_version: str) -> @metrics.wraps("process_profile.process_vroomrs_profile") def _process_vroomrs_profile(profile: Profile, project: Project) -> bool: - if "profiler_id" in profile and _process_vroomrs_chunk_profile(profile=profile): - return True + if "profiler_id" in profile: + if _process_vroomrs_chunk_profile(profile): + return True + else: + if _process_vroomrs_transaction_profile(profile): + return True _track_failed_outcome(profile, project, "profiling_failed_vroomrs_processing") return False +def _process_vroomrs_transaction_profile(profile: Profile) -> bool: + with sentry_sdk.start_span(op="task.profiling.process_vroomrs_transaction_profile"): + try: + # todo (improvement): check the feasibility of passing the profile + # dict directly to the PyO3 module to avoid json serialization/deserialization + with sentry_sdk.start_span(op="json.dumps"): + json_profile = json.dumps(profile) + metrics.distribution( + "profiling.profile.payload.size", + len(json_profile), + tags={"type": "profile", "platform": profile["platform"]}, + ) + with sentry_sdk.start_span(op="json.unmarshal"): + prof = vroomrs.profile_from_json_str(json_profile, profile["platform"]) + prof.normalize() + if not prof.is_sampled(): + # if we're dealing with an unsampled profile + # we'll assign the special "000....00" profile ID + # so that we can handle it accordingly either in + # either of snuba/sentry/front-end + prof.set_profile_id(UNSAMPLED_PROFILE_ID) + if prof.is_sampled(): + with sentry_sdk.start_span(op="gcs.write", name="compress and write"): + storage = get_profiles_storage() + compressed_profile = prof.compress() + storage.save(prof.storage_path(), io.BytesIO(compressed_profile)) + # we only run find_occurrences for sampled profiles, unsampled profiles + # are skipped + with sentry_sdk.start_span(op="processing", name="find occurrences"): + occurrences = prof.find_occurrences() + occurrences.filter_none_type_issues() + occs = occurrences.occurrences + if occs is not None and len(occs) > 0: + payload = KafkaPayload(None, occurrences.to_json_str().encode("utf-8"), []) + topic = ArroyoTopic( + get_topic_definition(Topic.INGEST_OCCURRENCES)["real_topic_name"] + ) + profile_occurrences_producer.produce(topic, payload) + # function metrics are extracted for both sampled and unsampled profiles + with sentry_sdk.start_span(op="processing", name="extract functions metrics"): + functions = prof.extract_functions_metrics( + min_depth=1, filter_system_frames=True, max_unique_functions=100 + ) + if functions is not None and len(functions) > 0: + payload = build_profile_functions_kafka_message(prof, functions) + topic = ArroyoTopic( + get_topic_definition(Topic.PROFILES_CALL_TREE)["real_topic_name"] + ) + profile_functions_producer.produce(topic, payload) + if prof.is_sampled(): + # Send profile metadata to Kafka + with sentry_sdk.start_span(op="processing", name="send profile kafka message"): + payload = build_profile_kafka_message(prof) + topic = ArroyoTopic( + get_topic_definition(Topic.PROCESSED_PROFILES)["real_topic_name"] + ) + processed_profiles_producer.produce(topic, payload) + return True + except Exception as e: + sentry_sdk.capture_exception(e) + metrics.incr( + "process_profile.process_vroomrs_profile.error", + tags={"platform": profile["platform"], "reason": "encountered error"}, + sample_rate=1.0, + ) + return False + + def _process_vroomrs_chunk_profile(profile: Profile) -> bool: with sentry_sdk.start_span(op="task.profiling.process_vroomrs_chunk_profile"): try: @@ -1405,3 +1487,71 @@ def build_chunk_functions_kafka_message( "materialization_version": 1, } return KafkaPayload(None, json.dumps(data).encode("utf-8"), []) + + +def build_profile_functions_kafka_message( + profile: vroomrs.Profile, functions: list[vroomrs.CallTreeFunction] +) -> KafkaPayload: + data = { + "environment": profile.get_environment() or "", + "functions": [ + { + "fingerprint": f.get_fingerprint(), + "function": f.get_function(), + "package": f.get_package(), + "in_app": f.get_in_app(), + "self_times_ns": f.get_self_times_ns(), + "thread_id": f.get_thread_id(), + } + for f in functions + ], + "profile_id": profile.get_profile_id(), + "platform": profile.get_platform(), + "project_id": profile.get_project_id(), + "received": int(profile.get_received()), + "release": profile.get_release() or "", + "retention_days": profile.get_retention_days(), + "timestamp": int(profile.get_timestamp()), + "transaction_name": profile.get_transaction().name, + "materialization_version": 1, + } + return KafkaPayload(None, json.dumps(data).encode("utf-8"), []) + + +def build_profile_kafka_message(profile: vroomrs.Profile) -> KafkaPayload: + t = profile.get_transaction() + m = profile.get_metadata() + data = { + "device_locale": m.device_locale or "", + "device_manufacturer": m.device_manufacturer or "", + "device_model": m.device_model, + "device_os_name": m.device_os_name, + "device_os_version": m.device_os_version, + "duration_ns": profile.duration_ns(), + "profile_id": profile.get_profile_id(), + "organization_id": profile.get_organization_id(), + "platform": profile.get_platform(), + "project_id": profile.get_project_id(), + "received": int(profile.get_received()), + "retention_days": profile.get_retention_days(), + "trace_id": t.trace_id, + "transaction_id": t.id, + "transaction_name": t.name, + "version_code": m.version_code or "", + "version_name": m.version_name or "", + } + if (android_api_level := m.android_api_level) is not None: + data["android_api_level"] = android_api_level + if (architecture := m.architecture) is not None: + data["architecture"] = architecture + if (device_classification := m.device_classification) is not None: + data["device_classification"] = device_classification + if (environment := profile.get_environment()) is not None: + data["environment"] = environment + if (device_os_build_number := m.device_os_build_number) is not None: + data["device_os_build_number"] = device_os_build_number + if (sdk_name := m.sdk_name) is not None: + data["sdk_name"] = sdk_name + if (sdk_version := m.sdk_version) is not None: + data["sdk_version"] = sdk_version + return KafkaPayload(None, json.dumps(data).encode("utf-8"), [])