Skip to content

Commit dda3a67

Browse files
committed
work in progress
1 parent fb2c723 commit dda3a67

19 files changed

+1169
-820
lines changed

ydb/core/kqp/executer_actor/kqp_data_executer.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2096,6 +2096,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
20962096
TTopicTabletTxs topicTxs;
20972097
TDatashardTxs datashardTxs;
20982098
TEvWriteTxs evWriteTxs;
2099+
20992100
if (!TxManager) {
21002101
BuildDatashardTxs(datashardTasks, datashardTxs, evWriteTxs, topicTxs);
21012102
}
@@ -2380,7 +2381,6 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
23802381
YQL_ENSURE(!ReadOnlyTx);
23812382
}
23822383
}
2383-
23842384
Request.TopicOperations.BuildTopicTxs(topicTxs);
23852385

23862386
const bool needRollback = Request.LocksOp == ELocksOp::Rollback;

ydb/core/kqp/session_actor/kqp_query_state.cpp

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -428,7 +428,7 @@ bool TKqpQueryState::PrepareNextStatementPart() {
428428
void TKqpQueryState::AddOffsetsToTransaction() {
429429
YQL_ENSURE(HasTopicOperations());
430430

431-
const auto& operations = GetTopicOperations();
431+
const auto& operations = GetTopicOperationsFromRequest();
432432

433433
TMaybe<TString> consumer;
434434
if (operations.HasConsumer()) {
@@ -441,7 +441,6 @@ void TKqpQueryState::AddOffsetsToTransaction() {
441441
}
442442

443443
TopicOperations = NTopic::TTopicOperations();
444-
445444
for (auto& topic : operations.GetTopics()) {
446445
auto path = CanonizePath(NPersQueue::GetFullTopicPath(TlsActivationContext->AsActorContext(),
447446
GetDatabase(), topic.path()));
@@ -452,8 +451,7 @@ void TKqpQueryState::AddOffsetsToTransaction() {
452451
} else {
453452
for (auto& range : partition.partition_offsets()) {
454453
YQL_ENSURE(consumer.Defined());
455-
456-
TopicOperations.AddOperation(path, partition.partition_id(), *consumer, range);
454+
TopicOperations.AddOperation(path, partition.partition_id(), *consumer, range, partition.force_commit(), partition.kill_read_session(), partition.only_check_commited_to_finish());
457455
}
458456
}
459457
}
@@ -474,7 +472,7 @@ std::unique_ptr<NSchemeCache::TSchemeCacheNavigate> TKqpQueryState::BuildSchemeC
474472
auto navigate = std::make_unique<NSchemeCache::TSchemeCacheNavigate>();
475473
navigate->DatabaseName = CanonizePath(GetDatabase());
476474

477-
const auto& operations = GetTopicOperations();
475+
const auto& operations = GetTopicOperationsFromRequest();
478476
TMaybe<TString> consumer;
479477
if (operations.HasConsumer())
480478
consumer = operations.GetConsumer();

ydb/core/kqp/session_actor/kqp_query_state.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -317,7 +317,7 @@ class TKqpQueryState : public TNonCopyable {
317317
return RequestEv->GetQuery();
318318
}
319319

320-
const ::NKikimrKqp::TTopicOperationsRequest& GetTopicOperations() const {
320+
const ::NKikimrKqp::TTopicOperationsRequest& GetTopicOperationsFromRequest() const {
321321
return RequestEv->GetTopicOperations();
322322
}
323323

ydb/core/kqp/session_actor/kqp_session_actor.cpp

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -817,7 +817,6 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
817817
QueryState->QueryData = std::make_shared<TQueryData>(QueryState->TxCtx->TxAlloc);
818818
QueryState->TxCtx->SetIsolationLevel(settings);
819819
QueryState->TxCtx->OnBeginQuery();
820-
821820
if (!Transactions.CreateNew(QueryState->TxId.GetValue(), QueryState->TxCtx)) {
822821
std::vector<TIssue> issues{
823822
YqlIssue({}, TIssuesIds::KIKIMR_TOO_MANY_TRANSACTIONS)};
@@ -1293,7 +1292,6 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
12931292
}
12941293
}
12951294
}
1296-
12971295
request.TopicOperations = std::move(txCtx.TopicOperations);
12981296
} else if (QueryState->ShouldAcquireLocks(tx) && (!txCtx.HasOlapTable || Settings.TableService.GetEnableOlapSink())) {
12991297
request.AcquireLocksTxId = txCtx.Locks.GetLockTxId();

ydb/core/kqp/topics/kqp_topics.cpp

Lines changed: 59 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#include "kqp_topics.h"
22

33
#include <ydb/core/base/path.h>
4+
#include <ydb/core/protos/kqp.pb.h>
45
#include <ydb/core/persqueue/utils.h>
56
#include <ydb/library/actors/core/log.h>
67

@@ -29,18 +30,37 @@ bool TConsumerOperations::IsValid() const
2930
return Offsets_.GetNumIntervals() == 1;
3031
}
3132

32-
std::pair<ui64, ui64> TConsumerOperations::GetRange() const
33+
std::pair<ui64, ui64> TConsumerOperations::GetOffsetsCommitRange() const
3334
{
3435
Y_ABORT_UNLESS(IsValid());
3536

3637
return {Offsets_.Min(), Offsets_.Max()};
3738
}
3839

39-
void TConsumerOperations::AddOperation(const TString& consumer, const Ydb::Topic::OffsetsRange& range)
40+
bool TConsumerOperations::GetForceCommit() const
41+
{
42+
return ForceCommit_;
43+
}
44+
45+
bool TConsumerOperations::GetKillReadSession() const
46+
{
47+
return KillReadSession_;
48+
}
49+
50+
bool TConsumerOperations::GetOnlyCheckCommitedToFinish() const
51+
{
52+
return OnlyCheckCommitedToFinish_;
53+
}
54+
55+
void TConsumerOperations::AddOperation(const TString& consumer,
56+
const NKikimrKqp::TTopicOperationsRequest_TopicOffsets_PartitionOffsets_OffsetsRange& range,
57+
bool forceCommit,
58+
bool killReadSession,
59+
bool onlyCheckCommitedToFinish)
4060
{
4161
Y_ABORT_UNLESS(Consumer_.Empty() || Consumer_ == consumer);
4262

43-
AddOperationImpl(consumer, range.start(), range.end());
63+
AddOperationImpl(consumer, range.start(), range.end(), forceCommit, killReadSession, onlyCheckCommitedToFinish);
4464
}
4565

4666
void TConsumerOperations::Merge(const TConsumerOperations& rhs)
@@ -49,12 +69,16 @@ void TConsumerOperations::Merge(const TConsumerOperations& rhs)
4969
Y_ABORT_UNLESS(Consumer_.Empty() || Consumer_ == rhs.Consumer_);
5070

5171
for (auto& range : rhs.Offsets_) {
52-
AddOperationImpl(*rhs.Consumer_, range.first, range.second);
72+
AddOperationImpl(*rhs.Consumer_, range.first, range.second, rhs.GetForceCommit(), rhs.GetKillReadSession(), rhs.GetOnlyCheckCommitedToFinish());
5373
}
5474
}
5575

