diff --git a/internal/storage/bucket/bucket.go b/internal/storage/bucket/bucket.go index ac271f91e6..51ca38d18b 100644 --- a/internal/storage/bucket/bucket.go +++ b/internal/storage/bucket/bucket.go @@ -17,6 +17,7 @@ type Bucket interface { IsUpToDate(ctx context.Context, db bun.IDB) (bool, error) GetMigrationsInfo(ctx context.Context, db bun.IDB) ([]migrations.Info, error) IsInitialized(context.Context, bun.IDB) (bool, error) + GetLastVersion(ctx context.Context, db bun.IDB) (int, error) } type Factory interface { diff --git a/internal/storage/bucket/bucket_generated_test.go b/internal/storage/bucket/bucket_generated_test.go index 075824b9cb..6957cba29f 100644 --- a/internal/storage/bucket/bucket_generated_test.go +++ b/internal/storage/bucket/bucket_generated_test.go @@ -52,6 +52,21 @@ func (mr *MockBucketMockRecorder) AddLedger(ctx, db, ledger any) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddLedger", reflect.TypeOf((*MockBucket)(nil).AddLedger), ctx, db, ledger) } +// GetLastVersion mocks base method. +func (m *MockBucket) GetLastVersion(ctx context.Context, db bun.IDB) (int, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetLastVersion", ctx, db) + ret0, _ := ret[0].(int) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetLastVersion indicates an expected call of GetLastVersion. +func (mr *MockBucketMockRecorder) GetLastVersion(ctx, db any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetLastVersion", reflect.TypeOf((*MockBucket)(nil).GetLastVersion), ctx, db) +} + // GetMigrationsInfo mocks base method. func (m *MockBucket) GetMigrationsInfo(ctx context.Context, db bun.IDB) ([]migrations.Info, error) { m.ctrl.T.Helper() diff --git a/internal/storage/bucket/migrations/0-init-schema/up_tests_after.sql b/internal/storage/bucket/migrations/0-init-schema/up_tests_after.sql index 150a5e90f6..92b4d062b1 100644 --- a/internal/storage/bucket/migrations/0-init-schema/up_tests_after.sql +++ b/internal/storage/bucket/migrations/0-init-schema/up_tests_after.sql @@ -12,7 +12,7 @@ select ('{' '"transaction": {' '"id": ' || (seq/5) + (seq % 5) || ',' - '"timestamp": ' || to_json(now()::timestamp without time zone) || ',' + '"timestamp": "' || (to_json(now()::timestamp without time zone)#>>'{}') || 'Z",' '"postings": [' '{' '"destination": "sellers:' || (seq % 5) || '",' @@ -58,7 +58,7 @@ select ('{' '"transaction": {' '"id": ' || (seq/5) + (seq % 5) || ',' - '"timestamp": ' || to_json(now()::timestamp without time zone) || ',' + '"timestamp": "' || (to_json(now()::timestamp without time zone)#>>'{}') || 'Z",' '"postings": [' '{' '"source": "sellers:' || (seq % 5) || '",' diff --git a/internal/storage/bucket/migrations/32-fix-log-data-for-reverted-transactions/up_tests_after.sql b/internal/storage/bucket/migrations/32-fix-log-data-for-reverted-transactions/up_tests_after.sql index 1cbe797fa8..04f17625b2 100644 --- a/internal/storage/bucket/migrations/32-fix-log-data-for-reverted-transactions/up_tests_after.sql +++ b/internal/storage/bucket/migrations/32-fix-log-data-for-reverted-transactions/up_tests_after.sql @@ -1,9 +1,9 @@ do $$ declare expected varchar = '{"transaction": {"id": 22, "metadata": {"tax": "1%"}, "postings": [{"asset": "USD", "amount": 99, "source": "sellers:0", "destination": "orders:10"}, {"asset": "USD", "amount": 1, "source": "fees", "destination": "orders:10"}, {"asset": "USD", "amount": 100, "source": "orders:10", "destination": "world"}, {"asset": "SELL", "amount": 1, "source": "sellers:0", "destination": "world"}], ' || - '"timestamp": ' || - (select to_json(timestamp) from "{{.Schema}}".transactions where id = 22 and ledger = 'ledger0') - || '}, "revertedTransaction": {"id": 2, "metadata": {"tax": "1%"}, "postings": [{"asset": "SELL", "amount": 1, "source": "world", "destination": "sellers:0"}, {"asset": "USD", "amount": 100, "source": "world", "destination": "orders:10"}, {"asset": "USD", "amount": 1, "source": "orders:10", "destination": "fees"}, {"asset": "USD", "amount": 99, "source": "orders:10", "destination": "sellers:0"}], "reverted": true, "reference": null, "timestamp": ' || + '"timestamp": "' || + (select to_json(timestamp)#>>'{}' from "{{.Schema}}".transactions where id = 22 and ledger = 'ledger0') + || 'Z"}, "revertedTransaction": {"id": 2, "metadata": {"tax": "1%"}, "postings": [{"asset": "SELL", "amount": 1, "source": "world", "destination": "sellers:0"}, {"asset": "USD", "amount": 100, "source": "world", "destination": "orders:10"}, {"asset": "USD", "amount": 1, "source": "orders:10", "destination": "fees"}, {"asset": "USD", "amount": 99, "source": "orders:10", "destination": "sellers:0"}], "reverted": true, "reference": null, "timestamp": ' || (select to_json(timestamp) from "{{.Schema}}".transactions where id = 2 and ledger = 'ledger0') || ', "insertedAt": ' || (select to_json(inserted_at) from "{{.Schema}}".transactions where id = 2 and ledger = 'ledger0') || diff --git a/internal/storage/bucket/migrations/33-fix-invalid-date-format/notes.yaml b/internal/storage/bucket/migrations/33-fix-invalid-date-format/notes.yaml new file mode 100644 index 0000000000..ff026572c6 --- /dev/null +++ b/internal/storage/bucket/migrations/33-fix-invalid-date-format/notes.yaml @@ -0,0 +1 @@ +name: Fix invalid date format diff --git a/internal/storage/bucket/migrations/33-fix-invalid-date-format/up.sql b/internal/storage/bucket/migrations/33-fix-invalid-date-format/up.sql new file mode 100644 index 0000000000..963a99eb11 --- /dev/null +++ b/internal/storage/bucket/migrations/33-fix-invalid-date-format/up.sql @@ -0,0 +1,69 @@ +do $$ + declare + _offset integer := 0; + _batch_size integer := 1000; + begin + set search_path = '{{ .Schema }}'; + + drop table if exists txs_view; + + create temp table txs_view as + with reversed as ( + select + ledger, + id, + (convert_from(memento, 'UTF-8')::jsonb ->> 'revertedTransactionID')::numeric as revertedTransactionID + from logs + where type = 'REVERTED_TRANSACTION' and data->>'revertedTransactionID' is not null + ) + select reversed.id as log_id, transactions.* + from transactions + join reversed on + reversed.revertedTransactionID = transactions.id and + reversed.ledger = transactions.ledger; + + create index txs_view_idx on txs_view(log_id, id); + + if (select count(*) from txs_view) = 0 then + return; + end if; + + perform pg_notify('migrations-{{ .Schema }}', 'init: ' || (select count(*) from txs_view)); + + loop + with data as ( + select * + from txs_view + order by ledger, log_id, id + offset _offset + limit _batch_size + ) + update logs + set data = data || jsonb_build_object('revertedTransaction', jsonb_build_object( + 'id', data.id, + 'postings', data.postings::jsonb, + 'metadata', data.metadata, + 'reverted', true, + 'revertedAt', to_json(data.reverted_at)#>>'{}' || 'Z', + 'insertedAt', to_json(data.inserted_at)#>>'{}' || 'Z', + 'timestamp', to_json(data.timestamp)#>>'{}' || 'Z', + 'reference', case when data.reference is not null and data.reference <> '' then data.reference end, + 'postCommitVolumes', data.post_commit_volumes + )) + from data + where logs.id = data.log_id and + logs.ledger = data.ledger; + + exit when not found; + + _offset = _offset + _batch_size; + + perform pg_notify('migrations-{{ .Schema }}', 'continue: ' || _batch_size); + + commit; + end loop; + + drop table if exists txs_view; + end +$$; + diff --git a/internal/storage/bucket/migrations/33-fix-invalid-date-format/up_tests_after.sql b/internal/storage/bucket/migrations/33-fix-invalid-date-format/up_tests_after.sql new file mode 100644 index 0000000000..4f4afd2279 --- /dev/null +++ b/internal/storage/bucket/migrations/33-fix-invalid-date-format/up_tests_after.sql @@ -0,0 +1,18 @@ +do $$ + declare + expected varchar = '{"transaction": {"id": 22, "metadata": {"tax": "1%"}, "postings": [{"asset": "USD", "amount": 99, "source": "sellers:0", "destination": "orders:10"}, {"asset": "USD", "amount": 1, "source": "fees", "destination": "orders:10"}, {"asset": "USD", "amount": 100, "source": "orders:10", "destination": "world"}, {"asset": "SELL", "amount": 1, "source": "sellers:0", "destination": "world"}], ' || + '"timestamp": "' || + (select to_json(timestamp)#>>'{}' from "{{.Schema}}".transactions where id = 22 and ledger = 'ledger0') + || 'Z"}, "revertedTransaction": {"id": 2, "metadata": {"tax": "1%"}, "postings": [{"asset": "SELL", "amount": 1, "source": "world", "destination": "sellers:0"}, {"asset": "USD", "amount": 100, "source": "world", "destination": "orders:10"}, {"asset": "USD", "amount": 1, "source": "orders:10", "destination": "fees"}, {"asset": "USD", "amount": 99, "source": "orders:10", "destination": "sellers:0"}], "reverted": true, "reference": null, "timestamp": "' || + (select to_json(timestamp)#>>'{}' from "{{.Schema}}".transactions where id = 2 and ledger = 'ledger0') || + 'Z", "insertedAt": "' || + (select to_json(inserted_at)#>>'{}' from "{{.Schema}}".transactions where id = 2 and ledger = 'ledger0') || + 'Z", "revertedAt": "' || + (select to_json(reverted_at)#>>'{}' from "{{.Schema}}".transactions where id = 2 and ledger = 'ledger0') || + 'Z", "postCommitVolumes": {"fees": {"USD": {"input": 3, "output": 0}}, "world": {"USD": {"input": 0, "output": 300}, "SELL": {"input": 0, "output": 3}}, "orders:10": {"USD": {"input": 100, "output": 100}}, "sellers:0": {"USD": {"input": 297, "output": 0}, "SELL": {"input": 3, "output": 0}}}}, "revertedTransactionID": "2"}'; + begin + set search_path = '{{.Schema}}'; + assert (select data::varchar from logs where id = 22 and ledger = 'ledger0') = expected, + 'data should be equals to ' || expected || ' but was ' || (select data::varchar from logs where id = 22 and ledger = 'ledger0'); + end; +$$ \ No newline at end of file diff --git a/internal/storage/bucket/migrations_test.go b/internal/storage/bucket/migrations_test.go index 083cce8bd0..8654ea3d24 100644 --- a/internal/storage/bucket/migrations_test.go +++ b/internal/storage/bucket/migrations_test.go @@ -3,19 +3,24 @@ package bucket_test import ( + "context" "errors" "fmt" "github.com/formancehq/go-libs/v2/bun/bunconnect" + "github.com/formancehq/go-libs/v2/bun/bunpaginate" "github.com/formancehq/go-libs/v2/logging" "github.com/formancehq/go-libs/v2/migrations" "github.com/formancehq/go-libs/v2/pointer" ledger "github.com/formancehq/ledger/internal" + ledgercontroller "github.com/formancehq/ledger/internal/controller/ledger" "github.com/formancehq/ledger/internal/storage/bucket" + ledgerstore "github.com/formancehq/ledger/internal/storage/ledger" "github.com/formancehq/ledger/internal/storage/system" "github.com/google/uuid" _ "github.com/jackc/pgx/v5/stdlib" "github.com/stretchr/testify/require" "github.com/uptrace/bun/extra/bundebug" + "go.opentelemetry.io/otel/trace/noop" "io/fs" "testing" ) @@ -35,6 +40,7 @@ func TestMigrations(t *testing.T) { bucketName := uuid.NewString()[:8] migrator := bucket.GetMigrator(db, bucketName) + ledgers := make([]ledger.Ledger, 0) for i := 0; i < 5; i++ { l, err := ledger.New(fmt.Sprintf("ledger%d", i), ledger.Configuration{ @@ -42,6 +48,8 @@ func TestMigrations(t *testing.T) { }) require.NoError(t, err) require.NoError(t, system.New(db).CreateLedger(ctx, l)) + + ledgers = append(ledgers, *l) } _, err = bucket.WalkMigrations(bucket.MigrationsFS, func(entry fs.DirEntry) (*struct{}, error) { @@ -79,4 +87,22 @@ func TestMigrations(t *testing.T) { return pointer.For(struct{}{}), nil }) require.NoError(t, err) + + for i := 0; i < 5; i++ { + store := ledgerstore.New(db, bucket.NewDefault(noop.Tracer{}, bucketName), ledgers[i]) + + require.NoError(t, bunpaginate.Iterate( + ctx, + ledgercontroller.ColumnPaginatedQuery[any]{ + PageSize: 100, + Order: pointer.For(bunpaginate.Order(bunpaginate.OrderAsc)), + }, + func(ctx context.Context, q ledgercontroller.ColumnPaginatedQuery[any]) (*bunpaginate.Cursor[ledger.Log], error) { + return store.Logs().Paginate(ctx, q) + }, + func(cursor *bunpaginate.Cursor[ledger.Log]) error { + return nil + }, + )) + } } diff --git a/internal/storage/driver/buckets_generated_test.go b/internal/storage/driver/buckets_generated_test.go index 5cf59a265a..7331e1912c 100644 --- a/internal/storage/driver/buckets_generated_test.go +++ b/internal/storage/driver/buckets_generated_test.go @@ -53,6 +53,21 @@ func (mr *MockBucketMockRecorder) AddLedger(ctx, db, ledger any) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddLedger", reflect.TypeOf((*MockBucket)(nil).AddLedger), ctx, db, ledger) } +// GetLastVersion mocks base method. +func (m *MockBucket) GetLastVersion(ctx context.Context, db bun.IDB) (int, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetLastVersion", ctx, db) + ret0, _ := ret[0].(int) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetLastVersion indicates an expected call of GetLastVersion. +func (mr *MockBucketMockRecorder) GetLastVersion(ctx, db any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetLastVersion", reflect.TypeOf((*MockBucket)(nil).GetLastVersion), ctx, db) +} + // GetMigrationsInfo mocks base method. func (m *MockBucket) GetMigrationsInfo(ctx context.Context, db bun.IDB) ([]migrations.Info, error) { m.ctrl.T.Helper() diff --git a/internal/storage/ledger/balances.go b/internal/storage/ledger/balances.go index 654e7d4877..3211c5619d 100644 --- a/internal/storage/ledger/balances.go +++ b/internal/storage/ledger/balances.go @@ -14,13 +14,15 @@ import ( ledgercontroller "github.com/formancehq/ledger/internal/controller/ledger" ) +const fillAccountsVolumeHistoryMigration = 21 + func (store *Store) GetBalances(ctx context.Context, query ledgercontroller.BalanceQuery) (ledgercontroller.Balances, error) { - isUpToDate, err := store.bucket.IsUpToDate(ctx, store.db) + lastVersion, err := store.bucket.GetLastVersion(ctx, store.db) if err != nil { return nil, err } - if isUpToDate { + if lastVersion >= fillAccountsVolumeHistoryMigration { return store.GetBalancesAfterUpgrade(ctx, query) } else { return store.GetBalancesWhenUpgrading(ctx, query)