Skip to content

Commit ceb2d03

Browse files
authored
fix: replace risky dma_buffer_as_vec implementations (#16829)
* dma_buffer_to_bytes Signed-off-by: coldWater <forsaken628@gmail.com> * fix Signed-off-by: coldWater <forsaken628@gmail.com> * Memory fitting Signed-off-by: coldWater <forsaken628@gmail.com> --------- Signed-off-by: coldWater <forsaken628@gmail.com>
1 parent 2da2c8e commit ceb2d03

File tree

6 files changed

+65
-20
lines changed

6 files changed

+65
-20
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/common/base/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ databend-common-exception = { workspace = true }
2626
async-backtrace = { workspace = true }
2727
async-trait = { workspace = true }
2828
borsh = { workspace = true }
29+
bytes = { workspace = true }
2930
bytesize = { workspace = true }
3031
chrono = { workspace = true }
3132
ctrlc = { workspace = true }

src/common/base/src/base/dma.rs

Lines changed: 56 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use std::path::Path;
2828
use std::ptr;
2929
use std::ptr::NonNull;
3030

31+
use bytes::Bytes;
3132
use rustix::fs::OFlags;
3233
use tokio::fs::File;
3334
use tokio::io::AsyncSeekExt;
@@ -116,10 +117,6 @@ impl DmaAllocator {
116117
Layout::from_size_align(layout.size(), self.0.as_usize()).unwrap()
117118
}
118119
}
119-
120-
fn real_cap(&self, cap: usize) -> usize {
121-
self.0.align_up(cap)
122-
}
123120
}
124121

