Skip to content

Commit 7af4fdf

Browse files
merge to stable-24-2 (#6952)
1 parent b665ef7 commit 7af4fdf

18 files changed

+713
-236
lines changed

ydb/core/kqp/executer_actor/kqp_data_executer.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1825,6 +1825,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
18251825
case NKikimrKqp::ISOLATION_LEVEL_READ_UNCOMMITTED:
18261826
YQL_ENSURE(ReadOnlyTx);
18271827
YQL_ENSURE(!VolatileTx);
1828+
TasksGraph.GetMeta().AllowInconsistentReads = true;
18281829
ImmediateTx = true;
18291830
break;
18301831

ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1103,9 +1103,14 @@ void FillInputDesc(const TKqpTasksGraph& tasksGraph, NYql::NDqProto::TTaskInput&
11031103
transformProto->SetOutputType(input.Transform->OutputType);
11041104
if (input.Meta.StreamLookupSettings) {
11051105
YQL_ENSURE(input.Meta.StreamLookupSettings);
1106-
YQL_ENSURE(snapshot.IsValid(), "stream lookup cannot be performed without the snapshot.");
1107-
input.Meta.StreamLookupSettings->MutableSnapshot()->SetStep(snapshot.Step);
1108-
input.Meta.StreamLookupSettings->MutableSnapshot()->SetTxId(snapshot.TxId);
1106+
if (snapshot.IsValid()) {
1107+
input.Meta.StreamLookupSettings->MutableSnapshot()->SetStep(snapshot.Step);
1108+
input.Meta.StreamLookupSettings->MutableSnapshot()->SetTxId(snapshot.TxId);
1109+
} else {
1110+
YQL_ENSURE(tasksGraph.GetMeta().AllowInconsistentReads, "Expected valid snapshot or enabled inconsistent read mode");
1111+
input.Meta.StreamLookupSettings->SetAllowInconsistentReads(true);
1112+
}
1113+
11091114
if (lockTxId) {
11101115
input.Meta.StreamLookupSettings->SetLockTxId(*lockTxId);
11111116
}

ydb/core/kqp/executer_actor/kqp_tasks_graph.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ struct TGraphMeta {
9292
std::unordered_map<ui64, TActorId> ResultChannelProxies;
9393
TActorId ExecuterId;
9494
bool UseFollowers = false;
95+
bool AllowInconsistentReads = false;
9596
TIntrusivePtr<TProtoArenaHolder> Arena;
9697
TString Database;
9798
NKikimrConfig::TTableServiceConfig::EChannelTransportVersion ChannelTransportVersion;

ydb/core/kqp/node_service/kqp_node_service.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
#include <ydb/core/kqp/rm_service/kqp_resource_estimation.h>
1414
#include <ydb/core/kqp/rm_service/kqp_rm_service.h>
1515
#include <ydb/core/kqp/runtime/kqp_read_actor.h>
16+
#include <ydb/core/kqp/runtime/kqp_read_iterator_common.h>
1617
#include <ydb/core/kqp/common/kqp_resolve.h>
1718

1819
#include <ydb/library/wilson_ids/wilson.h>

ydb/core/kqp/runtime/kqp_read_actor.cpp

Lines changed: 7 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#include "kqp_read_actor.h"
22

3+
#include <ydb/core/kqp/runtime/kqp_read_iterator_common.h>
34
#include <ydb/core/kqp/runtime/kqp_scan_data.h>
45
#include <ydb/core/base/tablet_pipecache.h>
56
#include <ydb/core/engine/minikql/minikql_engine_host.h>
@@ -28,73 +29,9 @@ bool IsDebugLogEnabled(const NActors::TActorSystem* actorSystem, NActors::NLog::
2829
return settings && settings->Satisfies(NActors::NLog::EPriority::PRI_DEBUG, component);
2930
}
3031

31-
struct TEvReadSettings : public TAtomicRefCount<TEvReadSettings> {
32-
NKikimrTxDataShard::TEvRead Read;
33-
NKikimrTxDataShard::TEvReadAck Ack;
34-
35-
TEvReadSettings() {
36-
Read.SetMaxRows(32767);
37-
Read.SetMaxBytes(5_MB);
38-
39-
Ack.SetMaxRows(32767);
40-
Ack.SetMaxBytes(5_MB);
41-
}
42-
};
43-
44-
struct TEvReadDefaultSettings {
45-
THotSwap<TEvReadSettings> Settings;
46-
47-
TEvReadDefaultSettings() {
48-
Settings.AtomicStore(MakeIntrusive<TEvReadSettings>());
49-
}
50-
51-
} DefaultSettings;
52-
53-
THolder<NKikimr::TEvDataShard::TEvRead> DefaultReadSettings() {
54-
auto result = MakeHolder<NKikimr::TEvDataShard::TEvRead>();
55-
auto ptr = DefaultSettings.Settings.AtomicLoad();
56-
result->Record.MergeFrom(ptr->Read);
57-
return result;
58-
}
59-
60-
THolder<NKikimr::TEvDataShard::TEvReadAck> DefaultAckSettings() {
61-
auto result = MakeHolder<NKikimr::TEvDataShard::TEvReadAck>();
62-
auto ptr = DefaultSettings.Settings.AtomicLoad();
63-
result->Record.MergeFrom(ptr->Ack);
64-
return result;
65-
}
66-
6732
NActors::TActorId MainPipeCacheId = NKikimr::MakePipePeNodeCacheID(false);
6833
NActors::TActorId FollowersPipeCacheId = NKikimr::MakePipePeNodeCacheID(true);
6934

70-
struct TBackoffStorage {
71-
THotSwap<NKikimr::NKqp::TIteratorReadBackoffSettings> SettingsPtr;
72-
73-
TBackoffStorage() {
74-
SettingsPtr.AtomicStore(new NKikimr::NKqp::TIteratorReadBackoffSettings());
75-
}
76-
};
77-
78-
TDuration CalcDelay(size_t attempt, bool allowInstantRetry) {
79-
return Singleton<::TBackoffStorage>()->SettingsPtr.AtomicLoad()->CalcShardDelay(attempt, allowInstantRetry);
80-
}
81-
82-
size_t MaxShardResolves() {
83-
return Singleton<::TBackoffStorage>()->SettingsPtr.AtomicLoad()->MaxShardResolves;
84-
}
85-
86-
size_t MaxShardRetries() {
87-
return Singleton<::TBackoffStorage>()->SettingsPtr.AtomicLoad()->MaxShardAttempts;
88-
}
89-
90-
TMaybe<size_t> MaxTotalRetries() {
91-
return Singleton<::TBackoffStorage>()->SettingsPtr.AtomicLoad()->MaxTotalRetries;
92-
}
93-
94-
TMaybe<TDuration> ShardTimeout() {
95-
return Singleton<::TBackoffStorage>()->SettingsPtr.AtomicLoad()->ReadResponseTimeout;
96-
}
97-
9835
}
9936

10037

@@ -540,7 +477,7 @@ class TKqpReadActor : public TActorBootstrapped<TKqpReadActor>, public NYql::NDq
540477
}
541478

542479
void ResolveShard(TShardState* state) {
543-
if (state->ResolveAttempt >= ::MaxShardResolves()) {
480+
if (state->ResolveAttempt >= MaxShardResolves()) {
544481
RuntimeError(TStringBuilder() << "Table '" << Settings->GetTable().GetTablePath() << "' resolve limit exceeded",
545482
NDqProto::StatusIds::UNAVAILABLE);
546483
return;
@@ -792,19 +729,19 @@ class TKqpReadActor : public TActorBootstrapped<TKqpReadActor>, public NYql::NDq
792729
auto state = Reads[id].Shard;
793730

794731
TotalRetries += 1;
795-
auto limit = ::MaxTotalRetries();
732+
auto limit = MaxTotalRetries();
796733
if (limit && TotalRetries > *limit) {
797734
return RuntimeError(TStringBuilder() << "Table '" << Settings->GetTable().GetTablePath() << "' retry limit exceeded",
798735
NDqProto::StatusIds::UNAVAILABLE);
799736
}
800737

801738
state->RetryAttempt += 1;
802-
if (state->RetryAttempt > ::MaxShardRetries()) {
739+
if (state->RetryAttempt > MaxShardRetries()) {
803740
ResetRead(id);
804741
return ResolveShard(state);
805742
}
806743

807-
auto delay = ::CalcDelay(state->RetryAttempt, allowInstantRetry);
744+
auto delay = CalcDelay(state->RetryAttempt, allowInstantRetry);
808745
if (delay == TDuration::Zero()) {
809746
return DoRetryRead(id);
810747
}
@@ -848,7 +785,7 @@ class TKqpReadActor : public TActorBootstrapped<TKqpReadActor>, public NYql::NDq
848785
}
849786
}
850787

851-
auto ev = ::DefaultReadSettings();
788+
auto ev = GetDefaultReadSettings();
852789
auto& record = ev->Record;
853790

854791
state->FillEvRead(*ev, KeyColumnTypes, Settings->GetReverse());
@@ -1307,7 +1244,7 @@ class TKqpReadActor : public TActorBootstrapped<TKqpReadActor>, public NYql::NDq
13071244
}
13081245

13091246
if (!limit || *limit > 0) {
1310-
auto request = ::DefaultAckSettings();
1247+
auto request = GetDefaultReadAckSettings();
13111248
request->Record.SetReadId(record.GetReadId());
13121249
request->Record.SetSeqNo(record.GetSeqNo());
13131250
request->Record.SetMaxBytes(Min<ui64>(request->Record.GetMaxBytes(), BufSize));
@@ -1538,40 +1475,9 @@ void RegisterKqpReadActor(NYql::NDq::TDqAsyncIoFactory& factory, TIntrusivePtr<T
15381475
});
15391476
}
15401477

1541-
void InjectRangeEvReadSettings(const NKikimrTxDataShard::TEvRead& read) {
1542-
auto ptr = ::DefaultSettings.Settings.AtomicLoad();
1543-
TEvReadSettings settings = *ptr;
1544-
settings.Read.MergeFrom(read);
1545-
::DefaultSettings.Settings.AtomicStore(MakeIntrusive<TEvReadSettings>(settings));
1546-
}
1547-
1548-
void InjectRangeEvReadAckSettings(const NKikimrTxDataShard::TEvReadAck& ack) {
1549-
auto ptr = ::DefaultSettings.Settings.AtomicLoad();
1550-
TEvReadSettings settings = *ptr;
1551-
settings.Ack.MergeFrom(ack);
1552-
::DefaultSettings.Settings.AtomicStore(MakeIntrusive<TEvReadSettings>(settings));
1553-
}
1554-
1555-
void SetDefaultIteratorQuotaSettings(ui32 rows, ui32 bytes) {
1556-
auto ptr = ::DefaultSettings.Settings.AtomicLoad();
1557-
TEvReadSettings settings = *ptr;
1558-
1559-
settings.Read.SetMaxRows(rows);
1560-
settings.Ack.SetMaxRows(rows);
1561-
1562-
settings.Read.SetMaxBytes(bytes);
1563-
settings.Ack.SetMaxBytes(bytes);
1564-
1565-
::DefaultSettings.Settings.AtomicStore(MakeIntrusive<TEvReadSettings>(settings));
1566-
}
1567-
15681478
void InterceptReadActorPipeCache(NActors::TActorId id) {
15691479
::MainPipeCacheId = id;
15701480
}
15711481

1572-
void SetReadIteratorBackoffSettings(TIntrusivePtr<TIteratorReadBackoffSettings> ptr) {
1573-
Singleton<::TBackoffStorage>()->SettingsPtr.AtomicStore(ptr);
1574-
}
1575-
15761482
} // namespace NKqp
15771483
} // namespace NKikimr

