Skip to content

Commit 674162e

Browse files
committed
work in progress
1 parent 0a30799 commit 674162e

19 files changed

+1350
-1258
lines changed

ydb/core/kqp/executer_actor/kqp_data_executer.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2096,6 +2096,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
20962096
TTopicTabletTxs topicTxs;
20972097
TDatashardTxs datashardTxs;
20982098
TEvWriteTxs evWriteTxs;
2099+
20992100
if (!TxManager) {
21002101
BuildDatashardTxs(datashardTasks, datashardTxs, evWriteTxs, topicTxs);
21012102
}
@@ -2380,7 +2381,6 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
23802381
YQL_ENSURE(!ReadOnlyTx);
23812382
}
23822383
}
2383-
23842384
Request.TopicOperations.BuildTopicTxs(topicTxs);
23852385

23862386
const bool needRollback = Request.LocksOp == ELocksOp::Rollback;

ydb/core/kqp/session_actor/kqp_query_state.cpp

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -355,7 +355,7 @@ bool TKqpQueryState::PrepareNextStatementPart() {
355355
void TKqpQueryState::AddOffsetsToTransaction() {
356356
YQL_ENSURE(HasTopicOperations());
357357

358-
const auto& operations = GetTopicOperations();
358+
const auto& operations = GetTopicOperationsFromRequest();
359359

360360
TMaybe<TString> consumer;
361361
if (operations.HasConsumer()) {
@@ -368,7 +368,6 @@ void TKqpQueryState::AddOffsetsToTransaction() {
368368
}
369369

370370
TopicOperations = NTopic::TTopicOperations();
371-
372371
for (auto& topic : operations.GetTopics()) {
373372
auto path = CanonizePath(NPersQueue::GetFullTopicPath(TlsActivationContext->AsActorContext(),
374373
GetDatabase(), topic.path()));
@@ -379,8 +378,7 @@ void TKqpQueryState::AddOffsetsToTransaction() {
379378
} else {
380379
for (auto& range : partition.partition_offsets()) {
381380
YQL_ENSURE(consumer.Defined());
382-
383-
TopicOperations.AddOperation(path, partition.partition_id(), *consumer, range);
381+
TopicOperations.AddOperation(path, partition.partition_id(), *consumer, range, partition.force_commit(), partition.kill_read_session(), partition.only_check_commited_to_finish());
384382
}
385383
}
386384
}
@@ -401,7 +399,7 @@ std::unique_ptr<NSchemeCache::TSchemeCacheNavigate> TKqpQueryState::BuildSchemeC
401399
auto navigate = std::make_unique<NSchemeCache::TSchemeCacheNavigate>();
402400
navigate->DatabaseName = CanonizePath(GetDatabase());
403401

404-
const auto& operations = GetTopicOperations();
402+
const auto& operations = GetTopicOperationsFromRequest();
405403
TMaybe<TString> consumer;
406404
if (operations.HasConsumer())
407405
consumer = operations.GetConsumer();

ydb/core/kqp/session_actor/kqp_query_state.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -315,7 +315,7 @@ class TKqpQueryState : public TNonCopyable {
315315
return RequestEv->GetQuery();
316316
}
317317

318-
const ::NKikimrKqp::TTopicOperationsRequest& GetTopicOperations() const {
318+
const ::NKikimrKqp::TTopicOperationsRequest& GetTopicOperationsFromRequest() const {
319319
return RequestEv->GetTopicOperations();
320320
}
321321

ydb/core/kqp/session_actor/kqp_session_actor.cpp

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -770,7 +770,6 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
770770
QueryState->QueryData = std::make_shared<TQueryData>(QueryState->TxCtx->TxAlloc);
771771
QueryState->TxCtx->SetIsolationLevel(settings);
772772
QueryState->TxCtx->OnBeginQuery();
773-
774773
if (!Transactions.CreateNew(QueryState->TxId.GetValue(), QueryState->TxCtx)) {
775774
std::vector<TIssue> issues{
776775
YqlIssue({}, TIssuesIds::KIKIMR_TOO_MANY_TRANSACTIONS)};
@@ -1246,7 +1245,6 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
12461245
}
12471246
}
12481247
}
1249-
12501248
request.TopicOperations = std::move(txCtx.TopicOperations);
12511249
} else if (QueryState->ShouldAcquireLocks(tx) && (!txCtx.HasOlapTable || Settings.TableService.GetEnableOlapSink())) {
12521250
request.AcquireLocksTxId = txCtx.Locks.GetLockTxId();

ydb/core/kqp/topics/kqp_topics.cpp

Lines changed: 59 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#include "kqp_topics.h"
22

33
#include <ydb/core/base/path.h>
4+
#include <ydb/core/protos/kqp.pb.h>
45
#include <ydb/core/persqueue/utils.h>
56
#include <ydb/library/actors/core/log.h>
67

@@ -29,18 +30,37 @@ bool TConsumerOperations::IsValid() const
2930
return Offsets_.GetNumIntervals() == 1;
3031
}
3132

32-
std::pair<ui64, ui64> TConsumerOperations::GetRange() const
33+
std::pair<ui64, ui64> TConsumerOperations::GetOffsetsCommitRange() const
3334
{
3435
Y_ABORT_UNLESS(IsValid());
3536

3637
return {Offsets_.Min(), Offsets_.Max()};
3738
}
3839

39-
void TConsumerOperations::AddOperation(const TString& consumer, const Ydb::Topic::OffsetsRange& range)
40+
bool TConsumerOperations::GetForceCommit() const
41+
{
42+
return ForceCommit_;
43+
}
44+
45+
bool TConsumerOperations::GetKillReadSession() const
46+
{
47+
return KillReadSession_;
48+
}
49+
50+
bool TConsumerOperations::GetOnlyCheckCommitedToFinish() const
51+
{
52+
return OnlyCheckCommitedToFinish_;
53+
}
54+
55+
void TConsumerOperations::AddOperation(const TString& consumer,
56+
const NKikimrKqp::TTopicOperationsRequest_TopicOffsets_PartitionOffsets_OffsetsRange& range,
57+
bool forceCommit,
58+
bool killReadSession,
59+
bool onlyCheckCommitedToFinish)
4060
{
4161
Y_ABORT_UNLESS(Consumer_.Empty() || Consumer_ == consumer);
4262

43-
AddOperationImpl(consumer, range.start(), range.end());
63+
AddOperationImpl(consumer, range.start(), range.end(), forceCommit, killReadSession, onlyCheckCommitedToFinish);
4464
}
4565

4666
void TConsumerOperations::Merge(const TConsumerOperations& rhs)
@@ -49,12 +69,16 @@ void TConsumerOperations::Merge(const TConsumerOperations& rhs)
4969
Y_ABORT_UNLESS(Consumer_.Empty() || Consumer_ == rhs.Consumer_);
5070

5171
for (auto& range : rhs.Offsets_) {
52-
AddOperationImpl(*rhs.Consumer_, range.first, range.second);
72+
AddOperationImpl(*rhs.Consumer_, range.first, range.second, rhs.GetForceCommit(), rhs.GetKillReadSession(), rhs.GetOnlyCheckCommitedToFinish());
5373
}
5474
}
5575

