Skip to content

Commit f0d474c

Browse files
committed
work
1 parent 18213f3 commit f0d474c

27 files changed

+230
-129
lines changed

ydb/core/kqp/session_actor/kqp_query_state.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -378,7 +378,7 @@ void TKqpQueryState::AddOffsetsToTransaction() {
378378
} else {
379379
for (auto& range : partition.partition_offsets()) {
380380
YQL_ENSURE(consumer.Defined());
381-
TopicOperations.AddOperation(path, partition.partition_id(), *consumer, range, partition.force_commit(), partition.kill_read_session(), partition.only_check_commited_to_finish());
381+
TopicOperations.AddOperation(path, partition.partition_id(), *consumer, range, partition.force_commit(), partition.kill_read_session(), partition.only_check_commited_to_finish(), partition.read_session_id());
382382
}
383383
}
384384
}

ydb/core/kqp/topics/kqp_topics.cpp

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -56,24 +56,34 @@ bool TConsumerOperations::GetOnlyCheckCommitedToFinish() const
5656
return OnlyCheckCommitedToFinish_;
5757
}
5858

59+
TString TConsumerOperations::GetReadSessionId() const
60+
{
61+
return ReadSessionId_;
62+
}
63+
5964
void TConsumerOperations::AddOperation(const TString& consumer,
6065
const NKikimrKqp::TTopicOperationsRequest_TopicOffsets_PartitionOffsets_OffsetsRange& range,
6166
bool forceCommit,
6267
bool killReadSession,
63-
bool onlyCheckCommitedToFinish)
68+
bool onlyCheckCommitedToFinish,
69+
const TString& readSessionId)
6470
{
6571
Y_ABORT_UNLESS(Consumer_.Empty() || Consumer_ == consumer);
6672

67-
AddOperationImpl(consumer, range.start(), range.end(), forceCommit, killReadSession, onlyCheckCommitedToFinish);
73+
AddOperationImpl(consumer, range.start(), range.end(), forceCommit, killReadSession, onlyCheckCommitedToFinish, readSessionId);
6874
}
6975

