Skip to content

Add BadgerDB backup solution #204

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,11 +234,12 @@ func (agg *Aggregator) Start(ctx context.Context) error {

agg.init()

agg.logger.Infof("Initialize Storagre")
agg.logger.Infof("Initialize Storage")
if err := agg.initDB(ctx); err != nil {
agg.logger.Fatalf("failed to initialize storage", "error", err)
}


agg.logger.Infof("Starting Task engine")
agg.startTaskEngine(ctx)

Expand Down Expand Up @@ -268,6 +269,7 @@ func (agg *Aggregator) Start(ctx context.Context) error {
agg.status = shutdownStatus
agg.stopRepl()
agg.stopTaskEngine()

agg.db.Close()

return nil
Expand Down
37 changes: 37 additions & 0 deletions aggregator/repl.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@ package aggregator

import (
"bufio"
"context"
"fmt"
"log"
"net"
"os"
"path/filepath"
"strings"
"time"
)

var (
Expand Down Expand Up @@ -140,6 +143,40 @@ func handleConnection(agg *Aggregator, conn net.Conn) {
case "trigger":
fmt.Fprintln(conn, "about to trigger on server")
//agg.engine.TriggerWith

case "backup":
if len(parts) == 2 {
backupDir := parts[1]
fmt.Fprintf(conn, "Starting backup to directory: %s\n", backupDir)

timestamp := fmt.Sprintf("%s", time.Now().Format("06-01-02-15-04"))
backupPath := filepath.Join(backupDir, timestamp)

if err := os.MkdirAll(backupPath, 0755); err != nil {
fmt.Fprintf(conn, "Failed to create backup directory: %v\n", err)
break
}

backupFile := filepath.Join(backupPath, "badger.backup")
f, err := os.Create(backupFile)
if err != nil {
fmt.Fprintf(conn, "Failed to create backup file: %v\n", err)
break
}

fmt.Fprintf(conn, "Running backup to %s\n", backupFile)
since := uint64(0) // Full backup
_, err = agg.db.Backup(context.Background(), f, since)
f.Close()

if err != nil {
fmt.Fprintf(conn, "Backup failed: %v\n", err)
} else {
fmt.Fprintf(conn, "Backup completed successfully to %s\n", backupFile)
}
} else {
fmt.Fprintln(conn, "Usage: backup <directory>")
}

default:
fmt.Fprintln(conn, "Unknown command:", command)
Expand Down
5 changes: 5 additions & 0 deletions config/operator_sample.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ enable_node_api: true

db_path: /tmp/ap-avs-operator

backup:
enabled: false
interval_minutes: 60
backup_dir: "./backup"

# Destination chain where the task run, relace with your actualy target chain
target_chain:
eth_rpc_url: <sepolia-chain-rpc>
Expand Down
105 changes: 105 additions & 0 deletions core/backup/backup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package backup

import (
"context"
"fmt"
"os"
"path/filepath"
"time"

"github.com/AvaProtocol/ap-avs/storage"
"github.com/Layr-Labs/eigensdk-go/logging"
)

type Service struct {
logger logging.Logger
db storage.Storage
backupDir string
backupEnabled bool
interval time.Duration
stop chan struct{}
}

func NewService(logger logging.Logger, db storage.Storage, backupDir string) *Service {
return &Service{
logger: logger,
db: db,
backupDir: backupDir,
backupEnabled: false,
stop: make(chan struct{}),
}
}

func (s *Service) StartPeriodicBackup(interval time.Duration) error {
if s.backupEnabled {
return fmt.Errorf("backup service already running")
}

if err := os.MkdirAll(s.backupDir, 0755); err != nil {
return fmt.Errorf("failed to create backup directory: %v", err)
}

s.interval = interval
s.backupEnabled = true

if err := s.PerformBackup(); err != nil {
s.logger.Errorf("Initial backup failed: %v", err)
}

go s.backupLoop()

s.logger.Infof("Started periodic backup every %v to %s", interval, s.backupDir)
return nil
}

func (s *Service) StopPeriodicBackup() {
if !s.backupEnabled {
return
}

s.backupEnabled = false
close(s.stop)
s.logger.Infof("Stopped periodic backup")
}

func (s *Service) backupLoop() {
ticker := time.NewTicker(s.interval)
defer ticker.Stop()

for {
select {
case <-ticker.C:
if err := s.PerformBackup(); err != nil {
s.logger.Errorf("Periodic backup failed: %v", err)
}
case <-s.stop:
return
}
}
}

func (s *Service) PerformBackup() error {
timestamp := time.Now().Format("06-01-02-15-04")
backupPath := filepath.Join(s.backupDir, timestamp)

if err := os.MkdirAll(backupPath, 0755); err != nil {
return fmt.Errorf("failed to create backup timestamp directory: %v", err)
}

backupFile := filepath.Join(backupPath, "badger.backup")
f, err := os.Create(backupFile)
if err != nil {
return fmt.Errorf("failed to create backup file: %v", err)
}
defer f.Close()

s.logger.Infof("Running backup to %s", backupFile)
since := uint64(0) // Full backup
_, err = s.db.Backup(context.Background(), f, since)
if err != nil {
return fmt.Errorf("backup operation failed: %v", err)
}

s.logger.Infof("Backup completed successfully to %s", backupFile)
return nil
}
20 changes: 20 additions & 0 deletions core/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ type Config struct {
// Account abstraction config
SmartWallet *SmartWalletConfig

BackupConfig BackupConfig

SocketPath string
Environment sdklogging.LogLevel

Expand All @@ -72,6 +74,12 @@ type SmartWalletConfig struct {
PaymasterAddress common.Address
}

type BackupConfig struct {
Enabled bool // Whether periodic backups are enabled
IntervalMinutes int // Interval between backups in minutes
BackupDir string // Directory to store backups
}

// These are read from configPath
type ConfigRaw struct {
EcdsaPrivateKey string `yaml:"ecdsa_private_key"`
Expand All @@ -97,6 +105,12 @@ type ConfigRaw struct {
PaymasterAddress string `yaml:"paymaster_address"`
} `yaml:"smart_wallet"`

Backup struct {
Enabled bool `yaml:"enabled"`
IntervalMinutes int `yaml:"interval_minutes"`
BackupDir string `yaml:"backup_dir"`
} `yaml:"backup"`

SocketPath string `yaml:"socket_path"`

Macros map[string]map[string]string `yaml:"macros"`
Expand Down Expand Up @@ -205,6 +219,12 @@ func NewConfig(configFilePath string) (*Config, error) {
ControllerPrivateKey: controllerPrivateKey,
},

BackupConfig: BackupConfig{
Enabled: configRaw.Backup.Enabled,
IntervalMinutes: configRaw.Backup.IntervalMinutes,
BackupDir: configRaw.Backup.BackupDir,
},

SocketPath: configRaw.SocketPath,
MacroVars: configRaw.Macros["vars"],
MacroSecrets: configRaw.Macros["secrets"],
Expand Down
8 changes: 7 additions & 1 deletion operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,12 @@ type OperatorConfig struct {

DbPath string `yaml:"db_path"`

Backup struct {
Enabled bool `yaml:"enabled"`
IntervalMinutes int `yaml:"interval_minutes"`
BackupDir string `yaml:"backup_dir"`
} `yaml:"backup"`

PublicMetricsPort int32

// Usually we don't need this, but on testnet, our target chain might be
Expand Down Expand Up @@ -536,4 +542,4 @@ func (o *Operator) GetSignature(ctx context.Context, message []byte) (*blscrypto
}

return sig, nil
}
}
4 changes: 4 additions & 0 deletions operator/worker_loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ func (o *Operator) runWorkLoop(ctx context.Context) error {
o.scheduler.Start()
o.scheduler.NewJob(gocron.DurationJob(time.Second*5), gocron.NewTask(o.PingServer))

if o.config.DbPath != "" {
o.logger.Infof("Operator database path: %s", o.config.DbPath)
}

macros.SetRpc(o.config.TargetChain.EthWsUrl)
taskengine.SetRpc(o.config.TargetChain.EthRpcUrl)
taskengine.SetWsRpc(o.config.TargetChain.EthWsUrl)
Expand Down
13 changes: 13 additions & 0 deletions storage/db.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package storage

import (
"context"
"fmt"
"io"
"os"
"strconv"
"strings"
Expand Down Expand Up @@ -48,6 +50,9 @@ type Storage interface {
SetCounter(key []byte, value uint64) error
Vacuum() error

Backup(ctx context.Context, w io.Writer, since uint64) (uint64, error)
Load(ctx context.Context, r io.Reader) error

DbPath() string
}

Expand Down Expand Up @@ -488,3 +493,11 @@ func (a *BadgerStorage) SetCounter(key []byte, value uint64) error {
return txn.Set(key, []byte(counterStr))
})
}

func (bs *BadgerStorage) Backup(ctx context.Context, w io.Writer, since uint64) (uint64, error) {
return bs.db.Backup(w, since)
}

func (bs *BadgerStorage) Load(ctx context.Context, r io.Reader) error {
return bs.db.Load(r, 16) // 16 is a good default for the number of concurrent threads
}