1
1
use crate :: serialization:: { Addr , SerializationSink } ;
2
+ use parking_lot:: Mutex ;
2
3
use std:: error:: Error ;
3
4
use std:: fs;
4
- use std:: io:: { Write } ;
5
+ use std:: io:: Write ;
5
6
use std:: path:: Path ;
6
- use parking_lot:: Mutex ;
7
7
8
8
pub struct FileSerializationSink {
9
9
data : Mutex < Inner > ,
@@ -25,9 +25,9 @@ impl SerializationSink for FileSerializationSink {
25
25
Ok ( FileSerializationSink {
26
26
data : Mutex :: new ( Inner {
27
27
file,
28
- buffer : vec ! [ 0 ; 1024 * 512 ] ,
28
+ buffer : vec ! [ 0 ; 1024 * 512 ] ,
29
29
buf_pos : 0 ,
30
- addr : 0
30
+ addr : 0 ,
31
31
} ) ,
32
32
} )
33
33
}
@@ -42,7 +42,7 @@ impl SerializationSink for FileSerializationSink {
42
42
ref mut file,
43
43
ref mut buffer,
44
44
ref mut buf_pos,
45
- ref mut addr
45
+ ref mut addr,
46
46
} = * data;
47
47
48
48
let curr_addr = * addr;
@@ -53,15 +53,15 @@ impl SerializationSink for FileSerializationSink {
53
53
54
54
if buf_end <= buffer. len ( ) {
55
55
// We have enough space in the buffer, just write the data to it.
56
- write ( & mut buffer[ buf_start .. buf_end] ) ;
56
+ write ( & mut buffer[ buf_start.. buf_end] ) ;
57
57
* buf_pos = buf_end;
58
58
} else {
59
59
// We don't have enough space in the buffer, so flush to disk
60
60
file. write_all ( & buffer[ ..buf_start] ) . unwrap ( ) ;
61
61
62
62
if num_bytes <= buffer. len ( ) {
63
63
// There's enough space in the buffer, after flushing
64
- write ( & mut buffer[ 0 .. num_bytes] ) ;
64
+ write ( & mut buffer[ 0 .. num_bytes] ) ;
65
65
* buf_pos = num_bytes;
66
66
} else {
67
67
// Even after flushing the buffer there isn't enough space, so
@@ -75,6 +75,36 @@ impl SerializationSink for FileSerializationSink {
75
75
76
76
Addr ( curr_addr)
77
77
}
78
+
79
+ fn write_bytes_atomic ( & self , bytes : & [ u8 ] ) -> Addr {
80
+ if bytes. len ( ) < 128 {
81
+ // For "small" pieces of data, use the regular implementation so we
82
+ // don't repeatedly flush an almost empty buffer to disk.
83
+ return self . write_atomic ( bytes. len ( ) , |sink| sink. copy_from_slice ( bytes) ) ;
84
+ }
85
+
86
+ let mut data = self . data . lock ( ) ;
87
+ let Inner {
88
+ ref mut file,
89
+ ref mut buffer,
90
+ ref mut buf_pos,
91
+ ref mut addr,
92
+ } = * data;
93
+
94
+ let curr_addr = * addr;
95
+ * addr += bytes. len ( ) as u32 ;
96
+
97
+ if * buf_pos > 0 {
98
+ // There's something in the buffer, flush it to disk
99
+ file. write_all ( & buffer[ ..* buf_pos] ) . unwrap ( ) ;
100
+ * buf_pos = 0 ;
101
+ }
102
+
103
+ // Now write the whole input to disk, skipping the write buffer
104
+ file. write_all ( bytes) . unwrap ( ) ;
105
+
106
+ Addr ( curr_addr)
107
+ }
78
108
}
79
109
80
110
impl Drop for FileSerializationSink {
0 commit comments