Skip to content

Commit e1127aa

Browse files
authored
Merge pull request #97 from michaelwoerister/write_bytes_atomic
SerializationSink: Add write_bytes_atomic method.
2 parents fb50acb + a5e7381 commit e1127aa

File tree

2 files changed

+52
-8
lines changed

2 files changed

+52
-8
lines changed

measureme/src/file_serialization_sink.rs

Lines changed: 37 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
use crate::serialization::{Addr, SerializationSink};
2+
use parking_lot::Mutex;
23
use std::error::Error;
34
use std::fs;
4-
use std::io::{Write};
5+
use std::io::Write;
56
use std::path::Path;
6-
use parking_lot::Mutex;
77

88
pub struct FileSerializationSink {
99
data: Mutex<Inner>,
@@ -25,9 +25,9 @@ impl SerializationSink for FileSerializationSink {
2525
Ok(FileSerializationSink {
2626
data: Mutex::new(Inner {
2727
file,
28-
buffer: vec![0; 1024*512],
28+
buffer: vec![0; 1024 * 512],
2929
buf_pos: 0,
30-
addr: 0
30+
addr: 0,
3131
}),
3232
})
3333
}
@@ -42,7 +42,7 @@ impl SerializationSink for FileSerializationSink {
4242
ref mut file,
4343
ref mut buffer,
4444
ref mut buf_pos,
45-
ref mut addr
45+
ref mut addr,
4646
} = *data;
4747

4848
let curr_addr = *addr;
@@ -53,15 +53,15 @@ impl SerializationSink for FileSerializationSink {
5353

5454
if buf_end <= buffer.len() {
5555
// We have enough space in the buffer, just write the data to it.
56-
write(&mut buffer[buf_start .. buf_end]);
56+
write(&mut buffer[buf_start..buf_end]);
5757
*buf_pos = buf_end;
5858
} else {
5959
// We don't have enough space in the buffer, so flush to disk
6060
file.write_all(&buffer[..buf_start]).unwrap();
6161

6262
if num_bytes <= buffer.len() {
6363
// There's enough space in the buffer, after flushing
64-
write(&mut buffer[0 .. num_bytes]);
64+
write(&mut buffer[0..num_bytes]);
6565
*buf_pos = num_bytes;
6666
} else {
6767
// Even after flushing the buffer there isn't enough space, so
@@ -75,6 +75,36 @@ impl SerializationSink for FileSerializationSink {
7575

7676
Addr(curr_addr)
7777
}
78+
79+
fn write_bytes_atomic(&self, bytes: &[u8]) -> Addr {
80+
if bytes.len() < 128 {
81+
// For "small" pieces of data, use the regular implementation so we
82+
// don't repeatedly flush an almost empty buffer to disk.
83+
return self.write_atomic(bytes.len(), |sink| sink.copy_from_slice(bytes));
84+
}
85+
86+
let mut data = self.data.lock();
87+
let Inner {
88+
ref mut file,
89+
ref mut buffer,
90+
ref mut buf_pos,
91+
ref mut addr,
92+
} = *data;
93+
94+
let curr_addr = *addr;
95+
*addr += bytes.len() as u32;
96+
97+
if *buf_pos > 0 {
98+
// There's something in the buffer, flush it to disk
99+
file.write_all(&buffer[..*buf_pos]).unwrap();
100+
*buf_pos = 0;
101+
}
102+
103+
// Now write the whole input to disk, skipping the write buffer
104+
file.write_all(bytes).unwrap();
105+
106+
Addr(curr_addr)
107+
}
78108
}
79109

80110
impl Drop for FileSerializationSink {

measureme/src/serialization.rs

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1+
use parking_lot::Mutex;
12
use std::error::Error;
23
use std::path::Path;
3-
use parking_lot::Mutex;
44

55
#[derive(Clone, Copy, Eq, PartialEq, Debug)]
66
pub struct Addr(pub u32);
@@ -14,9 +14,23 @@ impl Addr {
1414
pub trait SerializationSink: Sized + Send + Sync + 'static {
1515
fn from_path(path: &Path) -> Result<Self, Box<dyn Error>>;
1616

17+
/// Atomically write `num_bytes` to the sink. The implementation must ensure
18+
/// that concurrent invocations of `write_atomic` do not conflict with each
19+
/// other.
20+
///
21+
/// The `write` argument is a function that must fill the output buffer
22+
/// passed to it. The output buffer is guaranteed to be exactly `num_bytes`
23+
/// large.
1724
fn write_atomic<W>(&self, num_bytes: usize, write: W) -> Addr
1825
where
1926
W: FnOnce(&mut [u8]);
27+
28+
/// Same as write_atomic() but might be faster in cases where bytes to be
29+
/// written are already present in a buffer (as opposed to when it is
30+
/// benefical to directly serialize into the output buffer).
31+
fn write_bytes_atomic(&self, bytes: &[u8]) -> Addr {
32+
self.write_atomic(bytes.len(), |sink| sink.copy_from_slice(bytes))
33+
}
2034
}
2135

2236
/// A `SerializationSink` that writes to an internal `Vec<u8>` and can be

0 commit comments

Comments
 (0)