Skip to content

Commit 5151349

Browse files
authored
feat(taskworker): Send uncompressed profile tasks to worker (#95783)
The process_profiles worker code now supports uncompressed parameter payloads (#95692). This PR is responsible for removing double compression by changing the process_profile call site to send uncompressed tasks to taskworker as taskworker already handles zstd compression in its platform.
1 parent fb753cc commit 5151349

File tree

3 files changed

+5
-123
lines changed

3 files changed

+5
-123
lines changed

src/sentry/profiles/consumers/process/factory.py

Lines changed: 2 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
import time
2-
import zlib
31
from base64 import b64encode
42
from collections.abc import Iterable, Mapping
53

@@ -12,28 +10,14 @@
1210
from sentry import options
1311
from sentry.processing.backpressure.arroyo import HealthChecker, create_backpressure_step
1412
from sentry.profiles.task import process_profile_task
15-
from sentry.utils import metrics
1613

1714

1815
def process_message(message: Message[KafkaPayload]) -> None:
1916
sampled = is_sampled(message.payload.headers)
2017

2118
if sampled or options.get("profiling.profile_metrics.unsampled_profiles.enabled"):
22-
start_time = time.perf_counter()
23-
b64encoded_compressed = b64encode(
24-
zlib.compress(
25-
message.payload.value,
26-
level=options.get("taskworker.try_compress.profile_metrics.level"),
27-
)
28-
).decode("utf-8")
29-
end_time = time.perf_counter()
30-
metrics.distribution(
31-
"profiling.profile_metrics.compression_time",
32-
end_time - start_time,
33-
)
34-
process_profile_task.delay(
35-
payload=b64encoded_compressed, sampled=sampled, compressed_profile=True
36-
)
19+
b64encoded = b64encode(message.payload.value).decode("utf-8")
20+
process_profile_task.delay(payload=b64encoded, sampled=sampled, compressed_profile=False)
3721

3822

3923
class ProcessProfileStrategyFactory(ProcessingStrategyFactory[KafkaPayload]):

tests/sentry/profiles/consumers/test_process.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
from __future__ import annotations
22

3-
import zlib
43
from base64 import b64encode
54
from datetime import datetime
65
from typing import Any
@@ -55,9 +54,9 @@ def test_basic_profile_to_celery(self, process_profile_task):
5554
processing_strategy.terminate()
5655

5756
process_profile_task.assert_called_with(
58-
payload=b64encode(zlib.compress(payload)).decode("utf-8"),
57+
payload=b64encode(payload).decode("utf-8"),
5958
sampled=True,
60-
compressed_profile=True,
59+
compressed_profile=False,
6160
)
6261

6362

tests/sentry/profiles/test_task.py

Lines changed: 1 addition & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
from unittest import mock
88
from unittest.mock import patch
99

10-
import msgpack
1110
import pytest
1211
from django.core.files.uploadedfile import SimpleUploadedFile
1312
from django.urls import reverse
@@ -33,7 +32,7 @@
3332
)
3433
from sentry.profiles.utils import Profile
3534
from sentry.signals import first_profile_received
36-
from sentry.testutils.cases import TestCase, TransactionTestCase
35+
from sentry.testutils.cases import TransactionTestCase
3736
from sentry.testutils.factories import Factories, get_fixture_path
3837
from sentry.testutils.helpers import Feature, override_options
3938
from sentry.testutils.pytest.fixtures import django_db_all
@@ -1172,103 +1171,3 @@ def test_process_profile_task_should_flip_project_flag(
11721171
)
11731172
project.refresh_from_db()
11741173
assert project.flags.has_profiles
1175-
1176-
1177-
class TestProcessProfileTaskDoubleCompression(TestCase):
1178-
"""
1179-
TODO(taskworker): Remove this test once we have deleted zlib compression.
1180-
Test class for validating the double compression flow:
1181-
1. Consumer does zlib compression and calls process_profile_task.delay()
1182-
2. Taskworker does zstd compression on the task parameters
1183-
3. Task worker decompresses zstd and task decompresses zlib
1184-
"""
1185-
1186-
@patch("sentry.profiles.task._track_outcome")
1187-
@patch("sentry.profiles.task._track_duration_outcome")
1188-
@patch("sentry.profiles.task._symbolicate_profile")
1189-
@patch("sentry.profiles.task._deobfuscate_profile")
1190-
@patch("sentry.profiles.task._push_profile_to_vroom")
1191-
def test_consumer_to_task_double_compression_flow(
1192-
self,
1193-
_push_profile_to_vroom,
1194-
_deobfuscate_profile,
1195-
_symbolicate_profile,
1196-
_track_duration_outcome,
1197-
_track_outcome,
1198-
):
1199-
"""
1200-
Test that the full consumer -> task flow works with double compression.
1201-
1202-
This test validates:
1203-
1. process_message in factory.py does zlib compression
1204-
2. taskworker layer does zstd compression
1205-
3. Both decompressions work correctly in the task execution
1206-
"""
1207-
from datetime import datetime
1208-
1209-
from arroyo.backends.kafka import KafkaPayload
1210-
from arroyo.types import BrokerValue, Message, Partition, Topic
1211-
from django.utils import timezone
1212-
1213-
from sentry.profiles.consumers.process.factory import ProcessProfileStrategyFactory
1214-
1215-
# Mock the task functions
1216-
_push_profile_to_vroom.return_value = True
1217-
_deobfuscate_profile.return_value = True
1218-
_symbolicate_profile.return_value = True
1219-
1220-
# Get the profile fixture data
1221-
profile = generate_sample_v2_profile()
1222-
1223-
# Create a message dict like the consumer would receive from Kafka
1224-
message_dict = {
1225-
"organization_id": self.organization.id,
1226-
"project_id": self.project.id,
1227-
"key_id": 1,
1228-
"received": int(timezone.now().timestamp()),
1229-
"payload": json.dumps(profile),
1230-
}
1231-
1232-
# Pack the message with msgpack (like the consumer receives from Kafka)
1233-
payload = msgpack.packb(message_dict)
1234-
1235-
# Create the processing strategy (this will call process_message)
1236-
processing_strategy = ProcessProfileStrategyFactory().create_with_partitions(
1237-
commit=mock.Mock(), partitions={}
1238-
)
1239-
1240-
# Use self.tasks() to run the actual task with both compression layers
1241-
with self.tasks():
1242-
# Submit the message to the processing strategy
1243-
# This calls process_message which does:
1244-
# 1. zlib compression of the msgpack data
1245-
# 2. process_profile_task.delay() which adds zstd compression
1246-
processing_strategy.submit(
1247-
Message(
1248-
BrokerValue(
1249-
KafkaPayload(
1250-
b"key",
1251-
payload,
1252-
[],
1253-
),
1254-
Partition(Topic("profiles"), 1),
1255-
1,
1256-
datetime.now(),
1257-
)
1258-
)
1259-
)
1260-
processing_strategy.poll()
1261-
processing_strategy.join(1)
1262-
processing_strategy.terminate()
1263-
1264-
# Verify the task was executed successfully
1265-
assert _push_profile_to_vroom.call_count == 1
1266-
assert _deobfuscate_profile.call_count == 1
1267-
assert _symbolicate_profile.call_count == 1
1268-
assert _track_duration_outcome.call_count == 1
1269-
1270-
# Verify the profile was processed with correct data
1271-
processed_profile = _push_profile_to_vroom.call_args[0][0]
1272-
assert processed_profile["organization_id"] == self.organization.id
1273-
assert processed_profile["project_id"] == self.project.id
1274-
assert processed_profile["platform"] == profile["platform"]

0 commit comments

Comments (0)