Skip to content

Commit 04b4fe5

Browse files
Support for pyth_tx proxy server for forwarding transactions directly to tpu ports on solana nodes
1 parent 226f0c2 commit 04b4fe5

23 files changed

+1414
-137
lines changed

CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,8 @@ add_executable( pyth pcapps/pyth.cpp )
7373
target_link_libraries( pyth ${PC_DEP} )
7474
add_executable( pyth_csv pcapps/pyth_csv.cpp )
7575
target_link_libraries( pyth_csv ${PC_DEP} )
76+
add_executable( pyth_tx pcapps/tx_svr.cpp pcapps/pyth_tx.cpp )
77+
target_link_libraries( pyth_tx ${PC_DEP} )
7678

7779
#
7880
# install

dashboard/dashboard.js

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,10 +74,14 @@ class Prices
7474
let tab = document.getElementById( "prices" );
7575
let k = 1;
7676
res.result.sort( function(a,b) {
77-
return a['attr_dict']['symbol'] > b['attr_dict']['symbol'];
77+
let asym = a['attr_dict']['symbol'];
78+
let bsym = b['attr_dict']['symbol'];
79+
return ( asym == bsym ? 0 : (asym > bsym ? 1: -1 ) );
7880
} );
7981
res.result.sort( function(a,b) {
80-
return a['attr_dict']['asset_type'] > b['attr_dict']['asset_type'];
82+
let atype = a['attr_dict']['asset_type'];
83+
let btype = b['attr_dict']['asset_type'];
84+
return ( atype == btype ? 0 : (atype > btype ? 1: -1 ) );
8185
} );
8286
for( let i = 0; i != res.result.length; ++i ) {
8387
let sym = res.result[i];

doc/getting_started.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,16 +47,16 @@ Once permissioned, you can test your setup by running the test_publish.cpp examp
4747

4848

4949
```
50-
KHOST=44.232.27.44 # or devnet.solana.com
51-
./test_publish -k $KDIR -r $KHOST
50+
KHOST=44.232.27.44 # or api.devnet.solana.com
51+
./test_publish -k $KDIR -r $KHOST -t $KHOST
5252
```
5353

5454
The certification environment (pythnet) can be found at the IP address 44.232.27.44. Please also provide to the administrator the IP address you plan to publish to pythnet from so that it can be added to the pythnet firewall. Solana devnet can be found at: devnet.solana.com. No additional permissioning is required to publish to devnet.
5555

5656
You can also publish to solana using the pythd server. Start up the server using the same key-store directory and host specification:
5757

5858
```
59-
./pythd -k $KDIR -r $KHOST
59+
./pythd -k $KDIR -r $KHOST -t $KHOST
6060
```
6161

6262
Run the test_publish.py example program on the same host to connect to the pythd server:

doc/publish_slot.md

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
# Understanding publishing slots
2+
3+
When a quoter publishes a price, the pyth-client API also forwards what it thinks is the current slot on solana. This is known as its publishing slot.
4+
5+
The publishing slot and price is stored as the latest update for that publisher on-chain but only if the price is for a later slot than that currently stored. This is to prevent prices from being updated out-of-order and to facilitate arbitration between multiple publishers.
6+
7+
The aggregation algorithm only combines prices from publishers that were published within 8 slots of the current on-chain slot.
8+
9+
Not all published prices get included in the pyth contract due to unreliable transports and the way solana formulates and reaches consensus on each slot.
10+
11+
A quoter may detect if a published price is dropped by comparing the list of publishing slots it submits vs what it subsequently receives in each aggregate price callback.
12+
13+
For example, here is an excerpt of a log take from a run of the test_publish.cpp example program against mainnet-beta. It logs everything it sends and everything it receives.
14+
15+
The publishing slots of six consecutive price submissions have been annotated with the labels A, B, C, D, E and F or slots 79018079, 79018084, 79018085, 79018086, 79018087, 79018092.
16+
17+
The API submits a new price every time it receives notification of a new slot but note that prices for slots 79018080 thru 79018083 and 79018088 thru 79018091 were not submitted. This is because solana does not always publish consecutive slots and gaps can occur. Solana can also publish slots out-of-order, but the API ignores these and is guaranteed only to issue callbacks for slots that are strictly increasing.
18+
19+
Price updates occur for slots labelled A, B, C and F. Slots D and E (79018086, 79018087) were dropped and did not get executed on the chain.
20+
21+
```
22+
[2021-05-18T22:36:14.048435Z 654359 INF submit price to block-chain ] symbol=SYMBOL1/USD,price_type=price,price=0.116000,spread=0.001000,slot=79018079,sub_id=1
23+
^^ A ^^^
24+
[2021-05-18T22:36:14.237644Z 654359 INF received aggregate price update ] symbol=SYMBOL1/USD,price_type=price,status=trading,agg_price=0.112000,agg_spread=0.001000,valid_slot=79018076,pub_slot=79018077,my_price=0.112000,my_conf=0.001000,my_status=trading,my_slot=79018075
25+
[2021-05-18T22:36:14.405182Z 654359 INF received aggregate price update ] symbol=SYMBOL1/USD,price_type=price,status=trading,agg_price=0.113000,agg_spread=0.001000,valid_slot=79018077,pub_slot=79018078,my_price=0.113000,my_conf=0.001000,my_status=trading,my_slot=79018076
26+
[2021-05-18T22:36:16.099126Z 654359 INF submit price to block-chain ] symbol=SYMBOL1/USD,price_type=price,price=0.117000,spread=0.001000,slot=79018084,sub_id=1
27+
^^ B ^^^
28+
[2021-05-18T22:36:16.962077Z 654359 INF submit price to block-chain ] symbol=SYMBOL1/USD,price_type=price,price=0.118000,spread=0.001000,slot=79018085,sub_id=1
29+
^^ C ^^^
30+
[2021-05-18T22:36:17.519741Z 654359 INF submit price to block-chain ] symbol=SYMBOL1/USD,price_type=price,price=0.119000,spread=0.001000,slot=79018086,sub_id=1
31+
^^ D ^^^
32+
[2021-05-18T22:36:17.671924Z 654359 INF received aggregate price update ] symbol=SYMBOL1/USD,price_type=price,status=trading,agg_price=0.114000,agg_spread=0.001000,valid_slot=79018078,pub_slot=79018079,my_price=0.114000,my_conf=0.001000,my_status=trading,my_slot=79018077
33+
[2021-05-18T22:36:18.109491Z 654359 INF submit price to block-chain ] symbol=SYMBOL1/USD,price_type=price,price=0.120000,spread=0.001000,slot=79018087,sub_id=1
34+
^^ E ^^^
35+
[2021-05-18T22:36:20.537479Z 654359 INF submit price to block-chain ] symbol=SYMBOL1/USD,price_type=price,price=0.121000,spread=0.001000,slot=79018092,sub_id=1
36+
^^ F ^^^
37+
[2021-05-18T22:36:21.195836Z 654359 INF submit price to block-chain ] symbol=SYMBOL1/USD,price_type=price,price=0.122000,spread=0.001000,slot=79018093,sub_id=1
38+
[2021-05-18T22:36:21.529074Z 654359 INF submit price to block-chain ] symbol=SYMBOL1/USD,price_type=price,price=0.123000,spread=0.001000,slot=79018094,sub_id=1
39+
[2021-05-18T22:36:21.802004Z 654359 INF received aggregate price update ] symbol=SYMBOL1/USD,price_type=price,status=trading,agg_price=0.116000,agg_spread=0.001000,valid_slot=79018079,pub_slot=79018085,my_price=0.116000,my_conf=0.001000,my_status=trading,my_slot=79018079
40+
^^ A ^^^
41+
[2021-05-18T22:36:21.969477Z 654359 INF received aggregate price update ] symbol=SYMBOL1/USD,price_type=price,status=trading,agg_price=0.117000,agg_spread=0.001000,valid_slot=79018085,pub_slot=79018087,my_price=0.117000,my_conf=0.001000,my_status=trading,my_slot=79018084
42+
^^ B ^^^
43+
[2021-05-18T22:36:22.304469Z 654359 INF received aggregate price update ] symbol=SYMBOL1/USD,price_type=price,status=trading,agg_price=0.118000,agg_spread=0.001000,valid_slot=79018087,pub_slot=79018093,my_price=0.118000,my_conf=0.001000,my_status=trading,my_slot=79018085
44+
^^ C ^^^
45+
[2021-05-18T22:36:22.758348Z 654359 INF submit price to block-chain ] symbol=SYMBOL1/USD,price_type=price,price=0.125000,spread=0.001000,slot=79018096,sub_id=1
46+
[2021-05-18T22:36:23.121339Z 654359 INF received aggregate price update ] symbol=SYMBOL1/USD,price_type=price,status=trading,agg_price=0.121000,agg_spread=0.001000,valid_slot=79018093,pub_slot=79018094,my_price=0.121000,my_conf=0.001000,my_status=trading,my_slot=79018092
47+
^^ F ^^^
48+
```
49+
50+
The API keeps track of the "hit-rate" of price submissions that show up in the update callbacks and tracks end-to-end latency statistics at the 25th, 50th, 75th and 99th percentiles both in terms of seconds of elapsed time and in number of slot updates observed. For example, from the same log:
51+
52+
```
53+
[2021-05-18T22:37:26.685518Z 654359 INF publish statistics ] symbol=SYMBOL1/USD,price_type=price,num_sent=135,hit_rate=73.333333,secs_p25=2.000000,secs_p50=2.500000,secs_p75=3.000000,secs_p99=7.500000,slot_p25=4,slot_p50=4,slot_p75=6,slot_p99=16
54+
```

pc/bincode.hpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ namespace pc
3333

3434
// add values to buffer
3535
void add( uint8_t );
36+
void add( uint16_t );
3637
void add( uint32_t );
3738
void add( uint64_t );
3839
void add( int32_t );
@@ -117,6 +118,11 @@ namespace pc
117118
idx_ += sizeof( T );
118119
}
119120

