Skip to content

Commit 928a984

Browse files
authored
Adjust change queue reserved capacity at Enqueue() (#9507)
1 parent 673500c commit 928a984

File tree

4 files changed

+76
-11
lines changed

4 files changed

+76
-11
lines changed

ydb/core/tx/datashard/cdc_stream_scan.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,8 @@ class TDataShard::TTxCdcStreamScanProgress
231231
const auto& valueTags = ev.ValueTags;
232232

233233
LOG_D("Progress"
234-
<< ": streamPathId# " << streamPathId);
234+
<< ": streamPathId# " << streamPathId
235+
<< ", rows# " << ev.Rows.size());
235236

236237
if (!Self->GetUserTables().contains(tablePathId.LocalPathId)) {
237238
LOG_W("Cannot progress on unknown table"

ydb/core/tx/datashard/datashard.cpp

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1092,19 +1092,33 @@ void TDataShard::RemoveChangeRecord(NIceDb::TNiceDb& db, ui64 order) {
10921092
if (!--rIt->second) {
10931093
ChangeQueueReservations.erase(rIt);
10941094
}
1095+
1096+
SetCounter(COUNTER_CHANGE_QUEUE_RESERVED_CAPACITY, ChangeQueueReservedCapacity);
10951097
}
10961098

10971099
UpdateChangeExchangeLag(AppData()->TimeProvider->Now());
10981100
ChangesQueue.erase(it);
10991101

11001102
IncCounter(COUNTER_CHANGE_RECORDS_REMOVED);
11011103
SetCounter(COUNTER_CHANGE_QUEUE_SIZE, ChangesQueue.size());
1102-
SetCounter(COUNTER_CHANGE_QUEUE_RESERVED_CAPACITY, ChangeQueueReservedCapacity);
11031104

11041105
CheckChangesQueueNoOverflow();
11051106
}
11061107

11071108
void TDataShard::EnqueueChangeRecords(TVector<IDataShardChangeCollector::TChange>&& records, ui64 cookie, bool afterMove) {
1109+
if (auto it = ChangeQueueReservations.find(cookie); it != ChangeQueueReservations.end()) {
1110+
Y_ABORT_UNLESS(!afterMove);
1111+
1112+
ChangeQueueReservedCapacity -= it->second;
1113+
it->second = records.size();
1114+
ChangeQueueReservedCapacity += it->second;
1115+
if (!it->second) {
1116+
ChangeQueueReservations.erase(it);
1117+
}
1118+
1119+
SetCounter(COUNTER_CHANGE_QUEUE_RESERVED_CAPACITY, ChangeQueueReservedCapacity);
1120+
}
1121+
11081122
if (!records) {
11091123
return;
11101124
}
@@ -1140,22 +1154,15 @@ void TDataShard::EnqueueChangeRecords(TVector<IDataShardChangeCollector::TChange
11401154
ChangesQueueBytes += record.BodySize;
11411155
}
11421156

1143-
if (auto it = ChangeQueueReservations.find(cookie); it != ChangeQueueReservations.end()) {
1144-
Y_ABORT_UNLESS(!afterMove);
1145-
ChangeQueueReservedCapacity -= it->second;
1146-
ChangeQueueReservedCapacity += records.size();
1147-
}
1148-
11491157
UpdateChangeExchangeLag(now);
11501158
IncCounter(COUNTER_CHANGE_RECORDS_ENQUEUED, forward.size());
11511159
SetCounter(COUNTER_CHANGE_QUEUE_SIZE, ChangesQueue.size());
1152-
SetCounter(COUNTER_CHANGE_QUEUE_RESERVED_CAPACITY, ChangeQueueReservedCapacity);
11531160

11541161
Y_ABORT_UNLESS(OutChangeSender);
11551162
Send(OutChangeSender, new NChangeExchange::TEvChangeExchange::TEvEnqueueRecords(std::move(forward)));
11561163
}
11571164

1158-
ui32 TDataShard::GetFreeChangeQueueCapacity(ui64 cookie) {
1165+
ui32 TDataShard::GetFreeChangeQueueCapacity(ui64 cookie) const {
11591166
const ui64 sizeLimit = AppData()->DataShardConfig.GetChangesQueueItemsLimit();
11601167
if (sizeLimit < ChangesQueue.size()) {
11611168
return 0;

ydb/core/tx/datashard/datashard_impl.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1928,7 +1928,7 @@ class TDataShard
19281928
void RemoveChangeRecord(NIceDb::TNiceDb& db, ui64 order);
19291929
// TODO(ilnaz): remove 'afterMove' after #6541
19301930
void EnqueueChangeRecords(TVector<IDataShardChangeCollector::TChange>&& records, ui64 cookie = 0, bool afterMove = false);
1931-
ui32 GetFreeChangeQueueCapacity(ui64 cookie);
1931+
ui32 GetFreeChangeQueueCapacity(ui64 cookie) const;
19321932
ui64 ReserveChangeQueueCapacity(ui32 capacity);
19331933
void UpdateChangeExchangeLag(TInstant now);
19341934
void CreateChangeSender(const TActorContext& ctx);

ydb/core/tx/datashard/datashard_ut_change_exchange.cpp

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3144,6 +3144,63 @@ Y_UNIT_TEST_SUITE(Cdc) {
31443144
});
31453145
}
31463146

3147+
Y_UNIT_TEST(InitialScanEnqueuesZeroRecords) {
3148+
TPortManager portManager;
3149+
TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig())
3150+
.SetUseRealThreads(false)
3151+
.SetDomainName("Root")
3152+
.SetEnableChangefeedInitialScan(true)
3153+
.SetChangesQueueItemsLimit(2)
3154+
);
3155+
3156+
auto& runtime = *server->GetRuntime();
3157+
const auto edgeActor = runtime.AllocateEdgeActor();
3158+
3159+
SetupLogging(runtime);
3160+
InitRoot(server, edgeActor);
3161+
CreateShardedTable(server, edgeActor, "/Root", "Table", SimpleTable());
3162+
3163+
ExecSQL(server, edgeActor, R"(
3164+
UPSERT INTO `/Root/Table` (key, value) VALUES
3165+
(1, 10),
3166+
(2, 20),
3167+
(3, 30),
3168+
(4, 40);
3169+
)");
3170+
3171+
TBlockEvents<TEvDataShard::TEvCdcStreamScanRequest> blockScanRequest(runtime, [&](auto& ev) {
3172+
ev->Get()->Record.MutableLimits()->SetBatchMaxRows(1);
3173+
return true;
3174+
});
3175+
3176+
WaitTxNotification(server, edgeActor, AsyncAlterAddStream(server, "/Root", "Table",
3177+
WithInitialScan(Updates(NKikimrSchemeOp::ECdcStreamFormatJson))));
3178+
3179+
runtime.WaitFor("Scan request", [&]{ return blockScanRequest.size(); });
3180+
runtime.AddObserver<TEvDataShard::TEvCdcStreamScanRequest>([&](auto& ev) {
3181+
ev->Get()->Record.MutableLimits()->SetBatchMaxRows(1);
3182+
});
3183+
3184+
ExecSQL(server, edgeActor, R"(
3185+
UPSERT INTO `/Root/Table` (key, value) VALUES
3186+
(1, 100),
3187+
(2, 200),
3188+
(3, 300);
3189+
)");
3190+
3191+
blockScanRequest.Unblock().Stop();
3192+
3193+
WaitForContent(server, edgeActor, "/Root/Table/Stream", {
3194+
R"({"update":{"value":10},"key":[1]})",
3195+
R"({"update":{"value":100},"key":[1]})",
3196+
R"({"update":{"value":20},"key":[2]})",
3197+
R"({"update":{"value":200},"key":[2]})",
3198+
R"({"update":{"value":30},"key":[3]})",
3199+
R"({"update":{"value":300},"key":[3]})",
3200+
R"({"update":{"value":40},"key":[4]})",
3201+
});
3202+
}
3203+
31473204
Y_UNIT_TEST(InitialScanRacyProgressAndDrop) {
31483205
TPortManager portManager;
31493206
TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig())

0 commit comments

Comments
 (0)