Skip to content

chore(query): Introduce enable_extended_json_syntax setting to control extended JSON syntax #18313

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -653,6 +653,7 @@ backtrace = { git = "https://github.com/rust-lang/backtrace-rs.git", rev = "7226
color-eyre = { git = "https://github.com/eyre-rs/eyre.git", rev = "e5d92c3" }
deltalake = { git = "https://github.com/delta-io/delta-rs", rev = "9954bff" }
display-more = { git = "https://github.com/databendlabs/display-more", tag = "v0.2.0" }
jsonb = { git = "https://github.com/b41sh/jsonb", rev = "6a34a381329a45f6cdf77136e5e6e468deed2762" }
map-api = { git = "https://github.com/databendlabs/map-api", tag = "v0.2.3" }
openai_api_rust = { git = "https://github.com/datafuse-extras/openai-api", rev = "819a0ed" }
openraft = { git = "https://github.com/databendlabs/openraft", tag = "v0.10.0-alpha.9" }
Expand Down
2 changes: 2 additions & 0 deletions src/common/io/src/format_settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ pub struct FormatSettings {
pub geometry_format: GeometryDataType,
pub enable_dst_hour_fix: bool,
pub format_null_as_str: bool,
pub enable_extended_json_syntax: bool,
}

// only used for tests
Expand All @@ -35,6 +36,7 @@ impl Default for FormatSettings {
geometry_format: GeometryDataType::default(),
enable_dst_hour_fix: false,
format_null_as_str: false,
enable_extended_json_syntax: false,
}
}
}
2 changes: 2 additions & 0 deletions src/query/expression/src/function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ pub struct FunctionContext {
pub geometry_output_format: GeometryDataType,
pub parse_datetime_ignore_remainder: bool,
pub enable_strict_datetime_parser: bool,
pub enable_extended_json_syntax: bool,
pub random_function_seed: bool,
pub week_start: u8,
pub date_format_style: String,
Expand All @@ -194,6 +195,7 @@ impl Default for FunctionContext {
geometry_output_format: Default::default(),
parse_datetime_ignore_remainder: false,
enable_strict_datetime_parser: true,
enable_extended_json_syntax: false,
random_function_seed: false,
week_start: 0,
date_format_style: "oracle".to_string(),
Expand Down
125 changes: 86 additions & 39 deletions src/query/expression/src/types/variant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use super::number::NumberScalar;
use super::timestamp::timestamp_to_string;
use super::AccessType;
use crate::property::Domain;
use crate::types::interval::interval_to_string;
use crate::types::map::KvPair;
use crate::types::AnyType;
use crate::types::ArgType;
Expand Down Expand Up @@ -224,6 +225,7 @@ impl VariantType {
pub fn cast_scalar_to_variant(
scalar: ScalarRef,
tz: &TimeZone,
enable_extended: bool,
buf: &mut Vec<u8>,
table_data_type: Option<&TableDataType>,
) {
Expand All @@ -243,49 +245,77 @@ pub fn cast_scalar_to_variant(
NumberScalar::Float32(n) => n.0.into(),
NumberScalar::Float64(n) => n.0.into(),
},
ScalarRef::Decimal(x) => match x {
DecimalScalar::Decimal64(value, size) => {
let dec = jsonb::Decimal64 {
scale: size.scale(),
value,
};
jsonb::Value::Number(jsonb::Number::Decimal64(dec))
}
DecimalScalar::Decimal128(value, size) => {
let dec = jsonb::Decimal128 {
scale: size.scale(),
value,
};
jsonb::Value::Number(jsonb::Number::Decimal128(dec))
}
DecimalScalar::Decimal256(value, size) => {
let dec = jsonb::Decimal256 {
scale: size.scale(),
value: value.0,
};
jsonb::Value::Number(jsonb::Number::Decimal256(dec))
ScalarRef::Decimal(x) => {
if enable_extended {
match x {
DecimalScalar::Decimal64(value, size) => {
let dec = jsonb::Decimal64 {
scale: size.scale(),
value,
};
jsonb::Value::Number(jsonb::Number::Decimal64(dec))
}
DecimalScalar::Decimal128(value, size) => {
let dec = jsonb::Decimal128 {
scale: size.scale(),
value,
};
jsonb::Value::Number(jsonb::Number::Decimal128(dec))
}
DecimalScalar::Decimal256(value, size) => {
let dec = jsonb::Decimal256 {
scale: size.scale(),
value: value.0,
};
jsonb::Value::Number(jsonb::Number::Decimal256(dec))
}
}
} else {
x.to_float64().into()
}
},
}
ScalarRef::Boolean(b) => jsonb::Value::Bool(b),
ScalarRef::Binary(s) => jsonb::Value::Binary(s),
ScalarRef::Binary(s) => {
if enable_extended {
jsonb::Value::Binary(s)
} else {
jsonb::Value::String(hex::encode_upper(s).into())
}
}
ScalarRef::String(s) => jsonb::Value::String(s.into()),
ScalarRef::Timestamp(ts) => jsonb::Value::Timestamp(jsonb::Timestamp { value: ts }),
ScalarRef::Date(d) => jsonb::Value::Date(jsonb::Date { value: d }),
ScalarRef::Timestamp(ts) => {
if enable_extended {
jsonb::Value::Timestamp(jsonb::Timestamp { value: ts })
} else {
timestamp_to_string(ts, tz).to_string().into()
}
}
ScalarRef::Date(d) => {
if enable_extended {
jsonb::Value::Date(jsonb::Date { value: d })
} else {
date_to_string(d, tz).to_string().into()
}
}
ScalarRef::Interval(i) => {
let interval = jsonb::Interval {
months: i.months(),
days: i.days(),
micros: i.microseconds(),
};
jsonb::Value::Interval(interval)
if enable_extended {
let interval = jsonb::Interval {
months: i.months(),
days: i.days(),
micros: i.microseconds(),
};
jsonb::Value::Interval(interval)
} else {
interval_to_string(&i).to_string().into()
}
}
ScalarRef::Array(col) => {
let typ = if let Some(TableDataType::Array(typ)) = table_data_type {
Some(typ.remove_nullable())
} else {
None
};
let items = cast_scalars_to_variants(col.iter(), tz, typ.as_ref());
let items = cast_scalars_to_variants(col.iter(), tz, enable_extended, typ.as_ref());
let owned_jsonb = OwnedJsonb::build_array(items.iter().map(RawJsonb::new))
.expect("failed to build jsonb array");
buf.extend_from_slice(owned_jsonb.as_ref());
Expand All @@ -311,7 +341,7 @@ pub fn cast_scalar_to_variant(
_ => unreachable!(),
};
let mut val = vec![];
cast_scalar_to_variant(v, tz, &mut val, typ.as_ref());
cast_scalar_to_variant(v, tz, enable_extended, &mut val, typ.as_ref());
kvs.insert(key, val);
}
let owned_jsonb =
Expand Down Expand Up @@ -344,6 +374,7 @@ pub fn cast_scalar_to_variant(
cast_scalar_to_variant(
scalar,
tz,
enable_extended,
&mut builder.data,
Some(&typ.remove_nullable()),
);
Expand All @@ -359,7 +390,7 @@ pub fn cast_scalar_to_variant(
.expect("failed to build jsonb object from tuple")
}
_ => {
let values = cast_scalars_to_variants(fields, tz, None);
let values = cast_scalars_to_variants(fields, tz, enable_extended, None);
OwnedJsonb::build_object(
values
.iter()
Expand All @@ -378,16 +409,24 @@ pub fn cast_scalar_to_variant(
}
ScalarRef::Geometry(bytes) => {
let geom = Ewkb(bytes).to_json().expect("failed to decode wkb data");
jsonb::parse_value(geom.as_bytes())
.expect("failed to parse geojson to json value")
let res = if enable_extended {
jsonb::parse_value(geom.as_bytes())
} else {
jsonb::parse_value_standard_mode(geom.as_bytes())
};
res.expect("failed to parse geojson to json value")
.write_to_vec(buf);
return;
}
ScalarRef::Geography(bytes) => {
// TODO: convert the EWKB bytes to JSONB directly instead of round-tripping through a GeoJSON string.
let geom = Ewkb(bytes.0).to_json().expect("failed to decode wkb data");
jsonb::parse_value(geom.as_bytes())
.expect("failed to parse geojson to json value")
let res = if enable_extended {
jsonb::parse_value(geom.as_bytes())
} else {
jsonb::parse_value_standard_mode(geom.as_bytes())
};
res.expect("failed to parse geojson to json value")
.write_to_vec(buf);
return;
}
Expand All @@ -397,6 +436,7 @@ pub fn cast_scalar_to_variant(
vals.iter()
.map(|n| ScalarRef::Number(NumberScalar::NUM_TYPE(*n))),
tz,
enable_extended,
None,
);
let owned_jsonb = OwnedJsonb::build_array(items.iter().map(RawJsonb::new))
Expand All @@ -412,12 +452,19 @@ pub fn cast_scalar_to_variant(
pub fn cast_scalars_to_variants(
scalars: impl IntoIterator<Item = ScalarRef>,
tz: &TimeZone,
enable_extended: bool,
table_data_type: Option<&TableDataType>,
) -> BinaryColumn {
let iter = scalars.into_iter();
let mut builder = BinaryColumnBuilder::with_capacity(iter.size_hint().0, 0);
for scalar in iter {
cast_scalar_to_variant(scalar, tz, &mut builder.data, table_data_type);
cast_scalar_to_variant(
scalar,
tz,
enable_extended,
&mut builder.data,
table_data_type,
);
builder.commit_row();
}
builder.build()
Expand Down
4 changes: 2 additions & 2 deletions src/query/expression/src/utils/variant_transform.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use jsonb::parse_value;
use jsonb::parse_value_standard_mode;
use jsonb::RawJsonb;

use crate::types::AnyType;
Expand Down Expand Up @@ -101,7 +101,7 @@ fn transform_scalar(scalar: ScalarRef<'_>, decode: bool) -> Result<Scalar> {
let raw_jsonb = RawJsonb::new(data);
Scalar::Variant(raw_jsonb.to_string().into_bytes())
} else {
let value = parse_value(data).map_err(|err| {
let value = parse_value_standard_mode(data).map_err(|err| {
ErrorCode::UDFDataError(format!("parse json value error: {err}"))
})?;
Scalar::Variant(value.to_vec())
Expand Down
1 change: 1 addition & 0 deletions src/query/formats/src/common_settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ pub struct InputCommonSettings {
pub binary_format: BinaryFormat,
pub is_rounding_mode: bool,
pub enable_dst_hour_fix: bool,
pub enable_extended_json_syntax: bool,
}

#[derive(Clone)]
Expand Down
9 changes: 8 additions & 1 deletion src/query/formats/src/field_decoder/fast_values.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ use databend_common_io::parse_bytes_to_ewkb;
use databend_common_io::prelude::FormatSettings;
use databend_common_io::Interval;
use jsonb::parse_value;
use jsonb::parse_value_standard_mode;
use lexical_core::FromLexical;
use num_traits::NumCast;

Expand Down Expand Up @@ -95,6 +96,7 @@ impl FastFieldDecoderValues {
binary_format: Default::default(),
is_rounding_mode,
enable_dst_hour_fix: format.enable_dst_hour_fix,
enable_extended_json_syntax: format.enable_extended_json_syntax,
},
}
}
Expand Down Expand Up @@ -468,7 +470,12 @@ impl FastFieldDecoderValues {
) -> Result<()> {
let mut buf = Vec::new();
self.read_string_inner(reader, &mut buf, positions)?;
match parse_value(&buf) {
let res = if self.common_settings.enable_extended_json_syntax {
parse_value(&buf)
} else {
parse_value_standard_mode(&buf)
};
match res {
Ok(value) => {
value.write_to_vec(&mut column.data);
column.commit_row();
Expand Down
9 changes: 8 additions & 1 deletion src/query/formats/src/field_decoder/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ use databend_common_io::parse_bitmap;
use databend_common_io::parse_bytes_to_ewkb;
use databend_common_io::Interval;
use jsonb::parse_value;
use jsonb::parse_value_standard_mode;
use lexical_core::FromLexical;

use crate::binary::decode_binary;
Expand Down Expand Up @@ -87,6 +88,7 @@ impl NestedValues {
binary_format: Default::default(),
is_rounding_mode: options_ext.is_rounding_mode,
enable_dst_hour_fix: options_ext.enable_dst_hour_fix,
enable_extended_json_syntax: options_ext.enable_extended_json_syntax,
},
}
}
Expand Down Expand Up @@ -306,7 +308,12 @@ impl NestedValues {
) -> Result<()> {
let mut buf = Vec::new();
self.read_string_inner(reader, &mut buf)?;
match parse_value(&buf) {
let res = if self.common_settings.enable_extended_json_syntax {
parse_value(&buf)
} else {
parse_value_standard_mode(&buf)
};
match res {
Ok(value) => {
value.write_to_vec(&mut column.data);
column.commit_row();
Expand Down
10 changes: 9 additions & 1 deletion src/query/formats/src/field_decoder/separated_text.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ use databend_common_io::Interval;
use databend_common_meta_app::principal::CsvFileFormatParams;
use databend_common_meta_app::principal::TsvFileFormatParams;
use jsonb::parse_value;
use jsonb::parse_value_standard_mode;
use lexical_core::FromLexical;
use num_traits::NumCast;

Expand Down Expand Up @@ -89,6 +90,7 @@ impl SeparatedTextDecoder {
binary_format: params.binary_format,
is_rounding_mode: options_ext.is_rounding_mode,
enable_dst_hour_fix: options_ext.enable_dst_hour_fix,
enable_extended_json_syntax: options_ext.enable_extended_json_syntax,
},
nested_decoder: NestedValues::create(options_ext),
}
Expand All @@ -106,6 +108,7 @@ impl SeparatedTextDecoder {
binary_format: Default::default(),
is_rounding_mode: options_ext.is_rounding_mode,
enable_dst_hour_fix: options_ext.enable_dst_hour_fix,
enable_extended_json_syntax: options_ext.enable_extended_json_syntax,
},
nested_decoder: NestedValues::create(options_ext),
}
Expand Down Expand Up @@ -287,7 +290,12 @@ impl SeparatedTextDecoder {
}

fn read_variant(&self, column: &mut BinaryColumnBuilder, data: &[u8]) -> Result<()> {
match parse_value(data) {
let res = if self.common_settings.enable_extended_json_syntax {
parse_value(data)
} else {
parse_value_standard_mode(data)
};
match res {
Ok(value) => {
value.write_to_vec(&mut column.data);
column.commit_row();
Expand Down
Loading
Loading