Skip to content

Commit 4f442c0

Browse files
committed
support batch price updates
1 parent c0c15e5 commit 4f442c0

File tree

4 files changed

+179
-24
lines changed

4 files changed

+179
-24
lines changed

pc/request.cpp

Lines changed: 97 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -655,9 +655,10 @@ bool price::update(
655655
if ( PC_UNLIKELY( !get_is_ready_publish() ) ) {
656656
return false;
657657
}
658+
preq_->set_price( price, conf, st, is_agg );
658659
manager *mgr = get_manager();
659660
const uint64_t slot = mgr->get_slot();
660-
preq_->set_price( price, conf, st, slot, is_agg );
661+
preq_->set_slot( slot );
661662
preq_->set_block_hash( mgr->get_recent_block_hash() );
662663
if ( mgr->get_do_tx() )
663664
mgr->submit( preq_ );
@@ -688,6 +689,101 @@ bool price::update(
688689
return true;
689690
}
690691

692+
void price::update_no_send(
693+
const int64_t price, const uint64_t conf
694+
, const symbol_status st, const bool is_agg
695+
)
696+
{
697+
preq_->set_price( price, conf, st, is_agg );
698+
}
699+
700+
bool price::send( price *prices[], const unsigned n )
701+
{
702+
static std::vector< rpc::upd_price * > upds_;
703+
704+
upds_.clear();
705+
706+
manager *mgr1 = nullptr;
707+
708+
for ( unsigned i = 0, j = 0; i < n; ++i ) {
709+
price *const p = prices[ i ];
710+
if ( PC_UNLIKELY( ! p->init_ && ! p->init_publish() ) ) {
711+
continue;
712+
}
713+
if ( PC_UNLIKELY( ! p->has_publisher() ) ) {
714+
continue;
715+
}
716+
if ( PC_UNLIKELY( ! p->get_is_ready_publish() ) ) {
717+
continue;
718+
}
719+
manager *const mgr = p->get_manager();
720+
if ( ! mgr1 ) {
721+
mgr1 = mgr;
722+
}
723+
else if ( mgr != mgr1 ) {
724+
PC_LOG_ERR( "unexpected manager" ).end();
725+
continue;
726+
}
727+
const uint64_t slot = mgr->get_slot();
728+
p->preq_->set_slot( slot );
729+
p->preq_->set_block_hash( mgr->get_recent_block_hash() );
730+
upds_.emplace_back( p->preq_ );
731+
732+
if (
733+
upds_.size() >= rpc::upd_price::MAX_UPDATES
734+
|| ( upds_.size() && ( i + 1 ) == n )
735+
) {
736+
if ( mgr->get_do_tx() ) {
737+
net_wtr msg;
738+
if ( rpc::upd_price::build( msg, &upds_[ 0 ], upds_.size() ) ) {
739+
mgr->submit( msg );
740+
}
741+
else {
742+
PC_LOG_ERR( "failed to build msg" );
743+
}
744+
}
745+
else {
746+
p->get_rpc_client()->send( &upds_[ 0 ], upds_.size() );
747+
for ( unsigned k = j; k <= i; ++k ) {
748+
price *const p1 = prices[ k ];
749+
p1->tvec_.emplace_back(
750+
std::string( 100, '\0' ), p1->preq_->get_sent_time()
751+
);
752+
p1->preq_->get_signature()->enc_base58( p1->tvec_.back().first );
753+
PC_LOG_DBG( "sent price update transaction" )
754+
.add( "price_account", *p1->get_account() )
755+
.add( "product_account", *p1->prod_->get_account() )
756+
.add( "symbol", p1->get_symbol() )
757+
.add( "price_type", price_type_to_str( p1->get_price_type() ) )
758+
.add( "sig", p1->tvec_.back().first )
759+
.add( "pub_slot", slot )
760+
.end();
761+
if ( PC_UNLIKELY( p1->tvec_.size() >= 100 ) ) {
762+
PC_LOG_WRN( "too many unacked price update transactions" )
763+
.add( "price_account", *p1->get_account() )
764+
.add( "product_account", *p1->prod_->get_account() )
765+
.add( "symbol", p1->get_symbol() )
766+
.add( "price_type", price_type_to_str( p1->get_price_type() ) )
767+
.add( "num_txid", p1->tvec_.size() )
768+
.end();
769+
p1->tvec_.erase( p1->tvec_.begin(), p1->tvec_.begin() + 50 );
770+
}
771+
}
772+
}
773+
774+
for ( unsigned k = j; k <= i; ++k ) {
775+
price *const p1 = prices[ k ];
776+
p1->inc_sent();
777+
}
778+
779+
j = i;
780+
upds_.clear();
781+
}
782+
}
783+
784+
return true;
785+
}
786+
691787
void price::submit()
692788
{
693789
if ( st_ == e_subscribe ) {

pc/request.hpp

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,11 @@ namespace pc
261261
// or because publisher does not have permission (has_publisher())
262262
bool update( int64_t price, uint64_t conf, symbol_status );
263263

264+
void update_no_send(
265+
int64_t price, uint64_t conf, symbol_status, bool aggr
266+
);
267+
static bool send( price * [], unsigned );
268+
264269
// update aggregate price only
265270
bool update();
266271

@@ -307,14 +312,6 @@ namespace pc
307312
void dump_json( json_wtr& wtr ) const;
308313

309314
public:
310-
311-
void set_price_type( price_type );
312-
void set_version( uint32_t );
313-
void set_price( int64_t );
314-
void set_conf( int64_t );
315-
void set_symbol_status( symbol_status );
316-
void set_product( product * );
317-
318315
void reset();
319316
void unsubscribe();
320317
void submit() override;

pc/rpc_client.cpp

Lines changed: 64 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -151,12 +151,11 @@ void rpc_client::send( rpc_request *rptr )
151151
reuse_.pop_back();
152152
} else {
153153
id = ++id_;
154-
rv_.resize( 1 + id, nullptr );
155154
}
156155
rptr->set_id( id );
157156
rptr->set_rpc_client( this );
158157
rptr->set_sent_time( get_now() );
159-
rv_[id] = rptr;
158+
rv_.emplace( id, rptr );
160159

161160
// construct json message
162161
json_wtr jw;
@@ -185,6 +184,56 @@ void rpc_client::send( rpc_request *rptr )
185184
}
186185
}
187186

