Skip to content

Commit 3d487b1

Browse files
authored
Merge pull request #5831 from stacks-network/feat/lossy_events_observer
Feat/lossy events observer
2 parents 2ea875a + e3a322c commit 3d487b1

File tree

7 files changed

+140
-17
lines changed

7 files changed

+140
-17
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ and this project adheres to the versioning scheme outlined in the [README.md](RE
1111

1212
- The `BlockProposal` StackerDB message serialization struct now includes a `server_version` string, which represents the version of the node that the miner is using. ([#5803](https://github.com/stacks-network/stacks-core/pull/5803))
1313
- Add `vrf_seed` to the `/v3/sortitions` rpc endpoint
14+
- Add `disable_retries` mode for events_observer disabling automatic retry on error
1415

1516
### Changed
1617

stackslib/src/config/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -915,6 +915,7 @@ impl Config {
915915
endpoint: observer.endpoint,
916916
events_keys,
917917
timeout_ms: observer.timeout_ms.unwrap_or(1_000),
918+
disable_retries: observer.disable_retries.unwrap_or(false),
918919
});
919920
}
920921
observers
@@ -928,6 +929,7 @@ impl Config {
928929
endpoint: val,
929930
events_keys: vec![EventKeyType::AnyEvent],
930931
timeout_ms: 1_000,
932+
disable_retries: false,
931933
});
932934
};
933935

@@ -2824,13 +2826,15 @@ pub struct EventObserverConfigFile {
28242826
pub endpoint: String,
28252827
pub events_keys: Vec<String>,
28262828
pub timeout_ms: Option<u64>,
2829+
pub disable_retries: Option<bool>,
28272830
}
28282831

28292832
#[derive(Clone, Default, Debug, Hash, PartialEq, Eq, PartialOrd)]
28302833
pub struct EventObserverConfig {
28312834
pub endpoint: String,
28322835
pub events_keys: Vec<EventKeyType>,
28332836
pub timeout_ms: u64,
2837+
pub disable_retries: bool,
28342838
}
28352839

28362840
#[derive(Clone, Debug, Hash, PartialEq, Eq, PartialOrd)]

testnet/stacks-node/src/event_dispatcher.rs

Lines changed: 126 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,9 @@ pub struct EventObserver {
9090
pub endpoint: String,
9191
/// Timeout for sending events to this observer
9292
pub timeout: Duration,
93+
/// If true, the stacks-node will not retry if event delivery fails for any reason.
94+
/// WARNING: This should not be set on observers that require successful delivery of all events.
95+
pub disable_retries: bool,
9396
}
9497

