@@ -45,21 +45,34 @@ impl SerializationSink for FileSerializationSink {
45
45
ref mut addr
46
46
} = * data;
47
47
48
- assert ! ( num_bytes <= buffer. len( ) ) ;
49
- let mut buf_start = * buf_pos;
50
- let mut buf_end = buf_start + num_bytes;
48
+ let curr_addr = * addr;
49
+ * addr += num_bytes as u32 ;
51
50
52
- if buf_end > buffer. len ( ) {
53
- file. write_all ( & buffer[ ..buf_start] ) . expect ( "failed to write buffer" ) ;
54
- buf_start = 0 ;
55
- buf_end = num_bytes;
56
- }
51
+ let buf_start = * buf_pos;
52
+ let buf_end = buf_start + num_bytes;
57
53
58
- write ( & mut buffer[ buf_start .. buf_end] ) ;
59
- * buf_pos = buf_end;
54
+ if buf_end <= buffer. len ( ) {
55
+ // We have enough space in the buffer, just write the data to it.
56
+ write ( & mut buffer[ buf_start .. buf_end] ) ;
57
+ * buf_pos = buf_end;
58
+ } else {
59
+ // We don't have enough space in the buffer, so flush to disk
60
+ file. write_all ( & buffer[ ..buf_start] ) . unwrap ( ) ;
61
+
62
+ if num_bytes <= buffer. len ( ) {
63
+ // There's enough space in the buffer, after flushing
64
+ write ( & mut buffer[ 0 .. num_bytes] ) ;
65
+ * buf_pos = num_bytes;
66
+ } else {
67
+ // Even after flushing the buffer there isn't enough space, so
68
+ // fall back to dynamic allocation
69
+ let mut temp_buffer = vec ! [ 0 ; num_bytes] ;
70
+ write ( & mut temp_buffer[ ..] ) ;
71
+ file. write_all ( & temp_buffer[ ..] ) . unwrap ( ) ;
72
+ * buf_pos = 0 ;
73
+ }
74
+ }
60
75
61
- let curr_addr = * addr;
62
- * addr += num_bytes as u32 ;
63
76
Addr ( curr_addr)
64
77
}
65
78
}
@@ -75,7 +88,7 @@ impl Drop for FileSerializationSink {
75
88
} = * data;
76
89
77
90
if * buf_pos > 0 {
78
- file. write_all ( & buffer[ ..* buf_pos] ) . expect ( "failed to write buffer" ) ;
91
+ file. write_all ( & buffer[ ..* buf_pos] ) . unwrap ( ) ;
79
92
}
80
93
}
81
94
}
0 commit comments