Skip to content

Commit 4b02739

Browse files
committed
poll program accounts
1 parent 82d2a71 commit 4b02739

File tree

6 files changed

+210
-103
lines changed

6 files changed

+210
-103
lines changed

pc/manager.cpp

Lines changed: 46 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ manager::manager()
7070
breq_->set_sub( this );
7171
sreq_->set_sub( this );
7272
preq_->set_sub( this );
73+
areq_->set_sub( this );
7374
tconn_.set_net_parser( &txp_ );
7475
txp_.mgr_ = this;
7576
}
@@ -131,6 +132,16 @@ std::string manager::get_tx_host() const
131132
return thost_;
132133
}
133134

135+
void manager::set_do_ws( bool do_ws )
136+
{
137+
do_ws_ = do_ws;
138+
}
139+
140+
bool manager::get_do_ws() const
141+
{
142+
return do_ws_;
143+
}
144+
134145
void manager::set_do_tx( bool do_tx )
135146
{
136147
do_tx_ = do_tx;
@@ -447,6 +458,13 @@ void manager::poll( bool do_wait )
447458
clnt_.send( sreq_ );
448459
}
449460
}
461+
if ( ! get_do_ws() ) {
462+
if ( areq_->get_is_recv() ) {
463+
if ( has_status( PC_PYTH_RPC_CONNECTED ) ) {
464+
clnt_.send( areq_ );
465+
}
466+
}
467+
}
450468
}
451469

452470
// try to (re)connect to tx proxy
@@ -529,9 +547,15 @@ void manager::reconnect_rpc()
529547
set_err_msg( "missing or invalid program public key [" +
530548
get_program_pub_key_file() + "]" );
531549
} else {
532-
preq_->set_commitment( get_commitment() );
533-
preq_->set_program( gpub );
534-
clnt_.send( preq_ );
550+
if ( get_do_ws() ) {
551+
preq_->set_commitment( get_commitment() );
552+
preq_->set_program( get_program_pub_key() );
553+
clnt_.send( preq_ );
554+
}
555+
else {
556+
areq_->set_commitment( get_commitment() );
557+
areq_->set_program( get_program_pub_key() );
558+
}
535559
}
536560

537561
// gather latest info on mapping accounts
@@ -716,18 +740,34 @@ void manager::on_response( rpc::get_recent_block_hash *m )
716740
PC_LOG_INF( "received_recent_block_hash" )
717741
.add( "curr_slot", slot_ )
718742
.add( "hash_slot", m->get_slot() )
719-
.add( "rount_trip_time(ms)", 1e-6*ack_ts )
743+
.add( "round_trip_time(ms)", 1e-6*ack_ts )
720744
.end();
721745

722746
}
723747

