Skip to content

Commit e5abb91

Browse files
subscribe to signatures to get accurate tx rate; scale conf. interval before applying sqrt(slot_diff)
1 parent 7643025 commit e5abb91

File tree

13 files changed

+188
-11
lines changed

13 files changed

+188
-11
lines changed

pc/dbl_list.hpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ namespace pc
1414
void add( T * );
1515
void del( T * );
1616
T *first() const;
17+
T *last() const;
1718
private:
1819
T *hd_;
1920
T *tl_;
@@ -103,6 +104,12 @@ namespace pc
103104
return hd_;
104105
}
105106

107+
template<class T>
108+
T *dbl_list<T>::last() const
109+
{
110+
return tl_;
111+
}
112+
106113
template<class T>
107114
next<T>::next()
108115
: next_( nullptr )

pc/manager.cpp

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,11 @@ manager::~manager()
7979
delete ptr;
8080
}
8181
svec_.clear();
82+
while( !slist_.empty() ) {
83+
price_sig *ptr = slist_.first();
84+
slist_.del( ptr );
85+
delete ptr;
86+
}
8287
}
8388

8489
void manager::add_map_sub()
@@ -727,3 +732,19 @@ product *manager::get_product( unsigned i ) const
727732
{
728733
return i < svec_.size() ? svec_[i] : nullptr;
729734
}
735+
736+
void manager::alloc( price_sig*& ptr )
737+
{
738+
ptr = slist_.last();
739+
if ( ptr ) {
740+
ptr->reset_notify();
741+
slist_.del( ptr );
742+
} else {
743+
ptr = new price_sig;
744+
}
745+
}
746+
747+
void manager::dealloc( price_sig *ptr )
748+
{
749+
slist_.add( ptr );
750+
}

pc/manager.hpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,8 @@ namespace pc
151151
void del_map_sub();
152152
void schedule( price_sched* );
153153
void write( pc_pub_key_t *, pc_acc_t *ptr );
154+
void alloc( price_sig*& );
155+
void dealloc( price_sig* );
154156

155157
// tx_sub callbacks
156158
void on_connect() override;
@@ -181,6 +183,7 @@ namespace pc
181183

182184
typedef dbl_list<user> user_list_t;
183185
typedef dbl_list<request> req_list_t;
186+
typedef dbl_list<price_sig> sig_list_t;
184187
typedef std::vector<get_mapping*> map_vec_t;
185188
typedef std::vector<product*> spx_vec_t;
186189
typedef std::vector<price_sched*> kpx_vec_t;
@@ -200,6 +203,7 @@ namespace pc
200203
tx_connect tconn_; // tx proxy connection
201204
user_list_t olist_; // open users list
202205
user_list_t dlist_; // to-be-deleted users list
206+
sig_list_t slist_; // signature reuse list
203207
req_list_t plist_; // pending requests
204208
map_vec_t mvec_; // mapping account updates
205209
acc_map_t amap_; // account to symbol pricing info

pc/pub_stats.cpp

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ pub_stats::pub_stats()
1111
void pub_stats::clear_stats()
1212
{
1313
slots_.clear();
14-
num_sent_ = num_recv_ = 0;
14+
num_sent_ = num_recv_ = num_tx_ = num_coal_ = 0;
1515
__builtin_memset( shist_, 0, sizeof( shist_ ) );
1616
__builtin_memset( thist_, 0, sizeof( shist_ ) );
1717
}
@@ -26,16 +26,31 @@ uint64_t pub_stats::get_num_recv() const
2626
return num_recv_;
2727
}
2828

29+
uint64_t pub_stats::get_num_tx() const
30+
{
31+
return num_tx_;
32+
}
33+
2934
uint64_t pub_stats::get_num_drop() const
3035
{
31-
return num_sent_ - num_recv_;
36+
return num_sent_ - num_tx_;
37+
}
38+
39+
uint64_t pub_stats::get_num_coalesced() const
40+
{
41+
return num_coal_;
3242
}
3343

3444
double pub_stats::get_hit_rate() const
3545
{
3646
return (100.*num_recv_)/num_sent_;
3747
}
3848

