Skip to content

Commit df8bd02

Browse files
committed
fixed CDC write to the autopartitioning topic
1 parent 941f9c7 commit df8bd02

File tree

1 file changed

+12
-10
lines changed

1 file changed

+12
-10
lines changed

ydb/core/tx/datashard/change_exchange_split.cpp

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -361,16 +361,18 @@ class TCdcWorker
361361
const auto tabletId = partition.GetTabletId();
362362

363363
auto it = Workers.find(partitionId);
364-
if (it != Workers.end()) {
365-
workers.emplace(partitionId, it->second);
366-
Workers.erase(it);
367-
} else {
368-
LOG_T("Register new worker"
369-
<< ": partitionId# " << partitionId);
370-
371-
const auto worker = Register(new TCdcPartitionWorker(SelfId(), partitionId, tabletId, SrcTabletId, DstTabletIds));
372-
workers.emplace(partitionId, worker);
373-
Pending.emplace(worker, partitionId);
364+
if (NKikimrPQ::ETopicPartitionStatus::Active == partition.GetStatus()) {
365+
if (it != Workers.end()) {
366+
workers.emplace(partitionId, it->second);
367+
Workers.erase(it);
368+
} else {
369+
LOG_T("Register new worker"
370+
<< ": partitionId# " << partitionId);
371+
372+
const auto worker = Register(new TCdcPartitionWorker(SelfId(), partitionId, tabletId, SrcTabletId, DstTabletIds));
373+
workers.emplace(partitionId, worker);
374+
Pending.emplace(worker, partitionId);
375+
}
374376
}
375377
}
376378

0 commit comments

Comments
 (0)