9598
struct ReceiptPayloadInfo<'a> {
@@ -439,7 +442,7 @@ impl EventObserver {
439442

440443
for (id, url, payload, timeout_ms) in pending_payloads {
441444
let timeout = Duration::from_millis(timeout_ms);
442-
Self::send_payload_directly(&payload, &url, timeout);
445+
Self::send_payload_directly(&payload, &url, timeout, false);
443446

444447
#[cfg(test)]
445448
if TEST_EVENT_OBSERVER_SKIP_RETRY.get() {
@@ -456,7 +459,12 @@ impl EventObserver {
456459
}
457460
}
458461

459-
fn send_payload_directly(payload: &serde_json::Value, full_url: &str, timeout: Duration) {
462+
fn send_payload_directly(
463+
payload: &serde_json::Value,
464+
full_url: &str,
465+
timeout: Duration,
466+
disable_retries: bool,
467+
) {
460468
debug!(
461469
"Event dispatcher: Sending payload"; "url" => %full_url, "payload" => ?payload
462470
);
@@ -506,6 +514,11 @@ impl EventObserver {
506514
}
507515
}
508516

517+
if disable_retries {
518+
warn!("Observer is configured in disable_retries mode: skipping retry of payload");
519+
return;
520+
}
521+
509522
#[cfg(test)]
510523
if TEST_EVENT_OBSERVER_SKIP_RETRY.get() {
511524
warn!("Fault injection: skipping retry of payload");
@@ -522,7 +535,12 @@ impl EventObserver {
522535
}
523536
}
524537

525-
fn new(working_dir: Option<PathBuf>, endpoint: String, timeout: Duration) -> Self {
538+
fn new(
539+
working_dir: Option<PathBuf>,
540+
endpoint: String,
541+
timeout: Duration,
542+
disable_retries: bool,
543+
) -> Self {
526544
let db_path = if let Some(mut db_path) = working_dir {
527545
db_path.push("event_observers.sqlite");
528546

@@ -541,6 +559,7 @@ impl EventObserver {
541559
db_path,
542560
endpoint,
543561
timeout,
562+
disable_retries,
544563
}
545564
}
546565

@@ -555,7 +574,10 @@ impl EventObserver {
555574
};
556575
let full_url = format!("http://{url_str}");
557576

558-
if let Some(db_path) = &self.db_path {
577+
// if the observer is in "disable_retries" mode quickly send the payload without checking for the db
578+
if self.disable_retries {
579+
Self::send_payload_directly(payload, &full_url, self.timeout, true);
580+
} else if let Some(db_path) = &self.db_path {
559581
let conn =
560582
Connection::open(db_path).expect("Failed to open database for event observer");
561583

@@ -566,7 +588,7 @@ impl EventObserver {
566588
Self::process_pending_payloads(&conn);
567589
} else {
568590
// No database, just send the payload
569-
Self::send_payload_directly(payload, &full_url, self.timeout);
591+
Self::send_payload_directly(payload, &full_url, self.timeout, false);
570592
}
571593
}
572594

@@ -1666,8 +1688,13 @@ impl EventDispatcher {
16661688
Some(working_dir),
16671689
conf.endpoint.clone(),
16681690
Duration::from_millis(conf.timeout_ms),
1691+
conf.disable_retries,
16691692
);
16701693

1694+
if conf.disable_retries {
1695+
warn!("Observer {} is configured in \"disable_retries\" mode: events are not guaranteed to be delivered", conf.endpoint);
1696+
}
1697+
16711698
let observer_index = self.registered_observers.len() as u16;
16721699

16731700
for event_key_type in conf.events_keys.iter() {
@@ -1770,7 +1797,8 @@ mod test {
17701797

17711798
#[test]
17721799
fn build_block_processed_event() {
1773-
let observer = EventObserver::new(None, "nowhere".to_string(), Duration::from_secs(3));
1800+
let observer =
1801+
EventObserver::new(None, "nowhere".to_string(), Duration::from_secs(3), false);
17741802

17751803
let filtered_events = vec![];
17761804
let block = StacksBlock::genesis_block();
@@ -1830,7 +1858,8 @@ mod test {
18301858

18311859
#[test]
18321860
fn test_block_processed_event_nakamoto() {
1833-
let observer = EventObserver::new(None, "nowhere".to_string(), Duration::from_secs(3));
1861+
let observer =
1862+
EventObserver::new(None, "nowhere".to_string(), Duration::from_secs(3), false);
18341863

18351864
let filtered_events = vec![];
18361865
let mut block_header = NakamotoBlockHeader::empty();
@@ -2087,7 +2116,8 @@ mod test {
20872116
let endpoint = "http://example.com".to_string();
20882117
let timeout = Duration::from_secs(5);
20892118

2090-
let observer = EventObserver::new(Some(working_dir.clone()), endpoint.clone(), timeout);
2119+
let observer =
2120+
EventObserver::new(Some(working_dir.clone()), endpoint.clone(), timeout, false);
20912121

20922122
// Verify fields
20932123
assert_eq!(observer.endpoint, endpoint);
@@ -2104,7 +2134,7 @@ mod test {
21042134
let endpoint = "http://example.com".to_string();
21052135
let timeout = Duration::from_secs(5);
21062136

2107-
let observer = EventObserver::new(None, endpoint.clone(), timeout);
2137+
let observer = EventObserver::new(None, endpoint.clone(), timeout, false);
21082138

21092139
// Verify fields
21102140
assert_eq!(observer.endpoint, endpoint);
@@ -2133,7 +2163,7 @@ mod test {
21332163
let endpoint = server.url().strip_prefix("http://").unwrap().to_string();
21342164
let timeout = Duration::from_secs(5);
21352165

2136-
let observer = EventObserver::new(Some(working_dir), endpoint, timeout);
2166+
let observer = EventObserver::new(Some(working_dir), endpoint, timeout, false);
21372167

21382168
TEST_EVENT_OBSERVER_SKIP_RETRY.set(false);
21392169

@@ -2170,7 +2200,7 @@ mod test {
21702200

21712201
let endpoint = server.url().strip_prefix("http://").unwrap().to_string();
21722202

2173-
let observer = EventObserver::new(None, endpoint, timeout);
2203+
let observer = EventObserver::new(None, endpoint, timeout, false);
21742204

21752205
// Call send_payload
21762206
observer.send_payload(&payload, "/test");
@@ -2201,8 +2231,12 @@ mod test {
22012231
tx.send(()).unwrap();
22022232
});
22032233

2204-
let observer =
2205-
EventObserver::new(None, format!("127.0.0.1:{port}"), Duration::from_secs(3));
2234+
let observer = EventObserver::new(
2235+
None,
2236+
format!("127.0.0.1:{port}"),
2237+
Duration::from_secs(3),
2238+
false,
2239+
);
22062240

22072241
let payload = json!({"key": "value"});
22082242

@@ -2250,8 +2284,12 @@ mod test {
22502284
}
22512285
});
22522286

2253-
let observer =
2254-
EventObserver::new(None, format!("127.0.0.1:{port}"), Duration::from_secs(3));
2287+
let observer = EventObserver::new(
2288+
None,
2289+
format!("127.0.0.1:{port}"),
2290+
Duration::from_secs(3),
2291+
false,
2292+
);
22552293

22562294
let payload = json!({"key": "value"});
22572295

@@ -2298,7 +2336,7 @@ mod test {
22982336
}
22992337
});
23002338

2301-
let observer = EventObserver::new(None, format!("127.0.0.1:{port}"), timeout);
2339+
let observer = EventObserver::new(None, format!("127.0.0.1:{port}"), timeout, false);
23022340

23032341
let payload = json!({"key": "value"});
23042342

@@ -2391,7 +2429,12 @@ mod test {
23912429
}
23922430
});
23932431

2394-
let observer = EventObserver::new(Some(working_dir), format!("127.0.0.1:{port}"), timeout);
2432+
let observer = EventObserver::new(
2433+
Some(working_dir),
2434+
format!("127.0.0.1:{port}"),
2435+
timeout,
2436+
false,
2437+
);
23952438

23962439
let payload = json!({"key": "value"});
23972440
let payload2 = json!({"key": "value2"});
@@ -2417,4 +2460,70 @@ mod test {
24172460
rx.recv_timeout(Duration::from_secs(5))
24182461
.expect("Server did not receive request in time");
24192462
}
2463+
2464+
#[test]
2465+
fn test_event_dispatcher_disable_retries() {
2466+
let timeout = Duration::from_secs(5);
2467+
let payload = json!({"key": "value"});
2468+
2469+
// Create a mock server returning error 500
2470+
let mut server = mockito::Server::new();
2471+
let _m = server.mock("POST", "/test").with_status(500).create();
2472+
2473+
let endpoint = server.url().strip_prefix("http://").unwrap().to_string();
2474+
2475+
let observer = EventObserver::new(None, endpoint, timeout, true);
2476+
2477+
// in non "disable_retries" mode this will run forever
2478+
observer.send_payload(&payload, "/test");
2479+
2480+
// Verify that the payload was sent
2481+
_m.assert();
2482+
}
2483+
2484+
#[test]
2485+
fn test_event_dispatcher_disable_retries_invalid_url() {
2486+
let timeout = Duration::from_secs(5);
2487+
let payload = json!({"key": "value"});
2488+
2489+
let endpoint = String::from("255.255.255.255");
2490+
2491+
let observer = EventObserver::new(None, endpoint, timeout, true);
2492+
2493+
// in non "disable_retries" mode this will run forever
2494+
observer.send_payload(&payload, "/test");
2495+
}
2496+
2497+
#[test]
2498+
#[ignore]
2499+
/// This test generates a new block and ensures the "disable_retries" events_observer will not block.
2500+
fn block_event_with_disable_retries_observer() {
2501+
let dir = tempdir().unwrap();
2502+
let working_dir = dir.path().to_path_buf();
2503+
2504+
let mut event_dispatcher = EventDispatcher::new();
2505+
let config = EventObserverConfig {
2506+
endpoint: String::from("255.255.255.255"),
2507+
events_keys: vec![EventKeyType::MinedBlocks],
2508+
timeout_ms: 1000,
2509+
disable_retries: true,
2510+
};
2511+
event_dispatcher.register_observer(&config, working_dir);
2512+
2513+
let nakamoto_block = NakamotoBlock {
2514+
header: NakamotoBlockHeader::empty(),
2515+
txs: vec![],
2516+
};
2517+
2518+
// this will block forever in non "disable_retries" mode
2519+
event_dispatcher.process_mined_nakamoto_block_event(
2520+
0,
2521+
&nakamoto_block,
2522+
0,
2523+
&ExecutionCost::max_value(),
2524+
vec![],
2525+
);
2526+
2527+
assert_eq!(event_dispatcher.registered_observers.len(), 1);
2528+
}
24202529
}

testnet/stacks-node/src/tests/epoch_22.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -644,6 +644,7 @@ fn pox_2_unlock_all() {
644644
endpoint: format!("localhost:{}", test_observer::EVENT_OBSERVER_PORT),
645645
events_keys: vec![EventKeyType::AnyEvent],
646646
timeout_ms: 1000,
647+
disable_retries: false,
647648
});
648649
conf.initial_balances.append(&mut initial_balances);
649650

testnet/stacks-node/src/tests/neon_integrations.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -657,6 +657,7 @@ pub mod test_observer {
657657
endpoint: format!("localhost:{EVENT_OBSERVER_PORT}"),
658658
events_keys: event_keys.to_vec(),
659659
timeout_ms: 1000,
660+
disable_retries: false,
660661
});
661662
}
662663

@@ -7490,6 +7491,7 @@ fn atlas_integration_test() {
74907491
endpoint: format!("localhost:{}", test_observer::EVENT_OBSERVER_PORT),
74917492
events_keys: vec![EventKeyType::AnyEvent],
74927493
timeout_ms: 1000,
7494+
disable_retries: false,
74937495
});
74947496

74957497
conf_follower_node.node.always_use_affirmation_maps = false;
@@ -8023,6 +8025,7 @@ fn antientropy_integration_test() {
80238025
endpoint: format!("localhost:{}", test_observer::EVENT_OBSERVER_PORT),
80248026
events_keys: vec![EventKeyType::AnyEvent],
80258027
timeout_ms: 1000,
8028+
disable_retries: false,
80268029
});
80278030

80288031
conf_follower_node.node.mine_microblocks = true;

testnet/stacks-node/src/tests/signer/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -736,6 +736,7 @@ fn setup_stx_btc_node<G: FnMut(&mut NeonConfig)>(
736736
EventKeyType::BurnchainBlocks,
737737
],
738738
timeout_ms: 1000,
739+
disable_retries: false,
739740
});
740741
}
741742

