Skip to content

Commit 665b384

Browse files
authored
Merge pull request #88 from michaelwoerister/opt-file-sink
Optimize FileSerializationSink by using parking_lot::Mutex and avoiding heap allocations in write_atomic.
2 parents 5d01e61 + b9d0111 commit 665b384

File tree

6 files changed

+185
-57
lines changed

6 files changed

+185
-57
lines changed

analyzeme/benches/serialization_bench.rs

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,17 +8,35 @@ use measureme::{FileSerializationSink, MmapSerializationSink};
88
// Single-threaded benchmark for `FileSerializationSink`. After this change each
// `bencher.iter` iteration calls `run_serialization_bench` (count 500_000, 1 thread)
// instead of the full end-to-end test.
#[bench]
99
fn bench_file_serialization_sink(bencher: &mut test::Bencher) {
1010
bencher.iter(|| {
11-
testing_common::run_end_to_end_serialization_test::<FileSerializationSink>(
12-
"file_serialization_sink_test",
11+
testing_common::run_serialization_bench::<FileSerializationSink>(
12+
"file_serialization_sink_test", 500_000, 1
1313
);
1414
});
1515
}
1616

1717
// Single-threaded benchmark for `MmapSerializationSink`. After this change each
// `bencher.iter` iteration calls `run_serialization_bench` (count 500_000, 1 thread)
// instead of the full end-to-end test.
#[bench]
1818
fn bench_mmap_serialization_sink(bencher: &mut test::Bencher) {
1919
bencher.iter(|| {
20-
testing_common::run_end_to_end_serialization_test::<MmapSerializationSink>(
21-
"mmap_serialization_sink_test",
20+
testing_common::run_serialization_bench::<MmapSerializationSink>(
21+
"mmap_serialization_sink_test", 500_000, 1
22+
);
23+
});
24+
}
25+
26+
// Multi-threaded benchmark for `FileSerializationSink`: each iteration calls
// `run_serialization_bench` with a count of 50_000 and 8 threads.
// NOTE(review): it reuses the file name stem "file_serialization_sink_test" of the
// single-threaded benchmark — presumably fine because benchmarks run one at a time; confirm.
#[bench]
27+
fn bench_file_serialization_sink_8_threads(bencher: &mut test::Bencher) {
28+
bencher.iter(|| {
29+
testing_common::run_serialization_bench::<FileSerializationSink>(
30+
"file_serialization_sink_test", 50_000, 8
31+
);
32+
});
33+
}
34+
35+
// Multi-threaded benchmark for `MmapSerializationSink`: each iteration calls
// `run_serialization_bench` with a count of 50_000 and 8 threads.
// NOTE(review): it reuses the file name stem "mmap_serialization_sink_test" of the
// single-threaded benchmark — presumably fine because benchmarks run one at a time; confirm.
#[bench]
36+
fn bench_mmap_serialization_sink_8_threads(bencher: &mut test::Bencher) {
37+
bencher.iter(|| {
38+
testing_common::run_serialization_bench::<MmapSerializationSink>(
39+
"mmap_serialization_sink_test", 50_000, 8
2240
);
2341
});
2442
}

analyzeme/src/testing_common.rs

Lines changed: 80 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::timestamp::Timestamp;
2-
use crate::{Event, LightweightEvent, ProfilingData};
2+
use crate::{Event, ProfilingData};
33
use measureme::{Profiler, SerializationSink, StringId};
44
use rustc_hash::FxHashMap;
55
use std::borrow::Cow;
@@ -19,12 +19,16 @@ fn mk_filestem(file_name_stem: &str) -> PathBuf {
1919
}
2020

2121
// Generate some profiling data. This is the part that would run in rustc.
22-
fn generate_profiling_data<S: SerializationSink>(filestem: &Path) -> Vec<Event<'static>> {
22+
fn generate_profiling_data<S: SerializationSink>(
23+
filestem: &Path,
24+
num_stacks: usize,
25+
num_threads: usize,
26+
) -> Vec<Event<'static>> {
2327
let profiler = Arc::new(Profiler::<S>::new(Path::new(filestem)).unwrap());
2428

2529
let event_id_reserved = StringId::reserved(42);
2630

27-
let event_ids = &[
31+
let event_ids = vec![
2832
(
2933
profiler.alloc_string("Generic"),
3034
profiler.alloc_string("SomeGenericActivity"),
@@ -39,20 +43,33 @@ fn generate_profiling_data<S: SerializationSink>(filestem: &Path) -> Vec<Event<'
3943
event_ids_as_str.insert(event_ids[1].0, "Query");
4044
event_ids_as_str.insert(event_ids[1].1, "SomeQuery");
4145

42-
let mut expected_events = Vec::new();
46+
let threads: Vec<_> = (0.. num_threads).map(|thread_id| {
47+
let event_ids = event_ids.clone();
48+
let profiler = profiler.clone();
49+
let event_ids_as_str = event_ids_as_str.clone();
4350

44-
for i in 0..10_000 {
45-
// Allocate some invocation stacks
51+
std::thread::spawn(move || {
52+
let mut expected_events = Vec::new();
4653

47-
pseudo_invocation(
48-
&profiler,
49-
i,
50-
4,
51-
event_ids,
52-
&event_ids_as_str,
53-
&mut expected_events,
54-
);
55-
}
54+
for i in 0..num_stacks {
55+
// Allocate some invocation stacks
56+
57+
pseudo_invocation(
58+
&profiler,
59+
i,
60+
thread_id as u32,
61+
4,
62+
&event_ids[..],
63+
&event_ids_as_str,
64+
&mut expected_events,
65+
);
66+
}
67+
68+
expected_events
69+
})
70+
}).collect();
71+
72+
let expected_events: Vec<_> = threads.into_iter().flat_map(|t| t.join().unwrap()).collect();
5673

5774
// An example of allocating the string contents of an event id that has
5875
// already been used
@@ -67,53 +84,83 @@ fn process_profiling_data(filestem: &Path, expected_events: &[Event<'static>]) {
6784
let profiling_data = ProfilingData::new(filestem).unwrap();
6885

6986
check_profiling_data(
70-
&mut profiling_data.iter(),
87+
&mut profiling_data.iter().map(|e| e.to_event()),
7188
&mut expected_events.iter().cloned(),
7289
expected_events.len(),
7390
);
7491
check_profiling_data(
75-
&mut profiling_data.iter().rev(),
92+
&mut profiling_data.iter().rev().map(|e| e.to_event()),
7693
&mut expected_events.iter().rev().cloned(),
7794
expected_events.len(),
7895
);
7996
}
8097

8198
// Asserts that the events read back from disk match the expected events.
//
// The new version (the `+` lines) takes already-converted `Event`s and compares them
// per thread rather than in one global sequence: both iterators are grouped by
// `thread_id` via `collect_events_per_thread`, and each thread's events are compared
// pairwise in order.
//
// * `actual_events`       - events decoded from the profiling data.
// * `expected_events`     - events recorded while generating the data.
// * `num_expected_events` - total number of events expected across all threads.
//
// Panics (via `assert_eq!`) on the first mismatch.
fn check_profiling_data(
82-
actual_lightweight_events: &mut dyn Iterator<Item = LightweightEvent<'_>>,
99+
actual_events: &mut dyn Iterator<Item = Event<'_>>,
83100
expected_events: &mut dyn Iterator<Item = Event<'_>>,
84101
num_expected_events: usize,
85102
) {
86103
// Number of event pairs actually compared; checked against the expected total at the end.
let mut count = 0;
87104

105+
// This assertion makes sure that the ExactSizeIterator impl works as expected.
88106
assert_eq!(
89107
(num_expected_events, Some(num_expected_events)),
90-
actual_lightweight_events.size_hint()
108+
actual_events.size_hint()
91109
);
92110

93-
for (actual_lightweight_event, expected_event) in actual_lightweight_events.zip(expected_events) {
94-
let actual_event = actual_lightweight_event.to_event();
95-
assert_eq!(actual_event.event_kind, expected_event.event_kind);
96-
assert_eq!(actual_event.label, expected_event.label);
97-
assert_eq!(actual_event.additional_data, expected_event.additional_data);
98-
assert_eq!(
99-
actual_event.timestamp.is_instant(),
100-
expected_event.timestamp.is_instant()
101-
);
102-
103-
count += 1;
111+
// Group both sides by thread id; both iterators are fully consumed here.
let actual_events_per_thread = collect_events_per_thread(actual_events);
112+
let expected_events_per_thread = collect_events_per_thread(expected_events);
113+
114+
// NOTE(review): this compares the key sequences of two separate hash maps. Hash map
// iteration order is unspecified, so the assertion relies on both maps yielding the
// same keys in the same order — consider sorting both key lists before comparing.
let thread_ids: Vec<_> = actual_events_per_thread.keys().collect();
115+
assert_eq!(thread_ids, expected_events_per_thread.keys().collect::<Vec<_>>());
116+
117+
for thread_id in thread_ids {
118+
let actual_events = &actual_events_per_thread[thread_id];
119+
let expected_events = &expected_events_per_thread[thread_id];
120+
121+
// `zip` below stops at the shorter side, so lengths are checked explicitly first.
assert_eq!(actual_events.len(), expected_events.len());
122+
123+
for (actual_event, expected_event) in actual_events.iter().zip(expected_events.iter()) {
124+
assert_eq!(actual_event.event_kind, expected_event.event_kind);
125+
assert_eq!(actual_event.label, expected_event.label);
126+
assert_eq!(actual_event.additional_data, expected_event.additional_data);
127+
// Only the instant-vs-interval kind of the timestamp is compared, not its value.
assert_eq!(
128+
actual_event.timestamp.is_instant(),
129+
expected_event.timestamp.is_instant()
130+
);
131+
132+
count += 1;
133+
}
104134
}
135+
105136
assert_eq!(count, num_expected_events);
106137
}
107138

108-
pub fn run_end_to_end_serialization_test<S: SerializationSink>(file_name_stem: &str) {
139+
// Drains `events` and groups them by their `thread_id` field.
//
// Returns a map from thread id to that thread's events, in the order the iterator
// yielded them.
fn collect_events_per_thread<'a>(events: &mut dyn Iterator<Item = Event<'a>>) -> FxHashMap<u32, Vec<Event<'a>>> {
140+
let mut per_thread: FxHashMap<_, _> = Default::default();
141+
142+
for event in events {
143+
// NOTE(review): `or_insert(Vec::new())` could be written as `or_default()`.
per_thread.entry(event.thread_id).or_insert(Vec::new()).push(event);
144+
}
145+
146+
per_thread
147+
}
148+
149+
// Benchmark entry point: generates profiling data through sink `S` without reading
// it back or verifying it. The events returned by `generate_profiling_data` are
// discarded.
//
// * `file_name_stem` - passed to `mk_filestem` to build the output path.
// * `num_events`     - forwarded as the `num_stacks` argument of
//                      `generate_profiling_data`.
//                      NOTE(review): the name is misleading — it counts invocation
//                      stacks per thread, not individual events; consider renaming.
// * `num_threads`    - number of threads that record events.
pub fn run_serialization_bench<S: SerializationSink>(file_name_stem: &str, num_events: usize, num_threads: usize) {
109150
let filestem = mk_filestem(file_name_stem);
110-
let expected_events = generate_profiling_data::<S>(&filestem);
151+
generate_profiling_data::<S>(&filestem, num_events, num_threads);
152+
}
153+
154+
// End-to-end round trip: writes profiling data through sink `S` using `num_threads`
// threads (10_000 invocation stacks per thread), then reads it back and checks it
// against the events recorded during generation.
//
// * `file_name_stem` - passed to `mk_filestem` to build the output path.
// * `num_threads`    - number of threads that record events.
pub fn run_end_to_end_serialization_test<S: SerializationSink>(file_name_stem: &str, num_threads: usize) {
155+
let filestem = mk_filestem(file_name_stem);
156+
let expected_events = generate_profiling_data::<S>(&filestem, 10_000, num_threads);
111157
process_profiling_data(&filestem, &expected_events);
112158
}
113159

114160
fn pseudo_invocation<S: SerializationSink>(
115161
profiler: &Profiler<S>,
116162
random: usize,
163+
thread_id: u32,
117164
recursions_left: usize,
118165
event_ids: &[(StringId, StringId)],
119166
event_ids_as_str: &FxHashMap<StringId, &'static str>,
@@ -123,15 +170,14 @@ fn pseudo_invocation<S: SerializationSink>(
123170
return;
124171
}
125172

126-
let thread_id = (random % 3) as u32;
127-
128173
let (event_kind, event_id) = event_ids[random % event_ids.len()];
129174

130175
let _prof_guard = profiler.start_recording_interval_event(event_kind, event_id, thread_id);
131176

132177
pseudo_invocation(
133178
profiler,
134179
random,
180+
thread_id,
135181
recursions_left - 1,
136182
event_ids,
137183
event_ids_as_str,

analyzeme/tests/serialization.rs

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,21 @@ use analyzeme::testing_common::run_end_to_end_serialization_test;
22
use measureme::{FileSerializationSink, MmapSerializationSink};
33

44
// Round-trip test for `FileSerializationSink` with one thread. The test was renamed
// and now passes a thread count and a per-test file name stem.
#[test]
5-
fn test_file_serialization_sink() {
6-
run_end_to_end_serialization_test::<FileSerializationSink>("file_serialization_sink_test");
5+
fn test_file_serialization_sink_1_thread() {
6+
run_end_to_end_serialization_test::<FileSerializationSink>("file_serialization_sink_test_1_thread", 1);
77
}
88

99
// Round-trip test for `FileSerializationSink` with eight threads. In the diff this
// takes the position of the removed `test_mmap_serialization_sink`.
#[test]
10-
fn test_mmap_serialization_sink() {
11-
run_end_to_end_serialization_test::<MmapSerializationSink>("mmap_serialization_sink_test");
10+
fn test_file_serialization_sink_8_threads() {
11+
run_end_to_end_serialization_test::<FileSerializationSink>("file_serialization_sink_test_8_threads", 8);
12+
}
13+
14+
// Round-trip test for `MmapSerializationSink` with one thread.
#[test]
15+
fn test_mmap_serialization_sink_1_thread() {
16+
run_end_to_end_serialization_test::<MmapSerializationSink>("mmap_serialization_sink_test_1_thread", 1);
17+
}
18+
19+
// Round-trip test for `MmapSerializationSink` with eight threads.
#[test]
20+
fn test_mmap_serialization_sink_8_threads() {
21+
run_end_to_end_serialization_test::<MmapSerializationSink>("mmap_serialization_sink_test_8_threads", 8);
1222
}

measureme/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ travis-ci = { repository = "rust-lang/measureme" }
1515
[dependencies]
1616
byteorder = "1.2.7"
1717
rustc-hash = "1.0.1"
18+
parking_lot = "0.9"
1819

1920
[target.'cfg(not(target_arch="wasm32"))'.dependencies]
2021
memmap = "0.6.0"
Lines changed: 64 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,19 @@
11
use crate::serialization::{Addr, SerializationSink};
22
use std::error::Error;
33
use std::fs;
4-
use std::io::{BufWriter, Write};
4+
use std::io::{Write};
55
use std::path::Path;
6-
use std::sync::Mutex;
6+
use parking_lot::Mutex;
77

88
// Serialization sink that writes to a file. All mutable state sits behind one mutex;
// this change replaces the `(BufWriter<fs::File>, u32)` tuple with the named `Inner`
// struct.
pub struct FileSerializationSink {
9-
data: Mutex<(BufWriter<fs::File>, u32)>,
9+
data: Mutex<Inner>,
10+
}
11+
12+
// Mutable state of `FileSerializationSink`, guarded by its mutex.
struct Inner {
13+
// Destination file; buffered bytes are written to it with `write_all`.
file: fs::File,
14+
// Staging buffer for writes; the constructor allocates it as `vec![0; 1024*512]`.
buffer: Vec<u8>,
15+
// Number of bytes at the start of `buffer` that have not been written to `file` yet.
buf_pos: usize,
16+
// Running total of bytes handed to the sink; its value before a write is returned as the `Addr`.
addr: u32,
1017
}
1118

1219
impl SerializationSink for FileSerializationSink {
@@ -16,7 +23,12 @@ impl SerializationSink for FileSerializationSink {
1623
let file = fs::File::create(path)?;
1724

1825
Ok(FileSerializationSink {
19-
data: Mutex::new((BufWriter::new(file), 0)),
26+
data: Mutex::new(Inner {
27+
file,
28+
buffer: vec![0; 1024*512],
29+
buf_pos: 0,
30+
addr: 0
31+
}),
2032
})
2133
}
2234

@@ -25,17 +37,58 @@ impl SerializationSink for FileSerializationSink {
2537
where
2638
W: FnOnce(&mut [u8]),
2739
{
28-
let mut buffer = vec![0; num_bytes];
29-
write(buffer.as_mut_slice());
40+
let mut data = self.data.lock();
41+
let Inner {
42+
ref mut file,
43+
ref mut buffer,
44+
ref mut buf_pos,
45+
ref mut addr
46+
} = *data;
3047

31-
let mut data = self.data.lock().expect("couldn't acquire lock");
32-
let curr_addr = data.1;
33-
let file = &mut data.0;
48+
let curr_addr = *addr;
49+
*addr += num_bytes as u32;
3450

35-
file.write_all(&buffer).expect("failed to write buffer");
51+
let buf_start = *buf_pos;
52+
let buf_end = buf_start + num_bytes;
3653

37-
data.1 += num_bytes as u32;
54+
if buf_end <= buffer.len() {
55+
// We have enough space in the buffer, just write the data to it.
56+
write(&mut buffer[buf_start .. buf_end]);
57+
*buf_pos = buf_end;
58+
} else {
59+
// We don't have enough space in the buffer, so flush to disk
60+
file.write_all(&buffer[..buf_start]).unwrap();
61+
62+
if num_bytes <= buffer.len() {
63+
// There's enough space in the buffer, after flushing
64+
write(&mut buffer[0 .. num_bytes]);
65+
*buf_pos = num_bytes;
66+
} else {
67+
// Even after flushing the buffer there isn't enough space, so
68+
// fall back to dynamic allocation
69+
let mut temp_buffer = vec![0; num_bytes];
70+
write(&mut temp_buffer[..]);
71+
file.write_all(&temp_buffer[..]).unwrap();
72+
*buf_pos = 0;
73+
}
74+
}
3875

3976
Addr(curr_addr)
4077
}
4178
}
79+
80+
// Writes any bytes still held in the in-memory buffer to the file when the sink is dropped.
impl Drop for FileSerializationSink {
81+
fn drop(&mut self) {
82+
// NOTE(review): `drop` has `&mut self`, so the lock cannot be contended here;
// `get_mut()` would presumably avoid taking it — confirm against the parking_lot API.
let mut data = self.data.lock();
83+
let Inner {
84+
ref mut file,
85+
ref mut buffer,
86+
ref mut buf_pos,
87+
addr: _,
88+
} = *data;
89+
90+
// Only touch the file if there are buffered bytes.
if *buf_pos > 0 {
91+
// NOTE(review): `unwrap()` panics if the final write fails; a panic inside `drop`
// while the thread is already unwinding aborts the process.
file.write_all(&buffer[..*buf_pos]).unwrap();
92+
}
93+
}
94+
}

0 commit comments

Comments
 (0)