Skip to content

feat: Optimize migrations #603

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Dec 9, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/buckets_upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func NewBucketUpgrade() *cobra.Command {
}()

if args[0] == "*" {
return driver.UpgradeAllBuckets(cmd.Context(), make(chan struct{}))
return driver.UpgradeAllBuckets(cmd.Context())
}

return driver.UpgradeBucket(cmd.Context(), args[0])
Expand Down
24 changes: 16 additions & 8 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,12 @@ package cmd

import (
"github.com/formancehq/go-libs/v2/bun/bunmigrate"
"github.com/formancehq/go-libs/v2/logging"
"github.com/formancehq/go-libs/v2/service"
"github.com/formancehq/ledger/internal/storage/bucket"
"github.com/formancehq/ledger/internal/storage/driver"
"github.com/formancehq/ledger/internal/storage/ledger"
systemstore "github.com/formancehq/ledger/internal/storage/system"
"github.com/uptrace/bun"

"github.com/spf13/cobra"
Expand Down Expand Up @@ -34,17 +39,20 @@ func NewRootCommand() *cobra.Command {
root.AddCommand(serve)
root.AddCommand(buckets)
root.AddCommand(version)
root.AddCommand(bunmigrate.NewDefaultCommand(func(cmd *cobra.Command, _ []string, _ *bun.DB) error {
// todo: use provided db ...
driver, db, err := getDriver(cmd)
if err != nil {
root.AddCommand(bunmigrate.NewDefaultCommand(func(cmd *cobra.Command, _ []string, db *bun.DB) error {
logger := logging.NewDefaultLogger(cmd.OutOrStdout(), service.IsDebug(cmd), false, false)
cmd.SetContext(logging.ContextWithLogger(cmd.Context(), logger))

driver := driver.New(
ledger.NewFactory(db),
systemstore.New(db),
bucket.NewDefaultFactory(db),
)
if err := driver.Initialize(cmd.Context()); err != nil {
return err
}
defer func() {
_ = db.Close()
}()

return driver.UpgradeAllBuckets(cmd.Context(), make(chan struct{}))
return driver.UpgradeAllBuckets(cmd.Context())
}))
root.AddCommand(NewDocsCommand())

Expand Down
16 changes: 6 additions & 10 deletions deployments/pulumi/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/formancehq/ledger/deployments/pulumi/pkg"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi/config"
"github.com/pulumi/pulumi/sdk/v3/go/pulumix"
)

func main() {
Expand Down Expand Up @@ -35,23 +36,18 @@ func deploy(ctx *pulumi.Context) error {
}
}

debug, _ := conf.TryBool("debug")
imagePullPolicy, _ := conf.Try("image.pullPolicy")

replicaCount, _ := conf.TryInt("replicaCount")
experimentalFeatures, _ := conf.TryBool("experimentalFeatures")

_, err = pulumi_ledger.NewComponent(ctx, "ledger", &pulumi_ledger.ComponentArgs{
Namespace: pulumi.String(namespace),
Timeout: pulumi.Int(timeout),
Tag: pulumi.String(version),
ImagePullPolicy: pulumi.String(imagePullPolicy),
ImagePullPolicy: pulumi.String(conf.Get("image.pullPolicy")),
Postgres: pulumi_ledger.PostgresArgs{
URI: pulumi.String(postgresURI),
},
Debug: pulumi.Bool(debug),
ReplicaCount: pulumi.Int(replicaCount),
ExperimentalFeatures: pulumi.Bool(experimentalFeatures),
Debug: pulumi.Bool(conf.GetBool("debug")),
ReplicaCount: pulumi.Int(conf.GetInt("replicaCount")),
ExperimentalFeatures: pulumi.Bool(conf.GetBool("experimentalFeatures")),
Upgrade: pulumix.Val(pulumi_ledger.UpgradeMode(config.Get(ctx, "upgrade-mode"))),
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Ensure upgrade-mode configuration is correctly handled

Passing the upgrade-mode configuration directly may result in an empty UpgradeMode if not set. Provide a default value to prevent unexpected behavior.

Modify the code to handle a default upgrade-mode:

 Upgrade: pulumix.Val(pulumi_ledger.UpgradeMode(config.Get(ctx, "upgrade-mode"))),
+// Set default upgrade mode if not provided
+upgradeMode := config.Get(ctx, "upgrade-mode")
+if upgradeMode == "" {
+    upgradeMode = "in-app" // or another appropriate default
+}
+Upgrade: pulumix.Val(pulumi_ledger.UpgradeMode(upgradeMode)),

Committable suggestion skipped: line range outside the PR's diff.

})

return err
Expand Down
75 changes: 48 additions & 27 deletions deployments/pulumi/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,38 +19,59 @@ import (

func TestProgram(t *testing.T) {

ctx := logging.TestingContext()
stackName := "ledger-tests-pulumi-" + uuid.NewString()[:8]
type testCase struct {
name string
config map[string]string
}
for _, tc := range []testCase{
{
name: "nominal",
config: map[string]string{
"timeout": "30",
},
},
{
name: "upgrade using a job",
config: map[string]string{
"timeout": "30",
"upgrade-mode": "job",
},
},
} {
t.Run(tc.name, func(t *testing.T) {
ctx := logging.TestingContext()
stackName := "ledger-tests-pulumi-" + uuid.NewString()[:8]

stack, err := auto.UpsertStackInlineSource(ctx, stackName, "ledger-tests-pulumi-postgres", deployPostgres(stackName))
require.NoError(t, err)
stack, err := auto.UpsertStackInlineSource(ctx, stackName, "ledger-tests-pulumi-postgres", deployPostgres(stackName))
require.NoError(t, err)

t.Log("Deploy pg stack")
up, err := stack.Up(ctx, optup.ProgressStreams(os.Stdout), optup.ErrorProgressStreams(os.Stderr))
require.NoError(t, err)
t.Log("Deploy pg stack")
up, err := stack.Up(ctx, optup.ProgressStreams(os.Stdout), optup.ErrorProgressStreams(os.Stderr))
require.NoError(t, err)

t.Cleanup(func() {
t.Log("Destroy stack")
_, err := stack.Destroy(ctx, optdestroy.Remove(), optdestroy.ProgressStreams(os.Stdout), optdestroy.ErrorProgressStreams(os.Stderr))
require.NoError(t, err)
})
t.Cleanup(func() {
t.Log("Destroy stack")
_, err := stack.Destroy(ctx, optdestroy.Remove(), optdestroy.ProgressStreams(os.Stdout), optdestroy.ErrorProgressStreams(os.Stderr))
require.NoError(t, err)
})

postgresURI := up.Outputs["uri"].Value.(string)
postgresURI := up.Outputs["uri"].Value.(string)

t.Log("Test program")
integration.ProgramTest(t, &integration.ProgramTestOptions{
Quick: true,
SkipRefresh: true,
Dir: ".",
Config: map[string]string{
"namespace": stackName,
"postgres.uri": postgresURI,
"timeout": "30",
},
Stdout: os.Stdout,
Stderr: os.Stderr,
Verbose: testing.Verbose(),
})
tc.config["postgres.uri"] = postgresURI
tc.config["namespace"] = stackName

t.Log("Test program")
integration.ProgramTest(t, &integration.ProgramTestOptions{
Quick: true,
SkipRefresh: true,
Dir: ".",
Config: tc.config,
Stdout: os.Stdout,
Stderr: os.Stderr,
Verbose: testing.Verbose(),
})
})
}
}

func deployPostgres(stackName string) func(ctx *pulumi.Context) error {
Expand Down
67 changes: 40 additions & 27 deletions deployments/pulumi/pkg/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,21 @@ import (

var ErrPostgresURIRequired = fmt.Errorf("postgresURI is required")

type UpgradeMode string

const (
UpgradeModeDisabled UpgradeMode = "disabled"
UpgradeModeJob UpgradeMode = "job"
UpgradeModeInApp UpgradeMode = "in-app"
)

type Component struct {
pulumi.ResourceState

ServiceName pulumix.Output[string]
ServiceNamespace pulumix.Output[string]
ServicePort pulumix.Output[int]
ServiceInternalURL pulumix.Output[string]
Migrations pulumix.Output[*batchv1.Job]
}

type PostgresArgs struct {
Expand Down Expand Up @@ -73,13 +80,12 @@ type ComponentArgs struct {
Debug pulumix.Input[bool]
ReplicaCount pulumix.Input[int]
GracePeriod pulumix.Input[string]
AutoUpgrade pulumix.Input[bool]
WaitUpgrade pulumix.Input[bool]
BallastSizeInBytes pulumix.Input[int]
NumscriptCacheMaxCount pulumix.Input[int]
BulkMaxSize pulumix.Input[int]
BulkParallel pulumix.Input[int]
TerminationGracePeriodSeconds pulumix.Input[*int]
Upgrade pulumix.Input[UpgradeMode]

ExperimentalFeatures pulumix.Input[bool]
ExperimentalNumscriptInterpreter pulumix.Input[bool]
Expand Down Expand Up @@ -129,14 +135,29 @@ func NewComponent(ctx *pulumi.Context, name string, args *ComponentArgs, opts ..
}
ledgerImage := pulumi.Sprintf("ghcr.io/formancehq/ledger:%s", tag)

autoUpgrade := pulumix.Val(true)
if args.AutoUpgrade != nil {
autoUpgrade = args.AutoUpgrade.ToOutput(ctx.Context())
upgradeMode := UpgradeModeInApp
if args.Upgrade != nil {
var (
upgradeModeChan = make(chan UpgradeMode, 1)
)
pulumix.ApplyErr(args.Upgrade, func(upgradeMode UpgradeMode) (any, error) {
upgradeModeChan <- upgradeMode
close(upgradeModeChan)
return nil, nil
})

select {
case <-ctx.Context().Done():
return nil, ctx.Context().Err()
case upgradeMode = <-upgradeModeChan:
if upgradeMode == "" {
upgradeMode = UpgradeModeInApp
}
}
Comment on lines +138 to +156
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Potential deadlock risk in upgradeMode initialization

The use of an unbuffered channel with pulumix.ApplyErr may lead to a deadlock if the ApplyErr callback is not invoked. This can occur if the input is not ready or an error occurs upstream.

Refactor the code to avoid using channels and handle the input more safely. Consider using ApplyT and Await to retrieve the value synchronously within the context:

 upgradeMode := UpgradeModeInApp
 if args.Upgrade != nil {
-	var (
-		upgradeModeChan = make(chan UpgradeMode, 1)
-	)
-	pulumix.ApplyErr(args.Upgrade, func(upgradeMode UpgradeMode) (any, error) {
-		upgradeModeChan <- upgradeMode
-		close(upgradeModeChan)
-		return nil, nil
-	})
-
-	select {
-	case <-ctx.Context().Done():
-		return nil, ctx.Context().Err()
-	case upgradeMode = <-upgradeModeChan:
-		if upgradeMode == "" {
-			upgradeMode = UpgradeModeInApp
-		}
-	}
+	upgradeModeOutput := args.Upgrade.ToOutput(ctx.Context())
+	err := upgradeModeOutput.ApplyT(func(mode UpgradeMode) (UpgradeMode, error) {
+		if mode == "" {
+			return UpgradeModeInApp, nil
+		}
+		upgradeMode = mode
+		return mode, nil
+	}).(pulumi.Output).Await(ctx.Context())
+	if err != nil {
+		return nil, err
+	}
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
upgradeMode := UpgradeModeInApp
if args.Upgrade != nil {
var (
upgradeModeChan = make(chan UpgradeMode, 1)
)
pulumix.ApplyErr(args.Upgrade, func(upgradeMode UpgradeMode) (any, error) {
upgradeModeChan <- upgradeMode
close(upgradeModeChan)
return nil, nil
})
select {
case <-ctx.Context().Done():
return nil, ctx.Context().Err()
case upgradeMode = <-upgradeModeChan:
if upgradeMode == "" {
upgradeMode = UpgradeModeInApp
}
}
upgradeMode := UpgradeModeInApp
if args.Upgrade != nil {
upgradeModeOutput := args.Upgrade.ToOutput(ctx.Context())
err := upgradeModeOutput.ApplyT(func(mode UpgradeMode) (UpgradeMode, error) {
if mode == "" {
return UpgradeModeInApp, nil
}
upgradeMode = mode
return mode, nil
}).(pulumi.Output).Await(ctx.Context())
if err != nil {
return nil, err
}
}

}

waitUpgrade := pulumix.Val(true)
if args.WaitUpgrade != nil {
waitUpgrade = args.WaitUpgrade.ToOutput(ctx.Context())
if upgradeMode != "" && upgradeMode != UpgradeModeDisabled && upgradeMode != UpgradeModeJob && upgradeMode != UpgradeModeInApp {
return nil, fmt.Errorf("invalid upgrade mode: %s", upgradeMode)
}

imagePullPolicy := pulumix.Val("")
Expand Down Expand Up @@ -351,18 +372,10 @@ func NewComponent(ctx *pulumi.Context, name string, args *ComponentArgs, opts ..
})
}

if args.AutoUpgrade != nil {
if upgradeMode == UpgradeModeInApp {
envVars = append(envVars, corev1.EnvVarArgs{
Name: pulumi.String("AUTO_UPGRADE"),
Value: pulumix.Apply2Err(autoUpgrade, waitUpgrade, func(autoUpgrade, waitUpgrade bool) (string, error) {
if waitUpgrade && !autoUpgrade {
return "", fmt.Errorf("waitUpgrade requires autoUpgrade to be true")
}
if !autoUpgrade {
return "false", nil
}
return "true", nil
}).Untyped().(pulumi.StringOutput),
Name: pulumi.String("AUTO_UPGRADE"),
Value: pulumi.String("true"),
})
}

Expand Down Expand Up @@ -501,11 +514,8 @@ func NewComponent(ctx *pulumi.Context, name string, args *ComponentArgs, opts ..
return nil, err
}

cmp.Migrations = pulumix.ApplyErr(waitUpgrade, func(waitUpgrade bool) (*batchv1.Job, error) {
if !waitUpgrade {
return nil, nil
}
return batchv1.NewJob(ctx, "wait-migration-completion", &batchv1.JobArgs{
if upgradeMode == UpgradeModeJob {
_, err = batchv1.NewJob(ctx, "migrate", &batchv1.JobArgs{
Metadata: &metav1.ObjectMetaArgs{
Namespace: namespace.Untyped().(pulumi.StringOutput),
},
Expand All @@ -515,7 +525,7 @@ func NewComponent(ctx *pulumi.Context, name string, args *ComponentArgs, opts ..
RestartPolicy: pulumi.String("OnFailure"),
Containers: corev1.ContainerArray{
corev1.ContainerArgs{
Name: pulumi.String("check"),
Name: pulumi.String("migrate"),
Args: pulumi.StringArray{
pulumi.String("migrate"),
},
Expand All @@ -537,7 +547,10 @@ func NewComponent(ctx *pulumi.Context, name string, args *ComponentArgs, opts ..
},
},
}, pulumi.Parent(cmp))
})
if err != nil {
return nil, err
}
}

service, err := corev1.NewService(ctx, "ledger", &corev1.ServiceArgs{
Metadata: &metav1.ObjectMetaArgs{
Expand Down
23 changes: 19 additions & 4 deletions docs/api/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -597,7 +597,10 @@ Accept: application/json
}
}
}
]
],
"errorCode": "VALIDATION",
"errorMessage": "[VALIDATION] invalid 'cursor' query param",
"details": "https://play.numscript.org/?payload=eyJlcnJvciI6ImFjY291bnQgaGFkIGluc3VmZmljaWVudCBmdW5kcyJ9"
}
```

Expand Down Expand Up @@ -3244,7 +3247,7 @@ Authorization ( Scopes: ledger:write )
|*anonymous*|INTERPRETER_PARSE|
|*anonymous*|INTERPRETER_RUNTIME|
|*anonymous*|LEDGER_ALREADY_EXISTS|
|*anonymous*|BUCKET_OUTDATED|
|*anonymous*|OUTDATED_SCHEMA|

<h2 id="tocS_V2LedgerInfoResponse">V2LedgerInfoResponse</h2>
<!-- backwards compatibility -->
Expand Down Expand Up @@ -3788,16 +3791,28 @@ and
}
}
}
]
],
"errorCode": "VALIDATION",
"errorMessage": "[VALIDATION] invalid 'cursor' query param",
"details": "https://play.numscript.org/?payload=eyJlcnJvciI6ImFjY291bnQgaGFkIGluc3VmZmljaWVudCBmdW5kcyJ9"
}

```

### Properties

allOf

|Name|Type|Required|Restrictions|Description|
|---|---|---|---|---|
|*anonymous*|object|false|none|none|
|» data|[[V2BulkElementResult](#schemav2bulkelementresult)]|false|none|none|

and

|Name|Type|Required|Restrictions|Description|
|---|---|---|---|---|
|data|[[V2BulkElementResult](#schemav2bulkelementresult)]|true|none|none|
|*anonymous*|[V2ErrorResponse](#schemav2errorresponse)|false|none|none|
Comment on lines +3804 to +3815
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix duplicate heading in schema composition

The schema composition using allOf has duplicate headings which could cause confusion in the generated documentation.

Apply this fix to resolve the duplicate heading:

-allOf
-
-|Name|Type|Required|Restrictions|Description|
-|---|---|---|---|---|
-|*anonymous*|object|false|none|none|
-|» data|[[V2BulkElementResult](#schemav2bulkelementresult)]|false|none|none|
-
-and
-
|Name|Type|Required|Restrictions|Description|
|---|---|---|---|---|
|*anonymous*|[V2ErrorResponse](#schemav2errorresponse)|false|none|none|

Committable suggestion skipped: line range outside the PR's diff.


<h2 id="tocS_V2BulkElementResult">V2BulkElementResult</h2>
<!-- backwards compatibility -->
Expand Down
1 change: 0 additions & 1 deletion internal/api/common/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ const (
ErrMetadataOverride = "METADATA_OVERRIDE"
ErrBulkSizeExceeded = "BULK_SIZE_EXCEEDED"
ErrLedgerAlreadyExists = "LEDGER_ALREADY_EXISTS"
ErrBucketOutdated = "BUCKET_OUTDATED"

ErrInterpreterParse = "INTERPRETER_PARSE"
ErrInterpreterRuntime = "INTERPRETER_RUNTIME"
Expand Down
2 changes: 1 addition & 1 deletion internal/api/v2/controllers_ledgers_create.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func createLedger(systemController system.Controller) http.HandlerFunc {
errors.Is(err, ledger.ErrInvalidBucketName{}):
api.BadRequest(w, common.ErrValidation, err)
case errors.Is(err, system.ErrBucketOutdated):
api.BadRequest(w, common.ErrBucketOutdated, err)
api.BadRequest(w, common.ErrOutdatedSchema, err)
case errors.Is(err, system.ErrLedgerAlreadyExists):
api.BadRequest(w, common.ErrLedgerAlreadyExists, err)
default:
Expand Down
2 changes: 1 addition & 1 deletion internal/api/v2/controllers_ledgers_create_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func TestLedgersCreate(t *testing.T) {
expectedBackendCall: true,
returnErr: system.ErrBucketOutdated,
expectStatusCode: http.StatusBadRequest,
expectErrorCode: common.ErrBucketOutdated,
expectErrorCode: common.ErrOutdatedSchema,
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

Error code inconsistency detected - ErrBucketOutdated still present in codebase

The change to ErrOutdatedSchema is incomplete as ErrBucketOutdated is still being used in multiple locations:

  • internal/storage/driver/driver.go: Returns systemcontroller.ErrBucketOutdated
  • internal/api/v2/controllers_ledgers_create.go: Checks for system.ErrBucketOutdated
  • internal/controller/system/errors.go: Defines ErrBucketOutdated

This creates an inconsistency where the same error condition is represented by two different error codes across the codebase.

🔗 Analysis chain

LGTM - Error code update aligns with new schema

The change from ErrBucketOutdated to ErrOutdatedSchema correctly reflects the updated error categorization in the main code.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify consistent usage of ErrOutdatedSchema across the codebase

# Check for any remaining references to ErrBucketOutdated
rg "ErrBucketOutdated"

# Check for consistent usage of ErrOutdatedSchema
rg "ErrOutdatedSchema"

Length of output: 1011

},
{
name: "unexpected error",
Expand Down
2 changes: 1 addition & 1 deletion internal/storage/bucket/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
)

type Bucket interface {
Migrate(ctx context.Context, minimalVersionReached chan struct{}, opts ...migrations.Option) error
Migrate(ctx context.Context, opts ...migrations.Option) error
AddLedger(ctx context.Context, ledger ledger.Ledger) error
HasMinimalVersion(ctx context.Context) (bool, error)
IsUpToDate(ctx context.Context) (bool, error)
Expand Down
Loading
Loading