Skip to content

Commit 18f911f

Browse files
committed
work
1 parent 9345ee9 commit 18f911f

26 files changed

+227
-125
lines changed

ydb/core/kqp/session_actor/kqp_query_state.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -451,7 +451,7 @@ void TKqpQueryState::AddOffsetsToTransaction() {
451451
} else {
452452
for (auto& range : partition.partition_offsets()) {
453453
YQL_ENSURE(consumer.Defined());
454-
TopicOperations.AddOperation(path, partition.partition_id(), *consumer, range, partition.force_commit(), partition.kill_read_session(), partition.only_check_commited_to_finish());
454+
TopicOperations.AddOperation(path, partition.partition_id(), *consumer, range, partition.force_commit(), partition.kill_read_session(), partition.only_check_commited_to_finish(), partition.read_session_id());
455455
}
456456
}
457457
}

ydb/core/kqp/topics/kqp_topics.cpp

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -56,24 +56,34 @@ bool TConsumerOperations::GetOnlyCheckCommitedToFinish() const
5656
return OnlyCheckCommitedToFinish_;
5757
}
5858

59+
TString TConsumerOperations::GetReadSessionId() const
60+
{
61+
return ReadSessionId_;
62+
}
63+
5964
void TConsumerOperations::AddOperation(const TString& consumer,
6065
const NKikimrKqp::TTopicOperationsRequest_TopicOffsets_PartitionOffsets_OffsetsRange& range,
6166
bool forceCommit,
6267
bool killReadSession,
63-
bool onlyCheckCommitedToFinish)
68+
bool onlyCheckCommitedToFinish,
69+
const TString& readSessionId)
6470
{
6571
Y_ABORT_UNLESS(Consumer_.Empty() || Consumer_ == consumer);
6672

67-
AddOperationImpl(consumer, range.start(), range.end(), forceCommit, killReadSession, onlyCheckCommitedToFinish);
73+
AddOperationImpl(consumer, range.start(), range.end(), forceCommit, killReadSession, onlyCheckCommitedToFinish, readSessionId);
6874
}
6975

