Skip to content

Commit c763eca

Browse files
massjhunsaker
authored andcommitted
Add ability to send price updates without pyth_tx.
1 parent 31e3188 commit c763eca

File tree

12 files changed

+216
-58
lines changed

12 files changed

+216
-58
lines changed

pc/manager.cpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -344,6 +344,11 @@ bool manager::get_is_tx_send() const
344344
return tconn_.get_is_send();
345345
}
346346

347+
bool manager::get_is_rpc_send() const
348+
{
349+
return hconn_.get_is_send() || wconn_.get_is_send();
350+
}
351+
347352
bool manager::bootstrap()
348353
{
349354
int status = PC_PYTH_RPC_CONNECTED | PC_PYTH_HAS_BLOCK_HASH;
@@ -674,7 +679,7 @@ void manager::on_response( rpc::get_slot *res )
674679

675680
PC_LOG_DBG( "received get_slot" )
676681
.add( "slot", slot_ )
677-
.add( "rount_trip_time(ms)", 1e-6*ack_ts )
682+
.add( "round_trip_time(ms)", 1e-6*ack_ts )
678683
.end();
679684

680685
// submit block hash every N slots

pc/manager.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,7 @@ namespace pc
169169
void on_response( rpc::program_subscribe * ) override;
170170
void set_status( int );
171171
get_mapping *get_last_mapping() const;
172+
bool get_is_rpc_send() const;
172173

173174
private:
174175

pc/misc.hpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@
44
#include <string>
55

66
#define PC_PACKED __attribute__((__packed__))
7-
#define PC_UNLIKELY(ARG) __builtin_expect((ARG),1)
7+
#define PC_LIKELY(ARG) __builtin_expect((ARG),1)
8+
#define PC_UNLIKELY(ARG) __builtin_expect((ARG),0)
89
#define PC_NSECS_IN_SEC 1000000000L
910
#define PC_NSECS_IN_MSEC 1000000L
1011

pc/request.cpp

Lines changed: 60 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1838,6 +1838,7 @@ price::price( const pub_key& acc, product *prod )
18381838
areq_->set_account( &apub_ );
18391839
preq_->set_account( &apub_ );
18401840
areq_->set_sub( this );
1841+
preq_->set_sub( this );
18411842
size_t tlen = ZSTD_compressBound( sizeof(pc_price_t) );
18421843
pptr_ = (pc_price_t*)new char[tlen];
18431844
__builtin_memset( pptr_, 0, tlen );
@@ -1973,7 +1974,14 @@ uint64_t price::get_pub_slot() const
19731974

19741975
bool price::get_is_ready_publish() const
19751976
{
1976-
return st_ == e_publish && get_manager()->get_is_tx_connect();
1977+
if ( st_ != e_publish )
1978+
return false;
1979+
manager *cptr = get_manager();
1980+
if ( cptr->get_do_tx() ) {
1981+
return cptr->get_is_tx_connect();
1982+
} else {
1983+
return cptr->has_status( PC_PYTH_RPC_CONNECTED | PC_PYTH_HAS_BLOCK_HASH );
1984+
}
19771985
}
19781986

19791987
void price::reset()
@@ -2033,9 +2041,34 @@ bool price::update(
20332041
return false;
20342042
}
20352043
manager *mgr = get_manager();
2036-
preq_->set_price( price, conf, st, mgr->get_slot(), is_agg );
2044+
const uint64_t slot = mgr->get_slot();
2045+
preq_->set_price( price, conf, st, slot, is_agg );
20372046
preq_->set_block_hash( mgr->get_recent_block_hash() );
2038-
mgr->submit( preq_ );
2047+
if ( mgr->get_do_tx() )
2048+
mgr->submit( preq_ );
2049+
else {
2050+
get_rpc_client()->send( preq_ );
2051+
tvec_.emplace_back( std::string( '\0' , 100 ), preq_->get_sent_time() );
2052+
preq_->get_signature()->enc_base58( tvec_.back().first );
2053+
PC_LOG_DBG( "sent price update transaction" )
2054+
.add( "price_account", *get_account() )
2055+
.add( "product_account", *prod_->get_account() )
2056+
.add( "symbol", get_symbol() )
2057+
.add( "price_type", price_type_to_str( get_price_type() ) )
2058+
.add( "sig", tvec_.back().first )
2059+
.add( "pub_slot", slot )
2060+
.end();
2061+
if ( PC_UNLIKELY( tvec_.size() >= 100 ) ) {
2062+
PC_LOG_WRN( "too many unacked price update transactions" )
2063+
.add( "price_account", *get_account() )
2064+
.add( "product_account", *prod_->get_account() )
2065+
.add( "symbol", get_symbol() )
2066+
.add( "price_type", price_type_to_str( get_price_type() ) )
2067+
.add( "num_txid", tvec_.size() )
2068+
.end();
2069+
tvec_.erase( tvec_.begin(), tvec_.begin() + 50 );
2070+
}
2071+
}
20392072
inc_sent();
20402073
return true;
20412074
}
@@ -2051,6 +2084,30 @@ void price::submit()
20512084
}
20522085
}
20532086