5676
void TConsumerOperations::AddOperationImpl(const TString& consumer,
57-
ui64 begin, ui64 end)
77+
ui64 begin,
78+
ui64 end,
79+
bool forceCommit,
80+
bool killReadSession,
81+
bool onlyCheckCommitedToFinish)
5882
{
5983
if (Offsets_.Intersects(begin, end)) {
6084
ythrow TOffsetsRangeIntersectExpection() << "offset ranges intersect";
@@ -65,6 +89,10 @@ void TConsumerOperations::AddOperationImpl(const TString& consumer,
6589
}
6690

6791
Offsets_.InsertInterval(begin, end);
92+
93+
ForceCommit_ = forceCommit;
94+
KillReadSession_ = killReadSession;
95+
OnlyCheckCommitedToFinish_ = onlyCheckCommitedToFinish;
6896
}
6997

7098
//
@@ -76,9 +104,13 @@ bool TTopicPartitionOperations::IsValid() const
76104
[](auto& x) { return x.second.IsValid(); });
77105
}
78106

79-
void TTopicPartitionOperations::AddOperation(const TString& topic, ui32 partition,
107+
void TTopicPartitionOperations::AddOperation(const TString& topic,
108+
ui32 partition,
80109
const TString& consumer,
81-
const Ydb::Topic::OffsetsRange& range)
110+
const NKikimrKqp::TTopicOperationsRequest_TopicOffsets_PartitionOffsets_OffsetsRange& range,
111+
bool forceCommit,
112+
bool killReadSession,
113+
bool onlyCheckCommitedToFinish)
82114
{
83115
Y_ABORT_UNLESS(Topic_.Empty() || Topic_ == topic);
84116
Y_ABORT_UNLESS(Partition_.Empty() || Partition_ == partition);
@@ -88,7 +120,7 @@ void TTopicPartitionOperations::AddOperation(const TString& topic, ui32 partitio
88120
Partition_ = partition;
89121
}
90122

91-
Operations_[consumer].AddOperation(consumer, range);
123+
Operations_[consumer].AddOperation(consumer, range, forceCommit, killReadSession, onlyCheckCommitedToFinish);
92124
}
93125