724-
void manager::on_response( rpc::program_subscribe *m )
748+
void manager::on_response( rpc::account_update *m )
725749
{
726750
if ( m->get_is_err() ) {
727-
set_err_msg( "failed to program_subscribe ["
751+
set_err_msg( "account update failed ["
728752
+ m->get_err_msg() + "]" );
729753
return;
730754
}
755+
756+
if ( m->get_is_http() ) {
757+
int64_t ack_ts = m->get_recv_time() - m->get_sent_time();
758+
PC_LOG_DBG( "received account_update" )
759+
.add( "account", *m->get_account() )
760+
.add( "slot", slot_ )
761+
.add( "round_trip_time(ms)", 1e-6*ack_ts )
762+
.end();
763+
}
764+
else {
765+
PC_LOG_DBG( "received account_update" )
766+
.add( "account", *m->get_account() )
767+
.add( "slot", slot_ )
768+
.end();
769+
}
770+
731771
// look up by account and dispatch update
732772
acc_map_t::iter_t it = amap_.find( *m->get_account() );
733773
if ( it ) {

pc/manager.hpp

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ namespace pc
5050
public rpc_sub,
5151
public rpc_sub_i<rpc::get_slot>,
5252
public rpc_sub_i<rpc::get_recent_block_hash>,
53-
public rpc_sub_i<rpc::program_subscribe>
53+
public rpc_sub_i<rpc::account_update>
5454
{
5555
public:
5656

@@ -65,6 +65,10 @@ namespace pc
6565
void set_tx_host( const std::string& );
6666
std::string get_tx_host() const;
6767

68+
// turn on/off ws subscriptions
69+
void set_do_ws( bool );
70+
bool get_do_ws() const;
71+
6872
// turn on/off tx proxy mode
6973
void set_do_tx( bool );
7074
bool get_do_tx() const;
@@ -166,7 +170,7 @@ namespace pc
166170
// rpc callbacks
167171
void on_response( rpc::get_slot * ) override;
168172
void on_response( rpc::get_recent_block_hash * ) override;
169-
void on_response( rpc::program_subscribe * ) override;
173+
void on_response( rpc::account_update * ) override;
170174
void set_status( int );
171175
get_mapping *get_last_mapping() const;
172176
bool get_is_rpc_send() const;
@@ -236,6 +240,7 @@ namespace pc
236240
kpx_vec_t kvec_; // symbol price scheduling
237241
bool wait_conn_;// waiting on connection
238242
bool do_cap_; // do capture flag
243+
bool do_ws_; // do ws subscriptions
239244
bool do_tx_; // do tx proxy connectivity
240245
bool is_pub_; // is publishing mode
241246
capture cap_; // aggregate price capture
@@ -246,6 +251,7 @@ namespace pc
246251
rpc::get_slot sreq_[1]; // slot subscription
247252
rpc::get_recent_block_hash breq_[1]; // block hash request
248253
rpc::program_subscribe preq_[1]; // program account subscription
254+
rpc::get_program_accounts areq_[1]; // alternative to program_subscribe
249255
};
250256

251257
inline bool manager::get_is_tx_connect() const

pc/request.cpp

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ bool request::get_is_recv() const
151151
return is_recv_;
152152
}
153153

154-
void request::on_response( rpc::program_subscribe * )
154+
void request::on_response( rpc::account_update * )
155155
{
156156
}
157157

@@ -319,7 +319,7 @@ void get_mapping::on_response( rpc::get_account_info *res )
319319
update( res );
320320
}
321321

322-
void get_mapping::on_response( rpc::program_subscribe *res )
322+
void get_mapping::on_response( rpc::account_update *res )
323323
{
324324
if ( get_is_recv( )) {
325325
update( res );
@@ -1401,7 +1401,7 @@ void upd_test::on_response( rpc::upd_test *res )
14011401
}
14021402
}
14031403

1404-
void upd_test::on_response( rpc::account_subscribe *res )
1404+
void upd_test::on_response( rpc::account_update *res )
14051405
{
14061406
if ( res->get_is_err() ) {
14071407
on_error_sub( res->get_err_msg(), this );
@@ -1712,7 +1712,7 @@ void product::on_response( rpc::get_account_info *res )
17121712
update( res );
17131713
}
17141714

1715-
void product::on_response( rpc::program_subscribe *res )
1715+
void product::on_response( rpc::account_update *res )
17161716
{
17171717
if ( get_is_recv() ) {
17181718
update( res );
@@ -2114,7 +2114,7 @@ void price::on_response( rpc::get_account_info *res )
21142114
update( res );
21152115
}
21162116

2117-
void price::on_response( rpc::program_subscribe *res )
2117+
void price::on_response( rpc::account_update *res )
21182118
{
21192119
if ( get_is_recv() ) {
21202120
update( res );

pc/request.hpp

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ namespace pc
6060
class request : public prev_next<request>,
6161
public error,
6262
public rpc_sub,
63-
public rpc_sub_i<rpc::program_subscribe>
63+
public rpc_sub_i<rpc::account_update>
6464
{
6565
public:
6666

@@ -99,7 +99,7 @@ namespace pc
9999
// has received account update
100100
void set_is_recv( bool );
101101
bool get_is_recv() const;
102-
void on_response( rpc::program_subscribe * ) override;
102+
void on_response( rpc::account_update * ) override;
103103

104104
protected:
105105

@@ -165,7 +165,7 @@ namespace pc
165165
void reset();
166166
void submit() override;
167167
void on_response( rpc::get_account_info * ) override;
168-
void on_response( rpc::program_subscribe * ) override;
168+
void on_response( rpc::account_update * ) override;
169169
private:
170170
typedef enum { e_new, e_init } state_t;
171171

@@ -447,8 +447,7 @@ namespace pc
447447

448448
// run aggregate price test
449449
class upd_test : public request,
450-
public rpc_sub_i<rpc::upd_test>,
451-
public rpc_sub_i<rpc::account_subscribe>
450+
public rpc_sub_i<rpc::upd_test>
452451
{
453452
public:
454453
void set_test_key( const std::string& );
@@ -459,7 +458,7 @@ namespace pc
459458

460459
void submit() override;
461460
void on_response( rpc::upd_test * ) override;
462-
void on_response( rpc::account_subscribe * ) override;
461+
void on_response( rpc::account_update * ) override;
463462

464463
private:
465464
typedef enum {
@@ -591,7 +590,7 @@ namespace pc
591590
void reset();
592591
void submit() override;
593592
void on_response( rpc::get_account_info * ) override;
594-
void on_response( rpc::program_subscribe * ) override;
593+
void on_response( rpc::account_update * ) override;
595594
bool get_is_done() const override;
596595
void add_price( price * );
597596

@@ -736,7 +735,7 @@ namespace pc
736735
void submit() override;
737736
void on_response( rpc::upd_price * ) override;
738737
void on_response( rpc::get_account_info * ) override;
739-
void on_response( rpc::program_subscribe * ) override;
738+
void on_response( rpc::account_update * ) override;
740739
bool get_is_done() const override;
741740

742741
private:

0 commit comments

Comments
 (0)