5676
void TConsumerOperations::AddOperationImpl(const TString& consumer,
57-
ui64 begin, ui64 end)
77+
ui64 begin,
78+
ui64 end,
79+
bool forceCommit,
80+
bool killReadSession,
81+
bool onlyCheckCommitedToFinish)
5882
{
5983
if (Offsets_.Intersects(begin, end)) {
6084
ythrow TOffsetsRangeIntersectExpection() << "offset ranges intersect";
@@ -65,6 +89,10 @@ void TConsumerOperations::AddOperationImpl(const TString& consumer,
6589
}
6690

6791
Offsets_.InsertInterval(begin, end);
92+
93+
ForceCommit_ = forceCommit;
94+
KillReadSession_ = killReadSession;
95+
OnlyCheckCommitedToFinish_ = onlyCheckCommitedToFinish;
6896
}
6997

7098
//
@@ -76,9 +104,13 @@ bool TTopicPartitionOperations::IsValid() const
76104
[](auto& x) { return x.second.IsValid(); });
77105
}
78106

79-
void TTopicPartitionOperations::AddOperation(const TString& topic, ui32 partition,
107+
void TTopicPartitionOperations::AddOperation(const TString& topic,
108+
ui32 partition,
80109
const TString& consumer,
81-
const Ydb::Topic::OffsetsRange& range)
110+
const NKikimrKqp::TTopicOperationsRequest_TopicOffsets_PartitionOffsets_OffsetsRange& range,
111+
bool forceCommit,
112+
bool killReadSession,
113+
bool onlyCheckCommitedToFinish)
82114
{
83115
Y_ABORT_UNLESS(Topic_.Empty() || Topic_ == topic);
84116
Y_ABORT_UNLESS(Partition_.Empty() || Partition_ == partition);
@@ -88,7 +120,7 @@ void TTopicPartitionOperations::AddOperation(const TString& topic, ui32 partitio
88120
Partition_ = partition;
89121
}
90122

91-
Operations_[consumer].AddOperation(consumer, range);
123+
Operations_[consumer].AddOperation(consumer, range, forceCommit, killReadSession, onlyCheckCommitedToFinish);
92124
}
93125