7076
void TConsumerOperations::Merge(const TConsumerOperations& rhs)
7177
{
7278
Y_ABORT_UNLESS(rhs.Consumer_.Defined());
7379
Y_ABORT_UNLESS(Consumer_.Empty() || Consumer_ == rhs.Consumer_);
7480

75-
for (auto& range : rhs.Offsets_) {
76-
AddOperationImpl(*rhs.Consumer_, range.first, range.second, rhs.GetForceCommit(), rhs.GetKillReadSession(), rhs.GetOnlyCheckCommitedToFinish());
81+
if (!rhs.Offsets_.Empty()) {
82+
for (auto& range : rhs.Offsets_) {
83+
AddOperationImpl(*rhs.Consumer_, range.first, range.second, rhs.GetForceCommit(), rhs.GetKillReadSession(), rhs.GetOnlyCheckCommitedToFinish(), rhs.GetReadSessionId());
84+
}
85+
} else {
86+
AddOperationImpl(*rhs.Consumer_, 0, 0, rhs.GetForceCommit(), rhs.GetKillReadSession(), rhs.GetOnlyCheckCommitedToFinish(), rhs.GetReadSessionId());
7787
}
7888
}
7989

@@ -82,7 +92,8 @@ void TConsumerOperations::AddOperationImpl(const TString& consumer,
8292
ui64 end,
8393
bool forceCommit,
8494
bool killReadSession,
85-
bool onlyCheckCommitedToFinish)
95+
bool onlyCheckCommitedToFinish,
96+
const TString& readSessionId)
8697
{
8798
if (Offsets_.Intersects(begin, end)) {
8899
ythrow TOffsetsRangeIntersectExpection() << "offset ranges intersect";
@@ -99,6 +110,7 @@ void TConsumerOperations::AddOperationImpl(const TString& consumer,
99110
ForceCommit_ = forceCommit;
100111
KillReadSession_ = killReadSession;
101112
OnlyCheckCommitedToFinish_ = onlyCheckCommitedToFinish;
113+
ReadSessionId_ = readSessionId;
102114
}
103115

104116
//
@@ -116,7 +128,8 @@ void TTopicPartitionOperations::AddOperation(const TString& topic,
116128
const NKikimrKqp::TTopicOperationsRequest_TopicOffsets_PartitionOffsets_OffsetsRange& range,
117129
bool forceCommit,
118130
bool killReadSession,
119-
bool onlyCheckCommitedToFinish)
131+
bool onlyCheckCommitedToFinish,
132+
const TString& readSessionId)
120133
{
121134
Y_ABORT_UNLESS(Topic_.Empty() || Topic_ == topic);
122135
Y_ABORT_UNLESS(Partition_.Empty() || Partition_ == partition);
@@ -126,7 +139,7 @@ void TTopicPartitionOperations::AddOperation(const TString& topic,
126139
Partition_ = partition;
127140
}
128141

129-
Operations_[consumer].AddOperation(consumer, range, forceCommit, killReadSession, onlyCheckCommitedToFinish);
142+
Operations_[consumer].AddOperation(consumer, range, forceCommit, killReadSession, onlyCheckCommitedToFinish, readSessionId);
130143
}
131144

132145
void TTopicPartitionOperations::AddOperation(const TString& topic, ui32 partition,
@@ -163,6 +176,7 @@ void TTopicPartitionOperations::BuildTopicTxs(TTopicOperationTransactions& txs)
163176
o->SetKillReadSession(operations.GetKillReadSession());
164177
o->SetForceCommit(operations.GetForceCommit());
165178
o->SetOnlyCheckCommitedToFinish(operations.GetOnlyCheckCommitedToFinish());
179+
o->SetReadSessionId(operations.GetReadSessionId());
166180
}
167181

168182
if (HasWriteOperations_) {
@@ -298,7 +312,8 @@ void TTopicOperations::AddOperation(const TString& topic,
298312
const NKikimrKqp::TTopicOperationsRequest_TopicOffsets_PartitionOffsets_OffsetsRange& range,
299313
bool forceCommit,
300314
bool killReadSession,
301-
bool onlyCheckCommitedToFinish
315+
bool onlyCheckCommitedToFinish,
316+
const TString& readSessionId
302317
)
303318
{
304319
TTopicPartition key{topic, partition};
@@ -308,7 +323,8 @@ void TTopicOperations::AddOperation(const TString& topic,
308323
range,
309324
forceCommit,
310325
killReadSession,
311-
onlyCheckCommitedToFinish);
326+
onlyCheckCommitedToFinish,
327+
readSessionId);
312328
HasReadOperations_ = true;
313329
}
314330

ydb/core/kqp/topics/kqp_topics.h

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,14 @@ class TConsumerOperations {
3535
bool GetForceCommit() const;
3636
bool GetKillReadSession() const;
3737
bool GetOnlyCheckCommitedToFinish() const;
38+
TString GetReadSessionId() const;
3839

3940
void AddOperation(const TString& consumer,
4041
const NKikimrKqp::TTopicOperationsRequest_TopicOffsets_PartitionOffsets_OffsetsRange& range,
4142
bool forceCommit = false,
4243
bool killReadSession = false,
43-
bool onlyCheckCommitedToFinish = false);
44+
bool onlyCheckCommitedToFinish = false,
45+
const TString& readSessionId = {});
4446

4547
void Merge(const TConsumerOperations& rhs);
4648

@@ -50,13 +52,15 @@ class TConsumerOperations {
5052
ui64 end,
5153
bool forceCommit = false,
5254
bool killReadSession = false,
53-
bool onlyCheckCommitedToFinish = false);
55+
bool onlyCheckCommitedToFinish = false,
56+
const TString& readSessionId = {});
5457

5558
TMaybe<TString> Consumer_;
5659
TDisjointIntervalTree<ui64> Offsets_;
5760
bool ForceCommit_ = false;
5861
bool KillReadSession_ = false;
5962
bool OnlyCheckCommitedToFinish_ = false;
63+
TString ReadSessionId_;
6064
};
6165

6266
struct TTopicOperationTransaction {
@@ -76,7 +80,8 @@ class TTopicPartitionOperations {
7680
const NKikimrKqp::TTopicOperationsRequest_TopicOffsets_PartitionOffsets_OffsetsRange& range,
7781
bool forceCommit = false,
7882
bool killReadSession = false,
79-
bool onlyCheckCommitedToFinish = false);
83+
bool onlyCheckCommitedToFinish = false,
84+
const TString& readSessionId = {});
8085
void AddOperation(const TString& topic, ui32 partition,
8186
TMaybe<ui32> supportivePartition);
8287

