Skip to content

Commit 92bd39e

Browse files
authored
Merge pull request #6106 from kantai/feat/miners-versioning
Feat: persist stackerdb versioning for signers and miner
2 parents 7a23180 + 413a917 commit 92bd39e

File tree

18 files changed

+695
-83
lines changed

18 files changed

+695
-83
lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,11 @@ All notable changes to this project will be documented in this file.
55
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
66
and this project adheres to the versioning scheme outlined in the [README.md](README.md).
77

8+
## [Unreleased]
9+
10+
### Added
11+
- Persisted tracking of StackerDB slot versions for mining. This improves miner p2p performance.
12+
813
## [3.1.0.0.9]
914

1015
### Added

stacks-signer/CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,11 @@ All notable changes to this project will be documented in this file.
55
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
66
and this project adheres to the versioning scheme outlined in the [README.md](README.md).
77

8+
## [Unreleased]
9+
10+
### Added
11+
- Persisted tracking of StackerDB slot versions. This improves signer p2p performance.
12+
813
## [3.1.0.0.9.0]
914

1015
### Changed

stacks-signer/src/client/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,9 @@ pub enum ClientError {
4949
/// Failed to sign stacker-db chunk
5050
#[error("Failed to sign stacker-db chunk: {0}")]
5151
FailToSign(#[from] StackerDBError),
52+
/// A SignerDB database operation failed
53+
#[error("SignerDB database error: {0}")]
54+
SignerDBError(#[from] blockstack_lib::util_lib::db::Error),
5255
/// Stacker-db instance rejected the chunk
5356
#[error("Stacker-db rejected the chunk. Reason: {0}")]
5457
PutChunkRejected(String),

stacks-signer/src/client/stackerdb.rs

Lines changed: 31 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
//
1717
use blockstack_lib::net::api::poststackerdbchunk::StackerDBErrorCodes;
1818
use clarity::codec::read_next;
19+
use clarity::types::chainstate::StacksPublicKey;
1920
use hashbrown::HashMap;
2021
use libsigner::{MessageSlotID, SignerMessage, SignerSession, StackerDBSession};
2122
use libstackerdb::{StackerDBChunkAckData, StackerDBChunkData};
@@ -25,6 +26,7 @@ use stacks_common::{debug, info, warn};
2526

2627
use crate::client::{retry_with_exponential_backoff, ClientError};
2728
use crate::config::{SignerConfig, SignerConfigMode};
29+
use crate::signerdb::SignerDb;
2830

2931
/// The signer StackerDB slot ID, purposefully wrapped to prevent conflation with SignerID
3032
#[derive(Debug, Clone, PartialEq, Eq, Hash, Copy, PartialOrd, Ord)]
@@ -50,13 +52,13 @@ pub struct StackerDB<M: MessageSlotID + std::cmp::Eq> {
5052
signers_message_stackerdb_sessions: HashMap<M, StackerDBSession>,
5153
/// The private key used in all stacks node communications
5254
stacks_private_key: StacksPrivateKey,
53-
/// A map of a message ID to last chunk version for each session
54-
slot_versions: HashMap<M, HashMap<SignerSlotID, u32>>,
5555
/// The running mode of the stackerdb (whether the signer is running in dry-run or
5656
/// normal operation)
5757
mode: StackerDBMode,
5858
/// The reward cycle of the connecting signer
5959
reward_cycle: u64,
60+
/// Connection to the signer database, used to persist the latest StackerDB chunk version per slot
61+
signer_db: SignerDb,
6062
}
6163

6264
impl<M: MessageSlotID + 'static> From<&SignerConfig> for StackerDB<M> {
@@ -69,12 +71,14 @@ impl<M: MessageSlotID + 'static> From<&SignerConfig> for StackerDB<M> {
6971
signer_slot_id: *signer_slot_id,
7072
},
7173
};
74+
let signer_db = SignerDb::new(&config.db_path).expect("Failed to connect to SignerDb");
7275

7376
Self::new(
7477
&config.node_host,
7578
config.stacks_private_key,
7679
config.mainnet,
7780
config.reward_cycle,
81+
signer_db,
7882
mode,
7983
)
8084
}
@@ -89,12 +93,14 @@ impl<M: MessageSlotID + 'static> StackerDB<M> {
8993
is_mainnet: bool,
9094
reward_cycle: u64,
9195
signer_slot_id: SignerSlotID,
96+
signer_db: SignerDb,
9297
) -> Self {
9398
Self::new(
9499
host,
95100
stacks_private_key,
96101
is_mainnet,
97102
reward_cycle,
103+
signer_db,
98104
StackerDBMode::Normal { signer_slot_id },
99105
)
100106
}
@@ -105,6 +111,7 @@ impl<M: MessageSlotID + 'static> StackerDB<M> {
105111
stacks_private_key: StacksPrivateKey,
106112
is_mainnet: bool,
107113
reward_cycle: u64,
114+
signer_db: SignerDb,
108115
signer_mode: StackerDBMode,
109116
) -> Self {
110117
let mut signers_message_stackerdb_sessions = HashMap::new();
@@ -117,9 +124,9 @@ impl<M: MessageSlotID + 'static> StackerDB<M> {
117124
Self {
118125
signers_message_stackerdb_sessions,
119126
stacks_private_key,
120-
slot_versions: HashMap::new(),
121127
mode: signer_mode,
122128
reward_cycle,
129+
signer_db,
123130
}
124131
}
125132

