From 5570936fafaf21f0ea634a6d6daeed509c4a491a Mon Sep 17 00:00:00 2001
From: Geoffrey Ragot
Date: Fri, 28 Mar 2025 10:12:05 +0100
Subject: [PATCH] fix: invalid pcv

The PCV backfills in migrations 19 and 27 merged one
{account: {asset: volumes}} object per move directly at the transaction
level, so an account moving several assets within a single transaction kept
only one of them. Aggregate per (transaction, account) before merging
accounts, key the batches on transactions_seq, and add migration 28 to
replay the corrected backfill on ledgers where 27 already ran. The
init-schema test data gains a SELL posting so the multi-asset case is
covered.
---
 .../0-init-schema/up_tests_after.sql          |  6 ++
 .../19-transactions-fill-pcv/up.sql           | 35 ++++++-----
 .../up_tests_after.sql                        |  5 +-
 .../migrations/27-fix-invalid-pcv/up.sql      | 42 +++++++------
 .../28-fix-pcv-missing-asset/notes.yaml       |  1 +
 .../28-fix-pcv-missing-asset/up.sql           | 61 +++++++++++++++++++
 6 files changed, 112 insertions(+), 38 deletions(-)
 create mode 100644 internal/storage/bucket/migrations/28-fix-pcv-missing-asset/notes.yaml
 create mode 100644 internal/storage/bucket/migrations/28-fix-pcv-missing-asset/up.sql

diff --git a/internal/storage/bucket/migrations/0-init-schema/up_tests_after.sql b/internal/storage/bucket/migrations/0-init-schema/up_tests_after.sql
index 12170baa57..946180a5ab 100644
--- a/internal/storage/bucket/migrations/0-init-schema/up_tests_after.sql
+++ b/internal/storage/bucket/migrations/0-init-schema/up_tests_after.sql
@@ -14,6 +14,12 @@ select
     '"id": ' || (seq/5) + (seq % 5) || ','
     '"timestamp": "' || now() || '",'
     '"postings": ['
+        '{'
+            '"destination": "sellers:' || (seq % 5) || '",'
+            '"source": "world",'
+            '"asset": "SELL",'
+            '"amount": 1'
+        '},'
         '{'
             '"source": "world",'
             '"destination": "orders:' || seq || '",'
diff --git a/internal/storage/bucket/migrations/19-transactions-fill-pcv/up.sql b/internal/storage/bucket/migrations/19-transactions-fill-pcv/up.sql
index 744aa7415d..d7865df07d 100644
--- a/internal/storage/bucket/migrations/19-transactions-fill-pcv/up.sql
+++ b/internal/storage/bucket/migrations/19-transactions-fill-pcv/up.sql
@@ -8,21 +8,25 @@ do $$
     drop table if exists moves_view;
 
     create temp table moves_view as
-    select transactions_id::numeric, public.aggregate_objects(json_build_object(accounts_address, json_build_object(asset, json_build_object('input', (post_commit_volumes).inputs, 'output', (post_commit_volumes).outputs)))::jsonb) as volumes
+    select transactions_seq, public.aggregate_objects(jsonb_build_object(accounts_address, volumes)) as volumes
     from (
-      SELECT DISTINCT ON (moves.transactions_id, accounts_address, asset) moves.transactions_id, accounts_address, asset,
-             first_value(post_commit_volumes) OVER (
-               PARTITION BY moves.transactions_id, accounts_address, asset
+        select transactions_seq::numeric, accounts_address, public.aggregate_objects(json_build_object(asset, json_build_object('input', (post_commit_volumes).inputs, 'output', (post_commit_volumes).outputs))::jsonb) as volumes
+        from (
+            SELECT DISTINCT ON (moves.transactions_seq, accounts_address, asset) moves.transactions_seq, accounts_address, asset,
+                first_value(post_commit_volumes) OVER (
+                    PARTITION BY moves.transactions_seq, accounts_address, asset
                ORDER BY seq DESC
-             ) AS post_commit_volumes
-      FROM moves
-      where insertion_date < (
-        select tstamp from goose_db_version where version_id = 12
-      )
-    ) moves
-    group by transactions_id;
+                ) AS post_commit_volumes
+            FROM moves
+            where insertion_date < (
+                select tstamp from goose_db_version where version_id = 12
+            )
+        ) moves
+        group by transactions_seq, accounts_address
+    ) data
+    group by transactions_seq;
 
-    create index moves_view_idx on moves_view(transactions_id);
+    create index moves_view_idx on moves_view(transactions_seq);
 
     if (select count(*) from moves_view) = 0 then
         return;
@@ -32,15 +36,15 @@ do $$
 
     loop
         with data as (
-            select transactions_id, volumes
+            select transactions_seq, volumes
             from moves_view
             -- play better than offset/limit
-            where transactions_id >= _offset and transactions_id < _offset + _batch_size
+            where transactions_seq >= _offset and transactions_seq < _offset + _batch_size
         )
         update transactions
         set post_commit_volumes = data.volumes
         from data
-        where transactions.id = data.transactions_id;
+        where transactions.seq = data.transactions_seq;
 
         exit when not found;
 
@@ -59,3 +63,4 @@ do $$
         not valid;
     end
 $$;
+
diff --git a/internal/storage/bucket/migrations/19-transactions-fill-pcv/up_tests_after.sql b/internal/storage/bucket/migrations/19-transactions-fill-pcv/up_tests_after.sql
index 8a1cd7488e..0324ee5751 100644
--- a/internal/storage/bucket/migrations/19-transactions-fill-pcv/up_tests_after.sql
+++ b/internal/storage/bucket/migrations/19-transactions-fill-pcv/up_tests_after.sql
@@ -1,10 +1,9 @@
 do $$
     declare
-        expected varchar = '{"fees": {"USD": {"input": 1, "output": 0}}, "world": {"USD": {"input": 0, "output": 100}}, "orders:0": {"USD": {"input": 100, "output": 100}}, "sellers:0": {"USD": {"input": 99, "output": 0}}}';
+        expected varchar = '{"fees": {"USD": {"input": 1, "output": 0}}, "world": {"USD": {"input": 0, "output": 100}, "SELL": {"input": 0, "output": 1}}, "orders:0": {"USD": {"input": 100, "output": 100}}, "sellers:0": {"USD": {"input": 99, "output": 0}, "SELL": {"input": 1, "output": 0}}}';
     begin
         set search_path = '{{.Schema}}';
         assert (select post_commit_volumes::varchar from transactions where id = 0) = expected,
             'post_commit_volumes should be equals to ' || expected || ' but was ' || (select to_jsonb(post_commit_volumes) from transactions where id = 0);
     end;
-$$
-
+$$
\ No newline at end of file
diff --git a/internal/storage/bucket/migrations/27-fix-invalid-pcv/up.sql b/internal/storage/bucket/migrations/27-fix-invalid-pcv/up.sql
index a4d4b2cd6a..33d9751a72 100644
--- a/internal/storage/bucket/migrations/27-fix-invalid-pcv/up.sql
+++ b/internal/storage/bucket/migrations/27-fix-invalid-pcv/up.sql
@@ -8,24 +8,25 @@ do $$
     drop table if exists moves_view;
 
     create temp table moves_view as
-    select transactions_id::numeric, public.aggregate_objects(json_build_object(accounts_address, json_build_object(asset, json_build_object('input', (post_commit_volumes).inputs, 'output', (post_commit_volumes).outputs)))::jsonb) as volumes
+    select transactions_seq, public.aggregate_objects(jsonb_build_object(accounts_address, volumes)) as volumes
     from (
-      SELECT DISTINCT ON (moves.transactions_id, accounts_address, asset)
-        moves.transactions_id,
-        accounts_address,
-        asset,
-        first_value(post_commit_volumes) OVER (
-            PARTITION BY moves.transactions_id, accounts_address, asset
-            ORDER BY seq DESC
-        ) AS post_commit_volumes
-      FROM moves
-      where insertion_date < (
-        select tstamp from goose_db_version where version_id = 12
-      )
-    ) moves
-    group by transactions_id;
-
-    create index moves_view_idx on moves_view(transactions_id);
+        select transactions_seq::numeric, accounts_address, public.aggregate_objects(json_build_object(asset, json_build_object('input', (post_commit_volumes).inputs, 'output', (post_commit_volumes).outputs))::jsonb) as volumes
+        from (
+            SELECT DISTINCT ON (moves.transactions_seq, accounts_address, asset) moves.transactions_seq, accounts_address, asset,
+                first_value(post_commit_volumes) OVER (
+                    PARTITION BY moves.transactions_seq, accounts_address, asset
+                    ORDER BY seq DESC
+                ) AS post_commit_volumes
+            FROM moves
+            where insertion_date < (
+                select tstamp from goose_db_version where version_id = 12
+            )
+        ) moves
+        group by transactions_seq, accounts_address
+    ) data
+    group by transactions_seq;
+
+    create index moves_view_idx on moves_view(transactions_seq);
 
     if (select count(*) from moves_view) = 0 then
         return;
@@ -35,15 +36,15 @@ do $$
 
     loop
         with data as (
-            select transactions_id, volumes
+            select transactions_seq, volumes
             from moves_view
            -- play better than offset/limit
-            where transactions_id >= _offset and transactions_id < _offset + _batch_size
+            where transactions_seq >= _offset and transactions_seq < _offset + _batch_size
         )
         update transactions
         set post_commit_volumes = data.volumes
         from data
-        where transactions.id = data.transactions_id;
+        where transactions.seq = data.transactions_seq;
 
         exit when not found;
 
@@ -57,3 +58,4 @@ do $$
         drop table if exists moves_view;
     end
 $$;
+
diff --git a/internal/storage/bucket/migrations/28-fix-pcv-missing-asset/notes.yaml b/internal/storage/bucket/migrations/28-fix-pcv-missing-asset/notes.yaml
new file mode 100644
index 0000000000..fe854873f4
--- /dev/null
+++ b/internal/storage/bucket/migrations/28-fix-pcv-missing-asset/notes.yaml
@@ -0,0 +1 @@
+name: Fill invalid post_commit_volumes (missing asset)
diff --git a/internal/storage/bucket/migrations/28-fix-pcv-missing-asset/up.sql b/internal/storage/bucket/migrations/28-fix-pcv-missing-asset/up.sql
new file mode 100644
index 0000000000..33d9751a72
--- /dev/null
+++ b/internal/storage/bucket/migrations/28-fix-pcv-missing-asset/up.sql
@@ -0,0 +1,61 @@
+do $$
+    declare
+        _offset integer := 0;
+        _batch_size integer := 1000;
+    begin
+        set search_path = '{{ .Schema }}';
+
+        drop table if exists moves_view;
+
+        create temp table moves_view as
+        select transactions_seq, public.aggregate_objects(jsonb_build_object(accounts_address, volumes)) as volumes
+        from (
+            select transactions_seq::numeric, accounts_address, public.aggregate_objects(json_build_object(asset, json_build_object('input', (post_commit_volumes).inputs, 'output', (post_commit_volumes).outputs))::jsonb) as volumes
+            from (
+                SELECT DISTINCT ON (moves.transactions_seq, accounts_address, asset) moves.transactions_seq, accounts_address, asset,
+                    first_value(post_commit_volumes) OVER (
+                        PARTITION BY moves.transactions_seq, accounts_address, asset
+                        ORDER BY seq DESC
+                    ) AS post_commit_volumes
+                FROM moves
+                where insertion_date < (
+                    select tstamp from goose_db_version where version_id = 12
+                )
+            ) moves
+            group by transactions_seq, accounts_address
+        ) data
+        group by transactions_seq;
+
+        create index moves_view_idx on moves_view(transactions_seq);
+
+        if (select count(*) from moves_view) = 0 then
+            return;
+        end if;
+
+        perform pg_notify('migrations-{{ .Schema }}', 'init: ' || (select count(*) from moves_view));
+
+        loop
+            with data as (
+                select transactions_seq, volumes
+                from moves_view
+                -- play better than offset/limit
+                where transactions_seq >= _offset and transactions_seq < _offset + _batch_size
+            )
+            update transactions
+            set post_commit_volumes = data.volumes
+            from data
+            where transactions.seq = data.transactions_seq;
+
+            exit when not found;
+
+            _offset = _offset + _batch_size;
+
+            perform pg_notify('migrations-{{ .Schema }}', 'continue: ' || _batch_size);
+
+            commit;
+        end loop;
+
+        drop table if exists moves_view;
+    end
+$$;
+
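
Why both backfills needed the two-level rewrite: the old query aggregated one
{account: {asset: volumes}} object per move directly at the transaction
level, so when the same account moved several assets inside one transaction,
the top-level account keys collided and post_commit_volumes ended up with a
missing asset. Migration 28 replays the corrected query (its up.sql is
identical to the fixed 27, same blob 33d9751a72) on ledgers where 27 already
ran with the bug. Below is a minimal sketch of the collision, runnable on
stock PostgreSQL; jsonb_object_agg stands in for this repo's
public.aggregate_objects, on the assumption that both keep a single value
per duplicate top-level key rather than deep-merging.

    -- One transaction in which sellers:0 moves two assets, mirroring the
    -- SELL/USD postings added to the test data above.
    with moves(transactions_seq, accounts_address, asset, volumes) as (
        values (1, 'sellers:0', 'USD',  '{"input": 99, "output": 0}'::jsonb),
               (1, 'sellers:0', 'SELL', '{"input": 1, "output": 0}'::jsonb)
    )
    select
        -- old shape: both rows emit the key "sellers:0", the merge keeps one
        -- value per key, and one asset is silently dropped
        (select jsonb_object_agg(accounts_address, jsonb_build_object(asset, volumes))
         from moves) as buggy,
        -- new shape: merge assets per (transaction, account) first, then
        -- merge accounts, so every asset survives
        (select jsonb_object_agg(accounts_address, volumes)
         from (select accounts_address, jsonb_object_agg(asset, volumes) as volumes
               from moves
               group by accounts_address) per_account) as fixed;
    -- buggy => {"sellers:0": {"SELL": {"input": 1, "output": 0}}}
    --          (USD is lost; whichever row aggregates last wins)
    -- fixed => {"sellers:0": {"SELL": {"input": 1, "output": 0},
    --                         "USD": {"input": 99, "output": 0}}}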
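The inner query of each backfill relies on a DISTINCT ON plus window-function
pattern: first_value(post_commit_volumes) over (partition by ... order by
seq desc) yields the volumes of the latest move for every row of a
(transaction, account, asset) partition, so DISTINCT ON may keep an
arbitrary row per triple and still return a deterministic result. A
stand-alone sketch of the pattern, using a hypothetical table t with plain
integers in place of the post_commit_volumes composite:

    -- last value per group, no correlated subquery needed
    with t(grp, seq, val) as (
        values ('a', 1, 10),
               ('a', 2, 20),
               ('b', 1, 30)
    )
    select distinct on (grp)
        grp,
        first_value(val) over (partition by grp order by seq desc) as last_val
    from t;
    -- => (a, 20), (b, 30)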