Skip to content

Commit 6b094f7

Browse files
authored
Merge pull request #2779 from kinnison/fix-lowmem-lowtier
Fix low-memory issue and lower tier platforms with no sysinfo
2 parents 494b307 + 1e4de78 commit 6b094f7

File tree

4 files changed

+22
-4
lines changed

4 files changed

+22
-4
lines changed

src/diskio/immediate.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,11 @@ impl Executor for ImmediateUnpacker {
136136
fn buffer_available(&self, _len: usize) -> bool {
137137
true
138138
}
139+
140+
#[cfg(test)]
141+
fn buffer_used(&self) -> usize {
142+
0
143+
}
139144
}
140145

141146
/// The non-shared state for writing a file incrementally

src/diskio/mod.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,10 @@ pub(crate) trait Executor {
328328

329329
/// Query the memory budget to see if a particular size buffer is available
330330
fn buffer_available(&self, len: usize) -> bool;
331+
332+
#[cfg(test)]
333+
/// Query the memory budget to see how much of the buffer pool is in use
334+
fn buffer_used(&self) -> usize;
331335
}
332336

333337
/// Trivial single threaded IO to be used from executors.
@@ -418,9 +422,11 @@ pub(crate) fn write_file_incremental<P: AsRef<Path>, F: Fn(usize)>(
418422
let len = contents.len();
419423
// Length 0 vector is used for clean EOF signalling.
420424
if len == 0 {
425+
trace_scoped!("EOF_chunk", "name": path_display, "len": len);
426+
drop(contents);
427+
chunk_complete_callback(len);
421428
break;
422-
}
423-
{
429+
} else {
424430
trace_scoped!("write_segment", "name": path_display, "len": len);
425431
f.write_all(&contents)?;
426432
drop(contents);

src/diskio/test.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,13 +48,13 @@ fn test_incremental_file(io_threads: &str) -> Result<()> {
4848
}
4949
}
5050
// sending a zero length chunk closes the file
51-
let mut chunk = io_executor.get_buffer(0);
51+
let mut chunk = io_executor.get_buffer(super::IO_CHUNK_SIZE);
5252
chunk = chunk.finished();
5353
sender(chunk);
5454
loop {
5555
for work in io_executor.completed().collect::<Vec<_>>() {
5656
match work {
57-
super::CompletedIo::Chunk(_) => unreachable!(),
57+
super::CompletedIo::Chunk(_) => {}
5858
super::CompletedIo::Item(_) => {
5959
file_finished = true;
6060
}
@@ -69,6 +69,8 @@ fn test_incremental_file(io_threads: &str) -> Result<()> {
6969
// no more work should be outstanding
7070
unreachable!();
7171
}
72+
73+
assert_eq!(io_executor.buffer_used(), 0);
7274
Ok(())
7375
})?;
7476
// We should be able to read back the file

src/diskio/threaded.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -350,6 +350,11 @@ impl<'a> Executor for Threaded<'a> {
350350
let total_used = Threaded::ram_highwater(&self.vec_pools);
351351
total_used + size < self.ram_budget
352352
}
353+
354+
#[cfg(test)]
355+
fn buffer_used(&self) -> usize {
356+
self.vec_pools.iter().map(|(_, p)| *p.in_use.borrow()).sum()
357+
}
353358
}
354359

355360
impl<'a> Drop for Threaded<'a> {

0 commit comments

Comments
 (0)