3
3
#include < unistd.h>
4
4
#include < signal.h>
5
5
#include < iostream>
6
+ #include < stdlib.h>
6
7
7
8
class test_publish ;
8
9
@@ -30,12 +31,14 @@ class test_connect : public pc::manager_sub
30
31
// construct publishers on addition of new symbols
31
32
void on_add_symbol ( pc::manager *, pc::price * ) override ;
32
33
34
+ // when we receive a new slot
35
+ void on_slot_publish ( pc::manager * ) override ;
36
+
33
37
// have we received an on_init() callback yet
34
38
bool get_is_init () const ;
35
39
36
40
void teardown ();
37
41
38
- private:
39
42
test_publish *pub1_; // SYMBOL1 publisher
40
43
test_publish *pub2_; // SYMBOL2 publisher
41
44
};
@@ -44,8 +47,7 @@ class test_connect : public pc::manager_sub
44
47
class test_publish : public pc ::request_sub,
45
48
public pc::request_sub_i<pc::product>,
46
49
public pc::request_sub_i<pc::price>,
47
- public pc::request_sub_i<pc::price_init>,
48
- public pc::request_sub_i<pc::price_sched>
50
+ public pc::request_sub_i<pc::price_init>
49
51
{
50
52
public:
51
53
test_publish ( pc::price *sym, int64_t px, uint64_t sprd );
@@ -57,12 +59,18 @@ class test_publish : public pc::request_sub,
57
59
// callback for on-chain aggregate price update
58
60
void on_response ( pc::price*, uint64_t ) override ;
59
61
60
- // callback for when to submit new price on-chain
61
- void on_response ( pc::price_sched *, uint64_t ) override ;
62
-
63
62
// callback for re-initialization of price account (with diff. exponent)
64
63
void on_response ( pc::price_init *, uint64_t ) override ;
65
64
65
+ // user-facing method to update the price this publisher will send
66
+ void update_price ( int64_t px_, uint64_t sprd_ );
67
+
68
+ // the pyth price publisher for this symbol
69
+ pc::price *sym_;
70
+
71
+ // indicates if there is an update pending which should be sent in the next slot
72
+ bool has_update_pending;
73
+
66
74
private:
67
75
void unsubscribe ();
68
76
@@ -71,25 +79,23 @@ class test_publish : public pc::request_sub,
71
79
uint64_t sprd_; // confidence interval or bid-ask spread
72
80
double expo_; // price exponent
73
81
uint64_t sid1_; // subscription id for prices
74
- uint64_t sid2_; // subscription id for scheduling
75
- uint64_t sid3_; // subscription id for scheduling
82
+ uint64_t sid2_; // subscription id for products
76
83
uint64_t rcnt_; // price receive count
77
84
};
78
85
79
86
test_publish::test_publish ( pc::price *sym, int64_t px, uint64_t sprd )
80
- : sub_( this ),
87
+ : sym_( sym ),
88
+ has_update_pending( false ),
89
+ sub_( this ),
81
90
px_( px ),
82
91
sprd_( sprd ),
83
92
rcnt_( 0UL )
84
93
{
85
94
// add subscriptions for price updates from block chain
86
95
sid1_ = sub_.add ( sym );
87
96
88
- // add subscription for price scheduling
89
- sid2_ = sub_.add ( sym->get_sched () );
90
-
91
97
// add subscription for product updates
92
- sid3_ = sub_.add ( sym->get_product () );
98
+ sid2_ = sub_.add ( sym->get_product () );
93
99
94
100
// get price exponent for this symbol
95
101
int64_t expo = sym->get_price_exponent ();
@@ -170,6 +176,34 @@ void test_connect::on_add_symbol( pc::manager *, pc::price *sym )
170
176
}
171
177
}
172
178
179
+ // Send any pending updates when a new slot is published.
180
+ void test_connect::on_slot_publish ( pc::manager * )
181
+ {
182
+ // Collect all the prices that are pending updates
183
+ std::vector<pc::price*> updates;
184
+ if ( pub1_ && pub1_->has_update_pending ) {
185
+ updates.emplace_back ( pub1_->sym_ );
186
+ }
187
+ if ( pub2_ && pub2_->has_update_pending ) {
188
+ updates.emplace_back ( pub2_->sym_ );
189
+ }
190
+
191
+ // Do nothing if there are no pending updates
192
+ if ( updates.empty () ) {
193
+ return ;
194
+ }
195
+
196
+ // Send the batch price update
197
+ if ( !pc::price::send ( updates.data (), updates.size ()) ) {
198
+ PC_LOG_ERR ( " batch send failed" ).end ();
199
+ }
200
+
201
+ // Mark the updates as completed
202
+ updates.clear ();
203
+ pub1_->has_update_pending = false ;
204
+ pub2_->has_update_pending = false ;
205
+ }
206
+
173
207
test_publish::~test_publish ()
174
208
{
175
209
unsubscribe ();
@@ -179,7 +213,6 @@ void test_publish::unsubscribe()
179
213
{
180
214
// unsubscribe to callbacks
181
215
sub_.del ( sid1_ ); // unsubscribe price updates
182
- sub_.del ( sid2_ ); // unsubscribe price schedule updates
183
216
}
184
217
185
218
void test_publish::on_response ( pc::product *prod, uint64_t )
@@ -264,56 +297,6 @@ void test_publish::on_response( pc::price *sym, uint64_t )
264
297
}
265
298
}
266
299
267
- void test_publish::on_response ( pc::price_sched *ptr, uint64_t sub_id )
268
- {
269
- // check if currently in error
270
- pc::price *sym = ptr->get_price ();
271
- if ( sym->get_is_err () ) {
272
- PC_LOG_ERR ( " aggregate price in error" )
273
- .add ( " err" , sym->get_err_msg () )
274
- .end ();
275
- unsubscribe ();
276
- return ;
277
- }
278
-
279
- // submit next price to block chain for this symbol
280
- if ( sym->update ( px_, sprd_, pc::symbol_status::e_trading ) ) {
281
- double price = expo_ * (double )px_;
282
- double spread = expo_ * (double )sprd_;
283
- PC_LOG_INF ( " submit price to block-chain" )
284
- .add ( " symbol" , sym->get_symbol () )
285
- .add ( " price_type" , pc::price_type_to_str ( sym->get_price_type () ) )
286
- .add ( " price" , price )
287
- .add ( " spread" , spread )
288
- .add ( " slot" , sym->get_manager ()->get_slot () )
289
- .add ( " sub_id" , sub_id )
290
- .end ();
291
- // increase price
292
- px_ += static_cast < int64_t >( sprd_ );
293
- } else if ( !sym->has_publisher () ) {
294
- PC_LOG_WRN ( " missing publish permission" )
295
- .add ( " symbol" , sym->get_symbol () )
296
- .add ( " price_type" , pc::price_type_to_str ( sym->get_price_type () ) )
297
- .end ();
298
- // should work once publisher has been permissioned
299
- } else if ( !sym->get_is_ready_publish () ) {
300
- PC_LOG_WRN ( " not ready to publish next price - check rpc / pyth_tx connection" )
301
- .add ( " symbol" , sym->get_symbol () )
302
- .add ( " price_type" , pc::price_type_to_str ( sym->get_price_type () ) )
303
- .end ();
304
- // likely that pyth_tx not yet connected
305
- } else if ( sym->get_is_err () ) {
306
- PC_LOG_WRN ( " block-chain error" )
307
- .add ( " symbol" , sym->get_symbol () )
308
- .add ( " price_type" , pc::price_type_to_str ( sym->get_price_type () ) )
309
- .add ( " err_msg" , sym->get_err_msg () )
310
- .end ();
311
- unsubscribe ();
312
- // either bad config or on-chain program problem - cant continue as is
313
- // could try calling reset_err() and continue once error is resolved
314
- }
315
- }
316
-
317
300
void test_publish::on_response ( pc::price_init *ptr, uint64_t )
318
301
{
319
302
pc::price *sym = ptr->get_price ();
@@ -323,6 +306,12 @@ void test_publish::on_response( pc::price_init *ptr, uint64_t )
323
306
.end ();
324
307
}
325
308
309
+ // Updates the price value stored locally, without sending the update.
310
+ void test_publish::update_price ( int64_t px_, uint64_t sprd_ ) {
311
+ sym_->update_no_send ( px_, sprd_, pc::symbol_status::e_trading, false );
312
+ has_update_pending = true ;
313
+ }
314
+
326
315
std::string get_rpc_host ()
327
316
{
328
317
return " localhost" ;
@@ -364,6 +353,11 @@ int usage()
364
353
return 1 ;
365
354
}
366
355
356
+ int64_t random_value ( )
357
+ {
358
+ return rand () % 10 + 1 ;
359
+ }
360
+
367
361
int main (int argc, char ** argv)
368
362
{
369
363
// unpack options
@@ -429,6 +423,15 @@ int main(int argc, char** argv)
429
423
// and requests to submit price
430
424
while ( do_run && !mgr.get_is_err () ) {
431
425
mgr.poll ( do_wait );
426
+
427
+ // Submit new price updates
428
+ if ( sub.pub1_ != nullptr ) {
429
+ sub.pub1_ ->update_price ( random_value () , uint64_t ( random_value () ) );
430
+ }
431
+ if ( sub.pub2_ != nullptr ) {
432
+ sub.pub2_ ->update_price ( random_value () , uint64_t ( random_value () ) );
433
+ }
434
+
432
435
}
433
436
434
437
// report any errors on exit
0 commit comments