Skip to content

Commit 8be5247

Browse files
authored
Batch updates in update_price JRPC call (#140)
* Change semantics of update_price JRPC call to buffer updates internally, instead of immediately sending them * Send pending updates when a new slot is received * Add additional logging to batch sending * Don't add a price to the list of pending updates if it is already present. * Add test for update_price JPRC call
1 parent 91eadd7 commit 8be5247

File tree

7 files changed

+190
-27
lines changed

7 files changed

+190
-27
lines changed

docker/Dockerfile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ RUN apt-get install -qq \
1616
libzstd1 \
1717
libzstd-dev \
1818
python3-pytest \
19+
python3-pytest-asyncio \
1920
python3-websockets \
2021
sudo \
2122
zlib1g \

pc/manager.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -746,6 +746,12 @@ void manager::on_response( rpc::get_slot *res )
746746
if (
747747
has_status( PC_PYTH_RPC_CONNECTED )
748748
) {
749+
750+
// New slot received, so flush all pending updates for all active users
751+
for( user *uptr = olist_.first(); uptr; uptr = uptr->get_next() ) {
752+
uptr->send_pending_upds();
753+
}
754+
749755
if ( sub_ ) {
750756
sub_->on_slot_publish( this );
751757
}

pc/request.cpp

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -732,20 +732,39 @@ bool price::send( price *prices[], const unsigned n )
732732
for ( unsigned i = 0, j = 0; i < n; ++i ) {
733733
price *const p = prices[ i ];
734734
if ( PC_UNLIKELY( ! p->init_ && ! p->init_publish() ) ) {
735+
PC_LOG_ERR( "failed to initialize publisher" )
736+
.add( "price_account", *p->get_account() )
737+
.add( "product_account", *p->prod_->get_account() )
738+
.add( "symbol", p->get_symbol() )
739+
.add( "price_type", price_type_to_str( p->get_price_type() ) ).end();
735740
continue;
736741
}
737742
if ( PC_UNLIKELY( ! p->has_publisher() ) ) {
743+
PC_LOG_ERR( "missing publish permission" )
744+
.add( "price_account", *p->get_account() )
745+
.add( "product_account", *p->prod_->get_account() )
746+
.add( "symbol", p->get_symbol() )
747+
.add( "price_type", price_type_to_str( p->get_price_type() ) ).end();
738748
continue;
739749
}
740750
if ( PC_UNLIKELY( ! p->get_is_ready_publish() ) ) {
751+
PC_LOG_ERR( "not ready to publish - check rpc / pyth_tx connection" )
752+
.add( "price_account", *p->get_account() )
753+
.add( "product_account", *p->prod_->get_account() )
754+
.add( "symbol", p->get_symbol() )
755+
.add( "price_type", price_type_to_str( p->get_price_type() ) ).end();
741756
continue;
742757
}
743758
manager *const mgr = p->get_manager();
744759
if ( ! mgr1 ) {
745760
mgr1 = mgr;
746761
}
747762
else if ( mgr != mgr1 ) {
748-
PC_LOG_ERR( "unexpected manager" ).end();
763+
PC_LOG_ERR( "unexpected manager" )
764+
.add( "price_account", *p->get_account() )
765+
.add( "product_account", *p->prod_->get_account() )
766+
.add( "symbol", p->get_symbol() )
767+
.add( "price_type", price_type_to_str( p->get_price_type() ) ).end();
749768
continue;
750769
}
751770
const uint64_t slot = mgr->get_slot();
@@ -763,7 +782,11 @@ bool price::send( price *prices[], const unsigned n )
763782
mgr->submit( msg );
764783
}
765784
else {
766-
PC_LOG_ERR( "failed to build msg" );
785+
PC_LOG_ERR( "failed to build msg" )
786+
.add( "price_account", *p->get_account() )
787+
.add( "product_account", *p->prod_->get_account() )
788+
.add( "symbol", p->get_symbol() )
789+
.add( "price_type", price_type_to_str( p->get_price_type() ) ).end();
767790
}
768791
}
769792
else {
@@ -774,7 +797,7 @@ bool price::send( price *prices[], const unsigned n )
774797
std::string( 100, '\0' ), p1->preq_->get_sent_time()
775798
);
776799
p1->preq_->get_signature()->enc_base58( p1->tvec_.back().first );
777-
PC_LOG_DBG( "sent price update transaction" )
800+
PC_LOG_DBG( "sent price update" )
778801
.add( "price_account", *p1->get_account() )
779802
.add( "product_account", *p1->prod_->get_account() )
780803
.add( "symbol", p1->get_symbol() )

pc/user.cpp

Lines changed: 25 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
#include "manager.hpp"
33
#include "log.hpp"
44
#include "mem_map.hpp"
5+
#include <algorithm>
56

67
#define PC_JSON_RPC_VER "2.0"
78
#define PC_JSON_PARSE_ERROR -32700
@@ -11,6 +12,7 @@
1112
#define PC_JSON_UNKNOWN_SYMBOL -32000
1213
#define PC_JSON_MISSING_PERMS -32001
1314
#define PC_JSON_NOT_READY -32002
15+
#define PC_BATCH_SEND_FAILED -32010
1416

1517
using namespace pc;
1618

@@ -204,22 +206,17 @@ void user::parse_upd_price( uint32_t tok, uint32_t itok )
204206
if ( 0 == (ntok = jp_.find_val( ptok, "status" ) ) ) break;
205207
symbol_status stype = str_to_symbol_status( jp_.get_str( ntok ) );
206208

207-
// submit new price
208-
if ( sptr->update( price, conf, stype ) ) {
209-
// create result
210-
add_header();
211-
jw_.add_key( "result", 0UL );
212-
add_tail( itok );
213-
} else if ( !sptr->get_is_ready_publish() ) {
214-
add_error( itok, PC_JSON_NOT_READY,
215-
"not ready to publish - check rpc / pyth_tx connection" );
216-
} else if ( !sptr->has_publisher() ) {
217-
add_error( itok, PC_JSON_MISSING_PERMS, "missing publish permission" );
218-
} else if ( sptr->get_is_err() ) {
219-
add_error( itok, PC_JSON_INVALID_REQUEST, sptr->get_err_msg() );
220-
} else {
221-
add_error( itok, PC_JSON_INVALID_REQUEST, "unknown error" );
209+
// Add the updated price to the pending updates
210+
sptr->update_no_send( price, conf, stype, false );
211+
if( std::find(pending_vec_.begin(), pending_vec_.end(), sptr) == pending_vec_.end() ) {
212+
pending_vec_.emplace_back( sptr );
222213
}
214+
215+
// Send the result back
216+
add_header();
217+
jw_.add_key( "result", 0UL );
218+
add_tail( itok );
219+
223220
return;
224221
} while( 0 );
225222
add_invalid_params( itok );
@@ -333,6 +330,19 @@ void user::parse_get_product( uint32_t tok, uint32_t itok )
333330
add_tail( itok );
334331
}
335332

