Skip to content

Commit 4c75403

Browse files
committed
feat(query): new input format framework.
1 parent 640238d commit 4c75403

File tree

14 files changed

+1095
-0
lines changed

14 files changed

+1095
-0
lines changed

Cargo.lock

Lines changed: 5 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/query/pipeline/sources/Cargo.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,17 +9,22 @@ doctest = false
99
test = false
1010

1111
[dependencies]
12+
async-channel = "1.7.1"
13+
common-arrow = { path = "../../../common/arrow" }
1214
common-base = { path = "../../../common/base" }
1315
common-catalog = { path = "../../catalog" }
1416
common-datablocks = { path = "../../datablocks" }
17+
common-datavalues = { path = "../../datavalues" }
1518
common-exception = { path = "../../../common/exception" }
1619
common-formats = { path = "../../formats" }
1720
common-io = { path = "../../../common/io" }
1821
common-meta-types = { path = "../../../meta/types" }
1922
common-pipeline-core = { path = "../core" }
23+
common-settings = { path = "../../settings" }
2024
common-storage = { path = "../../../common/storage" }
2125
common-streams = { path = "../../streams" }
2226

27+
tracing = "0.1.35"
2328
async-trait = { version = "0.1.0", package = "async-trait-fn" }
2429
futures = "0.3.21"
2530
futures-util = "0.3.21"
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
use std::fmt::Debug;
2+
use std::fmt::Formatter;
3+
use std::sync::Arc;
4+
5+
use common_arrow::parquet::metadata::RowGroupMetaData;
6+
use common_datablocks::DataBlock;
7+
use common_exception::Result;
8+
use common_pipeline_core::Pipeline;
9+
use opendal::Object;
10+
11+
use crate::processors::sources::input_formats::input_context::InputContext;
12+
use crate::processors::sources::input_formats::input_format::FileInfo;
13+
use crate::processors::sources::input_formats::input_format::InputData;
14+
use crate::processors::sources::input_formats::input_format::SplitInfo;
15+
use crate::processors::sources::input_formats::input_pipeline::AligningStateTrait;
16+
use crate::processors::sources::input_formats::input_pipeline::BlockBuilderTrait;
17+
use crate::processors::sources::input_formats::input_pipeline::InputFormatPipe;
18+
use crate::processors::sources::input_formats::InputFormat;
19+
20+
struct InputFormatParquet;
21+
22+
#[async_trait::async_trait]
23+
impl InputFormat for InputFormatParquet {
24+
async fn read_file_meta(
25+
&self,
26+
obj: &Object,
27+
size: usize,
28+
) -> Result<Option<Arc<dyn InputData>>> {
29+
todo!()
30+
}
31+
32+
async fn read_split_meta(
33+
&self,
34+
obj: &Object,
35+
split_info: &SplitInfo,
36+
) -> Result<Option<Box<dyn InputData>>> {
37+
todo!()
38+
}
39+
40+
fn split_files(&self, file_infos: Vec<FileInfo>) -> Vec<SplitInfo> {
41+
todo!()
42+
}
43+
44+
fn exec_copy(&self, ctx: Arc<InputContext>, pipeline: &mut Pipeline) -> Result<()> {
45+
todo!()
46+
}
47+
48+
fn exec_stream(&self, ctx: Arc<InputContext>, pipeline: &mut Pipeline) -> Result<()> {
49+
todo!()
50+
}
51+
}
52+
53+
pub struct ParquetFormatPipe;
54+
55+
#[async_trait::async_trait]
56+
impl InputFormatPipe for ParquetFormatPipe {
57+
type ReadBatch = ReadBatch;
58+
type RowBatch = RowGroupInMemory;
59+
type AligningState = AligningState;
60+
type BlockBuilder = ParquetBlockBuilder;
61+
}
62+
63+
pub struct SplitMeta {
64+
row_groups: Vec<RowGroupMetaData>,
65+
}
66+
67+
/// A parquet row group held entirely in memory.
///
/// Placeholder: carries no data yet.
pub struct RowGroupInMemory {}