@@ -130,7 +135,8 @@ class TTopicOperations {
130135
const NKikimrKqp::TTopicOperationsRequest_TopicOffsets_PartitionOffsets_OffsetsRange& range,
131136
bool forceCommit,
132137
bool killReadSession,
133-
bool onlyCheckCommitedToFinish);
138+
bool onlyCheckCommitedToFinish,
139+
const TString& readSessionId);
134140
void AddOperation(const TString& topic, ui32 partition,
135141
TMaybe<ui32> supportivePartition);
136142

ydb/core/persqueue/events/internal.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -810,14 +810,15 @@ struct TEvPQ {
810810
{
811811
}
812812

813-
void AddOperation(TString consumer, ui64 begin, ui64 end, bool forceCommit = false, bool killReadSession = false, bool onlyCheckCommitedToFinish = false) {
813+
void AddOperation(TString consumer, ui64 begin, ui64 end, bool forceCommit = false, bool killReadSession = false, bool onlyCheckCommitedToFinish = false, TString readSessionId = {}) {
814814
NKikimrPQ::TPartitionOperation operation;
815815
operation.SetCommitOffsetsBegin(begin);
816816
operation.SetCommitOffsetsEnd(end);
817817
operation.SetConsumer(std::move(consumer));
818818
operation.SetForceCommit(forceCommit);
819819
operation.SetKillReadSession(killReadSession);
820820
operation.SetOnlyCheckCommitedToFinish(onlyCheckCommitedToFinish);
821+
operation.SetReadSessionId(readSessionId);
821822

822823
Operations.push_back(std::move(operation));
823824
}

ydb/core/persqueue/partition.cpp

Lines changed: 26 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2070,10 +2070,10 @@ bool TPartition::ExecUserActionOrTransaction(TSimpleSharedPtr<TTransaction>& t,
20702070
return true;
20712071
}
20722072

2073-
TPartition::EProcessResult TPartition::BeginTransaction(const TEvPQ::TEvTxCalcPredicate& tx, TMaybe<bool>& predicate)
2073+
TPartition::EProcessResult TPartition::BeginTransaction(const TEvPQ::TEvTxCalcPredicate& tx, TMaybe<bool>& predicateOut)
20742074
{
20752075
THashSet<TString> consumers;
2076-
bool ok = true;
2076+
bool result = true;
20772077
for (auto& operation : tx.Operations) {
20782078
const TString& consumer = operation.GetConsumer();
20792079
if (TxAffectedConsumers.contains(consumer)) {
@@ -2086,49 +2086,55 @@ TPartition::EProcessResult TPartition::BeginTransaction(const TEvPQ::TEvTxCalcPr
20862086
if (AffectedUsers.contains(consumer) && !GetPendingUserIfExists(consumer)) {
20872087
PQ_LOG_D("Partition " << Partition <<
20882088
" Consumer '" << consumer << "' has been removed");
2089-
ok = false;
2089+
result = false;
20902090
break;
20912091
}
20922092

20932093
if (!UsersInfoStorage->GetIfExists(consumer)) {
20942094
PQ_LOG_D("Partition " << Partition <<
20952095
" Unknown consumer '" << consumer << "'");
2096-
ok = false;
2096+
result = false;
20972097
break;
20982098
}
20992099

2100-
bool isAffectedConsumer = AffectedUsers.contains(consumer); // savnik check
2100+
bool isAffectedConsumer = AffectedUsers.contains(consumer);
21012101
TUserInfoBase& userInfo = GetOrCreatePendingUser(consumer);
21022102

2103-
if (operation.HasOnlyCheckCommitedToFinish() && operation.GetOnlyCheckCommitedToFinish() && !IsActive()) {
2103+
if (!operation.GetReadSessionId().Empty() && operation.GetReadSessionId() != userInfo.Session) {
2104+
PQ_LOG_D("Partition " << Partition <<
2105+
" Consumer '" << consumer << "'" <<
2106+
" Bad request (session already dead) " <<
2107+
" SessionId '" << operation.GetReadSessionId() << "'");
2108+
result = false;
2109+
} else if (operation.GetOnlyCheckCommitedToFinish() && !IsActive()) {
21042110
if (static_cast<ui64>(userInfo.Offset) != EndOffset) {
2105-
ok = false;
2111+
result = false;
21062112
}
21072113
} else {
2108-
if (!operation.GetForceCommit() && operation.GetCommitOffsetsBegin() > operation.GetCommitOffsetsEnd()) { // savnik check default
2114+
if (!operation.GetForceCommit() && operation.GetCommitOffsetsBegin() > operation.GetCommitOffsetsEnd()) {
21092115
PQ_LOG_D("Partition " << Partition <<
21102116
" Consumer '" << consumer << "'" <<
21112117
" Bad request (invalid range) " <<
21122118
" Begin " << operation.GetCommitOffsetsBegin() <<
21132119
" End " << operation.GetCommitOffsetsEnd());
2114-
ok = false;
2120+
result = false;
21152121
} else if (!operation.GetForceCommit() && userInfo.Offset != (i64)operation.GetCommitOffsetsBegin()) {
21162122
PQ_LOG_D("Partition " << Partition <<
21172123
" Consumer '" << consumer << "'" <<
21182124
" Bad request (gap) " <<
21192125
" Offset " << userInfo.Offset <<
21202126
" Begin " << operation.GetCommitOffsetsBegin());
2121-
ok = false;
2127+
result = false;
21222128
} else if (!operation.GetForceCommit() && operation.GetCommitOffsetsEnd() > EndOffset) {
21232129
PQ_LOG_D("Partition " << Partition <<
21242130
" Consumer '" << consumer << "'" <<
21252131
" Bad request (behind the last offset) " <<
21262132
" EndOffset " << EndOffset <<
21272133
" End " << operation.GetCommitOffsetsEnd());
2128-
ok = false;
2134+
result = false;
21292135
}
21302136

2131-
if (!ok) {
2137+
if (!result) {
21322138
if (!isAffectedConsumer) {
21332139
AffectedUsers.erase(consumer);
21342140
}
@@ -2137,10 +2143,10 @@ TPartition::EProcessResult TPartition::BeginTransaction(const TEvPQ::TEvTxCalcPr
21372143
consumers.insert(consumer);
21382144
}
21392145
}
2140-
if (ok) {
2141-
TxAffectedConsumers.insert(consumers.begin(), consumers.end()); // savnik check
2146+
if (result) {
2147+
TxAffectedConsumers.insert(consumers.begin(), consumers.end());
21422148
}
2143-
predicate = ok;
2149+
predicateOut = result;
21442150
return EProcessResult::Continue;
21452151
}
21462152

@@ -2279,9 +2285,13 @@ void TPartition::CommitTransaction(TSimpleSharedPtr<TTransaction>& t)
22792285
Y_ABORT_UNLESS(t->Predicate.Defined() && *t->Predicate);
22802286

22812287
for (auto& operation : t->Tx->Operations) {
2288+
if (operation.GetOnlyCheckCommitedToFinish()) {
2289+
continue;
2290+
}
2291+
22822292
TUserInfoBase& userInfo = GetOrCreatePendingUser(operation.GetConsumer());
22832293

2284-
if (!operation.HasForceCommit() || !operation.GetForceCommit()) { // savnik check default
2294+
if (!operation.GetForceCommit()) {
22852295
Y_ABORT_UNLESS(userInfo.Offset == (i64)operation.GetCommitOffsetsBegin());
22862296
}
22872297

@@ -2295,7 +2305,6 @@ void TPartition::CommitTransaction(TSimpleSharedPtr<TTransaction>& t)
22952305
userInfo.AnyCommits = true;
22962306
userInfo.Offset = operation.GetCommitOffsetsEnd();
22972307
}
2298-
22992308
if (operation.GetKillReadSession()) {
23002309
userInfo.Session = "";
23012310
userInfo.PartitionSessionId = 0;

ydb/core/persqueue/partition_read.cpp

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -122,10 +122,10 @@ TAutoPtr<TEvPersQueue::TEvHasDataInfoResponse> TPartition::MakeHasDataInfoRespon
122122
ui32 partitionId = Partition.OriginalPartitionId;
123123

124124
auto* node = PartitionGraph.GetPartition(partitionId);
125-
for (auto* child : node->Children) {
125+
for (auto* child : node->DirectChildren) {
126126
res->Record.AddChildPartitionIds(child->Id);
127127

128-
for (auto* p : child->Parents) {
128+
for (auto* p : child->DirectParents) {
129129
if (p->Id != partitionId) {
130130
res->Record.AddAdjacentPartitionIds(p->Id);
131131
}
@@ -744,7 +744,6 @@ void TPartition::Handle(TEvPQ::TEvRead::TPtr& ev, const TActorContext& ctx) {
744744
Y_ABORT_UNLESS(read->Offset <= EndOffset);
745745

746746
auto& userInfo = UsersInfoStorage->GetOrCreate(user, ctx);
747-
748747
if (!read->SessionId.empty() && !userInfo.NoConsumer) {
749748
if (userInfo.Session != read->SessionId) {
750749
TabletCounters.Cumulative()[COUNTER_PQ_READ_ERROR_NO_SESSION].Increment(1);

ydb/core/persqueue/partition_scale_manager.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ std::pair<std::vector<TPartitionSplit>, std::vector<TPartitionMerge>> TPartition
7474
auto partitionId = PartitionsToSplit.begin();
7575
while (allowedSplitsCount > 0 && partitionId != PartitionsToSplit.end()) {
7676
auto* node = PartitionGraph.GetPartition(*partitionId);
77-
if (node->Children.empty()) {
77+
if (node->DirectChildren.empty()) {
7878
auto from = node->From;
7979
auto to = node->To;
8080
auto mid = MiddleOf(from, to);

ydb/core/persqueue/partition_sourcemanager.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ TSourceIdStorage& TPartitionSourceManager::GetSourceIdStorage() const {
5050

5151
bool TPartitionSourceManager::HasParents() const {
5252
auto node = GetPartitionNode();
53-
return node && !node->Parents.empty();
53+
return node && !node->DirectParents.empty();
5454
}
5555

5656

ydb/core/persqueue/pq_impl.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3885,7 +3885,8 @@ void TPersQueue::SendEvTxCalcPredicateToPartitions(const TActorContext& ctx,
38853885
operation.GetCommitOffsetsEnd(),
38863886
operation.HasForceCommit() ? operation.GetForceCommit() : false,
38873887
operation.HasKillReadSession() ? operation.GetKillReadSession() : false,
3888-
operation.HasOnlyCheckCommitedToFinish() ? operation.GetOnlyCheckCommitedToFinish() : false);
3888+
operation.HasOnlyCheckCommitedToFinish() ? operation.GetOnlyCheckCommitedToFinish() : false,
3889+
operation.HasReadSessionId() ? operation.GetReadSessionId() : "");
38893890
}
38903891
}
38913892

ydb/core/persqueue/read_balancer__balancing_app.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ void TBalancer::RenderApp(NApp::TNavigationBar& __navigationBar) const {
7676
for (auto& [partitionId, partition] : consumer->Partitions) {
7777
const auto* family = consumer->FindFamily(partitionId);
7878
const auto* node = consumer->GetPartitionGraph().GetPartition(partitionId);
79-
TString style = node && node->Children.empty() ? "text-success" : "text-muted";
79+
TString style = node && node->DirectChildren.empty() ? "text-success" : "text-muted";
8080
auto* partitionInfo = GetPartitionInfo(partitionId);
8181

8282
TABLER() {
@@ -110,7 +110,7 @@ void TBalancer::RenderApp(NApp::TNavigationBar& __navigationBar) const {
110110
}
111111
TABLED() {
112112
if (node) {
113-
for (auto* parent : node->Parents) {
113+
for (auto* parent : node->DirectParents) {
114114
HREF("#" + partitionAnchor(parent->Id)) { __stream << parent->Id; }
115115
__stream << ", ";
116116
}

ydb/core/persqueue/transaction.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -185,16 +185,16 @@ void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TConfigTrans
185185
continue;
186186
}
187187

188-
if (node->Children.empty()) {
189-
for (const auto* r : node->Parents) {
188+
if (node->DirectChildren.empty()) {
189+
for (const auto* r : node->DirectParents) {
190190
if (extractTabletId != r->TabletId) {
191191
PredicatesReceived[r->TabletId].SetTabletId(r->TabletId);
192192
}
193193
}
194194
}
195195

196-
for (const auto* r : node->Children) {
197-
if (r->Children.empty()) {
196+
for (const auto* r : node->DirectChildren) {
197+
if (r->DirectChildren.empty()) {
198198
if (extractTabletId != r->TabletId) {
199199
PredicateRecipients[r->TabletId] = false;
200200
}

0 commit comments

Comments
 (0)