Skip to content

Commit 575a810

Browse files
authored
Replay GetTableRange (#8760)
1 parent f50c0a5 commit 575a810

File tree

6 files changed

+149
-52
lines changed

6 files changed

+149
-52
lines changed

ydb/library/yql/ast/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ SRCS(
2424
)
2525

2626
PEERDIR(
27+
contrib/libs/openssl
2728
library/cpp/colorizer
2829
library/cpp/containers/sorted_vector
2930
library/cpp/containers/stack_vector

ydb/library/yql/ast/yql_expr.cpp

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
#include <util/digest/numeric.h>
2020
#include <util/string/cast.h>
2121

22+
#include <openssl/sha.h>
23+
2224
#include <map>
2325
#include <unordered_set>
2426

@@ -3667,6 +3669,53 @@ bool CompareExprTreeParts(const TExprNode& one, const TExprNode& two, const TNod
36673669
return CompareExpressions(l, r, map, level, visited);
36683670
}
36693671

3672+
class TCacheKeyBuilder {
3673+
public:
3674+
TString Process(const TExprNode& root) {
3675+
SHA256_Init(&Sha);
3676+
unsigned char hash[SHA256_DIGEST_LENGTH];
3677+
Visit(root);
3678+
SHA256_Final(hash, &Sha);
3679+
return TString((const char*)hash, sizeof(hash));
3680+
}
3681+
3682+
private:
3683+
void Visit(const TExprNode& node) {
3684+
auto [it, inserted] = Visited.emplace(&node, Visited.size());
3685+
SHA256_Update(&Sha, &it->second, sizeof(it->second));
3686+
if (!inserted) {
3687+
return;
3688+
}
3689+
3690+
ui32 type = node.Type();
3691+
SHA256_Update(&Sha, &type, sizeof(type));
3692+
if (node.Type() == TExprNode::EType::Atom || node.Type() == TExprNode::EType::Callable) {
3693+
ui32 textLen = node.Content().size();
3694+
SHA256_Update(&Sha, &textLen, sizeof(textLen));
3695+
SHA256_Update(&Sha, node.Content().Data(), textLen);
3696+
}
3697+
3698+
if (node.Type() == TExprNode::EType::Atom || node.Type() == TExprNode::EType::Argument || node.Type() == TExprNode::EType::World) {
3699+
return;
3700+
}
3701+
3702+
ui32 len = node.ChildrenSize();
3703+
SHA256_Update(&Sha, &len, sizeof(len));
3704+
for (const auto& child : node.Children()) {
3705+
Visit(*child);
3706+
}
3707+
}
3708+
3709+
private:
3710+
SHA256_CTX Sha;
3711+
TNodeMap<ui64> Visited;
3712+
};
3713+
3714+
TString MakeCacheKey(const TExprNode& root) {
3715+
TCacheKeyBuilder builder;
3716+
return builder.Process(root);
3717+
}
3718+
36703719
void GatherParents(const TExprNode& node, TParentsMap& parentsMap) {
36713720
parentsMap.clear();
36723721
TNodeSet visisted;

ydb/library/yql/ast/yql_expr.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2837,6 +2837,8 @@ bool CompareExprTrees(const TExprNode*& one, const TExprNode*& two);
28372837
28382838
bool CompareExprTreeParts(const TExprNode& one, const TExprNode& two, const TNodeMap<ui32>& argsMap);
28392839
2840+
TString MakeCacheKey(const TExprNode& root);
2841+
28402842
void GatherParents(const TExprNode& node, TParentsMap& parentsMap);
28412843
28422844
struct TConvertToAstSettings {

ydb/library/yql/core/services/ya.make

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ SRCS(
1616
)
1717

1818
PEERDIR(
19-
contrib/libs/openssl
2019
library/cpp/string_utils/base64
2120
library/cpp/yson
2221
ydb/library/yql/ast/serialize

ydb/library/yql/core/services/yql_eval_expr.cpp

Lines changed: 0 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717
#include <library/cpp/yson/node/node_io.h>
1818
#include <library/cpp/string_utils/base64/base64.h>
1919

20-
#include <openssl/sha.h>
21-
2220
#include <util/string/builder.h>
2321

2422
namespace NYql {
@@ -45,53 +43,6 @@ static THashSet<TStringBuf> SubqueryExpandFuncs = {
4543
TStringBuf("SubqueryAssumeOrderBy")
4644
};
4745

48-
class TCacheKeyBuilder {
49-
public:
50-
TString Process(const TExprNode& root) {
51-
SHA256_Init(&Sha);
52-
unsigned char hash[SHA256_DIGEST_LENGTH];
53-
Visit(root);
54-
SHA256_Final(hash, &Sha);
55-
return TString((const char*)hash, sizeof(hash));
56-
}
57-
58-
private:
59-
void Visit(const TExprNode& node) {
60-
auto [it, inserted] = Visited.emplace(&node, Visited.size());
61-
SHA256_Update(&Sha, &it->second, sizeof(it->second));
62-
if (!inserted) {
63-
return;
64-
}
65-
66-
ui32 type = node.Type();
67-
SHA256_Update(&Sha, &type, sizeof(type));
68-
if (node.Type() == TExprNode::EType::Atom || node.Type() == TExprNode::EType::Callable) {
69-
ui32 textLen = node.Content().size();
70-
SHA256_Update(&Sha, &textLen, sizeof(textLen));
71-
SHA256_Update(&Sha, node.Content().Data(), textLen);
72-
}
73-
74-
if (node.Type() == TExprNode::EType::Atom || node.Type() == TExprNode::EType::Argument || node.Type() == TExprNode::EType::World) {
75-
return;
76-
}
77-
78-
ui32 len = node.ChildrenSize();
79-
SHA256_Update(&Sha, &len, sizeof(len));
80-
for (const auto& child : node.Children()) {
81-
Visit(*child);
82-
}
83-
}
84-
85-
private:
86-
SHA256_CTX Sha;
87-
TNodeMap<ui64> Visited;
88-
};
89-
90-
TString MakeCacheKey(const TExprNode& root) {
91-
TCacheKeyBuilder builder;
92-
return builder.Process(root);
93-
}
94-
9546
bool CheckPendingArgs(const TExprNode& root, TNodeSet& visited, TNodeMap<const TExprNode*>& activeArgs, const TNodeMap<ui32>& externalWorlds, TExprContext& ctx,
9647
bool underTypeOf, bool& hasUnresolvedTypes) {
9748
if (!visited.emplace(&root).second) {

ydb/library/yql/providers/yt/gateway/qplayer/yql_yt_qplayer_gateway.cpp

Lines changed: 97 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ namespace {
2020

2121
const TString YtGateway_CanonizePaths = "YtGateway_CanonizePaths";
2222
const TString YtGateway_GetTableInfo = "YtGateway_GetTableInfo";
23+
const TString YtGateway_GetTableRange = "YtGateway_GetTableRange";
2324
const TString YtGateway_GetFolder = "YtGateway_GetFolder";
2425
const TString YtGateway_GetFolders = "YtGateway_GetFolders";
2526
const TString YtGateway_ResolveLinks = "YtGateway_ResolveLinks";
@@ -300,12 +301,106 @@ class TGateway : public IYtGateway {
300301
});
301302
}
302303

304+
static TString MakeGetTableRangeKey(const TTableRangeOptions& options) {
305+
auto keyNode = NYT::TNode()
306+
("Cluster", options.Cluster())
307+
("Prefix", options.Prefix())
308+
("Suffix", options.Suffix());
309+
310+
if (options.Filter()) {
311+
keyNode("Filter", MakeCacheKey(*options.Filter()));
312+
}
313+
314+
return MakeHash(NYT::NodeToCanonicalYsonString(keyNode, NYT::NYson::EYsonFormat::Binary));
315+
}
316+
303317
NThreading::TFuture<TTableRangeResult> GetTableRange(TTableRangeOptions&& options) final {
304318
if (QContext_.CanRead()) {
305-
throw yexception() << "Can't replay GetTableRange";
319+
TTableRangeResult res;
320+
res.SetSuccess();
321+
auto key = MakeGetTableRangeKey(options);
322+
auto item = QContext_.GetReader()->Get({YtGateway_GetTableRange, key}).GetValueSync();
323+
if (!item) {
324+
throw yexception() << "Missing replay data";
325+
}
326+
327+
auto listNode = NYT::NodeFromYsonString(item->Value);
328+
for (const auto& valueNode : listNode.AsList()) {
329+
TCanonizedPath p;
330+
p.Path = valueNode["Path"].AsString();
331+
if (valueNode.HasKey("Columns")) {
332+
p.Columns.ConstructInPlace();
333+
for (const auto& c : valueNode["Columns"].AsList()) {
334+
p.Columns->push_back(c.AsString());
335+
}
336+
}
337+
338+
if (valueNode.HasKey("Ranges")) {
339+
p.Ranges.ConstructInPlace();
340+
for (const auto& r : valueNode["Ranges"].AsString()) {
341+
NYT::TReadRange range;
342+
NYT::Deserialize(range, r);
343+
p.Ranges->push_back(range);
344+
}
345+
}
346+
347+
if (valueNode.HasKey("AdditionalAttributes")) {
348+
p.AdditionalAttributes = valueNode["AdditionalAttributes"].AsString();
349+
}
350+
351+
res.Tables.push_back(p);
352+
}
353+
354+
return NThreading::MakeFuture<TTableRangeResult>(res);
306355
}
307356

308-
return Inner_->GetTableRange(std::move(options));
357+
auto optionsDup = options;
358+
return Inner_->GetTableRange(std::move(options))
359+
.Subscribe([optionsDup, qContext = QContext_](const NThreading::TFuture<TTableRangeResult>& future) {
360+
if (!qContext.CanWrite() || future.HasException()) {
361+
return;
362+
}
363+
364+
const auto& res = future.GetValueSync();
365+
if (!res.Success()) {
366+
return;
367+
}
368+
369+
const auto& key = MakeGetTableRangeKey(optionsDup);
370+
auto listNode = NYT::TNode::CreateList();
371+
for (const auto& t : res.Tables) {
372+
listNode.Add();
373+
auto& valueNode = listNode.AsList().back();
374+
valueNode("Path", t.Path);
375+
if (t.Columns) {
376+
NYT::TNode columnsNode = NYT::TNode::CreateList();
377+
for (const auto& c : *t.Columns) {
378+
columnsNode.Add(NYT::TNode(c));
379+
}
380+
381+
valueNode("Columns", columnsNode);
382+
}
383+
384+
if (t.Ranges) {
385+
NYT::TNode rangesNode = NYT::TNode::CreateList();
386+
for (const auto& r : *t.Ranges) {
387+
NYT::TNode rangeNode;
388+
NYT::TNodeBuilder builder(&rangeNode);
389+
NYT::Serialize(r, &builder);
390+
rangesNode.Add(rangeNode);
391+
}
392+
393+
valueNode("Ranges", rangesNode);
394+
}
395+
396+
if (t.AdditionalAttributes) {
397+
valueNode("AdditionalAttributes", NYT::TNode(*t.AdditionalAttributes));
398+
}
399+
}
400+
401+
auto value = NYT::NodeToYsonString(listNode, NYT::NYson::EYsonFormat::Binary);
402+
qContext.GetWriter()->Put({YtGateway_GetTableRange, key}, value).GetValueSync();
403+
});
309404
}
310405

311406
static TString MakeGetFolderKey(const TFolderOptions& options) {

0 commit comments

Comments
 (0)