Skip to content

Commit 8f96c06

Browse files
authored
feat(taskworker): Add compression in taskworker producer and worker (#95153)
The taskbroker system does not handle large payloads well. We first experienced the effects of this with ingest-profiles when rolling out to smaller environments. We also observed slow throughput and sqlite issues for processing pools handling sentryapp and seer tasks (2-8MB payloads). This PR adds the ability to enable task parameter compression in taskworkers. Flow: 1. The user sets a `CompressionType` attribute on the `@instrumented_task` decorator, which chooses the compression algorithm (only ZSTD and PLAINTEXT are supported). Defaults to PLAINTEXT. 2. In the taskworker producer layer, parameters are serialized, compressed, and base64 encoded. A task header is added indicating the compression type. 3. Parameters stay compressed in kafka and taskbroker storage. 4. Using the task's header, the worker determines whether the parameters need to be decompressed. If so, it base64 decodes and decompresses the message. Rollout: To roll out this change, we update a task's decorator with `CompressionType`, then update the sentry option's compression rollout rate. Depending on the rate, tasks will be sampled for compression. Because the compression type travels in the task header, the same task can be rolled out incrementally. Testing: - Local testing - Unit tests - Will start with 1% of a single task in S4S. Observe compression metrics (duration and size) to ensure similar performance.
1 parent 7d026ea commit 8f96c06

File tree

8 files changed

+175
-5
lines changed

8 files changed

+175
-5
lines changed

src/sentry/options/defaults.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3452,6 +3452,14 @@
34523452
flags=FLAG_AUTOMATOR_MODIFIABLE,
34533453
)
34543454

3455+
# Taskworker parameter compression rollout rate (0.0 disables compression, 1.0 compresses every ZSTD-registered task)
3456+
register(
3457+
"taskworker.enable_compression.rollout",
3458+
default=0.0,
3459+
type=Float,
3460+
flags=FLAG_AUTOMATOR_MODIFIABLE,
3461+
)
3462+
34553463
# Orgs for which compression should be disabled in the chunk upload endpoint.
34563464
# This is intended to circumvent sporadic 503 errors reported by some customers.
34573465
register("chunk-upload.no-compression", default=[], flags=FLAG_AUTOMATOR_MODIFIABLE)

src/sentry/taskworker/constants.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
from enum import Enum
2+
13
DEFAULT_PROCESSING_DEADLINE = 10
24
"""
35
The fallback/default processing_deadline that tasks
@@ -32,3 +34,12 @@
3234
The number of tasks a worker child process will process
3335
before being restarted.
3436
"""
37+
38+
39+
class CompressionType(Enum):
    """
    Compression algorithms that can be applied to serialized task parameters.

    Each member's value is the string form used for the activation's
    ``compression-type`` header.
    """

    # Parameters are zstd-compressed before transport.
    ZSTD = "zstd"
    # Parameters are sent as uncompressed JSON text (the default).
    PLAINTEXT = "plaintext"

src/sentry/taskworker/registry.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
from sentry_sdk.consts import OP, SPANDATA
1616

1717
from sentry.conf.types.kafka_definition import Topic
18-
from sentry.taskworker.constants import DEFAULT_PROCESSING_DEADLINE
18+
from sentry.taskworker.constants import DEFAULT_PROCESSING_DEADLINE, CompressionType
1919
from sentry.taskworker.retry import Retry
2020
from sentry.taskworker.router import TaskRouter
2121
from sentry.taskworker.task import P, R, Task
@@ -83,6 +83,7 @@ def register(
8383
processing_deadline_duration: int | datetime.timedelta | None = None,
8484
at_most_once: bool = False,
8585
wait_for_delivery: bool = False,
86+
compression_type: CompressionType = CompressionType.PLAINTEXT,
8687
) -> Callable[[Callable[P, R]], Task[P, R]]:
8788
"""
8889
Register a task.
@@ -108,6 +109,8 @@ def register(
108109
wait_for_delivery: bool
109110
If true, the task will wait for the delivery report to be received
110111
before returning.
112+
compression_type: CompressionType
113+
The compression type to use to compress the task parameters.
111114
"""
112115

113116
def wrapped(func: Callable[P, R]) -> Task[P, R]:
@@ -125,6 +128,7 @@ def wrapped(func: Callable[P, R]) -> Task[P, R]:
125128
),
126129
at_most_once=at_most_once,
127130
wait_for_delivery=wait_for_delivery,
131+
compression_type=compression_type,
128132
)
129133
# TODO(taskworker) tasks should be registered into the registry
130134
# so that we can ensure task names are globally unique

