Skip to content

Commit bb0b938

Browse files
committed
Intermediate changes
commit_hash:70eea6b3e17f4aafae5eadb49724ba9036a55372
1 parent 6d759dd commit bb0b938

File tree

10 files changed

+188
-21
lines changed

10 files changed

+188
-21
lines changed

yql/essentials/sql/v1/complete/name/cache/cache.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ namespace NSQLComplete {
3030
using TPtr = TIntrusivePtr<ICache>;
3131

3232
struct TEntry {
33-
TValue Value = {};
33+
TMaybe<TValue> Value = Nothing();
3434
bool IsExpired = true;
3535
};
3636

yql/essentials/sql/v1/complete/name/cache/cached.h

Lines changed: 39 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,47 @@ namespace NSQLComplete {
1616
}
1717

1818
NThreading::TFuture<TValue> operator()(TKey key) const {
19-
return Cache_->Get(key).Apply([cache = Cache_,
20-
query = Query_,
21-
key = std::move(key)](auto f) {
22-
typename ICache<TKey, TValue>::TEntry entry = f.ExtractValue();
23-
if (entry.IsExpired) {
24-
query(key).Apply([cache, key = std::move(key)](auto f) {
25-
cache->Update(key, f.ExtractValue());
26-
});
19+
auto promise = NThreading::NewPromise<TValue>();
20+
Cache_->Get(key).Apply([cache = Cache_,
21+
query = Query_,
22+
key = std::move(key),
23+
promise](auto f) mutable {
24+
typename ICache<TKey, TValue>::TEntry entry;
25+
try {
26+
entry = f.ExtractValue();
27+
} catch (...) {
28+
promise.SetException(std::current_exception());
29+
return;
2730
}
28-
return std::move(entry.Value);
31+
32+
if (!entry.IsExpired) {
33+
Y_ENSURE(entry.Value.Defined());
34+
promise.SetValue(std::move(*entry.Value));
35+
return;
36+
}
37+
38+
bool isEmpty = entry.Value.Empty();
39+
if (!isEmpty) {
40+
promise.SetValue(std::move(*entry.Value));
41+
}
42+
43+
query(key).Apply([cache, key = std::move(key), isEmpty, promise](auto f) mutable {
44+
TValue value;
45+
try {
46+
value = f.ExtractValue();
47+
} catch (...) {
48+
promise.SetException(std::current_exception());
49+
return;
50+
}
51+
52+
if (isEmpty) {
53+
promise.SetValue(value);
54+
}
55+
56+
cache->Update(key, std::move(value));
57+
});
2958
});
59+
return promise;
3060
}
3161

3262
private:

yql/essentials/sql/v1/complete/name/cache/cached_ut.cpp

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,45 @@
88

99
using namespace NSQLComplete;
1010

11+
class TFailableCache: public ICache<TString, TString> {
12+
public:
13+
NThreading::TFuture<TEntry> Get(const TString& key) const try {
14+
if (IsGetFailing) {
15+
ythrow yexception() << "O_O";
16+
}
17+
return NThreading::MakeFuture<TEntry>({.Value = key, .IsExpired = IsExpired});
18+
} catch (...) {
19+
return NThreading::MakeErrorFuture<TEntry>(std::current_exception());
20+
}
21+
22+
NThreading::TFuture<void> Update(const TString& /* key */, TString /* value */) const try {
23+
if (IsUpdateFailing) {
24+
ythrow yexception() << "O_O";
25+
}
26+
return NThreading::MakeFuture();
27+
} catch (...) {
28+
return NThreading::MakeErrorFuture<void>(std::current_exception());
29+
}
30+
31+
bool IsGetFailing = false;
32+
bool IsExpired = false;
33+
bool IsUpdateFailing = false;
34+
};
35+
1136
Y_UNIT_TEST_SUITE(CachedQueryTests) {
1237

38+
Y_UNIT_TEST(OnEmpty_WhenGet_ThenWaitUntilReceived) {
39+
auto cache = MakeLocalCache<TString, TString>(
40+
NMonotonic::CreateDefaultMonotonicTimeProvider(), {.TTL = TDuration::Zero()});
41+
auto cached = TCachedQuery<TString, TString>(cache, [&](const TString& key) {
42+
return NThreading::MakeFuture<TString>(key);
43+
});
44+
45+
TString value = cached("1").GetValueSync();
46+
47+
UNIT_ASSERT_VALUES_EQUAL(value, "1");
48+
}
49+
1350
Y_UNIT_TEST(OnExpired_WhenApplied_ThenDefferedUpdateAndReturnOld) {
1451
size_t queried = 0;
1552
auto cache = MakeLocalCache<TString, TString>(
@@ -27,4 +64,52 @@ Y_UNIT_TEST_SUITE(CachedQueryTests) {
2764
UNIT_ASSERT_VALUES_EQUAL(cached("1").GetValueSync(), "1");
2865
}
2966

67+
Y_UNIT_TEST(OnQueryError_WhenApplied_ThenNoDeadlock) {
68+
size_t queried = 0;
69+
auto cache = MakeLocalCache<TString, TString>(
70+
NMonotonic::CreateDefaultMonotonicTimeProvider(), {.TTL = TDuration::Zero()});
71+
auto cached = TCachedQuery<TString, TString>(cache, [&](const TString&) {
72+
queried += 1;
73+
try {
74+
ythrow yexception() << "T_T";
75+
} catch (...) {
76+
return NThreading::MakeErrorFuture<TString>(std::current_exception());
77+
}
78+
});
79+
80+
UNIT_ASSERT_EXCEPTION(cached("1").GetValueSync(), yexception);
81+
UNIT_ASSERT_EXCEPTION(cached("1").GetValueSync(), yexception);
82+
UNIT_ASSERT_VALUES_EQUAL(queried, 2);
83+
}
84+
85+
Y_UNIT_TEST(OnFailingCacheGet_WhenApplied_ThenNoDeadlock) {
86+
size_t queried = 0;
87+
auto cache = MakeIntrusive<TFailableCache>();
88+
auto cached = TCachedQuery<TString, TString>(cache, [&](const TString& key) {
89+
queried += 1;
90+
return NThreading::MakeFuture<TString>(key);
91+
});
92+
93+
cache->IsGetFailing = true;
94+
95+
UNIT_ASSERT_EXCEPTION(cached("1").GetValueSync(), yexception);
96+
UNIT_ASSERT_VALUES_EQUAL(queried, 0);
97+
}
98+
99+
Y_UNIT_TEST(OnFailingCacheUpdate_WhenApplied_ThenNoErrorAndNotCached) {
100+
size_t queried = 0;
101+
auto cache = MakeIntrusive<TFailableCache>();
102+
auto cached = TCachedQuery<TString, TString>(cache, [&](const TString& key) {
103+
queried += 1;
104+
return NThreading::MakeFuture<TString>(key);
105+
});
106+
107+
cache->IsExpired = true;
108+
cache->IsUpdateFailing = true;
109+
110+
UNIT_ASSERT_VALUES_EQUAL(cached("1").GetValueSync(), "1");
111+
UNIT_ASSERT_VALUES_EQUAL(cached("1").GetValueSync(), "1");
112+
UNIT_ASSERT_VALUES_EQUAL(queried, 2);
113+
}
114+
30115
} // Y_UNIT_TEST_SUITE(CachedQueryTests)

yql/essentials/sql/v1/complete/name/cache/local/cache_ut.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ Y_UNIT_TEST_SUITE(LocalCacheTests) {
8080

8181
auto entry = cache->Get("1").GetValueSync();
8282

83-
UNIT_ASSERT_VALUES_EQUAL(entry.Value, "");
83+
UNIT_ASSERT_VALUES_EQUAL(entry.Value, Nothing());
8484
UNIT_ASSERT_VALUES_EQUAL(entry.IsExpired, true);
8585
}
8686

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
#include "name_service.h"
2+
3+
namespace NSQLComplete {
4+
5+
namespace {
6+
7+
class TNameService: public INameService {
8+
public:
9+
explicit TNameService(INameService::TPtr origin)
10+
: Origin_(std::move(origin))
11+
{
12+
}
13+
14+
NThreading::TFuture<TNameResponse> Lookup(TNameRequest request) const override {
15+
auto future = Origin_->Lookup(std::move(request));
16+
if (future.IsReady()) {
17+
return future;
18+
}
19+
return NThreading::MakeFuture<TNameResponse>({});
20+
}
21+
22+
private:
23+
INameService::TPtr Origin_;
24+
};
25+
26+
} // namespace
27+
28+
INameService::TPtr MakeImpatientNameService(INameService::TPtr origin) {
29+
return new TNameService(std::move(origin));
30+
}
31+
32+
} // namespace NSQLComplete
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
#pragma once
2+
3+
#include <yql/essentials/sql/v1/complete/name/service/name_service.h>
4+
5+
namespace NSQLComplete {
6+
7+
INameService::TPtr MakeImpatientNameService(INameService::TPtr origin);
8+
9+
} // namespace NSQLComplete
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
LIBRARY()
2+
3+
SRCS(
4+
name_service.cpp
5+
)
6+
7+
PEERDIR(
8+
yql/essentials/sql/v1/complete/name/service
9+
)
10+
11+
END()

yql/essentials/sql/v1/complete/name/service/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ END()
1414
RECURSE(
1515
binding
1616
cluster
17+
impatient
1718
ranking
1819
schema
1920
static

yql/essentials/sql/v1/complete/sql_complete_ut.cpp

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
#include <yql/essentials/sql/v1/complete/name/service/ranking/frequency.h>
1111
#include <yql/essentials/sql/v1/complete/name/service/ranking/ranking.h>
1212
#include <yql/essentials/sql/v1/complete/name/service/cluster/name_service.h>
13+
#include <yql/essentials/sql/v1/complete/name/service/impatient/name_service.h>
1314
#include <yql/essentials/sql/v1/complete/name/service/schema/name_service.h>
1415
#include <yql/essentials/sql/v1/complete/name/service/static/name_service.h>
1516
#include <yql/essentials/sql/v1/complete/name/service/union/name_service.h>
@@ -131,8 +132,13 @@ Y_UNIT_TEST_SUITE(SqlCompleteTests) {
131132

132133
TVector<INameService::TPtr> children = {
133134
MakeStaticNameService(std::move(names), frequency),
134-
MakeSchemaNameService(MakeSimpleSchema(MakeStaticSimpleSchema(clustersJson))),
135-
MakeClusterNameService(MakeStaticClusterDiscovery(std::move(clusters))),
135+
MakeImpatientNameService(
136+
MakeSchemaNameService(
137+
MakeSimpleSchema(
138+
MakeStaticSimpleSchema(clustersJson)))),
139+
MakeImpatientNameService(
140+
MakeClusterNameService(
141+
MakeStaticClusterDiscovery(std::move(clusters)))),
136142
};
137143
INameService::TPtr service = MakeUnionNameService(
138144
std::move(children), MakeDefaultRanking(frequency));
@@ -1411,10 +1417,6 @@ JOIN yt:$cluster_name.test;
14111417
TVector<TCandidate> aliceExpected = {{TableName, "`alice`"}};
14121418
TVector<TCandidate> petyaExpected = {{TableName, "`petya`"}};
14131419

1414-
// Cache is empty
1415-
UNIT_ASSERT_VALUES_EQUAL(Complete(aliceEngine, "SELECT * FROM "), empty);
1416-
UNIT_ASSERT_VALUES_EQUAL(Complete(petyaEngine, "SELECT * FROM "), empty);
1417-
14181420
// Updates in backround
14191421
UNIT_ASSERT_VALUES_EQUAL(Complete(aliceEngine, "SELECT * FROM "), aliceExpected);
14201422
UNIT_ASSERT_VALUES_EQUAL(Complete(petyaEngine, "SELECT * FROM "), petyaExpected);
@@ -1423,10 +1425,6 @@ JOIN yt:$cluster_name.test;
14231425
TVector<TCandidate> aliceExpected = {{ColumnName, "alice"}};
14241426
TVector<TCandidate> petyaExpected = {{ColumnName, "petya"}};
14251427

1426-
// Cache is empty
1427-
UNIT_ASSERT_VALUES_EQUAL(Complete(aliceEngine, "SELECT a# FROM alice"), empty);
1428-
UNIT_ASSERT_VALUES_EQUAL(Complete(petyaEngine, "SELECT p# FROM petya"), empty);
1429-
14301428
// Updates in backround
14311429
UNIT_ASSERT_VALUES_EQUAL(Complete(aliceEngine, "SELECT a# FROM alice"), aliceExpected);
14321430
UNIT_ASSERT_VALUES_EQUAL(Complete(petyaEngine, "SELECT p# FROM petya"), petyaExpected);

yql/essentials/sql/v1/complete/ut/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ PEERDIR(
1313
yql/essentials/sql/v1/complete/name/object/simple/cached
1414
yql/essentials/sql/v1/complete/name/object/simple/static
1515
yql/essentials/sql/v1/complete/name/service/cluster
16+
yql/essentials/sql/v1/complete/name/service/impatient
1617
yql/essentials/sql/v1/complete/name/service/schema
1718
yql/essentials/sql/v1/complete/name/service/static
1819
yql/essentials/sql/v1/complete/name/service/union

0 commit comments

Comments
 (0)