Skip to content

Commit 33da5be

Browse files
wijagels146karol
andauthored
Batch price updates across all websocket users (#175)
* Batch price updates across all websocket users * Fully move batching out of user, remove error notification * Restore old flushing behavior, refactor to another function * Apply naming and documentation suggestions Co-authored-by: Karol Skrzypczynski <146karol@gmail.com>
1 parent bad83f1 commit 33da5be

File tree

4 files changed

+58
-41
lines changed

4 files changed

+58
-41
lines changed

pc/manager.cpp

Lines changed: 40 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
#include "manager.hpp"
22
#include "log.hpp"
33

4+
#include <algorithm>
5+
46
using namespace pc;
57

68
#define PC_TPU_PROXY_PORT 8898
@@ -10,6 +12,8 @@ using namespace pc;
1012
#define PC_PUB_INTERVAL (227L*PC_NSECS_IN_MSEC)
1113
#define PC_RPC_HOST "localhost"
1214
#define PC_MAX_BATCH 8
15+
// Flush partial batches if not completed within 400 ms.
16+
#define PC_FLUSH_INTERVAL (400L*PC_NSECS_IN_MSEC)
1317

1418
///////////////////////////////////////////////////////////////////////////
1519
// manager_sub
@@ -437,6 +441,34 @@ void manager::reset_status( int status )
437441
status_ &= ~status;
438442
}
439443

444+
void manager::send_pending_ups()
445+
{
446+
uint32_t n_to_send = 0;
447+
448+
// the batch will be sent if its size is greater than max batch size
449+
// or time since the previously sent batch is greater than PC_FLUSH_INTERVAL
450+
// the buffer is being updated by user class un user::parse_upd_price
451+
int64_t curr_ts = get_now();
452+
if (curr_ts - last_upd_ts_> PC_FLUSH_INTERVAL) {
453+
n_to_send = pending_upds_.size();
454+
} else if (pending_upds_.size() >= get_max_batch_size()) {
455+
n_to_send = get_max_batch_size();
456+
}
457+
458+
if (n_to_send == 0) {
459+
return;
460+
}
461+
462+
// send batch of price updates to solana
463+
price::send( pending_upds_.data(), n_to_send);
464+
465+
// remove the sent elements from the vector
466+
pending_upds_.erase(pending_upds_.begin(), pending_upds_.begin() + n_to_send);
467+
468+
// record the current time
469+
last_upd_ts_= curr_ts;
470+
}
471+
440472
void manager::poll( bool do_wait )
441473
{
442474
// poll for any socket events
@@ -506,10 +538,7 @@ void manager::poll( bool do_wait )
506538
// request product quotes from pythd's clients while connected
507539
poll_schedule();
508540

509-
// Flush any pending complete batches of price updates by submitting solana TXs.
510-
for ( user *uptr = olist_.first(); uptr; uptr = uptr->get_next() ) {
511-
uptr->send_pending_upds();
512-
}
541+
send_pending_ups();
513542
} else {
514543
reconnect_rpc();
515544
}
@@ -927,6 +956,13 @@ price *manager::get_price( const pub_key& acc )
927956
return it ? dynamic_cast<price*>( amap_.obj( it ) ) : nullptr;
928957
}
929958

959+
void manager::add_dirty_price(price* sptr)
960+
{
961+
if( std::find(pending_upds_.begin(), pending_upds_.end(), sptr) == pending_upds_.end() ) {
962+
pending_upds_.emplace_back( sptr );
963+
}
964+
}
965+
930966
unsigned manager::get_num_product() const
931967
{
932968
return svec_.size();

pc/manager.hpp

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,9 @@ namespace pc
135135
product *get_product( const pub_key& );
136136
price *get_price( const pub_key& );
137137

138+
// adds dirty price to pending updates buffer
139+
void add_dirty_price(price* sptr);
140+
138141
// submit pyth client api request
139142
void submit( request * );
140143
void submit( net_wtr& );
@@ -221,6 +224,12 @@ namespace pc
221224
void poll_schedule();
222225
void reset_status( int );
223226

227+
// send a batch of pending price updates. This function eagerly sends any complete batches.
228+
// It also sends partial batches that have not been completed within a short interval of time.
229+
// At most one complete batch will be sent. Additional price updates remain queued until the next
230+
// time this function is invoked.
231+
void send_pending_ups();
232+
224233
net_loop nl_; // epoll loop
225234
tcp_connect hconn_; // rpc http connection
226235
ws_connect *wconn_; // rpc websocket sonnection
@@ -264,6 +273,12 @@ namespace pc
264273
rpc::get_recent_block_hash breq_[1]; // block hash request
265274
rpc::program_subscribe preq_[1]; // program account subscription
266275
rpc::get_program_accounts areq_[1]; // alternative to program_subscribe
276+
277+
// price updates that have not been sent yet
278+
std::vector<price*> pending_upds_;
279+
280+
// Timestamp of the last batch
281+
int64_t last_upd_ts_= 0;
267282
};
268283

