Skip to content

Commit 2b37fb5

Browse files
use base64+zstd account subscription; add dependency on libzstd (or ubuntu package libzstd-dev)
1 parent acca35f commit 2b37fb5

File tree

9 files changed

+65
-29
lines changed

9 files changed

+65
-29
lines changed

CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ set( PC_HDR
6161
add_library( pc STATIC ${PC_SRC} )
6262

6363
# dependencies
64-
set( PC_DEP pc ssl crypto z )
64+
set( PC_DEP pc ssl crypto z zstd )
6565

6666
#
6767
# applications

README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@ apt install libssl-dev
88
# depends on libz
99
apt install zlib1g zlib1g-dev
1010
11+
# depends on libzstd
12+
apt install libzstd-dev
13+
1114
# uses cmake to build
1215
apt install cmake
1316

pc/manager.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -315,6 +315,8 @@ bool manager::init()
315315
}
316316
PC_LOG_INF( "initialized" )
317317
.add( "version", PC_VERSION )
318+
.add( "rpc_host", get_rpc_host() )
319+
.add( "tx_host", get_tx_host() )
318320
.add( "capture_file", get_capture_file() )
319321
.add( "publish_interval(ms)", get_publish_interval() )
320322
.end();

pc/request.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -558,7 +558,7 @@ void add_product::submit()
558558
areq_->set_sender( pkey );
559559
areq_->set_account( &akey_ );
560560
areq_->set_owner( gpub );
561-
areq_->set_space( sizeof( pc_price_t ) );
561+
areq_->set_space( PC_PROD_ACC_SIZE );
562562
sreq_->set_program( gpub );
563563
sreq_->set_publish( pkey );
564564
sreq_->set_account( &akey_ );
@@ -1910,7 +1910,8 @@ void product::update( T *res )
19101910
return;
19111911
}
19121912
pc_prod_t *prod;
1913-
if ( sizeof( pc_prod_t ) > res->get_data( prod ) ||
1913+
size_t plen = std::max( sizeof(pc_price_t), (size_t)PC_PROD_ACC_SIZE );
1914+
if ( sizeof( pc_prod_t ) > res->get_data( prod, plen ) ||
19141915
prod->magic_ != PC_MAGIC ||
19151916
!init_from_account( prod ) ) {
19161917
cptr->set_err_msg( "invalid or corrupt product account" );

pc/rpc_client.cpp

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
#include "bincode.hpp"
33
#include <unistd.h>
44
#include "log.hpp"
5+
#include <zstd.h>
56

67
using namespace pc;
78

@@ -123,10 +124,20 @@ static void send_transaction( json_wtr& msg, bincode& tx )
123124
rpc_client::rpc_client()
124125
: hptr_( nullptr ),
125126
wptr_( nullptr ),
126-
id_( 0UL )
127+
id_( 0UL ),
128+
cxt_( nullptr )
127129
{
128130
hp_.cp_ = this;
129131
wp_.cp_ = this;
132+
cxt_ = ZSTD_createDCtx();
133+
}
134+
135+
rpc_client::~rpc_client()
136+
{
137+
if ( cxt_ ) {
138+
ZSTD_freeDCtx( (ZSTD_DCtx*)cxt_ );
139+
cxt_ = nullptr;
140+
}
130141
}
131142

132143
void rpc_client::set_http_conn( net_connect *hptr )
@@ -252,6 +263,19 @@ void rpc_client::remove_notify( rpc_request *rptr )
252263
}
253264
}
254265

266+
size_t rpc_client::get_data(
267+
const char *dptr, size_t dlen, size_t tlen, char *&ptr )
268+
{
269+
tlen = ZSTD_compressBound( tlen );
270+
abuf_.resize( dlen );
271+
zbuf_.resize( tlen );
272+
ZSTD_DCtx *cxt = (ZSTD_DCtx*)cxt_;
273+
dlen = dec_base64( (const uint8_t*)dptr, dlen, (uint8_t*)&abuf_[0] );
274+
tlen = ZSTD_decompressDCtx( cxt, &zbuf_[0], tlen, &abuf_[0], dlen );
275+
ptr = &zbuf_[0];
276+
return tlen;
277+
}
278+
255279
///////////////////////////////////////////////////////////////////////////
256280
// rpc_request
257281

@@ -452,7 +476,7 @@ void rpc::get_account_info::request( json_wtr& msg )
452476
msg.add_key( "params", json_wtr::e_arr );
453477
msg.add_val( *acc_ );
454478
msg.add_val( json_wtr::e_obj );
455-
msg.add_key( "encoding", "base64" );
479+
msg.add_key( "encoding", "base64+zstd" );
456480
msg.add_key( "commitment", commitment_to_str( cmt_ ) );
457481
msg.pop();
458482
msg.pop();
@@ -790,7 +814,7 @@ void rpc::account_subscribe::request( json_wtr& msg )
790814
msg.add_key( "params", json_wtr::e_arr );
791815
msg.add_val( *acc_ );
792816
msg.add_val( json_wtr::e_obj );
793-
msg.add_key( "encoding", "base64" );
817+
msg.add_key( "encoding", "base64+zstd" );
794818
msg.add_key( "commitment", commitment_to_str( cmt_ ) );
795819
msg.pop();
796820
msg.pop();

pc/rpc_client.hpp

Lines changed: 18 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -61,13 +61,13 @@ namespace pc
6161
commitment str_to_commitment( str );
6262

