Skip to content

Commit 985176e

Browse files
authored
Add step_index to steps and status timestamps to runtime tables (#89)
* feat: add step_index column, enforce uniqueness, auto-increment, and isolate step indexes per flow - Added step_index column with default 0 and unique constraint within each flow - Updated add_step function to assign step_index automatically based on max existing index - Implemented tests for step_index auto-increment, uniqueness across flows, and flow isolation - Enhanced flow step management to ensure step_index consistency and isolation between flows * refactor: update pgflow migration SQL to add step_index and constraints Renamed migration file for consistency, modified steps table to include step_index with unique constraint, and adjusted add_step function for new schema. Also updated checksum file accordingly. * feat: add timestamp columns and constraints for improved auditability and data integrity - Added created_at columns with default now() to tables: flows, steps, deps, step_states, step_tasks - Included completed_at, failed_at, started_at columns with relevant constraints for timing consistency - Updated task and step state functions to set completed_at, failed_at, and started_at timestamps - Enhanced run status updates to record failed_at timestamps on failure - Improved start ready steps function to timestamp started_at when transitioning steps to 'started' status * fix(timestamps): Ensure timestamp fields are correctly set and validated in various workflows - Corrected SQL constraint syntax in runtime schema - Added comprehensive tests for creation, start, complete, and fail timestamps - Included tests for timestamp fields in flow creation, step start, task completion, and failure - Ensured timestamps are set within expected time bounds during operations - Improved timestamp validation across multiple test scenarios for consistency and accuracy * feat: add timestamps to migration tables and update run completion logic - Renamed migration file for consistency - Enhanced tables with created_at, started_at, completed_at, and failed_at timestamps - Updated pgflow.maybe_complete_run to set completed_at and output - Modified step and task update queries to record timestamps on status changes - Adjusted indexes to include new timestamp columns - Updated test SQL to verify timestamp fields are correctly set during task completion * fix: Correct timestamp constraints and rename initial migration for consistency - Updated timestamp constraints to ensure logical ordering in tables - Renamed initial migration file for clarity and versioning - Adjusted index creation statements to match updated table definitions * fix: update verify-migrations inputs to include migrations directory - Changed the verify-migrations task to depend on the 'migrations' input - Ensures proper cache invalidation and accurate verification process * fix: Correct constraints to prevent both completed_at and failed_at being non-null simultaneously - Updated check constraints in tables to ensure only one of completed_at or failed_at can be set - Renamed migration file for consistency and versioning - Adjusted index creation statements for clarity and correctness - Ensured all constraints align with intended workflow logic and data integrity standards * fix: update failed_at logic in SQL migrations and seed script for consistency - Corrected failed_at assignment to set timestamp only when task has exceeded max attempts - Applied the same fix across multiple migration files to ensure consistent failure handling - Updated seed.sql to reflect task failure behavior in testing setup - Renamed migration files for clarity and versioning consistency - Minor adjustment in test script to use the correct polling function for failure simulation * fix: standardize timestamp comparisons and fix SQL syntax in test scripts - Corrected timestamp comparison operators from '>' to '>=' for consistency - Fixed array syntax in add_step calls to use lowercase 'array' for compatibility - Ensured all test assertions check for timestamps being after or equal to previous ones - Updated SQL queries for better readability and adherence to conventions - Minor formatting adjustments across multiple test scripts to improve consistency * fix: update migration filenames and associated checksum references Renamed the initial pgflow migration file for consistency and updated the checksum in the atlas.sum file to match the new filename, ensuring migration integrity. * test: update SQL test assertions and add missing plus signs for task completion * fix(tests): update task completion to use 'last' status and verify timestamps - Changed 'third' to 'last' in SQL test cases for consistency - Ensured completed_at timestamp is set and failed_at remains null after task completion - Updated related test assertions to reflect the new status value * feat: add step_index and status timestamps to runtime tables - Introduced step_index to steps - Added various status timestamp fields to runtime tables * test: add verification for null timestamps in dependent step states Update SQL tests to ensure that dependent steps further down the line have null values for started_at, completed_at, and failed_at, confirming correct initial state setup in the workflow.
1 parent 09e3210 commit 985176e

27 files changed

+784
-26
lines changed

.changeset/twenty-wasps-admire.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@pgflow/core': patch
3+
---
4+
5+
Add step_index to steps and various status timestamps to runtime tables

pkgs/core/project.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@
4444
"verify-migrations": {
4545
"executor": "nx:run-commands",
4646
"dependsOn": ["verify-schemas-synced", "supabase:ensure-started"],
47-
"inputs": ["migrationVerificationCache"],
47+
"inputs": ["migrations", "migrationVerificationCache"],
4848
"outputs": ["{projectRoot}/.nx-inputs/verify-migrations.txt"],
4949
"options": {
5050
"cwd": "{projectRoot}",

pkgs/core/schemas/0050_tables_definitions.sql

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ create table pgflow.flows (
66
opt_max_attempts int not null default 3,
77
opt_base_delay int not null default 1,
88
opt_timeout int not null default 60,
9+
created_at timestamptz not null default now(),
910
constraint slug_is_valid check (pgflow.is_valid_slug(flow_slug)),
1011
constraint opt_max_attempts_is_nonnegative check (opt_max_attempts >= 0),
1112
constraint opt_base_delay_is_nonnegative check (opt_base_delay >= 0),
@@ -17,11 +18,14 @@ create table pgflow.steps (
1718
flow_slug text not null references pgflow.flows (flow_slug),
1819
step_slug text not null,
1920
step_type text not null default 'single',
21+
step_index int not null default 0,
2022
deps_count int not null default 0 check (deps_count >= 0),
2123
opt_max_attempts int,
2224
opt_base_delay int,
2325
opt_timeout int,
26+
created_at timestamptz not null default now(),
2427
primary key (flow_slug, step_slug),
28+
unique (flow_slug, step_index), -- Ensure step_index is unique within a flow
2529
check (pgflow.is_valid_slug(step_slug)),
2630
check (step_type in ('single')),
2731
constraint opt_max_attempts_is_nonnegative check (opt_max_attempts is null or opt_max_attempts >= 0),
@@ -34,6 +38,7 @@ create table pgflow.deps (
3438
flow_slug text not null references pgflow.flows (flow_slug),
3539
dep_slug text not null, -- slug of the dependency
3640
step_slug text not null, -- slug of the dependent
41+
created_at timestamptz not null default now(),
3742
primary key (flow_slug, dep_slug, step_slug),
3843
foreign key (flow_slug, dep_slug)
3944
references pgflow.steps (flow_slug, step_slug),

pkgs/core/schemas/0060_tables_runtime.sql

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,13 @@ create table pgflow.runs (
88
input jsonb not null,
99
output jsonb,
1010
remaining_steps int not null default 0 check (remaining_steps >= 0),
11-
check (status in ('started', 'failed', 'completed'))
11+
started_at timestamptz not null default now(),
12+
completed_at timestamptz,
13+
failed_at timestamptz,
14+
constraint completed_at_or_failed_at check (not (completed_at is not null and failed_at is not null)),
15+
constraint completed_at_is_after_started_at check (completed_at is null or completed_at >= started_at),
16+
constraint failed_at_is_after_started_at check (failed_at is null or failed_at >= started_at),
17+
constraint status_is_valid check (status in ('started', 'failed', 'completed'))
1218
);
1319

1420
create index if not exists idx_runs_flow_slug on pgflow.runs (flow_slug);
@@ -22,11 +28,19 @@ create table pgflow.step_states (
2228
status text not null default 'created',
2329
remaining_tasks int not null default 1 check (remaining_tasks >= 0),
2430
remaining_deps int not null default 0 check (remaining_deps >= 0),
31+
created_at timestamptz not null default now(),
32+
started_at timestamptz,
33+
completed_at timestamptz,
34+
failed_at timestamptz,
2535
primary key (run_id, step_slug),
2636
foreign key (flow_slug, step_slug)
2737
references pgflow.steps (flow_slug, step_slug),
28-
check (status in ('created', 'started', 'completed', 'failed')),
29-
check (status != 'completed' or remaining_tasks = 0)
38+
constraint status_is_valid check (status in ('created', 'started', 'completed', 'failed')),
39+
constraint status_and_remaining_tasks_match check (status != 'completed' or remaining_tasks = 0),
40+
constraint completed_at_or_failed_at check (not (completed_at is not null and failed_at is not null)),
41+
constraint started_at_is_after_created_at check (started_at is null or started_at >= created_at),
42+
constraint completed_at_is_after_started_at check (completed_at is null or completed_at >= started_at),
43+
constraint failed_at_is_after_started_at check (failed_at is null or failed_at >= started_at)
3044
);
3145

3246
create index if not exists idx_step_states_ready on pgflow.step_states (run_id, status, remaining_deps) where status
@@ -46,6 +60,9 @@ create table pgflow.step_tasks (
4660
attempts_count int not null default 0,
4761
error_message text,
4862
output jsonb,
63+
queued_at timestamptz not null default now(),
64+
completed_at timestamptz,
65+
failed_at timestamptz,
4966
constraint step_tasks_pkey primary key (run_id, step_slug, task_index),
5067
foreign key (run_id, step_slug)
5168
references pgflow.step_states (run_id, step_slug),
@@ -56,7 +73,10 @@ create table pgflow.step_tasks (
5673
output is null or status = 'completed'
5774
),
5875
constraint only_single_task_per_step check (task_index = 0),
59-
constraint attempts_count_nonnegative check (attempts_count >= 0)
76+
constraint attempts_count_nonnegative check (attempts_count >= 0),
77+
constraint completed_at_or_failed_at check (not (completed_at is not null and failed_at is not null)),
78+
constraint completed_at_is_after_queued_at check (completed_at is null or completed_at >= queued_at),
79+
constraint failed_at_is_after_queued_at check (failed_at is null or failed_at >= queued_at)
6080
);
6181

6282
create index if not exists idx_step_tasks_message_id on pgflow.step_tasks (message_id);

pkgs/core/schemas/0100_function_add_step.sql

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,15 @@ set search_path to ''
1212
volatile
1313
as $$
1414
WITH
15+
next_index AS (
16+
SELECT COALESCE(MAX(step_index) + 1, 0) as idx
17+
FROM pgflow.steps
18+
WHERE flow_slug = add_step.flow_slug
19+
),
1520
create_step AS (
16-
INSERT INTO pgflow.steps (flow_slug, step_slug, deps_count, opt_max_attempts, opt_base_delay, opt_timeout)
17-
VALUES (flow_slug, step_slug, COALESCE(array_length(deps_slugs, 1), 0), max_attempts, base_delay, timeout)
21+
INSERT INTO pgflow.steps (flow_slug, step_slug, step_index, deps_count, opt_max_attempts, opt_base_delay, opt_timeout)
22+
SELECT add_step.flow_slug, add_step.step_slug, idx, COALESCE(array_length(deps_slugs, 1), 0), max_attempts, base_delay, timeout
23+
FROM next_index
1824
ON CONFLICT (flow_slug, step_slug)
1925
DO UPDATE SET step_slug = pgflow.steps.step_slug
2026
RETURNING *

pkgs/core/schemas/0100_function_complete_task.sql

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ task AS (
2626
UPDATE pgflow.step_tasks
2727
SET
2828
status = 'completed',
29+
completed_at = now(),
2930
output = complete_task.output
3031
WHERE pgflow.step_tasks.run_id = complete_task.run_id
3132
AND pgflow.step_tasks.step_slug = complete_task.step_slug
@@ -39,6 +40,10 @@ step_state AS (
3940
WHEN pgflow.step_states.remaining_tasks = 1 THEN 'completed' -- Will be 0 after decrement
4041
ELSE 'started'
4142
END,
43+
completed_at = CASE
44+
WHEN pgflow.step_states.remaining_tasks = 1 THEN now() -- Will be 0 after decrement
45+
ELSE NULL
46+
END,
4247
remaining_tasks = pgflow.step_states.remaining_tasks - 1
4348
FROM task
4449
WHERE pgflow.step_states.run_id = complete_task.run_id

pkgs/core/schemas/0100_function_fail_task.sql

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,10 @@ fail_or_retry_task as (
4444
WHEN task.attempts_count < (SELECT opt_max_attempts FROM config) THEN 'queued'
4545
ELSE 'failed'
4646
END,
47+
failed_at = CASE
48+
WHEN task.attempts_count >= (SELECT opt_max_attempts FROM config) THEN now()
49+
ELSE NULL
50+
END,
4751
error_message = fail_task.error_message
4852
WHERE task.run_id = fail_task.run_id
4953
AND task.step_slug = fail_task.step_slug
@@ -57,7 +61,11 @@ maybe_fail_step AS (
5761
status = CASE
5862
WHEN (select fail_or_retry_task.status from fail_or_retry_task) = 'failed' THEN 'failed'
5963
ELSE pgflow.step_states.status
60-
END
64+
END,
65+
failed_at = CASE
66+
WHEN (select fail_or_retry_task.status from fail_or_retry_task) = 'failed' THEN now()
67+
ELSE NULL
68+
END
6169
FROM fail_or_retry_task
6270
WHERE pgflow.step_states.run_id = fail_task.run_id
6371
AND pgflow.step_states.step_slug = fail_task.step_slug
@@ -67,7 +75,11 @@ UPDATE pgflow.runs
6775
SET status = CASE
6876
WHEN (select status from maybe_fail_step) = 'failed' THEN 'failed'
6977
ELSE status
70-
END
78+
END,
79+
failed_at = CASE
80+
WHEN (select status from maybe_fail_step) = 'failed' THEN now()
81+
ELSE NULL
82+
END
7183
WHERE pgflow.runs.run_id = fail_task.run_id;
7284

7385
-- For queued tasks: delay the message for retry with exponential backoff

pkgs/core/schemas/0100_function_maybe_complete_run.sql

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ as $$
99
UPDATE pgflow.runs
1010
SET
1111
status = 'completed',
12+
completed_at = now(),
1213
output = (
1314
-- Get outputs from final steps (steps that are not dependencies for other steps)
1415
SELECT jsonb_object_agg(st.step_slug, st.output)

pkgs/core/schemas/0100_function_start_ready_steps.sql

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@ WITH ready_steps AS (
1515
),
1616
started_step_states AS (
1717
UPDATE pgflow.step_states
18-
SET status = 'started'
18+
SET status = 'started',
19+
started_at = now()
1920
FROM ready_steps
2021
WHERE pgflow.step_states.run_id = start_ready_steps.run_id
2122
AND pgflow.step_states.step_slug = ready_steps.step_slug

0 commit comments

Comments
 (0)