49+
double pub_stats::get_tx_rate() const
50+
{
51+
return (100.*num_tx_)/num_sent_;
52+
}
53+
3954
void pub_stats::add_send( uint64_t slot, int64_t ts )
4055
{
4156
++num_sent_;

pc/pub_stats.hpp

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,22 @@ namespace pc
2222
// get publish stats details
2323
uint64_t get_num_sent() const;
2424
uint64_t get_num_recv() const;
25+
uint64_t get_num_tx() const;
2526
uint64_t get_num_drop() const;
2627

28+
// number of detected coalesce events (i.e. where valid slot does
29+
// not match publish slot of previous update). Does not indicate
30+
// how many submissions (transactions) were actually coalesced
31+
uint64_t get_num_coalesced() const;
32+
2733
// hit rate is percentage of quotes sent that appeared in an aggregate
34+
// this may be undercounted because some submissions may be coalesced
35+
// into a single slot
2836
double get_hit_rate() const;
2937

38+
// tx rate is the percentage of quotes send that are confirmed
39+
double get_tx_rate() const;
40+
3041
// get (rough) quartiles of publish end-to-end latency in seconds
3142
// to a max of 16 seconds
3243
void get_time_quartiles( float q[4] ) const;
@@ -38,6 +49,12 @@ namespace pc
3849
// clear-down statistics
3950
void clear_stats();
4051

52+
// increment signature count
53+
void inc_tx();
54+
55+
// increment coalesce event count
56+
void inc_coalesce();
57+
4158
private:
4259

4360
struct slot_time {
@@ -53,9 +70,21 @@ namespace pc
5370

5471
slots_t slots_;
5572
uint64_t num_sent_;
73+
uint64_t num_tx_;
5674
uint64_t num_recv_;
75+
uint64_t num_coal_;
5776
uint32_t thist_[num_buckets];
5877
uint32_t shist_[num_buckets];
5978
};
6079

80+
inline void pub_stats::inc_tx()
81+
{
82+
++num_tx_;
83+
}
84+
85+
inline void pub_stats::inc_coalesce()
86+
{
87+
++num_coal_;
88+
}
89+
6190
}

pc/request.cpp

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1981,9 +1981,62 @@ price *product::get_price( price_type pt ) const
19811981
return nullptr;
19821982
}
19831983

1984+
///////////////////////////////////////////////////////////////////////////
1985+
// price_sig
1986+
1987+
price_sig::price_sig()
1988+
: is_notify_( false )
1989+
{
1990+
set_commitment( commitment::e_confirmed );
1991+
set_signature( this );
1992+
}
1993+
1994+
void price_sig::response( const jtree& jt )
1995+
{
1996+
is_notify_ = true;
1997+
add_notify( jt );
1998+
}
1999+
2000+
bool price_sig::notify( const jtree& )
2001+
{
2002+
((price*)get_sub())->on_response( this );
2003+
return false;
2004+
}
2005+
2006+
void price_sig::reset_notify()
2007+
{
2008+
is_notify_ = false;
2009+
}
2010+
2011+
bool price_sig::get_is_notify() const
2012+
{
2013+
return is_notify_;
2014+
}
2015+
19842016
///////////////////////////////////////////////////////////////////////////
19852017
// price
19862018

2019+
void price::on_response( price_sig *res )
2020+
{
2021+
// update transaction receive count
2022+
inc_tx();
2023+
2024+
// remove everything up to and including this result
2025+
for( price_sig *ptr = glist_.first(); ptr; ) {
2026+
price_sig *nxt = ptr->get_next();
2027+
if ( ptr->get_is_notify() ) {
2028+
ptr->remove_notify();
2029+
glist_.del( ptr );
2030+
get_manager()->dealloc( ptr );
2031+
}
2032+
if ( ptr != res ) {
2033+
ptr = nxt;
2034+
} else {
2035+
break;
2036+
}
2037+
}
2038+
}
2039+
19872040
price::price( const pub_key& acc, product *prod )
19882041
: init_( false ),
19892042
isched_( false ),
@@ -2013,6 +2066,15 @@ price::price( const pub_key& acc, product *prod )
20132066
sreq_->set_sub( this );
20142067
}
20152068

