Skip to content

Library import 250708-0050 #20760

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Jul 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion library/cpp/tld/tlds-alpha-by-domain.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Version 2025070400, Last Updated Fri Jul 4 07:07:01 2025 UTC
# Version 2025070700, Last Updated Mon Jul 7 07:07:01 2025 UTC
AAA
AARP
ABB
Expand Down
2 changes: 1 addition & 1 deletion ydb/ci/rightlib.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
66c697edffda3020e43c59bd99a7194bd82e7c01
4985bee7f78371eae4f89d2d010143ab71912a8e
16 changes: 11 additions & 5 deletions yql/essentials/core/cbo/cbo_hints.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,15 @@ class TOptimizerHintsParser {
private:
void Start() {
while (Pos_ < Size_) {
auto hintType = Keyword({"JoinOrder", "Leading", "JoinType", "Rows"});
auto hintType = Keyword({"JoinOrder", "Leading", "JoinType", "Rows", "Bytes"});
if (hintType == "JoinOrder" || hintType == "Leading") {
JoinOrder(hintType == "Leading");
} else if (hintType == "JoinType") {
JoinType();
} else if (hintType == "Rows"){
Rows();
} else if (hintType == "Rows") {
CardinalityOrBytes(true);
} else if (hintType == "Bytes") {
CardinalityOrBytes(false);
} else {
ParseError(Sprintf("Undefined hints type: %s", hintType.c_str()), Pos_ - hintType.size());
}
Expand Down Expand Up @@ -113,7 +115,7 @@ class TOptimizerHintsParser {
Y_UNREACHABLE();
}

void Rows() {
void CardinalityOrBytes(bool isRows) {
i32 beginPos = Pos_ + 1;

Keyword({"("});
Expand All @@ -134,7 +136,11 @@ class TOptimizerHintsParser {
default: {ParseError(Sprintf("Unknown operation: '%c'", sign), Pos_ - 1); Y_UNREACHABLE();}
}

Hints_.CardinalityHints->PushBack(std::move(labels), op, value, "Rows" + Text_.substr(beginPos, Pos_ - beginPos + 1));
if (isRows) {
Hints_.CardinalityHints->PushBack(std::move(labels), op, value, "Rows" + Text_.substr(beginPos, Pos_ - beginPos + 1));
} else {
Hints_.BytesHints->PushBack(std::move(labels), op, value, "Bytes" + Text_.substr(beginPos, Pos_ - beginPos + 1));
}
}

private:
Expand Down
28 changes: 28 additions & 0 deletions yql/essentials/core/cbo/cbo_optimizer_new.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,28 @@ TOptimizerStatistics TBaseProviderContext::ComputeJoinStatsV1(
return stats;
}

// Computes join statistics exactly as ComputeJoinStatsV1 does and then,
// when a bytes hint is supplied, overrides the resulting ByteSize with the
// value produced by that hint. A null maybeBytesHint leaves the V1 result
// untouched.
TOptimizerStatistics TBaseProviderContext::ComputeJoinStatsV2(
    const TOptimizerStatistics& leftStats,
    const TOptimizerStatistics& rightStats,
    const TVector<NDq::TJoinColumn>& leftJoinKeys,
    const TVector<NDq::TJoinColumn>& rightJoinKeys,
    EJoinAlgoType joinAlgo,
    EJoinKind joinKind,
    TCardinalityHints::TCardinalityHint* maybeHint,
    bool shuffleLeftSide,
    bool shuffleRightSide,
    TCardinalityHints::TCardinalityHint* maybeBytesHint
) const {
    // All of the actual estimation is delegated to the V1 implementation.
    auto joinStats = ComputeJoinStatsV1(
        leftStats,
        rightStats,
        leftJoinKeys,
        rightJoinKeys,
        joinAlgo,
        joinKind,
        maybeHint,
        shuffleLeftSide,
        shuffleRightSide);

    // No bytes hint: the V1 estimate is the final answer.
    if (!maybeBytesHint) {
        return joinStats;
    }

    joinStats.ByteSize = maybeBytesHint->ApplyHint(joinStats.ByteSize);
    return joinStats;
}

/**
* Compute the cost and output cardinality of a join
*
Expand Down Expand Up @@ -338,6 +360,12 @@ TVector<TString> TOptimizerHints::GetUnappliedString() {
}
}

for (const auto& hint: BytesHints->Hints) {
if (!hint.Applied) {
res.push_back(hint.StringRepr);
}
}

return res;
}

Expand Down
34 changes: 31 additions & 3 deletions yql/essentials/core/cbo/cbo_optimizer_new.h
Original file line number Diff line number Diff line change
Expand Up @@ -175,17 +175,19 @@ struct TJoinOrderHints {
};

// Aggregates every category of optimizer hint. Each category lives behind a
// shared_ptr that is initialised with a default-constructed collection.
struct TOptimizerHints {
// Byte-size hints; the hints parser stores "Bytes(...)" entries here.
std::shared_ptr<TCardinalityHints> BytesHints = std::make_shared<TCardinalityHints>();
// Cardinality hints; the hints parser stores "Rows(...)" entries here.
std::shared_ptr<TCardinalityHints> CardinalityHints = std::make_shared<TCardinalityHints>();
std::shared_ptr<TJoinAlgoHints> JoinAlgoHints = std::make_shared<TJoinAlgoHints>();
std::shared_ptr<TJoinOrderHints> JoinOrderHints = std::make_shared<TJoinOrderHints>();

// Returns the string representations of the hints that are not marked as applied.
TVector<TString> GetUnappliedString();

/*
* The function accepts a string with four types of expressions: array of (JoinAlgo | Rows | Bytes | JoinOrder):
* 1) JoinAlgo(t1 t2 ... tn Map | Grace | Lookup) to change join algo for join, where these labels take part
* 2) Rows(t1 t2 ... tn (*|/|+|-|#) Number) to change cardinality for join, where these labels take part or labels only
* 3) Bytes(t1 t2 ... tn (*|/|+|-|#) Number) to change byte size for join, where these labels take part or labels only
* 4) JoinOrder( (t1 t2) (t3 (t4 ...)) ) - fixate this join subtree in the general join tree
*
* NOTE(review): the keyword list in the hints parser (cbo_hints.cpp) is
* "JoinOrder", "Leading", "JoinType", "Rows", "Bytes" - i.e. the join algorithm
* hint appears to be spelled JoinType rather than JoinAlgo, and "Leading" is
* accepted as well. Confirm and align this comment with the parser.
*/
static TOptimizerHints Parse(const TString&);
};
Expand Down Expand Up @@ -228,6 +230,19 @@ struct IProviderContext {
bool shuffleRightSide
) const = 0;

// Second version of the join statistics computation. It takes the same
// arguments as ComputeJoinStatsV1 plus one extra, optional hint:
//   maybeHint      - optional hint forwarded to the V1 computation (may be null);
//   maybeBytesHint - optional hint for the resulting byte size (may be null).
// NOTE(review): presumably implementations are expected to fall back to the
// V1 result when maybeBytesHint is null, as TBaseProviderContext does - confirm.
virtual TOptimizerStatistics ComputeJoinStatsV2(
const TOptimizerStatistics& leftStats,
const TOptimizerStatistics& rightStats,
const TVector<NDq::TJoinColumn>& leftJoinKeys,
const TVector<NDq::TJoinColumn>& rightJoinKeys,
EJoinAlgoType joinAlgo,
EJoinKind joinKind,
TCardinalityHints::TCardinalityHint* maybeHint,
bool shuffleLeftSide,
bool shuffleRightSide,
TCardinalityHints::TCardinalityHint* maybeBytesHint
) const = 0;

virtual bool IsJoinApplicable(const std::shared_ptr<IBaseOptimizerNode>& left,
const std::shared_ptr<IBaseOptimizerNode>& right,
const TVector<NDq::TJoinColumn>& leftJoinKeys,
Expand Down Expand Up @@ -283,6 +298,19 @@ struct TBaseProviderContext : public IProviderContext {
bool shuffleRightSide
) const override;

// Default implementation of ComputeJoinStatsV2: computes the statistics via
// ComputeJoinStatsV1 and, if maybeBytesHint is non-null, replaces ByteSize
// with maybeBytesHint->ApplyHint(ByteSize). Both hint pointers may be null.
TOptimizerStatistics ComputeJoinStatsV2(
const TOptimizerStatistics& leftStats,
const TOptimizerStatistics& rightStats,
const TVector<NDq::TJoinColumn>& leftJoinKeys,
const TVector<NDq::TJoinColumn>& rightJoinKeys,
EJoinAlgoType joinAlgo,
EJoinKind joinKind,
TCardinalityHints::TCardinalityHint* maybeHint,
bool shuffleLeftSide,
bool shuffleRightSide,
TCardinalityHints::TCardinalityHint* maybeBytesHint
) const override;

static const TBaseProviderContext& Instance();
};

Expand Down
16 changes: 12 additions & 4 deletions yql/essentials/minikql/datetime/datetime.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,12 @@ bool DoAddMonths(TStorage& storage, i64 months, const NUdf::IDateBuilder& builde
storage.Year--;
newMonth += 12;
}
if (storage.Year == 0) {
storage.Year += months > 0 ? 1 : -1;
// The minimal year value for TTMStorage is 1970, but the
// check below makes coverity happy.
if constexpr (!std::is_same_v<TStorage, TTMStorage>) {
if (storage.Year == 0) {
storage.Year += months > 0 ? 1 : -1;
}
}
storage.Month = newMonth;
bool isLeap = NKikimr::NMiniKQL::IsLeapYear(storage.Year);
Expand All @@ -183,8 +187,12 @@ bool DoAddMonths(TStorage& storage, i64 months, const NUdf::IDateBuilder& builde
template<typename TStorage>
bool DoAddYears(TStorage& storage, i64 years, const NUdf::IDateBuilder& builder) {
storage.Year += years;
if (storage.Year == 0) {
storage.Year += years > 0 ? 1 : -1;
// The minimal year value for TTMStorage is 1970, but the
// check below makes coverity happy.
if constexpr (!std::is_same_v<TStorage, TTMStorage>) {
if (storage.Year == 0) {
storage.Year += years > 0 ? 1 : -1;
}
}
if (storage.Month == 2 && storage.Day == 29) {
bool isLeap = NKikimr::NMiniKQL::IsLeapYear(storage.Year);
Expand Down
3 changes: 1 addition & 2 deletions yt/yql/providers/yt/codec/yt_codec_job.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,10 @@ TJobMkqlReaderImpl::TJobMkqlReaderImpl(const TFile& in)
{
}

TJobMkqlWriterImpl::TJobMkqlWriterImpl(const TMkqlIOSpecs& specs, const TVector<TFile>& outHandles)
TJobMkqlWriterImpl::TJobMkqlWriterImpl(const TVector<TFile>& outHandles)
: NPrivate::TOutStreamsHolder(outHandles)
, TMkqlWriterImpl(GetVectorOfStreams(), YQL_JOB_CODEC_BLOCK_COUNT, YQL_JOB_CODEC_BLOCK_SIZE)
{
SetSpecs(specs);
}

void TJobMkqlWriterImpl::DoFinish(bool abort) {
Expand Down
2 changes: 1 addition & 1 deletion yt/yql/providers/yt/codec/yt_codec_job.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class TJobMkqlReaderImpl: protected NPrivate::TInStreamHolder, public TMkqlReade

class TJobMkqlWriterImpl: protected NPrivate::TOutStreamsHolder, public TMkqlWriterImpl {
public:
TJobMkqlWriterImpl(const TMkqlIOSpecs& specs, const TVector<TFile>& outHandles);
TJobMkqlWriterImpl(const TVector<TFile>& outHandles);
~TJobMkqlWriterImpl() = default;

private:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -619,7 +619,7 @@ class TFmrCoordinator: public IFmrCoordinator {
});

mapTaskParams.Output = fmrTableOutputRefs;
mapTaskParams.Executable = mapOperationParams.Executable; // TODO - change Executable to mapper
mapTaskParams.SerializedMapJobState = mapOperationParams.SerializedMapJobState;
TaskParams.emplace_back(mapTaskParams);
}
}
Expand Down
8 changes: 7 additions & 1 deletion yt/yql/providers/yt/fmr/job/impl/ut/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,19 @@ UNITTEST()

SRCS(
yql_yt_job_ut.cpp
yql_yt_map_job_ut.cpp
yql_yt_raw_table_queue_ut.cpp
yql_yt_table_data_service_reader_ut.cpp
yql_yt_table_data_service_writer_ut.cpp
)

PEERDIR(
yt/cpp/mapreduce/common
yt/yql/providers/yt/fmr/job/impl
yt/yql/providers/yt/fmr/process
yt/yql/providers/yt/fmr/table_data_service/helpers
yt/yql/providers/yt/fmr/yt_job_service/mock
yt/yql/providers/yt/fmr/table_data_service/local/impl
yt/yql/providers/yt/fmr/utils
yql/essentials/utils/log
yql/essentials/parser/pg_wrapper
yql/essentials/parser/pg_wrapper/interface
Expand All @@ -23,6 +27,8 @@ PEERDIR(
yt/yql/providers/yt/codec/codegen/llvm16
yql/essentials/minikql/codegen/llvm16
yql/essentials/minikql/computation/llvm16
yql/essentials/minikql/comp_nodes/llvm16
yt/yql/providers/yt/comp_nodes/llvm16
)

YQL_LAST_ABI_VERSION()
Expand Down
3 changes: 1 addition & 2 deletions yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_job_ut.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#include <library/cpp/testing/unittest/registar.h>
#include <yt/cpp/mapreduce/common/helpers.h>
#include "yql_yt_job_ut.h"
#include <yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h>
#include <yt/yql/providers/yt/fmr/table_data_service/local/impl/yql_yt_table_data_service_local.h>
#include <yt/yql/providers/yt/fmr/utils/yql_yt_table_data_service_key.h>
Expand Down
13 changes: 13 additions & 0 deletions yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_job_ut.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#pragma once

#include <library/cpp/testing/unittest/registar.h>
#include <yt/cpp/mapreduce/common/helpers.h>

namespace NYql::NFmr {
// Helper functions for YSON reformatting, declared here so that several unit
// test sources can share them. Only the declarations are visible in this
// header; the definitions live in a test .cpp.

// Presumably converts YSON given in text form into its binary form - the
// definition is not visible here, verify against the implementation.
TString GetBinaryYson(const TString& textYsonContent);

// Presumably the inverse of GetBinaryYson: converts binary YSON into text
// form - verify against the implementation.
TString GetTextYson(const TString& binaryYsonContent);

} // namespace NYql::NFmr
130 changes: 130 additions & 0 deletions yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_map_job_ut.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
#include "yql_yt_job_ut.h"
#include <util/string/split.h>
#include <yql/essentials/minikql/invoke_builtins/mkql_builtins.h>
#include <yql/essentials/minikql/mkql_node_printer.h>
#include <yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h>
#include <yt/yql/providers/yt/fmr/process/yql_yt_job_fmr.h>
#include <yt/yql/providers/yt/fmr/table_data_service/helpers/yql_yt_table_data_service_helpers.h>

using namespace NKikimr::NMiniKQL;

namespace NYql::NFmr {

// Input fixture for the map job test: two rows in text YSON, each with string
// "key", "subkey" and "value" columns. Declared const so that the fixture is
// immutable and has internal linkage, i.e. it cannot clash with a same-named
// global defined in another source file of the same test binary.
const TString inputYsonContent = "{\"key\"=\"075\";\"subkey\"=\"1\";\"value\"=\"abc\"};\n"
                                 "{\"key\"=\"800\";\"subkey\"=\"2\";\"value\"=\"ddd\"};\n";

Y_UNIT_TEST_SUITE(MapTests) {
    // Runs a TFmrUserJob whose lambda prepends "prefix_" to the "key" column of
    // every input row, then reads the produced table part back from the table
    // data service and compares it, column by column, with the expected values.
    Y_UNIT_TEST(RunMapJob) {
        TFmrUserJob mapJob;

        // Table data service the job writes its output to; its address is
        // passed to the job through a hosts file.
        // NOTE(review): the port is hard-coded - this can clash with another
        // process or a parallel test run using the same port.
        TTempFileHandle tableDataServiceHostsFile;
        ui16 port = 2345;
        auto tableDataServiceServer = MakeTableDataServiceServer(port);
        std::vector<TTableDataServiceServerConnection> connections{{.Host = "localhost", .Port = port}};
        WriteHostsToFile(tableDataServiceHostsFile, 1, connections);

        // Dump the text-YSON fixture into a temporary file used as the job input.
        TTempFileHandle inputYsonContentFile{};
        TFileOutput fileWriter(inputYsonContentFile.Name());
        fileWriter.Write(inputYsonContent.data(), inputYsonContent.size());
        fileWriter.Flush();

        // One file-backed input, one output addressed as table_id / part_id.
        TYtTableTaskRef fileTask{.FilePaths = {inputYsonContentFile.Name()}};
        TFmrTableOutputRef fmrOutputRef{.TableId = "table_id", .PartId = "part_id"};
        TTaskTableRef taskTableRef(fileTask);
        TMapTaskParams mapTaskParams{
            .Input = TTaskTableInputRef{.Inputs ={taskTableRef}},
            .Output = {fmrOutputRef}
        };
        FillMapFmrJob(mapJob, mapTaskParams, {}, tableDataServiceHostsFile.Name(), true);

        // The MiniKQL program is built in its own scope so that the allocator,
        // the type environment and the builder are destroyed before the job
        // runs; only the serialized lambda and the specs are handed to the job.
        {
            auto functionRegistry = CreateFunctionRegistry(CreateBuiltinRegistry());
            TScopedAlloc alloc(__LOCATION__, NKikimr::TAlignedPagePoolCounters(),functionRegistry->SupportsSizedAllocators());
            alloc.Ref().ForcefullySetMemoryYellowZone(false);

            TTypeEnvironment env(alloc);
            TProgramBuilder pgmBuilder(env, *functionRegistry);

            // Row type shared by the input and the output of the map.
            const auto structType = pgmBuilder.NewStructType({
                {"key", pgmBuilder.NewDataType(NUdf::EDataSlot::String)},
                {"subkey", pgmBuilder.NewDataType(NUdf::EDataSlot::String)},
                {"value", pgmBuilder.NewDataType(NUdf::EDataSlot::String)}
            });

            // Source of rows: the "FmrInputJob" callable yielding a flow of structs.
            auto dataType = pgmBuilder.NewFlowType(structType);
            TCallableBuilder inputCallableBuilder(env, "FmrInputJob", dataType);
            auto inputNode = TRuntimeNode(inputCallableBuilder.Build(), false);

            // The map itself: key -> "prefix_" + key, other columns pass through.
            const auto prefix = pgmBuilder.NewDataLiteral<NUdf::EDataSlot::String>("prefix_");
            const auto map = pgmBuilder.Map(inputNode,
                [&](TRuntimeNode item) {
                    TRuntimeNode prefixKey = pgmBuilder.Concat(prefix, pgmBuilder.Member(item, "key"));
                    TRuntimeNode subkey = pgmBuilder.Member(item, "subkey");
                    TRuntimeNode value = pgmBuilder.Member(item, "value");
                    return pgmBuilder.NewStruct(structType, {{"key", prefixKey}, {"subkey", subkey}, {"value", value}});
                });

            // Sink: the "FmrOutputJob" callable consuming the mapped flow.
            TCallableBuilder outputCallableBuilder(env, "FmrOutputJob", dataType);
            outputCallableBuilder.Add(map);
            auto outputNode = TRuntimeNode(outputCallableBuilder.Build(), false);
            auto pgmReturn = pgmBuilder.Discard(outputNode);

            TString serializedMapLambda = SerializeRuntimeNode(pgmReturn, env);
            mapJob.SetLambdaCode(serializedMapLambda);

            // The same table spec is used for both the input and the output.
            const TString spec = R"({
tables = [{
"_yql_row_spec" = {
"Type" = [
"StructType"; [
["key"; ["DataType"; "String"]];
["subkey"; ["DataType"; "String"]];
["value"; ["DataType"; "String"]];
]
]
}
}]
})";
            const TString type = R"(
[
"StructType";
[
["key"; ["DataType"; "String"]];
["subkey"; ["DataType"; "String"]];
["value"; ["DataType"; "String"]];
]
])";
            mapJob.SetInputSpec(spec);
            mapJob.SetOutSpec(spec);
            mapJob.SetInputType(type);
        }

        mapJob.DoFmrJob();

        // Checking correctness
        auto tableDataServiceClient = MakeTableDataServiceClient(port);
        TString key = "table_id_part_id:0";
        auto gottenBinaryTableContent = tableDataServiceClient->Get(key).GetValueSync();
        UNIT_ASSERT(gottenBinaryTableContent);

        // Reformatting data
        auto textTableContent = GetTextYson(*gottenBinaryTableContent);
        THashMap<TString, std::vector<TString>> expectedFormattedContent{
            {"key", {"prefix_075", "prefix_800"}},
            {"subkey", {"1", "2"}},
            {"value", {"abc", "ddd"}}
        };
        THashMap<TString, std::vector<TString>> gottenFormattedContent;

        // Strip all YSON punctuation; what remains is expected to be an
        // alternating sequence of column names and column values.
        std::vector<TString> splittedTableContent;
        StringSplitter(textTableContent).SplitBySet("\n\";{}=").SkipEmpty().Collect(&splittedTableContent);

        // The loop below consumes tokens in (column, value) pairs. An odd token
        // count means the output is malformed and would make the loop index
        // past the end of the vector, so fail with a readable message instead.
        UNIT_ASSERT_C(
            splittedTableContent.size() % 2 == 0,
            "Odd number of tokens in table content: " << textTableContent);

        for (ui64 i = 0; i < splittedTableContent.size(); i += 2) {
            const TString& columnKey = splittedTableContent[i];
            const TString& columnValue = splittedTableContent[i + 1];
            gottenFormattedContent[columnKey].emplace_back(columnValue);
        }
        UNIT_ASSERT_VALUES_EQUAL(expectedFormattedContent, gottenFormattedContent);
    }
}

} // namespace NYql::NFmr
Loading
Loading