Skip to content

Commit a2f603c

Browse files
authored
Deliver raw ByteSize from S3 listing, infer appox stats and split DataSources (#8910)
1 parent e2988c2 commit a2f603c

18 files changed

+323
-48
lines changed

ydb/core/kqp/opt/kqp_query_plan.cpp

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
#include <ydb/library/yql/utils/plan/plan_utils.h>
1818
#include <ydb/library/yql/providers/dq/common/yql_dq_settings.h>
1919
#include <ydb/library/yql/dq/actors/protos/dq_stats.pb.h>
20+
#include <ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.h>
2021

2122
#include <ydb/public/lib/ydb_cli/common/format.h>
2223

@@ -818,7 +819,28 @@ class TxPlanSerializer {
818819
return path;
819820
}
820821

821-
void Visit(const TDqSource& source, TQueryPlanNode& stagePlanNode) {
822+
std::shared_ptr<TOptimizerStatistics> FindWrapStats(TExprNode::TPtr node, const TExprNode* dataSourceNode) {
823+
if (auto maybeWrapBase = TMaybeNode<TDqSourceWrapBase>(node)) {
824+
if (maybeWrapBase.Cast().DataSource().Raw() == dataSourceNode) {
825+
return SerializerCtx.TypeCtx.GetStats(node.Get());
826+
}
827+
}
828+
for (const auto& child : node->Children()) {
829+
std::shared_ptr<TOptimizerStatistics> result;
830+
if (child->IsLambda()) {
831+
auto lambda = TExprBase(child).Cast<TCoLambda>();
832+
result = FindWrapStats(lambda.Body().Ptr(), dataSourceNode);
833+
} else {
834+
result = FindWrapStats(child, dataSourceNode);
835+
}
836+
if (result) {
837+
return result;
838+
}
839+
}
840+
return nullptr;
841+
}
842+
843+
void Visit(const TDqSource& source, TQueryPlanNode& stagePlanNode, const TCoLambda& Lambda) {
822844
// YDB sources
823845
if (auto settings = source.Settings().Maybe<TKqpReadRangesSourceSettings>(); settings.IsValid()) {
824846
Visit(settings.Cast(), stagePlanNode);
@@ -848,7 +870,15 @@ class TxPlanSerializer {
848870
op.Properties["Name"] = "Read from external data source";
849871
}
850872

851-
if (auto stats = SerializerCtx.TypeCtx.GetStats(dataSource.Raw())) {
873+
// Actual stats must be binded with TDqSourceWrapBase
874+
auto stats = FindWrapStats(Lambda.Body().Ptr(), dataSource.Raw());
875+
876+
if (!stats) {
877+
// Fallback to TCoDataSource
878+
stats = SerializerCtx.TypeCtx.GetStats(dataSource.Raw());
879+
}
880+
881+
if (stats) {
852882
op.Properties["E-Rows"] = TStringBuilder() << stats->Nrows;
853883
op.Properties["E-Cost"] = TStringBuilder() << stats->Cost;
854884
op.Properties["E-Size"] = TStringBuilder() << stats->ByteSize;
@@ -922,7 +952,7 @@ class TxPlanSerializer {
922952
for (const auto& input : expr.Cast<TDqStageBase>().Inputs()) {
923953
if (auto source = input.Maybe<TDqSource>()) {
924954
auto& inputSourceNode = AddPlanNode(stagePlanNode);
925-
Visit(source.Cast(), inputSourceNode);
955+
Visit(source.Cast(), inputSourceNode, expr.Cast<TDqStageBase>().Program());
926956
inputIds.emplace_back(&inputSourceNode);
927957
} else {
928958
auto inputCn = input.Cast<TDqConnection>();

ydb/core/kqp/opt/kqp_statistics_transformer.cpp

Lines changed: 193 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
#include <ydb/library/yql/providers/dq/common/yql_dq_settings.h>
77
#include <ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.h>
88
#include <ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.h>
9+
#include <ydb/library/yql/providers/s3/statistics/yql_s3_statistics.h>
910

1011
#include <charconv>
1112

@@ -455,33 +456,212 @@ void InferStatisticsForOlapRead(const TExprNode::TPtr& input, TTypeAnnotationCon
455456
}
456457
}
457458

459+
double EstimateRowSize(const TStructExprType& rowType, const TString& format, const TString& compression, bool decoded) {
460+
double result = 0.0;
461+
for (auto item : rowType.GetItems()) {
462+
auto itemType = item->GetItemType();
463+
if (itemType->GetKind() == ETypeAnnotationKind::Data) {
464+
switch(itemType->Cast<TDataExprType>()->GetSlot()) {
465+
case EDataSlot::Bool:
466+
result += decoded ? 1.0 : 0.2;
467+
break;
468+
case EDataSlot::Int8:
469+
[[fallthrough]];
470+
case EDataSlot::Uint8:
471+
result += decoded ? 1.0 : 0.72;
472+
break;
473+
case EDataSlot::Int16:
474+
[[fallthrough]];
475+
case EDataSlot::Uint16:
476+
result += decoded ? 2.0 : 1.44;
477+
break;
478+
case EDataSlot::Int32:
479+
[[fallthrough]];
480+
case EDataSlot::Uint32:
481+
result += decoded ? 4.0 : 2.88;
482+
break;
483+
case EDataSlot::Int64:
484+
[[fallthrough]];
485+
case EDataSlot::Uint64:
486+
[[fallthrough]];
487+
case EDataSlot::Double:
488+
result += decoded ? 8.0 : 3.88;
489+
break;
490+
case EDataSlot::Float:
491+
result += decoded ? 4.0 : 2.88;
492+
break;
493+
case EDataSlot::String:
494+
[[fallthrough]];
495+
case EDataSlot::Utf8:
496+
result += decoded ? 28.0 : 8.0;
497+
break;
498+
case EDataSlot::Yson:
499+
[[fallthrough]];
500+
case EDataSlot::Json:
501+
result += decoded ? 56.0 : 16.0;
502+
break;
503+
case EDataSlot::Uuid:
504+
break;
505+
case EDataSlot::Date:
506+
result += decoded ? 2.0 : 1.51;
507+
break;
508+
case EDataSlot::Datetime:
509+
[[fallthrough]];
510+
case EDataSlot::Timestamp:
511+
result += decoded ? 8.0 : 6.04;
512+
break;
513+
case EDataSlot::Interval:
514+
break;
515+
case EDataSlot::TzDate:
516+
break;
517+
case EDataSlot::TzDatetime:
518+
break;
519+
case EDataSlot::TzTimestamp:
520+
break;
521+
case EDataSlot::Decimal:
522+
result += decoded ? 16.0 : 7.76;
523+
break;
524+
case EDataSlot::DyNumber:
525+
break;
526+
case EDataSlot::JsonDocument:
527+
break;
528+
case EDataSlot::Date32:
529+
result += decoded ? 4.0 : 2.88;
530+
break;
531+
case EDataSlot::Datetime64:
532+
[[fallthrough]];
533+
case EDataSlot::Timestamp64:
534+
case EDataSlot::Interval64:
535+
result += decoded ? 8.0 : 3.88;
536+
break;
537+
case EDataSlot::TzDate32:
538+
break;
539+
case EDataSlot::TzDatetime64:
540+
break;
541+
case EDataSlot::TzTimestamp64:
542+
break;
543+
}
544+
}
545+
}
546+
547+
if (result == 0.0) {
548+
result = 1000.0;
549+
}
550+
551+
if (format != "parquet" && !decoded) {
552+
double compressionRatio = 1.0;
553+
if (format == "csv_with_names" || format == "tsv_with_names") {
554+
result *= 5.0;
555+
compressionRatio = 4.5; // gzip
556+
} else if (format != "raw") { // json's
557+
result *= 12.0;
558+
compressionRatio = 14.0; // gzip
559+
}
560+
if (compression) {
561+
if (compression == "gzip") {
562+
// 1.00
563+
} else if (compression == "zstd") {
564+
compressionRatio *= 1.05;
565+
} else if (compression == "lz4") {
566+
compressionRatio *= 1.43;
567+
} else if (compression == "brotli") {
568+
compressionRatio *= 1.20;
569+
} else if (compression == "bzip2") {
570+
compressionRatio *= 1.24;
571+
} else if (compression == "xz") {
572+
compressionRatio *= 1.45;
573+
}
574+
result /= compressionRatio;
575+
}
576+
}
577+
578+
return result;
579+
}
580+
458581
void InferStatisticsForDqSourceWrap(const TExprNode::TPtr& input, TTypeAnnotationContext* typeCtx,
459582
TKqpOptimizeContext& kqpCtx) {
460583
auto inputNode = TExprBase(input);
461584
if (auto wrapBase = inputNode.Maybe<TDqSourceWrapBase>()) {
462585
if (auto maybeS3DataSource = wrapBase.Cast().DataSource().Maybe<TS3DataSource>()) {
463586
auto s3DataSource = maybeS3DataSource.Cast();
464587
if (s3DataSource.Name()) {
465-
auto path = s3DataSource.Name().Cast().StringValue();
466-
if (kqpCtx.Config->OptOverrideStatistics.Get() && path) {
467-
auto stats = std::make_shared<TOptimizerStatistics>(EStatisticsType::BaseTable, 0.0, 0, 0, 0.0, TIntrusivePtr<TOptimizerStatistics::TKeyColumns>());
468-
stats = OverrideStatistics(*stats, path, kqpCtx.GetOverrideStatistics());
469-
if (stats->ByteSize == 0.0) {
588+
auto stats = typeCtx->GetStats(s3DataSource.Raw());
589+
if (!stats) {
590+
stats = std::make_shared<TOptimizerStatistics>(EStatisticsType::BaseTable, 0.0, 0, 0, 0.0, TIntrusivePtr<TOptimizerStatistics::TKeyColumns>());
591+
}
592+
if (!stats->Specific) {
593+
stats->Specific = std::make_shared<TS3ProviderStatistics>();
594+
}
595+
596+
const TS3ProviderStatistics* specific = dynamic_cast<const TS3ProviderStatistics*>((stats->Specific.get()));
597+
598+
if (!specific->OverrideApplied && kqpCtx.Config->OptOverrideStatistics.Get()) {
599+
auto path = s3DataSource.Name().Cast().StringValue();
600+
auto dbStats = kqpCtx.GetOverrideStatistics()->GetMapSafe();
601+
if (!dbStats.contains(path)) {
470602
auto n = path.find_last_of('/');
471603
if (n != path.npos) {
472-
stats = OverrideStatistics(*stats, path.substr(n + 1), kqpCtx.GetOverrideStatistics());
604+
path = path.substr(n + 1);
473605
}
474606
}
475-
if (stats->ByteSize != 0.0) {
476-
if (stats->Ncols) {
477-
auto n = wrapBase.Cast().RowType().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TStructExprType>()->GetSize();
478-
stats->ByteSize = stats->ByteSize * n / stats->Ncols;
479-
}
480-
YQL_CLOG(TRACE, CoreDq) << "Infer statistics for s3 data source " << path;
481-
typeCtx->SetStats(input.Get(), stats);
607+
if (dbStats.contains(path)) {
608+
YQL_CLOG(TRACE, CoreDq) << "Override statistics for s3 data source " << path;
609+
stats = OverrideStatistics(*stats, path, kqpCtx.GetOverrideStatistics());
610+
auto newSpecific = std::make_shared<TS3ProviderStatistics>(*specific);
611+
newSpecific->OverrideApplied = true;
612+
stats->Specific = newSpecific;
613+
specific = newSpecific.get();
482614
typeCtx->SetStats(s3DataSource.Raw(), stats);
483615
}
484616
}
617+
618+
auto rowType = wrapBase.Cast().RowType().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TStructExprType>();
619+
if (specific->FullRawRowAvgSize == 0.0) {
620+
auto newSpecific = std::make_shared<TS3ProviderStatistics>(*specific);
621+
stats = std::make_shared<TOptimizerStatistics>(stats->Type, stats->Nrows, stats->Ncols, stats->ByteSize, stats->Cost, stats->KeyColumns, stats->ColumnStatistics, stats->StorageType, newSpecific);
622+
newSpecific->FullRawRowAvgSize = EstimateRowSize(*rowType, newSpecific->Format, newSpecific->Compression, false);
623+
newSpecific->FullDecodedRowAvgSize = EstimateRowSize(*rowType, newSpecific->Format, newSpecific->Compression, true);
624+
specific = newSpecific.get();
625+
typeCtx->SetStats(s3DataSource.Raw(), stats);
626+
}
627+
628+
auto wrapStats = typeCtx->GetStats(input.Get());
629+
if (!wrapStats) {
630+
typeCtx->SetStats(input.Get(), stats);
631+
} else {
632+
stats = wrapStats;
633+
}
634+
635+
if (stats->Ncols == 0 || stats->Ncols > static_cast<int>(rowType->GetSize()) || stats->Nrows == 0 || stats->ByteSize == 0.0 || stats->Cost == 0.0) {
636+
auto newSpecific = std::make_shared<TS3ProviderStatistics>(*specific);
637+
stats = std::make_shared<TOptimizerStatistics>(stats->Type, stats->Nrows, stats->Ncols, stats->ByteSize, stats->Cost, stats->KeyColumns, stats->ColumnStatistics, stats->StorageType, newSpecific);
638+
639+
if (stats->Nrows == 0 && newSpecific->FullRawRowAvgSize) {
640+
stats->Nrows = newSpecific->RawByteSize / newSpecific->FullRawRowAvgSize;
641+
}
642+
if (stats->Ncols == 0 || stats->Ncols > static_cast<int>(rowType->GetSize())) {
643+
stats->Ncols = rowType->GetSize();
644+
newSpecific->PrunedRawRowAvgSize = EstimateRowSize(*rowType, newSpecific->Format, newSpecific->Compression, false);
645+
newSpecific->PrunedDecodedRowAvgSize = EstimateRowSize(*rowType, newSpecific->Format, newSpecific->Compression, true);
646+
stats->ByteSize = 0.0;
647+
}
648+
if (stats->ByteSize == 0.0) {
649+
stats->ByteSize = stats->Nrows * newSpecific->PrunedDecodedRowAvgSize;
650+
}
651+
double rowSize = 0.0;
652+
if (stats->Cost == 0.0) {
653+
if (newSpecific->Format == "parquet") {
654+
rowSize = newSpecific->PrunedRawRowAvgSize;
655+
} else {
656+
rowSize = newSpecific->FullRawRowAvgSize;
657+
}
658+
stats->Cost = rowSize * stats->Nrows;
659+
if (newSpecific->Compression) {
660+
stats->Cost *= 1.5;
661+
}
662+
}
663+
typeCtx->SetStats(input.Get(), stats);
664+
}
485665
}
486666
}
487667
}

ydb/core/kqp/opt/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ PEERDIR(
2525
ydb/library/yql/dq/opt
2626
ydb/library/yql/dq/type_ann
2727
ydb/library/yql/providers/s3/expr_nodes
28+
ydb/library/yql/providers/s3/statistics
2829
ydb/library/yql/utils/plan
2930
ydb/core/kqp/provider
3031
ydb/library/formats/arrow/protos

ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2065,6 +2065,8 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
20652065
// default planner values
20662066

20672067
const TString sql = fmt::format(R"(
2068+
pragma ydb.CostBasedOptimizationLevel = "1";
2069+
20682070
SELECT SUM(t1.bar + t2.bar) as sum FROM `{table1}` as t1 JOIN /*+grace()*/ `{table2}`as t2 ON t1.foo = t2.foo
20692071
)",
20702072
"table1"_a = root + table1,
@@ -2095,6 +2097,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
20952097
// scale down
20962098

20972099
const TString sql = fmt::format(R"(
2100+
pragma ydb.CostBasedOptimizationLevel = "1";
20982101
pragma ydb.OverridePlanner = @@ [
20992102
{{ "tx": 0, "stage": {source1_id}, "tasks": 1 }},
21002103
{{ "tx": 0, "stage": {source2_id}, "tasks": 1 }},
@@ -2131,6 +2134,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
21312134
// scale up
21322135

21332136
const TString sql = fmt::format(R"(
2137+
pragma ydb.CostBasedOptimizationLevel = "1";
21342138
pragma ydb.OverridePlanner = @@ [
21352139
{{ "tx": 0, "stage": {source1_id}, "tasks": 10 }},
21362140
{{ "tx": 0, "stage": {source2_id}, "tasks": 10 }},

ydb/library/yql/core/yql_statistics.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ TOptimizerStatistics::TOptimizerStatistics(
6868
TIntrusivePtr<TKeyColumns> keyColumns,
6969
TIntrusivePtr<TColumnStatMap> columnMap,
7070
EStorageType storageType,
71-
std::unique_ptr<IProviderStatistics> specific)
71+
std::shared_ptr<const IProviderStatistics> specific)
7272
: Type(type)
7373
, Nrows(nrows)
7474
, Ncols(ncols)
@@ -90,7 +90,7 @@ TOptimizerStatistics& TOptimizerStatistics::operator+=(const TOptimizerStatistic
9090
}
9191

9292
std::shared_ptr<TOptimizerStatistics> NYql::OverrideStatistics(const NYql::TOptimizerStatistics& s, const TStringBuf& tablePath, const std::shared_ptr<NJson::TJsonValue>& stats) {
93-
auto res = std::make_shared<TOptimizerStatistics>(s.Type, s.Nrows, s.Ncols, s.ByteSize, s.Cost, s.KeyColumns, s.ColumnStatistics);
93+
auto res = std::make_shared<TOptimizerStatistics>(s.Type, s.Nrows, s.Ncols, s.ByteSize, s.Cost, s.KeyColumns, s.ColumnStatistics, s.StorageType, s.Specific);
9494

9595
auto dbStats = stats->GetMapSafe();
9696

ydb/library/yql/core/yql_statistics.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ struct TOptimizerStatistics {
6868
TIntrusivePtr<TKeyColumns> KeyColumns;
6969
TIntrusivePtr<TColumnStatMap> ColumnStatistics;
7070
EStorageType StorageType = EStorageType::NA;
71-
std::unique_ptr<const IProviderStatistics> Specific;
71+
std::shared_ptr<const IProviderStatistics> Specific;
7272
std::shared_ptr<TVector<TString>> Labels = {};
7373

7474
TOptimizerStatistics(TOptimizerStatistics&&) = default;
@@ -83,7 +83,7 @@ struct TOptimizerStatistics {
8383
TIntrusivePtr<TKeyColumns> keyColumns = {},
8484
TIntrusivePtr<TColumnStatMap> columnMap = {},
8585
EStorageType storageType = EStorageType::NA,
86-
std::unique_ptr<IProviderStatistics> specific = nullptr);
86+
std::shared_ptr<const IProviderStatistics> specific = nullptr);
8787

8888
TOptimizerStatistics& operator+=(const TOptimizerStatistics& other);
8989
bool Empty() const;

0 commit comments

Comments
 (0)