333+
void user::send_pending_upds()
334+
{
335+
if ( pending_vec_.empty() ) {
336+
return;
337+
}
338+
339+
if ( !price::send( pending_vec_.data(), pending_vec_.size()) ) {
340+
add_error( 0, PC_BATCH_SEND_FAILED, "batch send failed - please check the pyth logs" );
341+
}
342+
343+
pending_vec_.clear();
344+
}
345+
336346
void user::parse_get_all_products( uint32_t itok )
337347
{
338348
add_header();

pc/user.hpp

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,9 @@ namespace pc
4343
// symbol price schedule callback
4444
void on_response( price_sched *, uint64_t ) override;
4545

46+
// send all pending updates
47+
void send_pending_upds();
48+
4649
private:
4750

4851
// http-only request parsing
@@ -57,6 +60,7 @@ namespace pc
5760
};
5861

5962
typedef std::vector<deferred_sub> def_vec_t;
63+
typedef std::vector<price*> pending_vec_t;
6064

6165
void parse_request( uint32_t );
6266
void parse_get_product_list( uint32_t );
@@ -73,13 +77,14 @@ namespace pc
7377
void add_unknown_symbol( uint32_t id );
7478
void add_error( uint32_t id, int err, str );
7579

76-
rpc_client *rptr_; // rpc manager api
77-
manager *sptr_; // manager collection
78-
user_http hsvr_; // http parser
79-
jtree jp_; // json parser
80-
json_wtr jw_; // json writer
81-
def_vec_t dvec_; // deferred subscriptions
82-
request_sub_set psub_; // price subscriptions
80+
rpc_client *rptr_; // rpc manager api
81+
manager *sptr_; // manager collection
82+
user_http hsvr_; // http parser
83+
jtree jp_; // json parser
84+
json_wtr jw_; // json writer
85+
def_vec_t dvec_; // deferred subscriptions
86+
request_sub_set psub_; // price subscriptions
87+
pending_vec_t pending_vec_; // prices with pending updates
8388
};
8489