7076
void TConsumerOperations::Merge(const TConsumerOperations& rhs)
7177
{
7278
Y_ABORT_UNLESS(rhs.Consumer_.Defined());
7379
Y_ABORT_UNLESS(Consumer_.Empty() || Consumer_ == rhs.Consumer_);
7480

75-
for (auto& range : rhs.Offsets_) {
76-
AddOperationImpl(*rhs.Consumer_, range.first, range.second, rhs.GetForceCommit(), rhs.GetKillReadSession(), rhs.GetOnlyCheckCommitedToFinish());
81+
if (!rhs.Offsets_.Empty()) {
82+
for (auto& range : rhs.Offsets_) {
83+
AddOperationImpl(*rhs.Consumer_, range.first, range.second, rhs.GetForceCommit(), rhs.GetKillReadSession(), rhs.GetOnlyCheckCommitedToFinish(), rhs.GetReadSessionId());
84+
}
85+
} else {
86+
AddOperationImpl(*rhs.Consumer_, 0, 0, rhs.GetForceCommit(), rhs.GetKillReadSession(), rhs.GetOnlyCheckCommitedToFinish(), rhs.GetReadSessionId());
7787
}
7888
}
7989

@@ -82,7 +92,8 @@ void TConsumerOperations::AddOperationImpl(const TString& consumer,
8292
ui64 end,
8393
bool forceCommit,
8494
bool killReadSession,
85-
bool onlyCheckCommitedToFinish)
95+
bool onlyCheckCommitedToFinish,
96+
const TString& readSessionId)
8697
{
8798
if (Offsets_.Intersects(begin, end)) {
8899
ythrow TOffsetsRangeIntersectExpection() << "offset ranges intersect";
@@ -99,6 +110,7 @@ void TConsumerOperations::AddOperationImpl(const TString& consumer,
99110
ForceCommit_ = forceCommit;
100111
KillReadSession_ = killReadSession;
101112
OnlyCheckCommitedToFinish_ = onlyCheckCommitedToFinish;
113+
ReadSessionId_ = readSessionId;
102114
}
103115

104116
//
@@ -116,7 +128,8 @@ void TTopicPartitionOperations::AddOperation(const TString& topic,
116128
const NKikimrKqp::TTopicOperationsRequest_TopicOffsets_PartitionOffsets_OffsetsRange& range,
117129
bool forceCommit,
118130
bool killReadSession,
119-
bool onlyCheckCommitedToFinish)
131+
bool onlyCheckCommitedToFinish,
132+
const TString& readSessionId)
120133
{
121134
Y_ABORT_UNLESS(Topic_.Empty() || Topic_ == topic);
122135
Y_ABORT_UNLESS(Partition_.Empty() || Partition_ == partition);
@@ -126,7 +139,7 @@ void TTopicPartitionOperations::AddOperation(const TString& topic,
126139
Partition_ = partition;
127140
}
128141

129-
Operations_[consumer].AddOperation(consumer, range, forceCommit, killReadSession, onlyCheckCommitedToFinish);
142+
Operations_[consumer].AddOperation(consumer, range, forceCommit, killReadSession, onlyCheckCommitedToFinish, readSessionId);
130143
}
131144

132145
void TTopicPartitionOperations::AddOperation(const TString& topic, ui32 partition,
@@ -163,6 +176,7 @@ void TTopicPartitionOperations::BuildTopicTxs(TTopicOperationTransactions& txs)
163176
o->SetKillReadSession(operations.GetKillReadSession());
164177
o->SetForceCommit(operations.GetForceCommit());
165178
o->SetOnlyCheckCommitedToFinish(operations.GetOnlyCheckCommitedToFinish());
179+
o->SetReadSessionId(operations.GetReadSessionId());
166180
}
167181

168182
if (HasWriteOperations_) {
@@ -298,7 +312,8 @@ void TTopicOperations::AddOperation(const TString& topic,
298312
const NKikimrKqp::TTopicOperationsRequest_TopicOffsets_PartitionOffsets_OffsetsRange& range,
299313
bool forceCommit,
300314
bool killReadSession,
301-
bool onlyCheckCommitedToFinish
315+
bool onlyCheckCommitedToFinish,
316+
const TString& readSessionId
302317
)
303318
{
304319
TTopicPartition key{topic, partition};
@@ -308,7 +323,8 @@ void TTopicOperations::AddOperation(const TString& topic,
308323
range,
309324
forceCommit,
310325
killReadSession,
311-
onlyCheckCommitedToFinish);
326+
onlyCheckCommitedToFinish,
327+
readSessionId);
312328
HasReadOperations_ = true;
313329
}
314330

ydb/core/kqp/topics/kqp_topics.h

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,14 @@ class TConsumerOperations {
3535
bool GetForceCommit() const;
3636
bool GetKillReadSession() const;
3737
bool GetOnlyCheckCommitedToFinish() const;
38+
TString GetReadSessionId() const;
3839

3940
void AddOperation(const TString& consumer,
4041
const NKikimrKqp::TTopicOperationsRequest_TopicOffsets_PartitionOffsets_OffsetsRange& range,
4142
bool forceCommit = false,
4243
bool killReadSession = false,
43-
bool onlyCheckCommitedToFinish = false);
44+
bool onlyCheckCommitedToFinish = false,
45+
const TString& readSessionId = {});
4446

4547
void Merge(const TConsumerOperations& rhs);
4648

@@ -50,13 +52,15 @@ class TConsumerOperations {
5052
ui64 end,
5153
bool forceCommit = false,
5254
bool killReadSession = false,
53-
bool onlyCheckCommitedToFinish = false);
55+
bool onlyCheckCommitedToFinish = false,
56+
const TString& readSessionId = {});
5457

5558
TMaybe<TString> Consumer_;
5659
TDisjointIntervalTree<ui64> Offsets_;
5760
bool ForceCommit_ = false;
5861
bool KillReadSession_ = false;
5962
bool OnlyCheckCommitedToFinish_ = false;
63+
TString ReadSessionId_;
6064
};
6165

6266
struct TTopicOperationTransaction {
@@ -76,7 +80,8 @@ class TTopicPartitionOperations {
7680
const NKikimrKqp::TTopicOperationsRequest_TopicOffsets_PartitionOffsets_OffsetsRange& range,
7781
bool forceCommit = false,
7882
bool killReadSession = false,
79-
bool onlyCheckCommitedToFinish = false);
83+
bool onlyCheckCommitedToFinish = false,
84+
const TString& readSessionId = {});
8085
void AddOperation(const TString& topic, ui32 partition,
8186
TMaybe<ui32> supportivePartition);
8287

