Skip to content

Commit 2f13e8b

Browse files
authored
Fix poll_for_tasks behaviour (#131)
* refactor: optimize polling function for task processing and timeout handling - Consolidated SQL logic to improve performance and clarity - Removed redundant CTEs and combined message reading with task processing - Implemented batch timeout setting to reduce per-row calls - Enhanced comments for better maintainability and understanding * fix: update poll_for_tasks function to use separate statement for polling Refactors the poll_for_tasks stored procedure to improve performance by executing the set_vt call as a separate statement, reducing overhead during polling. Also adds the new migration file to the schema migrations list for consistency. * fix(core): improve poll_for_tasks to reduce latency by splitting read statements The implementation was calling read_with_poll and SELECT in the same statement, causing new tasks inserted during polling to be missed. Now, poll_for_tasks is split into separate statements, ensuring tasks created during polling are immediately detected.
1 parent e66c78c commit 2f13e8b

File tree

4 files changed

+204
-79
lines changed

4 files changed

+204
-79
lines changed

.changeset/blue-mirrors-roll.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
---
2+
'@pgflow/core': patch
3+
---
4+
5+
Fix `poll_for_tasks` latency
6+
7+
The previous implementation was calling `read_with_poll` in the same statement
8+
as the `SELECT FROM step_tasks`, which meant that new tasks that were inserted
9+
after the `read_with_poll` started were not discovered as those were not visible
10+
in the statement.
11+
12+
Now `poll_for_tasks` is split into separate statements so step tasks created
13+
during a `poll_for_tasks` call will be immediately picked up.

pkgs/core/schemas/0090_function_poll_for_tasks.sql

Lines changed: 88 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -9,92 +9,102 @@ returns setof pgflow.step_task_record
99
volatile
1010
set search_path to ''
1111
as $$
12-
13-
with read_messages as (
14-
select *
12+
declare
13+
msg_ids bigint[];
14+
begin
15+
-- First statement: Read messages and capture their IDs
16+
-- This gets its own snapshot and can see newly committed messages
17+
select array_agg(msg_id)
18+
into msg_ids
1519
from pgflow.read_with_poll(
1620
queue_name,
1721
vt,
1822
qty,
1923
max_poll_seconds,
2024
poll_interval_ms
25+
);
26+
27+
-- If no messages were read, return empty set
28+
if msg_ids is null or array_length(msg_ids, 1) is null then
29+
return;
30+
end if;
31+
32+
-- Second statement: Process tasks with fresh snapshot
33+
-- This can now see step_tasks that were committed during the poll
34+
return query
35+
with tasks as (
36+
select
37+
task.flow_slug,
38+
task.run_id,
39+
task.step_slug,
40+
task.task_index,
41+
task.message_id
42+
from pgflow.step_tasks as task
43+
where task.message_id = any(msg_ids)
44+
and task.status = 'queued'
45+
),
46+
increment_attempts as (
47+
update pgflow.step_tasks
48+
set attempts_count = attempts_count + 1
49+
from tasks
50+
where step_tasks.message_id = tasks.message_id
51+
and status = 'queued'
52+
),
53+
runs as (
54+
select
55+
r.run_id,
56+
r.input
57+
from pgflow.runs r
58+
where r.run_id in (select run_id from tasks)
59+
),
60+
deps as (
61+
select
62+
st.run_id,
63+
st.step_slug,
64+
dep.dep_slug,
65+
dep_task.output as dep_output
66+
from tasks st
67+
join pgflow.deps dep on dep.flow_slug = st.flow_slug and dep.step_slug = st.step_slug
68+
join pgflow.step_tasks dep_task on
69+
dep_task.run_id = st.run_id and
70+
dep_task.step_slug = dep.dep_slug and
71+
dep_task.status = 'completed'
72+
),
73+
deps_outputs as (
74+
select
75+
d.run_id,
76+
d.step_slug,
77+
jsonb_object_agg(d.dep_slug, d.dep_output) as deps_output
78+
from deps d
79+
group by d.run_id, d.step_slug
80+
),
81+
timeouts as (
82+
select
83+
task.message_id,
84+
coalesce(step.opt_timeout, flow.opt_timeout) + 2 as vt_delay
85+
from tasks task
86+
join pgflow.flows flow on flow.flow_slug = task.flow_slug
87+
join pgflow.steps step on step.flow_slug = task.flow_slug and step.step_slug = task.step_slug
2188
)
22-
),
23-
tasks as (
24-
select
25-
task.flow_slug,
26-
task.run_id,
27-
task.step_slug,
28-
task.task_index,
29-
task.message_id
30-
from pgflow.step_tasks as task
31-
join read_messages as message on message.msg_id = task.message_id
32-
where task.message_id = message.msg_id
33-
and task.status = 'queued'
34-
),
35-
increment_attempts as (
36-
update pgflow.step_tasks
37-
set attempts_count = attempts_count + 1
38-
from tasks
39-
where step_tasks.message_id = tasks.message_id
40-
and status = 'queued'
41-
),
42-
runs as (
43-
select
44-
r.run_id,
45-
r.input
46-
from pgflow.runs r
47-
where r.run_id in (select run_id from tasks)
48-
),
49-
deps as (
5089
select
90+
st.flow_slug,
5191
st.run_id,
5292
st.step_slug,
53-
dep.dep_slug,
54-
dep_task.output as dep_output
93+
jsonb_build_object('run', r.input) ||
94+
coalesce(dep_out.deps_output, '{}'::jsonb) as input,
95+
st.message_id as msg_id
5596
from tasks st
56-
join pgflow.deps dep on dep.flow_slug = st.flow_slug and dep.step_slug = st.step_slug
57-
join pgflow.step_tasks dep_task on
58-
dep_task.run_id = st.run_id and
59-
dep_task.step_slug = dep.dep_slug and
60-
dep_task.status = 'completed'
61-
),
62-
deps_outputs as (
63-
select
64-
d.run_id,
65-
d.step_slug,
66-
jsonb_object_agg(d.dep_slug, d.dep_output) as deps_output
67-
from deps d
68-
group by d.run_id, d.step_slug
69-
),
70-
timeouts as (
71-
select
72-
task.message_id,
73-
coalesce(step.opt_timeout, flow.opt_timeout) + 2 as vt_delay
74-
from tasks task
75-
join pgflow.flows flow on flow.flow_slug = task.flow_slug
76-
join pgflow.steps step on step.flow_slug = task.flow_slug and step.step_slug = task.step_slug
77-
)
78-
79-
select
80-
st.flow_slug,
81-
st.run_id,
82-
st.step_slug,
83-
jsonb_build_object('run', r.input) ||
84-
coalesce(dep_out.deps_output, '{}'::jsonb) as input,
85-
st.message_id as msg_id
86-
from tasks st
87-
join runs r on st.run_id = r.run_id
88-
left join deps_outputs dep_out on
89-
dep_out.run_id = st.run_id and
90-
dep_out.step_slug = st.step_slug
91-
cross join lateral (
92-
-- TODO: this is slow because it calls set_vt for each row, and set_vt
93-
-- builds dynamic query from string every time it is called
94-
-- implement set_vt_batch(msgs_ids bigint[], vt_delays int[])
95-
select pgmq.set_vt(queue_name, st.message_id,
96-
(select t.vt_delay from timeouts t where t.message_id = st.message_id)
97-
)
98-
) set_vt;
99-
100-
$$ language sql;
97+
join runs r on st.run_id = r.run_id
98+
left join deps_outputs dep_out on
99+
dep_out.run_id = st.run_id and
100+
dep_out.step_slug = st.step_slug
101+
cross join lateral (
102+
-- TODO: this is slow because it calls set_vt for each row, and set_vt
103+
-- builds dynamic query from string every time it is called
104+
-- implement set_vt_batch(msgs_ids bigint[], vt_delays int[])
105+
select pgmq.set_vt(queue_name, st.message_id,
106+
(select t.vt_delay from timeouts t where t.message_id = st.message_id)
107+
)
108+
) set_vt;
109+
end;
110+
$$ language plpgsql;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
-- Modify "poll_for_tasks" function
--
-- Reads messages from `queue_name` via pgflow.read_with_poll and returns one
-- pgflow.step_task_record row (flow_slug, run_id, step_slug, input, msg_id)
-- for every step task in status 'queued' that is attached to one of those
-- messages.
--
-- Arguments `vt`, `qty`, `max_poll_seconds` and `poll_interval_ms` are passed
-- through to pgflow.read_with_poll unchanged.
--
-- Side effects:
--   * attempts_count is incremented on each matched 'queued' step task
--   * pgmq.set_vt is called once per returned task with that task's timeout
--
-- The work is deliberately split into two PL/pgSQL statements: polling runs
-- first, and the step_tasks lookup runs afterwards as its own statement so it
-- can see step_tasks rows committed while the poll was still waiting.
CREATE OR REPLACE FUNCTION "pgflow"."poll_for_tasks" (
  "queue_name" text,
  "vt" integer,
  "qty" integer,
  "max_poll_seconds" integer DEFAULT 5,
  "poll_interval_ms" integer DEFAULT 100
)
RETURNS SETOF "pgflow"."step_task_record"
LANGUAGE plpgsql
SET "search_path" = ''
AS $$
declare
  -- ids of the messages handed out by the poll below
  polled_msg_ids bigint[];
begin
  -- Statement 1: poll the queue and remember which messages were read.
  select array_agg(polled.msg_id)
  into polled_msg_ids
  from pgflow.read_with_poll(
    queue_name,
    vt,
    qty,
    max_poll_seconds,
    poll_interval_ms
  ) as polled;

  -- Nothing was read (array_agg over zero rows yields null): empty result.
  if coalesce(array_length(polled_msg_ids, 1), 0) = 0 then
    return;
  end if;

  -- Statement 2: resolve the polled messages to tasks. Being a separate
  -- statement, it can see step_tasks committed during the poll.
  return query
  with queued_tasks as (
    -- step tasks that belong to the polled messages and are still queued
    select
      st.flow_slug,
      st.run_id,
      st.step_slug,
      st.task_index,
      st.message_id
    from pgflow.step_tasks as st
    where st.status = 'queued'
      and st.message_id = any(polled_msg_ids)
  ),
  bump_attempts as (
    -- Not referenced by the final select; PostgreSQL still executes
    -- data-modifying CTEs regardless.
    update pgflow.step_tasks as task_row
    set attempts_count = task_row.attempts_count + 1
    from queued_tasks as qt
    where task_row.status = 'queued'
      and task_row.message_id = qt.message_id
  ),
  task_runs as (
    -- run-level input for every run that has a polled task
    select
      r.run_id,
      r.input
    from pgflow.runs as r
    where r.run_id in (select qt.run_id from queued_tasks as qt)
  ),
  completed_deps as (
    -- output of each completed dependency of a polled task's step
    select
      qt.run_id,
      qt.step_slug,
      dep.dep_slug,
      dep_task.output as dep_output
    from queued_tasks as qt
    inner join pgflow.deps as dep
      on dep.flow_slug = qt.flow_slug
      and dep.step_slug = qt.step_slug
    inner join pgflow.step_tasks as dep_task
      on dep_task.run_id = qt.run_id
      and dep_task.step_slug = dep.dep_slug
      and dep_task.status = 'completed'
  ),
  dep_outputs_by_step as (
    -- one jsonb object per (run, step): { dep_slug: dep_output, ... }
    select
      cd.run_id,
      cd.step_slug,
      jsonb_object_agg(cd.dep_slug, cd.dep_output) as deps_output
    from completed_deps as cd
    group by cd.run_id, cd.step_slug
  ),
  visibility_delays as (
    -- step-level timeout, falling back to the flow-level one, plus 2
    -- NOTE(review): unit is presumably seconds and the +2 a safety margin
    -- -- confirm against the opt_timeout definition
    select
      qt.message_id,
      coalesce(step.opt_timeout, flow.opt_timeout) + 2 as vt_delay
    from queued_tasks as qt
    inner join pgflow.flows as flow
      on flow.flow_slug = qt.flow_slug
    inner join pgflow.steps as step
      on step.flow_slug = qt.flow_slug
      and step.step_slug = qt.step_slug
  )
  select
    qt.flow_slug,
    qt.run_id,
    qt.step_slug,
    -- task input = { "run": <run input> } merged with the dependency outputs
    jsonb_build_object('run', tr.input) ||
      coalesce(dob.deps_output, '{}'::jsonb) as input,
    qt.message_id as msg_id
  from queued_tasks as qt
  inner join task_runs as tr
    on tr.run_id = qt.run_id
  left join dep_outputs_by_step as dob
    on dob.run_id = qt.run_id
    and dob.step_slug = qt.step_slug
  cross join lateral (
    -- TODO: this is slow because it calls set_vt for each row, and set_vt
    --       builds dynamic query from string every time it is called
    --       implement set_vt_batch(msgs_ids bigint[], vt_delays int[])
    select pgmq.set_vt(
      queue_name,
      qt.message_id,
      (
        select vd.vt_delay
        from visibility_delays as vd
        where vd.message_id = qt.message_id
      )
    )
  ) as extended_vt;
end;
$$;
Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
1-
h1:5sJG/Yil5GcUJAlskLtmre1qrjjUGcdEHZ2MNdsYrtE=
1+
h1:pehH0TtRVi2zy/qfwKzryU0kiMAI8oUUa8pzE1IjTps=
22
20250429164909_pgflow_initial.sql h1:5K7OqB/vj73TWJTQquUzn+i6H2wWduaW+Ir1an3QYmQ=
3+
20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:gnT6hYn43p5oIfr0HqoGlqX/4Si+uxMsCBtBa0/Z2Cg=

0 commit comments

Comments
 (0)