diff --git a/internal/controller/system/state_tracker.go b/internal/controller/system/state_tracker.go index 14ad13e964..581e2e1d8f 100644 --- a/internal/controller/system/state_tracker.go +++ b/internal/controller/system/state_tracker.go @@ -9,15 +9,21 @@ import ( ledger "github.com/formancehq/ledger/internal" ledgercontroller "github.com/formancehq/ledger/internal/controller/ledger" "github.com/uptrace/bun" + "sync" ) type controllerFacade struct { ledgercontroller.Controller + mu sync.RWMutex ledger ledger.Ledger } func (c *controllerFacade) handleState(ctx context.Context, dryRun bool, fn func(ctrl ledgercontroller.Controller) error) error { - if dryRun || c.ledger.State == ledger.StateInUse { + c.mu.RLock() + l := c.ledger + c.mu.RUnlock() + + if l.State == ledger.StateInUse { return fn(c.Controller) } @@ -30,24 +36,77 @@ func (c *controllerFacade) handleState(ctx context.Context, dryRun bool, fn func }() if err := withLock(ctx, ctrl, func(ctrl ledgercontroller.Controller, conn bun.IDB) error { - return fn(ctrl) + + // todo: remove that in a later version + ret, err := tx.NewUpdate(). + Model(&l). + Set("state = ?", ledger.StateInUse). + Where("id = ? and state = ?", l.ID, ledger.StateInitializing). + Exec(ctx) + if err != nil { + return err + } + + rowsAffected, err := ret.RowsAffected() + if err != nil { + return err + } + + if rowsAffected > 0 { + _, err := tx.NewRaw( + fmt.Sprintf(` + select setval( + '"%s"."transaction_id_%d"', + ( + select max(id) from "%s".transactions where ledger = '%s' + )::bigint + ) + `, l.Bucket, l.ID, l.Bucket, l.Name), + ).Exec(ctx) + if err != nil { + return fmt.Errorf("failed to update transactions sequence value: %w", err) + } + + _, err = tx.NewRaw( + fmt.Sprintf(` + select setval( + '"%s"."log_id_%d"', + ( + select max(id) from "%s".logs where ledger = '%s' + )::bigint + ) + `, l.Bucket, l.ID, l.Bucket, l.Name), + ).Exec(ctx) + if err != nil { + return fmt.Errorf("failed to update logs sequence value: %w", err) + } + } + + if err := fn(ctrl); err != nil { + return err + } + + return nil }); err != nil { return err } - c.ledger.State = ledger.StateInUse + if !dryRun { + if err := ctrl.Commit(ctx); err != nil { + return fmt.Errorf("failed to commit transaction: %w", err) + } - // todo: remove that in a later version - _, err = tx.NewUpdate(). - Model(&c.ledger). - Set("state = ?", c.ledger.State). - Where("id = ?", c.ledger.ID). - Exec(ctx) - if err != nil { - return err + c.mu.Lock() + c.ledger.State = ledger.StateInUse + c.mu.Unlock() + } else { + if err := ctrl.Rollback(ctx); err != nil { + return fmt.Errorf("failed to rollback transaction: %w", err) + } } - return ctrl.Commit(ctx) + + return nil } func (c *controllerFacade) CreateTransaction(ctx context.Context, parameters ledgercontroller.Parameters[ledgercontroller.CreateTransaction]) (*ledger.Log, *ledger.CreatedTransaction, error) { diff --git a/test/e2e/api_ledgers_import_test.go b/test/e2e/api_ledgers_import_test.go index 9302bfac1c..a2df7b7ea7 100644 --- a/test/e2e/api_ledgers_import_test.go +++ b/test/e2e/api_ledgers_import_test.go @@ -348,6 +348,39 @@ var _ = Context("Ledger engine tests", func() { Expect(err).To(Succeed()) Expect(accountsFromOriginalLedger.V2AccountsCursorResponse.Cursor.Data).To(Equal(accountsFromNewLedger.V2AccountsCursorResponse.Cursor.Data)) + + By("Checking sequence restoration by creating a new transaction with dry run", func() { + tx, err := Wait(specContext, DeferClient(testServer)).Ledger.V2.CreateTransaction(ctx, operations.V2CreateTransactionRequest{ + Ledger: createLedgerRequest.Ledger, + DryRun: pointer.For(true), + V2PostTransaction: components.V2PostTransaction{ + Postings: []components.V2Posting{{ + Source: "world", + Destination: "dst", + Asset: "USD", + Amount: big.NewInt(100), + }}, + }, + }) + Expect(err).To(BeNil()) + Expect(tx.V2CreateTransactionResponse.Data.ID.Uint64()).To(Equal(transactionsFromOriginalLedger.V2TransactionsCursorResponse.Cursor.Data[0].ID.Uint64() + 1)) + }) + + By("Checking sequence restoration by creating a new transaction", func() { + tx, err := Wait(specContext, DeferClient(testServer)).Ledger.V2.CreateTransaction(ctx, operations.V2CreateTransactionRequest{ + Ledger: createLedgerRequest.Ledger, + V2PostTransaction: components.V2PostTransaction{ + Postings: []components.V2Posting{{ + Source: "world", + Destination: "dst", + Asset: "USD", + Amount: big.NewInt(100), + }}, + }, + }) + Expect(err).To(BeNil()) + Expect(tx.V2CreateTransactionResponse.Data.ID.Uint64()).To(Equal(transactionsFromOriginalLedger.V2TransactionsCursorResponse.Cursor.Data[0].ID.Uint64() + 2)) + }) }) }) Context("with state to 'in-use'", func() { @@ -425,12 +458,13 @@ var _ = Context("Ledger engine tests", func() { }).Should(Equal(1)) // check postgres locks - // since we have locked the 'logs' table, the insertion of the log must block + // since we have locked the 'logs' table, + // the attempt to define the value of the transaction sequence by reading the max log id block Eventually(func(g Gomega) int { count, err := db.NewSelect(). Table("pg_stat_activity"). Where("state <> 'idle' and pid <> pg_backend_pid()"). - Where(`query like 'INSERT INTO "_default".logs%'`). + Where(`query like '%select setval%'`). Count(ctx) g.Expect(err).To(BeNil()) return count diff --git a/test/e2e/app_lifecycle_test.go b/test/e2e/app_lifecycle_test.go index 68bfecc1d8..9f4a32b682 100644 --- a/test/e2e/app_lifecycle_test.go +++ b/test/e2e/app_lifecycle_test.go @@ -136,8 +136,8 @@ var _ = Context("Ledger application lifecycle tests", func() { }). WithTimeout(10 * time.Second). // Once all the transactions are in pending state, and since the ledger is still in an 'initializing' state - // we should have countTransactions+1 (+1 for the advisory lock used for logs sync hashing) active advisory locks - Should(BeNumerically("==", countTransactions+1)) + // we should have countTransactions active advisory locks + Should(BeNumerically("==", countTransactions)) }) When("restarting the service", func() { BeforeEach(func(specContext SpecContext) {