1
1
#[ cfg( test) ]
2
2
use mock_instant:: {
3
- Instant ,
4
3
SystemTime ,
5
4
UNIX_EPOCH ,
6
5
} ;
7
6
#[ cfg( not( test) ) ]
8
7
use std:: time:: {
9
- Instant ,
10
8
SystemTime ,
11
9
UNIX_EPOCH ,
12
10
} ;
56
54
} ,
57
55
} ,
58
56
} ,
57
+ serde:: Serialize ,
59
58
std:: {
60
59
collections:: HashSet ,
61
60
time:: Duration ,
@@ -116,10 +115,13 @@ pub struct AggregateStateData {
116
115
pub latest_completed_slot : Option < Slot > ,
117
116
118
117
/// Time of the latest completed update. This is used for the health probes.
119
- pub latest_completed_update_at : Option < Instant > ,
118
+ pub latest_completed_update_time : Option < SystemTime > ,
120
119
121
120
/// The latest observed slot among different Aggregate updates. This is used for the health
122
- /// probes.
121
+ /// probes. The slot is not necessarily the maximum observed slot but it should be close
122
+ /// to the maximum. The maximum observed slot is not used because sometimes due to some
123
+ /// network issues we might receive an update with a much higher slot specially during
124
+ /// the forks.
123
125
pub latest_observed_slot : Option < Slot > ,
124
126
125
127
/// The duration of no aggregation after which the readiness of the state is considered stale.
@@ -140,7 +142,7 @@ impl AggregateStateData {
140
142
) -> Self {
141
143
Self {
142
144
latest_completed_slot : None ,
143
- latest_completed_update_at : None ,
145
+ latest_completed_update_time : None ,
144
146
latest_observed_slot : None ,
145
147
metrics : metrics:: Metrics :: new ( metrics_registry) ,
146
148
readiness_staleness_threshold,
@@ -213,6 +215,17 @@ pub struct PriceFeedsWithUpdateData {
213
215
pub update_data : Vec < Vec < u8 > > ,
214
216
}
215
217
218
+ #[ derive( Debug , Serialize ) ]
219
+ pub struct ReadinessMetadata {
220
+ pub has_completed_recently : bool ,
221
+ pub is_not_behind : bool ,
222
+ pub is_metadata_loaded : bool ,
223
+ pub latest_completed_slot : Option < Slot > ,
224
+ pub latest_observed_slot : Option < Slot > ,
225
+ pub latest_completed_unix_timestamp : Option < UnixTimestamp > ,
226
+ pub price_feeds_metadata_len : usize ,
227
+ }
228
+
216
229
#[ async_trait:: async_trait]
217
230
pub trait Aggregates
218
231
where
@@ -221,7 +234,7 @@ where
221
234
Self : PriceFeedMeta ,
222
235
{
223
236
fn subscribe ( & self ) -> Receiver < AggregationEvent > ;
224
- async fn is_ready ( & self ) -> bool ;
237
+ async fn is_ready ( & self ) -> ( bool , ReadinessMetadata ) ;
225
238
async fn store_update ( & self , update : Update ) -> Result < ( ) > ;
226
239
async fn get_price_feed_ids ( & self ) -> HashSet < PriceIdentifier > ;
227
240
async fn get_price_feeds_with_update_data (
@@ -304,10 +317,7 @@ where
304
317
// Update the aggregate state with the latest observed slot
305
318
{
306
319
let mut aggregate_state = self . into ( ) . data . write ( ) . await ;
307
- aggregate_state. latest_observed_slot = aggregate_state
308
- . latest_observed_slot
309
- . map ( |latest| latest. max ( slot) )
310
- . or ( Some ( slot) ) ;
320
+ aggregate_state. latest_observed_slot = Some ( slot) ;
311
321
}
312
322
313
323
let accumulator_messages = self . fetch_accumulator_messages ( slot) . await ?;
@@ -366,8 +376,8 @@ where
366
376
. or ( Some ( slot) ) ;
367
377
368
378
aggregate_state
369
- . latest_completed_update_at
370
- . replace ( Instant :: now ( ) ) ;
379
+ . latest_completed_update_time
380
+ . replace ( SystemTime :: now ( ) ) ;
371
381
372
382
aggregate_state
373
383
. metrics
@@ -401,15 +411,20 @@ where
401
411
. collect ( )
402
412
}
403
413
404
- async fn is_ready ( & self ) -> bool {
414
+ async fn is_ready ( & self ) -> ( bool , ReadinessMetadata ) {
405
415
let state_data = self . into ( ) . data . read ( ) . await ;
406
416
let price_feeds_metadata = PriceFeedMeta :: retrieve_price_feeds_metadata ( self )
407
417
. await
408
418
. unwrap ( ) ;
409
419
410
- let has_completed_recently = match state_data. latest_completed_update_at . as_ref ( ) {
420
+ let current_time = SystemTime :: now ( ) ;
421
+
422
+ let has_completed_recently = match state_data. latest_completed_update_time {
411
423
Some ( latest_completed_update_time) => {
412
- latest_completed_update_time. elapsed ( ) < state_data. readiness_staleness_threshold
424
+ current_time
425
+ . duration_since ( latest_completed_update_time)
426
+ . unwrap_or ( Duration :: from_secs ( 0 ) )
427
+ < state_data. readiness_staleness_threshold
413
428
}
414
429
None => false ,
415
430
} ;
@@ -419,14 +434,31 @@ where
419
434
state_data. latest_observed_slot ,
420
435
) {
421
436
( Some ( latest_completed_slot) , Some ( latest_observed_slot) ) => {
422
- latest_observed_slot - latest_completed_slot
437
+ latest_observed_slot. saturating_sub ( latest_completed_slot)
423
438
<= state_data. readiness_max_allowed_slot_lag
424
439
}
425
440
_ => false ,
426
441
} ;
427
442
428
443
let is_metadata_loaded = !price_feeds_metadata. is_empty ( ) ;
429
- has_completed_recently && is_not_behind && is_metadata_loaded
444
+ (
445
+ has_completed_recently && is_not_behind && is_metadata_loaded,
446
+ ReadinessMetadata {
447
+ has_completed_recently,
448
+ is_not_behind,
449
+ is_metadata_loaded,
450
+ latest_completed_slot : state_data. latest_completed_slot ,
451
+ latest_observed_slot : state_data. latest_observed_slot ,
452
+ latest_completed_unix_timestamp : state_data. latest_completed_update_time . and_then (
453
+ |t| {
454
+ t. duration_since ( UNIX_EPOCH )
455
+ . map ( |d| d. as_secs ( ) as i64 )
456
+ . ok ( )
457
+ } ,
458
+ ) ,
459
+ price_feeds_metadata_len : price_feeds_metadata. len ( ) ,
460
+ } ,
461
+ )
430
462
}
431
463
}
432
464
@@ -896,14 +928,14 @@ mod test {
896
928
. unwrap ( ) ;
897
929
898
930
// Check the state is ready
899
- assert ! ( state. is_ready( ) . await ) ;
931
+ assert ! ( state. is_ready( ) . await . 0 ) ;
900
932
901
933
// Advance the clock to make the prices stale
902
934
let staleness_threshold = Duration :: from_secs ( 30 ) ;
903
935
MockClock :: advance_system_time ( staleness_threshold) ;
904
936
MockClock :: advance ( staleness_threshold) ;
905
937
// Check the state is not ready
906
- assert ! ( !state. is_ready( ) . await ) ;
938
+ assert ! ( !state. is_ready( ) . await . 0 ) ;
907
939
}
908
940
909
941
/// Test that the state retains the latest slots upon cache eviction.
0 commit comments