Skip to content

feat: inject staker pk & addresses mapping #27

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Aug 15, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 0 additions & 11 deletions bin/init-mongo.sh
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,5 @@ db.createUser({
});
"

# Create the necessary indexes
mongosh --eval "
db = db.getSiblingDB('staking-api-service');
db.unbonding_queue.createIndex({'unbonding_tx_hash_hex': 1}, {unique: true});
db.timelock_queue.createIndex({'expire_height': 1}, {unique: false});
db.delegations.createIndex({'staker_pk_hex': 1, 'staking_tx.start_height': -1}, {unique: false});
db.delegations.createIndex({'staker_btc_address.taproot_address': 1, 'staking_tx.start_timestamp': -1}, {unique: false});
db.staker_stats.createIndex({'active_tvl': -1, '_id': 1}, {unique: false});
db.finality_providers_stats.createIndex({'active_tvl': -1, '_id': 1}, {unique: false});
"

# Keep the container running
tail -f /dev/null
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ toolchain go1.22.4
require (
github.com/babylonlabs-io/babylon v0.9.0
github.com/babylonlabs-io/networks/parameters v0.2.2
github.com/babylonlabs-io/staking-queue-client v0.4.1
github.com/babylonlabs-io/staking-queue-client v0.4.3
github.com/btcsuite/btcd v0.24.2
github.com/btcsuite/btcd/btcec/v2 v2.3.2
github.com/btcsuite/btcd/btcutil v1.1.5
Expand Down Expand Up @@ -84,7 +84,7 @@ require (
github.com/danieljoos/wincred v1.1.2 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/decred/dcrd/crypto/blake256 v1.0.1 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
github.com/desertbit/timer v0.0.0-20180107155436-c41aec40b27f // indirect
github.com/dgraph-io/badger/v2 v2.2007.4 // indirect
github.com/dgraph-io/ristretto v0.1.1 // indirect
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -274,8 +274,8 @@ github.com/babylonlabs-io/babylon v0.9.0 h1:dHZ9wUrI5XLaO4UIwJRgiCdnzFdi5yv7dpib
github.com/babylonlabs-io/babylon v0.9.0/go.mod h1:t7B4e+ooD2oYvAxkegtNKDL9bXe+vU29a8xnCQh+UKo=
github.com/babylonlabs-io/networks/parameters v0.2.2 h1:TCu39fZvjX5f6ZZrjhYe54M6wWxglNewuKu56yE+zrc=
github.com/babylonlabs-io/networks/parameters v0.2.2/go.mod h1:iEJVOzaLsE33vpP7J4u+CRGfkSIfErUAwRmgCFCBpyI=
github.com/babylonlabs-io/staking-queue-client v0.4.1 h1:AW+jtrNxZYN/isRx+njqjHbUU9CzhF42Ke6roK+0N3I=
github.com/babylonlabs-io/staking-queue-client v0.4.1/go.mod h1:n3fr3c+9LNiJlyETmcrVk94Zn76rAADhGZKxX+rVf+Q=
github.com/babylonlabs-io/staking-queue-client v0.4.3 h1:wYDE/LXc9zmeliKoy602UVtSwahP2iDxoTNYuIf/WrM=
github.com/babylonlabs-io/staking-queue-client v0.4.3/go.mod h1:n3fr3c+9LNiJlyETmcrVk94Zn76rAADhGZKxX+rVf+Q=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
Expand Down Expand Up @@ -427,8 +427,8 @@ github.com/decred/dcrd/crypto/blake256 v1.0.0/go.mod h1:sQl2p6Y26YV+ZOcSTP6thNdn
github.com/decred/dcrd/crypto/blake256 v1.0.1 h1:7PltbUIQB7u/FfZ39+DGa/ShuMyJ5ilcvdfma9wOH6Y=
github.com/decred/dcrd/crypto/blake256 v1.0.1/go.mod h1:2OfgNZ5wDpcsFmHmCK5gZTPcCXqlm2ArzUIkw9czNJo=
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1/go.mod h1:hyedUtir6IdtD/7lIxGeCxkaw7y45JueMRL4DIyJDKs=
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0 h1:rpfIENRNNilwHwZeG5+P150SMrnNEcHYvcCuK6dPZSg=
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0/go.mod h1:v57UDF4pDQJcEfFUCRop3lJL149eHGSe9Jvczhzjo/0=
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 h1:8UrgZ3GkP4i/CLijOJx79Yu+etlyjdBU4sfcs2WYQMs=
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0/go.mod h1:v57UDF4pDQJcEfFUCRop3lJL149eHGSe9Jvczhzjo/0=
github.com/decred/dcrd/lru v1.0.0/go.mod h1:mxKOwFd7lFjN2GZYsiz/ecgqR6kkYAl+0pz0tEMk218=
github.com/desertbit/timer v0.0.0-20180107155436-c41aec40b27f h1:U5y3Y5UE0w7amNe7Z5G/twsBW0KEalRQXZzf8ufSh9I=
github.com/desertbit/timer v0.0.0-20180107155436-c41aec40b27f/go.mod h1:xH/i4TFMt8koVQZ6WFms69WAsDWr2XsYL3Hkl7jkoLE=
Expand Down
5 changes: 5 additions & 0 deletions internal/db/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ type DBClient interface {
CheckDelegationExistByStakerTaprootAddress(
ctx context.Context, address string, extraFilter *DelegationFilter,
) (bool, error)
// InsertPkAddressMappings inserts the btc public key and
// its corresponding btc addresses into the database.
InsertPkAddressMappings(
ctx context.Context, stakerPkHex, taproot, nativeSigwitOdd, nativeSigwitEven string,
) error
}

type DelegationFilter struct {
Expand Down
8 changes: 8 additions & 0 deletions internal/db/model/pk_address_mapping.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package model

type PkAddressMapping struct {
PkHex string `bson:"_id"`
Taproot string `bson:"taproot"`
NativeSegwitEven string `bson:"native_segwit_even"`
NativeSegwitOdd string `bson:"native_segwit_odd"`
}
6 changes: 6 additions & 0 deletions internal/db/model/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ const (
UnbondingCollection = "unbonding_queue"
BtcInfoCollection = "btc_info"
UnprocessableMsgCollection = "unprocessable_messages"
PkAddressMappingsCollection = "pk_address_mappings"
)

type index struct {
Expand All @@ -43,6 +44,11 @@ var collections = map[string][]index{
UnbondingCollection: {{Indexes: map[string]int{"unbonding_tx_hash_hex": 1}, Unique: true}},
UnprocessableMsgCollection: {{Indexes: map[string]int{}}},
BtcInfoCollection: {{Indexes: map[string]int{}}},
PkAddressMappingsCollection: {
{Indexes: map[string]int{"taproot": 1}, Unique: true},
{Indexes: map[string]int{"nativeSegwit_odd": 1}, Unique: true},
{Indexes: map[string]int{"nativeSegwit_even": 1}, Unique: true},
},
}

func Setup(ctx context.Context, cfg *config.Config) error {
Expand Down
29 changes: 29 additions & 0 deletions internal/db/pk_address_mapping.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package db

import (
"context"

"github.com/babylonlabs-io/staking-api-service/internal/db/model"
"go.mongodb.org/mongo-driver/mongo"
)

func (db *Database) InsertPkAddressMappings(
ctx context.Context, pkHex, taproot, nativeSigwitOdd, nativeSigwitEven string,
) error {
client := db.Client.Database(db.DbName).Collection(model.PkAddressMappingsCollection)
addressMapping := &model.PkAddressMapping{
PkHex: pkHex,
Taproot: taproot,
NativeSegwitOdd: nativeSigwitOdd,
NativeSegwitEven: nativeSigwitEven,
}
_, err := client.InsertOne(ctx, addressMapping)
if err != nil {
// If the document already exists, ignore the error
if mongo.IsDuplicateKeyError(err) {
return nil
}
return err
}
return nil
}
26 changes: 12 additions & 14 deletions internal/queue/handlers/active_staking.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,20 +34,18 @@ func (h *QueueHandler) ActiveStakingHandler(ctx context.Context, messageBody str
return nil
}

// We only emit the stats event for the active staking if it is not an overflow event
if !activeStakingEvent.IsOverflow {
// Perform the async stats calculation by emit the stats event
statsError := h.EmitStatsEvent(ctx, queueClient.NewStatsEvent(
activeStakingEvent.StakingTxHashHex,
activeStakingEvent.StakerPkHex,
activeStakingEvent.FinalityProviderPkHex,
activeStakingEvent.StakingValue,
types.Active.ToString(),
))
if statsError != nil {
log.Ctx(ctx).Error().Err(statsError).Msg("Failed to emit stats event for active staking")
return statsError
}
// Perform the async metadata calculation by emit the stats event
statsError := h.EmitStatsEvent(ctx, queueClient.NewStatsEvent(
activeStakingEvent.StakingTxHashHex,
activeStakingEvent.StakerPkHex,
activeStakingEvent.FinalityProviderPkHex,
activeStakingEvent.StakingValue,
types.Active.ToString(),
activeStakingEvent.IsOverflow,
))
if statsError != nil {
log.Ctx(ctx).Error().Err(statsError).Msg("Failed to emit stats event for active staking")
return statsError
}

// Perform the async timelock expire check
Expand Down
71 changes: 60 additions & 11 deletions internal/queue/handlers/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,16 @@ import (
"github.com/rs/zerolog/log"
)

// StatsHandler handles the processing of stats event from the message queue.
// This handler is responsible for processing non-critical events such as
// statistics calculations and other metadata-related tasks.
// It performs the following operations:
// 1. If the event corresponds to an active delegation, it transforms the staker's public key
// into corresponding BTC addresses for lookup purposes, and saves them to the database.
// 2. Executes the staking statistics calculation using the provided event data.
//
// If any step fails, it logs the error and returns a corresponding error response.
// which will be sent back to the message queue for later retry
func (h *QueueHandler) StatsHandler(ctx context.Context, messageBody string) *types.Error {
var statsEvent queueClient.StatsEvent
err := json.Unmarshal([]byte(messageBody), &statsEvent)
Expand All @@ -23,18 +33,57 @@ func (h *QueueHandler) StatsHandler(ctx context.Context, messageBody string) *ty
log.Ctx(ctx).Error().Err(err).Msg("Failed to convert statsEvent.State to DelegationState")
return types.NewError(http.StatusBadRequest, types.BadRequest, err)
}
// For backwards compatibility reason, we will check msg version to determine
// if we need to look up the overflow status from db
// in version 1, we added the new field IsOverflow to the event
// Below code will be removed after service being fully rollout
isOverflow := statsEvent.IsOverflow
if statsEvent.SchemaVersion < 1 {
// Look up the overflow status from the database
overflow, overflowErr := h.Services.GetDelegation(ctx, statsEvent.GetStakingTxHashHex())
if overflowErr != nil {
log.Ctx(ctx).Error().Err(overflowErr).Msg("Failed to get overflow status")
return overflowErr
}
isOverflow = overflow.IsOverflow
}

// Perform the address lookup conversion
addressLookupErr := h.performAddressLookupConversion(ctx, statsEvent.StakerPkHex, state)
if addressLookupErr != nil {
return addressLookupErr
}

// Perform the stats calculation only if the event is not an overflow event
if !isOverflow {
// Perform the stats calculation
statsErr := h.Services.ProcessStakingStatsCalculation(
ctx, statsEvent.StakingTxHashHex,
statsEvent.StakerPkHex,
statsEvent.FinalityProviderPkHex,
state,
statsEvent.StakingValue,
)
if statsErr != nil {
log.Ctx(ctx).Error().Err(statsErr).Msg("Failed to process staking stats calculation")
return statsErr
}
}
return nil
}

// Perform the stats calculation
statsErr := h.Services.ProcessStakingStatsCalculation(
ctx, statsEvent.StakingTxHashHex,
statsEvent.StakerPkHex,
statsEvent.FinalityProviderPkHex,
state,
statsEvent.StakingValue,
)
if statsErr != nil {
log.Error().Err(statsErr).Msg("Failed to process staking stats calculation")
return statsErr
// Convert the staker's public key into corresponding BTC addresses for
// database lookup. This is performed only for active delegation events to
// prevent duplicated database writes.
func (h *QueueHandler) performAddressLookupConversion(ctx context.Context, stakerPkHex string, state types.DelegationState) *types.Error {
// Perform the address lookup conversion only for active delegation events
// to prevent duplicated database writes
if state == types.Active {
addErr := h.Services.ProcessAndSaveBtcAddresses(ctx, stakerPkHex)
if addErr != nil {
log.Ctx(ctx).Error().Err(addErr).Msg("Failed to process and save btc addresses")
return addErr
}
}
return nil
}
32 changes: 15 additions & 17 deletions internal/queue/handlers/unbonding.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,23 +43,21 @@ func (h *QueueHandler) UnbondingStakingHandler(ctx context.Context, messageBody
return expireCheckErr
}

// We only emit the stats event for the staking tx that is not an overflow event
if !del.IsOverflow {
// Perform the async stats calculation by emit the stats event
// NOTE: We no longer perform the stats calculation for timelock expired event
// This is based on the assumption that phase 1 launch date + min timelock will be over the lauch of phase 2 date
statsError := h.EmitStatsEvent(ctx, queueClient.NewStatsEvent(
del.StakingTxHashHex,
del.StakerPkHex,
del.FinalityProviderPkHex,
del.StakingValue,
types.Unbonded.ToString(),
))
if statsError != nil {
log.Ctx(ctx).Error().Err(statsError).Str("stakingTxHashHex", del.StakingTxHashHex).
Msg("Failed to emit stats event for unbonding staking")
return statsError
}
// Perform the async stats calculation by emit the stats event
// NOTE: We no longer perform the stats calculation for timelock expired event
// This is based on the assumption that phase 1 launch date + min timelock will be over the lauch of phase 2 date
statsError := h.EmitStatsEvent(ctx, queueClient.NewStatsEvent(
del.StakingTxHashHex,
del.StakerPkHex,
del.FinalityProviderPkHex,
del.StakingValue,
types.Unbonded.ToString(),
del.IsOverflow,
))
if statsError != nil {
log.Ctx(ctx).Error().Err(statsError).Str("stakingTxHashHex", del.StakingTxHashHex).
Msg("Failed to emit stats event for unbonding staking")
return statsError
}

// Save the unbonding staking delegation. This is the final step in the unbonding staking event processing
Expand Down
4 changes: 2 additions & 2 deletions internal/services/delegation.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (s *Services) SaveActiveStakingDelegation(
value, startHeight uint64, stakingTimestamp int64, timeLock, stakingOutputIndex uint64,
stakingTxHex string, isOverflow bool,
) *types.Error {
taprootAddress, err := utils.GetTaprootAddressFromPk(stakerPkHex, s.cfg.Server.BTCNetParam)
addresses, err := utils.DeriveAddressesFromNoCoordPk(stakerPkHex, s.cfg.Server.BTCNetParam)
if err != nil {
log.Ctx(ctx).Error().Err(err).Msg("Failed to get taproot address from staker pk")
return types.NewErrorWithMsg(
Expand All @@ -92,7 +92,7 @@ func (s *Services) SaveActiveStakingDelegation(
}
err = s.DbClient.SaveActiveStakingDelegation(
ctx, txHashHex, stakerPkHex, finalityProviderPkHex, stakingTxHex,
value, startHeight, timeLock, stakingOutputIndex, stakingTimestamp, isOverflow, taprootAddress,
value, startHeight, timeLock, stakingOutputIndex, stakingTimestamp, isOverflow, addresses.Taproot,
)
if err != nil {
if ok := db.IsDuplicateKeyError(err); ok {
Expand Down
35 changes: 35 additions & 0 deletions internal/services/staker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package services

import (
"context"
"net/http"

"github.com/babylonlabs-io/staking-api-service/internal/types"
"github.com/babylonlabs-io/staking-api-service/internal/utils"
"github.com/rs/zerolog/log"
)

// Given the staker public key, transform into multiple btc addresses and save them in the db.
func (s *Services) ProcessAndSaveBtcAddresses(
ctx context.Context, stakerPkHex string,
) *types.Error {
// Prepare the btc addresses
addresses, err := utils.DeriveAddressesFromNoCoordPk(stakerPkHex, s.cfg.Server.BTCNetParam)
if err != nil {
log.Ctx(ctx).Error().Err(err).Msg("Failed to derive addresses from staker pk")
return types.NewErrorWithMsg(
http.StatusBadRequest, types.BadRequest, "failed to derive addresses from staker pk",
)
}

// Try to save the btc addresses, ignore if they already exist
err = s.DbClient.InsertPkAddressMappings(
ctx, stakerPkHex, addresses.Taproot,
addresses.NativeSegwitOdd, addresses.NativeSegwitEven,
)
if err != nil {
log.Ctx(ctx).Error().Err(err).Msg("Failed to save btc addresses")
return types.NewInternalServiceError(err)
}
return nil
}
Loading
Loading