Skip to content

Commit e849d3a

Browse files
committed
few fixes
1 parent a22d011 commit e849d3a

File tree

6 files changed

+30
-13
lines changed

6 files changed

+30
-13
lines changed

ydb/core/persqueue/partition.cpp

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,6 @@ NKikimrClient::TKeyValueRequest::EStorageChannel GetChannel(ui32 i) {
164164
return NKikimrClient::TKeyValueRequest::EStorageChannel(NKikimrClient::TKeyValueRequest::MAIN + i);
165165
}
166166

167-
168167
void AddCheckDiskRequest(TEvKeyValue::TEvRequest *request, ui32 numChannels) {
169168
for (ui32 i = 0; i < numChannels; ++i) {
170169
request->Record.AddCmdGetStatus()->SetStorageChannel(GetChannel(i));
@@ -2101,8 +2100,8 @@ TPartition::EProcessResult TPartition::BeginTransaction(const TEvPQ::TEvTxCalcPr
21012100
bool isAffectedConsumer = AffectedUsers.contains(consumer); // savnik check
21022101
TUserInfoBase& userInfo = GetOrCreatePendingUser(consumer);
21032102

2104-
if (operation.HasOnlyCheckCommitedToFinish() && operation.GetOnlyCheckCommitedToFinish()) {
2105-
if (static_cast<ui64>(userInfo.Offset) != EndOffset) { // savnik что если откатят коммит пока выполняется транзакция
2103+
if (operation.HasOnlyCheckCommitedToFinish() && operation.GetOnlyCheckCommitedToFinish() && !IsActive()) {
2104+
if (static_cast<ui64>(userInfo.Offset) != EndOffset) {
21062105
ok = false;
21072106
}
21082107
} else {

ydb/core/persqueue/ut/pqtablet_ut.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -320,8 +320,8 @@ void TPQTabletFixture::SendProposeTransactionRequest(const TProposeTransactionPa
320320
auto* operation = body->MutableOperations()->Add();
321321
operation->SetPartitionId(txOp.Partition);
322322
if (txOp.Begin.Defined()) {
323-
operation->SetBegin(*txOp.Begin);
324-
operation->SetEnd(*txOp.End);
323+
operation->SetCommitOffsetsBegin(*txOp.Begin);
324+
operation->SetCommitOffsetsEnd(*txOp.End);
325325
operation->SetConsumer(*txOp.Consumer);
326326
}
327327
operation->SetPath(txOp.Path);

ydb/core/persqueue/ut/user_action_processor_ut.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -679,8 +679,8 @@ void TUserActionProcessorFixture::SendProposeTransactionRequest(const TProposeTr
679679
for (auto& txOp : params.TxOps) {
680680
auto* operation = body->MutableOperations()->Add();
681681
operation->SetPartitionId(txOp.Partition);
682-
operation->SetBegin(txOp.Begin);
683-
operation->SetEnd(txOp.End);
682+
operation->SetCommitOffsetsBegin(txOp.Begin);
683+
operation->SetCommitOffsetsEnd(txOp.End);
684684
operation->SetConsumer(txOp.Consumer);
685685
operation->SetPath(txOp.Path);
686686
}

ydb/core/persqueue/utils.cpp

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -266,7 +266,8 @@ std::unordered_map<ui32, TPartitionGraph::Node> BuildGraph(const TCollection& pa
266266
}
267267

268268
std::deque<TPartitionGraph::Node*> queue;
269-
for(const auto& p : partitions) {
269+
270+
for (const auto& p : partitions) {
270271
auto& node = result[GetPartitionId(p)];
271272

272273
node.Children.reserve(p.ChildPartitionIdsSize());
@@ -284,27 +285,42 @@ std::unordered_map<ui32, TPartitionGraph::Node> BuildGraph(const TCollection& pa
284285
}
285286
}
286287

287-
while(!queue.empty()) {
288+
while (!queue.empty()) {
288289
auto* n = queue.front();
289290
queue.pop_front();
290291

291292
bool allCompleted = true;
292-
for(auto* c : n->Parents) {
293+
for (auto* c : n->Parents) {
293294
if (c->HierarhicalParents.empty() && !c->Parents.empty()) {
294295
allCompleted = false;
295296
break;
296297
}
297298
}
298299

299300
if (allCompleted) {
300-
for(auto* c : n->Parents) {
301+
for (auto* c : n->Parents) {
301302
n->HierarhicalParents.insert(c->HierarhicalParents.begin(), c->HierarhicalParents.end());
302303
n->HierarhicalParents.insert(c);
303304
}
304305
queue.insert(queue.end(), n->Children.begin(), n->Children.end());
305306
}
306307
}
307308

309+
for (auto& [_, node] : result) {
310+
queue.push_back(&node);
311+
312+
while (!queue.empty()) {
313+
auto* current = queue.front();
314+
queue.pop_front();
315+
316+
for (auto* child : current->Children) {
317+
if (node.HierarhicalChildren.insert(child).second) {
318+
queue.push_back(child);
319+
}
320+
}
321+
}
322+
}
323+
308324
return result;
309325
}
310326

ydb/core/persqueue/utils.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,9 @@ class TPartitionGraph {
4848
// Direct children of this node
4949
std::vector<Node*> Children;
5050
// All parents include parents of parents and so on
51-
std::set<Node*> HierarhicalParents;
51+
std::set<Node*> HierarhicalParents; // savnik rename to Ancestors?
52+
// All children include children of children and so on
53+
std::set<Node*> HierarhicalChildren;
5254

5355
bool IsRoot() const;
5456
bool IsParent(ui32 partitionId) const;

ydb/services/persqueue_v1/actors/commit_offset_actor.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ void TCommitOffsetActor::Handle(TEvPQProxy::TEvAuthResultOk::TPtr& ev, const TAc
115115
commits.push_back(commit);
116116
}
117117

118-
for (auto& child: partitionNode->Children) {
118+
for (auto& child: partitionNode->HierarhicalChildren) {
119119
TKqpHelper::TCommitInfo commit {.PartitionId = child->Id, .Offset = 0};
120120
commits.push_back(commit);
121121
}

0 commit comments

Comments
 (0)