269284
inline bool manager::get_is_tx_connect() const

pc/user.cpp

Lines changed: 3 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,6 @@
1313
#define PC_JSON_MISSING_PERMS -32001
1414
#define PC_JSON_NOT_READY -32002
1515
#define PC_BATCH_SEND_FAILED -32010
16-
// Flush partial batches if not completed within 400 ms.
17-
#define PC_FLUSH_INTERVAL (400L*PC_NSECS_IN_MSEC)
1816

1917
using namespace pc;
2018

@@ -36,7 +34,6 @@ user::user()
3634
hsvr_.set_net_connect( this );
3735
hsvr_.set_ws_parser( this );
3836
set_net_parser( &hsvr_ );
39-
last_upd_ts_ = get_now();
4037
}
4138

4239
void user::set_rpc_client( rpc_client *rptr )
@@ -211,9 +208,9 @@ void user::parse_upd_price( uint32_t tok, uint32_t itok )
211208

212209
// Add the updated price to the pending updates
213210
sptr->update_no_send( price, conf, stype, false );
214-
if( std::find(pending_vec_.begin(), pending_vec_.end(), sptr) == pending_vec_.end() ) {
215-
pending_vec_.emplace_back( sptr );
216-
}
211+
212+
// pass the updated price to manager
213+
sptr_->add_dirty_price(sptr);
217214

218215
// Send the result back
219216
add_header();
@@ -333,28 +330,6 @@ void user::parse_get_product( uint32_t tok, uint32_t itok )
333330
add_tail( itok );
334331
}
335332

336-
void user::send_pending_upds()
337-
{
338-
uint32_t n_to_send = 0;
339-
int64_t curr_ts = get_now();
340-
if (curr_ts - last_upd_ts_ > PC_FLUSH_INTERVAL) {
341-
n_to_send = pending_vec_.size();
342-
} else if (pending_vec_.size() >= sptr_->get_max_batch_size()) {
343-
n_to_send = sptr_->get_max_batch_size();
344-
}
345-
346-
if (n_to_send == 0) {
347-
return;
348-
}
349-
350-
if ( !price::send( pending_vec_.data(), n_to_send) ) {
351-
add_error( 0, PC_BATCH_SEND_FAILED, "batch send failed - please check the pyth logs" );
352-
}
353-
354-
pending_vec_.erase(pending_vec_.begin(), pending_vec_.begin() + n_to_send);
355-
last_upd_ts_ = curr_ts;
356-
}
357-
358333
void user::parse_get_all_products( uint32_t itok )
359334
{
360335
add_header();

pc/user.hpp

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -43,12 +43,6 @@ namespace pc
4343
// symbol price schedule callback
4444
void on_response( price_sched *, uint64_t ) override;
4545

46-
// send a batch of pending price updates. This function eagerly sends any complete batches.
47-
// It also sends partial batches that have not been completed within a short interval of time.
48-
// At most one complete batch will be sent. Additional price updates remain queued until the next
49-
// time this function is invoked.
50-
void send_pending_upds();
51-
5246
private:
5347

5448
// http-only request parsing
@@ -63,7 +57,6 @@ namespace pc
6357
};
6458

6559
typedef std::vector<deferred_sub> def_vec_t;
66-
typedef std::vector<price*> pending_vec_t;
6760

6861
void parse_request( uint32_t );
6962
void parse_get_product_list( uint32_t );
@@ -87,8 +80,6 @@ namespace pc
8780
json_wtr jw_; // json writer
8881
def_vec_t dvec_; // deferred subscriptions
8982
request_sub_set psub_; // price subscriptions
90-
pending_vec_t pending_vec_; // prices with pending updates
91-
int64_t last_upd_ts_; // timestamp of last price update transaction
9283
};
9384

9485
}

0 commit comments

Comments
 (0)