Skip to content

Commit a6e9176

Browse files
committed
Stream large files during unpacking
Fixes #2632, #2145, #2564 Files over 16M are now written incrementally chunks rather than buffered in memory in one full linear buffer. This chunk size is not configurable. For threaded unpacking, the entire memory buffer will be used to buffer chunks and a single worker thread will dispatch IO operations from the buffer, so minimal performance impact should be anticipated (file size/16M round trips at worst, and most network file systems will latency hide linear writes). For immediate unpacking, each chunk is dispatched directly to disk, which may impact performance as less latency hiding is possible - but for immediate unpacking clarity of behaviour is the priority.
1 parent 84974df commit a6e9176

File tree

6 files changed

+617
-114
lines changed

6 files changed

+617
-114
lines changed

src/diskio/immediate.rs

Lines changed: 192 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,27 +2,208 @@
22
///
33
/// Use for diagnosing bugs or working around any unexpected issues with the
44
/// threaded code paths.
5-
use super::{perform, Executor, Item};
5+
use std::{
6+
fmt::Debug,
7+
fs::{File, OpenOptions},
8+
io::{self, Write},
9+
path::Path,
10+
sync::{Arc, Mutex},
11+
time::Instant,
12+
};
13+
14+
use super::{CompletedIO, Executor, Item};
15+
16+
#[derive(Debug)]
17+
pub struct _IncrementalFileState {
18+
completed_chunks: Vec<usize>,
19+
err: Option<io::Result<()>>,
20+
item: Option<Item>,
21+
finished: bool,
22+
}
23+
24+
pub(super) type IncrementalFileState = Arc<Mutex<Option<_IncrementalFileState>>>;
25+
26+
#[derive(Default, Debug)]
27+
pub struct ImmediateUnpacker {
28+
incremental_state: IncrementalFileState,
29+
}
630

7-
#[derive(Default)]
8-
pub struct ImmediateUnpacker {}
931
impl ImmediateUnpacker {
1032
pub fn new() -> Self {
11-
Self {}
33+
Self {
34+
..Default::default()
35+
}
36+
}
37+
38+
fn deque(&self) -> Box<dyn Iterator<Item = CompletedIO>> {
39+
let mut guard = self.incremental_state.lock().unwrap();
40+
// incremental file in progress
41+
if let Some(ref mut state) = *guard {
42+
// Case 1: pending errors
43+
if state.finished {
44+
let mut item = state.item.take().unwrap();
45+
if state.err.is_some() {
46+
let err = state.err.take().unwrap();
47+
item.result = err;
48+
}
49+
item.finish = item
50+
.start
51+
.map(|s| Instant::now().saturating_duration_since(s));
52+
if state.finished {
53+
*guard = None;
54+
}
55+
Box::new(Some(CompletedIO::Item(item)).into_iter())
56+
} else {
57+
// Case 2: pending chunks (which might be empty)
58+
let mut completed_chunks = vec![];
59+
completed_chunks.append(&mut state.completed_chunks);
60+
Box::new(
61+
completed_chunks
62+
.into_iter()
63+
.map(|size| CompletedIO::Chunk(size)),
64+
)
65+
}
66+
} else {
67+
Box::new(None.into_iter())
68+
}
1269
}
1370
}
1471

1572
impl Executor for ImmediateUnpacker {
16-
fn dispatch(&self, mut item: Item) -> Box<dyn Iterator<Item = Item> + '_> {
17-
perform(&mut item);
18-
Box::new(Some(item).into_iter())
73+
fn dispatch(&self, mut item: Item) -> Box<dyn Iterator<Item = CompletedIO> + '_> {
74+
item.result = match &mut item.kind {
75+
super::Kind::Directory => super::create_dir(&item.full_path),
76+
super::Kind::File(ref contents) => {
77+
super::write_file(&item.full_path, &contents, item.mode)
78+
}
79+
super::Kind::IncrementalFile(_incremental_file) => {
80+
return {
81+
// If there is a pending error, return it, otherwise stash the
82+
// Item for eventual return when the file is finished.
83+
let mut guard = self.incremental_state.lock().unwrap();
84+
if let Some(ref mut state) = *guard {
85+
if state.err.is_some() {
86+
let err = state.err.take().unwrap();
87+
item.result = err;
88+
item.finish = item
89+
.start
90+
.map(|s| Instant::now().saturating_duration_since(s));
91+
*guard = None;
92+
Box::new(Some(CompletedIO::Item(item)).into_iter())
93+
} else {
94+
state.item = Some(item);
95+
Box::new(None.into_iter())
96+
}
97+
} else {
98+
unreachable!();
99+
}
100+
};
101+
}
102+
};
103+
item.finish = item
104+
.start
105+
.map(|s| Instant::now().saturating_duration_since(s));
106+
Box::new(Some(CompletedIO::Item(item)).into_iter())
107+
}
108+
109+
fn join(&mut self) -> Box<dyn Iterator<Item = CompletedIO>> {
110+
self.deque()
19111
}
20112

