@@ -236,27 +236,33 @@ namespace iroha {
236
236
}),
237
237
sync_event_notifier.get_observable ()
238
238
.tap ([this ](const synchronizer::SynchronizationEvent &event) {
239
- last_received_round_ = event.round ;
239
+ if (not last_received_round_
240
+ or *last_received_round_ < event.round ) {
241
+ last_received_round_ = event.round ;
242
+ } else {
243
+ log_->debug (" Dropping {}, since {} is already processed" ,
244
+ event.round ,
245
+ *last_received_round_);
246
+ }
240
247
})
241
248
.lift <iroha::synchronizer::SynchronizationEvent>(
242
249
iroha::makeDelay<iroha::synchronizer::SynchronizationEvent>(
243
250
delay_func, rxcpp::identity_current_thread ()))
244
- .flat_map ([this ](const auto &event) mutable
245
- -> rxcpp::observable<
246
- ordering::OnDemandOrderingGate::RoundSwitch> {
251
+ .filter ([this ](const auto &event) {
247
252
assert (last_received_round_);
248
253
if (not last_received_round_) {
249
254
log_->error (" Cannot continue without last received round" );
250
- return rxcpp::observable<>::empty<
251
- ordering::OnDemandOrderingGate::RoundSwitch>();
255
+ return false ;
252
256
}
253
257
if (event.round < *last_received_round_) {
254
258
log_->debug (" Dropping {}, since {} is already processed" ,
255
259
event.round ,
256
260
*last_received_round_);
257
- return rxcpp::observable<>::empty<
258
- ordering::OnDemandOrderingGate::RoundSwitch>();
261
+ return false ;
259
262
}
263
+ return true ;
264
+ })
265
+ .map ([this ](const auto &event) {
260
266
consensus::Round current_round;
261
267
switch (event.sync_outcome ) {
262
268
case iroha::synchronizer::SynchronizationOutcomeType::kCommit :
@@ -282,9 +288,8 @@ namespace iroha {
282
288
log_->error (" unknown SynchronizationOutcomeType" );
283
289
assert (false );
284
290
}
285
- return rxcpp::observable<>::just (
286
- ordering::OnDemandOrderingGate::RoundSwitch{
287
- std::move (current_round), event.ledger_state });
291
+ return ordering::OnDemandOrderingGate::RoundSwitch{
292
+ std::move (current_round), event.ledger_state };
288
293
}),
289
294
std::move (cache),
290
295
std::move (proposal_factory),
0 commit comments