Commit 6327368

feat(taskworker) Add option to disable fetch_next behavior (#94994)
A few of our processing pools are showing higher than desirable contention on the child_task queue. Our theory is that having both the main worker loop and the fetch_next flow appending into the child_task queue causes contention and lowers throughput. These changes introduce an option that allows us to disable fetch_next logic at the pool level, which will let us confirm two theories:

1. fetch_next is contributing to contention.
2. We can get more throughput and use brokers more effectively, since GetTask and SetTaskStatus without fetch_next are faster than with fetch_next.

I've also reduced the maximum backoff for fetch_task to help reduce processing latency when brokers are serving low-throughput topics, and cleaned up some now-unused options.
1 parent 130345e commit 6327368
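
For orientation, the gating this option introduces can be sketched in a few lines, mirroring the worker.py change in this commit; the helper name fetch_next_enabled is illustrative, only the option key comes from this change:

from sentry import options

def fetch_next_enabled(processing_pool_name: str) -> bool:
    # A pool opts out of fetch_next by being listed in the new option;
    # pools that are not listed keep the existing fetch_next behavior.
    return processing_pool_name not in options.get("taskworker.fetch_next.disabled_pools")

When this returns False for a pool, results are sent back without requesting a FetchNextTask, so that pool's workers receive new work only through the main GetTask loop.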

File tree

5 files changed (+56, -29 lines)


src/sentry/options/defaults.py

Lines changed: 7 additions & 15 deletions
@@ -3186,32 +3186,24 @@

 # Taskbroker flags
 register(
-    "taskworker.try_compress.profile_metrics",
-    default=0.0,
-    type=Float,
-    flags=FLAG_AUTOMATOR_MODIFIABLE,
-)
-
-register(
-    "taskworker.try_compress.profile_metrics.rollout",
-    default=0.0,
-    type=Float,
+    "taskworker.route.overrides",
+    default={},
     flags=FLAG_AUTOMATOR_MODIFIABLE,
 )
-
-# Taskbroker flags
 register(
     "taskworker.try_compress.profile_metrics.level",
     default=6,
     type=Int,
     flags=FLAG_AUTOMATOR_MODIFIABLE,
 )
-
 register(
-    "taskworker.route.overrides",
-    default={},
+    "taskworker.fetch_next.disabled_pools",
+    default=[],
     flags=FLAG_AUTOMATOR_MODIFIABLE,
 )
+
+
+# Taskbroker rollout flags
 register(
     "taskworker.deletions.rollout",
     default={},
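
Because the new option is registered with FLAG_AUTOMATOR_MODIFIABLE, it can be changed at runtime rather than requiring a deploy. A hypothetical rollout step, assuming the standard sentry.options set/get helpers; the pool name "profiles" is only an example:

from sentry import options

# Disable fetch_next for a single processing pool (pool name is illustrative).
options.set("taskworker.fetch_next.disabled_pools", ["profiles"])

# Pools not listed in the option keep the existing fetch_next behavior.
assert options.get("taskworker.fetch_next.disabled_pools") == ["profiles"]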

src/sentry/taskworker/worker.py

Lines changed: 12 additions & 5 deletions
@@ -15,6 +15,7 @@
 from django.conf import settings
 from sentry_protos.taskbroker.v1.taskbroker_pb2 import FetchNextTask

+from sentry import options
 from sentry.taskworker.client.client import HostTemporarilyUnavailable, TaskworkerClient
 from sentry.taskworker.client.inflight_task_activation import InflightTaskActivation
 from sentry.taskworker.client.processing_result import ProcessingResult
@@ -177,6 +178,10 @@ def _add_task(self) -> bool:
                 tags={"processing_pool": self._processing_pool_name},
             )
         except queue.Full:
+            metrics.incr(
+                "taskworker.worker.child_tasks.put.full",
+                tags={"processing_pool": self._processing_pool_name},
+            )
             logger.warning(
                 "taskworker.add_task.child_task_queue_full",
                 extra={
@@ -204,9 +209,13 @@ def result_thread() -> None:
         iopool = ThreadPoolExecutor(max_workers=self._concurrency)
         with iopool as executor:
             while not self._shutdown_event.is_set():
+                fetch_next = self._processing_pool_name not in options.get(
+                    "taskworker.fetch_next.disabled_pools"
+                )
+
                 try:
                     result = self._processed_tasks.get(timeout=1.0)
-                    executor.submit(self._send_result, result)
+                    executor.submit(self._send_result, result, fetch_next)
                 except queue.Empty:
                     metrics.incr(
                         "taskworker.worker.result_thread.queue_empty",
@@ -342,7 +351,7 @@ def fetch_task(self) -> InflightTaskActivation | None:
                 extra={"error": e, "processing_pool": self._processing_pool_name},
             )

-            self._gettask_backoff_seconds = min(self._gettask_backoff_seconds + 2, 10)
+            self._gettask_backoff_seconds = min(self._gettask_backoff_seconds + 1, 5)
             return None

         if not activation:
@@ -354,9 +363,7 @@ def fetch_task(self) -> InflightTaskActivation | None:
                 "taskworker.fetch_task.not_found",
                 extra={"processing_pool": self._processing_pool_name},
             )
-
-            # TODO cap backoff to 5 seconds instead?
-            self._gettask_backoff_seconds = min(self._gettask_backoff_seconds + 1, 10)
+            self._gettask_backoff_seconds = min(self._gettask_backoff_seconds + 1, 5)
             return None

         self._gettask_backoff_seconds = 0
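
The backoff change is easiest to see in isolation: on errors the worker previously backed off by 2 seconds per attempt up to 10 seconds (and by 1 second up to 10 seconds on empty responses); it now backs off by 1 second per attempt, capped at 5 seconds, in both cases. A standalone sketch of the new progression:

# Sketch of the revised GetTask backoff: +1s per failed or empty fetch, capped at 5s.
backoff = 0
for attempt in range(1, 8):
    backoff = min(backoff + 1, 5)
    print(f"attempt {attempt}: wait {backoff}s")  # 1, 2, 3, 4, 5, 5, 5

For low-throughput topics this halves the worst-case backoff before a worker asks the broker for work again, which is the latency reduction described in the commit message.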

src/sentry/taskworker/workerchild.py

Lines changed: 0 additions & 7 deletions
@@ -171,16 +171,9 @@ def handle_alarm(signum: int, frame: FrameType | None) -> None:
             )
             break

-        child_tasks_get_start = time.monotonic()
         try:
-            # If the queue is empty, this could block for a second.
             inflight = child_tasks.get(timeout=1.0)
         except queue.Empty:
-            metrics.distribution(
-                "taskworker.worker.child_task_queue_empty.wait_duration",
-                time.monotonic() - child_tasks_get_start,
-                tags={"processing_pool": processing_pool_name},
-            )
             metrics.incr(
                 "taskworker.worker.child_task_queue_empty",
                 tags={"processing_pool": processing_pool_name},

tests/sentry/processing/backpressure/test_checking.py

Lines changed: 0 additions & 2 deletions
@@ -30,7 +30,6 @@
         "backpressure.checking.interval": 5,
         "backpressure.monitoring.enabled": True,
         "backpressure.status_ttl": 60,
-        "taskworker.try_compress.profile_metrics.rollout": 0,
     }
 )
 def test_backpressure_unhealthy_profiles():
@@ -54,7 +53,6 @@ def test_backpressure_unhealthy_profiles():
         "backpressure.checking.interval": 5,
         "backpressure.monitoring.enabled": False,
         "backpressure.status_ttl": 60,
-        "taskworker.try_compress.profile_metrics.rollout": 0,
     }
 )
 def test_bad_config():
def test_bad_config():

tests/sentry/taskworker/test_worker.py

Lines changed: 37 additions & 0 deletions
@@ -224,6 +224,43 @@ def update_task_response(*args, **kwargs):
             )
             assert mock_client.update_task.call_args.args[1] is None

+    @override_options({"taskworker.fetch_next.disabled_pools": ["testing"]})
+    def test_run_once_with_fetch_next_disabled(self) -> None:
+        # Cover the scenario where taskworker.fetch_next.disabled_pools is defined
+        max_runtime = 5
+        taskworker = TaskWorker(
+            rpc_host="127.0.0.1:50051",
+            num_brokers=1,
+            max_child_task_count=1,
+            process_type="fork",
+            processing_pool_name="testing",
+        )
+        with mock.patch.object(taskworker, "client") as mock_client:
+            mock_client.update_task.return_value = None
+            mock_client.get_task.return_value = SIMPLE_TASK
+            taskworker.start_result_thread()
+            taskworker.start_spawn_children_thread()
+
+            # Run until two tasks have been processed
+            start = time.time()
+            while True:
+                taskworker.run_once()
+                if mock_client.update_task.call_count >= 2:
+                    break
+                if time.time() - start > max_runtime:
+                    taskworker.shutdown()
+                    raise AssertionError("Timeout waiting for update_task to be called")
+
+            taskworker.shutdown()
+            assert mock_client.get_task.called
+            assert mock_client.update_task.call_count == 2
+            assert mock_client.update_task.call_args.args[0].host == "localhost:50051"
+            assert mock_client.update_task.call_args.args[0].task_id == SIMPLE_TASK.activation.id
+            assert (
+                mock_client.update_task.call_args.args[0].status == TASK_ACTIVATION_STATUS_COMPLETE
+            )
+            assert mock_client.update_task.call_args.args[1] is None
+
     def test_run_once_with_update_failure(self) -> None:
         # Cover the scenario where update_task fails a few times in a row
         # We should retain the result until RPC succeeds.
