Skip to content

Commit 617d38a

Browse files
authored
Merge pull request #12 from flix-tech/limit_task_retrial
Limit task retrial
2 parents 0ccea7e + a49a3e2 commit 617d38a

File tree

6 files changed

+68
-34
lines changed

6 files changed

+68
-34
lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
11
# Changelog
22

3+
## 1.2.0 - 2024-07-25
4+
5+
* The `reschedule()` method has an optional parameter to decrease the TTL if set to True (False by default).
6+
* The `add()` and `add_many()` methods now default `can_start_at` to the current time of the database clock, rather than the clock of the Python process, for consistency.
7+
38
## 1.1.0 - 2024-07-01
49

510
* The `complete()` method now returns the count of updated tasks, 0 if it was already completed

README.md

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -114,17 +114,16 @@ It uses row level locks of postgres to mimic the atomic pop and atomic push of r
114114

115115
```sql
116116
UPDATE task_queue
117-
SET processing = true,
118-
deadline =
119-
current_timestamp + CAST(lease_timeout || ' seconds' AS INTERVAL)
117+
SET started_at = current_timestamp
120118
WHERE id = (
121119
SELECT id
122120
FROM task_queue
123121
WHERE completed_at IS NULL
124-
AND processing = false
122+
AND started_at IS NULL
125123
AND queue_name = <your_queue_name>
126124
AND ttl > 0
127-
ORDER BY created_at
125+
AND can_start_at <= current_timestamp
126+
ORDER BY can_start_at
128127
FOR UPDATE SKIP LOCKED
129128
LIMIT 1
130129
)
@@ -137,10 +136,11 @@ Let's say two workers try to get a new task at the same time, assuming that they
137136
SELECT id
138137
FROM task_queue
139138
WHERE completed_at IS NULL
140-
AND processing = false
139+
AND started_at IS NULL
141140
AND queue_name = <your_queue_name>
142141
AND ttl > 0
143-
ORDER BY created_at
142+
AND can_start_at <= current_timestamp
143+
ORDER BY can_start_at
144144
```
145145

146146
The first worker locks the row with the `FOR UPDATE` clause until the update is completed and committed. If we hadn't used the `SKIP LOCKED` clause, the second worker would have seen the same row and waited for the first worker to finish the update. However, since the first worker already updated it, the subquery would no longer be valid, and the second worker would return zero rows because `WHERE id = NULL`.

pdm.lock

Lines changed: 7 additions & 7 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

postgrestq/task_queue.py