121+
inline void bincode::add( uint16_t val )
122+
{
123+
add_val_T( val );
124+
}
125+
120126
inline void bincode::add( uint32_t val )
121127
{
122128
add_val_T( val );

pc/manager.cpp

Lines changed: 97 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,12 @@
33

44
using namespace pc;
55

6+
#define PC_TPU_PROXY_PORT 8898
67
#define PC_RPC_HTTP_PORT 8899
7-
#define PC_RPC_WEBSOCKET_PORT 8900
88
#define PC_RECONNECT_TIMEOUT (120L*1000000000L)
99
#define PC_BLOCKHASH_TIMEOUT 3
1010
#define PC_PUB_INTERVAL (293L*PC_NSECS_IN_MSEC)
11+
#define PC_RPC_HOST "localhost"
1112

1213
///////////////////////////////////////////////////////////////////////////
1314
// manager_sub
@@ -24,6 +25,14 @@ void manager_sub::on_disconnect( manager * )
2425
{
2526
}
2627

28+
void manager_sub::on_tx_connect( manager * )
29+
{
30+
}
31+
32+
void manager_sub::on_tx_disconnect( manager * )
33+
{
34+
}
35+
2736
void manager_sub::on_init( manager * )
2837
{
2938
}
@@ -36,7 +45,9 @@ void manager_sub::on_add_symbol( manager *, price * )
3645
// manager
3746

3847
manager::manager()
39-
: sub_( nullptr ),
48+
: thost_( PC_RPC_HOST ),
49+
rhost_( PC_RPC_HOST ),
50+
sub_( nullptr ),
4051
status_( 0 ),
4152
num_sub_( 0 ),
4253
kidx_( (unsigned)-1 ),
@@ -49,14 +60,17 @@ manager::manager()
4960
pub_int_( PC_PUB_INTERVAL ),
5061
wait_conn_( false ),
5162
do_cap_( false ),
63+
do_tx_( true ),
5264
is_pub_( false )
5365
{
66+
tconn_.set_sub( this );
5467
breq_->set_sub( this );
5568
sreq_->set_sub( this );
5669
}
5770

5871
manager::~manager()
5972
{
73+
teardown();
6074
for( get_mapping *mptr: mvec_ ) {
6175
delete mptr;
6276
}
@@ -65,7 +79,6 @@ manager::~manager()
6579
delete ptr;
6680
}
6781
svec_.clear();
68-
teardown();
6982
}
7083