2069+
price::~price()
2070+
{
2071+
while( !glist_.empty() ) {
2072+
price_sig *ptr = glist_.first();
2073+
glist_.del( ptr );
2074+
delete ptr;
2075+
}
2076+
}
2077+
20162078
bool price::init_publish()
20172079
{
20182080
manager *cptr = get_manager();
@@ -2178,11 +2240,18 @@ bool price::update(
21782240
if ( PC_UNLIKELY( !get_is_ready_publish() ) ) {
21792241
return false;
21802242
}
2243+
price_sig *sig;
21812244
manager *mgr = get_manager();
2245+
mgr->alloc( sig );
21822246
add_send( mgr->get_slot(), mgr->get_curr_time() );
21832247
preq_->set_price( price, conf, st, mgr->get_slot(), is_agg );
21842248
preq_->set_block_hash( mgr->get_recent_block_hash() );
2249+
preq_->set_signature( sig );
21852250
mgr->submit( preq_ );
2251+
// subscribe to signature
2252+
sig->set_sub( this );
2253+
get_rpc_client()->send( sig );
2254+
glist_.add( sig );
21862255
return true;
21872256
}
21882257

@@ -2345,6 +2414,9 @@ void price::update( T *res )
23452414
twap_ = pupd->twap_;
23462415
avol_ = pupd->avol_;
23472416
sym_st_ = (symbol_status)pupd->agg_.status_;
2417+
if ( pub_slot_ < pupd->valid_slot_ ) {
2418+
inc_coalesce();
2419+
}
23482420
pub_slot_ = pupd->agg_.pub_slot_;
23492421
valid_slot_ = pupd->valid_slot_;
23502422

pc/request.hpp

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -645,15 +645,31 @@ namespace pc
645645
uint64_t shash_;
646646
};
647647

648+
class price_sig : public rpc::signature_subscribe,
649+
public signature,
650+
public prev_next<price_sig>
651+
{
652+
public:
653+
price_sig();
654+
void response( const jtree& jt ) override;
655+
bool notify( const jtree& jt ) override;
656+
void reset_notify();
657+
bool get_is_notify() const;
658+
private:
659+
bool is_notify_;
660+
};
661+
648662
// price subscriber and publisher
649663
class price : public request,
650664
public pub_stats,
651665
public rpc_sub_i<rpc::get_account_info>,
652-
public rpc_sub_i<rpc::account_subscribe>
666+
public rpc_sub_i<rpc::account_subscribe>,
667+
public rpc_sub_i<price_sig>
653668
{
654669
public:
655670

656671
price( const pub_key&, product *prod );
672+
virtual ~price();
657673

658674
// corresponding product definition
659675
product *get_product() const;
@@ -726,13 +742,16 @@ namespace pc
726742
void submit() override;
727743
void on_response( rpc::get_account_info * ) override;
728744
void on_response( rpc::account_subscribe * ) override;
745+
void on_response( price_sig * ) override;
729746
bool get_is_done() const override;
730747

731748
private:
732749

733750
typedef enum {
734751
e_subscribe, e_sent_subscribe, e_publish, e_error } state_t;
735752

753+
typedef dbl_list<price_sig> sig_list_t;
754+
736755
template<class T> void update( T *res );
737756

738757
bool init_publish();
@@ -760,6 +779,7 @@ namespace pc
760779
int32_t aexpo_;
761780
uint32_t cnum_;
762781
product *prod_;
782+
sig_list_t glist_;
763783
price_sched sched_;
764784
pc_pub_key_t cpub_[PC_COMP_SIZE];
765785
pc_price_info_t cprice_[PC_COMP_SIZE];

pc/rpc_client.cpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
#include "rpc_client.hpp"
22
#include "bincode.hpp"
33
#include <unistd.h>
4-
54
#include "log.hpp"
65

76
using namespace pc;
@@ -2241,6 +2240,11 @@ void rpc::upd_price::set_block_hash( hash *bhash )
22412240
bhash_ = bhash;
22422241
}
22432242

2243+
void rpc::upd_price::set_signature( signature *sig )
2244+
{
2245+
sig_ = sig;
2246+
}
2247+
22442248
void rpc::upd_price::set_price( int64_t px,
22452249
uint64_t conf,
22462250
symbol_status st,
@@ -2318,5 +2322,6 @@ void rpc::upd_price::build( net_wtr& wtr )
23182322

23192323
// all accounts need to sign transaction
23202324
tx.sign( pub_idx, tx_idx, *ckey_ );
2325+
sig_->init_from_buf( (const uint8_t*)(tx.get_buf() + pub_idx) );
23212326
((tx_wtr&)wtr).commit( tx );
23222327
}

pc/rpc_client.hpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -835,6 +835,7 @@ namespace pc
835835
void set_params( pub_key * );
836836
void set_program( pub_key * );
837837
void set_block_hash( hash * );
838+
void set_signature( signature * );
838839
void set_price( int64_t px, uint64_t conf, symbol_status,
839840
uint64_t pub_slot, bool is_aggregate );
840841
void build( net_wtr& ) override;
@@ -846,6 +847,7 @@ namespace pc
846847
pub_key *gkey_;
847848
pub_key *akey_;
848849
pub_key *rkey_;
850+
signature *sig_;
849851
int64_t price_;
850852
uint64_t conf_;
851853
uint64_t pub_slot_;;

0 commit comments

Comments
 (0)