Skip to content

Commit 22362e6

Browse files
authored
refactor: speed up csv reader. (#15043)
1 parent f281dc3 commit 22362e6

File tree

1 file changed

+9
-8
lines changed
  • src/query/storages/stage/src/read/row_based/processors

1 file changed

+9
-8
lines changed

src/query/storages/stage/src/read/row_based/processors/reader.rs

Lines changed: 9 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -34,6 +34,7 @@ use crate::read::row_based::batch::BytesBatch;
3434
struct FileState {
3535
file: OneFilePartition,
3636
offset: usize,
37+
reader: opendal::Reader,
3738
}
3839
pub struct BytesReader {
3940
table_ctx: Arc<dyn TableContext>,
@@ -62,14 +63,8 @@ impl BytesReader {
6263
pub async fn read_batch(&mut self) -> Result<DataBlock> {
6364
if let Some(state) = &mut self.file_state {
6465
let end = min(self.read_batch_size + state.offset, state.file.size);
65-
let mut reader = self
66-
.op
67-
.reader_with(&state.file.path)
68-
.range((state.offset as u64)..(end as u64))
69-
.await?;
70-
7166
let mut buffer = vec![0u8; end - state.offset];
72-
let n = read_full(&mut reader, &mut buffer[0..]).await?;
67+
let n = read_full(&mut state.reader, &mut buffer[..]).await?;
7368
if n == 0 {
7469
return Err(ErrorCode::BadBytes(format!(
7570
"Unexpected EOF {} expect {} bytes, read only {} bytes.",
@@ -123,7 +118,13 @@ impl PrefetchAsyncSource for BytesReader {
123118
None => return Ok(None),
124119
};
125120
let file = OneFilePartition::from_part(&part)?.clone();
126-
self.file_state = Some(FileState { file, offset: 0 })
121+
122+
let reader = self.op.reader_with(&file.path).await?;
123+
self.file_state = Some(FileState {
124+
file,
125+
reader,
126+
offset: 0,
127+
})
127128
}
128129
match self.read_batch().await {
129130
Ok(block) => Ok(Some(block)),

0 commit comments

Comments
 (0)