@@ -73,6 +73,8 @@ pub struct KeeperMetrics {
73
73
pub total_gas_spent : Family < AccountLabel , Gauge < f64 , AtomicU64 > > ,
74
74
pub requests : Family < AccountLabel , Counter > ,
75
75
pub requests_processed : Family < AccountLabel , Counter > ,
76
+ pub requests_processed_success : Family < AccountLabel , Counter > ,
77
+ pub requests_processed_failure : Family < AccountLabel , Counter > ,
76
78
pub requests_reprocessed : Family < AccountLabel , Counter > ,
77
79
pub reveals : Family < AccountLabel , Counter > ,
78
80
pub request_duration_ms : Family < AccountLabel , Histogram > ,
@@ -89,6 +91,8 @@ impl Default for KeeperMetrics {
89
91
total_gas_spent : Family :: default ( ) ,
90
92
requests : Family :: default ( ) ,
91
93
requests_processed : Family :: default ( ) ,
94
+ requests_processed_success : Family :: default ( ) ,
95
+ requests_processed_failure : Family :: default ( ) ,
92
96
requests_reprocessed : Family :: default ( ) ,
93
97
reveals : Family :: default ( ) ,
94
98
request_duration_ms : Family :: new_with_constructor ( || {
@@ -133,6 +137,18 @@ impl KeeperMetrics {
133
137
keeper_metrics. requests_processed . clone ( ) ,
134
138
) ;
135
139
140
+ writable_registry. register (
141
+ "requests_processed_success" ,
142
+ "Number of requests processed successfully" ,
143
+ keeper_metrics. requests_processed_success . clone ( ) ,
144
+ ) ;
145
+
146
+ writable_registry. register (
147
+ "requests_processed_failure" ,
148
+ "Number of requests processed with failure" ,
149
+ keeper_metrics. requests_processed_failure . clone ( ) ,
150
+ ) ;
151
+
136
152
writable_registry. register (
137
153
"reveal" ,
138
154
"Number of reveals" ,
@@ -171,7 +187,7 @@ impl KeeperMetrics {
171
187
172
188
writable_registry. register (
173
189
"request_duration_ms" ,
174
- "Time taken to process each callback request in milliseconds" ,
190
+ "Time taken to process each successful callback request in milliseconds" ,
175
191
keeper_metrics. request_duration_ms . clone ( ) ,
176
192
) ;
177
193
@@ -382,14 +398,12 @@ pub async fn process_event_with_backoff(
382
398
metrics : Arc < KeeperMetrics > ,
383
399
) {
384
400
let start_time = std:: time:: Instant :: now ( ) ;
401
+ let account_label = AccountLabel {
402
+ chain_id : chain_state. id . clone ( ) ,
403
+ address : chain_state. provider_address . to_string ( ) ,
404
+ } ;
385
405
386
- metrics
387
- . requests
388
- . get_or_create ( & AccountLabel {
389
- chain_id : chain_state. id . clone ( ) ,
390
- address : chain_state. provider_address . to_string ( ) ,
391
- } )
392
- . inc ( ) ;
406
+ metrics. requests . get_or_create ( & account_label) . inc ( ) ;
393
407
tracing:: info!( "Started processing event" ) ;
394
408
let backoff = ExponentialBackoff {
395
409
max_elapsed_time : Some ( Duration :: from_secs ( 300 ) ) , // retry for 5 minutes
@@ -398,7 +412,7 @@ pub async fn process_event_with_backoff(
398
412
399
413
let current_multiplier = Arc :: new ( AtomicU64 :: new ( DEFAULT_GAS_ESTIMATE_MULTIPLIER_PCT ) ) ;
400
414
401
- match backoff:: future:: retry_notify (
415
+ let success = backoff:: future:: retry_notify (
402
416
backoff,
403
417
|| async {
404
418
let multiplier = current_multiplier. load ( std:: sync:: atomic:: Ordering :: Relaxed ) ;
@@ -426,32 +440,48 @@ pub async fn process_event_with_backoff(
426
440
) ;
427
441
} ,
428
442
)
429
- . await
430
- {
431
- Ok ( ( ) ) => {
432
- tracing:: info!( "Processed event" , ) ;
433
- }
434
- Err ( e) => {
435
- tracing:: error!( "Failed to process event: {:?}" , e) ;
436
- }
437
- }
443
+ . await ;
438
444
439
- let duration_ms = start_time. elapsed ( ) . as_millis ( ) as f64 ;
440
- metrics
441
- . request_duration_ms
442
- . get_or_create ( & AccountLabel {
443
- chain_id : chain_state. id . clone ( ) ,
444
- address : chain_state. provider_address . to_string ( ) ,
445
- } )
446
- . observe ( duration_ms) ;
445
+ let duration = start_time. elapsed ( ) ;
447
446
448
447
metrics
449
448
. requests_processed
450
- . get_or_create ( & AccountLabel {
451
- chain_id : chain_state. id . clone ( ) ,
452
- address : chain_state. provider_address . to_string ( ) ,
453
- } )
449
+ . get_or_create ( & account_label)
454
450
. inc ( ) ;
451
+
452
+ match success {
453
+ Ok ( ( ) ) => {
454
+ tracing:: info!( "Processed event successfully in {:?}" , duration) ;
455
+
456
+ metrics
457
+ . requests_processed_success
458
+ . get_or_create ( & account_label)
459
+ . inc ( ) ;
460
+
461
+ metrics
462
+ . request_duration_ms
463
+ . get_or_create ( & account_label)
464
+ . observe ( duration. as_millis ( ) as f64 ) ;
465
+ }
466
+ Err ( e) => {
467
+ // In case the callback did not succeed, we double-check that the request is still on-chain.
468
+ // If the request is no longer on-chain, one of the transactions we sent likely succeeded, but
469
+ // the RPC gave us an error anyway.
470
+ let req = chain_state
471
+ . contract
472
+ . get_request ( event. provider_address , event. sequence_number )
473
+ . await ;
474
+ tracing:: error!( "Failed to process event: {:?}. Request: {:?}" , e, req) ;
475
+
476
+ // We only count failures for cases where we are completely certain that the callback failed.
477
+ if req. is_ok_and ( |x| x. is_some ( ) ) {
478
+ metrics
479
+ . requests_processed_failure
480
+ . get_or_create ( & account_label)
481
+ . inc ( ) ;
482
+ }
483
+ }
484
+ }
455
485
}
456
486
457
487
const TX_CONFIRMATION_TIMEOUT_SECS : u64 = 30 ;
0 commit comments