Skip to content

Commit 58d7eac

Browse files
committed
Merge branch 'rightlib' into merge-libs-250708-0050
2 parents fb95647 + 4985bee commit 58d7eac

File tree

68 files changed

+2119
-436
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

68 files changed

+2119
-436
lines changed

library/cpp/tld/tlds-alpha-by-domain.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Version 2025070400, Last Updated Fri Jul 4 07:07:01 2025 UTC
1+
# Version 2025070700, Last Updated Mon Jul 7 07:07:01 2025 UTC
22
AAA
33
AARP
44
ABB

yql/essentials/core/cbo/cbo_hints.cpp

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,15 @@ class TOptimizerHintsParser {
2929
private:
3030
void Start() {
3131
while (Pos_ < Size_) {
32-
auto hintType = Keyword({"JoinOrder", "Leading", "JoinType", "Rows"});
32+
auto hintType = Keyword({"JoinOrder", "Leading", "JoinType", "Rows", "Bytes"});
3333
if (hintType == "JoinOrder" || hintType == "Leading") {
3434
JoinOrder(hintType == "Leading");
3535
} else if (hintType == "JoinType") {
3636
JoinType();
37-
} else if (hintType == "Rows"){
38-
Rows();
37+
} else if (hintType == "Rows") {
38+
CardinalityOrBytes(true);
39+
} else if (hintType == "Bytes") {
40+
CardinalityOrBytes(false);
3941
} else {
4042
ParseError(Sprintf("Undefined hints type: %s", hintType.c_str()), Pos_ - hintType.size());
4143
}
@@ -113,7 +115,7 @@ class TOptimizerHintsParser {
113115
Y_UNREACHABLE();
114116
}
115117

116-
void Rows() {
118+
void CardinalityOrBytes(bool isRows) {
117119
i32 beginPos = Pos_ + 1;
118120

119121
Keyword({"("});
@@ -134,7 +136,11 @@ class TOptimizerHintsParser {
134136
default: {ParseError(Sprintf("Unknown operation: '%c'", sign), Pos_ - 1); Y_UNREACHABLE();}
135137
}
136138

137-
Hints_.CardinalityHints->PushBack(std::move(labels), op, value, "Rows" + Text_.substr(beginPos, Pos_ - beginPos + 1));
139+
if (isRows) {
140+
Hints_.CardinalityHints->PushBack(std::move(labels), op, value, "Rows" + Text_.substr(beginPos, Pos_ - beginPos + 1));
141+
} else {
142+
Hints_.BytesHints->PushBack(std::move(labels), op, value, "Bytes" + Text_.substr(beginPos, Pos_ - beginPos + 1));
143+
}
138144
}
139145

140146
private:

yql/essentials/core/cbo/cbo_optimizer_new.cpp

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,28 @@ TOptimizerStatistics TBaseProviderContext::ComputeJoinStatsV1(
194194
return stats;
195195
}
196196

197+
TOptimizerStatistics TBaseProviderContext::ComputeJoinStatsV2(
198+
const TOptimizerStatistics& leftStats,
199+
const TOptimizerStatistics& rightStats,
200+
const TVector<NDq::TJoinColumn>& leftJoinKeys,
201+
const TVector<NDq::TJoinColumn>& rightJoinKeys,
202+
EJoinAlgoType joinAlgo,
203+
EJoinKind joinKind,
204+
TCardinalityHints::TCardinalityHint* maybeHint,
205+
bool shuffleLeftSide,
206+
bool shuffleRightSide,
207+
TCardinalityHints::TCardinalityHint* maybeBytesHint
208+
) const {
209+
210+
auto stats = ComputeJoinStatsV1(leftStats, rightStats, leftJoinKeys, rightJoinKeys, joinAlgo, joinKind, maybeHint, shuffleLeftSide, shuffleRightSide);
211+
212+
if (maybeBytesHint) {
213+
stats.ByteSize = maybeBytesHint->ApplyHint(stats.ByteSize);
214+
}
215+
216+
return stats;
217+
}
218+
197219
/**
198220
* Compute the cost and output cardinality of a join
199221
*
@@ -338,6 +360,12 @@ TVector<TString> TOptimizerHints::GetUnappliedString() {
338360
}
339361
}
340362

363+
for (const auto& hint: BytesHints->Hints) {
364+
if (!hint.Applied) {
365+
res.push_back(hint.StringRepr);
366+
}
367+
}
368+
341369
return res;
342370
}
343371

yql/essentials/core/cbo/cbo_optimizer_new.h

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -175,17 +175,19 @@ struct TJoinOrderHints {
175175
};
176176

177177
struct TOptimizerHints {
178+
std::shared_ptr<TCardinalityHints> BytesHints = std::make_shared<TCardinalityHints>();
178179
std::shared_ptr<TCardinalityHints> CardinalityHints = std::make_shared<TCardinalityHints>();
179180
std::shared_ptr<TJoinAlgoHints> JoinAlgoHints = std::make_shared<TJoinAlgoHints>();
180181
std::shared_ptr<TJoinOrderHints> JoinOrderHints = std::make_shared<TJoinOrderHints>();
181182

182183
TVector<TString> GetUnappliedString();
183184

184185
/*
185-
* The function accepts string with three type of expressions: array of (JoinAlgo | Card | JoinOrder):
186+
* The function accepts string with four type of expressions: array of (JoinAlgo | Rows | Bytes | JoinOrder):
186187
* 1) JoinAlgo(t1 t2 ... tn Map | Grace | Lookup) to change join algo for join, where these labels take part
187-
* 2) Card(t1 t2 ... tn (*|/|+|-) Number) to change cardinality for join, where these labels take part or labels only
188-
* 3) JoinOrder( (t1 t2) (t3 (t4 ...)) ) - fixate this join subtree in the general join tree
188+
* 2) Rows(t1 t2 ... tn (*|/|+|-|#) Number) to change cardinality for join, where these labels take part or labels only
189+
* 3) Bytes(t1 t2 ... tn (*|/|+|-|#) Number) to change byte size for join, where these labels take part or labels only
190+
* 4) JoinOrder( (t1 t2) (t3 (t4 ...)) ) - fixate this join subtree in the general join tree
189191
*/
190192
static TOptimizerHints Parse(const TString&);
191193
};
@@ -228,6 +230,19 @@ struct IProviderContext {
228230
bool shuffleRightSide
229231
) const = 0;
230232

233+
virtual TOptimizerStatistics ComputeJoinStatsV2(
234+
const TOptimizerStatistics& leftStats,
235+
const TOptimizerStatistics& rightStats,
236+
const TVector<NDq::TJoinColumn>& leftJoinKeys,
237+
const TVector<NDq::TJoinColumn>& rightJoinKeys,
238+
EJoinAlgoType joinAlgo,
239+
EJoinKind joinKind,
240+
TCardinalityHints::TCardinalityHint* maybeHint,
241+
bool shuffleLeftSide,
242+
bool shuffleRightSide,
243+
TCardinalityHints::TCardinalityHint* maybeBytesHint
244+
) const = 0;
245+
231246
virtual bool IsJoinApplicable(const std::shared_ptr<IBaseOptimizerNode>& left,
232247
const std::shared_ptr<IBaseOptimizerNode>& right,
233248
const TVector<NDq::TJoinColumn>& leftJoinKeys,
@@ -283,6 +298,19 @@ struct TBaseProviderContext : public IProviderContext {
283298
bool shuffleRightSide
284299
) const override;
285300

301+
TOptimizerStatistics ComputeJoinStatsV2(
302+
const TOptimizerStatistics& leftStats,
303+
const TOptimizerStatistics& rightStats,
304+
const TVector<NDq::TJoinColumn>& leftJoinKeys,
305+
const TVector<NDq::TJoinColumn>& rightJoinKeys,
306+
EJoinAlgoType joinAlgo,
307+
EJoinKind joinKind,
308+
TCardinalityHints::TCardinalityHint* maybeHint,
309+
bool shuffleLeftSide,
310+
bool shuffleRightSide,
311+
TCardinalityHints::TCardinalityHint* maybeBytesHint
312+
) const override;
313+
286314
static const TBaseProviderContext& Instance();
287315
};
288316

yql/essentials/minikql/datetime/datetime.h

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -170,8 +170,12 @@ bool DoAddMonths(TStorage& storage, i64 months, const NUdf::IDateBuilder& builde
170170
storage.Year--;
171171
newMonth += 12;
172172
}
173-
if (storage.Year == 0) {
174-
storage.Year += months > 0 ? 1 : -1;
173+
// The minimal year value for TTMStorage is 1970, but the
174+
// check below makes coverity happy.
175+
if constexpr (!std::is_same_v<TStorage, TTMStorage>) {
176+
if (storage.Year == 0) {
177+
storage.Year += months > 0 ? 1 : -1;
178+
}
175179
}
176180
storage.Month = newMonth;
177181
bool isLeap = NKikimr::NMiniKQL::IsLeapYear(storage.Year);
@@ -183,8 +187,12 @@ bool DoAddMonths(TStorage& storage, i64 months, const NUdf::IDateBuilder& builde
183187
template<typename TStorage>
184188
bool DoAddYears(TStorage& storage, i64 years, const NUdf::IDateBuilder& builder) {
185189
storage.Year += years;
186-
if (storage.Year == 0) {
187-
storage.Year += years > 0 ? 1 : -1;
190+
// The minimal year value for TTMStorage is 1970, but the
191+
// check below makes coverity happy.
192+
if constexpr (!std::is_same_v<TStorage, TTMStorage>) {
193+
if (storage.Year == 0) {
194+
storage.Year += years > 0 ? 1 : -1;
195+
}
188196
}
189197
if (storage.Month == 2 && storage.Day == 29) {
190198
bool isLeap = NKikimr::NMiniKQL::IsLeapYear(storage.Year);

yt/yql/providers/yt/codec/yt_codec_job.cpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,11 +46,10 @@ TJobMkqlReaderImpl::TJobMkqlReaderImpl(const TFile& in)
4646
{
4747
}
4848

49-
TJobMkqlWriterImpl::TJobMkqlWriterImpl(const TMkqlIOSpecs& specs, const TVector<TFile>& outHandles)
49+
TJobMkqlWriterImpl::TJobMkqlWriterImpl(const TVector<TFile>& outHandles)
5050
: NPrivate::TOutStreamsHolder(outHandles)
5151
, TMkqlWriterImpl(GetVectorOfStreams(), YQL_JOB_CODEC_BLOCK_COUNT, YQL_JOB_CODEC_BLOCK_SIZE)
5252
{
53-
SetSpecs(specs);
5453
}
5554

5655
void TJobMkqlWriterImpl::DoFinish(bool abort) {

yt/yql/providers/yt/codec/yt_codec_job.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ class TJobMkqlReaderImpl: protected NPrivate::TInStreamHolder, public TMkqlReade
4242

4343
class TJobMkqlWriterImpl: protected NPrivate::TOutStreamsHolder, public TMkqlWriterImpl {
4444
public:
45-
TJobMkqlWriterImpl(const TMkqlIOSpecs& specs, const TVector<TFile>& outHandles);
45+
TJobMkqlWriterImpl(const TVector<TFile>& outHandles);
4646
~TJobMkqlWriterImpl() = default;
4747

4848
private:

yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -619,7 +619,7 @@ class TFmrCoordinator: public IFmrCoordinator {
619619
});
620620

621621
mapTaskParams.Output = fmrTableOutputRefs;
622-
mapTaskParams.Executable = mapOperationParams.Executable; // TODO - change Executable to mapper
622+
mapTaskParams.SerializedMapJobState = mapOperationParams.SerializedMapJobState;
623623
TaskParams.emplace_back(mapTaskParams);
624624
}
625625
}

yt/yql/providers/yt/fmr/job/impl/ut/ya.make

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,19 @@ UNITTEST()
22

33
SRCS(
44
yql_yt_job_ut.cpp
5+
yql_yt_map_job_ut.cpp
6+
yql_yt_raw_table_queue_ut.cpp
57
yql_yt_table_data_service_reader_ut.cpp
68
yql_yt_table_data_service_writer_ut.cpp
79
)
810

911
PEERDIR(
1012
yt/cpp/mapreduce/common
1113
yt/yql/providers/yt/fmr/job/impl
14+
yt/yql/providers/yt/fmr/process
15+
yt/yql/providers/yt/fmr/table_data_service/helpers
1216
yt/yql/providers/yt/fmr/yt_job_service/mock
13-
yt/yql/providers/yt/fmr/table_data_service/local/impl
17+
yt/yql/providers/yt/fmr/utils
1418
yql/essentials/utils/log
1519
yql/essentials/parser/pg_wrapper
1620
yql/essentials/parser/pg_wrapper/interface
@@ -23,6 +27,8 @@ PEERDIR(
2327
yt/yql/providers/yt/codec/codegen/llvm16
2428
yql/essentials/minikql/codegen/llvm16
2529
yql/essentials/minikql/computation/llvm16
30+
yql/essentials/minikql/comp_nodes/llvm16
31+
yt/yql/providers/yt/comp_nodes/llvm16
2632
)
2733

2834
YQL_LAST_ABI_VERSION()

yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_job_ut.cpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
1-
#include <library/cpp/testing/unittest/registar.h>
2-
#include <yt/cpp/mapreduce/common/helpers.h>
1+
#include "yql_yt_job_ut.h"
32
#include <yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h>
43
#include <yt/yql/providers/yt/fmr/table_data_service/local/impl/yql_yt_table_data_service_local.h>
54
#include <yt/yql/providers/yt/fmr/utils/yql_yt_table_data_service_key.h>
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
#pragma once
2+
3+
#include <library/cpp/testing/unittest/registar.h>
4+
#include <yt/cpp/mapreduce/common/helpers.h>
5+
6+
namespace NYql::NFmr {
7+
// helper functions for yson reformatting
8+
9+
TString GetBinaryYson(const TString& textYsonContent);
10+
11+
TString GetTextYson(const TString& binaryYsonContent);
12+
13+
} // namespace NYql::NFmr
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
#include "yql_yt_job_ut.h"
2+
#include <util/string/split.h>
3+
#include <yql/essentials/minikql/invoke_builtins/mkql_builtins.h>
4+
#include <yql/essentials/minikql/mkql_node_printer.h>
5+
#include <yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h>
6+
#include <yt/yql/providers/yt/fmr/process/yql_yt_job_fmr.h>
7+
#include <yt/yql/providers/yt/fmr/table_data_service/helpers/yql_yt_table_data_service_helpers.h>
8+
9+
using namespace NKikimr::NMiniKQL;
10+
11+
namespace NYql::NFmr {
12+
13+
TString inputYsonContent = "{\"key\"=\"075\";\"subkey\"=\"1\";\"value\"=\"abc\"};\n"
14+
"{\"key\"=\"800\";\"subkey\"=\"2\";\"value\"=\"ddd\"};\n";
15+
16+
Y_UNIT_TEST_SUITE(MapTests) {
17+
Y_UNIT_TEST(RunMapJob) {
18+
TFmrUserJob mapJob;
19+
20+
TTempFileHandle tableDataServiceHostsFile;
21+
ui16 port = 2345;
22+
auto tableDataServiceServer = MakeTableDataServiceServer(port);
23+
std::vector<TTableDataServiceServerConnection> connections{{.Host = "localhost", .Port = port}};
24+
WriteHostsToFile(tableDataServiceHostsFile, 1, connections);
25+
26+
TTempFileHandle inputYsonContentFile{};
27+
TFileOutput fileWriter(inputYsonContentFile.Name());
28+
fileWriter.Write(inputYsonContent.data(), inputYsonContent.size());
29+
fileWriter.Flush();
30+
31+
TYtTableTaskRef fileTask{.FilePaths = {inputYsonContentFile.Name()}};
32+
TFmrTableOutputRef fmrOutputRef{.TableId = "table_id", .PartId = "part_id"};
33+
TTaskTableRef taskTableRef(fileTask);
34+
TMapTaskParams mapTaskParams{
35+
.Input = TTaskTableInputRef{.Inputs ={taskTableRef}},
36+
.Output = {fmrOutputRef}
37+
};
38+
FillMapFmrJob(mapJob, mapTaskParams, {}, tableDataServiceHostsFile.Name(), true);
39+
40+
{
41+
auto functionRegistry = CreateFunctionRegistry(CreateBuiltinRegistry());
42+
TScopedAlloc alloc(__LOCATION__, NKikimr::TAlignedPagePoolCounters(),functionRegistry->SupportsSizedAllocators());
43+
alloc.Ref().ForcefullySetMemoryYellowZone(false);
44+
45+
TTypeEnvironment env(alloc);
46+
TProgramBuilder pgmBuilder(env, *functionRegistry);
47+
48+
const auto structType = pgmBuilder.NewStructType({
49+
{"key", pgmBuilder.NewDataType(NUdf::EDataSlot::String)},
50+
{"subkey", pgmBuilder.NewDataType(NUdf::EDataSlot::String)},
51+
{"value", pgmBuilder.NewDataType(NUdf::EDataSlot::String)}
52+
});
53+
54+
auto dataType = pgmBuilder.NewFlowType(structType);
55+
TCallableBuilder inputCallableBuilder(env, "FmrInputJob", dataType);
56+
auto inputNode = TRuntimeNode(inputCallableBuilder.Build(), false);
57+
58+
const auto prefix = pgmBuilder.NewDataLiteral<NUdf::EDataSlot::String>("prefix_");
59+
const auto map = pgmBuilder.Map(inputNode,
60+
[&](TRuntimeNode item) {
61+
TRuntimeNode prefixKey = pgmBuilder.Concat(prefix, pgmBuilder.Member(item, "key"));
62+
TRuntimeNode subkey = pgmBuilder.Member(item, "subkey");
63+
TRuntimeNode value = pgmBuilder.Member(item, "value");
64+
return pgmBuilder.NewStruct(structType, {{"key", prefixKey}, {"subkey", subkey}, {"value", value}});
65+
});
66+
67+
TCallableBuilder outputCallableBuilder(env, "FmrOutputJob", dataType);
68+
outputCallableBuilder.Add(map);
69+
auto outputNode = TRuntimeNode(outputCallableBuilder.Build(), false);
70+
auto pgmReturn = pgmBuilder.Discard(outputNode);
71+
72+
TString serializedMapLambda = SerializeRuntimeNode(pgmReturn, env);
73+
mapJob.SetLambdaCode(serializedMapLambda);
74+
75+
const TString spec = R"({
76+
tables = [{
77+
"_yql_row_spec" = {
78+
"Type" = [
79+
"StructType"; [
80+
["key"; ["DataType"; "String"]];
81+
["subkey"; ["DataType"; "String"]];
82+
["value"; ["DataType"; "String"]];
83+
]
84+
]
85+
}
86+
}]
87+
})";
88+
const TString type = R"(
89+
[
90+
"StructType";
91+
[
92+
["key"; ["DataType"; "String"]];
93+
["subkey"; ["DataType"; "String"]];
94+
["value"; ["DataType"; "String"]];
95+
]
96+
])";
97+
mapJob.SetInputSpec(spec);
98+
mapJob.SetOutSpec(spec);
99+
mapJob.SetInputType(type);
100+
}
101+
102+
mapJob.DoFmrJob();
103+
104+
// Checking correctness
105+
auto tableDataServiceClient = MakeTableDataServiceClient(port);
106+
TString key = "table_id_part_id:0";
107+
auto gottenBinaryTableContent = tableDataServiceClient->Get(key).GetValueSync();
108+
UNIT_ASSERT(gottenBinaryTableContent);
109+
110+
// Reformating data
111+
auto textTableContent = GetTextYson(*gottenBinaryTableContent);
112+
THashMap<TString, std::vector<TString>> expectedFormattedContent{
113+
{"key", {"prefix_075", "prefix_800"}},
114+
{"subkey", {"1", "2"}},
115+
{"value", {"abc", "ddd"}}
116+
};
117+
THashMap<TString, std::vector<TString>> gottenFormattedContent;
118+
119+
std::vector<TString> splittedTableContent;
120+
StringSplitter(textTableContent).SplitBySet("\n\";{}=").SkipEmpty().Collect(&splittedTableContent);
121+
122+
for (ui64 i = 0; i < splittedTableContent.size(); i += 2) {
123+
TString columnKey = splittedTableContent[i], columnValue = splittedTableContent[i + 1];
124+
gottenFormattedContent[columnKey].emplace_back(columnValue);
125+
}
126+
UNIT_ASSERT_VALUES_EQUAL(expectedFormattedContent, gottenFormattedContent);
127+
}
128+
}
129+
130+
} // namespace NYql::NFmr

0 commit comments

Comments
 (0)