Skip to content

Commit 8c3f4c1

Browse files
committed
Fixed committing of proccessed messages by transfer (#17580)
1 parent 06845f8 commit 8c3f4c1

File tree

9 files changed

+176
-41
lines changed

9 files changed

+176
-41
lines changed

ydb/core/tx/replication/service/service.cpp

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -377,7 +377,7 @@ class TReplicationService: public TActorBootstrapped<TReplicationService> {
377377
return it->second;
378378
}
379379

380-
std::function<IActor*(void)> ReaderFn(const NKikimrReplication::TRemoteTopicReaderSettings& settings) {
380+
std::function<IActor*(void)> ReaderFn(const NKikimrReplication::TRemoteTopicReaderSettings& settings, bool autoCommit) {
381381
TActorId ydbProxy;
382382
const auto& params = settings.GetConnectionParams();
383383
switch (params.GetCredentialsCase()) {
@@ -394,6 +394,7 @@ class TReplicationService: public TActorBootstrapped<TReplicationService> {
394394
auto topicReaderSettings = TEvYdbProxy::TTopicReaderSettings()
395395
.MaxMemoryUsageBytes(1_MB)
396396
.ConsumerName(settings.GetConsumerName())
397+
.AutoCommit(autoCommit)
397398
.AppendTopics(NYdb::NTopic::TTopicReadSettings()
398399
.Path(settings.GetTopicPath())
399400
.AppendPartitionIds(settings.GetTopicPartitionId())
@@ -473,6 +474,7 @@ class TReplicationService: public TActorBootstrapped<TReplicationService> {
473474
const auto& cmd = record.GetCommand();
474475
// TODO: validate settings
475476
const auto& readerSettings = cmd.GetRemoteTopicReader();
477+
bool autoCommit = true;
476478
std::function<IActor*(void)> writerFn;
477479
if (cmd.HasLocalTableWriter()) {
478480
const auto& writerSettings = cmd.GetLocalTableWriter();
@@ -485,12 +487,13 @@ class TReplicationService: public TActorBootstrapped<TReplicationService> {
485487
LOG_C("Run transfer but TransferWriterFactory does not exists.");
486488
return;
487489
}
490+
autoCommit = false;
488491
writerFn = TransferWriterFn(writerSettings, transferWriterFactory);
489492
} else {
490493
Y_ABORT("Unsupported");
491494
}
492495
const auto actorId = session.RegisterWorker(this, id,
493-
CreateWorker(SelfId(), ReaderFn(readerSettings), std::move(writerFn)));
496+
CreateWorker(SelfId(), ReaderFn(readerSettings, autoCommit), std::move(writerFn)));
494497
WorkerActorIdToSession[actorId] = controller.GetTabletId();
495498
}
496499

ydb/core/tx/replication/service/topic_reader.cpp

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,13 +67,44 @@ class TRemoteTopicReader: public TActor<TRemoteTopicReader> {
6767
Send(Worker, new TEvWorker::TEvData(result.PartitionId, ToString(result.PartitionId), std::move(records)));
6868
}
6969

70-
void Handle(TEvYdbProxy::TEvTopicEndPartition::TPtr& ev) {
70+
void Handle(TEvYdbProxy::TEvEndTopicPartition::TPtr& ev) {
7171
LOG_D("Handle " << ev->Get()->ToString());
7272

7373
auto& result = ev->Get()->Result;
7474
Send(Worker, new TEvWorker::TEvDataEnd(result.PartitionId, std::move(result.AdjacentPartitionsIds), std::move(result.ChildPartitionsIds)));
7575
}
7676

77+
void Handle(TEvYdbProxy::TEvStartTopicReadingSession::TPtr& ev) {
78+
LOG_D("Handle " << ev->Get()->ToString());
79+
80+
ReadSessionId = ev->Get()->Result.ReadSessionId;
81+
}
82+
83+
void Handle(TEvWorker::TEvCommit::TPtr& ev) {
84+
LOG_D("Handle " << ev->Get()->ToString());
85+
86+
Y_ABORT_UNLESS(YdbProxy);
87+
Y_ABORT_UNLESS(ReadSessionId);
88+
89+
auto settings = NYdb::NTopic::TCommitOffsetSettings()
90+
.ReadSessionId(ReadSessionId);
91+
92+
const auto& topicName = Settings.GetBase().Topics_.at(0).Path_;
93+
const auto partitionId = Settings.GetBase().Topics_.at(0).PartitionIds_.at(0);
94+
const auto& consumerName = Settings.GetBase().ConsumerName_;
95+
96+
Send(YdbProxy, new TEvYdbProxy::TEvCommitOffsetRequest(topicName, partitionId, consumerName, ev->Get()->Offset, std::move(settings)));
97+
}
98+
99+
void Handle(TEvYdbProxy::TEvCommitOffsetResponse::TPtr& ev) {
100+
if (!ev->Get()->Result.IsSuccess()) {
101+
LOG_W("Handle " << ev->Get()->ToString());
102+
return Leave(TEvWorker::TEvGone::UNAVAILABLE);
103+
} else {
104+
LOG_D("Handle " << ev->Get()->ToString());
105+
}
106+
}
107+
77108
void Handle(TEvYdbProxy::TEvTopicReaderGone::TPtr& ev) {
78109
LOG_D("Handle " << ev->Get()->ToString());
79110

@@ -120,9 +151,12 @@ class TRemoteTopicReader: public TActor<TRemoteTopicReader> {
120151
switch (ev->GetTypeRewrite()) {
121152
hFunc(TEvWorker::TEvHandshake, Handle);
122153
hFunc(TEvWorker::TEvPoll, Handle);
154+
hFunc(TEvWorker::TEvCommit, Handle);
123155
hFunc(TEvYdbProxy::TEvCreateTopicReaderResponse, Handle);
124156
hFunc(TEvYdbProxy::TEvReadTopicResponse, Handle);
125-
hFunc(TEvYdbProxy::TEvTopicEndPartition, Handle);
157+
hFunc(TEvYdbProxy::TEvCommitOffsetResponse, Handle);
158+
hFunc(TEvYdbProxy::TEvStartTopicReadingSession, Handle);
159+
hFunc(TEvYdbProxy::TEvEndTopicPartition, Handle);
126160
hFunc(TEvYdbProxy::TEvTopicReaderGone, Handle);
127161
sFunc(TEvents::TEvPoison, PassAway);
128162
}
@@ -135,6 +169,7 @@ class TRemoteTopicReader: public TActor<TRemoteTopicReader> {
135169

136170
TActorId Worker;
137171
TActorId ReadSession;
172+
TString ReadSessionId;
138173

139174
}; // TRemoteTopicReader
140175

ydb/core/tx/replication/service/worker.cpp

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,17 @@ TString TEvWorker::TEvPoll::ToString() const {
4444
<< " }";
4545
}
4646

47+
TEvWorker::TEvCommit::TEvCommit(size_t offset)
48+
: Offset(offset)
49+
{
50+
}
51+
52+
TString TEvWorker::TEvCommit::ToString() const {
53+
return TStringBuilder() << ToStringHeader() << " {"
54+
<< " Offset: " << Offset
55+
<< " }";
56+
}
57+
4758
TEvWorker::TEvData::TEvData(ui32 partitionId, const TString& source, const TVector<TRecord>& records)
4859
: PartitionId(partitionId)
4960
, Source(source)
@@ -214,6 +225,20 @@ class TWorker: public TActorBootstrapped<TWorker> {
214225
}
215226
}
216227

228+
void Handle(TEvWorker::TEvCommit::TPtr& ev) {
229+
LOG_D("Handle " << ev->Get()->ToString());
230+
231+
if (ev->Sender != Writer) {
232+
LOG_W("Commit from unknown actor"
233+
<< ": sender# " << ev->Sender);
234+
return;
235+
}
236+
237+
if (Reader) {
238+
Send(ev->Forward(Reader));
239+
}
240+
}
241+
217242
void Handle(TEvWorker::TEvData::TPtr& ev) {
218243
LOG_D("Handle " << ev->Get()->ToString());
219244

@@ -335,6 +360,7 @@ class TWorker: public TActorBootstrapped<TWorker> {
335360
switch (ev->GetTypeRewrite()) {
336361
hFunc(TEvWorker::TEvHandshake, Handle);
337362
hFunc(TEvWorker::TEvPoll, Handle);
363+
hFunc(TEvWorker::TEvCommit, Handle);
338364
hFunc(TEvWorker::TEvData, Handle);
339365
hFunc(TEvWorker::TEvDataEnd, Forward);
340366
hFunc(TEvWorker::TEvGone, Handle);

ydb/core/tx/replication/service/worker.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ struct TEvWorker {
2020
EvGone,
2121
EvStatus,
2222
EvDataEnd,
23+
EvCommit,
2324

2425
EvEnd,
2526
};
@@ -35,6 +36,13 @@ struct TEvWorker {
3536
TString ToString() const override;
3637
};
3738

39+
struct TEvCommit: public TEventLocal<TEvCommit, EvCommit> {
40+
size_t Offset;
41+
42+
explicit TEvCommit(size_t offset);
43+
TString ToString() const override;
44+
};
45+
3846
struct TEvData: public TEventLocal<TEvData, EvData> {
3947
struct TRecord {
4048
ui64 Offset;

ydb/core/tx/replication/ydb_proxy/partition_end_watcher.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ class TPartitionEndWatcher {
1414
return;
1515
}
1616

17-
ActorOps->Send(client, new TEvYdbProxy::TEvTopicEndPartition(*EndPartitionSessionEvent));
17+
ActorOps->Send(client, new TEvYdbProxy::TEvEndTopicPartition(*EndPartitionSessionEvent));
1818
}
1919

2020
public:

ydb/core/tx/replication/ydb_proxy/partition_end_watcher_ut.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ Y_UNIT_TEST_SUITE(PartitionEndWatcher) {
9494

9595
UNIT_ASSERT_VALUES_EQUAL(actorOps.Events.size(), 1);
9696
UNIT_ASSERT_VALUES_EQUAL(actorOps.Events[0].first, client);
97-
auto* e = dynamic_cast<TEvYdbProxy::TEvTopicEndPartition*>(actorOps.Events[0].second);
97+
auto* e = dynamic_cast<TEvYdbProxy::TEvEndTopicPartition*>(actorOps.Events[0].second);
9898
UNIT_ASSERT(e);
9999
UNIT_ASSERT_VALUES_EQUAL(e->Result.AdjacentPartitionsIds, TVector<ui64>{1});
100100
UNIT_ASSERT_VALUES_EQUAL(e->Result.ChildPartitionsIds, TVector<ui64>{2});
@@ -118,7 +118,7 @@ Y_UNIT_TEST_SUITE(PartitionEndWatcher) {
118118

119119
UNIT_ASSERT_VALUES_EQUAL(actorOps.Events.size(), 1);
120120
UNIT_ASSERT_VALUES_EQUAL(actorOps.Events[0].first, client);
121-
auto* e = dynamic_cast<TEvYdbProxy::TEvTopicEndPartition*>(actorOps.Events[0].second);
121+
auto* e = dynamic_cast<TEvYdbProxy::TEvEndTopicPartition*>(actorOps.Events[0].second);
122122
UNIT_ASSERT(e);
123123
UNIT_ASSERT_VALUES_EQUAL(e->Result.AdjacentPartitionsIds, TVector<ui64>{1});
124124
UNIT_ASSERT_VALUES_EQUAL(e->Result.ChildPartitionsIds, TVector<ui64>{2});

ydb/core/tx/replication/ydb_proxy/ydb_proxy.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,12 @@ void TEvYdbProxy::TEndTopicPartitionResult::Out(IOutputStream& out) const {
4747
<< " }";
4848
}
4949

50+
void TEvYdbProxy::TStartTopicReadingSessionResult::Out(IOutputStream& out) const {
51+
out << "{"
52+
<< " ReadSessionId: " << ReadSessionId
53+
<< " }";
54+
}
55+
5056
template <typename TDerived>
5157
class TBaseProxyActor: public TActor<TDerived> {
5258
class TRequest;
@@ -213,6 +219,7 @@ class TTopicReader: public TBaseProxyActor<TTopicReader> {
213219
if (auto* x = std::get_if<TReadSessionEvent::TStartPartitionSessionEvent>(&*event)) {
214220
PartitionEndWatcher.Clear();
215221
x->Confirm();
222+
Send(ev->Get()->Sender, new TEvYdbProxy::TEvStartTopicReadingSession(*x), 0, ev->Get()->Cookie);
216223
return WaitEvent(ev->Get()->Sender, ev->Get()->Cookie);
217224
} else if (auto* x = std::get_if<TReadSessionEvent::TStopPartitionSessionEvent>(&*event)) {
218225
x->Confirm();

ydb/core/tx/replication/ydb_proxy/ydb_proxy.h

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,8 @@ struct TEvYdbProxy {
5151
EvTopicReaderGone,
5252
EV_REQUEST_RESPONSE(ReadTopic),
5353
EV_REQUEST_RESPONSE(CommitOffset),
54-
EvTopicEndPartition,
54+
EvEndTopicPartition,
55+
EvStartTopicReadingSession,
5556

5657
EvEnd,
5758
};
@@ -246,7 +247,22 @@ struct TEvYdbProxy {
246247
TVector<ui64> ChildPartitionsIds;
247248
};
248249

249-
struct TEvTopicEndPartition: public TGenericResponse<TEvTopicEndPartition, EvTopicEndPartition, TEndTopicPartitionResult> {
250+
struct TEvEndTopicPartition: public TGenericResponse<TEvEndTopicPartition, EvEndTopicPartition, TEndTopicPartitionResult> {
251+
using TBase::TBase;
252+
};
253+
254+
struct TStartTopicReadingSessionResult {
255+
explicit TStartTopicReadingSessionResult(const NYdb::NTopic::TReadSessionEvent::TStartPartitionSessionEvent& event)
256+
: ReadSessionId(event.GetPartitionSession()->GetReadSessionId())
257+
{
258+
}
259+
260+
void Out(IOutputStream& out) const;
261+
262+
TString ReadSessionId;
263+
};
264+
265+
struct TEvStartTopicReadingSession: public TGenericResponse<TEvStartTopicReadingSession, EvStartTopicReadingSession, TStartTopicReadingSessionResult> {
250266
using TBase::TBase;
251267
};
252268

@@ -287,7 +303,7 @@ struct TEvYdbProxy {
287303
DEFINE_GENERIC_REQUEST_RESPONSE(DescribeConsumer, NYdb::NTopic::TDescribeConsumerResult, TString, TString, NYdb::NTopic::TDescribeConsumerSettings);
288304
DEFINE_GENERIC_REQUEST_RESPONSE(CreateTopicReader, TActorId, TTopicReaderSettings);
289305
DEFINE_GENERIC_REQUEST_RESPONSE(ReadTopic, TReadTopicResult, TReadTopicSettings);
290-
DEFINE_GENERIC_REQUEST_RESPONSE(CommitOffset, NYdb::TStatus, TString, ui64, TString, ui64, NYdb::NTopic::TCommitOffsetSettings);
306+
DEFINE_GENERIC_REQUEST_RESPONSE(CommitOffset, NYdb::TStatus, std::string, ui64, std::string, ui64, NYdb::NTopic::TCommitOffsetSettings);
291307

292308
#undef DEFINE_GENERIC_REQUEST_RESPONSE
293309
#undef DEFINE_GENERIC_RESPONSE

0 commit comments

Comments
 (0)