7184
void manager::add_map_sub()
@@ -95,6 +108,26 @@ std::string manager::get_rpc_host() const
95108
return rhost_;
96109
}
97110

111+
void manager::set_tx_host( const std::string& thost )
112+
{
113+
thost_ = thost;
114+
}
115+
116+
std::string manager::get_tx_host() const
117+
{
118+
return thost_;
119+
}
120+
121+
void manager::set_do_tx( bool do_tx )
122+
{
123+
do_tx_ = do_tx;
124+
}
125+
126+
bool manager::get_do_tx() const
127+
{
128+
return do_tx_;
129+
}
130+
98131
void manager::set_capture_file( const std::string& cap_file )
99132
{
100133
cap_.set_file( cap_file );
@@ -226,17 +259,38 @@ bool manager::init()
226259
return set_err_msg( nl_.get_err_msg() );
227260
}
228261

262+
// decompose rpc_host into host:port
263+
int rport =0, wport = 0;
264+
std::string rhost = get_host_port( rhost_, rport, wport );
265+
if ( rport == 0 ) rport = PC_RPC_HTTP_PORT;
266+
if ( wport == 0 ) wport = rport+1;
267+
229268
// add rpc_client connection to net_loop and initialize
230-
hconn_.set_port( PC_RPC_HTTP_PORT );
231-
hconn_.set_host( rhost_ );
269+
hconn_.set_port( rport );
270+
hconn_.set_host( rhost );
232271
hconn_.set_net_loop( &nl_ );
233272
clnt_.set_http_conn( &hconn_ );
234-
wconn_.set_port( PC_RPC_WEBSOCKET_PORT );
235-
wconn_.set_host( rhost_ );
273+
wconn_.set_port( wport );
274+
wconn_.set_host( rhost );
236275
wconn_.set_net_loop( &nl_ );
237276
clnt_.set_ws_conn( &wconn_ );
238-
hconn_.init();
239-
wconn_.init();
277+
if ( !hconn_.init() ) {
278+
return set_err_msg( hconn_.get_err_msg() );
279+
}
280+
if ( !wconn_.init() ) {
281+
return set_err_msg( wconn_.get_err_msg() );
282+
}
283+
// connect to pyth_tx server
284+
if ( do_tx_ ) {
285+
int tport1 = 0, tport2 = 0;
286+
std::string thost = get_host_port( thost_, tport1, tport2 );
287+
tconn_.set_port( tport1 ? tport1 : PC_TPU_PROXY_PORT );
288+
tconn_.set_host( thost );
289+
tconn_.set_net_loop( &nl_ );
290+
if ( !tconn_.init() ) {
291+
return set_err_msg( tconn_.get_err_msg() );
292+
}
293+
}
240294
wait_conn_ = true;
241295

