Skip to content

fix: configure sequences after import #946

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 71 additions & 12 deletions internal/controller/system/state_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,21 @@
ledger "github.com/formancehq/ledger/internal"
ledgercontroller "github.com/formancehq/ledger/internal/controller/ledger"
"github.com/uptrace/bun"
"sync"
)

type controllerFacade struct {
ledgercontroller.Controller
mu sync.RWMutex
ledger ledger.Ledger
}

func (c *controllerFacade) handleState(ctx context.Context, dryRun bool, fn func(ctrl ledgercontroller.Controller) error) error {
if dryRun || c.ledger.State == ledger.StateInUse {
c.mu.RLock()
l := c.ledger
c.mu.RUnlock()

if l.State == ledger.StateInUse {
return fn(c.Controller)
}

Expand All @@ -30,24 +36,77 @@
}()

if err := withLock(ctx, ctrl, func(ctrl ledgercontroller.Controller, conn bun.IDB) error {
return fn(ctrl)

// todo: remove that in a later version
ret, err := tx.NewUpdate().
Model(&l).
Set("state = ?", ledger.StateInUse).
Where("id = ? and state = ?", l.ID, ledger.StateInitializing).
Exec(ctx)
if err != nil {
return err
}

Check warning on line 48 in internal/controller/system/state_tracker.go

View check run for this annotation

Codecov / codecov/patch

internal/controller/system/state_tracker.go#L47-L48

Added lines #L47 - L48 were not covered by tests

rowsAffected, err := ret.RowsAffected()
if err != nil {
return err
}

Check warning on line 53 in internal/controller/system/state_tracker.go

View check run for this annotation

Codecov / codecov/patch

internal/controller/system/state_tracker.go#L52-L53

Added lines #L52 - L53 were not covered by tests

if rowsAffected > 0 {
_, err := tx.NewRaw(
fmt.Sprintf(`
select setval(
'"%s"."transaction_id_%d"',
(
select max(id) from "%s".transactions where ledger = '%s'
)::bigint
)
`, l.Bucket, l.ID, l.Bucket, l.Name),
).Exec(ctx)
if err != nil {
return fmt.Errorf("failed to update transactions sequence value: %w", err)
}

Check warning on line 68 in internal/controller/system/state_tracker.go

View check run for this annotation

Codecov / codecov/patch

internal/controller/system/state_tracker.go#L67-L68

Added lines #L67 - L68 were not covered by tests

_, err = tx.NewRaw(
fmt.Sprintf(`
select setval(
'"%s"."log_id_%d"',
(
select max(id) from "%s".logs where ledger = '%s'
)::bigint
)
`, l.Bucket, l.ID, l.Bucket, l.Name),
).Exec(ctx)
if err != nil {
return fmt.Errorf("failed to update logs sequence value: %w", err)
}

Check warning on line 82 in internal/controller/system/state_tracker.go

View check run for this annotation

Codecov / codecov/patch

internal/controller/system/state_tracker.go#L81-L82

Added lines #L81 - L82 were not covered by tests
}

if err := fn(ctrl); err != nil {
return err
}

return nil
}); err != nil {
return err
}

c.ledger.State = ledger.StateInUse
if !dryRun {
if err := ctrl.Commit(ctx); err != nil {
return fmt.Errorf("failed to commit transaction: %w", err)
}

Check warning on line 97 in internal/controller/system/state_tracker.go

View check run for this annotation

Codecov / codecov/patch

internal/controller/system/state_tracker.go#L96-L97

Added lines #L96 - L97 were not covered by tests

// todo: remove that in a later version
_, err = tx.NewUpdate().
Model(&c.ledger).
Set("state = ?", c.ledger.State).
Where("id = ?", c.ledger.ID).
Exec(ctx)
if err != nil {
return err
c.mu.Lock()
c.ledger.State = ledger.StateInUse
c.mu.Unlock()
} else {
if err := ctrl.Rollback(ctx); err != nil {
return fmt.Errorf("failed to rollback transaction: %w", err)
}

Check warning on line 105 in internal/controller/system/state_tracker.go

View check run for this annotation

Codecov / codecov/patch

internal/controller/system/state_tracker.go#L104-L105

Added lines #L104 - L105 were not covered by tests
}

return ctrl.Commit(ctx)

return nil
}

func (c *controllerFacade) CreateTransaction(ctx context.Context, parameters ledgercontroller.Parameters[ledgercontroller.CreateTransaction]) (*ledger.Log, *ledger.CreatedTransaction, error) {
Expand Down
38 changes: 36 additions & 2 deletions test/e2e/api_ledgers_import_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,39 @@ var _ = Context("Ledger engine tests", func() {
Expect(err).To(Succeed())

Expect(accountsFromOriginalLedger.V2AccountsCursorResponse.Cursor.Data).To(Equal(accountsFromNewLedger.V2AccountsCursorResponse.Cursor.Data))

By("Checking sequence restoration by creating a new transaction with dry run", func() {
tx, err := Wait(specContext, DeferClient(testServer)).Ledger.V2.CreateTransaction(ctx, operations.V2CreateTransactionRequest{
Ledger: createLedgerRequest.Ledger,
DryRun: pointer.For(true),
V2PostTransaction: components.V2PostTransaction{
Postings: []components.V2Posting{{
Source: "world",
Destination: "dst",
Asset: "USD",
Amount: big.NewInt(100),
}},
},
})
Expect(err).To(BeNil())
Expect(tx.V2CreateTransactionResponse.Data.ID.Uint64()).To(Equal(transactionsFromOriginalLedger.V2TransactionsCursorResponse.Cursor.Data[0].ID.Uint64() + 1))
})

By("Checking sequence restoration by creating a new transaction", func() {
tx, err := Wait(specContext, DeferClient(testServer)).Ledger.V2.CreateTransaction(ctx, operations.V2CreateTransactionRequest{
Ledger: createLedgerRequest.Ledger,
V2PostTransaction: components.V2PostTransaction{
Postings: []components.V2Posting{{
Source: "world",
Destination: "dst",
Asset: "USD",
Amount: big.NewInt(100),
}},
},
})
Expect(err).To(BeNil())
Expect(tx.V2CreateTransactionResponse.Data.ID.Uint64()).To(Equal(transactionsFromOriginalLedger.V2TransactionsCursorResponse.Cursor.Data[0].ID.Uint64() + 2))
})
Comment on lines +352 to +383
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix test logic for sequence restoration verification.

The sequence restoration tests are creating transactions on the original ledger (createLedgerRequest.Ledger) instead of the imported ledger (ledgerCopyName). This doesn't verify that the import process correctly restored sequences in the copied ledger.

Additionally, the test assumes transactionsFromOriginalLedger.V2TransactionsCursorResponse.Cursor.Data[0] is the first transaction by ID, but this depends on the ordering of the response data.

Apply this diff to test sequence restoration on the correct ledger and use a more reliable approach:

 							By("Checking sequence restoration by creating a new transaction with dry run", func() {
+								// Get the highest transaction ID from the imported ledger
+								maxTxID := uint64(0)
+								for _, tx := range transactionsFromNewLedger.V2TransactionsCursorResponse.Cursor.Data {
+									if tx.ID.Uint64() > maxTxID {
+										maxTxID = tx.ID.Uint64()
+									}
+								}
+
 								tx, err := Wait(specContext, DeferClient(testServer)).Ledger.V2.CreateTransaction(ctx, operations.V2CreateTransactionRequest{
-									Ledger: createLedgerRequest.Ledger,
+									Ledger: ledgerCopyName,
 									DryRun: pointer.For(true),
 									V2PostTransaction: components.V2PostTransaction{
 										Postings: []components.V2Posting{{
 											Source:      "world",
 											Destination: "dst",
 											Asset:       "USD",
 											Amount:      big.NewInt(100),
 										}},
 									},
 								})
 								Expect(err).To(BeNil())
-								Expect(tx.V2CreateTransactionResponse.Data.ID.Uint64()).To(Equal(transactionsFromOriginalLedger.V2TransactionsCursorResponse.Cursor.Data[0].ID.Uint64() + 1))
+								Expect(tx.V2CreateTransactionResponse.Data.ID.Uint64()).To(Equal(maxTxID + 1))
 							})

 							By("Checking sequence restoration by creating a new transaction", func() {
 								tx, err := Wait(specContext, DeferClient(testServer)).Ledger.V2.CreateTransaction(ctx, operations.V2CreateTransactionRequest{
-									Ledger: createLedgerRequest.Ledger,
+									Ledger: ledgerCopyName,
 									V2PostTransaction: components.V2PostTransaction{
 										Postings: []components.V2Posting{{
 											Source:      "world",
 											Destination: "dst",
 											Asset:       "USD",
 											Amount:      big.NewInt(100),
 										}},
 									},
 								})
 								Expect(err).To(BeNil())
-								Expect(tx.V2CreateTransactionResponse.Data.ID.Uint64()).To(Equal(transactionsFromOriginalLedger.V2TransactionsCursorResponse.Cursor.Data[0].ID.Uint64() + 2))
+								Expect(tx.V2CreateTransactionResponse.Data.ID.Uint64()).To(Equal(maxTxID + 2))
 							})

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In test/e2e/api_ledgers_import_test.go around lines 352 to 383, the test is
incorrectly creating transactions on the original ledger instead of the imported
ledger, which fails to verify sequence restoration on the copied ledger. Update
the ledger field in the CreateTransaction requests to use the imported ledger
identifier (ledgerCopyName). Also, replace the reliance on the first element of
transactionsFromOriginalLedger.V2TransactionsCursorResponse.Cursor.Data for ID
comparison with a more reliable method, such as explicitly determining the
highest existing transaction ID before the test and comparing against that to
ensure correct sequence increment.

})
})
Context("with state to 'in-use'", func() {
Expand Down Expand Up @@ -425,12 +458,13 @@ var _ = Context("Ledger engine tests", func() {
}).Should(Equal(1))

// check postgres locks
// since we have locked the 'logs' table, the insertion of the log must block
// since we have locked the 'logs' table,
// the attempt to define the value of the transaction sequence by reading the max log id block
Eventually(func(g Gomega) int {
count, err := db.NewSelect().
Table("pg_stat_activity").
Where("state <> 'idle' and pid <> pg_backend_pid()").
Where(`query like 'INSERT INTO "_default".logs%'`).
Where(`query like '%select setval%'`).
Count(ctx)
g.Expect(err).To(BeNil())
return count
Expand Down
4 changes: 2 additions & 2 deletions test/e2e/app_lifecycle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,8 @@ var _ = Context("Ledger application lifecycle tests", func() {
}).
WithTimeout(10 * time.Second).
// Once all the transactions are in pending state, and since the ledger is still in an 'initializing' state
// we should have countTransactions+1 (+1 for the advisory lock used for logs sync hashing) active advisory locks
Should(BeNumerically("==", countTransactions+1))
// we should have countTransactions active advisory locks
Should(BeNumerically("==", countTransactions))
})
When("restarting the service", func() {
BeforeEach(func(specContext SpecContext) {
Expand Down
Loading