17
17
//! Spawning of all tasks happens in this module
18
18
//! Nowhere else is anything ever spawned
19
19
20
- use log:: * ;
21
- use tokio:: runtime:: Runtime ;
22
- use substrate_rpc_primitives:: number:: NumberOrHex ;
23
20
use futures:: {
24
- Future , Stream ,
21
+ future :: { self , loop_fn , Loop } ,
25
22
sync:: mpsc:: { self , UnboundedReceiver , UnboundedSender } ,
26
- future:: { self , loop_fn, Loop }
27
- } ;
28
- use substrate_primitives:: {
29
- U256 ,
30
- storage:: StorageKey ,
31
- twox_128
23
+ Future , Stream ,
32
24
} ;
25
+ use log:: * ;
26
+ use substrate_primitives:: { storage:: StorageKey , twox_128, U256 } ;
27
+ use substrate_rpc_primitives:: number:: NumberOrHex ;
28
+ use tokio:: runtime:: Runtime ;
33
29
34
30
use std:: sync:: Arc ;
35
31
36
32
use crate :: {
37
33
database:: Database ,
38
- rpc:: Rpc ,
39
34
error:: Error as ArchiveError ,
40
- types:: { System , Data }
35
+ rpc:: Rpc ,
36
+ types:: { Data , System } ,
41
37
} ;
42
38
43
39
// with the hopeful and long-anticipated release of async-await
44
40
pub struct Archive < T : System > {
45
41
rpc : Arc < Rpc < T > > ,
46
42
db : Arc < Database > ,
47
- runtime : Runtime
43
+ runtime : Runtime ,
48
44
}
49
45
50
- impl < T > Archive < T > where T : System {
51
-
46
+ impl < T > Archive < T >
47
+ where
48
+ T : System ,
49
+ {
52
50
pub fn new ( ) -> Result < Self , ArchiveError > {
53
51
let mut runtime = Runtime :: new ( ) ?;
54
52
let rpc = runtime. block_on ( Rpc :: < T > :: new ( url:: Url :: parse ( "ws://127.0.0.1:9944" ) ?) ) ?;
55
53
let db = Database :: new ( ) ?;
56
54
let ( rpc, db) = ( Arc :: new ( rpc) , Arc :: new ( db) ) ;
57
55
debug ! ( "METADATA: {}" , rpc. metadata( ) ) ;
58
56
debug ! ( "KEYS: {:?}" , rpc. keys( ) ) ;
59
- Ok ( Self { rpc, db, runtime } )
57
+ Ok ( Self { rpc, db, runtime } )
60
58
}
61
59
62
60
pub fn run ( mut self ) -> Result < ( ) , ArchiveError > {
@@ -65,45 +63,44 @@ impl<T> Archive<T> where T: System {
65
63
self . runtime . spawn (
66
64
self . rpc
67
65
. clone ( )
68
- . subscribe_blocks ( sender. clone ( ) ) . map_err ( |e| println ! ( "{:?}" , e) )
69
- ) ;
70
- self . runtime . spawn (
71
- Self :: sync ( self . db . clone ( ) , self . rpc . clone ( ) , sender. clone ( ) )
72
- . map ( |_| ( ) )
66
+ . subscribe_blocks ( sender. clone ( ) )
67
+ . map_err ( |e| println ! ( "{:?}" , e) ) ,
73
68
) ;
69
+ self . runtime
70
+ . spawn ( Self :: sync ( self . db . clone ( ) , self . rpc . clone ( ) , sender. clone ( ) ) . map ( |_| ( ) ) ) ;
74
71
tokio:: run ( Self :: handle_data ( receiver, self . db . clone ( ) ) ) ;
75
72
Ok ( ( ) )
76
73
}
77
74
78
75
// TODO return a float between 0 and 1 corresponding to percent of database that is up-to-date?
79
76
/// Verification task that ensures all blocks are in the database
80
- fn sync ( db : Arc < Database > ,
81
- rpc : Arc < Rpc < T > > ,
82
- sender : UnboundedSender < Data < T > >
83
- ) -> impl Future < Item = Sync , Error = ( ) > + ' static
84
- {
77
+ fn sync (
78
+ db : Arc < Database > ,
79
+ rpc : Arc < Rpc < T > > ,
80
+ sender : UnboundedSender < Data < T > > ,
81
+ ) -> impl Future < Item = Sync , Error = ( ) > + ' static {
85
82
loop_fn ( Sync :: new ( ) , move |v| {
86
83
let sender0 = sender. clone ( ) ;
87
84
v. sync ( db. clone ( ) , rpc. clone ( ) , sender0. clone ( ) )
88
- . and_then ( move |( sync, done) | {
89
- if done {
90
- Ok ( Loop :: Break ( sync) )
91
- } else {
92
- Ok ( Loop :: Continue ( sync) )
93
- }
94
- } )
85
+ . and_then ( move |( sync, done) | {
86
+ if done {
87
+ Ok ( Loop :: Break ( sync) )
88
+ } else {
89
+ Ok ( Loop :: Continue ( sync) )
90
+ }
91
+ } )
95
92
} )
96
93
}
97
94
98
- fn handle_data ( receiver : UnboundedReceiver < Data < T > > ,
99
- db : Arc < Database > ,
100
- ) -> impl Future < Item = ( ) , Error = ( ) > + ' static
101
- {
95
+ fn handle_data (
96
+ receiver : UnboundedReceiver < Data < T > > ,
97
+ db : Arc < Database > ,
98
+ ) -> impl Future < Item = ( ) , Error = ( ) > + ' static {
102
99
receiver. for_each ( move |data| {
103
100
match data {
104
101
Data :: SyncProgress ( missing_blocks) => {
105
102
println ! ( "{} blocks missing" , missing_blocks) ;
106
- } ,
103
+ }
107
104
c => {
108
105
tokio:: spawn ( db. insert ( c) . map_err ( |e| error ! ( "{:?}" , e) ) ) ;
109
106
}
@@ -120,60 +117,64 @@ struct Sync {
120
117
121
118
impl Sync {
122
119
fn new ( ) -> Self {
123
- Self {
124
- looped : 0 ,
125
- }
120
+ Self { looped : 0 }
126
121
}
127
122
128
- fn sync < T > ( self ,
129
- db : Arc < Database > ,
130
- rpc : Arc < Rpc < T > > ,
131
- sender : UnboundedSender < Data < T > > ,
123
+ fn sync < T > (
124
+ self ,
125
+ db : Arc < Database > ,
126
+ rpc : Arc < Rpc < T > > ,
127
+ sender : UnboundedSender < Data < T > > ,
132
128
) -> impl Future < Item = ( Self , bool ) , Error = ( ) > + ' static
133
- where T : System + std:: fmt:: Debug + ' static
129
+ where
130
+ T : System + std:: fmt:: Debug + ' static ,
134
131
{
135
132
let ( sender0, sender1) = ( sender. clone ( ) , sender. clone ( ) ) ;
136
133
let ( rpc0, rpc1) = ( rpc. clone ( ) , rpc. clone ( ) ) ;
137
134
let looped = self . looped ;
138
135
info ! ( "Looped: {}" , looped) ;
139
136
140
- let missing_blocks =
141
- db. query_missing_blocks ( )
142
- . and_then ( move |blocks| {
143
- match sender0
144
- . unbounded_send ( Data :: SyncProgress ( blocks. len ( ) ) )
145
- . map_err ( Into :: into) {
146
- Ok ( ( ) ) => ( ) ,
147
- Err ( e) => return future:: err ( e)
148
- }
149
- future:: ok (
150
- blocks
151
- . into_iter ( )
152
- . take ( 100_000 ) // just do 100K blocks at a time
153
- . map ( |b| NumberOrHex :: Hex ( U256 :: from ( b) ) )
154
- . collect :: < Vec < NumberOrHex < T :: BlockNumber > > > ( )
155
- )
156
- } ) . and_then ( move |blocks| {
157
- rpc0. batch_block_from_number ( blocks, sender)
158
- . and_then ( move |_| {
159
- let looped = looped + 1 ;
160
- future:: ok ( ( Self { looped} , false ) )
161
- } )
162
- } ) ;
163
-
164
- let missing_timestamps =
165
- db. query_missing_timestamps :: < T > ( )
166
- . and_then ( move |hashes| {
167
- info ! ( "Launching timestamp insertion thread for {} items" , hashes. len( ) ) ;
168
- let timestamp_key = b"Timestamp Now" ;
169
- let storage_key = twox_128 ( timestamp_key) ;
170
- let keys = std:: iter:: repeat ( StorageKey ( storage_key. to_vec ( ) ) )
171
- . take ( hashes. len ( ) )
172
- . collect :: < Vec < StorageKey > > ( ) ;
173
- rpc1. batch_storage ( sender1, keys, hashes)
137
+ let missing_blocks = db
138
+ . query_missing_blocks ( )
139
+ . and_then ( move |blocks| {
140
+ match sender0
141
+ . unbounded_send ( Data :: SyncProgress ( blocks. len ( ) ) )
142
+ . map_err ( Into :: into)
143
+ {
144
+ Ok ( ( ) ) => ( ) ,
145
+ Err ( e) => return future:: err ( e) ,
146
+ }
147
+ future:: ok (
148
+ blocks
149
+ . into_iter ( )
150
+ . take ( 100_000 ) // just do 100K blocks at a time
151
+ . map ( |b| NumberOrHex :: Hex ( U256 :: from ( b) ) )
152
+ . collect :: < Vec < NumberOrHex < T :: BlockNumber > > > ( ) ,
153
+ )
154
+ } )
155
+ . and_then ( move |blocks| {
156
+ rpc0. batch_block_from_number ( blocks, sender)
157
+ . and_then ( move |_| {
158
+ let looped = looped + 1 ;
159
+ future:: ok ( ( Self { looped } , false ) )
160
+ } )
174
161
} ) ;
175
- missing_timestamps. join ( missing_blocks)
176
- . map_err ( |e| error ! ( "{:?}" , e) )
177
- . map ( |( _, b) | b)
162
+
163
+ let missing_timestamps = db. query_missing_timestamps :: < T > ( ) . and_then ( move |hashes| {
164
+ info ! (
165
+ "Launching timestamp insertion thread for {} items" ,
166
+ hashes. len( )
167
+ ) ;
168
+ let timestamp_key = b"Timestamp Now" ;
169
+ let storage_key = twox_128 ( timestamp_key) ;
170
+ let keys = std:: iter:: repeat ( StorageKey ( storage_key. to_vec ( ) ) )
171
+ . take ( hashes. len ( ) )
172
+ . collect :: < Vec < StorageKey > > ( ) ;
173
+ rpc1. batch_storage ( sender1, keys, hashes)
174
+ } ) ;
175
+ missing_timestamps
176
+ . join ( missing_blocks)
177
+ . map_err ( |e| error ! ( "{:?}" , e) )
178
+ . map ( |( _, b) | b)
178
179
}
179
180
}
0 commit comments