impl Debug for RowGroupInMemory {
    /// Render a fixed tag; there is no payload to show yet.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.write_str("RowGroupInMemory")
    }
}
74+
75+
#[derive(Debug)]
76+
pub enum ReadBatch {
77+
Buffer(Vec<u8>),
78+
RowGroup(RowGroupInMemory),
79+
}
80+
81+
impl From<Vec<u8>> for ReadBatch {
82+
fn from(v: Vec<u8>) -> Self {
83+
Self::Buffer(v)
84+
}
85+
}
86+
87+
pub struct ParquetBlockBuilder {
88+
ctx: Arc<InputContext>,
89+
}
90+
91+
impl BlockBuilderTrait for ParquetBlockBuilder {
92+
type Pipe = ParquetFormatPipe;
93+
94+
fn create(ctx: Arc<InputContext>) -> Self {
95+
ParquetBlockBuilder { ctx }
96+
}
97+
98+
fn deserialize(&mut self, batch: Option<RowGroupInMemory>) -> Result<Vec<DataBlock>> {
99+
todo!()
100+
}
101+
}
102+
103+
/// Accumulates raw read buffers until they can be assembled into row
/// groups. Alignment logic is not implemented yet.
pub struct AligningState {
    buffers: Vec<Vec<u8>>,
}
106+
107+
impl AligningStateTrait for AligningState {
108+
type Pipe = ParquetFormatPipe;
109+
110+
fn try_create(ctx: &Arc<InputContext>, split_info: &SplitInfo) -> Result<Self> {
111+
todo!()
112+
}
113+
114+
fn align(&mut self, read_batch: Option<ReadBatch>) -> Result<Vec<RowGroupInMemory>> {
115+
todo!()
116+
}
117+
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
use crate::processors::sources::input_formats::input_format_text::InputFormatTextBase;
2+
3+
pub struct InputFormatTSV {}
4+
5+
impl InputFormatTextBase for InputFormatTSV {}
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
pub mod input_format_parquet;
2+
pub mod input_format_tsv;
Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
use std::sync::Arc;
2+
3+
use common_base::base::tokio::sync::Mutex;
4+
use common_base::base::Progress;
5+
use common_datavalues::DataSchemaRef;
6+
use common_exception::ErrorCode;
7+
use common_exception::Result;
8+
use common_meta_types::StageFileCompression;
9+
use common_meta_types::StageFileFormatType;
10+
use common_meta_types::UserStageInfo;
11+
use common_settings::Settings;
12+
use opendal::io_util::CompressAlgorithm;
13+
use opendal::Operator;
14+
15+
use crate::processors::sources::input_formats::impls::input_format_tsv::InputFormatTSV;
16+
use crate::processors::sources::input_formats::input_format::FileInfo;
17+
use crate::processors::sources::input_formats::input_format::SplitInfo;
18+
use crate::processors::sources::input_formats::input_format_text::InputFormatText;
19+
use crate::processors::sources::input_formats::InputFormat;
20+
21+
pub enum InputPlan {
22+
CopyInto(Box<CopyIntoPlan>),
23+
StreamingLoad,
24+
ClickHouseInsert,
25+
}
26+
27+
pub struct CopyIntoPlan {
28+
pub stage_info: UserStageInfo,
29+
pub files: Vec<String>,
30+
}
31+
32+
pub struct InputProgress {
33+
// todo(youngsofun): add write progress and errors
34+
scan_progress: Progress,
35+
size: usize,
36+
}
37+
38+
pub struct InputContext {
39+
pub plan: InputPlan,
40+
pub schema: DataSchemaRef,
41+
pub operator: Operator,
42+
pub format: Arc<dyn InputFormat>,
43+
pub splits: Vec<SplitInfo>,
44+
45+
// runtime config
46+
pub settings: Settings,
47+
pub read_buffer_size: usize,
48+
pub min_split_size: usize,
49+
pub min_block_size: usize,
50+
51+
pub progress_total: Mutex<Vec<Arc<InputProgress>>>,
52+
pub progress_by_file: Mutex<Vec<Arc<InputProgress>>>,
53+
}
54+
55+
impl InputContext {
56+
pub fn get_input_format(format: &StageFileFormatType) -> Result<Arc<dyn InputFormat>> {
57+
match format {
58+
StageFileFormatType::Tsv => Ok(Arc::new(InputFormatText::<InputFormatTSV>::create())),
59+
format => {
60+
Err(ErrorCode::LogicalError(format!(
61+
"Unsupported file format: {:?}",
62+
format
63+
)))
64+
}
65+
}
66+
}
67+
68+
async fn try_create_from_copy(
69+
operator: Operator,
70+
settings: Settings,
71+
schema: DataSchemaRef,
72+
plan: CopyIntoPlan,
73+
) -> Result<Self> {
74+
let format = Self::get_input_format(&plan.stage_info.file_format_options.format)?;
75+
let files = Self::get_file_infos(&format, &operator, &plan).await?;
76+
let splits = format.split_files(files);
77+
Ok(InputContext {
78+
format,
79+
schema,
80+
operator,
81+
splits,
82+
settings,
83+
read_buffer_size: 0,
84+
min_split_size: 0,
85+
min_block_size: 0,
86+
progress_total: Default::default(),
87+
plan: InputPlan::CopyInto(Box::new(plan)),
88+
progress_by_file: Default::default(),
89+
})
90+
}
91+
92+
async fn get_file_infos(
93+
format: &Arc<dyn InputFormat>,
94+
op: &Operator,
95+
plan: &CopyIntoPlan,
96+
) -> Result<Vec<FileInfo>> {
97+
let mut infos = vec![];
98+
for p in &plan.files {
99+
let obj = op.object(p);
100+
let size = obj.metadata().await?.content_length() as usize;
101+
let file_meta = format.read_file_meta(&obj, size).await?;
102+
let compress_alg = InputContext::get_compression_alg_copy(
103+
plan.stage_info.file_format_options.compression,
104+
p,
105+
)?;
106+
let info = FileInfo {
107+
path: p.clone(),
108+
size,
109+
compress_alg,
110+
file_meta,
111+
};
112+
infos.push(info)
113+
}
114+
Ok(infos)
115+
}
116+
117+
pub fn num_prefetch_splits(&self) -> Result<usize> {
118+
Ok(self.settings.get_max_threads()? as usize)
119+
}
120+
121+
pub fn num_prefetch_per_split(&self) -> usize {
122+
1
123+
}
124+
125+
pub fn get_compression_alg(&self, path: &str) -> Result<Option<CompressAlgorithm>> {
126+
let opt = match &self.plan {
127+
InputPlan::CopyInto(p) => p.stage_info.file_format_options.compression,
128+
_ => StageFileCompression::None,
129+
};
130+
Self::get_compression_alg_copy(opt, path)
131+
}
132+
133+
pub fn get_compression_alg_copy(
134+
compress_option: StageFileCompression,
135+
path: &str,
136+
) -> Result<Option<CompressAlgorithm>> {
137+
let compression_algo = match compress_option {
138+
StageFileCompression::Auto => CompressAlgorithm::from_path(path),
139+
StageFileCompression::Gzip => Some(CompressAlgorithm::Gzip),
140+
StageFileCompression::Bz2 => Some(CompressAlgorithm::Bz2),
141+
StageFileCompression::Brotli => Some(CompressAlgorithm::Brotli),
142+
StageFileCompression::Zstd => Some(CompressAlgorithm::Zstd),
143+
StageFileCompression::Deflate => Some(CompressAlgorithm::Zlib),
144+
StageFileCompression::RawDeflate => Some(CompressAlgorithm::Deflate),
145+
StageFileCompression::Xz => Some(CompressAlgorithm::Xz),
146+
StageFileCompression::Lzo => {
147+
return Err(ErrorCode::UnImplement("compress type lzo is unimplemented"));
148+
}
149+
StageFileCompression::Snappy => {
150+
return Err(ErrorCode::UnImplement(
151+
"compress type snappy is unimplemented",
152+
));
153+
}
154+
StageFileCompression::None => None,
155+
};
156+
Ok(compression_algo)
157+
}
158+
}

0 commit comments

Comments
 (0)