15
15
use std:: fmt:: Debug ;
16
16
use std:: future;
17
17
use std:: io;
18
+ use std:: iter:: repeat_with;
18
19
use std:: sync:: Arc ;
19
20
20
21
use databend_common_meta_kvapi:: kvapi;
@@ -38,6 +39,7 @@ use databend_common_meta_types::UpsertKV;
38
39
use futures:: Stream ;
39
40
use futures_util:: StreamExt ;
40
41
use futures_util:: TryStreamExt ;
42
+ use itertools:: Itertools ;
41
43
use log:: debug;
42
44
use log:: info;
43
45
use log:: warn;
@@ -150,36 +152,51 @@ impl SMV002 {
150
152
151
153
let mut importer = sm_v002:: SMV002 :: new_importer ( ) ;
152
154
153
- // AsyncBufReadExt::lines() is a bit slow.
154
- //
155
- // let br = BufReader::with_capacity(16 * 1024 * 1024, data);
156
- // let mut lines = AsyncBufReadExt::lines(br);
157
- // while let Some(l) = lines.next_line().await? {
158
- // let ent: RaftStoreEntry = serde_json::from_str(&l)
159
- // .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
160
- // importer.import(ent)?;
161
- // }
162
-
163
155
let f = data. into_std ( ) . await ;
164
156
165
157
let h = databend_common_base:: runtime:: spawn_blocking ( move || {
166
- let mut br = std:: io:: BufReader :: with_capacity ( 16 * 1024 * 1024 , f) ;
167
- let mut line_buf = String :: with_capacity ( 4 * 1024 ) ;
168
-
169
- loop {
170
- line_buf. clear ( ) ;
171
- let n_read = std:: io:: BufRead :: read_line ( & mut br, & mut line_buf) ?;
172
- if n_read == 0 {
173
- break ;
158
+ // Create a worker pool to deserialize the entries.
159
+
160
+ let queue_depth = 1024 ;
161
+ let n_workers = 16 ;
162
+ let ( tx, rx) = ordq:: new ( queue_depth, repeat_with ( || Deserializer ) . take ( n_workers) ) ;
163
+
164
+ // Spawn a thread to import the deserialized entries.
165
+
166
+ let import_th = databend_common_base:: runtime:: Thread :: spawn ( move || {
167
+ while let Some ( res) = rx. recv ( ) {
168
+ let entries: Result < Vec < RaftStoreEntry > , io:: Error > =
169
+ res. map_err ( |e| io:: Error :: new ( io:: ErrorKind :: Other , e) ) ?;
170
+
171
+ let entries = entries?;
172
+
173
+ for ent in entries {
174
+ importer. import ( ent) ?;
175
+ }
174
176
}
175
177
176
- let ent: RaftStoreEntry = serde_json:: from_str ( & line_buf)
177
- . map_err ( |e| io:: Error :: new ( io:: ErrorKind :: InvalidData , e) ) ?;
178
+ let level_data = importer. commit ( ) ;
179
+ Ok :: < _ , io:: Error > ( level_data)
180
+ } ) ;
178
181
179
- importer. import ( ent) ?;
182
+ // Feed input strings to the worker pool.
183
+ {
184
+ let mut br = io:: BufReader :: with_capacity ( 16 * 1024 * 1024 , f) ;
185
+ let lines = io:: BufRead :: lines ( & mut br) ;
186
+ for c in & lines. into_iter ( ) . chunks ( 1024 ) {
187
+ let chunk = c. collect :: < Result < Vec < _ > , _ > > ( ) ?;
188
+ tx. send ( chunk)
189
+ . map_err ( |e| io:: Error :: new ( io:: ErrorKind :: Other , e) ) ?
190
+ }
191
+
192
+ // drop `tx` to notify the worker threads to exit.
193
+ tx. close ( )
180
194
}
181
195
182
- let level_data = importer. commit ( ) ;
196
+ let level_data = import_th
197
+ . join ( )
198
+ . map_err ( |_e| io:: Error :: new ( io:: ErrorKind :: Other , "import thread failure" ) ) ??;
199
+
183
200
Ok :: < _ , io:: Error > ( level_data)
184
201
} ) ;
185
202
@@ -493,3 +510,20 @@ impl SMV002 {
493
510
Ok ( ( ) )
494
511
}
495
512
}
513
+
514
+ struct Deserializer ;
515
+
516
+ impl ordq:: Work for Deserializer {
517
+ type I = Vec < String > ;
518
+ type O = Result < Vec < RaftStoreEntry > , io:: Error > ;
519
+
520
+ fn run ( & mut self , strings : Self :: I ) -> Self :: O {
521
+ let mut res = Vec :: with_capacity ( strings. len ( ) ) ;
522
+ for s in strings {
523
+ let ent: RaftStoreEntry = serde_json:: from_str ( & s)
524
+ . map_err ( |e| io:: Error :: new ( io:: ErrorKind :: InvalidData , e) ) ?;
525
+ res. push ( ent) ;
526
+ }
527
+ Ok ( res)
528
+ }
529
+ }
0 commit comments