Skip to content

Commit 3c4fc2c

Browse files
committed
Refactor sink: shards state updating (#19071)
1 parent 39d78c1 commit 3c4fc2c

File tree

4 files changed

+70
-32
lines changed

4 files changed

+70
-32
lines changed

ydb/core/kqp/common/kqp_tx_manager.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -312,7 +312,7 @@ class TKqpTransactionManager : public IKqpTransactionManager {
312312

313313
bool NeedCommit() const override {
314314
AFL_ENSURE(ActionsCount != 1 || IsSingleShard()); // ActionsCount == 1 then IsSingleShard()
315-
const bool dontNeedCommit = IsEmpty() || IsReadOnly() && ((ActionsCount == 1) || HasSnapshot());
315+
const bool dontNeedCommit = IsEmpty() || (IsReadOnly() && ((ActionsCount == 1) || HasSnapshot()));
316316
return !dontNeedCommit;
317317
}
318318

ydb/core/kqp/runtime/kqp_write_actor.cpp

Lines changed: 26 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -354,20 +354,13 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
354354
ShardedWriteController->Close();
355355
}
356356

357-
void SetParentTraceId(NWilson::TTraceId traceId) {
358-
ParentTraceId = std::move(traceId);
357+
void CleanupClosedTokens() {
358+
YQL_ENSURE(ShardedWriteController);
359+
ShardedWriteController->CleanupClosedTokens();
359360
}
360361

361-
void UpdateShards() {
362-
// TODO: Maybe there are better ways to initialize new shards...
363-
for (const auto& shardInfo : ShardedWriteController->GetPendingShards()) {
364-
TxManager->AddShard(shardInfo.ShardId, IsOlap, TablePath);
365-
IKqpTransactionManager::TActionFlags flags = IKqpTransactionManager::EAction::WRITE;
366-
if (shardInfo.HasRead) {
367-
flags |= IKqpTransactionManager::EAction::READ;
368-
}
369-
TxManager->AddAction(shardInfo.ShardId, flags);
370-
}
362+
void SetParentTraceId(NWilson::TTraceId traceId) {
363+
ParentTraceId = std::move(traceId);
371364
}
372365

373366
bool IsClosed() const {
@@ -878,18 +871,30 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
878871
}
879872
}
880873

874+
void UpdateShards() {
875+
for (const auto& shardInfo : ShardedWriteController->ExtractShardUpdates()) {
876+
TxManager->AddShard(shardInfo.ShardId, IsOlap, TablePath);
877+
IKqpTransactionManager::TActionFlags flags = IKqpTransactionManager::EAction::WRITE;
878+
if (shardInfo.HasRead) {
879+
flags |= IKqpTransactionManager::EAction::READ;
880+
}
881+
TxManager->AddAction(shardInfo.ShardId, flags);
882+
}
883+
}
884+
881885
void FlushBuffers() {
882886
ShardedWriteController->FlushBuffers();
883887
UpdateShards();
884888
}
885889

886-
bool Flush() {
887-
for (const auto& shardInfo : ShardedWriteController->GetPendingShards()) {
888-
if (!SendDataToShard(shardInfo.ShardId)) {
889-
return false;
890+
bool FlushToShards() {
891+
bool ok = true;
892+
ShardedWriteController->ForEachPendingShard([&](const auto& shardInfo) {
893+
if (ok && !SendDataToShard(shardInfo.ShardId)) {
894+
ok = false;
890895
}
891-
}
892-
return true;
896+
});
897+
return ok;
893898
}
894899

895900
bool SendDataToShard(const ui64 shardId) {
@@ -1516,7 +1521,7 @@ class TKqpDirectWriteActor : public TActorBootstrapped<TKqpDirectWriteActor>, pu
15161521
}
15171522

15181523
if (Closed || outOfMemory) {
1519-
if (!WriteTableActor->Flush()) {
1524+
if (!WriteTableActor->FlushToShards()) {
15201525
return;
15211526
}
15221527
}
@@ -1933,7 +1938,7 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
19331938
CA_LOG_D("Flush data");
19341939
for (auto& [_, info] : WriteInfos) {
19351940
if (info.WriteTableActor->IsReady()) {
1936-
if (!info.WriteTableActor->Flush()) {
1941+
if (!info.WriteTableActor->FlushToShards()) {
19371942
return false;
19381943
}
19391944
}
@@ -2807,6 +2812,7 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
28072812
Y_ABORT_UNLESS(GetTotalMemory() == 0);
28082813

28092814
for (auto& [_, info] : WriteInfos) {
2815+
info.WriteTableActor->CleanupClosedTokens();
28102816
info.WriteTableActor->Unlink();
28112817
}
28122818
}

ydb/core/kqp/runtime/kqp_write_table.cpp

Lines changed: 38 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1187,17 +1187,15 @@ class TShardsInfo {
11871187
return insertIt->second;
11881188
}
11891189

1190-
TVector<IShardedWriteController::TPendingShardInfo> GetPendingShards() const {
1191-
TVector<IShardedWriteController::TPendingShardInfo> result;
1190+
void ForEachPendingShard(std::function<void(const IShardedWriteController::TPendingShardInfo&)>&& callback) const {
11921191
for (const auto& [id, shard] : ShardsInfo) {
11931192
if (!shard.IsEmpty() && shard.GetSendAttempts() == 0) {
1194-
result.push_back(IShardedWriteController::TPendingShardInfo{
1193+
callback(IShardedWriteController::TPendingShardInfo{
11951194
.ShardId = id,
11961195
.HasRead = shard.HasRead(),
11971196
});
11981197
}
11991198
}
1200-
return result;
12011199
}
12021200

12031201
bool Has(ui64 shardId) const {
@@ -1382,6 +1380,18 @@ class TShardedWriteController : public IShardedWriteController {
13821380
}
13831381
}
13841382

1383+
void CleanupClosedTokens() override {
1384+
AFL_ENSURE(IsEmpty());
1385+
for (auto it = WriteInfos.begin(); it != WriteInfos.end();) {
1386+
if (it->second.Closed) {
1387+
AFL_ENSURE(it->second.Serializer->IsFinished());
1388+
it = WriteInfos.erase(it);
1389+
} else {
1390+
++it;
1391+
}
1392+
}
1393+
}
1394+
13851395
void FlushBuffers() override {
13861396
TVector<TWriteToken> writeTokensFoFlush;
13871397
for (const auto& [token, writeInfo] : WriteInfos) {
@@ -1420,8 +1430,14 @@ class TShardedWriteController : public IShardedWriteController {
14201430
}
14211431
}
14221432

1423-
TVector<TPendingShardInfo> GetPendingShards() const override {
1424-
return ShardsInfo.GetPendingShards();
1433+
void ForEachPendingShard(std::function<void(const TPendingShardInfo&)>&& callback) const override {
1434+
ShardsInfo.ForEachPendingShard(std::move(callback));
1435+
}
1436+
1437+
std::vector<TPendingShardInfo> ExtractShardUpdates() override {
1438+
std::vector<TPendingShardInfo> shardUpdates;
1439+
std::swap(shardUpdates, ShardUpdates);
1440+
return shardUpdates;
14251441
}
14261442

14271443
TVector<ui64> GetShardsIds() const override {
@@ -1580,11 +1596,16 @@ class TShardedWriteController : public IShardedWriteController {
15801596
for (auto& [shardId, batches] : writeInfo.Serializer->FlushBatchesForce()) {
15811597
for (auto& batch : batches) {
15821598
if (batch && !batch->IsEmpty()) {
1599+
const bool hasRead = (writeInfo.Metadata.OperationType == NKikimrDataEvents::TEvWrite::TOperation::OPERATION_INSERT
1600+
|| writeInfo.Metadata.OperationType == NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPDATE);
15831601
ShardsInfo.GetShard(shardId).PushBatch(TBatchWithMetadata{
15841602
.Token = token,
15851603
.Data = std::move(batch),
1586-
.HasRead = (writeInfo.Metadata.OperationType == NKikimrDataEvents::TEvWrite::TOperation::OPERATION_INSERT
1587-
|| writeInfo.Metadata.OperationType == NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPDATE),
1604+
.HasRead = hasRead,
1605+
});
1606+
ShardUpdates.push_back(IShardedWriteController::TPendingShardInfo{
1607+
.ShardId = shardId,
1608+
.HasRead = hasRead,
15881609
});
15891610
}
15901611
}
@@ -1597,11 +1618,16 @@ class TShardedWriteController : public IShardedWriteController {
15971618
if (!batch || batch->IsEmpty()) {
15981619
break;
15991620
}
1621+
const bool hasRead = (writeInfo.Metadata.OperationType == NKikimrDataEvents::TEvWrite::TOperation::OPERATION_INSERT
1622+
|| writeInfo.Metadata.OperationType == NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPDATE);
16001623
shard.PushBatch(TBatchWithMetadata{
16011624
.Token = token,
16021625
.Data = std::move(batch),
1603-
.HasRead = (writeInfo.Metadata.OperationType == NKikimrDataEvents::TEvWrite::TOperation::OPERATION_INSERT
1604-
|| writeInfo.Metadata.OperationType == NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPDATE),
1626+
.HasRead = hasRead,
1627+
});
1628+
ShardUpdates.push_back(IShardedWriteController::TPendingShardInfo{
1629+
.ShardId = shardId,
1630+
.HasRead = hasRead,
16051631
});
16061632
}
16071633
}
@@ -1615,6 +1641,7 @@ class TShardedWriteController : public IShardedWriteController {
16151641
shard.MakeNextBatches(1);
16161642
} else {
16171643
shard.MakeNextBatches(std::nullopt);
1644+
AFL_ENSURE(shard.GetBatchesInFlight() == shard.Size());
16181645
}
16191646
}
16201647
}
@@ -1646,6 +1673,7 @@ class TShardedWriteController : public IShardedWriteController {
16461673
TWriteToken CurrentWriteToken = 0;
16471674

16481675
TShardsInfo ShardsInfo;
1676+
std::vector<IShardedWriteController::TPendingShardInfo> ShardUpdates;
16491677

16501678
std::optional<NSchemeCache::TSchemeCacheNavigate::TEntry> SchemeEntry;
16511679
std::shared_ptr<const TVector<TKeyDesc::TPartitionInfo>> Partitioning;

ydb/core/kqp/runtime/kqp_write_table.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,8 @@ class IShardedWriteController : public TThrRefBase {
6969
virtual void Write(TWriteToken token, IDataBatchPtr&& data) = 0;
7070
virtual void Close(TWriteToken token) = 0;
7171

72+
virtual void CleanupClosedTokens() = 0;
73+
7274
virtual void FlushBuffers() = 0;
7375

7476
virtual void Close() = 0;
@@ -79,7 +81,9 @@ class IShardedWriteController : public TThrRefBase {
7981
ui64 ShardId;
8082
bool HasRead;
8183
};
82-
virtual TVector<TPendingShardInfo> GetPendingShards() const = 0;
84+
virtual void ForEachPendingShard(std::function<void(const TPendingShardInfo&)>&& callback) const = 0;
85+
virtual std::vector<TPendingShardInfo> ExtractShardUpdates() = 0;
86+
8387
virtual ui64 GetShardsCount() const = 0;
8488
virtual TVector<ui64> GetShardsIds() const = 0;
8589

0 commit comments

Comments
 (0)