1
+ /// This module implements the "container" file format that `measureme` uses for
2
+ /// storing things on disk. The format supports storing three independent
3
+ /// streams of data: one for events, one for string data, and one for string
4
+ /// index data (in theory it could support an arbitrary number of separate
5
+ /// streams but three is all we need). The data of each stream is split into
6
+ /// "pages", where each page has a small header designating what kind of
7
+ /// data it is (i.e. event, string data, or string index), and the length of
8
+ /// the page.
9
+ ///
10
+ /// Pages of different kinds can be arbitrarily interleaved. The headers allow
11
+ /// for reconstructing each of the streams later on. An example file might thus
12
+ /// look like this:
13
+ ///
14
+ /// ```ignore
15
+ /// | file header | page (events) | page (string data) | page (events) | page (string index) |
16
+ /// ```
17
+ ///
18
+ /// The exact encoding of a page is:
19
+ ///
20
+ /// | byte slice | contents |
21
+ /// |-------------------------|-----------------------------------------|
22
+ /// | &[0 .. 1] | page tag |
23
+ /// | &[1 .. 5] | page size as little endian u32 |
24
+ /// | &[5 .. (5 + page_size)] | page contents (exactly page_size bytes) |
25
+ ///
26
+ /// A page is immediately followed by the next page, without any padding.
1
27
use parking_lot:: Mutex ;
28
+ use rustc_hash:: FxHashMap ;
29
+ use std:: cmp:: min;
2
30
use std:: convert:: TryInto ;
3
31
use std:: error:: Error ;
4
32
use std:: fmt:: Debug ;
5
33
use std:: fs;
6
34
use std:: io:: Write ;
7
35
use std:: sync:: Arc ;
8
- use std:: { cmp:: min, collections:: HashMap } ;
9
36
10
37
/// The maximum size of a single page, in bytes (256 KiB). `write_page`
/// asserts that no page ever exceeds this size; larger writes must be split
/// across multiple pages.
const MAX_PAGE_SIZE: usize = 256 * 1024;

