Skip to content

Commit de04ec6

Browse files
authored
Merge pull request #7613 from youngsofun/fmt
feat(query): unify pipeline for all inputs with format.
2 parents 3a38218 + 4effb76 commit de04ec6

File tree

26 files changed

+2573
-30
lines changed

26 files changed

+2573
-30
lines changed

Cargo.lock

Lines changed: 10 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/query/formats/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,4 +27,5 @@ mod output_format_values;
2727

2828
pub use format::InputFormat;
2929
pub use format::InputState;
30+
pub use format_diagnostic::verbose_string;
3031
pub use format_factory::FormatFactory;

src/query/pipeline/sources/Cargo.toml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,19 +9,29 @@ doctest = false
99
test = false
1010

1111
[dependencies]
12+
async-channel = "1.7.1"
13+
common-arrow = { path = "../../../common/arrow" }
1214
common-base = { path = "../../../common/base" }
1315
common-catalog = { path = "../../catalog" }
1416
common-datablocks = { path = "../../datablocks" }
17+
common-datavalues = { path = "../../datavalues" }
1518
common-exception = { path = "../../../common/exception" }
1619
common-formats = { path = "../../formats" }
1720
common-io = { path = "../../../common/io" }
1821
common-meta-types = { path = "../../../meta/types" }
1922
common-pipeline-core = { path = "../core" }
23+
common-settings = { path = "../../settings" }
2024
common-storage = { path = "../../../common/storage" }
2125
common-streams = { path = "../../streams" }
2226

2327
async-trait = { version = "0.1.0", package = "async-trait-fn" }
28+
bstr = "0.2.17"
29+
crossbeam-channel = "0.5.6"
30+
csv-core = "0.1.10"
2431
futures = "0.3.21"
2532
futures-util = "0.3.21"
2633
opendal = { version = "0.17.1", features = ["layers-retry", "compress"] }
2734
parking_lot = "0.12.1"
35+
serde_json = "1.0.81"
36+
similar-asserts = "1.2.0"
37+
tracing = "0.1.35"
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
// Copyright 2022 Datafuse Labs.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use common_exception::ErrorCode;
16+
use common_exception::Result;
17+
18+
pub enum RecordDelimiter {
19+
Crlf,
20+
Any(u8),
21+
}
22+
23+
impl RecordDelimiter {
24+
pub fn end(&self) -> u8 {
25+
match self {
26+
RecordDelimiter::Crlf => b'\n',
27+
RecordDelimiter::Any(b) => *b,
28+
}
29+
}
30+
}
31+
32+
impl TryFrom<&str> for RecordDelimiter {
33+
type Error = ErrorCode;
34+
fn try_from(s: &str) -> Result<Self> {
35+
Self::try_from(s.as_bytes())
36+
}
37+
}
38+
39+
impl TryFrom<&[u8]> for RecordDelimiter {
40+
type Error = ErrorCode;
41+
fn try_from(s: &[u8]) -> Result<Self> {
42+
match s.len() {
43+
1 => Ok(RecordDelimiter::Any(s[0])),
44+
2 if s.eq(b"\r\n") => Ok(RecordDelimiter::Crlf),
45+
_ => Err(ErrorCode::InvalidArgument(format!(
46+
"bad RecordDelimiter: '{:?}'",
47+
s
48+
))),
49+
}
50+
}
51+
}
52+
53+
impl Default for RecordDelimiter {
54+
fn default() -> Self {
55+
RecordDelimiter::Crlf
56+
}
57+
}

0 commit comments

Comments
 (0)