Skip to content

Commit 7f6d7f3

Browse files
jayantkJayant Krishnamurthy
andauthored
Flush batches early (#167)
* add code to flush batches early * fix build * fix build * fix build * comments * fix test? * better approach * fix * fix build * fix * pr comments Co-authored-by: Jayant Krishnamurthy <jkrishnamurthy@jumptrading.com>
1 parent ac12a50 commit 7f6d7f3

File tree

3 files changed

+26
-10
lines changed

3 files changed

+26
-10
lines changed

pc/manager.cpp

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -500,11 +500,16 @@ void manager::poll( bool do_wait )
500500
tconn_.reconnect();
501501
}
502502

503-
// submit new quotes while connected
504503
if ( has_status( PC_PYTH_RPC_CONNECTED ) &&
505504
!hconn_.get_is_err() &&
506505
( !wconn_ || !wconn_->get_is_err() ) ) {
506+
// request product quotes from pythd's clients while connected
507507
poll_schedule();
508+
509+
// Flush any pending complete batches of price updates by submitting solana TXs.
510+
for ( user *uptr = olist_.first(); uptr; uptr = uptr->get_next() ) {
511+
uptr->send_pending_upds();
512+
}
508513
} else {
509514
reconnect_rpc();
510515
}
@@ -758,11 +763,6 @@ void manager::on_response( rpc::get_slot *res )
758763
if (
759764
has_status( PC_PYTH_RPC_CONNECTED )
760765
) {
761-
// New slot received, so flush all pending updates for all active users
762-
for( user *uptr = olist_.first(); uptr; uptr = uptr->get_next() ) {
763-
uptr->send_pending_upds();
764-
}
765-
766766
if ( sub_ ) {
767767
sub_->on_slot_publish( this );
768768
}

pc/user.cpp

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
#define PC_JSON_MISSING_PERMS -32001
1414
#define PC_JSON_NOT_READY -32002
1515
#define PC_BATCH_SEND_FAILED -32010
16+
// Flush partial batches if not completed within 400 ms.
17+
#define PC_FLUSH_INTERVAL (400L*PC_NSECS_IN_MSEC)
1618

1719
using namespace pc;
1820

@@ -34,6 +36,7 @@ user::user()
3436
hsvr_.set_net_connect( this );
3537
hsvr_.set_ws_parser( this );
3638
set_net_parser( &hsvr_ );
39+
last_upd_ts_ = get_now();
3740
}
3841

3942
void user::set_rpc_client( rpc_client *rptr )
@@ -332,15 +335,24 @@ void user::parse_get_product( uint32_t tok, uint32_t itok )
332335

333336
void user::send_pending_upds()
334337
{
335-
if ( pending_vec_.empty() ) {
338+
uint32_t n_to_send = 0;
339+
int64_t curr_ts = get_now();
340+
if (curr_ts - last_upd_ts_ > PC_FLUSH_INTERVAL) {
341+
n_to_send = pending_vec_.size();
342+
} else if (pending_vec_.size() >= sptr_->get_max_batch_size()) {
343+
n_to_send = sptr_->get_max_batch_size();
344+
}
345+
346+
if (n_to_send == 0) {
336347
return;
337348
}
338349

339-
if ( !price::send( pending_vec_.data(), pending_vec_.size()) ) {
350+
if ( !price::send( pending_vec_.data(), n_to_send) ) {
340351
add_error( 0, PC_BATCH_SEND_FAILED, "batch send failed - please check the pyth logs" );
341352
}
342353

343-
pending_vec_.clear();
354+
pending_vec_.erase(pending_vec_.begin(), pending_vec_.begin() + n_to_send);
355+
last_upd_ts_ = curr_ts;
344356
}
345357

346358
void user::parse_get_all_products( uint32_t itok )

pc/user.hpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,10 @@ namespace pc
4343
// symbol price schedule callback
4444
void on_response( price_sched *, uint64_t ) override;
4545

46-
// send all pending updates
46+
// send a batch of pending price updates. This function eagerly sends any complete batches.
47+
// It also sends partial batches that have not been completed within a short interval of time.
48+
// At most one complete batch will be sent. Additional price updates remain queued until the next
49+
// time this function is invoked.
4750
void send_pending_upds();
4851

4952
private:
@@ -85,6 +88,7 @@ namespace pc
8588
def_vec_t dvec_; // deferred subscriptions
8689
request_sub_set psub_; // price subscriptions
8790
pending_vec_t pending_vec_; // prices with pending updates
91+
int64_t last_upd_ts_; // timestamp of last price update transaction
8892
};
8993

9094
}

0 commit comments

Comments
 (0)