/// The number of bytes we consider enough to warrant their own page when
/// deciding whether to flush a partially full buffer. Actual pages may need
/// to be smaller, e.g. when writing the tail of the data stream.
const MIN_PAGE_SIZE: usize = MAX_PAGE_SIZE / 2;
12
43
13
44
#[ derive( Copy , Clone , Debug , PartialEq , Eq , Hash ) ]
@@ -31,6 +62,13 @@ impl std::convert::TryFrom<u8> for PageTag {
31
62
}
32
63
}
33
64
65
/// An address within a data stream. Each data stream has its own address space,
/// i.e. the first piece of data written to the events stream will have
/// `Addr(0)` and the first piece of data written to the string data stream
/// will *also* have `Addr(0)`.
///
/// Addresses are stored as `u32`, so each stream can address at most
/// `u32::MAX` bytes of data.
//
// TODO: Evaluate if it makes sense to add a type tag to `Addr` in order to
// prevent accidental use of `Addr` values with the wrong address space.
#[derive(Clone, Copy, Eq, PartialEq, Debug)]
pub struct Addr(pub u32);
36
74
@@ -74,7 +112,8 @@ impl SerializationSinkBuilder {
74
112
}
75
113
}
76
114
77
- /// The `BackingStorage` is what the data gets written to.
115
+ /// The `BackingStorage` is what the data gets written to. Usually that is a
116
+ /// file but for testing purposes it can also be an in-memory vec of bytes.
78
117
#[ derive( Debug ) ]
79
118
enum BackingStorage {
80
119
File ( fs:: File ) ,
@@ -101,6 +140,7 @@ impl Write for BackingStorage {
101
140
}
102
141
}
103
142
143
/// This struct allows treating a `SerializationSink` as a `std::io::Write`,
/// so it can be handed to APIs that expect a standard writer.
pub struct StdWriteAdapter<'a>(&'a SerializationSink);
105
145
106
146
impl < ' a > Write for StdWriteAdapter < ' a > {
@@ -132,23 +172,48 @@ struct SerializationSinkInner {
132
172
addr : u32 ,
133
173
}
134
174
175
/// This state is shared between all `SerializationSink`s writing to the same
/// backing storage (e.g. the same file). The `Mutex` serializes page writes
/// from concurrent sinks; the `Arc` lets each sink hold its own handle.
#[derive(Clone, Debug)]
struct SharedState(Arc<Mutex<BackingStorage>>);
137
179
138
180
impl SharedState {
181
+ /// Copies out the contents of all pages with the given tag and
182
+ /// concatenates them into a single byte vec. This method is only meant to
183
+ /// be used for testing and will panic if the underlying backing storage is
184
+ /// a file instead of in memory.
139
185
fn copy_bytes_with_page_tag ( & self , page_tag : PageTag ) -> Vec < u8 > {
140
186
let data = self . 0 . lock ( ) ;
141
187
let data = match * data {
142
188
BackingStorage :: File ( _) => panic ! ( ) ,
143
189
BackingStorage :: Memory ( ref data) => data,
144
190
} ;
145
191
146
- split_streams ( data) . remove ( & page_tag) . unwrap ( )
192
+ split_streams ( data) . remove ( & page_tag) . unwrap_or ( Vec :: new ( ) )
147
193
}
148
194
}
149
195
150
- pub fn split_streams ( paged_data : & [ u8 ] ) -> HashMap < PageTag , Vec < u8 > > {
151
- let mut result: HashMap < PageTag , Vec < u8 > > = HashMap :: new ( ) ;
196
+ /// This function reconstructs the individual data streams from their paged
197
+ /// version.
198
+ ///
199
+ /// For example, if `E` denotes the page header of an events page, `S` denotes
200
+ /// the header of a string data page, and lower case letters denote page
201
+ /// contents then a paged stream could look like:
202
+ ///
203
+ /// ```ignore
204
+ /// s = Eabcd_Sopq_Eef_Eghi_Srst
205
+ /// ```
206
+ ///
207
+ /// and `split_streams` would result in the following set of streams:
208
+ ///
209
+ /// ```ignore
210
+ /// split_streams(s) = {
211
+ /// events: [abcdefghi],
212
+ /// string_data: [opqrst],
213
+ /// }
214
+ /// ```
215
+ pub fn split_streams ( paged_data : & [ u8 ] ) -> FxHashMap < PageTag , Vec < u8 > > {
216
+ let mut result: FxHashMap < PageTag , Vec < u8 > > = FxHashMap :: default ( ) ;
152
217
153
218
let mut pos = 0 ;
154
219
while pos < paged_data. len ( ) {
@@ -170,13 +235,17 @@ pub fn split_streams(paged_data: &[u8]) -> HashMap<PageTag, Vec<u8>> {
170
235
}
171
236
172
237
impl SerializationSink {
173
- fn flush ( & self , buffer : & mut Vec < u8 > ) {
174
- self . write_page ( & buffer[ ..] ) ;
175
- buffer. clear ( ) ;
176
- }
177
-
238
+ /// Writes `bytes` as a single page to the shared backing storage. The
239
+ /// method will first write the page header (consisting of the page tag and
240
+ /// the number of bytes in the page) and then the page contents
241
+ /// (i.e. `bytes`).
178
242
fn write_page ( & self , bytes : & [ u8 ] ) {
179
243
if bytes. len ( ) > 0 {
244
+ // We explicitly don't assert `bytes.len() >= MIN_PAGE_SIZE` because
245
+ // `MIN_PAGE_SIZE` is just a recommendation and the last page will
246
+ // often be smaller than that.
247
+ assert ! ( bytes. len( ) <= MAX_PAGE_SIZE ) ;
248
+
180
249
let mut file = self . shared_state . 0 . lock ( ) ;
181
250
182
251
file. write_all ( & [ self . page_tag as u8 ] ) . unwrap ( ) ;
@@ -187,9 +256,16 @@ impl SerializationSink {
187
256
}
188
257
}
189
258
190
- /// Create a copy of all data written so far. This method meant to be used
191
- /// for writing unit tests. It will panic if the underlying `BackingStorage`
192
- /// does not implement `extract_bytes`.
259
+ /// Flushes `buffer` by writing its contents as a new page to the backing
260
+ /// storage and then clearing it.
261
+ fn flush ( & self , buffer : & mut Vec < u8 > ) {
262
+ self . write_page ( & buffer[ ..] ) ;
263
+ buffer. clear ( ) ;
264
+ }
265
+
266
+ /// Creates a copy of all data written so far. This method is meant to be
267
+ /// used for writing unit tests. It will panic if the underlying
268
+ /// `BackingStorage` is a file.
193
269
pub fn into_bytes ( mut self ) -> Vec < u8 > {
194
270
// Swap out the contains of `self` with something that can safely be
195
271
// dropped without side effects.
@@ -205,11 +281,23 @@ impl SerializationSink {
205
281
addr : _,
206
282
} = data. into_inner ( ) ;
207
283
284
+ // Make sure we write the current contents of the buffer to the
285
+ // backing storage before proceeding.
208
286
self . flush ( buffer) ;
209
287
210
288
self . shared_state . copy_bytes_with_page_tag ( self . page_tag )
211
289
}
212
290
291
+ /// Atomically writes `num_bytes` of data to this `SerializationSink`.
292
+ /// Atomic means the data is guaranteed to be written as a contiguous range
293
+ /// of bytes.
294
+ ///
295
+ /// The buffer provided to the `write` callback is guaranteed to be of size
296
+ /// `num_bytes` and `write` is supposed to completely fill it with the data
297
+ /// to be written.
298
+ ///
299
+ /// The return value is the address of the data written and can be used to
300
+ /// refer to the data later on.
213
301
pub fn write_atomic < W > ( & self , num_bytes : usize , write : W ) -> Addr
214
302
where
215
303
W : FnOnce ( & mut [ u8 ] ) ,
@@ -243,7 +331,18 @@ impl SerializationSink {
243
331
Addr ( curr_addr)
244
332
}
245
333
334
+ /// Atomically writes the data in `bytes` to this `SerializationSink`.
335
+ /// Atomic means the data is guaranteed to be written as a contiguous range
336
+ /// of bytes.
337
+ ///
338
+ /// This method may perform better than `write_atomic` because it may be
339
+ /// able to skip the sink's internal buffer. Use this method if the data to
340
+ /// be written is already available as a `&[u8]`.
341
+ ///
342
+ /// The return value is the address of the data written and can be used to
343
+ /// refer to the data later on.
246
344
pub fn write_bytes_atomic ( & self , bytes : & [ u8 ] ) -> Addr {
345
+ // For "small" data we go to the buffered version immediately.
247
346
if bytes. len ( ) <= 128 {
248
347
return self . write_atomic ( bytes. len ( ) , |sink| {
249
348
sink. copy_from_slice ( bytes) ;
@@ -280,8 +379,8 @@ impl SerializationSink {
280
379
if chunk. len ( ) == MAX_PAGE_SIZE {
281
380
// This chunk has the maximum size. It might or might not be the
282
381
// last one. In either case we want to write it to disk
283
- // immediately because the is no reason to copy it to the buffer
284
- // first.
382
+ // immediately because there is no reason to copy it to the
383
+ // buffer first.
285
384
self . write_page ( chunk) ;
286
385
} else {
287
386
// This chunk is less than the chunk size that we requested, so
0 commit comments