Skip to content

Commit 7eb941d

Browse files
authored
24-3: Adjust change queue reserved capacity at Enqueue() (ydb-platform#9509)
1 parent 644b5f8 commit 7eb941d

File tree

4 files changed

+76
-11
lines changed

4 files changed

+76
-11
lines changed

ydb/core/tx/datashard/cdc_stream_scan.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,8 @@ class TDataShard::TTxCdcStreamScanProgress
240240
const auto& valueTags = ev.ValueTags;
241241

242242
LOG_D("Progress"
243-
<< ": streamPathId# " << streamPathId);
243+
<< ": streamPathId# " << streamPathId
244+
<< ", rows# " << ev.Rows.size());
244245

245246
if (!Self->GetUserTables().contains(tablePathId.LocalPathId)) {
246247
LOG_W("Cannot progress on unknown table"

ydb/core/tx/datashard/datashard.cpp

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1092,19 +1092,33 @@ void TDataShard::RemoveChangeRecord(NIceDb::TNiceDb& db, ui64 order) {
10921092
if (!--rIt->second) {
10931093
ChangeQueueReservations.erase(rIt);
10941094
}
1095+
1096+
SetCounter(COUNTER_CHANGE_QUEUE_RESERVED_CAPACITY, ChangeQueueReservedCapacity);
10951097
}
10961098

10971099
UpdateChangeExchangeLag(AppData()->TimeProvider->Now());
10981100
ChangesQueue.erase(it);
10991101

11001102
IncCounter(COUNTER_CHANGE_RECORDS_REMOVED);
11011103
SetCounter(COUNTER_CHANGE_QUEUE_SIZE, ChangesQueue.size());
1102-
SetCounter(COUNTER_CHANGE_QUEUE_RESERVED_CAPACITY, ChangeQueueReservedCapacity);
11031104

11041105
CheckChangesQueueNoOverflow();
11051106
}
11061107

11071108
void TDataShard::EnqueueChangeRecords(TVector<IDataShardChangeCollector::TChange>&& records, ui64 cookie, bool afterMove) {
1109+
if (auto it = ChangeQueueReservations.find(cookie); it != ChangeQueueReservations.end()) {
1110+
Y_ABORT_UNLESS(!afterMove);
1111+
1112+
ChangeQueueReservedCapacity -= it->second;
1113+
it->second = records.size();
1114+
ChangeQueueReservedCapacity += it->second;
1115+
if (!it->second) {
1116+
ChangeQueueReservations.erase(it);
1117+
}
1118+
1119+
SetCounter(COUNTER_CHANGE_QUEUE_RESERVED_CAPACITY, ChangeQueueReservedCapacity);
1120+
}
1121+
11081122
if (!records) {
11091123
return;
11101124
}
@@ -1140,22 +1154,15 @@ void TDataShard::EnqueueChangeRecords(TVector<IDataShardChangeCollector::TChange
11401154
ChangesQueueBytes += record.BodySize;
11411155
}
11421156

1143-
if (auto it = ChangeQueueReservations.find(cookie); it != ChangeQueueReservations.end()) {
1144-
Y_ABORT_UNLESS(!afterMove);
1145-
ChangeQueueReservedCapacity -= it->second;
1146-
ChangeQueueReservedCapacity += records.size();
1147-
}
1148-
11491157
UpdateChangeExchangeLag(now);
11501158
IncCounter(COUNTER_CHANGE_RECORDS_ENQUEUED, forward.size());
11511159
SetCounter(COUNTER_CHANGE_QUEUE_SIZE, ChangesQueue.size());
1152-
SetCounter(COUNTER_CHANGE_QUEUE_RESERVED_CAPACITY, ChangeQueueReservedCapacity);
11531160

11541161
Y_ABORT_UNLESS(OutChangeSender);
11551162
Send(OutChangeSender, new NChangeExchange::TEvChangeExchange::TEvEnqueueRecords(std::move(forward)));
11561163
}
11571164

1158-
ui32 TDataShard::GetFreeChangeQueueCapacity(ui64 cookie) {
1165+
ui32 TDataShard::GetFreeChangeQueueCapacity(ui64 cookie) const {
11591166
const ui64 sizeLimit = AppData()->DataShardConfig.GetChangesQueueItemsLimit();
11601167
if (sizeLimit < ChangesQueue.size()) {
11611168
return 0;

ydb/core/tx/datashard/datashard_impl.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1914,7 +1914,7 @@ class TDataShard
19141914
void RemoveChangeRecord(NIceDb::TNiceDb& db, ui64 order);
19151915
// TODO(ilnaz): remove 'afterMove' after #6541
19161916
void EnqueueChangeRecords(TVector<IDataShardChangeCollector::TChange>&& records, ui64 cookie = 0, bool afterMove = false);
1917-
ui32 GetFreeChangeQueueCapacity(ui64 cookie);
1917+
ui32 GetFreeChangeQueueCapacity(ui64 cookie) const;
19181918
ui64 ReserveChangeQueueCapacity(ui32 capacity);
19191919
void UpdateChangeExchangeLag(TInstant now);
19201920
void CreateChangeSender(const TActorContext& ctx);

ydb/core/tx/datashard/datashard_ut_change_exchange.cpp

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3041,6 +3041,63 @@ Y_UNIT_TEST_SUITE(Cdc) {
30413041
});
30423042
}
30433043

3044+
Y_UNIT_TEST(InitialScanEnqueuesZeroRecords) {
3045+
TPortManager portManager;
3046+
TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig())
3047+
.SetUseRealThreads(false)
3048+
.SetDomainName("Root")
3049+
.SetEnableChangefeedInitialScan(true)
3050+
.SetChangesQueueItemsLimit(2)
3051+
);
3052+
3053+
auto& runtime = *server->GetRuntime();
3054+
const auto edgeActor = runtime.AllocateEdgeActor();
3055+
3056+
SetupLogging(runtime);
3057+
InitRoot(server, edgeActor);
3058+
CreateShardedTable(server, edgeActor, "/Root", "Table", SimpleTable());
3059+
3060+
ExecSQL(server, edgeActor, R"(
3061+
UPSERT INTO `/Root/Table` (key, value) VALUES
3062+
(1, 10),
3063+
(2, 20),
3064+
(3, 30),
3065+
(4, 40);
3066+
)");
3067+
3068+
TBlockEvents<TEvDataShard::TEvCdcStreamScanRequest> blockScanRequest(runtime, [&](auto& ev) {
3069+
ev->Get()->Record.MutableLimits()->SetBatchMaxRows(1);
3070+
return true;
3071+
});
3072+
3073+
WaitTxNotification(server, edgeActor, AsyncAlterAddStream(server, "/Root", "Table",
3074+
WithInitialScan(Updates(NKikimrSchemeOp::ECdcStreamFormatJson))));
3075+
3076+
runtime.WaitFor("Scan request", [&]{ return blockScanRequest.size(); });
3077+
runtime.AddObserver<TEvDataShard::TEvCdcStreamScanRequest>([&](auto& ev) {
3078+
ev->Get()->Record.MutableLimits()->SetBatchMaxRows(1);
3079+
});
3080+
3081+
ExecSQL(server, edgeActor, R"(
3082+
UPSERT INTO `/Root/Table` (key, value) VALUES
3083+
(1, 100),
3084+
(2, 200),
3085+
(3, 300);
3086+
)");
3087+
3088+
blockScanRequest.Unblock().Stop();
3089+
3090+
WaitForContent(server, edgeActor, "/Root/Table/Stream", {
3091+
R"({"update":{"value":10},"key":[1]})",
3092+
R"({"update":{"value":100},"key":[1]})",
3093+
R"({"update":{"value":20},"key":[2]})",
3094+
R"({"update":{"value":200},"key":[2]})",
3095+
R"({"update":{"value":30},"key":[3]})",
3096+
R"({"update":{"value":300},"key":[3]})",
3097+
R"({"update":{"value":40},"key":[4]})",
3098+
});
3099+
}
3100+
30443101
Y_UNIT_TEST(InitialScanRacyProgressAndDrop) {
30453102
TPortManager portManager;
30463103
TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig())

0 commit comments

Comments
 (0)