2087+
bool price::has_unacked_updates() const
2088+
{
2089+
return ! tvec_.empty();
2090+
}
2091+
2092+
void price::on_response( rpc::upd_price *res )
2093+
{
2094+
std::string txid = res->get_ack_signature().as_string();
2095+
const auto it = std::find_if( tvec_.begin(), tvec_.end(),
2096+
[&] ( const std::pair<std::string,int64_t>& m ) { return m.first == txid; } );
2097+
if ( it == tvec_.end() )
2098+
return;
2099+
const int64_t ack_dur = res->get_recv_time() - it->second;
2100+
tvec_.erase( it );
2101+
PC_LOG_DBG( "received price update transaction ack" )
2102+
.add( "price_account", *get_account() )
2103+
.add( "product_account", *prod_->get_account() )
2104+
.add( "symbol", get_symbol() )
2105+
.add( "price_type", price_type_to_str( get_price_type() ) )
2106+
.add( "sig", txid )
2107+
.add( "round_trip_time(ms)", 1e-6 * ack_dur )
2108+
.end();
2109+
}
2110+
20542111
void price::on_response( rpc::get_account_info *res )
20552112
{
20562113
set_is_recv( true );

pc/request.hpp

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -646,7 +646,8 @@ namespace pc
646646
// price subscriber and publisher
647647
class price : public request,
648648
public pub_stats,
649-
public rpc_sub_i<rpc::get_account_info>
649+
public rpc_sub_i<rpc::get_account_info>,
650+
public rpc_sub_i<rpc::upd_price>
650651
{
651652
public:
652653

@@ -680,6 +681,9 @@ namespace pc
680681
// update aggregate price only
681682
bool update();
682683

684+
// are there any update transactions which have not yet been acked
685+
bool has_unacked_updates() const;
686+
683687
// get and activate price schedule subscription
684688
price_sched *get_sched();
685689

@@ -730,6 +734,7 @@ namespace pc
730734
void reset();
731735
void unsubscribe();
732736
void submit() override;
737+
void on_response( rpc::upd_price * ) override;
733738
void on_response( rpc::get_account_info * ) override;
734739
void on_response( rpc::program_subscribe * ) override;
735740
bool get_is_done() const override;
@@ -739,6 +744,8 @@ namespace pc
739744
typedef enum {
740745
e_subscribe, e_sent_subscribe, e_publish, e_error } state_t;
741746

747+
typedef std::vector<std::pair<std::string,int64_t>> txid_vec_t;
748+
742749
template<class T> void update( T *res );
743750

744751
bool init_publish();
@@ -760,6 +767,7 @@ namespace pc
760767
rpc::get_account_info areq_[1];
761768
rpc::upd_price preq_[1];
762769
pc_price_t *pptr_;
770+
txid_vec_t tvec_;
763771
};
764772

765773
template<class T>

pc/rpc_client.cpp

Lines changed: 57 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -107,13 +107,15 @@ namespace pc
107107
static hash sys_id = gen_sys_id();
108108

109109
// generate json for sendTransaction
110-
static void send_transaction( json_wtr& msg, bincode& tx )
110+
static void send_transaction( json_wtr& msg, bincode& tx, bool skipPreflight )
111111
{
112112
msg.add_key( "method", "sendTransaction" );
113113
msg.add_key( "params", json_wtr::e_arr );
114114
msg.add_val_enc_base64( str( tx.get_buf(), tx.size() ) );
115115
msg.add_val( json_wtr::e_obj );
116116
msg.add_key( "encoding", "base64" );
117+
if ( skipPreflight )
118+
msg.add_key( "skipPreflight", json_wtr::jtrue() );
117119
msg.pop();
118120
msg.pop();
119121
}
@@ -808,7 +810,7 @@ void rpc::transfer::request( json_wtr& msg )
808810
sig_.init_from_buf( (const uint8_t*)(tx.get_buf() + sign_idx) );
809811

810812
// encode transaction and add to json params
811-
send_transaction( msg, tx );
813+
send_transaction( msg, tx, false );
812814
bptr->dealloc();
813815
}
814816

