Skip to content

Commit 8f7b4e4

Browse files
format setting supported for date type (#11608)
1 parent 78eb45e commit 8f7b4e4

23 files changed

+647
-35
lines changed

ydb/core/external_sources/object_storage.cpp

Lines changed: 6 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -67,7 +67,7 @@ struct TObjectStorageExternalSource : public IExternalSource {
6767
for (const auto& column: json.GetArray()) {
6868
*objectStorage.add_partitioned_by() = column;
6969
}
70-
} else if (IsIn({"file_pattern"sv, "data.interval.unit"sv, "data.datetime.format_name"sv, "data.datetime.format"sv, "data.timestamp.format_name"sv, "data.timestamp.format"sv, "csv_delimiter"sv}, lowerKey)) {
70+
} else if (IsIn({"file_pattern"sv, "data.interval.unit"sv, "data.datetime.format_name"sv, "data.datetime.format"sv, "data.timestamp.format_name"sv, "data.timestamp.format"sv, "data.date.format"sv, "csv_delimiter"sv}, lowerKey)) {
7171
objectStorage.mutable_format_setting()->insert({lowerKey, value});
7272
} else {
7373
ythrow TExternalSourceException() << "Unknown attribute " << key;
@@ -196,7 +196,7 @@ struct TObjectStorageExternalSource : public IExternalSource {
196196
continue;
197197
}
198198

199-
if (IsIn({ "data.datetime.format_name"sv, "data.datetime.format"sv, "data.timestamp.format_name"sv, "data.timestamp.format"sv}, key)) {
199+
if (IsIn({ "data.datetime.format_name"sv, "data.datetime.format"sv, "data.timestamp.format_name"sv, "data.timestamp.format"sv, "data.date.format"sv}, key)) {
200200
continue;
201201
}
202202

@@ -257,6 +257,10 @@ struct TObjectStorageExternalSource : public IExternalSource {
257257
continue;
258258
}
259259

260+
if (key == "data.date.format"sv) {
261+
continue;
262+
}
263+
260264
if (matchAllSettings) {
261265
issues.AddIssue(MakeErrorIssue(Ydb::StatusIds::BAD_REQUEST, "unknown format setting " + key));
262266
}

ydb/library/yql/providers/pq/provider/yql_pq_datasource.cpp

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -180,6 +180,10 @@ class TPqDataSourceProvider : public TDataProviderBase {
180180
settings.Add(ctx.NewList(read.Pos(), std::move(pair)));
181181
}
182182

183+
if (topicKeyParser.GetDateFormat()) {
184+
settings.Add(topicKeyParser.GetDateFormat());
185+
}
186+
183187
auto builder = Build<TPqReadTopic>(ctx, read.Pos())
184188
.World(read.World())
185189
.DataSource(read.DataSource())

ydb/library/yql/providers/pq/provider/yql_pq_datasource_type_ann.cpp

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -86,7 +86,7 @@ class TPqDataSourceTypeAnnotationTransformer : public TVisitorTransformerBase {
8686
}
8787

8888
TStatus HandleReadTopic(TExprBase input, TExprContext& ctx) {
89-
if (!EnsureMinMaxArgsCount(input.Ref(), 6, 8, ctx)) {
89+
if (!EnsureMinMaxArgsCount(input.Ref(), 6, 9, ctx)) {
9090
return TStatus::Error;
9191
}
9292

ydb/library/yql/providers/pq/provider/yql_pq_topic_key_parser.cpp

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -67,6 +67,10 @@ bool TTopicKeyParser::Parse(const TExprNode& expr, TExprNode::TPtr readSettings,
6767
TimestampFormat = readSettings->Child(i);
6868
continue;
6969
}
70+
if (readSettings->Child(i)->Head().IsAtom("data.date.format")) {
71+
DateFormat = readSettings->Child(i);
72+
continue;
73+
}
7074
}
7175
}
7276

ydb/library/yql/providers/pq/provider/yql_pq_topic_key_parser.h

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -46,6 +46,10 @@ class TTopicKeyParser {
4646
return TimestampFormat;
4747
}
4848

49+
// Returns the DateFormat member. TTopicKeyParser::Parse assigns it the read-settings
// child whose head atom is "data.date.format".
// NOTE(review): presumably null when no such setting was parsed (the PQ data source
// checks the result before adding it to the settings) — confirm.
TExprNode::TPtr GetDateFormat() {
50+
return DateFormat;
51+
}
52+
4953
bool Parse(const TExprNode& expr, TExprNode::TPtr readSettings, TExprContext& ctx);
5054

5155
private:
@@ -60,6 +64,7 @@ class TTopicKeyParser {
6064
TExprNode::TPtr DateTimeFormat;
6165
TExprNode::TPtr TimestampFormatName;
6266
TExprNode::TPtr TimestampFormat;
67+
TExprNode::TPtr DateFormat;
6368
TExprNode::TPtr UserSchema;
6469
TExprNode::TPtr ColumnOrder;
6570
};

ydb/library/yql/providers/s3/actors/yql_arrow_column_converters.cpp

Lines changed: 91 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -238,10 +238,10 @@ std::shared_ptr<arrow::Array> ArrowStringAsYqlTimestamp(const std::shared_ptr<ar
238238
return builder.Build(true).make_array();
239239
}
240240

241-
template <bool isOptional>
242-
std::shared_ptr<arrow::Array> ArrowDate32AsYqlDate(const std::shared_ptr<arrow::DataType>& targetType, const std::shared_ptr<arrow::Array>& value) {
241+
template <bool isOptional, typename TArrowType>
242+
std::shared_ptr<arrow::Array> ArrowTypeAsYqlDate(const std::shared_ptr<arrow::DataType>& targetType, const std::shared_ptr<arrow::Array>& value) {
243243
::NYql::NUdf::TFixedSizeArrayBuilder<ui16, isOptional> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), targetType, *arrow::system_memory_pool(), value->length());
244-
::NYql::NUdf::TFixedSizeBlockReader<i32, isOptional> reader;
244+
::NYql::NUdf::TFixedSizeBlockReader<TArrowType, isOptional> reader;
245245
for (i64 i = 0; i < value->length(); ++i) {
246246
const NUdf::TBlockItem item = reader.GetItem(*value->data(), i);
247247
if constexpr (isOptional) {
@@ -253,7 +253,7 @@ std::shared_ptr<arrow::Array> ArrowDate32AsYqlDate(const std::shared_ptr<arrow::
253253
throw parquet::ParquetException(TStringBuilder() << "null value for date could not be represented in non-optional type");
254254
}
255255

256-
const i32 v = item.As<i32>();
256+
const TArrowType v = item.As<TArrowType>();
257257
if (v < 0 || v > ::NYql::NUdf::MAX_DATE) {
258258
throw parquet::ParquetException(TStringBuilder() << "date in parquet is out of range [0, " << ::NYql::NUdf::MAX_DATE << "]: " << v);
259259
}
@@ -262,6 +262,31 @@ std::shared_ptr<arrow::Array> ArrowDate32AsYqlDate(const std::shared_ptr<arrow::
262262
return builder.Build(true).make_array();
263263
}
264264

265+
template <bool isOptional>
266+
std::shared_ptr<arrow::Array> ArrowStringAsYqlDate(const std::shared_ptr<arrow::DataType>& targetType, const std::shared_ptr<arrow::Array>& value, const NDB::FormatSettings& formatSettings) {
267+
::NYql::NUdf::TFixedSizeArrayBuilder<ui32, isOptional> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), targetType, *arrow::system_memory_pool(), value->length());
268+
::NYql::NUdf::TStringBlockReader<arrow::BinaryType, isOptional, NKikimr::NUdf::EDataSlot::String> reader;
269+
for (i64 i = 0; i < value->length(); ++i) {
270+
NUdf::TBlockItem item = reader.GetItem(*value->data(), i);
271+
272+
if constexpr (isOptional) {
273+
if (!item) {
274+
builder.Add(item);
275+
continue;
276+
}
277+
} else if (!item) {
278+
throw parquet::ParquetException(TStringBuilder() << "null value for date could not be represented in non-optional type");
279+
}
280+
281+
auto ref = item.AsStringRef();
282+
NDB::ReadBufferFromMemory rb{ref.Data(), ref.Size()};
283+
uint16_t result = 0;
284+
parseImpl<NDB::DataTypeDate>(result, rb, nullptr, formatSettings);
285+
builder.Add(NUdf::TBlockItem(static_cast<ui16>(result)));
286+
}
287+
return builder.Build(true).make_array();
288+
}
289+
265290
TColumnConverter ArrowUInt32AsYqlDatetime(const std::shared_ptr<arrow::DataType>& targetType, bool isOptional) {
266291
return [targetType, isOptional](const std::shared_ptr<arrow::Array>& value) {
267292
return isOptional
@@ -430,14 +455,62 @@ TColumnConverter ArrowDate32AsYqlString(const std::shared_ptr<arrow::DataType>&
430455
};
431456
}
432457

458+
// Returns a converter that turns an Arrow column read as ui16 items into YQL Date,
// picking the optional or non-optional ArrowTypeAsYqlDate instantiation at call time.
TColumnConverter ArrowUInt16AsYqlDate(const std::shared_ptr<arrow::DataType>& targetType, bool isOptional) {
    return [targetType, isOptional](const std::shared_ptr<arrow::Array>& column) {
        if (isOptional) {
            return ArrowTypeAsYqlDate<true, ui16>(targetType, column);
        }
        return ArrowTypeAsYqlDate<false, ui16>(targetType, column);
    };
}
465+
466+
// Returns a converter that turns an Arrow column read as i32 items into YQL Date,
// picking the optional or non-optional ArrowTypeAsYqlDate instantiation at call time.
TColumnConverter ArrowInt32AsYqlDate(const std::shared_ptr<arrow::DataType>& targetType, bool isOptional) {
    return [targetType, isOptional](const std::shared_ptr<arrow::Array>& column) {
        if (isOptional) {
            return ArrowTypeAsYqlDate<true, i32>(targetType, column);
        }
        return ArrowTypeAsYqlDate<false, i32>(targetType, column);
    };
}
473+
474+
// Returns a converter that turns an Arrow column read as ui32 items into YQL Date,
// picking the optional or non-optional ArrowTypeAsYqlDate instantiation at call time.
TColumnConverter ArrowUInt32AsYqlDate(const std::shared_ptr<arrow::DataType>& targetType, bool isOptional) {
    return [targetType, isOptional](const std::shared_ptr<arrow::Array>& column) {
        if (isOptional) {
            return ArrowTypeAsYqlDate<true, ui32>(targetType, column);
        }
        return ArrowTypeAsYqlDate<false, ui32>(targetType, column);
    };
}
481+
482+
// Returns a converter that turns an Arrow column read as i64 items into YQL Date,
// picking the optional or non-optional ArrowTypeAsYqlDate instantiation at call time.
TColumnConverter ArrowInt64AsYqlDate(const std::shared_ptr<arrow::DataType>& targetType, bool isOptional) {
    return [targetType, isOptional](const std::shared_ptr<arrow::Array>& column) {
        if (isOptional) {
            return ArrowTypeAsYqlDate<true, i64>(targetType, column);
        }
        return ArrowTypeAsYqlDate<false, i64>(targetType, column);
    };
}
489+
490+
// Returns a converter that turns an Arrow column read as ui64 items into YQL Date,
// picking the optional or non-optional ArrowTypeAsYqlDate instantiation at call time.
TColumnConverter ArrowUInt64AsYqlDate(const std::shared_ptr<arrow::DataType>& targetType, bool isOptional) {
    return [targetType, isOptional](const std::shared_ptr<arrow::Array>& column) {
        if (isOptional) {
            return ArrowTypeAsYqlDate<true, ui64>(targetType, column);
        }
        return ArrowTypeAsYqlDate<false, ui64>(targetType, column);
    };
}
497+
433498
// Builds a converter from an Arrow date column (items read as i32) to YQL Date.
// Throws parquet::ParquetException immediately when `unit` is arrow::DateUnit::MILLI;
// otherwise returns a lambda that dispatches on `isOptional` to
// ArrowTypeAsYqlDate<true, i32> or ArrowTypeAsYqlDate<false, i32>.
// NOTE(review): this span is a rendered diff — the lines after the "439-" and "440-"
// markers are the removed pre-change calls, and the lines after "504+" and "505+"
// are their replacements.
TColumnConverter ArrowDate32AsYqlDate(const std::shared_ptr<arrow::DataType>& targetType, bool isOptional, arrow::DateUnit unit) {
434499
if (unit == arrow::DateUnit::MILLI) {
435500
throw parquet::ParquetException(TStringBuilder() << "millisecond accuracy does not fit into the date");
436501
}
437502
return [targetType, isOptional](const std::shared_ptr<arrow::Array>& value) {
438503
return isOptional
439-
? ArrowDate32AsYqlDate<true>(targetType, value)
440-
: ArrowDate32AsYqlDate<false>(targetType, value);
504+
? ArrowTypeAsYqlDate<true, i32>(targetType, value)
505+
: ArrowTypeAsYqlDate<false, i32>(targetType, value);
506+
};
507+
}
508+
509+
// Returns a converter that parses an Arrow string/binary column into YQL Date.
// `formatSettings` is copied into the closure and handed to the templated
// ArrowStringAsYqlDate overload selected by `isOptional`.
TColumnConverter ArrowStringAsYqlDate(const std::shared_ptr<arrow::DataType>& targetType, bool isOptional, const NDB::FormatSettings& formatSettings) {
    return [targetType, isOptional, formatSettings](const std::shared_ptr<arrow::Array>& column) {
        if (isOptional) {
            return ArrowStringAsYqlDate<true>(targetType, column, formatSettings);
        }
        return ArrowStringAsYqlDate<false>(targetType, column, formatSettings);
    };
}
443516

@@ -457,6 +530,8 @@ TColumnConverter BuildCustomConverter(const std::shared_ptr<arrow::DataType>& or
457530
switch (originalType->id()) {
458531
case arrow::Type::UINT16: {
459532
switch (slotItem) {
533+
case NUdf::EDataSlot::Date:
534+
return ArrowUInt16AsYqlDate(targetType, isOptional);
460535
case NUdf::EDataSlot::Datetime:
461536
return ArrowUInt16AsYqlDatetime(targetType, isOptional);
462537
case NUdf::EDataSlot::Timestamp:
@@ -467,6 +542,8 @@ TColumnConverter BuildCustomConverter(const std::shared_ptr<arrow::DataType>& or
467542
}
468543
case arrow::Type::INT32: {
469544
switch (slotItem) {
545+
case NUdf::EDataSlot::Date:
546+
return ArrowInt32AsYqlDate(targetType, isOptional);
470547
case NUdf::EDataSlot::Datetime:
471548
return ArrowInt32AsYqlDatetime(targetType, isOptional);
472549
case NUdf::EDataSlot::Timestamp:
@@ -477,6 +554,8 @@ TColumnConverter BuildCustomConverter(const std::shared_ptr<arrow::DataType>& or
477554
}
478555
case arrow::Type::UINT32: {
479556
switch (slotItem) {
557+
case NUdf::EDataSlot::Date:
558+
return ArrowUInt32AsYqlDate(targetType, isOptional);
480559
case NUdf::EDataSlot::Datetime:
481560
return ArrowUInt32AsYqlDatetime(targetType, isOptional);
482561
case NUdf::EDataSlot::Timestamp:
@@ -487,6 +566,8 @@ TColumnConverter BuildCustomConverter(const std::shared_ptr<arrow::DataType>& or
487566
}
488567
case arrow::Type::INT64: {
489568
switch (slotItem) {
569+
case NUdf::EDataSlot::Date:
570+
return ArrowInt64AsYqlDate(targetType, isOptional);
490571
case NUdf::EDataSlot::Datetime:
491572
return ArrowInt64AsYqlDatetime(targetType, isOptional);
492573
case NUdf::EDataSlot::Timestamp:
@@ -497,6 +578,8 @@ TColumnConverter BuildCustomConverter(const std::shared_ptr<arrow::DataType>& or
497578
}
498579
case arrow::Type::UINT64: {
499580
switch (slotItem) {
581+
case NUdf::EDataSlot::Date:
582+
return ArrowUInt64AsYqlDate(targetType, isOptional);
500583
case NUdf::EDataSlot::Datetime:
501584
return ArrowUInt64AsYqlDatetime(targetType, isOptional);
502585
case NUdf::EDataSlot::Timestamp:
@@ -553,6 +636,8 @@ TColumnConverter BuildCustomConverter(const std::shared_ptr<arrow::DataType>& or
553636
case arrow::Type::STRING:
554637
case arrow::Type::BINARY: {
555638
switch (slotItem) {
639+
case NUdf::EDataSlot::Date:
640+
return ArrowStringAsYqlDate(targetType, isOptional, formatSettings);
556641
case NUdf::EDataSlot::Datetime:
557642
return ArrowStringAsYqlDateTime(targetType, isOptional, formatSettings);
558643
case NUdf::EDataSlot::Timestamp:

ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -2247,6 +2247,10 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
22472247
readSpec->Settings.timestamp_format = it->second;
22482248
}
22492249

2250+
if (const auto it = settings.find("data.date.format"); settings.cend() != it) {
2251+
readSpec->Settings.date_format = it->second;
2252+
}
2253+
22502254
if (readSpec->Settings.date_time_format_name == NDB::FormatSettings::DateTimeFormat::Unspecified && readSpec->Settings.date_time_format.empty()) {
22512255
readSpec->Settings.date_time_format_name = NDB::FormatSettings::DateTimeFormat::POSIX;
22522256
}

ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp

Lines changed: 10 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -271,6 +271,15 @@ class TS3DataSinkTypeAnnotationTransformer : public TVisitorTransformerBase {
271271
return true;
272272
}
273273

274+
if (name == "data.date.format") {
275+
const auto& value = setting.Tail();
276+
if (!EnsureAtom(value, ctx)) {
277+
return false;
278+
}
279+
280+
return true;
281+
}
282+
274283
if (name == "csvdelimiter") {
275284
const auto& value = setting.Tail();
276285
if (!EnsureAtom(value, ctx)) {
@@ -292,7 +301,7 @@ class TS3DataSinkTypeAnnotationTransformer : public TVisitorTransformerBase {
292301
return true;
293302
};
294303

295-
if (!EnsureValidSettings(*input->Child(TS3Target::idx_Settings), {"compression", "partitionedby", "mode", "userschema", "data.datetime.formatname", "data.datetime.format", "data.timestamp.formatname", "data.timestamp.format", "csvdelimiter", "filepattern"}, validator, ctx)) {
304+
if (!EnsureValidSettings(*input->Child(TS3Target::idx_Settings), {"compression", "partitionedby", "mode", "userschema", "data.datetime.formatname", "data.datetime.format", "data.timestamp.formatname", "data.timestamp.format", "data.date.format", "csvdelimiter", "filepattern"}, validator, ctx)) {
296305
return TStatus::Error;
297306
}
298307

ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp

Lines changed: 9 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -708,6 +708,14 @@ class TS3DataSourceTypeAnnotationTransformer : public TVisitorTransformerBase {
708708
return true;
709709
}
710710

711+
if (name == "data.date.format"sv) {
712+
TStringBuf unused;
713+
if (!ExtractSettingValue(setting.Tail(), "data.date.format"sv, format, {}, ctx, unused)) {
714+
return false;
715+
}
716+
return true;
717+
}
718+
711719
if (name == "readmaxbytes"sv) {
712720
TStringBuf unused;
713721
if (!ExtractSettingValue(setting.Tail(), "read_max_bytes"sv, format, "raw"sv, ctx, unused)) {
@@ -785,7 +793,7 @@ class TS3DataSourceTypeAnnotationTransformer : public TVisitorTransformerBase {
785793
};
786794
if (!EnsureValidSettings(*input->Child(TS3Object::idx_Settings),
787795
{ "compression"sv, "partitionedby"sv, "projection"sv, "data.interval.unit"sv, "constraints"sv,
788-
"data.datetime.formatname"sv, "data.datetime.format"sv, "data.timestamp.formatname"sv, "data.timestamp.format"sv,
796+
"data.datetime.formatname"sv, "data.datetime.format"sv, "data.timestamp.formatname"sv, "data.timestamp.format"sv, "data.date.format"sv,
789797
"readmaxbytes"sv, "csvdelimiter"sv, "directories"sv, "filepattern"sv, "pathpattern"sv, "pathpatternvariant"sv }, validator, ctx))
790798
{
791799
return TStatus::Error;

ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp

Lines changed: 8 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -46,6 +46,10 @@ TExprNode::TPtr GetTimestampFormat(const TExprNode& settings) {
4646
return GetSetting(settings, "data.timestamp.format"sv);
4747
}
4848

49+
// Looks up the "data.date.format" entry in `settings` via GetSetting and returns
// whatever node that lookup yields.
// NOTE(review): the sink optimizer treats a falsy result as "setting absent" —
// confirm GetSetting returns null in that case.
TExprNode::TPtr GetDateFormat(const TExprNode& settings) {
50+
return GetSetting(settings, "data.date.format"sv);
51+
}
52+
4953
TExprNode::TListType GetPartitionKeys(const TExprNode::TPtr& partBy) {
5054
if (partBy) {
5155
auto children = partBy->ChildrenList();
@@ -171,6 +175,10 @@ class TS3PhysicalOptProposalTransformer : public TOptimizeTransformerBase {
171175
sinkOutputSettingsBuilder.Add(ctx.NewList(target.Pos(), std::move(pair)));
172176
}
173177

178+
if (auto dateFormat = GetDateFormat(settings)) {
179+
sinkOutputSettingsBuilder.Add(std::move(dateFormat));
180+
}
181+
174182
const TStringBuf format = target.Format();
175183
if (format != "raw" && format != "json_list") { // multipart
176184
{

0 commit comments

Comments
 (0)