src/sentry/taskworker/task.py

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,17 @@
11
from __future__ import annotations
22

3+
import base64
34
import datetime
5+
import random
6+
import time
47
from collections.abc import Callable, Collection, Mapping, MutableMapping
58
from functools import update_wrapper
69
from typing import TYPE_CHECKING, Any, Generic, ParamSpec, TypeVar
710
from uuid import uuid4
811

912
import orjson
1013
import sentry_sdk
14+
import zstandard as zstd
1115
from django.conf import settings
1216
from django.utils import timezone
1317
from google.protobuf.timestamp_pb2 import Timestamp
@@ -17,8 +21,10 @@
1721
TaskActivation,
1822
)
1923

20-
from sentry.taskworker.constants import DEFAULT_PROCESSING_DEADLINE
24+
from sentry import options
25+
from sentry.taskworker.constants import DEFAULT_PROCESSING_DEADLINE, CompressionType
2126
from sentry.taskworker.retry import Retry
27+
from sentry.utils import metrics
2228

2329
if TYPE_CHECKING:
2430
from sentry.taskworker.registry import TaskNamespace
@@ -39,6 +45,7 @@ def __init__(
3945
processing_deadline_duration: int | datetime.timedelta | None = None,
4046
at_most_once: bool = False,
4147
wait_for_delivery: bool = False,
48+
compression_type: CompressionType = CompressionType.PLAINTEXT,
4249
):
4350
self.name = name
4451
self._func = func
@@ -58,6 +65,7 @@ def __init__(
5865
self._retry = retry
5966
self.at_most_once = at_most_once
6067
self.wait_for_delivery = wait_for_delivery
68+
self.compression_type = compression_type
6169
update_wrapper(self, func)
6270

6371
@property
@@ -169,12 +177,41 @@ def create_activation(
169177
f"The `{key}` header value is of type {type(value)}"
170178
)
171179

180+
parameters_json = orjson.dumps({"args": args, "kwargs": kwargs})
181+
if self.compression_type == CompressionType.ZSTD:
182+
# TODO(taskworker): Nesting this conditional avoids django_db fixtures in tests.
183+
# Once we have rolled out compression safely, we can remove this conditional.
184+
compression_rollout_rate = options.get("taskworker.enable_compression.rollout")
185+
if compression_rollout_rate and compression_rollout_rate > random.random():
186+
# Worker uses this header to determine whether the parameters must be decompressed
187+
headers["compression-type"] = CompressionType.ZSTD.value
188+
start_time = time.perf_counter()
189+
parameters_data = zstd.compress(parameters_json)
190+
# Compressed data is binary and needs base64 encoding for transport
191+
parameters_str = base64.b64encode(parameters_data).decode("utf8")
192+
end_time = time.perf_counter()
193+
194+
metrics.distribution(
195+
"taskworker.producer.compressed_parameters_size",
196+
len(parameters_str),
197+
tags={"namespace": self._namespace.name, "taskname": self.name},
198+
)
199+
metrics.distribution(
200+
"taskworker.producer.compression_time",
201+
end_time - start_time,
202+
tags={"namespace": self._namespace.name, "taskname": self.name},
203+
)
204+
else:
205+
parameters_str = parameters_json.decode("utf8")
206+
else:
207+
parameters_str = parameters_json.decode("utf8")
208+
172209
return TaskActivation(
173210
id=uuid4().hex,
174211
namespace=self._namespace.name,
175212
taskname=self.name,
176213
headers=headers,
177-
parameters=orjson.dumps({"args": args, "kwargs": kwargs}).decode("utf8"),
214+
parameters=parameters_str,
178215
retry_state=self._create_retry_state(),
179216
received_at=received_at,
180217
processing_deadline_duration=processing_deadline,

src/sentry/taskworker/tasks/examples.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from time import sleep
55
from typing import Any
66

7+
from sentry.taskworker.constants import CompressionType
78
from sentry.taskworker.namespaces import exampletasks
89
from sentry.taskworker.retry import LastAction, NoRetriesRemainingError, Retry, RetryError
910
from sentry.taskworker.retry import retry_task as retry_task_helper
@@ -87,3 +88,9 @@ def at_most_once_task() -> None:
8788
def timed_task(sleep_seconds: float | str, *args: list[Any], **kwargs: dict[str, Any]) -> None:
8889
sleep(float(sleep_seconds))
8990
logger.debug("timed_task complete")
91+
92+
93+
@exampletasks.register(name="examples.simple_task", compression_type=CompressionType.ZSTD)
def simple_task_compressed(*args: list[Any], **kwargs: dict[str, Any]) -> None:
    """
    Example task registered with ZSTD parameter compression.

    Accepts arbitrary positional and keyword arguments, ignores them,
    sleeps for a tenth of a second and then emits a debug log line.
    """
    # NOTE(review): the registered name is "examples.simple_task" while the
    # function is simple_task_compressed -- confirm this does not clash with
    # another registration in this namespace.
    sleep(0.1)
    logger.debug("simple_task_compressed complete")

src/sentry/taskworker/workerchild.py

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from __future__ import annotations
22

3+
import base64
34
import contextlib
45
import logging
56
import queue
@@ -13,6 +14,7 @@
1314
# XXX: Don't import any modules that will import django here, do those within child_process
1415
import orjson
1516
import sentry_sdk
17+
import zstandard as zstd
1618
from sentry_protos.taskbroker.v1.taskbroker_pb2 import (
1719
TASK_ACTIVATION_STATUS_COMPLETE,
1820
TASK_ACTIVATION_STATUS_FAILURE,
@@ -25,6 +27,7 @@
2527

2628
from sentry.taskworker.client.inflight_task_activation import InflightTaskActivation
2729
from sentry.taskworker.client.processing_result import ProcessingResult
30+
from sentry.taskworker.constants import CompressionType
2831

2932
logger = logging.getLogger("sentry.taskworker.worker")
3033

@@ -77,6 +80,19 @@ def get_at_most_once_key(namespace: str, taskname: str, task_id: str) -> str:
7780
return f"tw:amo:{namespace}:{taskname}:{task_id}"
7881

7982

83+
def load_parameters(data: str, headers: dict[str, str]) -> dict[str, Any]:
    """
    Deserialize an activation's parameter payload.

    The ``compression-type`` header selects the decoding path:

    - header missing, empty, or plaintext: ``data`` is parsed as JSON directly.
    - zstd: ``data`` is base64 decoded, zstd decompressed, then parsed as JSON.
    - any other value: an error is logged and ``data`` is parsed as JSON.
    """
    codec = headers.get("compression-type")

    if codec == CompressionType.ZSTD.value:
        # Compressed payloads are transported as base64 text.
        compressed = base64.b64decode(data)
        return orjson.loads(zstd.decompress(compressed))

    if codec and codec != CompressionType.PLAINTEXT.value:
        # Unknown codec: best effort, treat the payload as plain JSON.
        logger.error("Unsupported compression type: %s. Continuing with plaintext.", codec)

    return orjson.loads(data)
94+
95+
8096
def status_name(status: TaskActivationStatus.ValueType) -> str:
8197
"""Convert a TaskActivationStatus to a human readable name"""
8298
if status == TASK_ACTIVATION_STATUS_COMPLETE:
@@ -316,10 +332,11 @@ def handle_alarm(signum: int, frame: FrameType | None) -> None:
316332

317333
def _execute_activation(task_func: Task[Any, Any], activation: TaskActivation) -> None:
318334
"""Invoke a task function with the activation parameters."""
319-
parameters = orjson.loads(activation.parameters)
335+
headers = {k: v for k, v in activation.headers.items()}
336+
parameters = load_parameters(activation.parameters, headers)
337+
320338
args = parameters.get("args", [])
321339
kwargs = parameters.get("kwargs", {})
322-
headers = {k: v for k, v in activation.headers.items()}
323340

324341
transaction = sentry_sdk.continue_trace(
325342
environ_or_headers=headers,

tests/sentry/taskworker/test_registry.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,23 @@
1+
import base64
12
from concurrent.futures import Future
23
from unittest.mock import Mock
34

5+
import orjson
46
import pytest
7+
import zstandard as zstd
58
from django.test.utils import override_settings
69
from sentry_protos.taskbroker.v1.taskbroker_pb2 import (
710
ON_ATTEMPTS_EXCEEDED_DEADLETTER,
811
ON_ATTEMPTS_EXCEEDED_DISCARD,
912
)
1013

1114
from sentry.conf.types.kafka_definition import Topic
15+
from sentry.taskworker.constants import CompressionType
1216
from sentry.taskworker.registry import TaskNamespace, TaskRegistry
1317
from sentry.taskworker.retry import LastAction, Retry
1418
from sentry.taskworker.router import DefaultRouter
1519
from sentry.taskworker.task import Task
20+
from sentry.testutils.helpers.options import override_options
1621

1722

1823
def test_namespace_register_task() -> None:
@@ -133,6 +138,34 @@ def simple_task() -> None:
133138
assert proto_message == activation.SerializeToString()
134139

135140

141+
@pytest.mark.django_db
def test_namespace_send_task_with_compression() -> None:
    """
    A ZSTD-registered task at a 1.0 rollout rate produces an activation with
    the compression header set and base64 + zstd encoded JSON parameters.
    """
    namespace = TaskNamespace(name="tests", router=DefaultRouter(), retry=None)

    @namespace.register(name="test.compression_task", compression_type=CompressionType.ZSTD)
    def simple_task_with_compression(param: str) -> None:
        raise NotImplementedError

    task_args = ["test_arg"]
    task_kwargs = {"test_key": "test_value"}

    with override_options({"taskworker.enable_compression.rollout": 1.0}):
        activation = simple_task_with_compression.create_activation(
            args=task_args, kwargs=task_kwargs
        )

    assert activation.headers.get("compression-type") == CompressionType.ZSTD.value

    # Reverse the producer's encoding: base64 -> zstd -> JSON.
    compressed = base64.b64decode(activation.parameters.encode("utf-8"))
    round_tripped = orjson.loads(zstd.decompress(compressed))

    assert round_tripped == {"args": task_args, "kwargs": task_kwargs}
167+
168+
136169
@pytest.mark.django_db
137170
def test_namespace_send_task_with_retry() -> None:
138171
namespace = TaskNamespace(

tests/sentry/taskworker/test_worker.py

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
1+
import base64
12
import queue
23
import time
34
from multiprocessing import Event
45
from unittest import mock
56

67
import grpc
8+
import orjson
79
import pytest
10+
import zstandard as zstd
811
from sentry_protos.taskbroker.v1.taskbroker_pb2 import (
912
ON_ATTEMPTS_EXCEEDED_DISCARD,
1013
TASK_ACTIVATION_STATUS_COMPLETE,
@@ -18,6 +21,7 @@
1821

1922
from sentry.taskworker.client.inflight_task_activation import InflightTaskActivation
2023
from sentry.taskworker.client.processing_result import ProcessingResult
24+
from sentry.taskworker.constants import CompressionType
2125
from sentry.taskworker.retry import NoRetriesRemainingError
2226
from sentry.taskworker.state import current_task
2327
from sentry.taskworker.worker import TaskWorker
@@ -120,6 +124,30 @@
120124
),
121125
)
122126

127+
# Parameter payload encoded as JSON -> zstd -> base64 text.
_COMPRESSED_TASK_PARAMETERS = base64.b64encode(
    zstd.compress(
        orjson.dumps(
            {
                "args": ["test_arg1", "test_arg2"],
                "kwargs": {"test_key": "test_value", "number": 42},
            }
        )
    )
).decode("utf8")

# Activation carrying the compressed payload and the matching header.
COMPRESSED_TASK = InflightTaskActivation(
    host="localhost:50051",
    receive_timestamp=0,
    activation=TaskActivation(
        id="compressed_task_123",
        taskname="examples.simple_task",
        namespace="examples",
        parameters=_COMPRESSED_TASK_PARAMETERS,
        headers={"compression-type": CompressionType.ZSTD.value},
        processing_deadline_duration=2,
    ),
)
150+
123151

124152
@pytest.mark.django_db
125153
class TestTaskWorker(TestCase):
@@ -672,3 +700,28 @@ def test_child_process_terminate_task(mock_capture: mock.Mock) -> None:
672700
assert result.status == TASK_ACTIVATION_STATUS_FAILURE
673701
assert mock_capture.call_count == 1
674702
assert type(mock_capture.call_args.args[0]) is ProcessingDeadlineExceeded
703+
704+
705+
@pytest.mark.django_db
@mock.patch("sentry.taskworker.workerchild.capture_checkin")
def test_child_process_decompression(mock_capture_checkin: mock.Mock) -> None:
    """
    child_process completes an activation whose parameters are zstd
    compressed and flagged through the ``compression-type`` header.
    """
    todo: queue.Queue[InflightTaskActivation] = queue.Queue()
    processed: queue.Queue[ProcessingResult] = queue.Queue()
    shutdown = Event()

    todo.put(COMPRESSED_TASK)
    child_process(
        todo,
        processed,
        shutdown,
        max_task_count=1,
        processing_pool_name="test",
        process_type="fork",
    )

    assert todo.empty()
    # Bounded wait: if no result was produced, fail with queue.Empty
    # instead of blocking the test run indefinitely.
    result = processed.get(timeout=5)
    assert result.task_id == COMPRESSED_TASK.activation.id
    assert result.status == TASK_ACTIVATION_STATUS_COMPLETE
    assert mock_capture_checkin.call_count == 0

0 commit comments

Comments
 (0)