6363
class rpc_request;
64-
6564
// solana rpc REST API client
6665
class rpc_client : public error
6766
{
6867
public:
6968

7069
rpc_client();
70+
~rpc_client();
7171

7272
// rpc http connection
7373
void set_http_conn( net_connect * );
@@ -90,8 +90,7 @@ namespace pc
9090
void remove_notify( rpc_request * );
9191

9292
// decode into buffer and return pointer
93-
template<class T>
94-
size_t get_data( const char *dptr, size_t dlen, T *&ptr );
93+
size_t get_data( const char *dptr, size_t dlen, size_t tlen, char*&ptr);
9594

9695
// reset state
9796
void reset();
@@ -133,7 +132,9 @@ namespace pc
133132
id_vec_t reuse_; // reuse id list
134133
sub_map_t smap_; // subscription map
135134
acc_buf_t abuf_; // account decode buffer
135+
acc_buf_t zbuf_; // account decompress buffer
136136
uint64_t id_; // next request id
137+
void *cxt_;
137138
};
138139

139140
// rpc response or subscrption callback
@@ -240,16 +241,6 @@ namespace pc
240241
/////////////////////////////////////////////////////////////////////////
241242
// wrappers for various solana rpc requests
242243

243-
template<class T>
244-
size_t rpc_client::get_data( const char *dptr, size_t dlen, T *&ptr )
245-
{
246-
size_t tlen = enc_base64_len( sizeof( T ) );
247-
abuf_.resize( std::max( dlen, tlen ) );
248-
dec_base64( (const uint8_t*)dptr, dlen, (uint8_t*)&abuf_[0] );
249-
ptr = (T*)&abuf_[0];
250-
return abuf_.size();
251-
}
252-
253244
namespace rpc
254245
{
255246
// get account balance, program data and account meta-data
@@ -266,7 +257,7 @@ namespace pc
266257
uint64_t get_rent_epoch() const;
267258
bool get_is_executable() const;
268259
void get_owner( const char *&, size_t& ) const;
269-
template<class T> size_t get_data( T *& ) const;
260+
template<class T> size_t get_data( T *&, size_t len=sizeof(T) ) const;
270261

271262
get_account_info();
272263
void request( json_wtr& ) override;
@@ -285,9 +276,13 @@ namespace pc
285276
commitment cmt_;
286277
};
287278

288-
template<class T> size_t get_account_info::get_data( T *&res ) const
279+
template<class T>
280+
size_t get_account_info::get_data( T *&res, size_t tlen ) const
289281
{
290-
return get_rpc_client()->get_data( dptr_, dlen_, res );
282+
char *ptr;
283+
size_t len = get_rpc_client()->get_data( dptr_, dlen_, tlen, ptr );
284+
res = (T*)ptr;
285+
return len;
291286
}
292287

293288
// recent block hash and fee schedule
@@ -421,7 +416,7 @@ namespace pc
421416
// results
422417
uint64_t get_slot() const;
423418
uint64_t get_lamports() const;
424-
template<class T> size_t get_data( T *& ) const;
419+
template<class T> size_t get_data( T *&, size_t len=sizeof(T) ) const;
425420

426421
account_subscribe();
427422
void request( json_wtr& ) override;
@@ -437,9 +432,13 @@ namespace pc
437432
commitment cmt_;
438433
};
439434

440-
template<class T> size_t account_subscribe::get_data( T *&res ) const
435+
template<class T>
436+
size_t account_subscribe::get_data( T *&res, size_t tlen ) const
441437
{
442-
return get_rpc_client()->get_data( dptr_, dlen_, res );
438+
char *ptr;
439+
size_t len = get_rpc_client()->get_data( dptr_, dlen_, tlen, ptr );
440+
res = (T*)ptr;
441+
return len;
443442
}
444443

445444
// transaction to transfer funds between accounts

pc/user.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,8 @@ void user::parse_upd_price( uint32_t tok, uint32_t itok )
191191
jw_.add_key( "result", 0UL );
192192
add_tail( itok );
193193
} else if ( !sptr->get_is_ready_publish() ) {
194-
add_error( itok, PC_JSON_NOT_READY, "not ready to publish" );
194+
add_error( itok, PC_JSON_NOT_READY,
195+
"not ready to publish - check pyth_tx connection" );
195196
} else if ( !sptr->has_publisher() ) {
196197
add_error( itok, PC_JSON_MISSING_PERMS, "missing publish permission" );
197198
} else if ( sptr->get_is_err() ) {

pctest/test_publish.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -302,11 +302,11 @@ void test_publish::on_response( pc::price_sched *ptr, uint64_t sub_id )
302302
.end();
303303
// should work once publisher has been permissioned
304304
} else if ( !sym->get_is_ready_publish() ) {
305-
PC_LOG_WRN( "not ready to publish next price" )
305+
PC_LOG_WRN( "not ready to publish next price - check pyth_tx connection")
306306
.add( "symbol", sym->get_symbol() )
307307
.add( "price_type", pc::price_type_to_str( sym->get_price_type() ) )
308308
.end();
309-
// could be delay in confirmation - try again next time
309+
// likely that pyth_tx not yet connected
310310
} else if ( sym->get_is_err() ) {
311311
PC_LOG_WRN( "block-chain error" )
312312
.add( "symbol", sym->get_symbol() )

pctest/test_publish.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,13 @@ async def poll( uri ):
125125
while True:
126126
msg = json.loads( await ws.recv() )
127127
# print(msg)
128-
if 'result' in msg:
128+
if 'error' in msg:
129+
ts = datetime.datetime.utcnow().isoformat()
130+
code = msg['error']['code']
131+
emsg = msg['error']['message']
132+
print( f'{ts} error code: {code} msg: {emsg}' )
133+
sys.exit(1)
134+
elif 'result' in msg:
129135
msgid = msg['id']
130136
if msgid in allids:
131137
allids[msgid].parse_reply( msg, allsub )

0 commit comments

Comments
 (0)