From 60658d0238a91b17f126fb032bf232ddbea1846e Mon Sep 17 00:00:00 2001 From: Pisarenko Grigoriy Date: Tue, 24 Jun 2025 11:14:51 +0300 Subject: [PATCH 1/4] Improved s3 read / write partitions validation --- ydb/core/external_sources/object_storage.cpp | 16 ++- .../external_sources/object_storage_ut.cpp | 20 +++ .../s3/kqp_federated_query_ut.cpp | 76 +++++++++++ ydb/library/yql/providers/s3/common/util.cpp | 10 +- .../path_generator/yql_s3_path_generator.cpp | 127 +++++++++--------- .../s3/provider/yql_s3_datasink_type_ann.cpp | 7 +- 6 files changed, 185 insertions(+), 71 deletions(-) diff --git a/ydb/core/external_sources/object_storage.cpp b/ydb/core/external_sources/object_storage.cpp index e93ae9c05774..2c2de549d0a6 100644 --- a/ydb/core/external_sources/object_storage.cpp +++ b/ydb/core/external_sources/object_storage.cpp @@ -63,9 +63,19 @@ struct TObjectStorageExternalSource : public IExternalSource { } else if (key.StartsWith("projection.") || key == "storage.location.template") { objectStorage.mutable_projection()->insert({key, value}); } else if (lowerKey == "partitioned_by") { - auto json = NSc::TValue::FromJsonThrow(value); - for (const auto& column: json.GetArray()) { - *objectStorage.add_partitioned_by() = column; + try { + const auto json = NSc::TValue::FromJsonThrow(value); + if (!json.IsArray()) { + throw TExternalSourceException() << "partitioned_by must be an array of column names"; + } + for (const auto& column: json.GetArray()) { + if (!column.IsString()) { + throw TExternalSourceException() << "partitioned_by must be an array of strings"; + } + *objectStorage.add_partitioned_by() = column; + } + } catch (const std::exception& e) { + throw TExternalSourceException() << "Failed to parse partitioned_by: " << e.what(); } } else if (IsIn({"file_pattern"sv, "data.interval.unit"sv, "data.datetime.format_name"sv, "data.datetime.format"sv, "data.timestamp.format_name"sv, "data.timestamp.format"sv, "data.date.format"sv, "csv_delimiter"sv}, lowerKey)) { objectStorage.mutable_format_setting()->insert({lowerKey, value}); diff --git a/ydb/core/external_sources/object_storage_ut.cpp b/ydb/core/external_sources/object_storage_ut.cpp index 129ad8febd7d..f6bc281efae0 100644 --- a/ydb/core/external_sources/object_storage_ut.cpp +++ b/ydb/core/external_sources/object_storage_ut.cpp @@ -89,6 +89,26 @@ Y_UNIT_TEST_SUITE(ObjectStorageTest) { UNIT_ASSERT_EXCEPTION_CONTAINS(source->Pack(schema, general), NExternalSource::TExternalSourceException, "Location '*' contains wildcards"); } } + + Y_UNIT_TEST(FailedPartitionedByValidation) { + const auto source = NExternalSource::CreateObjectStorageExternalSource({}, nullptr, 1000, nullptr, false, false); + NKikimrExternalSources::TSchema schema; + { + NKikimrExternalSources::TGeneral general; + general.mutable_attributes()->emplace("partitioned_by", "{\"year\": \"2025\"}"); + UNIT_ASSERT_EXCEPTION_CONTAINS(source->Pack(schema, general), NExternalSource::TExternalSourceException, "partitioned_by must be an array of column names"); + } + { + NKikimrExternalSources::TGeneral general; + general.mutable_attributes()->emplace("partitioned_by", "[{\"year\": \"2025\"}]"); + UNIT_ASSERT_EXCEPTION_CONTAINS(source->Pack(schema, general), NExternalSource::TExternalSourceException, "partitioned_by must be an array of strings"); + } + { + NKikimrExternalSources::TGeneral general; + general.mutable_attributes()->emplace("partitioned_by", "[{"); + UNIT_ASSERT_EXCEPTION_CONTAINS(source->Pack(schema, general), NExternalSource::TExternalSourceException, "Failed to parse partitioned_by:"); + } + } } } // NKikimr diff --git a/ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp b/ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp index c82487f47410..1249a01ccab8 100644 --- a/ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp +++ b/ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp @@ -2687,6 +2687,82 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) { UNIT_ASSERT(parser.TryNextRow()); UNIT_ASSERT(!parser.ColumnParser(0).GetOptionalString()); } + + Y_UNIT_TEST(TestRawFormatInsertValidation) { + const TString bucket = "test_raw_format_insert_validation_bucket"; + CreateBucket(bucket); + + auto kikimr = NTestUtils::MakeKikimrRunner(); + + auto tc = kikimr->GetTableClient(); + auto session = tc.CreateSession().GetValueSync().GetSession(); + { + const TString query = fmt::format(R"( + CREATE EXTERNAL DATA SOURCE `test_bucket` WITH ( + SOURCE_TYPE="ObjectStorage", + LOCATION="{location}", + AUTH_METHOD="NONE" + );)", + "location"_a = GetBucketLocation(bucket) + ); + const auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + } + + auto db = kikimr->GetQueryClient(); + const TString query = R"( + INSERT INTO test_bucket.`/result/` WITH ( + FORMAT = "raw", + SCHEMA ( + data String?? + ) + ) + (data) + VALUES + ("some_string") + )"; + const auto result = db.ExecuteQuery(query, TTxControl::NoTx()).GetValueSync(); + const auto& issues = result.GetIssues().ToString(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::GENERIC_ERROR, issues); + UNIT_ASSERT_STRING_CONTAINS(issues, "Only column with primitive type allowed for raw format"); + } + + Y_UNIT_TEST(TestPartitionedByInsertValidation) { + const TString bucket = "test_partitioned_by_insert_validation_bucket"; + CreateBucket(bucket); + + auto kikimr = NTestUtils::MakeKikimrRunner(); + + auto tc = kikimr->GetTableClient(); + auto session = tc.CreateSession().GetValueSync().GetSession(); + { + const TString query = fmt::format(R"( + CREATE EXTERNAL DATA SOURCE `test_bucket` WITH ( + SOURCE_TYPE="ObjectStorage", + LOCATION="{location}", + AUTH_METHOD="NONE" + );)", + "location"_a = GetBucketLocation(bucket) + ); + const auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + } + + auto db = kikimr->GetQueryClient(); + const TString query = R"( + INSERT INTO test_bucket.`/result/` WITH ( + FORMAT = "csv_with_names", + PARTITIONED_BY = (data) + ) + (data) + VALUES + ("some_string") + )"; + const auto result = db.ExecuteQuery(query, TTxControl::NoTx()).GetValueSync(); + const auto& issues = result.GetIssues().ToString(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::GENERIC_ERROR, issues); + UNIT_ASSERT_STRING_CONTAINS(issues, "Write schema contains no columns except partitioning columns."); + } } } // namespace NKikimr::NKqp diff --git a/ydb/library/yql/providers/s3/common/util.cpp b/ydb/library/yql/providers/s3/common/util.cpp index 57b7c81ccbff..d43db0b0f1f0 100644 --- a/ydb/library/yql/providers/s3/common/util.cpp +++ b/ydb/library/yql/providers/s3/common/util.cpp @@ -218,9 +218,13 @@ bool ValidateS3WriteSchema(TPositionHandle pos, std::string_view format, const T return false; } - const TDataExprType* rowType; - bool isOptional; - return EnsureDataOrOptionalOfData(pos, schemaStructRowType->GetItems().front()->GetItemType(), isOptional, rowType, ctx); + const auto* rowType = schemaStructRowType->GetItems().front()->GetItemType(); + if (rowType->GetKind() != ETypeAnnotationKind::Data) { + ctx.AddError(TIssue(ctx.GetPosition(pos), TStringBuilder() << "Only column with primitive type allowed for raw format (you have field with type " << *rowType << ")")); + return false; + } + + return true; } if (format == "json_list"sv) { diff --git a/ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.cpp b/ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.cpp index e9b9e9aeab58..4a1f28264ff8 100644 --- a/ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.cpp +++ b/ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.cpp @@ -29,7 +29,7 @@ TInstant Strptime(const TString& input, const TString& format) { return TInstant::Seconds(seconds); } } - ythrow yexception() << "Can't parse date " << input << " in format " << format; + throw yexception() << "Can't parse date " << input << " in format " << format; } TString Strftime(const char* format, TInstant time) { @@ -42,7 +42,7 @@ TString Strftime(const char* format, TInstant time) { if (r != 0) { return TString(buf.Data(), r); } - ythrow yexception() << "Can't format date " << time << " in format " << format; + throw yexception() << "Can't format date " << time << " in format " << format; } @@ -84,19 +84,19 @@ i64 GetIntOrThrow(const NSc::TValue& json, const TString& error) { } } - ythrow yexception() << error; + throw yexception() << error; } TString GetStringOrThrow(const NSc::TValue& json, const TString& error) { if (json.IsString() || json.IsIntNumber() || json.IsNumber()) { return json.ForceString(); } - ythrow yexception() << error; + throw yexception() << error; } i64 GetBoolOrThrow(const NSc::TValue& json, const TString& error) { if (json.IsNull()) { - ythrow yexception() << error; + throw yexception() << error; } return json.IsTrue(); } @@ -108,7 +108,7 @@ IPathGenerator::EIntervalUnit ToIntervalUnit(const TString& unit) { return name.first; } } - ythrow yexception() << "Invalid projection scheme: unit " << unit << " must be one of " << GetEnumAllNames(); + throw yexception() << "Invalid projection scheme: unit " << unit << " must be one of " << GetEnumAllNames(); } TMap ToLowerType() { @@ -126,13 +126,13 @@ IPathGenerator::EType ToType(const TString& type) { return name.first; } } - ythrow yexception() << "Invalid projection scheme: type " << type << " must be one of " << to_lower(GetEnumAllNames()); + throw yexception() << "Invalid projection scheme: type " << type << " must be one of " << to_lower(GetEnumAllNames()); } std::string fmtInteger(int32_t width, i64 value) { if (width > 64) { - ythrow yexception() << "Digits cannot exceed 64, but received " << width; + throw yexception() << "Digits cannot exceed 64, but received " << width; } if (width == 0) { return std::to_string(value); @@ -188,7 +188,7 @@ TDuration FromUnit(int64_t interval, IPathGenerator::EIntervalUnit unit) { case IPathGenerator::EIntervalUnit::MONTHS: // external special handling case IPathGenerator::EIntervalUnit::YEARS: default: - ythrow yexception() << "Only the " << GetEnumAllNames() << " units are supported but got " << unit; + throw yexception() << "Only the " << GetEnumAllNames() << " units are supported but got " << unit; } } @@ -219,12 +219,12 @@ TInstant AddUnit(TInstant current, int64_t interval, IPathGenerator::EIntervalUn const TDuration delta = FromUnit(abs(interval), unit); if (delta.GetValue() > std::numeric_limits::max()) { - ythrow yexception() << "Interval is overflowed"; + throw yexception() << "Interval is overflowed"; } const i64 deltaValue = (interval > 0 ? 1LL : -1LL) * delta.GetValue(); if (IsOverflow(current.GetValue(), deltaValue)) { - ythrow yexception() << "Timestamp is overflowed"; + throw yexception() << "Timestamp is overflowed"; } return interval > 0 ? current + delta : current - delta; @@ -283,23 +283,23 @@ struct TPathGenerator: public IPathGenerator { TString Format(const TStringBuf& columnName, const TStringBuf& dataValue) const override { auto it = ColumnConfig.find(columnName); if (it == ColumnConfig.end()) { - ythrow yexception() << columnName << " column not found in config"; + throw yexception() << columnName << " column not found in config"; } const auto& config = it->second; switch (config.Type) { case IPathGenerator::EType::UNDEFINED: { - ythrow yexception() << columnName << " column has undefined type"; + throw yexception() << columnName << " column has undefined type"; } case IPathGenerator::EType::ENUM: { if (Find(config.Values, dataValue) == config.Values.end()) { - ythrow yexception() << dataValue << " data not found as enum item"; + throw yexception() << dataValue << " data not found as enum item"; } return TString{dataValue}; } case IPathGenerator::EType::INTEGER: { i64 value = 0; if (!TryFromString(dataValue.data(), dataValue.size(), value)) { - ythrow yexception() << dataValue << " data is not a int64"; + throw yexception() << dataValue << " data is not a int64"; } return fmtInteger(config.Digits, value); } @@ -314,23 +314,23 @@ struct TPathGenerator: public IPathGenerator { TString Parse(const TStringBuf& columnName, const TStringBuf& pathValue) const override { auto it = ColumnConfig.find(columnName); if (it == ColumnConfig.end()) { - ythrow yexception() << columnName << " column not found in config"; + throw yexception() << columnName << " column not found in config"; } const auto& config = it->second; switch (config.Type) { case IPathGenerator::EType::UNDEFINED: { - ythrow yexception() << columnName << " column has undefined type"; + throw yexception() << columnName << " column has undefined type"; } case IPathGenerator::EType::ENUM: { if (Find(config.Values, pathValue) == config.Values.end()) { - ythrow yexception() << pathValue << " value not found as enum item"; + throw yexception() << pathValue << " value not found as enum item"; } return TString{pathValue}; } case IPathGenerator::EType::INTEGER: { int64_t value = 0; if (!TryFromString(pathValue.data(), pathValue.size(), value)) { - ythrow yexception() << pathValue << " value is not a int64"; + throw yexception() << pathValue << " value is not a int64"; } return std::to_string(value); } @@ -354,7 +354,7 @@ struct TPathGenerator: public IPathGenerator { // Parse void ParsePartitioningRules(const TString& config, const std::vector& partitionedBy) { if (partitionedBy.empty()) { - ythrow yexception() << "Partition by must always be specified"; + throw yexception() << "Partition by must always be specified with projection"; } if (!config) { @@ -368,14 +368,14 @@ struct TPathGenerator: public IPathGenerator { NSc::TValue json = NSc::TValue::FromJsonThrow(config, NSc::TJsonOpts::JO_PARSER_DISALLOW_DUPLICATE_KEYS | NSc::TJsonOpts::JO_SORT_KEYS); if (!json.IsDict()) { - ythrow yexception() << "Invalid projection scheme: top-level element must be a dictionary"; + throw yexception() << "Invalid projection scheme: top-level element must be a dictionary"; } TMap> projection; - for (const auto& p: json.GetDict()) { + for (const auto& p : json.GetDict()) { const auto path = GetPath(p.first); if (path.empty()) { - ythrow yexception() << "Invalid key: key should start with storage or projection, but got an empty value"; + throw yexception() << "Invalid key: key should start with storage or projection, but got an empty value"; } const TString& kind = path.front(); if (kind == "projection") { @@ -383,23 +383,23 @@ struct TPathGenerator: public IPathGenerator { } else if (kind == "storage") { AddStorage(p.first, p.second, path); } else { - ythrow yexception() << "Invalid key: key should start with storage or projection, but got " << p.first; + throw yexception() << "Invalid key: key should start with storage or projection, but got " << p.first; } } DoParseProjection(projection); DoValidateTemplate(partitionedBy); - for (const auto& config: Config.Rules) { + for (const auto& config : Config.Rules) { ColumnConfig[config.Name] = config; } } void AddProjection(const TStringBuf& key, const NSc::TValue& json, const TVector& path, TMap>& projection) { if (path.size() != 3 && path.size() != 2) { - ythrow yexception() << "The key must be three-component or two-component, but received " << key; + throw yexception() << "The key for 'projection' must be three-component or two-component, but received " << key; } if (path.size() == 2 && path[1] != "enabled") { - ythrow yexception() << "Unknown key " << key; + throw yexception() << "Unknown key " << key; } if (path.size() == 2) { @@ -412,11 +412,11 @@ struct TPathGenerator: public IPathGenerator { void AddStorage(const TStringBuf& key, const NSc::TValue& json, const TVector& path) { if (path.size() != 3) { - ythrow yexception() << "The key must be three-component, but received " << key; + throw yexception() << "The key for 'storage' must be three-component, but received " << key; } if (path[1] != "location" || path[2] != "template") { - ythrow yexception() << "The key must be storage.location.template, but received " << key; + throw yexception() << "The key for 'storage' must be storage.location.template, but received " << key; } TString locationTemplate = GetStringOrThrow(json, "The storage.location.template must be a string"); @@ -425,13 +425,12 @@ struct TPathGenerator: public IPathGenerator { Config.LocationTemplate = TStringBuilder() << locationTemplate << '/'; } - void DoParseEnumType(const TString& columnName, const TString& type, const TMap& projection) { if (!projection.contains("values")) { - ythrow yexception() << "Invalid projection scheme: values are required field for " << columnName << " " << type; + throw yexception() << "Invalid projection scheme: values are required field for " << columnName << " " << type; } if (!projection.contains("type")) { - ythrow yexception() << "Invalid projection scheme: type are required field for " << columnName << " " << type; + throw yexception() << "Invalid projection scheme: type are required field for " << columnName << " " << type; } for (const auto& p: projection) { if (p.first == "type") { @@ -444,37 +443,37 @@ struct TPathGenerator: public IPathGenerator { } Config.Rules.push_back(IPathGenerator::TColumnPartitioningConfig{.Type=ToType(type), .Name=columnName, .Values=std::move(values)}); } else { - ythrow yexception() << "Invalid projection scheme: enum element must include only type or values (as string) but got " << p.first; + throw yexception() << "Invalid projection scheme: enum element must include only type or values (as string) but got " << p.first; } } } void DoParseIntegerType(const TString& columnName, const TString& type, const TMap& projection) { if (!projection.contains("type")) { - ythrow yexception() << "Invalid projection scheme: type are required field for " << columnName << " " << type; + throw yexception() << "Invalid projection scheme: type are required field for " << columnName << " " << type; } if (!projection.contains("min")) { - ythrow yexception() << "Invalid projection scheme: min are required field for " << columnName << " " << type; + throw yexception() << "Invalid projection scheme: min are required field for " << columnName << " " << type; } if (!projection.contains("max")) { - ythrow yexception() << "Invalid projection scheme: max are required field for " << columnName << " " << type; + throw yexception() << "Invalid projection scheme: max are required field for " << columnName << " " << type; } IPathGenerator::TColumnPartitioningConfig columnConfig; columnConfig.Name = columnName; columnConfig.Type = ToType(type); - for (const auto& p: projection) { + for (const auto& p : projection) { if (p.first == "type") { // already processed } else if (p.first == "min") { - columnConfig.Min = GetIntOrThrow(p.second, "The min must be a number"); + columnConfig.Min = GetIntOrThrow(p.second, "The min must be a number for " + type + " field"); } else if (p.first == "max") { - columnConfig.Max = GetIntOrThrow(p.second, "The max must be a number"); + columnConfig.Max = GetIntOrThrow(p.second, "The max must be a number for " + type + " field"); } else if (p.first == "interval") { columnConfig.Interval = GetIntOrThrow(p.second, "The interval must be a number"); } else if (p.first == "digits") { columnConfig.Digits = GetIntOrThrow(p.second, "The digits must be a number"); } else { - ythrow yexception() << "Invalid projection scheme: integer element must include only type, min, max, interval, digits but got " << p.first; + throw yexception() << "Invalid projection scheme: integer element must include only type, min, max, interval, digits but got " << p.first; } } Config.Rules.push_back(columnConfig); @@ -482,16 +481,16 @@ struct TPathGenerator: public IPathGenerator { void DoParseDateType(const TString& columnName, const TString& type, const TMap& projection) { if (!projection.contains("type")) { - ythrow yexception() << "Invalid projection scheme: type are required field for " << columnName << " " << type; + throw yexception() << "Invalid projection scheme: type are required field for " << columnName << " " << type; } if (!projection.contains("min")) { - ythrow yexception() << "Invalid projection scheme: min are required field for " << columnName << " " << type; + throw yexception() << "Invalid projection scheme: min are required field for " << columnName << " " << type; } if (!projection.contains("max")) { - ythrow yexception() << "Invalid projection scheme: max are required field for " << columnName << " " << type; + throw yexception() << "Invalid projection scheme: max are required field for " << columnName << " " << type; } if (!projection.contains("format")) { - ythrow yexception() << "Invalid projection scheme: format are required field for " << columnName << " " << type; + throw yexception() << "Invalid projection scheme: format are required field for " << columnName << " " << type; } IPathGenerator::TColumnPartitioningConfig columnConfig; columnConfig.Name = columnName; @@ -500,9 +499,9 @@ struct TPathGenerator: public IPathGenerator { if (p.first == "type") { // already processed } else if (p.first == "min") { - columnConfig.From = GetStringOrThrow(p.second, "The min must be a string"); + columnConfig.From = GetStringOrThrow(p.second, "The min must be a string for " + type + " field"); } else if (p.first == "max") { - columnConfig.To = GetStringOrThrow(p.second, "The max must be a string"); + columnConfig.To = GetStringOrThrow(p.second, "The max must be a string for " + type + " field"); } else if (p.first == "format") { columnConfig.Format = GetStringOrThrow(p.second, "The format must be a string"); } else if (p.first == "interval") { @@ -510,7 +509,7 @@ struct TPathGenerator: public IPathGenerator { } else if (p.first == "unit") { columnConfig.IntervalUnit = ToIntervalUnit(GetStringOrThrow(p.second, "The unit must be a string")); } else { - ythrow yexception() << "Invalid projection scheme: date element must include only type, min, max, format, interval, unit but got " << p.first; + throw yexception() << "Invalid projection scheme: date element must include only type, min, max, format, interval, unit but got " << p.first; } } Config.Rules.push_back(columnConfig); @@ -519,7 +518,7 @@ struct TPathGenerator: public IPathGenerator { void DoParseColumn(const TString& columnName, const TMap& projection) { auto it = projection.find("type"); if (it == projection.end()) { - ythrow yexception() << "Invalid projection scheme: type element must exist for the column" << columnName; + throw yexception() << "Invalid projection scheme: type element must exist for the column " << columnName; } const TString type = GetStringOrThrow(it->second, "The type must be a string for column " + columnName); @@ -530,12 +529,12 @@ struct TPathGenerator: public IPathGenerator { } else if (type == "date") { DoParseDateType(columnName, type, projection); } else { - ythrow yexception() << "Invalid projection scheme: type element should be one of enum, date, integer but got " << type; + throw yexception() << "Invalid projection scheme: type element should be one of enum, date, integer but got " << type; } } void DoParseProjection(TMap> projection) { - for (const auto& p: projection) { + for (const auto& p : projection) { DoParseColumn(p.first, p.second); } } @@ -550,27 +549,27 @@ struct TPathGenerator: public IPathGenerator { } TSet partitionedByColumns; - for (const auto& columnName: partitionedBy) { + for (const auto& columnName : partitionedBy) { partitionedByColumns.insert(columnName); if (!vars.contains(columnName)) { - ythrow yexception() << "Template " << Config.LocationTemplate << " must include ${" << columnName << "}"; + throw yexception() << "Template " << Config.LocationTemplate << " must include ${" << columnName << "} from partitioned_by columns set"; } } TSet rulesColumns; - for (const auto& rule: Config.Rules) { + for (const auto& rule : Config.Rules) { rulesColumns.insert(rule.Name); if (!vars.contains(rule.Name)) { - ythrow yexception() << "Template " << Config.LocationTemplate << " must include ${" << rule.Name << "}"; + throw yexception() << "Template " << Config.LocationTemplate << " must include ${" << rule.Name << "} from projection columns set"; } } - for (const auto& var: vars) { + for (const auto& var : vars) { if (!partitionedByColumns.contains(var)) { - ythrow yexception() << "Partitioned by column named " << var << " does not exist for template " << Config.LocationTemplate; + throw yexception() << "Partitioned by column named " << var << " does not exist for template " << Config.LocationTemplate; } if (!rulesColumns.contains(var)) { - ythrow yexception() << "Projection column named " << var << " does not exist for template " << Config.LocationTemplate; + throw yexception() << "Projection column named " << var << " does not exist for template " << Config.LocationTemplate; } } } @@ -592,7 +591,7 @@ struct TPathGenerator: public IPathGenerator { } if (Rules.empty()) { - ythrow yexception() << "The projection contains an empty set of paths"; + throw yexception() << "The projection contains an empty set of paths"; } } @@ -631,12 +630,12 @@ struct TPathGenerator: public IPathGenerator { size_t p = 0) { if (rules.size() == p) { if (result.size() == pathsLimit) { - ythrow yexception() << "The limit on the number of paths has been reached: " << result.size() << " of " << pathsLimit; + throw yexception() << "The limit on the number of paths has been reached: " << result.size() << " of " << pathsLimit; } auto pib = result.emplace(locationTemplate, columnsWithValue); if (!pib.second) { - ythrow yexception() << "Location path " << locationTemplate << " is composed by different projection value sets " << FormatColumnValues(pib.first->second) << " and " << FormatColumnValues(columnsWithValue); + throw yexception() << "Location path " << locationTemplate << " is composed by different projection value sets " << FormatColumnValues(pib.first->second) << " and " << FormatColumnValues(columnsWithValue); } return; } @@ -653,7 +652,7 @@ struct TPathGenerator: public IPathGenerator { DoGenerateDate(rules, locationTemplate, columnsWithValue, result, pathsLimit, now, p); break; default: - ythrow yexception() << "Only the enum, integer, date types are supported but got " << to_lower(ToString(rule.Type)); + throw yexception() << "Only the enum, integer, date types are supported but got " << to_lower(ToString(rule.Type)); } } @@ -731,21 +730,21 @@ struct TPathGenerator: public IPathGenerator { if (std::numeric_limits::min() <= value && value <= std::numeric_limits::max()) { return; } - ythrow yexception() << "The value " << value << " is not representable as an int32 type for column " << column; + throw yexception() << "The value " << value << " is not representable as an int32 type for column " << column; } static void CheckCastUint32(int64_t value, const TString& column) { if (value >= 0 && value <= std::numeric_limits::max()) { return; } - ythrow yexception() << "The value " << value << " is not representable as an uint32 type for column " << column; + throw yexception() << "The value " << value << " is not representable as an uint32 type for column " << column; } static void CheckCastUint64(int64_t value, const TString& column) { if (value >= 0) { return; } - ythrow yexception() << "The value " << value << " is not representable as an uint64 type for column " << column; + throw yexception() << "The value " << value << " is not representable as an uint64 type for column " << column; } }; diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp index 9748f0c77176..7d391ebf3290 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp @@ -438,11 +438,16 @@ class TS3DataSinkTypeAnnotationTransformer : public TVisitorTransformerBase { return nullptr; } } else { - ctx.AddError(TIssue(ctx.GetPosition(key->Pos()), "Missed key column.")); + ctx.AddError(TIssue(ctx.GetPosition(key->Pos()), TStringBuilder() << "Missed key column '" << key->Content() << "' for partitioned by.")); return nullptr; } } + if (structType->GetSize() <= keysCount) { + ctx.AddError(TIssue(ctx.GetPosition(format.Pos()), TStringBuilder() << "Write schema contains no columns except partitioning columns.")); + return nullptr; + } + TTypeAnnotationNode::TListType itemTypes(keysCount + 1U, ctx.MakeType(EDataSlot::Utf8)); itemTypes.front() = ctx.MakeType(ctx.MakeType(EDataSlot::String)); From d7ded066c2a08f0cf094ce0b80b2e3d25e7e0511 Mon Sep 17 00:00:00 2001 From: Pisarenko Grigoriy Date: Tue, 24 Jun 2025 11:20:08 +0300 Subject: [PATCH 2/4] Update ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.cpp Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../yql/providers/s3/path_generator/yql_s3_path_generator.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.cpp b/ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.cpp index 4a1f28264ff8..5921c360a464 100644 --- a/ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.cpp +++ b/ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.cpp @@ -430,7 +430,7 @@ struct TPathGenerator: public IPathGenerator { throw yexception() << "Invalid projection scheme: values are required field for " << columnName << " " << type; } if (!projection.contains("type")) { - throw yexception() << "Invalid projection scheme: type are required field for " << columnName << " " << type; + throw yexception() << "Invalid projection scheme: type is a required field for " << columnName << " " << type; } for (const auto& p: projection) { if (p.first == "type") { From 9d5c6e48b4ca30dbd256b70cd2b1bdd4fba582e3 Mon Sep 17 00:00:00 2001 From: Pisarenko Grigoriy Date: Tue, 24 Jun 2025 11:27:12 +0300 Subject: [PATCH 3/4] Update ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp index 7d391ebf3290..0ec8cd04756a 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp @@ -438,7 +438,7 @@ class TS3DataSinkTypeAnnotationTransformer : public TVisitorTransformerBase { return nullptr; } } else { - ctx.AddError(TIssue(ctx.GetPosition(key->Pos()), TStringBuilder() << "Missed key column '" << key->Content() << "' for partitioned by.")); + ctx.AddError(TIssue(ctx.GetPosition(key->Pos()), TStringBuilder() << "Missing key column for partitioning: '" << key->Content() << "'. Please ensure the column is included in the schema.")); return nullptr; } } From 10901e2a1f0941465801b71433bd9f37f34d494e Mon Sep 17 00:00:00 2001 From: Pisarenko Grigoriy Date: Wed, 25 Jun 2025 15:02:46 +0300 Subject: [PATCH 4/4] Fixed test_raw_format_validation --- ydb/tests/fq/s3/test_insert.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/tests/fq/s3/test_insert.py b/ydb/tests/fq/s3/test_insert.py index 2e43babe533a..a4466bfe2111 100644 --- a/ydb/tests/fq/s3/test_insert.py +++ b/ydb/tests/fq/s3/test_insert.py @@ -586,7 +586,7 @@ def test_raw_format_validation(self, kikimr, s3, client, unique_prefix): query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id client.wait_query_status(query_id, fq.QueryMeta.FAILED) issues = str(client.describe_query(query_id).result.query.issue) - assert "Expected data or optional of data" in issues, "Incorrect Issues: " + issues + assert "Only column with primitive type allowed for raw format" in issues, "Incorrect Issues: " + issues def get_insert_test_query(self, insert_path: str): sql = f"INSERT INTO {insert_path} WITH (FORMAT = \"parquet\") SELECT\n"