94126
void TTopicPartitionOperations::AddOperation(const TString& topic, ui32 partition,
@@ -117,11 +149,14 @@ void TTopicPartitionOperations::BuildTopicTxs(TTopicOperationTransactions& txs)
117149
for (auto& [consumer, operations] : Operations_) {
118150
NKikimrPQ::TPartitionOperation* o = t.tx.MutableOperations()->Add();
119151
o->SetPartitionId(*Partition_);
120-
auto [begin, end] = operations.GetRange();
121-
o->SetBegin(begin);
122-
o->SetEnd(end);
152+
auto [begin, end] = operations.GetOffsetsCommitRange();
153+
o->SetCommitOffsetsBegin(begin);
154+
o->SetCommitOffsetsEnd(end);
123155
o->SetConsumer(consumer);
124156
o->SetPath(*Topic_);
157+
o->SetKillReadSession(operations.GetKillReadSession());
158+
o->SetForceCommit(operations.GetForceCommit());
159+
o->SetOnlyCheckCommitedToFinish(operations.GetOnlyCheckCommitedToFinish());
125160
}
126161

127162
if (HasWriteOperations_) {
@@ -251,14 +286,23 @@ bool TTopicOperations::TabletHasReadOperations(ui64 tabletId) const
251286
return false;
252287
}
253288

254-
void TTopicOperations::AddOperation(const TString& topic, ui32 partition,
289+
void TTopicOperations::AddOperation(const TString& topic,
290+
ui32 partition,
255291
const TString& consumer,
256-
const Ydb::Topic::OffsetsRange& range)
292+
const NKikimrKqp::TTopicOperationsRequest_TopicOffsets_PartitionOffsets_OffsetsRange& range,
293+
bool forceCommit,
294+
bool killReadSession,
295+
bool onlyCheckCommitedToFinish
296+
)
257297
{
258298
TTopicPartition key{topic, partition};
259-
Operations_[key].AddOperation(topic, partition,
299+
Operations_[key].AddOperation(topic,
300+
partition,
260301
consumer,
261-
range);
302+
range,
303+
forceCommit,
304+
killReadSession,
305+
onlyCheckCommitedToFinish);
262306
HasReadOperations_ = true;
263307
}
264308

ydb/core/kqp/topics/kqp_topics.h

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#include <ydb/public/api/protos/ydb_topic.pb.h>
55
#include <ydb/core/protos/pqconfig.pb.h>
66

7+
#include <ydb/core/protos/kqp.pb.h>
78
#include <ydb/core/tx/long_tx_service/public/lock_handle.h>
89
#include <ydb/core/tx/scheme_cache/scheme_cache.h>
910

@@ -26,20 +27,36 @@ class TConsumerOperations {
2627
public:
2728
bool IsValid() const;
2829

29-
std::pair<ui64, ui64> GetRange() const;
30+
std::pair<ui64, ui64> GetOffsetsCommitRange() const;
3031

31-
ui64 GetBegin() const;
32-
ui64 GetEnd() const;
32+
ui64 GetOffsetCommitBegin() const;
33+
ui64 GetOffsetCommitEnd() const;
34+
35+
bool GetForceCommit() const;
36+
bool GetKillReadSession() const;
37+
bool GetOnlyCheckCommitedToFinish() const;
38+
39+
void AddOperation(const TString& consumer,
40+
const NKikimrKqp::TTopicOperationsRequest_TopicOffsets_PartitionOffsets_OffsetsRange& range,
41+
bool forceCommit = false,
42+
bool killReadSession = false,
43+
bool onlyCheckCommitedToFinish = false);
3344

34-
void AddOperation(const TString& consumer, const Ydb::Topic::OffsetsRange& range);
3545
void Merge(const TConsumerOperations& rhs);
3646

3747
private:
3848
void AddOperationImpl(const TString& consumer,
39-
ui64 begin, ui64 end);
49+
ui64 begin,
50+
ui64 end,
51+
bool forceCommit = false,
52+
bool killReadSession = false,
53+
bool onlyCheckCommitedToFinish = false);
4054

4155
TMaybe<TString> Consumer_;
4256
TDisjointIntervalTree<ui64> Offsets_;
57+
bool ForceCommit_ = false;
58+
bool KillReadSession_ = false;
59+
bool OnlyCheckCommitedToFinish_ = false;
4360
};
4461