@@ -1140,7 +1142,7 @@ void rpc::create_account::request( json_wtr& msg )
11401142
tx.sign( acct_idx, tx_idx, *account_ );
11411143

11421144
// encode transaction and add to json params
1143-
send_transaction( msg, tx );
1145+
send_transaction( msg, tx, false );
11441146
bptr->dealloc();
11451147
}
11461148

@@ -1240,7 +1242,7 @@ void rpc::add_product::request( json_wtr& msg )
12401242
tx.sign( sym_idx, tx_idx, *akey_ );
12411243

12421244
// encode transaction and add to json params
1243-
send_transaction( msg, tx );
1245+
send_transaction( msg, tx, false );
12441246
bptr->dealloc();
12451247
}
12461248

@@ -1344,7 +1346,7 @@ void rpc::upd_product::request( json_wtr& msg )
13441346
tx.sign( sym_idx, tx_idx, *akey_ );
13451347

13461348
// encode transaction and add to json params
1347-
send_transaction( msg, tx );
1349+
send_transaction( msg, tx, false );
13481350
bptr->dealloc();
13491351
}
13501352

@@ -1464,7 +1466,7 @@ void rpc::add_price::request( json_wtr& msg )
14641466
tx.sign( prc_idx, tx_idx, *akey_ );
14651467

14661468
// encode transaction and add to json params
1467-
send_transaction( msg, tx );
1469+
send_transaction( msg, tx, false );
14681470
bptr->dealloc();
14691471
}
14701472

@@ -1573,7 +1575,7 @@ void rpc::init_price::request( json_wtr& msg )
15731575
tx.sign( prc_idx, tx_idx, *akey_ );
15741576

15751577
// encode transaction and add to json params
1576-
send_transaction( msg, tx );
1578+
send_transaction( msg, tx, false );
15771579
bptr->dealloc();
15781580
}
15791581

@@ -1655,7 +1657,7 @@ void rpc::init_mapping::request( json_wtr& msg )
16551657
tx.sign( map_idx, tx_idx, *mkey_ );
16561658

16571659
// encode transaction and add to json params
1658-
send_transaction( msg, tx );
1660+
send_transaction( msg, tx, false );
16591661
bptr->dealloc();
16601662
}
16611663

@@ -1746,7 +1748,7 @@ void rpc::add_mapping::request( json_wtr& msg )
17461748
tx.sign( acc_idx, tx_idx, *akey_ );
17471749

17481750
// encode transaction and add to json params
1749-
send_transaction( msg, tx );
1751+
send_transaction( msg, tx, false );
17501752
bptr->dealloc();
17511753
}
17521754

@@ -1839,7 +1841,7 @@ void rpc::add_publisher::request( json_wtr& msg )
18391841
tx.sign( sym_idx, tx_idx, *akey_ );
18401842

18411843
// encode transaction and add to json params
1842-
send_transaction( msg, tx );
1844+
send_transaction( msg, tx, false );
18431845
bptr->dealloc();
18441846
}
18451847

@@ -1932,7 +1934,7 @@ void rpc::del_publisher::request( json_wtr& msg )
19321934
tx.sign( sym_idx, tx_idx, *akey_ );
19331935