242296
// initialize listening port if port defined
@@ -320,6 +374,9 @@ void manager::poll( bool do_wait )
320374
hconn_.poll();
321375
wconn_.poll();
322376
}
377+
if ( do_tx_ ) {
378+
tconn_.poll();
379+
}
323380
if ( lsvr_.get_port() ) {
324381
lsvr_.poll();
325382
for( user *uptr = olist_.first(); uptr; ) {
@@ -346,6 +403,11 @@ void manager::poll( bool do_wait )
346403
// get current time
347404
curr_ts_ = get_now();
348405

406+
// try to (re)connect to tx proxy
407+
if ( do_tx_ && ( !tconn_.get_is_connect() || tconn_.get_is_err() ) ) {
408+
tconn_.reconnect();
409+
}
410+
349411
// submit new quotes while connected
350412
if ( has_status( PC_PYTH_RPC_CONNECTED ) &&
351413
!hconn_.get_is_err() &&
@@ -489,6 +551,7 @@ void manager::teardown_users()
489551
PC_LOG_DBG( "delete_user" ).add("fd", usr->get_fd() ).end();
490552
usr->close();
491553
dlist_.del( usr );
554+
delete usr;
492555
}
493556
}
494557

@@ -584,6 +647,31 @@ void manager::submit( request *req )
584647
plist_.add( req );
585648
}
586649

650+
void manager::submit( tx_request *req )
651+
{
652+
net_wtr msg;
653+
req->build( msg );
654+
tconn_.add_send( msg );
655+
}
656+
657+
void manager::on_connect()
658+
{
659+
// callback user with connection status
660+
PC_LOG_INF( "pyth_tx_connected" ).end();
661+
if ( sub_ ) {
662+
sub_->on_tx_connect( this );
663+
}
664+
}
665+
666+
void manager::on_disconnect()
667+
{
668+
// callback user with connection status
669+
PC_LOG_INF( "pyth_tx_reset" ).end();
670+
if ( sub_ ) {
671+
sub_->on_tx_disconnect( this );
672+
}
673+
}
674+
587675
void manager::add_product( const pub_key&acc )
588676
{
589677
acc_map_t::iter_t it = amap_.find( acc );

0 commit comments

Comments
 (0)