94126
void TTopicPartitionOperations::AddOperation(const TString& topic, ui32 partition,
@@ -117,11 +149,14 @@ void TTopicPartitionOperations::BuildTopicTxs(TTopicOperationTransactions& txs)
117149
for (auto& [consumer, operations] : Operations_) {
118150
NKikimrPQ::TPartitionOperation* o = t.tx.MutableOperations()->Add();
119151
o->SetPartitionId(*Partition_);
120-
auto [begin, end] = operations.GetRange();
121-
o->SetBegin(begin);
122-
o->SetEnd(end);
152+
auto [begin, end] = operations.GetOffsetsCommitRange();
153+
o->SetCommitOffsetsBegin(begin);
154+
o->SetCommitOffsetsEnd(end);
123155
o->SetConsumer(consumer);
124156
o->SetPath(*Topic_);
157+
o->SetKillReadSession(operations.GetKillReadSession());
158+
o->SetForceCommit(operations.GetForceCommit());
159+
o->SetOnlyCheckCommitedToFinish(operations.GetOnlyCheckCommitedToFinish());
125160
}
126161

127162
if (HasWriteOperations_) {
@@ -251,14 +286,23 @@ bool TTopicOperations::TabletHasReadOperations(ui64 tabletId) const
251286
return false;
252287
}
253288

254-
void TTopicOperations::AddOperation(const TString& topic, ui32 partition,
289+
void TTopicOperations::AddOperation(const TString& topic,
290+
ui32 partition,
255291
const TString& consumer,
256-
const Ydb::Topic::OffsetsRange& range)
292+
const NKikimrKqp::TTopicOperationsRequest_TopicOffsets_PartitionOffsets_OffsetsRange& range,
293+
bool forceCommit,
294+
bool killReadSession,
295+
bool onlyCheckCommitedToFinish
296+
)
257297
{
258298
TTopicPartition key{topic, partition};
259-
Operations_[key].AddOperation(topic, partition,
299+
Operations_[key].AddOperation(topic,
300+
partition,
260301
consumer,
261-
range);
302+
range,
303+
forceCommit,
304+
killReadSession,
305+
onlyCheckCommitedToFinish);
262306
HasReadOperations_ = true;
263307
}
264308

ydb/core/kqp/topics/kqp_topics.h

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#include <ydb/public/api/protos/ydb_topic.pb.h>
55
#include <ydb/core/protos/pqconfig.pb.h>
66

7+
#include <ydb/core/protos/kqp.pb.h>
78
#include <ydb/core/tx/long_tx_service/public/lock_handle.h>
89
#include <ydb/core/tx/scheme_cache/scheme_cache.h>
910

@@ -26,20 +27,36 @@ class TConsumerOperations {
2627
public:
2728
bool IsValid() const;
2829

29-
std::pair<ui64, ui64> GetRange() const;
30+
std::pair<ui64, ui64> GetOffsetsCommitRange() const;
3031

31-
ui64 GetBegin() const;
32-
ui64 GetEnd() const;
32+
ui64 GetOffsetCommitBegin() const;
33+
ui64 GetOffsetCommitEnd() const;
34+
35+
bool GetForceCommit() const;
36+
bool GetKillReadSession() const;
37+
bool GetOnlyCheckCommitedToFinish() const;
38+
39+
void AddOperation(const TString& consumer,
40+
const NKikimrKqp::TTopicOperationsRequest_TopicOffsets_PartitionOffsets_OffsetsRange& range,
41+
bool forceCommit = false,
42+
bool killReadSession = false,
43+
bool onlyCheckCommitedToFinish = false);
3344

34-
void AddOperation(const TString& consumer, const Ydb::Topic::OffsetsRange& range);
3545
void Merge(const TConsumerOperations& rhs);
3646

3747
private:
3848
void AddOperationImpl(const TString& consumer,
39-
ui64 begin, ui64 end);
49+
ui64 begin,
50+
ui64 end,
51+
bool forceCommit = false,
52+
bool killReadSession = false,
53+
bool onlyCheckCommitedToFinish = false);
4054

4155
TMaybe<TString> Consumer_;
4256
TDisjointIntervalTree<ui64> Offsets_;
57+
bool ForceCommit_ = false;
58+
bool KillReadSession_ = false;
59+
bool OnlyCheckCommitedToFinish_ = false;
4360
};
4461