19341936
// encode transaction and add to json params
1935-
send_transaction( msg, tx );
1937+
send_transaction( msg, tx, false );
19361938
bptr->dealloc();
19371939
}
19381940

@@ -2014,7 +2016,7 @@ void rpc::init_test::request( json_wtr& msg )
20142016
tx.sign( prm_idx, tx_idx, *akey_ );
20152017

20162018
// encode transaction and add to json params
2017-
send_transaction( msg, tx );
2019+
send_transaction( msg, tx, false );
20182020
bptr->dealloc();
20192021
}
20202022

@@ -2122,16 +2124,7 @@ void rpc::upd_test::request( json_wtr& msg )
21222124
tx.sign( tst_idx, tx_idx, *tkey_ );
21232125

21242126
// encode transaction and add to json params
2125-
msg.add_key( "method", "sendTransaction" );
2126-
msg.add_key( "params", json_wtr::e_arr );
2127-
char buf[4096];
2128-
size_t buf_len = enc_base64( (const uint8_t*)tx.get_buf(),
2129-
tx.size(), (uint8_t*)buf );
2130-
msg.add_val( str( buf, buf_len ) );
2131-
msg.add_val( json_wtr::e_obj );
2132-
msg.add_key( "encoding", "base64" );
2133-
msg.pop();
2134-
msg.pop();
2127+
send_transaction( msg, tx, false );
21352128
bptr->dealloc();
21362129
}
21372130

@@ -2314,6 +2307,16 @@ void rpc::upd_price::set_price( int64_t px,
23142307
cmd_ = is_agg?e_cmd_agg_price:e_cmd_upd_price;
23152308
}
23162309

2310+
signature *rpc::upd_price::get_signature()
2311+
{
2312+
return &sig_;
2313+
}
2314+
2315+
str rpc::upd_price::get_ack_signature() const
2316+
{
2317+
return ack_sig_;
2318+
}
2319+
23172320
class tx_wtr : public net_wtr
23182321
{
23192322
public:
@@ -2329,12 +2332,8 @@ class tx_wtr : public net_wtr
23292332
}
23302333
};
23312334

2332-
void rpc::upd_price::build( net_wtr& wtr )
2335+
void rpc::upd_price::build_tx( bincode& tx )
23332336
{
2334-
// construct binary transaction and add header
2335-
bincode tx;
2336-
((tx_wtr&)wtr).init( tx );
2337-
23382337
// signatures section
23392338
tx.add_len<1>(); // one signature (publish)
23402339
size_t pub_idx = tx.reserve_sign();
@@ -2376,5 +2375,36 @@ void rpc::upd_price::build( net_wtr& wtr )
23762375

23772376
// all accounts need to sign transaction
23782377
tx.sign( pub_idx, tx_idx, *ckey_ );
2378+
sig_.init_from_buf( (const uint8_t*)(tx.get_buf() + pub_idx) );
2379+
}
2380+
2381+
void rpc::upd_price::build( net_wtr& wtr )
2382+
{
2383+
bincode tx;
2384+
((tx_wtr&)wtr).init( tx );
2385+
build_tx( tx );
23792386
((tx_wtr&)wtr).commit( tx );
23802387
}
2388+
2389+
void rpc::upd_price::request( json_wtr& msg )
2390+
{
2391+
// construct binary transaction
2392+
net_buf *bptr = net_buf::alloc();
2393+
bincode tx( bptr->buf_ );
2394+
build_tx( tx );
2395+
2396+
// encode transaction and add to json params
2397+
send_transaction( msg, tx, true );
2398+
bptr->dealloc();
2399+
}
2400+
2401+
void rpc::upd_price::response( const jtree& jt )
2402+
{
2403+
if ( on_error( jt, this ) )
2404+
return;
2405+
uint32_t rtok = jt.find_val( 1, "result" );
2406+
if ( rtok == 0 )
2407+
return;
2408+
ack_sig_ = jt.get_str( rtok );
2409+
on_response( this );
2410+
}

0 commit comments

Comments
 (0)