@@ -130,7 +135,8 @@ class TTopicOperations {
130135
const NKikimrKqp::TTopicOperationsRequest_TopicOffsets_PartitionOffsets_OffsetsRange& range,
131136
bool forceCommit,
132137
bool killReadSession,
133-
bool onlyCheckCommitedToFinish);
138+
bool onlyCheckCommitedToFinish,
139+
const TString& readSessionId);
134140
void AddOperation(const TString& topic, ui32 partition,
135141
TMaybe<ui32> supportivePartition);
136142

ydb/core/persqueue/events/internal.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -813,14 +813,15 @@ struct TEvPQ {
813813
{
814814
}
815815

816-
void AddOperation(TString consumer, ui64 begin, ui64 end, bool forceCommit = false, bool killReadSession = false, bool onlyCheckCommitedToFinish = false) {
816+
void AddOperation(TString consumer, ui64 begin, ui64 end, bool forceCommit = false, bool killReadSession = false, bool onlyCheckCommitedToFinish = false, TString readSessionId = {}) {
817817
NKikimrPQ::TPartitionOperation operation;
818818
operation.SetCommitOffsetsBegin(begin);
819819
operation.SetCommitOffsetsEnd(end);
820820
operation.SetConsumer(std::move(consumer));
821821
operation.SetForceCommit(forceCommit);
822822
operation.SetKillReadSession(killReadSession);
823823
operation.SetOnlyCheckCommitedToFinish(onlyCheckCommitedToFinish);
824+
operation.SetReadSessionId(readSessionId);
824825

825826
Operations.push_back(std::move(operation));
826827
}

ydb/core/persqueue/partition.cpp

Lines changed: 26 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2051,10 +2051,10 @@ bool TPartition::ExecUserActionOrTransaction(TSimpleSharedPtr<TTransaction>& t,
20512051
return true;
20522052
}
20532053

2054-
TPartition::EProcessResult TPartition::BeginTransaction(const TEvPQ::TEvTxCalcPredicate& tx, TMaybe<bool>& predicate)
2054+
TPartition::EProcessResult TPartition::BeginTransaction(const TEvPQ::TEvTxCalcPredicate& tx, TMaybe<bool>& predicateOut)
20552055
{
20562056
THashSet<TString> consumers;
2057-
bool ok = true;
2057+
bool result = true;
20582058
for (auto& operation : tx.Operations) {
20592059
const TString& consumer = operation.GetConsumer();
20602060
if (TxAffectedConsumers.contains(consumer)) {
@@ -2067,49 +2067,55 @@ TPartition::EProcessResult TPartition::BeginTransaction(const TEvPQ::TEvTxCalcPr
20672067
if (AffectedUsers.contains(consumer) && !GetPendingUserIfExists(consumer)) {
20682068
PQ_LOG_D("Partition " << Partition <<
20692069
" Consumer '" << consumer << "' has been removed");
2070-
ok = false;
2070+
result = false;
20712071
break;
20722072
}
20732073

20742074
if (!UsersInfoStorage->GetIfExists(consumer)) {
20752075
PQ_LOG_D("Partition " << Partition <<
20762076
" Unknown consumer '" << consumer << "'");
2077-
ok = false;
2077+
result = false;
20782078
break;
20792079
}
20802080

2081-
bool isAffectedConsumer = AffectedUsers.contains(consumer); // savnik check
2081+
bool isAffectedConsumer = AffectedUsers.contains(consumer);
20822082
TUserInfoBase& userInfo = GetOrCreatePendingUser(consumer);
20832083

2084-
if (operation.HasOnlyCheckCommitedToFinish() && operation.GetOnlyCheckCommitedToFinish() && !IsActive()) {
2084+
if (!operation.GetReadSessionId().Empty() && operation.GetReadSessionId() != userInfo.Session) {
2085+
PQ_LOG_D("Partition " << Partition <<
2086+
" Consumer '" << consumer << "'" <<
2087+
" Bad request (session already dead) " <<
2088+
" SessionId '" << operation.GetReadSessionId() << "'");
2089+
result = false;
2090+
} else if (operation.GetOnlyCheckCommitedToFinish() && !IsActive()) {
20852091
if (static_cast<ui64>(userInfo.Offset) != EndOffset) {
2086-
ok = false;
2092+
result = false;
20872093
}
20882094
} else {
2089-
if (!operation.GetForceCommit() && operation.GetCommitOffsetsBegin() > operation.GetCommitOffsetsEnd()) { // savnik check default
2095+
if (!operation.GetForceCommit() && operation.GetCommitOffsetsBegin() > operation.GetCommitOffsetsEnd()) {
20902096
PQ_LOG_D("Partition " << Partition <<
20912097
" Consumer '" << consumer << "'" <<
20922098
" Bad request (invalid range) " <<
20932099
" Begin " << operation.GetCommitOffsetsBegin() <<
20942100
" End " << operation.GetCommitOffsetsEnd());
2095-
ok = false;
2101+
result = false;
20962102
} else if (!operation.GetForceCommit() && userInfo.Offset != (i64)operation.GetCommitOffsetsBegin()) {
20972103
PQ_LOG_D("Partition " << Partition <<
20982104
" Consumer '" << consumer << "'" <<
20992105
" Bad request (gap) " <<
21002106
" Offset " << userInfo.Offset <<
21012107
" Begin " << operation.GetCommitOffsetsBegin());
2102-
ok = false;
2108+
result = false;
21032109
} else if (!operation.GetForceCommit() && operation.GetCommitOffsetsEnd() > EndOffset) {
21042110
PQ_LOG_D("Partition " << Partition <<
21052111
" Consumer '" << consumer << "'" <<
21062112
" Bad request (behind the last offset) " <<
21072113
" EndOffset " << EndOffset <<
21082114
" End " << operation.GetCommitOffsetsEnd());
2109-
ok = false;
2115+
result = false;
21102116
}
21112117

2112-
if (!ok) {
2118+
if (!result) {
21132119
if (!isAffectedConsumer) {
21142120
AffectedUsers.erase(consumer);
21152121
}
@@ -2118,10 +2124,10 @@ TPartition::EProcessResult TPartition::BeginTransaction(const TEvPQ::TEvTxCalcPr
21182124
consumers.insert(consumer);
21192125
}
21202126
}
2121-
if (ok) {
2122-
TxAffectedConsumers.insert(consumers.begin(), consumers.end()); // savnik check
2127+
if (result) {
2128+
TxAffectedConsumers.insert(consumers.begin(), consumers.end());
21232129
}
2124-
predicate = ok;
2130+
predicateOut = result;
21252131
return EProcessResult::Continue;
21262132
}
21272133

@@ -2260,9 +2266,13 @@ void TPartition::CommitTransaction(TSimpleSharedPtr<TTransaction>& t)
22602266
Y_ABORT_UNLESS(t->Predicate.Defined() && *t->Predicate);
22612267

22622268
for (auto& operation : t->Tx->Operations) {
2269+
if (operation.GetOnlyCheckCommitedToFinish()) {
2270+
continue;
2271+
}
2272+
22632273
TUserInfoBase& userInfo = GetOrCreatePendingUser(operation.GetConsumer());
22642274

2265-
if (!operation.HasForceCommit() || !operation.GetForceCommit()) { // savnik check default
2275+
if (!operation.GetForceCommit()) {
22662276
Y_ABORT_UNLESS(userInfo.Offset == (i64)operation.GetCommitOffsetsBegin());
22672277
}
22682278

@@ -2276,7 +2286,6 @@ void TPartition::CommitTransaction(TSimpleSharedPtr<TTransaction>& t)
22762286
userInfo.AnyCommits = true;
22772287
userInfo.Offset = operation.GetCommitOffsetsEnd();
22782288
}
2279-
22802289
if (operation.GetKillReadSession()) {
22812290
userInfo.Session = "";
22822291
userInfo.PartitionSessionId = 0;

ydb/core/persqueue/partition_read.cpp

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -104,10 +104,10 @@ TAutoPtr<TEvPersQueue::TEvHasDataInfoResponse> TPartition::MakeHasDataInfoRespon
104104
ui32 partitionId = Partition.OriginalPartitionId;
105105

106106
auto* node = PartitionGraph.GetPartition(partitionId);
107-
for (auto* child : node->Children) {
107+
for (auto* child : node->DirectChildren) {
108108
res->Record.AddChildPartitionIds(child->Id);
109109

110-
for (auto* p : child->Parents) {
110+
for (auto* p : child->DirectParents) {
111111
if (p->Id != partitionId) {
112112
res->Record.AddAdjacentPartitionIds(p->Id);
113113
}
@@ -731,7 +731,6 @@ void TPartition::Handle(TEvPQ::TEvRead::TPtr& ev, const TActorContext& ctx) {
731731
Y_ABORT_UNLESS(read->Offset <= EndOffset);
732732

733733
auto& userInfo = UsersInfoStorage->GetOrCreate(user, ctx);
734-
735734
if (!read->SessionId.empty() && !userInfo.NoConsumer) {
736735
if (userInfo.Session != read->SessionId) {
737736
TabletCounters.Cumulative()[COUNTER_PQ_READ_ERROR_NO_SESSION].Increment(1);

ydb/core/persqueue/partition_scale_manager.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ std::pair<std::vector<TPartitionSplit>, std::vector<TPartitionMerge>> TPartition
7474
auto partitionId = PartitionsToSplit.begin();
7575
while (allowedSplitsCount > 0 && partitionId != PartitionsToSplit.end()) {
7676
auto* node = PartitionGraph.GetPartition(*partitionId);
77-
if (node->Children.empty()) {
77+
if (node->DirectChildren.empty()) {
7878
auto from = node->From;
7979
auto to = node->To;
8080
auto mid = MiddleOf(from, to);

ydb/core/persqueue/partition_sourcemanager.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ TSourceIdStorage& TPartitionSourceManager::GetSourceIdStorage() const {
5050

5151
bool TPartitionSourceManager::HasParents() const {
5252
auto node = GetPartitionNode();
53-
return node && !node->Parents.empty();
53+
return node && !node->DirectParents.empty();
5454
}
5555

5656

ydb/core/persqueue/pq_impl.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4073,7 +4073,8 @@ void TPersQueue::SendEvTxCalcPredicateToPartitions(const TActorContext& ctx,
40734073
operation.GetCommitOffsetsEnd(),
40744074
operation.HasForceCommit() ? operation.GetForceCommit() : false,
40754075
operation.HasKillReadSession() ? operation.GetKillReadSession() : false,
4076-
operation.HasOnlyCheckCommitedToFinish() ? operation.GetOnlyCheckCommitedToFinish() : false);
4076+
operation.HasOnlyCheckCommitedToFinish() ? operation.GetOnlyCheckCommitedToFinish() : false,
4077+
operation.HasReadSessionId() ? operation.GetReadSessionId() : "");
40774078
}
40784079
}
40794080

ydb/core/persqueue/read_balancer.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -283,7 +283,7 @@ TString TPersQueueReadBalancer::GenerateStat() {
283283
for (auto& [partitionId, partitionInfo] : PartitionsInfo) {
284284
const auto& stats = AggregatedStats.Stats[partitionId];
285285
const auto* node = PartitionGraph.GetPartition(partitionId);
286-
TString style = node && node->Children.empty() ? "text-success" : "text-muted";
286+
TString style = node && node->DirectChildren.empty() ? "text-success" : "text-muted";
287287

288288
TABLER() {
289289
TABLED() {
@@ -293,7 +293,7 @@ TString TPersQueueReadBalancer::GenerateStat() {
293293
}
294294
TABLED() {
295295
if (node) {
296-
str << (node->Children.empty() ? "Active" : "Inactive");
296+
str << (node->DirectChildren.empty() ? "Active" : "Inactive");
297297
if (node->IsRoot()) {
298298
str << " (root)";
299299
}
@@ -302,15 +302,15 @@ TString TPersQueueReadBalancer::GenerateStat() {
302302
TABLED() { HREF(TStringBuilder() << "?TabletID=" << partitionInfo.TabletId) { str << partitionInfo.TabletId; } }
303303
TABLED() {
304304
if (node) {
305-
for (auto* parent : node->Parents) {
305+
for (auto* parent : node->DirectParents) {
306306
HREF("#" + partitionAnchor(parent->Id)) { str << parent->Id; }
307307
str << ", ";
308308
}
309309
}
310310
}
311311
TABLED() {
312312
if (node) {
313-
for (auto* child : node->Children) {
313+
for (auto* child : node->DirectChildren) {
314314
HREF("#" + partitionAnchor(child->Id)) { str << child->Id; }
315315
str << ", ";
316316
}

0 commit comments

Comments
 (0)