Commit 7a958b8

feat(query): new input format framework.
1 parent 640238d commit 7a958b8

14 files changed, +1177 -0 lines changed


Cargo.lock

Lines changed: 5 additions & 0 deletions
Some generated files are not rendered by default.

src/query/pipeline/sources/Cargo.toml

Lines changed: 5 additions & 0 deletions
@@ -9,17 +9,22 @@ doctest = false
 test = false
 
 [dependencies]
+async-channel = "1.7.1"
+common-arrow = { path = "../../../common/arrow" }
 common-base = { path = "../../../common/base" }
 common-catalog = { path = "../../catalog" }
 common-datablocks = { path = "../../datablocks" }
+common-datavalues = { path = "../../datavalues" }
 common-exception = { path = "../../../common/exception" }
 common-formats = { path = "../../formats" }
 common-io = { path = "../../../common/io" }
 common-meta-types = { path = "../../../meta/types" }
 common-pipeline-core = { path = "../core" }
+common-settings = { path = "../../settings" }
 common-storage = { path = "../../../common/storage" }
 common-streams = { path = "../../streams" }
 
+tracing = "0.1.35"
 async-trait = { version = "0.1.0", package = "async-trait-fn" }
 futures = "0.3.21"
 futures-util = "0.3.21"
Lines changed: 124 additions & 0 deletions
@@ -0,0 +1,124 @@

use std::fmt::Debug;
use std::fmt::Formatter;
use std::sync::Arc;

use common_arrow::parquet::metadata::RowGroupMetaData;
use common_base::base::tokio::sync::mpsc::Receiver;
use common_datablocks::DataBlock;
use common_exception::Result;
use common_pipeline_core::Pipeline;
use opendal::Object;

use crate::processors::sources::input_formats::input_context::InputContext;
use crate::processors::sources::input_formats::input_format::FileInfo;
use crate::processors::sources::input_formats::input_format::InputData;
use crate::processors::sources::input_formats::input_format::SplitInfo;
use crate::processors::sources::input_formats::input_pipeline::AligningStateTrait;
use crate::processors::sources::input_formats::input_pipeline::BlockBuilderTrait;
use crate::processors::sources::input_formats::input_pipeline::InputFormatPipe;
use crate::processors::sources::input_formats::input_pipeline::StreamingReadBatch;
use crate::processors::sources::input_formats::InputFormat;

struct InputFormatParquet;

#[async_trait::async_trait]
impl InputFormat for InputFormatParquet {
    async fn read_file_meta(
        &self,
        obj: &Object,
        size: usize,
    ) -> Result<Option<Arc<dyn InputData>>> {
        todo!()
    }

    async fn read_split_meta(
        &self,
        obj: &Object,
        split_info: &SplitInfo,
    ) -> Result<Option<Box<dyn InputData>>> {
        todo!()
    }

    fn split_files(&self, file_infos: Vec<FileInfo>) -> Vec<SplitInfo> {
        todo!()
    }

    fn exec_copy(&self, ctx: Arc<InputContext>, pipeline: &mut Pipeline) -> Result<()> {
        todo!()
    }

    fn exec_stream(
        &self,
        ctx: Arc<InputContext>,
        pipeline: &mut Pipeline,
        input: Receiver<StreamingReadBatch>,
    ) -> Result<()> {
        todo!()
    }
}

pub struct ParquetFormatPipe;

#[async_trait::async_trait]
impl InputFormatPipe for ParquetFormatPipe {
    type ReadBatch = ReadBatch;
    type RowBatch = RowGroupInMemory;
    type AligningState = AligningState;
    type BlockBuilder = ParquetBlockBuilder;
}

pub struct SplitMeta {
    row_groups: Vec<RowGroupMetaData>,
}

pub struct RowGroupInMemory {}

impl Debug for RowGroupInMemory {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "RowGroupInMemory")
    }
}

#[derive(Debug)]
pub enum ReadBatch {
    Buffer(Vec<u8>),
    RowGroup(RowGroupInMemory),
}

impl From<Vec<u8>> for ReadBatch {
    fn from(v: Vec<u8>) -> Self {
        Self::Buffer(v)
    }
}

pub struct ParquetBlockBuilder {
    ctx: Arc<InputContext>,
}

impl BlockBuilderTrait for ParquetBlockBuilder {
    type Pipe = ParquetFormatPipe;

    fn create(ctx: Arc<InputContext>) -> Self {
        ParquetBlockBuilder { ctx }
    }

    fn deserialize(&mut self, batch: Option<RowGroupInMemory>) -> Result<Vec<DataBlock>> {
        todo!()
    }
}

pub struct AligningState {
    buffers: Vec<Vec<u8>>,
}

impl AligningStateTrait for AligningState {
    type Pipe = ParquetFormatPipe;

    fn try_create(ctx: &Arc<InputContext>, split_info: &SplitInfo) -> Result<Self> {
        todo!()
    }

    fn align(&mut self, read_batch: Option<ReadBatch>) -> Result<Vec<RowGroupInMemory>> {
        todo!()
    }
}
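For orientation, here is a minimal, hypothetical sketch of how the pipe's associated types are expected to cooperate once the stubs are filled in: raw ReadBatch values are aligned into RowBatch values, and the BlockBuilder turns each RowBatch into DataBlocks. The driver function and its trait bounds below are assumptions; the real driver lives in input_pipeline.rs, which is not part of this excerpt.

use std::sync::Arc;

use common_datablocks::DataBlock;
use common_exception::Result;

use crate::processors::sources::input_formats::input_context::InputContext;
use crate::processors::sources::input_formats::input_format::SplitInfo;
use crate::processors::sources::input_formats::input_pipeline::AligningStateTrait;
use crate::processors::sources::input_formats::input_pipeline::BlockBuilderTrait;
use crate::processors::sources::input_formats::input_pipeline::InputFormatPipe;

// Hypothetical driver, not part of this commit. It assumes the pipe's associated
// types are tied back to it via AligningStateTrait/BlockBuilderTrait bounds.
fn drive_pipe_sketch<P>(
    ctx: &Arc<InputContext>,
    split: &SplitInfo,
    read_batches: Vec<P::ReadBatch>,
) -> Result<Vec<DataBlock>>
where
    P: InputFormatPipe,
    P::AligningState: AligningStateTrait<Pipe = P>,
    P::BlockBuilder: BlockBuilderTrait<Pipe = P>,
{
    let mut aligning = P::AligningState::try_create(ctx, split)?;
    let mut builder = P::BlockBuilder::create(ctx.clone());
    let mut blocks = vec![];
    // Feed raw batches through the aligning stage, then deserialize each
    // aligned row batch into data blocks.
    for batch in read_batches {
        for row_batch in aligning.align(Some(batch))? {
            blocks.extend(builder.deserialize(Some(row_batch))?);
        }
    }
    // A trailing `None` flushes whatever either stage has buffered.
    for row_batch in aligning.align(None)? {
        blocks.extend(builder.deserialize(Some(row_batch))?);
    }
    blocks.extend(builder.deserialize(None)?);
    Ok(blocks)
}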
Lines changed: 22 additions & 0 deletions
@@ -0,0 +1,22 @@

use common_datablocks::DataBlock;
use common_exception::Result;

use crate::processors::sources::input_formats::input_format_text::AligningState;
use crate::processors::sources::input_formats::input_format_text::BlockBuilder;
use crate::processors::sources::input_formats::input_format_text::InputFormatTextBase;
use crate::processors::sources::input_formats::input_format_text::RowBatch;

pub struct InputFormatTSV {}

impl InputFormatTextBase for InputFormatTSV {
    fn deserialize(
        builder: &mut BlockBuilder<Self>,
        batch: Option<RowBatch>,
    ) -> Result<Vec<DataBlock>> {
        todo!()
    }

    fn align(state: &mut AligningState<Self>, buf: &[u8]) -> Result<Vec<RowBatch>> {
        todo!()
    }
}
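Both TSV steps are still todo!() in this commit. For intuition only, here is a standalone helper of the kind the align step could eventually rely on: cut complete newline-terminated rows out of a raw buffer and keep the unfinished tail for the next read. This is an assumption about the intended behaviour, not the committed implementation.

// Illustrative only (not part of this commit): split a raw byte buffer into
// complete rows ending in '\n', returning the rows and the unfinished tail.
fn split_complete_rows(buf: &[u8]) -> (Vec<&[u8]>, &[u8]) {
    match buf.iter().rposition(|b| *b == b'\n') {
        Some(last_newline) => {
            let (complete, tail) = buf.split_at(last_newline + 1);
            let rows = complete
                .split(|b| *b == b'\n')
                .filter(|row| !row.is_empty())
                .collect();
            (rows, tail)
        }
        // No newline yet: the whole buffer is an unfinished row.
        None => (Vec::new(), buf),
    }
}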
Lines changed: 2 additions & 0 deletions
@@ -0,0 +1,2 @@

pub mod input_format_parquet;
pub mod input_format_tsv;
Lines changed: 159 additions & 0 deletions
@@ -0,0 +1,159 @@

use std::sync::Arc;

use common_base::base::tokio::sync::Mutex;
use common_base::base::Progress;
use common_datavalues::DataSchemaRef;
use common_exception::ErrorCode;
use common_exception::Result;
use common_meta_types::StageFileCompression;
use common_meta_types::StageFileFormatType;
use common_meta_types::UserStageInfo;
use common_settings::Settings;
use opendal::io_util::CompressAlgorithm;
use opendal::Operator;

use crate::processors::sources::input_formats::impls::input_format_tsv::InputFormatTSV;
use crate::processors::sources::input_formats::input_format::FileInfo;
use crate::processors::sources::input_formats::input_format::SplitInfo;
use crate::processors::sources::input_formats::input_format_text::InputFormatText;
use crate::processors::sources::input_formats::InputFormat;

pub enum InputPlan {
    CopyInto(Box<CopyIntoPlan>),
    StreamingLoad,
    ClickHouseInsert,
}

pub struct CopyIntoPlan {
    pub stage_info: UserStageInfo,
    pub files: Vec<String>,
}

pub struct InputProgress {
    // todo(youngsofun): add write progress and errors
    scan_progress: Progress,
    size: usize,
}

pub struct InputContext {
    pub plan: InputPlan,
    pub schema: DataSchemaRef,
    pub operator: Operator,
    pub format: Arc<dyn InputFormat>,
    pub splits: Vec<SplitInfo>,

    // runtime config
    pub settings: Settings,

    pub rows_to_skip: usize,
    pub read_batch_size: usize,
    pub split_size: usize,
    pub rows_per_block: usize,

    pub progress_total: Mutex<Vec<Arc<InputProgress>>>,
    pub progress_by_file: Mutex<Vec<Arc<InputProgress>>>,
}

impl InputContext {
    pub fn get_input_format(format: &StageFileFormatType) -> Result<Arc<dyn InputFormat>> {
        match format {
            StageFileFormatType::Tsv => Ok(Arc::new(InputFormatText::<InputFormatTSV>::create())),
            format => Err(ErrorCode::LogicalError(format!(
                "Unsupported file format: {:?}",
                format
            ))),
        }
    }

    async fn try_create_from_copy(
        operator: Operator,
        settings: Settings,
        schema: DataSchemaRef,
        plan: CopyIntoPlan,
    ) -> Result<Self> {
        let format = Self::get_input_format(&plan.stage_info.file_format_options.format)?;
        let files = Self::get_file_infos(&format, &operator, &plan).await?;
        let splits = format.split_files(files);
        Ok(InputContext {
            format,
            schema,
            operator,
            splits,
            settings,
            rows_to_skip: 0,
            read_batch_size: 0,
            split_size: 0,
            rows_per_block: 0,
            progress_total: Default::default(),
            plan: InputPlan::CopyInto(Box::new(plan)),
            progress_by_file: Default::default(),
        })
    }

    async fn get_file_infos(
        format: &Arc<dyn InputFormat>,
        op: &Operator,
        plan: &CopyIntoPlan,
    ) -> Result<Vec<FileInfo>> {
        let mut infos = vec![];
        for p in &plan.files {
            let obj = op.object(p);
            let size = obj.metadata().await?.content_length() as usize;
            let file_meta = format.read_file_meta(&obj, size).await?;
            let compress_alg = InputContext::get_compression_alg_copy(
                plan.stage_info.file_format_options.compression,
                p,
            )?;
            let info = FileInfo {
                path: p.clone(),
                size,
                compress_alg,
                file_meta,
            };
            infos.push(info)
        }
        Ok(infos)
    }

    pub fn num_prefetch_splits(&self) -> Result<usize> {
        Ok(self.settings.get_max_threads()? as usize)
    }

    pub fn num_prefetch_per_split(&self) -> usize {
        1
    }

    pub fn get_compression_alg(&self, path: &str) -> Result<Option<CompressAlgorithm>> {
        let opt = match &self.plan {
            InputPlan::CopyInto(p) => p.stage_info.file_format_options.compression,
            _ => StageFileCompression::None,
        };
        Self::get_compression_alg_copy(opt, path)
    }

    pub fn get_compression_alg_copy(
        compress_option: StageFileCompression,
        path: &str,
    ) -> Result<Option<CompressAlgorithm>> {
        let compression_algo = match compress_option {
            StageFileCompression::Auto => CompressAlgorithm::from_path(path),
            StageFileCompression::Gzip => Some(CompressAlgorithm::Gzip),
            StageFileCompression::Bz2 => Some(CompressAlgorithm::Bz2),
            StageFileCompression::Brotli => Some(CompressAlgorithm::Brotli),
            StageFileCompression::Zstd => Some(CompressAlgorithm::Zstd),
            StageFileCompression::Deflate => Some(CompressAlgorithm::Zlib),
            StageFileCompression::RawDeflate => Some(CompressAlgorithm::Deflate),
            StageFileCompression::Xz => Some(CompressAlgorithm::Xz),
            StageFileCompression::Lzo => {
                return Err(ErrorCode::UnImplement("compress type lzo is unimplemented"));
            }
            StageFileCompression::Snappy => {
                return Err(ErrorCode::UnImplement(
                    "compress type snappy is unimplemented",
                ));
            }
            StageFileCompression::None => None,
        };
        Ok(compression_algo)
    }
}
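A hedged usage sketch of the two public helpers above (not part of the commit; the staged file name is hypothetical): get_input_format dispatches on the stage's file format type, and get_compression_alg_copy resolves the compression option, inferring the algorithm from the file extension when Auto is selected.

use std::sync::Arc;

use common_exception::Result;
use common_meta_types::StageFileCompression;
use common_meta_types::StageFileFormatType;
use opendal::io_util::CompressAlgorithm;

use crate::processors::sources::input_formats::input_context::InputContext;
use crate::processors::sources::input_formats::InputFormat;

// Usage sketch: resolve the format implementation and compression for a TSV stage file.
fn resolve_tsv_input() -> Result<(Arc<dyn InputFormat>, Option<CompressAlgorithm>)> {
    // Only Tsv is wired up in this commit; other format types return a LogicalError.
    let format = InputContext::get_input_format(&StageFileFormatType::Tsv)?;
    // With Auto, the algorithm comes from the extension, so a ".gz" path is
    // expected to resolve to gzip.
    let compress = InputContext::get_compression_alg_copy(
        StageFileCompression::Auto,
        "part-0001.tsv.gz", // hypothetical file name
    )?;
    Ok((format, compress))
}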
