Skip to content

Commit 9d02635

Browse files
authored
Add way to prune old records (#118)
* feat: add SQL function and tests for pruning old records from pgflow tables Introduce a stored procedure to clean up old flow-related records based on retention days. Add comprehensive test suite to verify pruning logic, timestamps updates, and record counts. * feat: add and update SQL scripts for pruning old records in pgflow - Removed unnecessary file references from SQL comments - Updated test SQL to reflect correct worker counts and record states - Enhanced pruning function with comprehensive deletion logic for multiple tables - Added documentation for scheduling and manual execution of pruning - Included performance tip for large tables to run during off-peak hours - Created new documentation page explaining how to prune old records and schedule maintenance routines * feat: update documentation and embed SQL pruning function - Added import for Code component in documentation - Embedded SQL code for prune_old_records function into the documentation - Updated instructions and comments within the SQL function - Included a performance tip for large tables These changes enhance the documentation clarity and provide the full SQL implementation for record pruning functionality. * refactor: update prune_old_records function to simplify deletion logic and improve testing - Replaced complex CTE-based deletion with direct DELETE statements for each table - Removed redundant return values, now the function returns void - Enhanced test SQL to better simulate old and recent records, including timestamp updates - Added tests to verify pruning behavior, especially for still-running flows - Improved clarity and maintainability of the pruning procedure and associated tests * feat: extend pruning function to include archived messages from PGMQ tables - Prune old records from pgflow tables including workers, step_tasks, step_states, and runs - Add dynamic deletion of archived messages from pgmq archive tables based on flow slugs - Implement dynamic SQL to handle variable archive table names and ensure proper cleanup * chore(tests): add comprehensive tests for pruning functions and timestamp updates Introduce new SQL test files for pruning archived messages, completed flows, failed flows, and workers. Add tests that verify pruning logic based on timestamps, flow states, and flow statuses. Implement setup, execution, and validation steps to ensure correct pruning behavior across scenarios. * test: update pruning test to account for additional worker and adjust expectations Enhance the prune_workers.test.sql to include a new worker at the exact cutoff time, modify the initial worker count, and update assertions to reflect the correct number of remaining workers after pruning. Also, add a check to ensure the worker at the cutoff is not pruned. These changes improve test coverage for boundary conditions. * chore: update test helpers, add SQL path, and improve migration concatenation - Added sql_paths for test_helpers.sql in config.toml - Renamed seed.sql to test_helpers.sql and updated related references - Enhanced test helper functions for setting flow timestamps in SQL scripts - Updated test setup scripts to use raw SQL files for pruning tests - Fixed import path in documentation to load correct SQL file - Modified migration concatenation script to include test_helpers.sql - Improved test SQL files with consistent formatting and comments * fix(supabase): correct SQL function for pruning old records and archive tables Refactors the prune_old_records function to fix syntax issues, improve table existence checks, and ensure proper deletion of outdated data across multiple related tables and archive tables. Also updates the default retention period parameter. * feat: add documentation and implementation options for pruning old records Update the docs to clarify the purpose and usage of the pgflow.prune_old_records function, including examples and notes on its current status. Introduce scheduling options with pg_cron and manual execution steps. Include the SQL function code as a tip and add performance advice. * refactor: rename prune_old_records to prune_data_older_than in tests and documentation Update all test scripts, SQL imports, and documentation references to use the new function name for clarity and consistency. * chore: update pruning function to accept interval parameter and improve documentation - Modified pgflow.prune_data_older_than to use an INTERVAL parameter instead of days - Updated related SQL functions and migration snippets for flexible data retention - Enhanced documentation with examples and scheduling instructions for automated pruning - Adjusted test scripts to use make_interval for better clarity and flexibility - Added notes on performance considerations and manual setup guidance * fix(tests): update interval parameter to use make_interval in pruning tests - Changed calls to prune_data_older_than from passing a numeric value to using make_interval(days => 30 or 7) - Ensured consistency across multiple test files for pruning functions - Preserved rollback commands and test structure without modifications
1 parent 93a8106 commit 9d02635

File tree

10 files changed

+768
-2
lines changed

10 files changed

+768
-2
lines changed

pkgs/core/supabase/config.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@ major_version = 15
88
# disable unused features
99
[db.seed]
1010
enabled = true
11+
sql_paths = [
12+
"test_helpers.sql",
13+
]
1114

1215
[api]
1316
port = 50421 # Add custom port for Kong

pkgs/core/supabase/seed.sql renamed to pkgs/core/supabase/test_helpers.sql

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,3 +200,107 @@ BEGIN
200200
);
201201
END;
202202
$$ language plpgsql;
203+
204+
/**
205+
* Helper functions for pruning tests.
206+
* Contains basic setup code that would otherwise be repeated.
207+
*/
208+
209+
-- Set timestamps for completed runs/steps/tasks to be older than the cutoff
210+
create or replace function pgflow_tests.set_completed_flow_timestamps(
211+
p_flow_slug text,
212+
days_old integer
213+
) returns void language plpgsql as $$
214+
begin
215+
-- Set timestamps for step_tasks
216+
update pgflow.step_tasks
217+
set
218+
queued_at = now() - interval '1 day' - (days_old * interval '1 day'),
219+
completed_at = now() - (days_old * interval '1 day'),
220+
status = 'completed'
221+
where flow_slug = p_flow_slug;
222+
223+
-- Set timestamps for step_states
224+
update pgflow.step_states
225+
set
226+
created_at = now() - interval '2 days' - (days_old * interval '1 day'),
227+
started_at = now() - interval '1 day' - (days_old * interval '1 day'),
228+
completed_at = now() - (days_old * interval '1 day'),
229+
status = 'completed',
230+
remaining_tasks = 0
231+
where flow_slug = p_flow_slug;
232+
233+
-- Set timestamps for runs
234+
update pgflow.runs
235+
set
236+
started_at = now() - interval '2 days' - (days_old * interval '1 day'),
237+
completed_at = now() - (days_old * interval '1 day'),
238+
status = 'completed',
239+
remaining_steps = 0
240+
where flow_slug = p_flow_slug;
241+
end;
242+
$$;
243+
244+
-- Set timestamps for failed runs/steps/tasks to be older than the cutoff
245+
create or replace function pgflow_tests.set_failed_flow_timestamps(
246+
p_flow_slug text,
247+
days_old integer
248+
) returns void language plpgsql as $$
249+
begin
250+
-- Set timestamps for step_tasks
251+
update pgflow.step_tasks
252+
set
253+
queued_at = now() - interval '1 day' - (days_old * interval '1 day'),
254+
failed_at = now() - (days_old * interval '1 day'),
255+
status = 'failed',
256+
error_message = 'Test failure'
257+
where flow_slug = p_flow_slug;
258+
259+
-- Set timestamps for step_states
260+
update pgflow.step_states
261+
set
262+
created_at = now() - interval '2 days' - (days_old * interval '1 day'),
263+
started_at = now() - interval '1 day' - (days_old * interval '1 day'),
264+
failed_at = now() - (days_old * interval '1 day'),
265+
status = 'failed'
266+
where flow_slug = p_flow_slug;
267+
268+
-- Set timestamps for runs
269+
update pgflow.runs
270+
set
271+
started_at = now() - interval '2 days' - (days_old * interval '1 day'),
272+
failed_at = now() - (days_old * interval '1 day'),
273+
status = 'failed'
274+
where flow_slug = p_flow_slug;
275+
end;
276+
$$;
277+
278+
-- Set timestamps for running flows to be older than the cutoff
279+
create or replace function pgflow_tests.set_running_flow_timestamps(
280+
p_flow_slug text,
281+
days_old integer
282+
) returns void language plpgsql as $$
283+
begin
284+
-- Set timestamps for step_tasks
285+
update pgflow.step_tasks
286+
set
287+
queued_at = now() - (days_old * interval '1 day'),
288+
status = 'queued'
289+
where flow_slug = p_flow_slug;
290+
291+
-- Set timestamps for step_states
292+
update pgflow.step_states
293+
set
294+
created_at = now() - (days_old * interval '1 day'),
295+
started_at = now() - (days_old * interval '1 day'),
296+
status = 'started'
297+
where flow_slug = p_flow_slug;
298+
299+
-- Set timestamps for runs
300+
update pgflow.runs
301+
set
302+
started_at = now() - (days_old * interval '1 day'),
303+
status = 'started'
304+
where flow_slug = p_flow_slug;
305+
end;
306+
$$;
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/**
2+
* Prunes old records from pgflow tables and PGMQ archive tables.
3+
*
4+
* @param retention_interval - Interval of recent records to keep (e.g., interval '28 days', interval '3 months')
5+
*/
6+
create or replace function pgflow.prune_data_older_than(
7+
retention_interval INTERVAL
8+
) returns void language plpgsql as $$
9+
DECLARE
10+
cutoff_timestamp TIMESTAMPTZ := now() - retention_interval;
11+
flow_record RECORD;
12+
archive_table TEXT;
13+
dynamic_sql TEXT;
14+
BEGIN
15+
-- Delete old worker records
16+
DELETE FROM pgflow.workers
17+
WHERE last_heartbeat_at < cutoff_timestamp;
18+
19+
-- Delete old step_tasks records
20+
DELETE FROM pgflow.step_tasks
21+
WHERE (
22+
(completed_at IS NOT NULL AND completed_at < cutoff_timestamp) OR
23+
(failed_at IS NOT NULL AND failed_at < cutoff_timestamp)
24+
);
25+
26+
-- Delete old step_states records
27+
DELETE FROM pgflow.step_states
28+
WHERE (
29+
(completed_at IS NOT NULL AND completed_at < cutoff_timestamp) OR
30+
(failed_at IS NOT NULL AND failed_at < cutoff_timestamp)
31+
);
32+
33+
-- Delete old runs records
34+
DELETE FROM pgflow.runs
35+
WHERE (
36+
(completed_at IS NOT NULL AND completed_at < cutoff_timestamp) OR
37+
(failed_at IS NOT NULL AND failed_at < cutoff_timestamp)
38+
);
39+
40+
-- Prune archived messages from PGMQ archive tables (pgmq.a_{flow_slug})
41+
-- For each flow, delete old archived messages
42+
FOR flow_record IN SELECT DISTINCT flow_slug FROM pgflow.flows
43+
LOOP
44+
-- Build the archive table name
45+
archive_table := pgmq.format_table_name(flow_record.flow_slug, 'a');
46+
47+
-- Check if the archive table exists
48+
IF EXISTS (
49+
SELECT 1 FROM information_schema.tables
50+
WHERE table_schema = 'pgmq' AND table_name = archive_table
51+
) THEN
52+
-- Build and execute a dynamic SQL statement to delete old archive records
53+
dynamic_sql := format('
54+
DELETE FROM pgmq.%I
55+
WHERE archived_at < $1
56+
', archive_table);
57+
58+
EXECUTE dynamic_sql USING cutoff_timestamp;
59+
END IF;
60+
END LOOP;
61+
END
62+
$$;
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
begin;
2+
select plan(4);
3+
select pgflow_tests.reset_db();
4+
5+
-- Load the prune_data_older_than function
6+
\i _shared/prune_data_older_than.sql.raw
7+
8+
-- Create test flows with sequential structure to ensure message archiving
9+
select pgflow_tests.setup_flow('sequential');
10+
11+
-- Start the flow and complete the first step to create an archived message
12+
select pgflow.start_flow('sequential', '{"test": true}'::JSONB);
13+
14+
-- Get the run_id for the first flow
15+
\set run_id_1 `echo "SELECT run_id FROM pgflow.runs LIMIT 1;" | psql -t -A`
16+
17+
-- Complete the first task using poll_and_complete to ensure proper handling
18+
select pgflow_tests.poll_and_complete('sequential');
19+
20+
-- Verify that a message was archived
21+
select is(
22+
(select count(*)::INT from pgmq.a_sequential),
23+
1::INT,
24+
'One message should be archived initially'
25+
);
26+
27+
-- Create a second flow and complete its first task to archive another message
28+
select pgflow.start_flow('sequential', '{"test": "second run"}'::JSONB);
29+
select pgflow_tests.poll_and_complete('sequential');
30+
31+
-- Now we have two archived messages
32+
select is(
33+
(select count(*)::INT from pgmq.a_sequential),
34+
2::INT,
35+
'Two messages should be archived'
36+
);
37+
38+
-- Set different timestamps for the archived messages
39+
-- Make one message old (31 days)
40+
update pgmq.a_sequential
41+
set archived_at = now() - INTERVAL '31 days'
42+
where msg_id = (select min(msg_id) from pgmq.a_sequential);
43+
44+
-- Leave the other message recent (5 days)
45+
update pgmq.a_sequential
46+
set archived_at = now() - INTERVAL '5 days'
47+
where msg_id = (select max(msg_id) from pgmq.a_sequential);
48+
49+
-- Prune with 30-day retention
50+
select pgflow.prune_data_older_than(make_interval(days => 30));
51+
52+
-- TEST: Only the old archived message should be pruned
53+
select is(
54+
(select count(*)::INT from pgmq.a_sequential),
55+
1::INT,
56+
'Only one message should remain after pruning'
57+
);
58+
59+
-- TEST: Check that the remaining message is the recent one
60+
select is(
61+
(
62+
select (extract(day from now() - archived_at) < 10)::BOOLEAN
63+
from pgmq.a_sequential
64+
),
65+
true,
66+
'The remaining message should be the recent one'
67+
);
68+
69+
select finish();
70+
rollback;
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
begin;
2+
select plan(4);
3+
select pgflow_tests.reset_db();
4+
5+
-- Load the prune_data_older_than function
6+
\i _shared/prune_data_older_than.sql.raw
7+
8+
-- Create test flows
9+
select pgflow.create_flow('old_completed_flow', max_attempts => 0);
10+
select pgflow.add_step('old_completed_flow', 'step');
11+
12+
select pgflow.create_flow('recent_completed_flow', max_attempts => 0);
13+
select pgflow.add_step('recent_completed_flow', 'step');
14+
15+
select pgflow.create_flow('running_old_flow', max_attempts => 0);
16+
select pgflow.add_step('running_old_flow', 'step');
17+
18+
-- Start and complete flows
19+
select pgflow.start_flow('old_completed_flow', '{}'::jsonb);
20+
select pgflow_tests.poll_and_complete('old_completed_flow');
21+
22+
select pgflow.start_flow('recent_completed_flow', '{}'::jsonb);
23+
select pgflow_tests.poll_and_complete('recent_completed_flow');
24+
25+
select pgflow.start_flow('running_old_flow', '{}'::jsonb);
26+
27+
-- Set timestamps to simulate age
28+
-- Old completed flow: 31 days old
29+
select pgflow_tests.set_completed_flow_timestamps('old_completed_flow', 31);
30+
31+
-- Recent completed flow: 25 days old (within 30-day retention)
32+
select pgflow_tests.set_completed_flow_timestamps('recent_completed_flow', 25);
33+
34+
-- Running flow: 40 days old but still running (should not be pruned)
35+
select pgflow_tests.set_running_flow_timestamps('running_old_flow', 40);
36+
37+
-- Verify setup
38+
select is(
39+
(select count(*) from pgflow.runs),
40+
3::bigint,
41+
'Should have 3 runs before pruning'
42+
);
43+
44+
-- Prune old records with 30-day retention
45+
select pgflow.prune_data_older_than(make_interval(days => 30));
46+
47+
-- TEST: Only the old completed flow should be pruned
48+
select is(
49+
(select array_agg(flow_slug order by flow_slug) from pgflow.runs),
50+
array['recent_completed_flow', 'running_old_flow'],
51+
'Only old completed flow should be pruned'
52+
);
53+
54+
-- TEST: Step states for old completed flow should be pruned
55+
select is(
56+
(select count(*) from pgflow.step_states where flow_slug = 'old_completed_flow'),
57+
0::bigint,
58+
'Step states for old completed flow should be pruned'
59+
);
60+
61+
-- TEST: Step tasks for old completed flow should be pruned
62+
select is(
63+
(select count(*) from pgflow.step_tasks where flow_slug = 'old_completed_flow'),
64+
0::bigint,
65+
'Step tasks for old completed flow should be pruned'
66+
);
67+
68+
select finish();
69+
rollback;
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
begin;
2+
select plan(4);
3+
select pgflow_tests.reset_db();
4+
5+
-- Load the prune_data_older_than function
6+
\i _shared/prune_data_older_than.sql.raw
7+
8+
-- Create test flows
9+
select pgflow.create_flow('old_failed_flow', max_attempts => 0);
10+
select pgflow.add_step('old_failed_flow', 'step');
11+
12+
select pgflow.create_flow('recent_failed_flow', max_attempts => 0);
13+
select pgflow.add_step('recent_failed_flow', 'step');
14+
15+
select pgflow.create_flow('running_old_flow', max_attempts => 0);
16+
select pgflow.add_step('running_old_flow', 'step');
17+
18+
-- Start and fail flows
19+
select pgflow.start_flow('old_failed_flow', '{}'::jsonb);
20+
select pgflow_tests.poll_and_fail('old_failed_flow');
21+
22+
select pgflow.start_flow('recent_failed_flow', '{}'::jsonb);
23+
select pgflow_tests.poll_and_fail('recent_failed_flow');
24+
25+
select pgflow.start_flow('running_old_flow', '{}'::jsonb);
26+
27+
-- Set timestamps to simulate age
28+
-- Old failed flow: 31 days old
29+
select pgflow_tests.set_failed_flow_timestamps('old_failed_flow', 31);
30+
31+
-- Recent failed flow: 25 days old (within 30-day retention)
32+
select pgflow_tests.set_failed_flow_timestamps('recent_failed_flow', 25);
33+
34+
-- Running flow: 40 days old but still running (should not be pruned)
35+
select pgflow_tests.set_running_flow_timestamps('running_old_flow', 40);
36+
37+
-- Verify setup
38+
select is(
39+
(select count(*) from pgflow.runs),
40+
3::bigint,
41+
'Should have 3 runs before pruning'
42+
);
43+
44+
-- Prune old records with 30-day retention
45+
select pgflow.prune_data_older_than(make_interval(days => 30));
46+
47+
-- TEST: Only the old failed flow should be pruned
48+
select is(
49+
(select array_agg(flow_slug order by flow_slug) from pgflow.runs),
50+
array['recent_failed_flow', 'running_old_flow'],
51+
'Only old failed flow should be pruned'
52+
);
53+
54+
-- TEST: Step states for old failed flow should be pruned
55+
select is(
56+
(select count(*) from pgflow.step_states where flow_slug = 'old_failed_flow'),
57+
0::bigint,
58+
'Step states for old failed flow should be pruned'
59+
);
60+
61+
-- TEST: Step tasks for old failed flow should be pruned
62+
select is(
63+
(select count(*) from pgflow.step_tasks where flow_slug = 'old_failed_flow'),
64+
0::bigint,
65+
'Step tasks for old failed flow should be pruned'
66+
);
67+
68+
select finish();
69+
rollback;

0 commit comments

Comments
 (0)