8590
}

pyth/tests/conftest.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -347,12 +347,12 @@ def pythd(solana_test_validator, pyth_dir):
347347
'-x',
348348
'-m', 'finalized',
349349
'-d',
350+
'-l', 'pyth_logs.txt',
350351
]
351352
kwargs = {
352353
'stdin': DEVNULL,
353-
'stdout': DEVNULL,
354-
'stderr': DEVNULL,
355354
}
355+
356356
with Popen(cmd, **kwargs) as p:
357357
time.sleep(3)
358358
yield

pyth/tests/test_update_price.py

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
import json
2+
from subprocess import check_output
3+
import pytest
4+
import websockets
5+
import time
6+
import itertools
7+
import random
8+
9+
from pyth.tests.conftest import PRODUCTS
10+
11+
@pytest.mark.asyncio
12+
async def test_batch_update_price(solana_test_validator, pythd, pyth_dir, pyth_init_product, pyth_init_price):
13+
14+
messageIds = itertools.count()
15+
16+
# Use a single websocket connection for the entire test, as batching is done per-user
17+
async with websockets.connect('ws://localhost:8910/') as ws:
18+
19+
async def update_price(account, price, conf, status):
20+
msg = jrpc_req(
21+
method='update_price',
22+
params={
23+
'account': account,
24+
'price': price,
25+
'conf': conf,
26+
'status': status,
27+
})
28+
29+
await send(msg)
30+
resp = await recv()
31+
assert resp['result'] == 0
32+
33+
async def get_product(account):
34+
output = check_output([
35+
'pyth', 'get_product',
36+
account,
37+
'-r', 'localhost',
38+
'-k', pyth_dir,
39+
'-c', 'finalized',
40+
'-j',
41+
]).decode('ascii')
42+
result = json.loads(output)
43+
44+
return result
45+
46+
def jrpc_req(method=None, params=None):
47+
return {
48+
'jsonrpc': '2.0',
49+
'method': method,
50+
'params': params,
51+
'id': next(messageIds)
52+
}
53+
54+
async def send(msg):
55+
print("--- sending message ---")
56+
print(msg)
57+
await ws.send(json.dumps(msg))
58+
59+
async def recv():
60+
data = await ws.recv()
61+
msg = json.loads(data)
62+
print("----- received message -----")
63+
print(msg)
64+
return msg
65+
66+
def get_publisher_acc(product_acc):
67+
assert len(product_acc['price_accounts']) == 1
68+
price_acc = product_acc['price_accounts'][0]
69+
70+
assert len(price_acc['publisher_accounts']) == 1
71+
return price_acc['publisher_accounts'][0]
72+
73+
# Check that the prices are 0 initially
74+
for product in PRODUCTS.keys():
75+
product_acc = await get_product(pyth_init_product[product])
76+
publisher_acc = get_publisher_acc(product_acc)
77+
78+
assert publisher_acc['price'] == 0
79+
assert publisher_acc['conf'] == 0
80+
assert publisher_acc['status'] == 'unknown'
81+
82+
# Generate new values for this test
83+
new_values = {
84+
product: {
85+
'price':random.randint(1, 150),
86+
'conf': random.randint(1, 20),
87+
'status': 'trading',
88+
} for product in PRODUCTS.keys()
89+
}
90+
91+
# Update the values of the products
92+
for product in PRODUCTS.keys():
93+
await update_price(
94+
pyth_init_price[product],
95+
new_values[product]['price'],
96+
new_values[product]['conf'],
97+
new_values[product]['status'])
98+
99+
time.sleep(80)
100+
101+
# Crank the products
102+
for product in PRODUCTS.keys():
103+
await update_price(
104+
pyth_init_price[product],
105+
1,
106+
1,
107+
'trading')
108+
109+
time.sleep(80)
110+
111+
# Check that the price has been updated
112+
for product in PRODUCTS.keys():
113+
product_acc = await get_product(pyth_init_product[product])
114+
publisher_acc = get_publisher_acc(product_acc)
115+
116+
assert publisher_acc['price'] == new_values[product]['price']
117+
assert publisher_acc['conf'] == new_values[product]['conf']
118+
assert publisher_acc['status'] == new_values[product]['status']

0 commit comments

Comments
 (0)