187+
void rpc_client::send( rpc::upd_price *upds[], const unsigned n )
188+
{
189+
if ( ! n ) {
190+
return;
191+
}
192+
193+
// get request id
194+
uint64_t id;
195+
if ( !reuse_.empty() ) {
196+
id = reuse_.back();
197+
reuse_.pop_back();
198+
} else {
199+
id = ++id_;
200+
}
201+
const auto now = get_now();
202+
for ( unsigned i = 0; i < n; ++i ) {
203+
rpc_request *const rptr = upds[ i ];
204+
rptr->set_id( id );
205+
rptr->set_rpc_client( this );
206+
rptr->set_sent_time( now );
207+
rv_.emplace( id, rptr );
208+
}
209+
210+
// construct json message
211+
json_wtr jw;
212+
jw.add_val( json_wtr::e_obj );
213+
jw.add_key( "jsonrpc", "2.0" );
214+
jw.add_key( "id", id );
215+
rpc::upd_price::request( jw, upds, n );
216+
jw.pop();
217+
// jw.print();
218+
if ( upds[ 0 ]->get_is_http() ) {
219+
// submit http POST request
220+
http_request msg;
221+
msg.init( "POST", "/" );
222+
msg.add_hdr( "Host", hptr_->get_host() );
223+
msg.add_hdr( "Content-Type", "application/json" );
224+
msg.commit( jw );
225+
hptr_->add_send( msg );
226+
} else if ( wptr_ ) {
227+
// submit websocket message
228+
ws_wtr msg;
229+
msg.commit( ws_wtr::text_id, jw, true );
230+
wptr_->add_send( msg );
231+
}
232+
else {
233+
PC_LOG_WRN( "no ws connection to send msg" ).end();
234+
}
235+
}
236+
188237
void rpc_client::rpc_http::parse_content( const char *txt, size_t len )
189238
{
190239
cp_->parse_response( txt, len );
@@ -202,15 +251,14 @@ void rpc_client::parse_response( const char *txt, size_t len )
202251
uint32_t idtok = jp_.find_val( 1, "id" );
203252
if ( idtok ) {
204253
// response to http request
205-
uint64_t id = jp_.get_uint( idtok );
206-
if ( id < rv_.size() ) {
207-
rpc_request *rptr = rv_[id];
208-
if ( rptr ) {
209-
rv_[id] = nullptr;
210-
reuse_.push_back( id );
211-
rptr->response( jp_ );
212-
}
254+
const uint64_t id = jp_.get_uint( idtok );
255+
const auto range = rv_.equal_range( id );
256+
for ( auto it = range.first; it != range.second; ++it ) {
257+
rpc_request *const rptr = it->second;
258+
rptr->response( jp_ );
213259
}
260+
reuse_.push_back( id );
261+
rv_.erase( range.first, range.second );
214262
} else {
215263
// websocket notification
216264
uint32_t ptok = jp_.find_val( 1, "params" );
@@ -798,16 +846,19 @@ void rpc::upd_price::set_block_hash( hash *bhash )
798846
void rpc::upd_price::set_price( int64_t px,
799847
uint64_t conf,
800848
symbol_status st,
801-
uint64_t pub_slot,
802849
bool is_agg )
803850
{
804851
price_ = px;
805852
conf_ = conf;
806853
st_ = st;
807-
pub_slot_ = pub_slot;
808854
cmd_ = is_agg?e_cmd_agg_price:e_cmd_upd_price;
809855
}
810856

857+
void rpc::upd_price::set_slot( const uint64_t pub_slot )
858+
{
859+
pub_slot_ = pub_slot;
860+
}
861+
811862
signature *rpc::upd_price::get_signature()
812863
{
813864
return &sig_;
@@ -974,7 +1025,7 @@ bool rpc::upd_price::request(
9741025
// construct binary transaction
9751026
net_buf *bptr = net_buf::alloc();
9761027
bincode tx( bptr->buf_ );
977-
if ( ! build_tx( tx, upds, n ) ) {
1028+
if ( ! build_tx( tx, upds, n ) ) {
9781029
return false;
9791030
}
9801031

pc/rpc_client.hpp

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
#include <oracle/oracle.h>
88
#include <pc/hash_map.hpp>
99

10+
#include <unordered_map>
11+
1012
#define PC_RPC_ERROR_BLOCK_CLEANED_UP -32001
1113
#define PC_RPC_ERROR_SEND_TX_PREFLIGHT_FAIL -32002
1214
#define PC_RPC_ERROR_TX_SIG_VERIFY_FAILURE -32003
@@ -62,6 +64,11 @@ namespace pc
6264
str commitment_to_str( commitment );
6365
commitment str_to_commitment( str );
6466

67+
namespace rpc
68+
{
69+
class upd_price;
70+
}
71+
6572
// solana rpc REST API client
6673
class rpc_client : public error
6774
{
@@ -80,6 +87,7 @@ namespace pc
8087

8188
// submit rpc request (and bundled callback)
8289
void send( rpc_request * );
90+
void send( rpc::upd_price *[], unsigned n );
8391

8492
public:
8593

@@ -123,7 +131,7 @@ namespace pc
123131
};
124132
};
125133

126-
typedef std::vector<rpc_request*> request_t;
134+
typedef std::unordered_multimap< uint64_t, rpc_request* > request_t;
127135
typedef std::vector<uint64_t> id_vec_t;
128136
typedef std::vector<char> acc_buf_t;
129137
typedef hash_map<trait> sub_map_t;
@@ -395,6 +403,8 @@ namespace pc
395403
class upd_price : public tx_request, public rpc_request
396404
{
397405
public:
406+
static constexpr unsigned MAX_UPDATES = 10;
407+
398408
// parameters
399409
void set_symbol_status( symbol_status );
400410
void set_publish( key_pair * );
@@ -403,7 +413,8 @@ namespace pc
403413
void set_program( pub_key * );
404414
void set_block_hash( hash * );
405415
void set_price( int64_t px, uint64_t conf, symbol_status,
406-
uint64_t pub_slot, bool is_aggregate );
416+
bool is_aggregate );
417+
void set_slot( uint64_t );
407418

408419
// results
409420
signature *get_signature();

0 commit comments

Comments
 (0)