@@ -751,6 +752,7 @@ fn setup_stx_btc_node<G: FnMut(&mut NeonConfig)>(
751752
EventKeyType::BurnchainBlocks,
752753
],
753754
timeout_ms: 1000,
755+
disable_retries: false,
754756
});
755757

756758
// The signers need some initial balances in order to pay for epoch 2.5 transaction votes

testnet/stacks-node/src/tests/signer/v0.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1518,6 +1518,7 @@ fn regr_use_block_header_pk() {
15181518
endpoint,
15191519
db_path: None,
15201520
timeout: Duration::from_secs(120),
1521+
disable_retries: false,
15211522
})
15221523
.collect();
15231524

@@ -4589,6 +4590,7 @@ fn signer_set_rollover() {
45894590
EventKeyType::BurnchainBlocks,
45904591
],
45914592
timeout_ms: 1000,
4593+
disable_retries: false,
45924594
});
45934595
}
45944596
naka_conf.node.rpc_bind = rpc_bind.clone();
@@ -9115,6 +9117,7 @@ fn injected_signatures_are_ignored_across_boundaries() {
91159117
EventKeyType::BurnchainBlocks,
91169118
],
91179119
timeout_ms: 1000,
9120+
disable_retries: false,
91189121
});
91199122
naka_conf.node.rpc_bind = rpc_bind.clone();
91209123
},

0 commit comments

Comments
 (0)