Skip to content

Commit dbb5d73

Browse files
accounts: add migration code from kvdb to SQL
This commit introduces the migration logic for transitioning the accounts store from kvdb to SQL. Note that as of this commit, the migration is not yet triggered by any production code, i.e. only tests execute the migration logic.
1 parent 6030f65 commit dbb5d73

File tree

3 files changed

+581
-2
lines changed

3 files changed

+581
-2
lines changed

accounts/sql_migration.go

Lines changed: 233 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,233 @@
1+
package accounts
2+
3+
import (
4+
"context"
5+
"database/sql"
6+
"errors"
7+
"fmt"
8+
"math"
9+
"reflect"
10+
"time"
11+
12+
"github.com/davecgh/go-spew/spew"
13+
"github.com/lightninglabs/lightning-terminal/db/sqlc"
14+
"github.com/pmezard/go-difflib/difflib"
15+
)
16+
17+
var (
18+
// ErrMigrationMismatch is returned when the migrated account does not
19+
// match the original account.
20+
ErrMigrationMismatch = fmt.Errorf("migrated account does not match " +
21+
"original account")
22+
)
23+
24+
// MigrateAccountStoreToSQL runs the migration of all accounts and indices from
25+
// the KV database to the SQL database. The migration is done in a single
26+
// transaction to ensure that all accounts are migrated or none at all.
27+
func MigrateAccountStoreToSQL(ctx context.Context, kvStore *BoltStore,
28+
tx SQLQueries) error {
29+
30+
log.Infof("Starting migration of the KV accounts store to SQL")
31+
32+
err := migrateAccountsToSQL(ctx, kvStore, tx)
33+
if err != nil {
34+
return fmt.Errorf("unsuccessful migration of accounts to "+
35+
"SQL: %w", err)
36+
}
37+
38+
err = migrateAccountsIndicesToSQL(ctx, kvStore, tx)
39+
if err != nil {
40+
return fmt.Errorf("unsuccessful migration of account indices "+
41+
"to SQL: %w", err)
42+
}
43+
44+
return nil
45+
}
46+
47+
// migrateAccountsToSQL runs the migration of all accounts from the KV database
48+
// to the SQL database. The migration is done in a single transaction to ensure
49+
// that all accounts are migrated or none at all.
50+
func migrateAccountsToSQL(ctx context.Context, kvStore *BoltStore,
51+
tx SQLQueries) error {
52+
53+
log.Infof("Starting migration of accounts from KV to SQL")
54+
55+
kvAccounts, err := kvStore.Accounts(ctx)
56+
if err != nil {
57+
return err
58+
}
59+
60+
for _, kvAccount := range kvAccounts {
61+
migratedAccountID, err := migrateSingleAccountToSQL(
62+
ctx, tx, kvAccount,
63+
)
64+
if err != nil {
65+
return fmt.Errorf("unable to migrate account(%v): %w",
66+
kvAccount.ID, err)
67+
}
68+
69+
migratedAccount, err := getAndMarshalAccount(
70+
ctx, tx, migratedAccountID,
71+
)
72+
if err != nil {
73+
return fmt.Errorf("unable to fetch migrated "+
74+
"account(%v): %w", kvAccount.ID, err)
75+
}
76+
77+
overrideAccountTimeZone(kvAccount)
78+
overrideAccountTimeZone(migratedAccount)
79+
80+
if !reflect.DeepEqual(kvAccount, migratedAccount) {
81+
diff := difflib.UnifiedDiff{
82+
A: difflib.SplitLines(
83+
spew.Sdump(kvAccount),
84+
),
85+
B: difflib.SplitLines(
86+
spew.Sdump(migratedAccount),
87+
),
88+
FromFile: "Expected",
89+
FromDate: "",
90+
ToFile: "Actual",
91+
ToDate: "",
92+
Context: 3,
93+
}
94+
diffText, _ := difflib.GetUnifiedDiffString(diff)
95+
96+
return fmt.Errorf("%w: %v.\n%v", ErrMigrationMismatch,
97+
kvAccount.ID, diffText)
98+
}
99+
}
100+
101+
log.Infof("All accounts migrated from KV to SQL. Total number of "+
102+
"accounts migrated: %d", len(kvAccounts))
103+
104+
return nil
105+
}
106+
107+
// migrateSingleAccountToSQL runs the migration for a single account from the
108+
// KV database to the SQL database.
109+
func migrateSingleAccountToSQL(ctx context.Context,
110+
tx SQLQueries, account *OffChainBalanceAccount) (int64, error) {
111+
112+
accountAlias, err := account.ID.ToInt64()
113+
if err != nil {
114+
return 0, err
115+
}
116+
117+
insertAccountParams := sqlc.InsertAccountParams{
118+
Type: int16(account.Type),
119+
InitialBalanceMsat: int64(account.InitialBalance),
120+
CurrentBalanceMsat: account.CurrentBalance,
121+
LastUpdated: account.LastUpdate.UTC(),
122+
Alias: accountAlias,
123+
Expiration: account.ExpirationDate.UTC(),
124+
Label: sql.NullString{
125+
String: account.Label,
126+
Valid: len(account.Label) > 0,
127+
},
128+
}
129+
130+
sqlId, err := tx.InsertAccount(ctx, insertAccountParams)
131+
if err != nil {
132+
return 0, err
133+
}
134+
135+
for hash := range account.Invoices {
136+
addInvoiceParams := sqlc.AddAccountInvoiceParams{
137+
AccountID: sqlId,
138+
Hash: hash[:],
139+
}
140+
141+
err = tx.AddAccountInvoice(ctx, addInvoiceParams)
142+
if err != nil {
143+
return sqlId, err
144+
}
145+
}
146+
147+
for hash, paymentEntry := range account.Payments {
148+
upsertPaymentParams := sqlc.UpsertAccountPaymentParams{
149+
AccountID: sqlId,
150+
Hash: hash[:],
151+
Status: int16(paymentEntry.Status),
152+
FullAmountMsat: int64(paymentEntry.FullAmount),
153+
}
154+
155+
err = tx.UpsertAccountPayment(ctx, upsertPaymentParams)
156+
if err != nil {
157+
return sqlId, err
158+
}
159+
}
160+
161+
return sqlId, nil
162+
}
163+
164+
// migrateAccountsIndicesToSQL runs the migration for the account indices from
165+
// the KV database to the SQL database.
166+
func migrateAccountsIndicesToSQL(ctx context.Context, kvStore *BoltStore,
167+
tx SQLQueries) error {
168+
169+
log.Infof("Starting migration of accounts indices from KV to SQL")
170+
171+
addIndex, settleIndex, err := kvStore.LastIndexes(ctx)
172+
if errors.Is(err, ErrNoInvoiceIndexKnown) {
173+
log.Infof("No indices found in KV store, skipping migration")
174+
return nil
175+
} else if err != nil {
176+
return err
177+
}
178+
179+
if addIndex > math.MaxInt64 {
180+
return fmt.Errorf("%s:%v is above max int64 value",
181+
addIndexName, addIndex)
182+
}
183+
184+
if settleIndex > math.MaxInt64 {
185+
return fmt.Errorf("%s:%v is above max int64 value",
186+
settleIndexName, settleIndex)
187+
}
188+
189+
setAddIndexParams := sqlc.SetAccountIndexParams{
190+
Name: addIndexName,
191+
Value: int64(addIndex),
192+
}
193+
194+
err = tx.SetAccountIndex(ctx, setAddIndexParams)
195+
if err != nil {
196+
return err
197+
}
198+
199+
setSettleIndexParams := sqlc.SetAccountIndexParams{
200+
Name: settleIndexName,
201+
Value: int64(settleIndex),
202+
}
203+
204+
err = tx.SetAccountIndex(ctx, setSettleIndexParams)
205+
if err != nil {
206+
return err
207+
}
208+
209+
log.Infof("Successfully migratated accounts indices from KV to SQL")
210+
211+
return nil
212+
}
213+
214+
// overrideAccountTimeZone overrides the time zone of the account to the local
215+
// time zone and chops off the nanosecond part for comparison. This is needed
216+
// because KV database stores times as-is which as an unwanted side effect would
217+
// fail migration due to time comparison expecting both the original and
218+
// migrated accounts to be in the same local time zone and in microsecond
219+
// precision. Note that PostgresSQL stores times in microsecond precision while
220+
// SQLite can store times in nanosecond precision if using TEXT storage class.
221+
func overrideAccountTimeZone(account *OffChainBalanceAccount) {
222+
fixTime := func(t time.Time) time.Time {
223+
return t.In(time.Local).Truncate(time.Microsecond)
224+
}
225+
226+
if !account.ExpirationDate.IsZero() {
227+
account.ExpirationDate = fixTime(account.ExpirationDate)
228+
}
229+
230+
if !account.LastUpdate.IsZero() {
231+
account.LastUpdate = fixTime(account.LastUpdate)
232+
}
233+
}

0 commit comments

Comments
 (0)