4562
struct TTopicOperationTransaction {
@@ -53,9 +70,13 @@ class TTopicPartitionOperations {
5370
public:
5471
bool IsValid() const;
5572

56-
void AddOperation(const TString& topic, ui32 partition,
73+
void AddOperation(const TString& topic,
74+
ui32 partition,
5775
const TString& consumer,
58-
const Ydb::Topic::OffsetsRange& range);
76+
const NKikimrKqp::TTopicOperationsRequest_TopicOffsets_PartitionOffsets_OffsetsRange& range,
77+
bool forceCommit = false,
78+
bool killReadSession = false,
79+
bool onlyCheckCommitedToFinish = false);
5980
void AddOperation(const TString& topic, ui32 partition,
6081
TMaybe<ui32> supportivePartition);
6182

@@ -106,7 +127,10 @@ class TTopicOperations {
106127

107128
void AddOperation(const TString& topic, ui32 partition,
108129
const TString& consumer,
109-
const Ydb::Topic::OffsetsRange& range);
130+
const NKikimrKqp::TTopicOperationsRequest_TopicOffsets_PartitionOffsets_OffsetsRange& range,
131+
bool forceCommit,
132+
bool killReadSession,
133+
bool onlyCheckCommitedToFinish);
110134
void AddOperation(const TString& topic, ui32 partition,
111135
TMaybe<ui32> supportivePartition);
112136

ydb/core/persqueue/events/internal.h

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -810,11 +810,14 @@ struct TEvPQ {
810810
{
811811
}
812812

813-
void AddOperation(TString consumer, ui64 begin, ui64 end) {
813+
void AddOperation(TString consumer, ui64 begin, ui64 end, bool forceCommit = false, bool killReadSession = false, bool onlyCheckCommitedToFinish = false) {
814814
NKikimrPQ::TPartitionOperation operation;
815-
operation.SetBegin(begin);
816-
operation.SetEnd(end);
815+
operation.SetCommitOffsetsBegin(begin);
816+
operation.SetCommitOffsetsEnd(end);
817817
operation.SetConsumer(std::move(consumer));
818+
operation.SetForceCommit(forceCommit);
819+
operation.SetKillReadSession(killReadSession);
820+
operation.SetOnlyCheckCommitedToFinish(onlyCheckCommitedToFinish);
818821

819822
Operations.push_back(std::move(operation));
820823
}

0 commit comments

Comments
 (0)