1
1
use crate :: serialization:: { Addr , SerializationSink } ;
2
2
use std:: error:: Error ;
3
3
use std:: fs;
4
- use std:: io:: { BufWriter , Write } ;
4
+ use std:: io:: { Write } ;
5
5
use std:: path:: Path ;
6
- use std :: sync :: Mutex ;
6
+ use parking_lot :: Mutex ;
7
7
8
8
pub struct FileSerializationSink {
9
- data : Mutex < ( BufWriter < fs:: File > , u32 ) > ,
9
+ data : Mutex < Inner > ,
10
+ }
11
+
12
+ struct Inner {
13
+ file : fs:: File ,
14
+ buffer : Vec < u8 > ,
15
+ buf_pos : usize ,
16
+ addr : u32 ,
10
17
}
11
18
12
19
impl SerializationSink for FileSerializationSink {
@@ -16,7 +23,12 @@ impl SerializationSink for FileSerializationSink {
16
23
let file = fs:: File :: create ( path) ?;
17
24
18
25
Ok ( FileSerializationSink {
19
- data : Mutex :: new ( ( BufWriter :: new ( file) , 0 ) ) ,
26
+ data : Mutex :: new ( Inner {
27
+ file,
28
+ buffer : vec ! [ 0 ; 1024 * 512 ] ,
29
+ buf_pos : 0 ,
30
+ addr : 0
31
+ } ) ,
20
32
} )
21
33
}
22
34
@@ -25,17 +37,45 @@ impl SerializationSink for FileSerializationSink {
25
37
where
26
38
W : FnOnce ( & mut [ u8 ] ) ,
27
39
{
28
- let mut buffer = vec ! [ 0 ; num_bytes] ;
29
- write ( buffer. as_mut_slice ( ) ) ;
40
+ let mut data = self . data . lock ( ) ;
41
+ let Inner {
42
+ ref mut file,
43
+ ref mut buffer,
44
+ ref mut buf_pos,
45
+ ref mut addr
46
+ } = * data;
30
47
31
- let mut data = self . data . lock ( ) . expect ( "couldn't acquire lock" ) ;
32
- let curr_addr = data . 1 ;
33
- let file = & mut data . 0 ;
48
+ assert ! ( num_bytes <= buffer . len ( ) ) ;
49
+ let mut buf_start = * buf_pos ;
50
+ let mut buf_end = buf_start + num_bytes ;
34
51
35
- file. write_all ( & buffer) . expect ( "failed to write buffer" ) ;
52
+ if buf_end > buffer. len ( ) {
53
+ file. write_all ( & buffer[ ..buf_start] ) . expect ( "failed to write buffer" ) ;
54
+ buf_start = 0 ;
55
+ buf_end = num_bytes;
56
+ }
36
57
37
- data. 1 += num_bytes as u32 ;
58
+ write ( & mut buffer[ buf_start .. buf_end] ) ;
59
+ * buf_pos = buf_end;
38
60
61
+ let curr_addr = * addr;
62
+ * addr += num_bytes as u32 ;
39
63
Addr ( curr_addr)
40
64
}
41
65
}
66
+
67
+ impl Drop for FileSerializationSink {
68
+ fn drop ( & mut self ) {
69
+ let mut data = self . data . lock ( ) ;
70
+ let Inner {
71
+ ref mut file,
72
+ ref mut buffer,
73
+ ref mut buf_pos,
74
+ addr : _,
75
+ } = * data;
76
+
77
+ if * buf_pos > 0 {
78
+ file. write_all ( & buffer[ ..* buf_pos] ) . expect ( "failed to write buffer" ) ;
79
+ }
80
+ }
81
+ }
0 commit comments