ydb/core/kqp/runtime/kqp_read_actor.h

Lines changed: 0 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -12,42 +12,7 @@ class TEvReadAck;
1212
namespace NKikimr {
1313
namespace NKqp {
1414

15-
struct TIteratorReadBackoffSettings : TAtomicRefCount<TIteratorReadBackoffSettings> {
16-
TDuration StartRetryDelay = TDuration::MilliSeconds(5);
17-
size_t MaxShardAttempts = 10;
18-
size_t MaxShardResolves = 3;
19-
double UnsertaintyRatio = 0.5;
20-
double Multiplier = 2.0;
21-
TDuration MaxRetryDelay = TDuration::Seconds(1);
22-
23-
TMaybe<size_t> MaxTotalRetries;
24-
TMaybe<TDuration> ReadResponseTimeout;
25-
26-
TDuration CalcShardDelay(size_t attempt, bool allowInstantRetry) {
27-
if (allowInstantRetry && attempt == 1) {
28-
return TDuration::Zero();
29-
}
30-
31-
auto delay = StartRetryDelay;
32-
for (size_t i = 0; i < attempt; ++i) {
33-
delay *= Multiplier;
34-
delay = Min(delay, MaxRetryDelay);
35-
}
36-
37-
delay *= (1 - UnsertaintyRatio * RandomNumber<double>());
38-
39-
return delay;
40-
}
41-
};
42-
43-
void SetReadIteratorBackoffSettings(TIntrusivePtr<TIteratorReadBackoffSettings>);
44-
void SetDefaultIteratorQuotaSettings(ui32 rows, ui32 bytes);
45-
4615
void RegisterKqpReadActor(NYql::NDq::TDqAsyncIoFactory&, TIntrusivePtr<TKqpCounters>);
47-
48-
void InjectRangeEvReadSettings(const NKikimrTxDataShard::TEvRead&);
49-
void InjectRangeEvReadAckSettings(const NKikimrTxDataShard::TEvReadAck&);
50-
5116
void InterceptReadActorPipeCache(NActors::TActorId);
5217

5318
} // namespace NKqp
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
#include "kqp_read_iterator_common.h"
2+
3+
#include <library/cpp/threading/hot_swap/hot_swap.h>
4+
5+
namespace NKikimr {
6+
namespace NKqp {
7+
8+
struct TBackoffStorage {
9+
THotSwap<NKikimr::NKqp::TIteratorReadBackoffSettings> SettingsPtr;
10+
11+
TBackoffStorage() {
12+
SettingsPtr.AtomicStore(new NKikimr::NKqp::TIteratorReadBackoffSettings());
13+
}
14+
};
15+
16+
struct TEvReadDefaultSettings {
17+
THotSwap<TEvReadSettings> Settings;
18+
19+
TEvReadDefaultSettings() {
20+
Settings.AtomicStore(MakeIntrusive<TEvReadSettings>());
21+
}
22+
23+
};
24+
25+
void SetDefaultIteratorQuotaSettings(ui32 rows, ui32 bytes) {
26+
TEvReadSettings settings;
27+
28+
settings.Read.SetMaxRows(rows);
29+
settings.Ack.SetMaxRows(rows);
30+
31+
settings.Read.SetMaxBytes(bytes);
32+
settings.Ack.SetMaxBytes(bytes);
33+
34+
SetDefaultReadSettings(settings.Read);
35+
SetDefaultReadAckSettings(settings.Ack);
36+
}
37+
38+
THolder<NKikimr::TEvDataShard::TEvRead> GetDefaultReadSettings() {
39+
auto result = MakeHolder<NKikimr::TEvDataShard::TEvRead>();
40+
auto ptr = Singleton<TEvReadDefaultSettings>()->Settings.AtomicLoad();
41+
result->Record.MergeFrom(ptr->Read);
42+
return result;
43+
}
44+
45+
void SetDefaultReadSettings(const NKikimrTxDataShard::TEvRead& read) {
46+
auto ptr = Singleton<TEvReadDefaultSettings>()->Settings.AtomicLoad();
47+
TEvReadSettings settings = *ptr;
48+
settings.Read.MergeFrom(read);
49+
Singleton<TEvReadDefaultSettings>()->Settings.AtomicStore(MakeIntrusive<TEvReadSettings>(settings));
50+
}
51+
52+
THolder<NKikimr::TEvDataShard::TEvReadAck> GetDefaultReadAckSettings() {
53+
auto result = MakeHolder<NKikimr::TEvDataShard::TEvReadAck>();
54+
auto ptr = Singleton<TEvReadDefaultSettings>()->Settings.AtomicLoad();
55+
result->Record.MergeFrom(ptr->Ack);
56+
return result;
57+
}
58+
59+
void SetDefaultReadAckSettings(const NKikimrTxDataShard::TEvReadAck& ack) {
60+
auto ptr = Singleton<TEvReadDefaultSettings>()->Settings.AtomicLoad();
61+
TEvReadSettings settings = *ptr;
62+
settings.Ack.MergeFrom(ack);
63+
Singleton<TEvReadDefaultSettings>()->Settings.AtomicStore(MakeIntrusive<TEvReadSettings>(settings));
64+
}
65+
66+
TDuration TIteratorReadBackoffSettings::CalcShardDelay(size_t attempt, bool allowInstantRetry) {
67+
if (allowInstantRetry && attempt == 1) {
68+
return TDuration::Zero();
69+
}
70+
71+
auto delay = StartRetryDelay;
72+
for (size_t i = 0; i < attempt; ++i) {
73+
delay *= Multiplier;
74+
delay = Min(delay, MaxRetryDelay);
75+
}
76+
77+
delay *= (1 - UnsertaintyRatio * RandomNumber<double>());
78+
79+
return delay;
80+
}
81+
82+
void SetReadIteratorBackoffSettings(TIntrusivePtr<TIteratorReadBackoffSettings> ptr) {
83+
Singleton<TBackoffStorage>()->SettingsPtr.AtomicStore(ptr);
84+
}
85+
86+
TDuration CalcDelay(size_t attempt, bool allowInstantRetry) {
87+
return Singleton<TBackoffStorage>()->SettingsPtr.AtomicLoad()->CalcShardDelay(attempt, allowInstantRetry);
88+
}
89+
90+
size_t MaxShardResolves() {
91+
return Singleton<TBackoffStorage>()->SettingsPtr.AtomicLoad()->MaxShardResolves;
92+
}
93+
94+
size_t MaxShardRetries() {
95+
return Singleton<TBackoffStorage>()->SettingsPtr.AtomicLoad()->MaxShardAttempts;
96+
}
97+
98+
TMaybe<size_t> MaxTotalRetries() {
99+
return Singleton<TBackoffStorage>()->SettingsPtr.AtomicLoad()->MaxTotalRetries;
100+
}
101+
102+
TMaybe<TDuration> ShardTimeout() {
103+
return Singleton<TBackoffStorage>()->SettingsPtr.AtomicLoad()->ReadResponseTimeout;
104+
}
105+
106+
} // namespace NKqp
107+
} // namespace NKikimr

0 commit comments

Comments
 (0)