Lines changed: 35 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -156,15 +156,14 @@ def add(
156156
job dies.
157157
can_start_at : datetime
158158
The earliest time the task can be started.
159-
If None, set current time. A task will not be started before this
160-
time.
159+
If None, set current time. For consistency the time is
160+
from the database clock. A task will not be started before
161+
this time.
161162
Returns
162163
-------
163164
task_id :
164165
The random UUID that was generated for this task
165166
"""
166-
if can_start_at is None:
167-
can_start_at = datetime.now(UTC)
168167
# make sure the timeout is an actual number, otherwise we'll run
169168
# into problems later when we calculate the actual deadline
170169
lease_timeout = float(lease_timeout)
@@ -186,7 +185,7 @@ def add(
186185
lease_timeout,
187186
can_start_at
188187
)
189-
VALUES (%s, %s, %s, %s, %s, %s)
188+
VALUES (%s, %s, %s, %s, %s, COALESCE(%s, current_timestamp))
190189
"""
191190
).format(sql.Identifier(self._table_name)),
192191
(
@@ -222,16 +221,15 @@ def add_many(
222221
job dies.
223222
can_start_at : datetime
224223
The earliest time the task can be started.
225-
If None, set current time. A task will not be started before this
224+
If None, set current time. For consistency the time is
225+
from the database clock. A task will not be started before this
226226
time.
227227
Returns
228228
-------
229229
task_ids :
230230
List of random UUIDs that were generated for this task.
231231
The order is the same as that of the given tasks
232232
"""
233-
if can_start_at is None:
234-
can_start_at = datetime.now(UTC)
235233
# make sure the timeout is an actual number, otherwise we'll run
236234
# into problems later when we calculate the actual deadline
237235
lease_timeout = float(lease_timeout)
@@ -253,7 +251,9 @@ def add_many(
253251
lease_timeout,
254252
can_start_at
255253
)
256-
VALUES (%s, %s, %s, %s, %s, %s)
254+
VALUES (
255+
%s, %s, %s, %s, %s, COALESCE(%s, current_timestamp)
256+
)
257257
"""
258258
).format(sql.Identifier(self._table_name)),
259259
(
@@ -273,8 +273,8 @@ def get(self) -> Tuple[
273273
"""Get a task from the task queue (non-blocking).
274274
275275
This statement marks the next available task in the queue as
276-
"processing" and returns its ID and task details. The query
277-
uses a FOR UPDATE SKIP LOCKED clause to lock the selected
276+
started (being processed) and returns its ID and task details.
277+
The query uses a FOR UPDATE SKIP LOCKED clause to lock the selected
278278
task so that other workers can't select the same task simultaneously.
279279
280280
After executing the query, the method fetches the result using
@@ -291,7 +291,7 @@ def get(self) -> Tuple[
291291
>>> taskqueue.complete(task_id)
292292
293293
After some time (i.e. `lease_timeout`) tasks expire and are
294-
marked as not processing and the TTL is decreased by
294+
marked as not being processed and the TTL is decreased by
295295
one. If TTL is still > 0 the task will be retried.
296296
297297
Note, this method is non-blocking, i.e. it returns immediately
@@ -525,7 +525,7 @@ def get_updated_expired_task(
525525
) -> Tuple[Optional[str], Optional[int]]:
526526
"""
527527
Given the id of an expired task, it tries to reschedule it by
528-
marking it as not processing, resetting the deadline
528+
marking it as not started, resetting the deadline
529529
and decreasing TTL by one. It returns None if the task is
530530
already updated (or being updated) by another worker.
531531
@@ -579,18 +579,29 @@ def _serialize(self, task: Any) -> str:
579579
def _deserialize(self, blob: str) -> Any:
580580
return json.loads(blob)
581581

582-
def reschedule(self, task_id: Optional[UUID]) -> None:
583-
"""Move a task back from the processing- to the task queue.
582+
def reschedule(
583+
self,
584+
task_id: UUID,
585+
decrease_ttl: Optional[bool] = False,
586+
) -> None:
587+
"""Move a task back from being processed to the task queue.
584588
585589
Workers can use this method to "drop" a work unit in case of
586-
eviction.
590+
eviction (because of an external issue like terminating a machine
591+
by AWS and not because of a failure).
592+
Rescheduled work units are immediately available for processing again,
593+
and unless decrease_ttl is set to True, the TTL is not modified.
587594
588-
This function does not modify the TTL.
595+
This function can optionally modify the TTL, setting decrease_ttl to
596+
True. This allows handling a failure quickly without waiting for the
597+
lease_timeout.
589598
590599
Parameters
591600
----------
592-
task_id : str
601+
task_id : UUID
593602
the task ID
603+
decrease_ttl : bool
604+
If True, decrease the TTL by one
594605
595606
Raises
596607
------
@@ -602,13 +613,17 @@ def reschedule(self, task_id: Optional[UUID]) -> None:
602613
if not isinstance(task_id, UUID):
603614
raise ValueError("task_id must be a UUID")
604615
logger.info(f"Rescheduling task {task_id}..")
616+
decrease_ttl_sql = ""
617+
if decrease_ttl:
618+
decrease_ttl_sql = "ttl = ttl - 1,"
619+
605620
conn = self.conn
606621
with conn.cursor() as cur:
607622
cur.execute(
608623
sql.SQL(
609624
"""
610625
UPDATE {}
611-
SET started_at = NULL
626+
SET {} started_at = NULL
612627
WHERE id = (
613628
SELECT id
614629
FROM {}
@@ -619,6 +634,7 @@ def reschedule(self, task_id: Optional[UUID]) -> None:
619634
RETURNING id;"""
620635
).format(
621636
sql.Identifier(self._table_name),
637+
sql.SQL(decrease_ttl_sql),
622638
sql.Identifier(self._table_name),
623639
),
624640
(task_id,),

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ log_cli_date_format = "%Y-%m-%d %H:%M:%S"
1818

1919
[project]
2020
name = "postgres-tq"
21-
version = "1.1.0"
21+
version = "1.2.0"
2222
description = "Postgres Based Task Queue"
2323
authors = [
2424
{name = "FlixTech", email = "open-source@flixbus.com"},

tests/test_task_queue.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,19 @@ def test_reschedule(task_queue: TaskQueue):
176176
assert qname == "test_queue"
177177

178178

179+
def test_reschedule_with_ttl(task_queue: TaskQueue):
180+
task_queue.add({"foo": 1}, LEASE_TIMEOUT, 2)
181+
_, id_, qname = task_queue.get()
182+
# task queue should be empty as 'foo' is in the processing queue
183+
assert task_queue.get() == (None, None, None)
184+
assert qname == "test_queue"
185+
task_queue.reschedule(id_, decrease_ttl=True)
186+
task, _, qname = task_queue.get()
187+
assert task == {"foo": 1}
188+
# task queue should be empty because the task is expired (ttl=0)
189+
assert task_queue.get() == (None, None, None)
190+
191+
179192
def test_reschedule_error(task_queue: TaskQueue):
180193
with pytest.raises(ValueError):
181194
task_queue.reschedule("bar")

0 commit comments

Comments
 (0)