From 974c78d683ccb4cc5e9b9f5a6eecb8f91cee6b22 Mon Sep 17 00:00:00 2001 From: Geoffrey Ragot Date: Tue, 1 Apr 2025 11:20:00 +0200 Subject: [PATCH] feat: add traces all along the migration path --- cmd/buckets_upgrade.go | 62 +++++---- cmd/root.go | 34 +++-- cmd/serve.go | 1 + go.mod | 2 +- go.sum | 4 +- internal/api/bulking/bulker.go | 3 +- internal/api/common/errors.go | 7 +- internal/api/router.go | 3 +- internal/storage/bucket/default_bucket.go | 2 +- internal/storage/bucket/migrations.go | 3 + internal/storage/driver/driver.go | 151 +++++++++++----------- internal/storage/driver/driver_test.go | 31 ++++- internal/storage/driver/module.go | 34 ++--- internal/storage/ledger/main_test.go | 1 + internal/storage/module.go | 31 +++-- internal/storage/system/factory.go | 23 ++++ internal/storage/system/store.go | 36 +++++- internal/tracing/tracing.go | 15 ++- internal/worker/async_block.go | 35 ++++- internal/worker/fx.go | 9 +- test/migrations/upgrade_test.go | 2 + tools/generator/go.mod | 5 +- tools/generator/go.sum | 4 +- 23 files changed, 317 insertions(+), 181 deletions(-) create mode 100644 internal/storage/system/factory.go diff --git a/cmd/buckets_upgrade.go b/cmd/buckets_upgrade.go index e7a763429c..fc3b6a9baf 100644 --- a/cmd/buckets_upgrade.go +++ b/cmd/buckets_upgrade.go @@ -3,12 +3,13 @@ package cmd import ( "github.com/formancehq/go-libs/v2/bun/bunconnect" "github.com/formancehq/go-libs/v2/logging" + "github.com/formancehq/go-libs/v2/otlp" + "github.com/formancehq/go-libs/v2/otlp/otlptraces" "github.com/formancehq/go-libs/v2/service" - "github.com/formancehq/ledger/internal/storage/bucket" + "github.com/formancehq/ledger/internal/storage" "github.com/formancehq/ledger/internal/storage/driver" - "github.com/formancehq/ledger/internal/storage/ledger" "github.com/spf13/cobra" - "github.com/uptrace/bun" + "go.uber.org/fx" ) func NewBucketUpgrade() *cobra.Command { @@ -17,22 +18,13 @@ func NewBucketUpgrade() *cobra.Command { Args: cobra.ExactArgs(1), SilenceUsage: true, RunE: func(cmd *cobra.Command, args []string) error { - logger := logging.NewDefaultLogger(cmd.OutOrStdout(), service.IsDebug(cmd), false, false) - cmd.SetContext(logging.ContextWithLogger(cmd.Context(), logger)) + return withStorageDriver(cmd, func(driver *driver.Driver) error { + if args[0] == "*" { + return driver.UpgradeAllBuckets(cmd.Context()) + } - driver, db, err := getDriver(cmd) - if err != nil { - return err - } - defer func() { - _ = db.Close() - }() - - if args[0] == "*" { - return driver.UpgradeAllBuckets(cmd.Context()) - } - - return driver.UpgradeBucket(cmd.Context(), args[0]) + return driver.UpgradeBucket(cmd.Context(), args[0]) + }) }, } @@ -42,26 +34,32 @@ func NewBucketUpgrade() *cobra.Command { return cmd } -func getDriver(cmd *cobra.Command) (*driver.Driver, *bun.DB, error) { +func withStorageDriver(cmd *cobra.Command, fn func(driver *driver.Driver) error) error { - connectionOptions, err := bunconnect.ConnectionOptionsFromFlags(cmd) - if err != nil { - return nil, nil, err - } + logger := logging.NewDefaultLogger(cmd.OutOrStdout(), service.IsDebug(cmd), false, false) - db, err := bunconnect.OpenSQLDB(cmd.Context(), *connectionOptions) + connectionOptions, err := bunconnect.ConnectionOptionsFromFlags(cmd) if err != nil { - return nil, nil, err + return err } - driver := driver.New( - db, - ledger.NewFactory(db), - bucket.NewDefaultFactory(), + var d *driver.Driver + app := fx.New( + fx.NopLogger, + otlp.FXModuleFromFlags(cmd, otlp.WithServiceVersion(Version)), + otlptraces.FXModuleFromFlags(cmd), + bunconnect.Module(*connectionOptions, service.IsDebug(cmd)), + storage.NewFXModule(storage.ModuleConfig{}), + fx.Supply(fx.Annotate(logger, fx.As(new(logging.Logger)))), + fx.Populate(&d), ) - if err := driver.Initialize(cmd.Context()); err != nil { - return nil, nil, err + err = app.Start(cmd.Context()) + if err != nil { + return err } + defer func() { + _ = app.Stop(cmd.Context()) + }() - return driver, db, nil + return fn(d) } diff --git a/cmd/root.go b/cmd/root.go index 5db4722315..26fac24fb1 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -2,11 +2,10 @@ package cmd import ( "github.com/formancehq/go-libs/v2/bun/bunmigrate" - "github.com/formancehq/go-libs/v2/logging" + "github.com/formancehq/go-libs/v2/otlp" + "github.com/formancehq/go-libs/v2/otlp/otlptraces" "github.com/formancehq/go-libs/v2/service" - "github.com/formancehq/ledger/internal/storage/bucket" "github.com/formancehq/ledger/internal/storage/driver" - "github.com/formancehq/ledger/internal/storage/ledger" "github.com/spf13/cobra" "github.com/uptrace/bun" ) @@ -35,23 +34,30 @@ func NewRootCommand() *cobra.Command { root.AddCommand(NewWorkerCommand()) root.AddCommand(NewDocsCommand()) - root.AddCommand(bunmigrate.NewDefaultCommand(func(cmd *cobra.Command, _ []string, db *bun.DB) error { - logger := logging.NewDefaultLogger(cmd.OutOrStdout(), service.IsDebug(cmd), false, false) - cmd.SetContext(logging.ContextWithLogger(cmd.Context(), logger)) - - driver := driver.New(db, ledger.NewFactory(db), bucket.NewDefaultFactory()) - if err := driver.Initialize(cmd.Context()); err != nil { - return err - } - - return driver.UpgradeAllBuckets(cmd.Context()) - })) + root.AddCommand(newMigrationCommand()) root.AddCommand(NewDocsCommand()) + service.AddFlags(root.PersistentFlags()) return root } +func newMigrationCommand() *cobra.Command { + ret := bunmigrate.NewDefaultCommand(func(cmd *cobra.Command, _ []string, db *bun.DB) error { + return withStorageDriver(cmd, func(driver *driver.Driver) error { + if err := driver.Initialize(cmd.Context()); err != nil { + return err + } + + return driver.UpgradeAllBuckets(cmd.Context()) + }) + }) + otlp.AddFlags(ret.Flags()) + otlptraces.AddFlags(ret.Flags()) + + return ret +} + func Execute() { service.Execute(NewRootCommand()) } diff --git a/cmd/serve.go b/cmd/serve.go index 8c013fd650..7aa7e92cc7 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -174,6 +174,7 @@ func NewServeCommand() *cobra.Command { addWorkerFlags(cmd) bunconnect.AddFlags(cmd.Flags()) + otlp.AddFlags(cmd.Flags()) otlpmetrics.AddFlags(cmd.Flags()) otlptraces.AddFlags(cmd.Flags()) auth.AddFlags(cmd.Flags()) diff --git a/go.mod b/go.mod index 5d43d620e6..0f8922c571 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/antlr/antlr4/runtime/Go/antlr v1.4.10 github.com/bluele/gcache v0.0.2 github.com/dop251/goja v0.0.0-20241009100908-5f46f2705ca3 - github.com/formancehq/go-libs/v2 v2.2.3-0.20250328155139-11198164d016 + github.com/formancehq/go-libs/v2 v2.2.3-0.20250401141012-7ef088564530 github.com/formancehq/ledger/pkg/client v0.0.0-00010101000000-000000000000 github.com/go-chi/chi/v5 v5.2.1 github.com/go-chi/cors v1.2.1 diff --git a/go.sum b/go.sum index 9b7b2c8452..025640f1f6 100644 --- a/go.sum +++ b/go.sum @@ -104,8 +104,8 @@ github.com/fatih/color v1.18.0 h1:S8gINlzdQ840/4pfAwic/ZE0djQEH3wM94VfqLTZcOM= github.com/fatih/color v1.18.0/go.mod h1:4FelSpRwEGDpQ12mAdzqdOukCy4u8WUtOY6lkT/6HfU= github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= -github.com/formancehq/go-libs/v2 v2.2.3-0.20250328155139-11198164d016 h1:t2REX3SXrD4asj979f8pgRw/SfUPhYGxYZ7hkkP586w= -github.com/formancehq/go-libs/v2 v2.2.3-0.20250328155139-11198164d016/go.mod h1:JvBjEDWNf7izCy2dq/eI3aMc9d28gChBe1rjw5yYlAs= +github.com/formancehq/go-libs/v2 v2.2.3-0.20250401141012-7ef088564530 h1:c7loJTPm5e/AwayJIWgZZQAqMIBR/BrClfF5aldDlxE= +github.com/formancehq/go-libs/v2 v2.2.3-0.20250401141012-7ef088564530/go.mod h1:JvBjEDWNf7izCy2dq/eI3aMc9d28gChBe1rjw5yYlAs= github.com/formancehq/numscript v0.0.11 h1:vZDfRfrhOkuInv5fLIXvWZU3ylK+fVgmR4la01dO5to= github.com/formancehq/numscript v0.0.11/go.mod h1:btuSv05cYwi9BvLRxVs5zrunU+O1vTgigG1T6UsawcY= github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= diff --git a/internal/api/bulking/bulker.go b/internal/api/bulking/bulker.go index 3c5158e47e..4576a1f3d4 100644 --- a/internal/api/bulking/bulker.go +++ b/internal/api/bulking/bulker.go @@ -7,6 +7,7 @@ import ( "fmt" "github.com/alitto/pond" "github.com/formancehq/go-libs/v2/logging" + "github.com/formancehq/go-libs/v2/otlp" ledger "github.com/formancehq/ledger/internal" ledgercontroller "github.com/formancehq/ledger/internal/controller/ledger" "go.opentelemetry.io/otel/attribute" @@ -56,7 +57,7 @@ func (b *Bulker) run(ctx context.Context, ctrl ledgercontroller.Controller, bulk ret, logID, err := b.processElement(ctx, ctrl, element) if err != nil { hasError.Store(true) - span.RecordError(err) + otlp.RecordError(ctx, err) result <- BulkElementResult{ Error: err, diff --git a/internal/api/common/errors.go b/internal/api/common/errors.go index 187fff7db0..c0830a9a4f 100644 --- a/internal/api/common/errors.go +++ b/internal/api/common/errors.go @@ -4,8 +4,8 @@ import ( "errors" "github.com/formancehq/go-libs/v2/api" "github.com/formancehq/go-libs/v2/logging" + "github.com/formancehq/go-libs/v2/otlp" "github.com/formancehq/go-libs/v2/platform/postgres" - "go.opentelemetry.io/otel/trace" "net/http" ) @@ -38,10 +38,7 @@ func HandleCommonErrors(w http.ResponseWriter, r *http.Request, err error) { } func InternalServerError(w http.ResponseWriter, r *http.Request, err error) { - span := trace.SpanFromContext(r.Context()) - if span != nil { - span.RecordError(err) - } + otlp.RecordError(r.Context(), err) logging.FromContext(r.Context()).Error(err) api.WriteErrorResponse(w, http.StatusInternalServerError, api.ErrorInternal, errors.New("Internal error. Consult logs/traces to have more details.")) } diff --git a/internal/api/router.go b/internal/api/router.go index d17e4a9ae8..e89fc10e6b 100644 --- a/internal/api/router.go +++ b/internal/api/router.go @@ -4,6 +4,7 @@ import ( "fmt" "github.com/formancehq/go-libs/v2/api" "github.com/formancehq/go-libs/v2/bun/bunpaginate" + "github.com/formancehq/go-libs/v2/otlp" "github.com/formancehq/go-libs/v2/service" "github.com/formancehq/ledger/internal/api/bulking" "github.com/formancehq/ledger/internal/controller/system" @@ -60,7 +61,7 @@ func NewRouter( middleware.PrintPrettyStack(rvr) } - trace.SpanFromContext(r.Context()).RecordError(fmt.Errorf("%s", rvr)) + otlp.RecordError(r.Context(), fmt.Errorf("%s", rvr)) w.WriteHeader(http.StatusInternalServerError) } diff --git a/internal/storage/bucket/default_bucket.go b/internal/storage/bucket/default_bucket.go index fdcc13ba50..7b9fd3e528 100644 --- a/internal/storage/bucket/default_bucket.go +++ b/internal/storage/bucket/default_bucket.go @@ -39,7 +39,7 @@ func (b *DefaultBucket) IsUpToDate(ctx context.Context) (bool, error) { } func (b *DefaultBucket) Migrate(ctx context.Context, options ...migrations.Option) error { - return runMigrate(ctx, b.tracer, b.db, b.name, options...) + return runMigrate(ctx, b.tracer, b.db, b.name, append(options, migrations.WithTracer(b.tracer))...) } func (b *DefaultBucket) HasMinimalVersion(ctx context.Context) (bool, error) { diff --git a/internal/storage/bucket/migrations.go b/internal/storage/bucket/migrations.go index 11604f8d06..7a2be00dbc 100644 --- a/internal/storage/bucket/migrations.go +++ b/internal/storage/bucket/migrations.go @@ -7,6 +7,7 @@ import ( "errors" "fmt" "github.com/formancehq/go-libs/v2/migrations" + "github.com/formancehq/go-libs/v2/otlp" "github.com/uptrace/bun" "go.opentelemetry.io/otel/trace" "gopkg.in/yaml.v3" @@ -55,6 +56,8 @@ func runMigrate(ctx context.Context, tracer trace.Tracer, db bun.IDB, name strin if errors.Is(err, migrations.ErrAlreadyUpToDate) { return nil } + otlp.RecordError(ctx, err) + return err } } diff --git a/internal/storage/driver/driver.go b/internal/storage/driver/driver.go index ef074a9885..7c6986d441 100644 --- a/internal/storage/driver/driver.go +++ b/internal/storage/driver/driver.go @@ -5,33 +5,31 @@ import ( "errors" "fmt" "github.com/formancehq/ledger/internal/storage/common" + systemstore "github.com/formancehq/ledger/internal/storage/system" + "github.com/formancehq/ledger/internal/tracing" + "go.opentelemetry.io/otel/trace" + "go.opentelemetry.io/otel/trace/noop" "sync" "time" "github.com/alitto/pond" - "github.com/formancehq/go-libs/v2/metadata" - "github.com/formancehq/go-libs/v2/platform/postgres" - systemcontroller "github.com/formancehq/ledger/internal/controller/system" - systemstore "github.com/formancehq/ledger/internal/storage/system" - "github.com/uptrace/bun" - "go.opentelemetry.io/otel/metric" - noopmetrics "go.opentelemetry.io/otel/metric/noop" - "go.opentelemetry.io/otel/trace" - nooptracer "go.opentelemetry.io/otel/trace/noop" - "github.com/formancehq/go-libs/v2/bun/bunpaginate" "github.com/formancehq/go-libs/v2/logging" + "github.com/formancehq/go-libs/v2/metadata" + "github.com/formancehq/go-libs/v2/platform/postgres" ledger "github.com/formancehq/ledger/internal" + systemcontroller "github.com/formancehq/ledger/internal/controller/system" "github.com/formancehq/ledger/internal/storage/bucket" ledgerstore "github.com/formancehq/ledger/internal/storage/ledger" + "github.com/uptrace/bun" ) type Driver struct { ledgerStoreFactory ledgerstore.Factory db *bun.DB bucketFactory bucket.Factory + systemStoreFactory systemstore.StoreFactory tracer trace.Tracer - meter metric.Meter migrationRetryPeriod time.Duration parallelBucketMigrations int @@ -41,7 +39,7 @@ func (d *Driver) CreateLedger(ctx context.Context, l *ledger.Ledger) (*ledgersto var ret *ledgerstore.Store err := d.db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error { - systemStore := systemstore.New(tx) + systemStore := d.systemStoreFactory.Create(d.db) if err := systemStore.CreateLedger(ctx, l); err != nil { if errors.Is(postgres.ResolveError(err), postgres.ErrConstraintsFailed{}) { @@ -87,7 +85,7 @@ func (d *Driver) CreateLedger(ctx context.Context, l *ledger.Ledger) (*ledgersto func (d *Driver) OpenLedger(ctx context.Context, name string) (*ledgerstore.Store, *ledger.Ledger, error) { // todo: keep the ledger in cache somewhere to avoid read the ledger at each request, maybe in the factory - ret, err := systemstore.New(d.db).GetLedger(ctx, name) + ret, err := d.systemStoreFactory.Create(d.db).GetLedger(ctx, name) if err != nil { return nil, nil, err } @@ -104,7 +102,7 @@ func (d *Driver) Initialize(ctx context.Context) error { return fmt.Errorf("detecting rollbacks: %w", err) } - err = systemstore.New(d.db).Migrate(ctx) + err = d.systemStoreFactory.Create(d.db).Migrate(ctx) if err != nil { return fmt.Errorf("migrating system store: %w", err) } @@ -114,7 +112,7 @@ func (d *Driver) Initialize(ctx context.Context) error { func (d *Driver) detectRollbacks(ctx context.Context) error { - systemStore := systemstore.New(d.db) + systemStore := d.systemStoreFactory.Create(d.db) logging.FromContext(ctx).Debugf("Checking for downgrades on system schema") if err := detectDowngrades(systemStore.GetMigrator(), ctx); err != nil { @@ -175,19 +173,19 @@ func (d *Driver) detectRollbacks(ctx context.Context) error { } func (d *Driver) UpdateLedgerMetadata(ctx context.Context, name string, m metadata.Metadata) error { - return systemstore.New(d.db).UpdateLedgerMetadata(ctx, name, m) + return d.systemStoreFactory.Create(d.db).UpdateLedgerMetadata(ctx, name, m) } func (d *Driver) DeleteLedgerMetadata(ctx context.Context, name string, key string) error { - return systemstore.New(d.db).DeleteLedgerMetadata(ctx, name, key) + return d.systemStoreFactory.Create(d.db).DeleteLedgerMetadata(ctx, name, key) } func (d *Driver) ListLedgers(ctx context.Context, q common.ColumnPaginatedQuery[any]) (*bunpaginate.Cursor[ledger.Ledger], error) { - return systemstore.New(d.db).Ledgers().Paginate(ctx, q) + return d.systemStoreFactory.Create(d.db).Ledgers().Paginate(ctx, q) } func (d *Driver) GetLedger(ctx context.Context, name string) (*ledger.Ledger, error) { - return systemstore.New(d.db).GetLedger(ctx, name) + return d.systemStoreFactory.Create(d.db).GetLedger(ctx, name) } func (d *Driver) UpgradeBucket(ctx context.Context, name string) error { @@ -195,60 +193,68 @@ func (d *Driver) UpgradeBucket(ctx context.Context, name string) error { } func (d *Driver) UpgradeAllBuckets(ctx context.Context) error { + _, err := tracing.Trace(ctx, d.tracer, "UpgradeAllBuckets", tracing.NoResult(func(ctx context.Context) error { + buckets, err := d.systemStoreFactory.Create(d.db).GetDistinctBuckets(ctx) + if err != nil { + return fmt.Errorf("getting distinct buckets: %w", err) + } - buckets, err := systemstore.New(d.db).GetDistinctBuckets(ctx) - if err != nil { - return fmt.Errorf("getting distinct buckets: %w", err) - } - - wp := pond.New(d.parallelBucketMigrations, len(buckets), pond.Context(ctx)) - - for _, bucketName := range buckets { - wp.Submit(func() { - logger := logging.FromContext(ctx).WithFields(map[string]any{ - "bucket": bucketName, - }) - b := d.bucketFactory.Create(bucketName, d.db) - - l: - for { - errChan := make(chan error, 1) - go func() { - logger.Infof("Upgrading...") - errChan <- b.Migrate(logging.ContextWithLogger(ctx, logger)) - }() + wp := pond.New(d.parallelBucketMigrations, len(buckets), pond.Context(ctx)) + for _, bucketName := range buckets { + wp.Submit(func() { + logger := logging.FromContext(ctx).WithFields(map[string]any{ + "bucket": bucketName, + }) + l: for { - logger.Infof("Waiting termination") - select { - case <-ctx.Done(): - return - case err := <-errChan: - if err != nil { - logger.Errorf("Error upgrading: %s", err) - select { - case <-time.After(d.migrationRetryPeriod): - continue l - case <-ctx.Done(): - return - } + if err := d.upgradeBucket(ctx, logger, bucketName); err != nil { + logger.Errorf("Error upgrading: %s", err) + select { + case <-time.After(d.migrationRetryPeriod): + continue l + case <-ctx.Done(): + return } - - logger.Info("Upgrade terminated") - return } + logger.Info("Upgrade terminated") + break } - } - }) - } + }) + } - wp.StopAndWait() + wp.StopAndWait() + + return nil + })) + + return err +} + +func (d *Driver) upgradeBucket(ctx context.Context, logger logging.Logger, bucketName string) error { + ctx, span := d.tracer.Start(ctx, "UpgradeBucket", + trace.WithNewRoot(), + trace.WithLinks( + trace.Link{ + SpanContext: trace.SpanFromContext(ctx).SpanContext(), + }, + ), + ) + defer span.End() + + logger.Infof("Upgrading...") + b := d.bucketFactory.Create(bucketName, d.db) + + err := b.Migrate(logging.ContextWithLogger(ctx, logger)) + if err != nil { + return err + } return nil } func (d *Driver) HasReachMinimalVersion(ctx context.Context) (bool, error) { - systemStore := systemstore.New(d.db) + systemStore := d.systemStoreFactory.Create(d.db) isUpToDate, err := systemStore.IsUpToDate(ctx) if err != nil { @@ -280,12 +286,14 @@ func New( db *bun.DB, ledgerStoreFactory ledgerstore.Factory, bucketFactory bucket.Factory, + systemStoreFactory systemstore.StoreFactory, opts ...Option, ) *Driver { ret := &Driver{ db: db, ledgerStoreFactory: ledgerStoreFactory, bucketFactory: bucketFactory, + systemStoreFactory: systemStoreFactory, } for _, opt := range append(defaultOptions, opts...) { opt(ret) @@ -295,18 +303,6 @@ func New( type Option func(d *Driver) -func WithMeter(m metric.Meter) Option { - return func(d *Driver) { - d.meter = m - } -} - -func WithTracer(tracer trace.Tracer) Option { - return func(d *Driver) { - d.tracer = tracer - } -} - func WithParallelBucketMigration(p int) Option { return func(d *Driver) { d.parallelBucketMigrations = p @@ -319,9 +315,14 @@ func WithMigrationRetryPeriod(p time.Duration) Option { } } +func WithTracer(tracer trace.Tracer) Option { + return func(d *Driver) { + d.tracer = tracer + } +} + var defaultOptions = []Option{ - WithMeter(noopmetrics.Meter{}), - WithTracer(nooptracer.Tracer{}), WithParallelBucketMigration(10), WithMigrationRetryPeriod(5 * time.Second), + WithTracer(noop.Tracer{}), } diff --git a/internal/storage/driver/driver_test.go b/internal/storage/driver/driver_test.go index 6c58be1804..8371276ccf 100644 --- a/internal/storage/driver/driver_test.go +++ b/internal/storage/driver/driver_test.go @@ -12,6 +12,7 @@ import ( "github.com/formancehq/ledger/internal/storage/bucket" "github.com/formancehq/ledger/internal/storage/driver" ledgerstore "github.com/formancehq/ledger/internal/storage/ledger" + "github.com/formancehq/ledger/internal/storage/system" "github.com/google/uuid" "github.com/stretchr/testify/require" "math/rand" @@ -23,10 +24,15 @@ func TestLedgersCreate(t *testing.T) { t.Parallel() ctx := logging.TestingContext() - d := driver.New(db, ledgerstore.NewFactory(db), bucket.NewDefaultFactory()) + d := driver.New( + db, + ledgerstore.NewFactory(db), + bucket.NewDefaultFactory(), + system.NewStoreFactory(), + ) buckets := []string{"bucket1", "bucket2"} - const countLedgers = 80 + const countLedgers = 30 wg := sync.WaitGroup{} wg.Add(countLedgers) @@ -71,7 +77,12 @@ func TestLedgersList(t *testing.T) { ctx := logging.TestingContext() - d := driver.New(db, ledgerstore.NewFactory(db), bucket.NewDefaultFactory()) + d := driver.New( + db, + ledgerstore.NewFactory(db), + bucket.NewDefaultFactory(), + system.NewStoreFactory(), + ) bucket := uuid.NewString()[:8] @@ -105,7 +116,12 @@ func TestLedgerUpdateMetadata(t *testing.T) { ctx := logging.TestingContext() - d := driver.New(db, ledgerstore.NewFactory(db), bucket.NewDefaultFactory()) + d := driver.New( + db, + ledgerstore.NewFactory(db), + bucket.NewDefaultFactory(), + system.NewStoreFactory(), + ) l := ledger.MustNewWithDefault(uuid.NewString()) _, err := d.CreateLedger(ctx, &l) @@ -122,7 +138,12 @@ func TestLedgerDeleteMetadata(t *testing.T) { t.Parallel() ctx := logging.TestingContext() - d := driver.New(db, ledgerstore.NewFactory(db), bucket.NewDefaultFactory()) + d := driver.New( + db, + ledgerstore.NewFactory(db), + bucket.NewDefaultFactory(), + system.NewStoreFactory(), + ) l := ledger.MustNewWithDefault(uuid.NewString()).WithMetadata(metadata.Metadata{ "foo": "bar", diff --git a/internal/storage/driver/module.go b/internal/storage/driver/module.go index 4f89d44056..bc27558fe3 100644 --- a/internal/storage/driver/module.go +++ b/internal/storage/driver/module.go @@ -28,32 +28,36 @@ func NewFXModule() fx.Option { fx.Provide(fx.Annotate(func(tracerProvider trace.TracerProvider) bucket.Factory { return bucket.NewDefaultFactory(bucket.WithTracer(tracerProvider.Tracer("store"))) })), - fx.Provide(func(db *bun.DB) systemstore.Store { - return systemstore.New(db) - }), - fx.Provide(func( - db *bun.DB, - tracerProvider trace.TracerProvider, - meterProvider metric.MeterProvider, - ) ledgerstore.Factory { - return ledgerstore.NewFactory(db, - ledgerstore.WithMeter(meterProvider.Meter("store")), - ledgerstore.WithTracer(tracerProvider.Tracer("store")), - ) + fx.Provide(func(params struct { + fx.In + + DB *bun.DB + TracerProvider trace.TracerProvider `optional:"true"` + MeterProvider metric.MeterProvider `optional:"true"` + }) ledgerstore.Factory { + options := make([]ledgerstore.Option, 0) + if params.TracerProvider != nil { + options = append(options, ledgerstore.WithTracer(params.TracerProvider.Tracer("store"))) + } + if params.MeterProvider != nil { + options = append(options, ledgerstore.WithMeter(params.MeterProvider.Meter("store"))) + } + return ledgerstore.NewFactory(params.DB, options...) }), fx.Provide(func( db *bun.DB, bucketFactory bucket.Factory, ledgerStoreFactory ledgerstore.Factory, tracerProvider trace.TracerProvider, - meterProvider metric.MeterProvider, ) (*Driver, error) { return New( db, ledgerStoreFactory, bucketFactory, - WithMeter(meterProvider.Meter("store")), - WithTracer(tracerProvider.Tracer("store")), + systemstore.NewStoreFactory(systemstore.WithTracer( + tracerProvider.Tracer("SystemStore"), + )), + WithTracer(tracerProvider.Tracer("StorageDriver")), ), nil }), fx.Provide(fx.Annotate(NewControllerStorageDriverAdapter, fx.As(new(systemcontroller.Store)))), diff --git a/internal/storage/ledger/main_test.go b/internal/storage/ledger/main_test.go index be932624d2..f4045258da 100644 --- a/internal/storage/ledger/main_test.go +++ b/internal/storage/ledger/main_test.go @@ -60,6 +60,7 @@ func TestMain(m *testing.M) { bunDB, ledgerstore.NewFactory(bunDB), bucket.NewDefaultFactory(), + systemstore.NewStoreFactory(), )) return bunDB diff --git a/internal/storage/module.go b/internal/storage/module.go index ff358abebc..2490d2eacb 100644 --- a/internal/storage/module.go +++ b/internal/storage/module.go @@ -6,33 +6,38 @@ import ( "github.com/formancehq/go-libs/v2/health" "github.com/formancehq/go-libs/v2/logging" "github.com/formancehq/ledger/internal/storage/driver" + "github.com/formancehq/ledger/internal/tracing" + "go.opentelemetry.io/otel/trace" "go.uber.org/fx" ) const HealthCheckName = `storage-driver-up-to-date` type ModuleConfig struct { - AutoUpgrade bool + AutoUpgrade bool } func NewFXModule(config ModuleConfig) fx.Option { ret := []fx.Option{ driver.NewFXModule(), - health.ProvideHealthCheck(func(driver *driver.Driver) health.NamedCheck { + health.ProvideHealthCheck(func(driver *driver.Driver, tracer trace.TracerProvider) health.NamedCheck { hasReachedMinimalVersion := false return health.NewNamedCheck(HealthCheckName, health.CheckFn(func(ctx context.Context) error { - if hasReachedMinimalVersion { + _, err := tracing.Trace(ctx, tracer.Tracer("HealthCheck"), "HealthCheckStorage", tracing.NoResult(func(ctx context.Context) error { + if hasReachedMinimalVersion { + return nil + } + var err error + hasReachedMinimalVersion, err = driver.HasReachMinimalVersion(ctx) + if err != nil { + return err + } + if !hasReachedMinimalVersion { + return errors.New("storage driver is not up to date") + } return nil - } - var err error - hasReachedMinimalVersion, err = driver.HasReachMinimalVersion(ctx) - if err != nil { - return err - } - if !hasReachedMinimalVersion { - return errors.New("storage driver is not up to date") - } - return nil + })) + return err })) }), } diff --git a/internal/storage/system/factory.go b/internal/storage/system/factory.go new file mode 100644 index 0000000000..3636e5de8c --- /dev/null +++ b/internal/storage/system/factory.go @@ -0,0 +1,23 @@ +package system + +import ( + "github.com/uptrace/bun" +) + +type StoreFactory interface { + Create(db bun.IDB) Store +} + +type DefaultStoreFactory struct { + options []Option +} + +func (s DefaultStoreFactory) Create(db bun.IDB) Store { + return New(db, s.options...) +} + +var _ StoreFactory = DefaultStoreFactory{} + +func NewStoreFactory(opts ...Option) DefaultStoreFactory { + return DefaultStoreFactory{options: opts} +} \ No newline at end of file diff --git a/internal/storage/system/store.go b/internal/storage/system/store.go index 00769cdd3b..442b6f9feb 100644 --- a/internal/storage/system/store.go +++ b/internal/storage/system/store.go @@ -11,7 +11,10 @@ import ( ledger "github.com/formancehq/ledger/internal" systemcontroller "github.com/formancehq/ledger/internal/controller/system" "github.com/formancehq/ledger/internal/storage/common" + "github.com/formancehq/ledger/internal/tracing" "github.com/uptrace/bun" + "go.opentelemetry.io/otel/trace" + "go.opentelemetry.io/otel/trace/noop" ) type Store interface { @@ -32,7 +35,8 @@ const ( ) type DefaultStore struct { - db bun.IDB + db bun.IDB + tracer trace.Tracer } func (d *DefaultStore) IsUpToDate(ctx context.Context) (bool, error) { @@ -115,15 +119,37 @@ func (d *DefaultStore) GetLedger(ctx context.Context, name string) (*ledger.Ledg } func (d *DefaultStore) Migrate(ctx context.Context, options ...migrations.Option) error { - return d.GetMigrator(options...).Up(ctx) + _, err := tracing.Trace(ctx, d.tracer, "MigrateSystemStore", func(ctx context.Context) (any, error) { + return nil, d.GetMigrator(options...).Up(ctx) + }) + return err + } func (d *DefaultStore) GetMigrator(options ...migrations.Option) *migrations.Migrator { - return GetMigrator(d.db, options...) + return GetMigrator(d.db, append(options, migrations.WithTracer(d.tracer))...) } -func New(db bun.IDB) *DefaultStore { - return &DefaultStore{ +func New(db bun.IDB, opts ...Option) *DefaultStore { + ret := &DefaultStore{ db: db, } + + for _, opt := range append(defaultOptions, opts...) { + opt(ret) + } + + return ret +} + +type Option func(*DefaultStore) + +func WithTracer(tracer trace.Tracer) Option { + return func(d *DefaultStore) { + d.tracer = tracer + } +} + +var defaultOptions = []Option{ + WithTracer(noop.Tracer{}), } diff --git a/internal/tracing/tracing.go b/internal/tracing/tracing.go index 635c07800e..0f4f9f5197 100644 --- a/internal/tracing/tracing.go +++ b/internal/tracing/tracing.go @@ -2,6 +2,7 @@ package tracing import ( "context" + "github.com/formancehq/go-libs/v2/otlp" "github.com/formancehq/go-libs/v2/time" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" @@ -23,7 +24,7 @@ func TraceWithMetric[RET any]( now := time.Now() ret, err := fn(ctx) if err != nil { - trace.SpanFromContext(ctx).RecordError(err) + otlp.RecordError(ctx, err) return zeroRet, err } @@ -40,10 +41,16 @@ func TraceWithMetric[RET any]( } func Trace[RET any](ctx context.Context, tracer trace.Tracer, name string, fn func(ctx context.Context) (RET, error)) (RET, error) { - ctx, trace := tracer.Start(ctx, name) - defer trace.End() + ctx, span := tracer.Start(ctx, name) + defer span.End() - return fn(ctx) + ret, err := fn(ctx) + if err != nil { + otlp.RecordError(ctx, err) + return ret, err + } + + return ret, nil } func NoResult(fn func(ctx context.Context) error) func(ctx context.Context) (any, error) { diff --git a/internal/worker/async_block.go b/internal/worker/async_block.go index f2cbc71706..3b655d5d48 100644 --- a/internal/worker/async_block.go +++ b/internal/worker/async_block.go @@ -13,6 +13,9 @@ import ( "github.com/formancehq/ledger/pkg/features" "github.com/robfig/cron/v3" "github.com/uptrace/bun" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" + "go.opentelemetry.io/otel/trace/noop" "time" ) @@ -26,6 +29,7 @@ type AsyncBlockRunner struct { logger logging.Logger db *bun.DB cfg AsyncBlockRunnerConfig + tracer trace.Tracer } func (r *AsyncBlockRunner) Name() string { @@ -69,6 +73,10 @@ func (r *AsyncBlockRunner) Stop(ctx context.Context) error { } func (r *AsyncBlockRunner) run(ctx context.Context) error { + + ctx, span := r.tracer.Start(ctx, "Run") + defer span.End() + initialQuery := ledgercontroller.NewListLedgersQuery(10) initialQuery.Options.Builder = query.Match(fmt.Sprintf("features[%s]", features.FeatureHashLogs), "ASYNC") systemStore := systemstore.New(r.db) @@ -90,6 +98,11 @@ func (r *AsyncBlockRunner) run(ctx context.Context) error { } func (r *AsyncBlockRunner) processLedger(ctx context.Context, l ledger.Ledger) error { + ctx, span := r.tracer.Start(ctx, "RunForLedger") + defer span.End() + + span.SetAttributes(attribute.String("ledger", l.Name)) + var err error _, err = r.db.NewRaw(fmt.Sprintf(` call "%s".create_blocks(?, ?) @@ -98,11 +111,29 @@ func (r *AsyncBlockRunner) processLedger(ctx context.Context, l ledger.Ledger) e return err } -func NewAsyncBlockRunner(logger logging.Logger, db *bun.DB, cfg AsyncBlockRunnerConfig) *AsyncBlockRunner { - return &AsyncBlockRunner{ +func NewAsyncBlockRunner(logger logging.Logger, db *bun.DB, cfg AsyncBlockRunnerConfig, opts ...Option) *AsyncBlockRunner { + ret := &AsyncBlockRunner{ stopChannel: make(chan chan struct{}), logger: logger, db: db, cfg: cfg, } + + for _, opt := range append(defaultOptions, opts...) { + opt(ret) + } + + return ret +} + +type Option func(*AsyncBlockRunner) + +func WithTracer(tracer trace.Tracer) Option { + return func(r *AsyncBlockRunner) { + r.tracer = tracer + } } + +var defaultOptions = []Option{ + WithTracer(noop.Tracer{}), +} \ No newline at end of file diff --git a/internal/worker/fx.go b/internal/worker/fx.go index c4713f2034..06972da63c 100644 --- a/internal/worker/fx.go +++ b/internal/worker/fx.go @@ -5,6 +5,7 @@ import ( "github.com/formancehq/go-libs/v2/logging" "github.com/robfig/cron/v3" "github.com/uptrace/bun" + "go.opentelemetry.io/otel/trace" "go.uber.org/fx" ) @@ -15,7 +16,11 @@ type ModuleConfig struct { func NewFXModule(cfg ModuleConfig) fx.Option { return fx.Options( - fx.Provide(func(logger logging.Logger, db *bun.DB) (*AsyncBlockRunner, error) { + fx.Provide(func( + logger logging.Logger, + db *bun.DB, + traceProvider trace.TracerProvider, + ) (*AsyncBlockRunner, error) { parser := cron.NewParser(cron.Second | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow) schedule, err := parser.Parse(cfg.Schedule) if err != nil { @@ -25,7 +30,7 @@ func NewFXModule(cfg ModuleConfig) fx.Option { return NewAsyncBlockRunner(logger, db, AsyncBlockRunnerConfig{ MaxBlockSize: cfg.MaxBlockSize, Schedule: schedule, - }), nil + }, WithTracer(traceProvider.Tracer("AsyncBlockRunner"))), nil }), fx.Invoke(fx.Annotate(func(lc fx.Lifecycle, asyncBlockRunner *AsyncBlockRunner) { lc.Append(fx.Hook{ diff --git a/test/migrations/upgrade_test.go b/test/migrations/upgrade_test.go index be4c55c9ec..5303464198 100644 --- a/test/migrations/upgrade_test.go +++ b/test/migrations/upgrade_test.go @@ -10,6 +10,7 @@ import ( "github.com/formancehq/ledger/internal/storage/bucket" "github.com/formancehq/ledger/internal/storage/driver" "github.com/formancehq/ledger/internal/storage/ledger" + "github.com/formancehq/ledger/internal/storage/system" "github.com/ory/dockertest/v3" dockerlib "github.com/ory/dockertest/v3/docker" "github.com/stretchr/testify/require" @@ -69,6 +70,7 @@ func TestMigrations(t *testing.T) { db, ledger.NewFactory(db), bucket.NewDefaultFactory(), + system.NewStoreFactory(), driver.WithParallelBucketMigration(1), ) require.NoError(t, driver.Initialize(ctx)) diff --git a/tools/generator/go.mod b/tools/generator/go.mod index b6edf001ab..accac25bd5 100644 --- a/tools/generator/go.mod +++ b/tools/generator/go.mod @@ -9,7 +9,7 @@ replace github.com/formancehq/ledger => ../.. replace github.com/formancehq/ledger/pkg/client => ../../pkg/client require ( - github.com/formancehq/go-libs/v2 v2.2.3-0.20250328155139-11198164d016 + github.com/formancehq/go-libs/v2 v2.2.3-0.20250401141012-7ef088564530 github.com/formancehq/ledger v0.0.0-00010101000000-000000000000 github.com/formancehq/ledger/pkg/client v0.0.0-00010101000000-000000000000 github.com/spf13/cobra v1.9.1 @@ -31,6 +31,7 @@ require ( github.com/dop251/goja v0.0.0-20241009100908-5f46f2705ca3 // indirect github.com/ericlagergren/decimal v0.0.0-20240411145413-00de7ca16731 // indirect github.com/fatih/color v1.18.0 // indirect + github.com/felixge/httpsnoop v1.0.4 // indirect github.com/formancehq/numscript v0.0.11 // indirect github.com/go-chi/chi/v5 v5.2.1 // indirect github.com/go-logr/logr v1.4.2 // indirect @@ -66,9 +67,11 @@ require ( github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect github.com/wk8/go-ordered-map/v2 v2.1.9-0.20240816141633-0a40785b4f41 // indirect go.opentelemetry.io/auto/sdk v1.1.0 // indirect + go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.59.0 // indirect go.opentelemetry.io/otel v1.35.0 // indirect go.opentelemetry.io/otel/log v0.9.0 // indirect go.opentelemetry.io/otel/metric v1.35.0 // indirect + go.opentelemetry.io/otel/sdk v1.35.0 // indirect go.opentelemetry.io/otel/trace v1.35.0 // indirect go.uber.org/dig v1.18.1 // indirect go.uber.org/fx v1.23.0 // indirect diff --git a/tools/generator/go.sum b/tools/generator/go.sum index 58d1898af9..2046e39582 100644 --- a/tools/generator/go.sum +++ b/tools/generator/go.sum @@ -102,8 +102,8 @@ github.com/fatih/color v1.18.0 h1:S8gINlzdQ840/4pfAwic/ZE0djQEH3wM94VfqLTZcOM= github.com/fatih/color v1.18.0/go.mod h1:4FelSpRwEGDpQ12mAdzqdOukCy4u8WUtOY6lkT/6HfU= github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= -github.com/formancehq/go-libs/v2 v2.2.3-0.20250328155139-11198164d016 h1:t2REX3SXrD4asj979f8pgRw/SfUPhYGxYZ7hkkP586w= -github.com/formancehq/go-libs/v2 v2.2.3-0.20250328155139-11198164d016/go.mod h1:JvBjEDWNf7izCy2dq/eI3aMc9d28gChBe1rjw5yYlAs= +github.com/formancehq/go-libs/v2 v2.2.3-0.20250401141012-7ef088564530 h1:c7loJTPm5e/AwayJIWgZZQAqMIBR/BrClfF5aldDlxE= +github.com/formancehq/go-libs/v2 v2.2.3-0.20250401141012-7ef088564530/go.mod h1:JvBjEDWNf7izCy2dq/eI3aMc9d28gChBe1rjw5yYlAs= github.com/formancehq/numscript v0.0.11 h1:vZDfRfrhOkuInv5fLIXvWZU3ylK+fVgmR4la01dO5to= github.com/formancehq/numscript v0.0.11/go.mod h1:btuSv05cYwi9BvLRxVs5zrunU+O1vTgigG1T6UsawcY= github.com/fsnotify/fsnotify v1.8.0 h1:dAwr6QBTBZIkG8roQaJjGof0pp0EeF+tNV7YBP3F/8M=