Skip to content

Commit 48e664c

Browse files
jayantkJayant KrishnamurthyJayant Krishnamurthy
authored
Make batch size configurable (#159)
* env var * add * grr * store size in manager * fix bug * cleanup * pr comments * bug fix Co-authored-by: Jayant Krishnamurthy <jkrishnamurthy@jumptrading.com> Co-authored-by: Jayant Krishnamurthy <jayant@jumpcrypto.com>
1 parent dd35133 commit 48e664c

File tree

5 files changed

+27
-5
lines changed

5 files changed

+27
-5
lines changed

pc/manager.cpp

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ using namespace pc;
99
#define PC_BLOCKHASH_TIMEOUT 3
1010
#define PC_PUB_INTERVAL (227L*PC_NSECS_IN_MSEC)
1111
#define PC_RPC_HOST "localhost"
12+
#define PC_MAX_BATCH 8
1213

1314
///////////////////////////////////////////////////////////////////////////
1415
// manager_sub
@@ -70,6 +71,7 @@ manager::manager()
7071
do_tx_( true ),
7172
is_pub_( false ),
7273
cmt_( commitment::e_confirmed ),
74+
max_batch_( PC_MAX_BATCH ),
7375
sreq_{ { commitment::e_processed } }
7476
{
7577
tconn_.set_sub( this );
@@ -178,6 +180,16 @@ int64_t manager::get_publish_interval() const
178180
return pub_int_ / PC_NSECS_IN_MSEC;
179181
}
180182

183+
void manager::set_max_batch_size( unsigned batch_size )
184+
{
185+
max_batch_ = batch_size;
186+
}
187+
188+
unsigned manager::get_max_batch_size() const
189+
{
190+
return max_batch_;
191+
}
192+
181193
void manager::set_do_capture( bool do_cap )
182194
{
183195
do_cap_ = do_cap;
@@ -746,7 +758,6 @@ void manager::on_response( rpc::get_slot *res )
746758
if (
747759
has_status( PC_PYTH_RPC_CONNECTED )
748760
) {
749-
750761
// New slot received, so flush all pending updates for all active users
751762
for( user *uptr = olist_.first(); uptr; uptr = uptr->get_next() ) {
752763
uptr->send_pending_upds();

pc/manager.hpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,10 @@ namespace pc
100100
void set_publish_interval( int64_t mill_secs );
101101
int64_t get_publish_interval() const;
102102

103+
// override the default maximum number of price updates to send in a batch
104+
void set_max_batch_size( unsigned batch_size );
105+
unsigned get_max_batch_size() const;
106+
103107
// event subscription callback
104108
void set_manager_sub( manager_sub * );
105109
manager_sub *get_manager_sub() const;
@@ -253,6 +257,7 @@ namespace pc
253257
capture cap_; // aggregate price capture
254258
tx_parser txp_; // handle unexpected errors
255259
commitment cmt_; // account get/subscribe commitment
260+
unsigned max_batch_;// maximum number of price updates that can be sent in a single batch
256261

257262
// requests
258263
rpc::get_slot sreq_[1]; // slot subscription

pc/request.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -773,7 +773,7 @@ bool price::send( price *prices[], const unsigned n )
773773
upds_.emplace_back( p->preq_ );
774774

775775
if (
776-
upds_.size() >= rpc::upd_price::MAX_UPDATES
776+
upds_.size() >= mgr->get_max_batch_size()
777777
|| ( upds_.size() && ( i + 1 ) == n )
778778
) {
779779
if ( mgr->get_do_tx() ) {

pc/rpc_client.hpp

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -403,8 +403,6 @@ namespace pc
403403
class upd_price : public tx_request, public rpc_request
404404
{
405405
public:
406-
static constexpr unsigned MAX_UPDATES = 8;
407-
408406
// parameters
409407
void set_symbol_status( symbol_status );
410408
void set_publish( key_pair * );

pcapps/pythd.cpp

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,8 +101,9 @@ int main(int argc, char **argv)
101101
std::string tx_host = get_tx_host();
102102
int pyth_port = get_port();
103103
int opt = 0;
104+
unsigned max_batch_size = 0;
104105
bool do_wait = true, do_tx = true, do_ws = true, do_debug = false;
105-
while( (opt = ::getopt(argc,argv, "r:t:p:k:w:c:l:m:dnxhz" )) != -1 ) {
106+
while( (opt = ::getopt(argc,argv, "r:t:p:k:w:c:l:m:b:dnxhz" )) != -1 ) {
106107
switch(opt) {
107108
case 'r': rpc_host = optarg; break;
108109
case 't': tx_host = optarg; break;
@@ -112,6 +113,7 @@ int main(int argc, char **argv)
112113
case 'w': cnt_dir = optarg; break;
113114
case 'l': log_file = optarg; break;
114115
case 'm': cmt = str_to_commitment(optarg); break;
116+
case 'b': max_batch_size = strtoul(optarg, NULL, 0); break;
115117
case 'n': do_wait = false; break;
116118
case 'x': do_tx = false; break;
117119
case 'z': do_ws = false; break;
@@ -154,6 +156,12 @@ int main(int argc, char **argv)
154156
<< mgr.get_mapping_pub_key_file() << "]" << std::endl;
155157
return 1;
156158
}
159+
160+
if (max_batch_size > 0) {
161+
mgr.set_max_batch_size(max_batch_size);
162+
}
163+
std::cout << "pythd: max batch size " << mgr.get_max_batch_size() << std::endl;
164+
157165
// set up signal handing
158166
signal( SIGINT, sig_handle );
159167
signal( SIGHUP, sig_handle );

0 commit comments

Comments
 (0)