21-
fn join(&mut self) -> Box<dyn Iterator<Item = Item>> {
22-
Box::new(None.into_iter())
113+
fn completed(&self) -> Box<dyn Iterator<Item = CompletedIO>> {
114+
self.deque()
23115
}
24116

25-
fn completed(&self) -> Box<dyn Iterator<Item = Item>> {
26-
Box::new(None.into_iter())
117+
fn incremental_file_state(&self) -> super::IncrementalFileState {
118+
let mut state = self.incremental_state.lock().unwrap();
119+
if let Some(_) = *state {
120+
unreachable!();
121+
} else {
122+
*state = Some(_IncrementalFileState {
123+
completed_chunks: vec![],
124+
err: None,
125+
item: None,
126+
finished: false,
127+
});
128+
super::IncrementalFileState::Immediate(self.incremental_state.clone())
129+
}
130+
}
131+
}
132+
133+
/// The non-shared state for writing a file incrementally
134+
#[derive(Debug)]
135+
pub(super) struct IncrementalFileWriter {
136+
state: IncrementalFileState,
137+
file: Option<File>,
138+
path_display: String,
139+
}
140+
141+
impl IncrementalFileWriter {
142+
#[allow(unused_variables)]
143+
pub fn new<P: AsRef<Path>>(
144+
path: P,
145+
mode: u32,
146+
state: IncrementalFileState,
147+
) -> std::result::Result<Self, io::Error> {
148+
let mut opts = OpenOptions::new();
149+
#[cfg(unix)]
150+
{
151+
use std::os::unix::fs::OpenOptionsExt;
152+
opts.mode(mode);
153+
}
154+
let path = path.as_ref();
155+
let path_display = format!("{}", path.display());
156+
let file = Some({
157+
trace_scoped!("creat", "name": path_display);
158+
opts.write(true).create(true).truncate(true).open(path)?
159+
});
160+
Ok(IncrementalFileWriter {
161+
file,
162+
state,
163+
path_display,
164+
})
165+
}
166+
167+
pub fn chunk_submit(&mut self, chunk: Vec<u8>) -> bool {
168+
if (self.state.lock().unwrap()).is_none() {
169+
return false;
170+
}
171+
match self.write(chunk) {
172+
Ok(v) => v,
173+
Err(e) => {
174+
let mut state = self.state.lock().unwrap();
175+
if let Some(ref mut state) = *state {
176+
state.err.replace(Err(e));
177+
state.finished = true;
178+
false
179+
} else {
180+
false
181+
}
182+
}
183+
}
184+
}
185+
186+
fn write(&mut self, chunk: Vec<u8>) -> std::result::Result<bool, io::Error> {
187+
let mut state = self.state.lock().unwrap();
188+
if let Some(ref mut state) = *state {
189+
if let Some(ref mut file) = (&mut self.file).as_mut() {
190+
// Length 0 vector is used for clean EOF signalling.
191+
if chunk.len() == 0 {
192+
trace_scoped!("close", "name:": self.path_display);
193+
drop(std::mem::take(&mut self.file));
194+
state.finished = true;
195+
} else {
196+
trace_scoped!("write_segment", "name": self.path_display, "len": chunk.len());
197+
file.write_all(&chunk)?;
198+
199+
state.completed_chunks.push(chunk.len());
200+
}
201+
Ok(true)
202+
} else {
203+
Ok(false)
204+
}
205+
} else {
206+
unreachable!();
207+
}
27208
}
28209
}

0 commit comments

Comments
 (0)