
Commit 998ecf0

feat(query): new input format framework.
1 parent: 640238d


14 files changed: 1174 additions and 0 deletions


Cargo.lock

Lines changed: 5 additions & 0 deletions
Some generated files are not rendered by default.

src/query/pipeline/sources/Cargo.toml

Lines changed: 5 additions & 0 deletions
@@ -9,17 +9,22 @@ doctest = false
 test = false

 [dependencies]
+async-channel = "1.7.1"
+common-arrow = { path = "../../../common/arrow" }
 common-base = { path = "../../../common/base" }
 common-catalog = { path = "../../catalog" }
 common-datablocks = { path = "../../datablocks" }
+common-datavalues = { path = "../../datavalues" }
 common-exception = { path = "../../../common/exception" }
 common-formats = { path = "../../formats" }
 common-io = { path = "../../../common/io" }
 common-meta-types = { path = "../../../meta/types" }
 common-pipeline-core = { path = "../core" }
+common-settings = { path = "../../settings" }
 common-storage = { path = "../../../common/storage" }
 common-streams = { path = "../../streams" }

+tracing = "0.1.35"
 async-trait = { version = "0.1.0", package = "async-trait-fn" }
 futures = "0.3.21"
 futures-util = "0.3.21"
Lines changed: 124 additions & 0 deletions
@@ -0,0 +1,124 @@
use std::fmt::Debug;
use std::fmt::Formatter;
use std::sync::Arc;

use common_arrow::parquet::metadata::RowGroupMetaData;
use common_base::base::tokio::sync::mpsc::Receiver;
use common_datablocks::DataBlock;
use common_exception::Result;
use common_pipeline_core::Pipeline;
use opendal::Object;

use crate::processors::sources::input_formats::input_context::InputContext;
use crate::processors::sources::input_formats::input_format::FileInfo;
use crate::processors::sources::input_formats::input_format::InputData;
use crate::processors::sources::input_formats::input_format::SplitInfo;
use crate::processors::sources::input_formats::input_pipeline::AligningStateTrait;
use crate::processors::sources::input_formats::input_pipeline::BlockBuilderTrait;
use crate::processors::sources::input_formats::input_pipeline::InputFormatPipe;
use crate::processors::sources::input_formats::input_pipeline::StreamingReadBatch;
use crate::processors::sources::input_formats::InputFormat;

struct InputFormatParquet;

#[async_trait::async_trait]
impl InputFormat for InputFormatParquet {
    async fn read_file_meta(
        &self,
        obj: &Object,
        size: usize,
    ) -> Result<Option<Arc<dyn InputData>>> {
        todo!()
    }

    async fn read_split_meta(
        &self,
        obj: &Object,
        split_info: &SplitInfo,
    ) -> Result<Option<Box<dyn InputData>>> {
        todo!()
    }

    fn split_files(&self, file_infos: Vec<FileInfo>) -> Vec<SplitInfo> {
        todo!()
    }

    fn exec_copy(&self, ctx: Arc<InputContext>, pipeline: &mut Pipeline) -> Result<()> {
        todo!()
    }

    fn exec_stream(
        &self,
        ctx: Arc<InputContext>,
        pipeline: &mut Pipeline,
        input: Receiver<StreamingReadBatch>,
    ) -> Result<()> {
        todo!()
    }
}

pub struct ParquetFormatPipe;

#[async_trait::async_trait]
impl InputFormatPipe for ParquetFormatPipe {
    type ReadBatch = ReadBatch;
    type RowBatch = RowGroupInMemory;
    type AligningState = AligningState;
    type BlockBuilder = ParquetBlockBuilder;
}

pub struct SplitMeta {
    row_groups: Vec<RowGroupMetaData>,
}

pub struct RowGroupInMemory {}

impl Debug for RowGroupInMemory {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "RowGroupInMemory")
    }
}

#[derive(Debug)]
pub enum ReadBatch {
    Buffer(Vec<u8>),
    RowGroup(RowGroupInMemory),
}

impl From<Vec<u8>> for ReadBatch {
    fn from(v: Vec<u8>) -> Self {
        Self::Buffer(v)
    }
}

pub struct ParquetBlockBuilder {
    ctx: Arc<InputContext>,
}

impl BlockBuilderTrait for ParquetBlockBuilder {
    type Pipe = ParquetFormatPipe;

    fn create(ctx: Arc<InputContext>) -> Self {
        ParquetBlockBuilder { ctx }
    }

    fn deserialize(&mut self, batch: Option<RowGroupInMemory>) -> Result<Vec<DataBlock>> {
        todo!()
    }
}

pub struct AligningState {
    buffers: Vec<Vec<u8>>,
}

impl AligningStateTrait for AligningState {
    type Pipe = ParquetFormatPipe;

    fn try_create(ctx: &Arc<InputContext>, split_info: &SplitInfo) -> Result<Self> {
        todo!()
    }

    fn align(&mut self, read_batch: Option<ReadBatch>) -> Result<Vec<RowGroupInMemory>> {
        todo!()
    }
}
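
The Parquet implementation above is still a skeleton (every method body is todo!()), but the shape of the framework is already visible in ParquetFormatPipe, which ties four associated types together: raw ReadBatch chunks, RowGroupInMemory row batches, an AligningState, and a ParquetBlockBuilder. Below is a minimal, self-contained sketch of how such a pipe could be driven; the function and closure names are hypothetical stand-ins, not code from this commit, and it assumes the Option<_> arguments of align/deserialize are how end-of-input is signalled.

// Hypothetical driver for the align -> deserialize split; plain closures over
// Vec<u8>/String stand in for AligningStateTrait, BlockBuilderTrait and DataBlock.
fn drive(
    read_batches: Vec<Vec<u8>>,
    mut align: impl FnMut(Option<Vec<u8>>) -> Vec<Vec<u8>>, // bytes -> complete row batches
    mut build: impl FnMut(Option<Vec<u8>>) -> Vec<String>,  // row batches -> "blocks"
) -> Vec<String> {
    let mut blocks = vec![];
    for batch in read_batches {
        for rows in align(Some(batch)) {
            blocks.extend(build(Some(rows)));
        }
    }
    // End of input: flush whatever both stages are still holding.
    for rows in align(None) {
        blocks.extend(build(Some(rows)));
    }
    blocks.extend(build(None));
    blocks
}

fn main() {
    // Degenerate example: each read batch is already a complete row batch, and
    // each row batch becomes one "block" (a String here).
    let align = |b: Option<Vec<u8>>| b.into_iter().collect::<Vec<Vec<u8>>>();
    let build = |r: Option<Vec<u8>>| -> Vec<String> {
        r.map(|v| String::from_utf8_lossy(&v).into_owned())
            .into_iter()
            .collect()
    };
    let blocks = drive(vec![b"r1".to_vec(), b"r2".to_vec()], align, build);
    assert_eq!(blocks, vec!["r1".to_string(), "r2".to_string()]);
}
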
Lines changed: 22 additions & 0 deletions
@@ -0,0 +1,22 @@
use common_datablocks::DataBlock;
use common_exception::Result;

use crate::processors::sources::input_formats::input_format_text::AligningState;
use crate::processors::sources::input_formats::input_format_text::BlockBuilder;
use crate::processors::sources::input_formats::input_format_text::InputFormatTextBase;
use crate::processors::sources::input_formats::input_format_text::RowBatch;

pub struct InputFormatTSV {}

impl InputFormatTextBase for InputFormatTSV {
    fn deserialize(
        builder: &mut BlockBuilder<Self>,
        batch: Option<RowBatch>,
    ) -> Result<Vec<DataBlock>> {
        todo!()
    }

    fn align(state: &mut AligningState<Self>, buf: &[u8]) -> Result<Vec<RowBatch>> {
        todo!()
    }
}
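
InputFormatTSV also leaves both methods as todo!(). Going by the InputFormatTextBase signatures, align has to cut incoming byte buffers into batches of complete rows so that deserialize never sees a line torn across two read buffers. A hypothetical, self-contained sketch of that boundary logic follows; PendingRows and its fields are illustrative stand-ins, not the commit's AligningState<T>/RowBatch (which live in input_format_text.rs, not shown in this diff excerpt).

// Sketch of row alignment for a line-based format such as TSV.
#[derive(Default)]
struct PendingRows {
    tail: Vec<u8>, // bytes after the last '\n' seen so far
}

impl PendingRows {
    /// Returns batches of whole rows; keeps any trailing partial row for later.
    fn align(&mut self, buf: &[u8]) -> Vec<Vec<u8>> {
        match buf.iter().rposition(|b| *b == b'\n') {
            Some(last_nl) => {
                // Everything up to and including the last newline is complete.
                let mut rows = std::mem::take(&mut self.tail);
                rows.extend_from_slice(&buf[..=last_nl]);
                self.tail.extend_from_slice(&buf[last_nl + 1..]);
                vec![rows]
            }
            None => {
                // No newline yet: keep buffering.
                self.tail.extend_from_slice(buf);
                vec![]
            }
        }
    }
}

fn main() {
    let mut state = PendingRows::default();
    let mut batches = vec![];
    for chunk in [&b"a\t1\nb\t"[..], &b"2\nc\t3"[..]] {
        batches.extend(state.align(chunk));
    }
    // The trailing "c\t3" is still pending; a real implementation would flush it
    // (and report the missing final newline) when the split ends.
    assert_eq!(batches, vec![b"a\t1\n".to_vec(), b"b\t2\n".to_vec()]);
}
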
Lines changed: 2 additions & 0 deletions
@@ -0,0 +1,2 @@
pub mod input_format_parquet;
pub mod input_format_tsv;
Lines changed: 156 additions & 0 deletions
@@ -0,0 +1,156 @@
use std::sync::Arc;

use common_base::base::tokio::sync::Mutex;
use common_base::base::Progress;
use common_datavalues::DataSchemaRef;
use common_exception::ErrorCode;
use common_exception::Result;
use common_meta_types::StageFileCompression;
use common_meta_types::StageFileFormatType;
use common_meta_types::UserStageInfo;
use common_settings::Settings;
use opendal::io_util::CompressAlgorithm;
use opendal::Operator;

use crate::processors::sources::input_formats::impls::input_format_tsv::InputFormatTSV;
use crate::processors::sources::input_formats::input_format::FileInfo;
use crate::processors::sources::input_formats::input_format::SplitInfo;
use crate::processors::sources::input_formats::input_format_text::InputFormatText;
use crate::processors::sources::input_formats::InputFormat;

pub enum InputPlan {
    CopyInto(Box<CopyIntoPlan>),
    StreamingLoad,
    ClickHouseInsert,
}

pub struct CopyIntoPlan {
    pub stage_info: UserStageInfo,
    pub files: Vec<String>,
}

pub struct InputProgress {
    // todo(youngsofun): add write progress and errors
    scan_progress: Progress,
    size: usize,
}

pub struct InputContext {
    pub plan: InputPlan,
    pub schema: DataSchemaRef,
    pub operator: Operator,
    pub format: Arc<dyn InputFormat>,
    pub splits: Vec<SplitInfo>,

    // runtime config
    pub settings: Settings,
    pub read_buffer_size: usize,
    pub min_split_size: usize,
    pub min_block_size: usize,

    pub progress_total: Mutex<Vec<Arc<InputProgress>>>,
    pub progress_by_file: Mutex<Vec<Arc<InputProgress>>>,
}

impl InputContext {
    pub fn get_input_format(format: &StageFileFormatType) -> Result<Arc<dyn InputFormat>> {
        match format {
            StageFileFormatType::Tsv => Ok(Arc::new(InputFormatText::<InputFormatTSV>::create())),
            format => Err(ErrorCode::LogicalError(format!(
                "Unsupported file format: {:?}",
                format
            ))),
        }
    }

    async fn try_create_from_copy(
        operator: Operator,
        settings: Settings,
        schema: DataSchemaRef,
        plan: CopyIntoPlan,
    ) -> Result<Self> {
        let format = Self::get_input_format(&plan.stage_info.file_format_options.format)?;
        let files = Self::get_file_infos(&format, &operator, &plan).await?;
        let splits = format.split_files(files);
        Ok(InputContext {
            format,
            schema,
            operator,
            splits,
            settings,
            read_buffer_size: 0,
            min_split_size: 0,
            min_block_size: 0,
            progress_total: Default::default(),
            plan: InputPlan::CopyInto(Box::new(plan)),
            progress_by_file: Default::default(),
        })
    }

    async fn get_file_infos(
        format: &Arc<dyn InputFormat>,
        op: &Operator,
        plan: &CopyIntoPlan,
    ) -> Result<Vec<FileInfo>> {
        let mut infos = vec![];
        for p in &plan.files {
            let obj = op.object(p);
            let size = obj.metadata().await?.content_length() as usize;
            let file_meta = format.read_file_meta(&obj, size).await?;
            let compress_alg = InputContext::get_compression_alg_copy(
                plan.stage_info.file_format_options.compression,
                p,
            )?;
            let info = FileInfo {
                path: p.clone(),
                size,
                compress_alg,
                file_meta,
            };
            infos.push(info)
        }
        Ok(infos)
    }

    pub fn num_prefetch_splits(&self) -> Result<usize> {
        Ok(self.settings.get_max_threads()? as usize)
    }

    pub fn num_prefetch_per_split(&self) -> usize {
        1
    }

    pub fn get_compression_alg(&self, path: &str) -> Result<Option<CompressAlgorithm>> {
        let opt = match &self.plan {
            InputPlan::CopyInto(p) => p.stage_info.file_format_options.compression,
            _ => StageFileCompression::None,
        };
        Self::get_compression_alg_copy(opt, path)
    }

    pub fn get_compression_alg_copy(
        compress_option: StageFileCompression,
        path: &str,
    ) -> Result<Option<CompressAlgorithm>> {
        let compression_algo = match compress_option {
            StageFileCompression::Auto => CompressAlgorithm::from_path(path),
            StageFileCompression::Gzip => Some(CompressAlgorithm::Gzip),
            StageFileCompression::Bz2 => Some(CompressAlgorithm::Bz2),
            StageFileCompression::Brotli => Some(CompressAlgorithm::Brotli),
            StageFileCompression::Zstd => Some(CompressAlgorithm::Zstd),
            StageFileCompression::Deflate => Some(CompressAlgorithm::Zlib),
            StageFileCompression::RawDeflate => Some(CompressAlgorithm::Deflate),
            StageFileCompression::Xz => Some(CompressAlgorithm::Xz),
            StageFileCompression::Lzo => {
                return Err(ErrorCode::UnImplement("compress type lzo is unimplemented"));
            }
            StageFileCompression::Snappy => {
                return Err(ErrorCode::UnImplement(
                    "compress type snappy is unimplemented",
                ));
            }
            StageFileCompression::None => None,
        };
        Ok(compression_algo)
    }
}
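
get_compression_alg_copy is the one piece of InputContext that is already fully implemented: Auto defers to the file extension via opendal's CompressAlgorithm::from_path, the explicit stage options map one-to-one onto opendal codecs (the stage option Deflate means zlib-wrapped data, hence CompressAlgorithm::Zlib, while RawDeflate maps to the bare CompressAlgorithm::Deflate stream), and Lzo/Snappy are rejected as unimplemented. A few illustrative calls, not part of this commit, assuming they run inside this crate and that opendal resolves a .gz extension to Gzip:

// Illustrative only: exercises the mapping defined above.
fn compression_examples() -> common_exception::Result<()> {
    // Auto: decided by the file extension.
    let auto = InputContext::get_compression_alg_copy(StageFileCompression::Auto, "books.csv.gz")?;
    assert!(matches!(auto, Some(CompressAlgorithm::Gzip)));

    // Explicit option: the path no longer matters.
    let zlib = InputContext::get_compression_alg_copy(StageFileCompression::Deflate, "books.csv")?;
    assert!(matches!(zlib, Some(CompressAlgorithm::Zlib)));

    // Unsupported codecs surface as errors instead of silently passing bytes through.
    assert!(InputContext::get_compression_alg_copy(StageFileCompression::Lzo, "books.csv").is_err());
    Ok(())
}
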