4562
struct TTopicOperationTransaction {
@@ -53,9 +70,13 @@ class TTopicPartitionOperations {
5370
public:
5471
bool IsValid() const;
5572

56-
void AddOperation(const TString& topic, ui32 partition,
73+
void AddOperation(const TString& topic,
74+
ui32 partition,
5775
const TString& consumer,
58-
const Ydb::Topic::OffsetsRange& range);
76+
const NKikimrKqp::TTopicOperationsRequest_TopicOffsets_PartitionOffsets_OffsetsRange& range,
77+
bool forceCommit = false,
78+
bool killReadSession = false,
79+
bool onlyCheckCommitedToFinish = false);
5980
void AddOperation(const TString& topic, ui32 partition,
6081
TMaybe<ui32> supportivePartition);
6182

@@ -106,7 +127,10 @@ class TTopicOperations {
106127

107128
void AddOperation(const TString& topic, ui32 partition,
108129
const TString& consumer,
109-
const Ydb::Topic::OffsetsRange& range);
130+
const NKikimrKqp::TTopicOperationsRequest_TopicOffsets_PartitionOffsets_OffsetsRange& range,
131+
bool forceCommit,
132+
bool killReadSession,
133+
bool onlyCheckCommitedToFinish);
110134
void AddOperation(const TString& topic, ui32 partition,
111135
TMaybe<ui32> supportivePartition);
112136

ydb/core/persqueue/events/internal.h

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -813,11 +813,14 @@ struct TEvPQ {
813813
{
814814
}
815815

816-
void AddOperation(TString consumer, ui64 begin, ui64 end) {
816+
void AddOperation(TString consumer, ui64 begin, ui64 end, bool forceCommit = false, bool killReadSession = false, bool onlyCheckCommitedToFinish = false) {
817817
NKikimrPQ::TPartitionOperation operation;
818-
operation.SetBegin(begin);
819-
operation.SetEnd(end);
818+
operation.SetCommitOffsetsBegin(begin);
819+
operation.SetCommitOffsetsEnd(end);
820820
operation.SetConsumer(std::move(consumer));
821+
operation.SetForceCommit(forceCommit);
822+
operation.SetKillReadSession(killReadSession);
823+
operation.SetOnlyCheckCommitedToFinish(onlyCheckCommitedToFinish);
821824

822825
Operations.push_back(std::move(operation));
823826
}

0 commit comments

Comments
 (0)