@@ -160,28 +167,21 @@ impl<M: MessageSlotID + 'static> StackerDB<M> {
160167
code: None,
161168
});
162169
};
170+
let signer_pk = StacksPublicKey::from_private(&self.stacks_private_key);
163171
loop {
164-
let mut slot_version = if let Some(versions) = self.slot_versions.get_mut(msg_id) {
165-
if let Some(version) = versions.get(slot_id) {
166-
*version
167-
} else {
168-
versions.insert(*slot_id, 0);
169-
1
170-
}
171-
} else {
172-
let mut versions = HashMap::new();
173-
versions.insert(*slot_id, 0);
174-
self.slot_versions.insert(*msg_id, versions);
175-
1
176-
};
177-
178-
let mut chunk = StackerDBChunkData::new(slot_id.0, slot_version, message_bytes.clone());
179-
chunk.sign(&self.stacks_private_key)?;
172+
let slot_version = self
173+
.signer_db
174+
.get_latest_chunk_version(&signer_pk, slot_id.0)?
175+
.map(|x| x.saturating_add(1))
176+
.unwrap_or(0);
180177

181178
let Some(session) = self.signers_message_stackerdb_sessions.get_mut(msg_id) else {
182179
panic!("FATAL: would loop forever trying to send a message with ID {msg_id:?}, for which we don't have a session");
183180
};
184181

182+
let mut chunk = StackerDBChunkData::new(slot_id.0, slot_version, message_bytes.clone());
183+
chunk.sign(&self.stacks_private_key)?;
184+
185185
debug!(
186186
"Sending a chunk to stackerdb slot ID {slot_id} with version {slot_version} and message ID {msg_id:?} to contract {:?}!\n{chunk:?}",
187187
&session.stackerdb_contract_id
@@ -190,15 +190,10 @@ impl<M: MessageSlotID + 'static> StackerDB<M> {
190190
let send_request = || session.put_chunk(&chunk).map_err(backoff::Error::transient);
191191
let chunk_ack: StackerDBChunkAckData = retry_with_exponential_backoff(send_request)?;
192192

193-
if let Some(versions) = self.slot_versions.get_mut(msg_id) {
194-
// NOTE: per the above, this is always executed
195-
versions.insert(*slot_id, slot_version.saturating_add(1));
196-
} else {
197-
return Err(ClientError::NotConnected);
198-
}
199-
200193
if chunk_ack.accepted {
201194
debug!("Chunk accepted by stackerdb: {chunk_ack:?}");
195+
self.signer_db
196+
.set_latest_chunk_version(&signer_pk, slot_id.0, slot_version)?;
202197
return Ok(chunk_ack);
203198
} else {
204199
warn!("Chunk rejected by stackerdb: {chunk_ack:?}");
@@ -208,15 +203,18 @@ impl<M: MessageSlotID + 'static> StackerDB<M> {
208203
Some(StackerDBErrorCodes::DataAlreadyExists) => {
209204
if let Some(slot_metadata) = chunk_ack.metadata {
210205
warn!("Failed to send message to stackerdb due to wrong version number. Attempted {}. Expected {}. Retrying...", slot_version, slot_metadata.slot_version);
211-
slot_version = slot_metadata.slot_version;
206+
self.signer_db.set_latest_chunk_version(
207+
&signer_pk,
208+
slot_id.0,
209+
slot_metadata.slot_version,
210+
)?;
212211
} else {
213212
warn!("Failed to send message to stackerdb due to wrong version number. Attempted {}. Expected unknown version number. Incrementing and retrying...", slot_version);
214-
}
215-
if let Some(versions) = self.slot_versions.get_mut(msg_id) {
216-
// NOTE: per the above, this is always executed
217-
versions.insert(*slot_id, slot_version.saturating_add(1));
218-
} else {
219-
return Err(ClientError::NotConnected);
213+
self.signer_db.set_latest_chunk_version(
214+
&signer_pk,
215+
slot_id.0,
216+
slot_version,
217+
)?;
220218
}
221219
}
222220
_ => {

stacks-signer/src/signerdb.rs

Lines changed: 60 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use blockstack_lib::util_lib::db::{
2727
query_row, query_rows, sqlite_open, table_exists, tx_begin_immediate, u64_to_sql,
2828
Error as DBError, FromRow,
2929
};
30-
use clarity::types::chainstate::{BurnchainHeaderHash, StacksAddress};
30+
use clarity::types::chainstate::{BurnchainHeaderHash, StacksAddress, StacksPublicKey};
3131
use clarity::types::Address;
3232
use libsigner::v0::messages::{RejectReason, RejectReasonPrefix, StateMachineUpdate};
3333
use libsigner::BlockProposal;
@@ -603,6 +603,14 @@ static ADD_PARENT_BURN_BLOCK_HASH_INDEX: &str = r#"
603603
CREATE INDEX IF NOT EXISTS burn_blocks_parent_burn_block_hash_idx on burn_blocks (parent_burn_block_hash);
604604
"#;
605605

606+
static CREATE_STACKERDB_TRACKING: &str = "
607+
CREATE TABLE stackerdb_tracking(
608+
public_key TEXT NOT NULL,
609+
slot_id INTEGER NOT NULL,
610+
slot_version INTEGER NOT NULL,
611+
PRIMARY KEY (public_key, slot_id)
612+
) STRICT;";
613+
606614
static SCHEMA_1: &[&str] = &[
607615
DROP_SCHEMA_0,
608616
CREATE_DB_CONFIG,
@@ -695,6 +703,11 @@ static SCHEMA_13: &[&str] = &[
695703
"INSERT INTO db_config (version) VALUES (13);",
696704
];
697705

706+
static SCHEMA_14: &[&str] = &[
707+
CREATE_STACKERDB_TRACKING,
708+
"INSERT INTO db_config (version) VALUES (14);",
709+
];
710+
698711
impl SignerDb {
699712
/// The current schema version used in this build of the signer binary.
700713
pub const SCHEMA_VERSION: u32 = 12;
@@ -909,6 +922,20 @@ impl SignerDb {
909922
Ok(())
910923
}
911924

925+
/// Migrate from schema 13 to schema 14
926+
fn schema_14_migration(tx: &Transaction) -> Result<(), DBError> {
927+
if Self::get_schema_version(tx)? >= 14 {
928+
// no migration necessary
929+
return Ok(());
930+
}
931+
932+
for statement in SCHEMA_14.iter() {
933+
tx.execute_batch(statement)?;
934+
}
935+
936+
Ok(())
937+
}
938+
912939
/// Register custom scalar functions used by the database
913940
fn register_scalar_functions(&self) -> Result<(), DBError> {
914941
// Register helper function for determining if a block is a tenure change transaction
@@ -955,7 +982,8 @@ impl SignerDb {
955982
10 => Self::schema_11_migration(&sql_tx)?,
956983
11 => Self::schema_12_migration(&sql_tx)?,
957984
12 => Self::schema_13_migration(&sql_tx)?,
958-
13 => break,
985+
13 => Self::schema_14_migration(&sql_tx)?,
986+
14 => break,
959987
x => return Err(DBError::Other(format!(
960988
"Database schema is newer than supported by this binary. Expected version = {}, Database version = {x}",
961989
Self::SCHEMA_VERSION,
@@ -975,6 +1003,36 @@ impl SignerDb {
9751003
)
9761004
}
9771005

1006+
/// Get the latest known chunk version from the db for the given slot_id/pk pair, or `None` if no version has been recorded
1007+
pub fn get_latest_chunk_version(
1008+
&self,
1009+
pk: &StacksPublicKey,
1010+
slot_id: u32,
1011+
) -> Result<Option<u32>, DBError> {
1012+
self.db
1013+
.query_row(
1014+
"SELECT slot_version FROM stackerdb_tracking WHERE public_key = ? AND slot_id = ?",
1015+
params![pk.to_hex(), slot_id],
1016+
|row| row.get(0),
1017+
)
1018+
.optional()
1019+
.map_err(DBError::from)
1020+
}
1021+
1022+
/// Set the latest known chunk version for the given slot_id/pk pair, replacing any previously stored version
1023+
pub fn set_latest_chunk_version(
1024+
&self,
1025+
pk: &StacksPublicKey,
1026+
slot_id: u32,
1027+
slot_version: u32,
1028+
) -> Result<(), DBError> {
1029+
self.db.execute(
1030+
"INSERT OR REPLACE INTO stackerdb_tracking (public_key, slot_id, slot_version) VALUES (?, ?, ?)",
1031+
params![pk.to_hex(), slot_id, slot_version],
1032+
)?;
1033+
Ok(())
1034+
}
1035+
9781036
/// Get the signer state for the provided reward cycle if it exists in the database
9791037
pub fn get_encrypted_signer_state(
9801038
&self,

stackslib/src/burnchains/bitcoin/blocks.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -285,6 +285,9 @@ impl BitcoinBlockParser {
285285
test_debug!("Data output does not use a standard OP_RETURN");
286286
return None;
287287
}
288+
if data.len() <= MAGIC_BYTES_LENGTH {
289+
return None;
290+
}
288291
if !data.starts_with(self.magic_bytes.as_bytes()) {
289292
test_debug!("Data output does not start with magic bytes");
290293
return None;

stackslib/src/net/api/tests/mod.rs

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
use std::net::SocketAddr;
1818
use std::sync::{Arc, Mutex};
19+
use std::thread;
20+
use std::time::{Duration, Instant};
1921

2022
use clarity::vm::costs::ExecutionCost;
2123
use clarity::vm::types::{QualifiedContractIdentifier, StacksAddressExtensions};
@@ -1060,16 +1062,20 @@ impl<'a> TestRPC<'a> {
10601062
}
10611063

10621064
pub fn run(self, requests: Vec<StacksHttpRequest>) -> Vec<StacksHttpResponse> {
1063-
self.run_with_observer(requests, None)
1065+
self.run_with_observer(requests, None, |_, _| true)
10641066
}
10651067

10661068
/// Run zero or more HTTP requests on this setup RPC test harness.
10671069
/// Return the list of responses.
1068-
pub fn run_with_observer(
1070+
pub fn run_with_observer<F>(
10691071
self,
10701072
requests: Vec<StacksHttpRequest>,
10711073
event_observer: Option<&dyn MemPoolEventDispatcher>,
1072-
) -> Vec<StacksHttpResponse> {
1074+
wait_for: F,
1075+
) -> Vec<StacksHttpResponse>
1076+
where
1077+
F: Fn(&mut TestPeer, &mut TestPeer) -> bool,
1078+
{
10731079
let mut peer_1 = self.peer_1;
10741080
let mut peer_2 = self.peer_2;
10751081
let peer_1_indexer = self.peer_1_indexer;
@@ -1079,7 +1085,14 @@ impl<'a> TestRPC<'a> {
10791085
let unconfirmed_state = self.unconfirmed_state;
10801086

10811087
let mut responses = vec![];
1082-
for request in requests.into_iter() {
1088+
for (ix, request) in requests.into_iter().enumerate() {
1089+
let start = Instant::now();
1090+
while !wait_for(&mut peer_1, &mut peer_2) {
1091+
if start.elapsed() > Duration::from_secs(120) {
1092+
panic!("Timed out waiting for wait_for check to pass");
1093+
}
1094+
thread::sleep(Duration::from_secs(1));
1095+
}
10831096
peer_1.refresh_burnchain_view();
10841097
peer_2.refresh_burnchain_view();
10851098

stackslib/src/net/api/tests/postblock_proposal.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ use crate::net::api::*;
4646
use crate::net::connection::ConnectionOptions;
4747
use crate::net::httpcore::{RPCRequestHandler, StacksHttp, StacksHttpRequest};
4848
use crate::net::relay::Relayer;
49-
use crate::net::test::TestEventObserver;
49+
use crate::net::test::{TestEventObserver, TestPeer};
5050
use crate::net::ProtocolFamily;
5151

5252
#[warn(unused)]
@@ -395,7 +395,11 @@ fn test_try_make_response() {
395395
let proposal_observer = Arc::clone(&observer.proposal_observer);
396396

397397
info!("Run requests with observer");
398-
let responses = rpc_test.run_with_observer(requests, Some(&observer));
398+
let wait_for = |peer_1: &mut TestPeer, peer_2: &mut TestPeer| {
399+
!peer_1.network.is_proposal_thread_running() && !peer_2.network.is_proposal_thread_running()
400+
};
401+
402+
let responses = rpc_test.run_with_observer(requests, Some(&observer), wait_for);
399403

400404
for response in responses.iter().take(3) {
401405
assert_eq!(response.preamble().status_code, 202);
@@ -567,8 +571,12 @@ fn replay_validation_test(
567571
let observer = ProposalTestObserver::new();
568572
let proposal_observer = Arc::clone(&observer.proposal_observer);
569573

574+
let wait_for = |peer_1: &mut TestPeer, peer_2: &mut TestPeer| {
575+
!peer_1.network.is_proposal_thread_running() && !peer_2.network.is_proposal_thread_running()
576+
};
577+
570578
info!("Run request with observer for replay mismatch test");
571-
let responses = rpc_test.run_with_observer(requests, Some(&observer));
579+
let responses = rpc_test.run_with_observer(requests, Some(&observer), wait_for);
572580

573581
// Expect 202 Accepted initially
574582
assert_eq!(responses[0].preamble().status_code, 202);

0 commit comments

Comments
 (0)