125122
unsafe impl Allocator for DmaAllocator {
@@ -131,6 +128,10 @@ unsafe impl Allocator for DmaAllocator {
131128
Global {}.allocate_zeroed(self.real_layout(layout))
132129
}
133130

131+
unsafe fn deallocate(&self, ptr: std::ptr::NonNull<u8>, layout: Layout) {
132+
Global {}.deallocate(ptr, self.real_layout(layout))
133+
}
134+
134135
unsafe fn grow(
135136
&self,
136137
ptr: NonNull<u8>,
@@ -157,20 +158,38 @@ unsafe impl Allocator for DmaAllocator {
157158
)
158159
}
159160

160-
unsafe fn deallocate(&self, ptr: std::ptr::NonNull<u8>, layout: Layout) {
161-
Global {}.deallocate(ptr, self.real_layout(layout))
161+
unsafe fn shrink(
162+
&self,
163+
ptr: NonNull<u8>,
164+
old_layout: Layout,
165+
new_layout: Layout,
166+
) -> Result<NonNull<[u8]>, AllocError> {
167+
Global {}.shrink(
168+
ptr,
169+
self.real_layout(old_layout),
170+
self.real_layout(new_layout),
171+
)
162172
}
163173
}
164174

165175
type DmaBuffer = Vec<u8, DmaAllocator>;
166176

167-
pub fn dma_buffer_as_vec(mut buf: DmaBuffer) -> Vec<u8> {
168-
let ptr = buf.as_mut_ptr();
169-
let len = buf.len();
170-
let cap = buf.allocator().real_cap(buf.capacity());
171-
std::mem::forget(buf);
172-
173-
unsafe { Vec::from_raw_parts(ptr, len, cap) }
177+
pub fn dma_buffer_to_bytes(buf: DmaBuffer) -> Bytes {
178+
if buf.is_empty() {
179+
return Bytes::new();
180+
}
181+
let (ptr, len, cap, alloc) = buf.into_raw_parts_with_alloc();
182+
// Memory fitting
183+
let old_layout = Layout::from_size_align(cap, alloc.0.as_usize()).unwrap();
184+
let new_layout = Layout::from_size_align(len, std::mem::align_of::<u8>()).unwrap();
185+
let data = unsafe {
186+
let p = Global {}
187+
.shrink(NonNull::new(ptr).unwrap(), old_layout, new_layout)
188+
.unwrap();
189+
let cap = p.len();
190+
Vec::from_raw_parts(p.cast().as_mut(), len, cap)
191+
};
192+
Bytes::from(data)
174193
}
175194

176195
/// A `DmaFile` is similar to a `File`, but it is opened with the `O_DIRECT` file in order to
@@ -697,4 +716,28 @@ mod tests {
697716

698717
let _ = std::fs::remove_file(filename);
699718
}
719+
720+
/// Checks that a DMA buffer survives the allocator's shrink and grow paths
/// and that `dma_buffer_to_bytes` preserves its contents.
#[test]
fn test_dma_buffer_to_bytes() {
    let want = (0..10_u8).collect::<Vec<_>>();
    let alloc = DmaAllocator::new(Alignment::new(4096).unwrap());
    let mut buf = DmaBuffer::with_capacity_in(3000, alloc);
    buf.extend_from_slice(&want);

    // Drive the buffer through the allocator's shrink path ...
    buf.shrink_to_fit();
    assert!(buf.capacity() >= want.len());
    assert_eq!(buf.as_slice(), want.as_slice());

    // ... and back through its grow path, so the conversion below starts
    // from a buffer whose capacity is much larger than its length.
    buf.reserve(3000_usize.saturating_sub(buf.capacity()));
    assert!(buf.capacity() >= 3000);
    assert_eq!(buf.as_slice(), want.as_slice());

    let got = dma_buffer_to_bytes(buf);
    assert_eq!(&want, &got);

    // Copying out of the converted bytes must give back the same data.
    let copied = got.to_vec();
    assert_eq!(copied, want);

    // An empty buffer takes the early-return path and yields empty bytes.
    let empty = DmaBuffer::new_in(DmaAllocator::new(Alignment::new(4096).unwrap()));
    assert!(dma_buffer_to_bytes(empty).is_empty());
}
700743
}

src/common/base/src/base/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ mod take_mut;
2828
mod uniq_id;
2929
mod watch_notify;
3030

31-
pub use dma::dma_buffer_as_vec;
31+
pub use dma::dma_buffer_to_bytes;
3232
pub use dma::dma_read_file;
3333
pub use dma::dma_read_file_range;
3434
pub use dma::dma_write_file_vectored;

src/common/base/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
#![feature(slice_swap_unchecked)]
2626
#![feature(variant_count)]
2727
#![feature(ptr_alignment_type)]
28+
#![feature(vec_into_raw_parts)]
2829

2930
pub mod base;
3031
pub mod containers;

src/query/service/src/spillers/spiller.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,7 @@ use std::ops::Range;
2020
use std::sync::Arc;
2121
use std::time::Instant;
2222

23-
use bytes::Bytes;
24-
use databend_common_base::base::dma_buffer_as_vec;
23+
use databend_common_base::base::dma_buffer_to_bytes;
2524
use databend_common_base::base::dma_read_file_range;
2625
use databend_common_base::base::Alignment;
2726
use databend_common_base::base::DmaWriteBuf;
@@ -277,7 +276,7 @@ impl Spiller {
277276
None => {
278277
let file_size = path.size();
279278
let (buf, range) = dma_read_file_range(path, 0..file_size as u64).await?;
280-
Buffer::from(dma_buffer_as_vec(buf)).slice(range)
279+
Buffer::from(dma_buffer_to_bytes(buf)).slice(range)
281280
}
282281
}
283282
}
@@ -330,7 +329,7 @@ impl Spiller {
330329
);
331330

332331
let (buf, range) = dma_read_file_range(path, 0..file_size as u64).await?;
333-
Buffer::from(dma_buffer_as_vec(buf)).slice(range)
332+
Buffer::from(dma_buffer_to_bytes(buf)).slice(range)
334333
}
335334
(Location::Local(path), Some(ref local)) => {
336335
local
@@ -371,7 +370,7 @@ impl Spiller {
371370
}
372371
None => {
373372
let (buf, range) = dma_read_file_range(path, data_range).await?;
374-
Buffer::from(dma_buffer_as_vec(buf)).slice(range)
373+
Buffer::from(dma_buffer_to_bytes(buf)).slice(range)
375374
}
376375
},
377376
Location::Remote(loc) => self.operator.read_with(loc).range(data_range).await?,
@@ -410,7 +409,7 @@ impl Spiller {
410409
let buf = buf
411410
.into_data()
412411
.into_iter()
413-
.map(|x| Bytes::from(dma_buffer_as_vec(x)))
412+
.map(dma_buffer_to_bytes)
414413
.collect::<Buffer>();
415414
let written = buf.len();
416415
writer.write(buf).await?;

0 commit comments

Comments
 (0)