From 75bdedaf4cf989ff4f50a7a8d1860cb181af905b Mon Sep 17 00:00:00 2001 From: Geoffrey Ragot Date: Mon, 16 Jun 2025 12:01:18 +0200 Subject: [PATCH 1/2] fix: transactions paging using effective order --- internal/storage/common/paginator_column.go | 24 +++++++++++++++++---- internal/storage/ledger/store.go | 2 ++ internal/storage/system/store.go | 1 + test/e2e/api_transactions_list_test.go | 15 +++++++++++++ 4 files changed, 38 insertions(+), 4 deletions(-) diff --git a/internal/storage/common/paginator_column.go b/internal/storage/common/paginator_column.go index 3d61691226..8dcf135bef 100644 --- a/internal/storage/common/paginator_column.go +++ b/internal/storage/common/paginator_column.go @@ -5,6 +5,7 @@ import ( "github.com/formancehq/go-libs/v3/bun/bunpaginate" "github.com/formancehq/go-libs/v3/time" "github.com/uptrace/bun" + "github.com/uptrace/bun/schema" "math/big" "reflect" "strings" @@ -14,6 +15,7 @@ import ( type ColumnPaginator[ResourceType, OptionsType any] struct { DefaultPaginationColumn string DefaultOrder bunpaginate.Order + Table *schema.Table } //nolint:unused @@ -23,6 +25,7 @@ func (o ColumnPaginator[ResourceType, OptionsType]) Paginate(sb *bun.SelectQuery if query.Column != "" { paginationColumn = query.Column } + originalOrder := o.DefaultOrder if query.Order != nil { originalOrder = *query.Order @@ -42,19 +45,23 @@ func (o ColumnPaginator[ResourceType, OptionsType]) Paginate(sb *bun.SelectQuery sb = sb.ColumnExpr("row_number() OVER (ORDER BY " + orderExpression + ")") if query.PaginationID != nil { + paginationID := convertPaginationIDToSQLType( + o.Table.FieldMap[paginationColumn].DiscoveredSQLType, + query.PaginationID, + ) if query.Reverse { switch originalOrder { case bunpaginate.OrderAsc: - sb = sb.Where(fmt.Sprintf("%s < ?", paginationColumn), query.PaginationID) + sb = sb.Where(fmt.Sprintf("%s < ?", paginationColumn), paginationID) case bunpaginate.OrderDesc: - sb = sb.Where(fmt.Sprintf("%s > ?", paginationColumn), query.PaginationID) + sb = sb.Where(fmt.Sprintf("%s > ?", paginationColumn), paginationID) } } else { switch originalOrder { case bunpaginate.OrderAsc: - sb = sb.Where(fmt.Sprintf("%s >= ?", paginationColumn), query.PaginationID) + sb = sb.Where(fmt.Sprintf("%s >= ?", paginationColumn), paginationID) case bunpaginate.OrderDesc: - sb = sb.Where(fmt.Sprintf("%s <= ?", paginationColumn), query.PaginationID) + sb = sb.Where(fmt.Sprintf("%s <= ?", paginationColumn), paginationID) } } } @@ -62,6 +69,15 @@ func (o ColumnPaginator[ResourceType, OptionsType]) Paginate(sb *bun.SelectQuery return sb, nil } +func convertPaginationIDToSQLType(sqlType string, id *big.Int) any { + switch sqlType { + case "timestamp without time zone", "timestamp": + return libtime.UnixMicro(id.Int64()) + default: + return id + } +} + //nolint:unused func (o ColumnPaginator[ResourceType, OptionsType]) BuildCursor(ret []ResourceType, query ColumnPaginatedQuery[OptionsType]) (*bunpaginate.Cursor[ResourceType], error) { diff --git a/internal/storage/ledger/store.go b/internal/storage/ledger/store.go index b5be23bcb3..e8882a68a2 100644 --- a/internal/storage/ledger/store.go +++ b/internal/storage/ledger/store.go @@ -67,6 +67,7 @@ func (store *Store) Transactions() common.PaginatedResource[ return common.NewPaginatedResourceRepository(&transactionsResourceHandler{store: store}, common.ColumnPaginator[ledger.Transaction, any]{ DefaultPaginationColumn: "id", DefaultOrder: bunpaginate.OrderDesc, + Table: store.db.Dialect().Tables().ByName("transactions"), }) } @@ -79,6 +80,7 @@ func (store *Store) Logs() common.PaginatedResource[ }, common.ColumnPaginator[Log, any]{ DefaultPaginationColumn: "id", DefaultOrder: bunpaginate.OrderDesc, + Table: store.db.Dialect().Tables().ByName("logs"), }) } diff --git a/internal/storage/system/store.go b/internal/storage/system/store.go index b293fb7f08..298d9595aa 100644 --- a/internal/storage/system/store.go +++ b/internal/storage/system/store.go @@ -102,6 +102,7 @@ func (d *DefaultStore) Ledgers() common.PaginatedResource[ return common.NewPaginatedResourceRepository(&ledgersResourceHandler{store: d}, common.ColumnPaginator[ledger.Ledger, any]{ DefaultPaginationColumn: "id", DefaultOrder: bunpaginate.OrderAsc, + Table: d.db.Dialect().Tables().ByName("_system.ledgers"), }) } diff --git a/test/e2e/api_transactions_list_test.go b/test/e2e/api_transactions_list_test.go index 2147b3b416..b3f3de67b0 100644 --- a/test/e2e/api_transactions_list_test.go +++ b/test/e2e/api_transactions_list_test.go @@ -220,6 +220,21 @@ var _ = Context("Ledger transactions list API tests", func() { slices.Reverse(page) Expect(rsp.V2TransactionsCursorResponse.Cursor.Data).To(Equal(page)) }) + When("using next page", func() { + JustBeforeEach(func(specContext SpecContext) { + rsp, err = Wait(specContext, DeferClient(testServer)).Ledger.V2.ListTransactions( + ctx, + operations.V2ListTransactionsRequest{ + Ledger: "default", + Cursor: rsp.V2TransactionsCursorResponse.Cursor.Next, + }, + ) + Expect(err).ToNot(HaveOccurred()) + }) + It("Should return next elements", func() { + Expect(rsp.V2TransactionsCursorResponse.Cursor.Data).To(HaveLen(int(pageSize))) + }) + }) }) It("Should be ok", func() { Expect(rsp.V2TransactionsCursorResponse.Cursor.PageSize).To(Equal(pageSize)) From a2a7f3ef0d7f5710141c5e78090e47b3fcfcfc43 Mon Sep 17 00:00:00 2001 From: Geoffrey Ragot Date: Mon, 16 Jun 2025 12:33:29 +0200 Subject: [PATCH 2/2] fix: missing type registration in cas of server restart --- internal/storage/driver/module.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/internal/storage/driver/module.go b/internal/storage/driver/module.go index 938bacc903..280fab1940 100644 --- a/internal/storage/driver/module.go +++ b/internal/storage/driver/module.go @@ -2,6 +2,7 @@ package driver import ( "context" + ledger "github.com/formancehq/ledger/internal" "github.com/formancehq/ledger/internal/storage/bucket" ledgerstore "github.com/formancehq/ledger/internal/storage/ledger" systemstore "github.com/formancehq/ledger/internal/storage/system" @@ -21,6 +22,15 @@ func NewFXModule() fx.Option { fx.Provide(fx.Annotate(func(tracerProvider trace.TracerProvider) bucket.Factory { return bucket.NewDefaultFactory(bucket.WithTracer(tracerProvider.Tracer("store"))) })), + fx.Invoke(func(db *bun.DB) { + db.Dialect().Tables().Register( + &ledger.Transaction{}, + &ledger.Log{}, + &ledger.Account{}, + &ledger.Move{}, + &ledger.Ledger{}, + ) + }), fx.Provide(func(params struct { fx.In