Skip to content

Commit d44aeaa

Browse files
authored
Commit replication changes (#12680)
1 parent fd6bcad commit d44aeaa

File tree

8 files changed

+173
-18
lines changed

8 files changed

+173
-18
lines changed

ydb/core/protos/counters_replication.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,4 +47,5 @@ enum ETxTypes {
4747
TXTYPE_WORKER_ERROR = 14 [(TxTypeOpts) = {Name: "TxWorkerError"}];
4848
TXTYPE_ASSIGN_TX_ID = 15 [(TxTypeOpts) = {Name: "TxAssignTxId"}];
4949
TXTYPE_HEARTBEAT = 16 [(TxTypeOpts) = {Name: "TxHeartbeat"}];
50+
TXTYPE_COMMIT_CHANGES = 17 [(TxTypeOpts) = {Name: "TxCommitChanges"}];
5051
}

ydb/core/tx/replication/controller/controller.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ STFUNC(TController::StateWork) {
8282
HFunc(TEvService::TEvGetTxId, Handle);
8383
HFunc(TEvService::TEvHeartbeat, Handle);
8484
HFunc(TEvTxAllocatorClient::TEvAllocateResult, Handle);
85+
HFunc(TEvTxUserProxy::TEvProposeTransactionStatus, Handle);
8586
HFunc(TEvInterconnect::TEvNodeDisconnected, Handle);
8687
default:
8788
HandleDefaultEvents(ev, SelfId());

ydb/core/tx/replication/controller/controller_impl.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
#include <ydb/core/tablet_flat/tablet_flat_executed.h>
1818
#include <ydb/core/tx/replication/service/service.h>
1919
#include <ydb/core/tx/tx_allocator_client/actor_client.h>
20+
#include <ydb/core/tx/tx_proxy/proxy.h>
2021
#include <ydb/library/actors/core/interconnect.h>
2122
#include <ydb/library/yverify_stream/yverify_stream.h>
2223

@@ -99,6 +100,7 @@ class TController
99100
void Handle(TEvService::TEvGetTxId::TPtr& ev, const TActorContext& ctx);
100101
void Handle(TEvService::TEvHeartbeat::TPtr& ev, const TActorContext& ctx);
101102
void Handle(TEvTxAllocatorClient::TEvAllocateResult::TPtr& ev, const TActorContext& ctx);
103+
void Handle(TEvTxUserProxy::TEvProposeTransactionStatus::TPtr& ev, const TActorContext& ctx);
102104
void Handle(TEvInterconnect::TEvNodeDisconnected::TPtr& ev, const TActorContext& ctx);
103105

104106
void CreateSession(ui32 nodeId, const TActorContext& ctx);
@@ -135,6 +137,7 @@ class TController
135137
class TTxWorkerError;
136138
class TTxAssignTxId;
137139
class TTxHeartbeat;
140+
class TTxCommitChanges;
138141

139142
// tx runners
140143
void RunTxInitSchema(const TActorContext& ctx);
@@ -216,6 +219,7 @@ class TController
216219
TMap<TRowVersion, THashSet<TWorkerId>> WorkersByHeartbeat;
217220
THashMap<TWorkerId, TRowVersion> PendingHeartbeats;
218221
bool ProcessHeartbeatsInFlight = false;
222+
ui64 CommittingTxId = 0;
219223

220224
}; // TController
221225

ydb/core/tx/replication/controller/replication.cpp

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ class TReplication::TImpl: public TLagProvider {
9494

9595
template <typename... Args>
9696
ui64 AddTarget(TReplication* self, ui64 id, ETargetKind kind, Args&&... args) {
97+
TargetTablePaths.clear();
9798
const auto res = Targets.emplace(id, CreateTarget(self, id, kind, std::forward<Args>(args)...));
9899
Y_VERIFY_S(res.second, "Duplicate target: " << id);
99100
TLagProvider::AddPendingLag(id);
@@ -114,6 +115,23 @@ class TReplication::TImpl: public TLagProvider {
114115

115116
void RemoveTarget(ui64 id) {
116117
Targets.erase(id);
118+
TargetTablePaths.clear();
119+
}
120+
121+
const TVector<TString>& GetTargetTablePaths() const {
122+
if (!TargetTablePaths) {
123+
TargetTablePaths.reserve(Targets.size());
124+
for (const auto& [_, target] : Targets) {
125+
switch (target->GetKind()) {
126+
case ETargetKind::Table:
127+
case ETargetKind::IndexTable:
128+
TargetTablePaths.push_back(target->GetDstPath());
129+
break;
130+
}
131+
}
132+
}
133+
134+
return TargetTablePaths;
117135
}
118136

119137
void Progress(const TActorContext& ctx) {
@@ -215,6 +233,7 @@ class TReplication::TImpl: public TLagProvider {
215233
ui64 NextTargetId = 1;
216234
THashMap<ui64, TTarget> Targets;
217235
THashSet<ui64> PendingAlterTargets;
236+
mutable TVector<TString> TargetTablePaths;
218237
TActorId SecretResolver;
219238
TActorId YdbProxy;
220239
TActorId TenantResolver;
@@ -264,6 +283,10 @@ void TReplication::RemoveTarget(ui64 id) {
264283
return Impl->RemoveTarget(id);
265284
}
266285

286+
const TVector<TString>& TReplication::GetTargetTablePaths() const {
287+
return Impl->GetTargetTablePaths();
288+
}
289+
267290
void TReplication::Progress(const TActorContext& ctx) {
268291
Impl->Progress(ctx);
269292
}

ydb/core/tx/replication/controller/replication.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ class TReplication: public TSimpleRefCount<TReplication> {
110110
const ITarget* FindTarget(ui64 id) const;
111111
ITarget* FindTarget(ui64 id);
112112
void RemoveTarget(ui64 id);
113+
const TVector<TString>& GetTargetTablePaths() const;
113114

114115
void Progress(const TActorContext& ctx);
115116
void Shutdown(const TActorContext& ctx);

ydb/core/tx/replication/controller/tx_heartbeat.cpp

Lines changed: 138 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,21 @@
22

33
namespace NKikimr::NReplication::NController {
44

5+
THolder<TEvTxUserProxy::TEvProposeTransaction> MakeCommitProposal(ui64 writeTxId, const TVector<TString>& tables) {
6+
auto ev = MakeHolder<TEvTxUserProxy::TEvProposeTransaction>();
7+
auto& tx = *ev->Record.MutableTransaction()->MutableCommitWrites();
8+
9+
tx.SetWriteTxId(writeTxId);
10+
for (const auto& path : tables) {
11+
tx.AddTables()->SetTablePath(path);
12+
}
13+
14+
return ev;
15+
}
16+
517
class TController::TTxHeartbeat: public TTxBase {
18+
THolder<TEvTxUserProxy::TEvProposeTransaction> CommitProposal;
19+
620
public:
721
explicit TTxHeartbeat(TController* self)
822
: TTxBase("TxHeartbeat", self)
@@ -22,24 +36,37 @@ class TController::TTxHeartbeat: public TTxBase {
2236
return true;
2337
}
2438

25-
const auto prevMinVersion = !Self->WorkersByHeartbeat.empty()
26-
? std::make_optional<TRowVersion>(Self->WorkersByHeartbeat.begin()->first)
27-
: std::nullopt;
39+
auto replication = Self->GetSingle();
40+
if (!replication) {
41+
CLOG_E(ctx, "Ambiguous replication instance");
42+
return true;
43+
}
2844

2945
NIceDb::TNiceDb db(txc.DB);
3046

31-
for (const auto& [id, version] : Self->PendingHeartbeats) {
47+
while (!Self->PendingHeartbeats.empty()) {
48+
auto it = Self->PendingHeartbeats.begin();
49+
const auto& id = it->first;
50+
const auto& version = it->second;
51+
3252
if (!Self->Workers.contains(id)) {
53+
Self->PendingHeartbeats.erase(it);
3354
continue;
3455
}
3556

3657
auto& worker = Self->Workers[id];
3758
if (worker.HasHeartbeat()) {
38-
auto it = Self->WorkersByHeartbeat.find(worker.GetHeartbeat());
39-
if (it != Self->WorkersByHeartbeat.end()) {
40-
it->second.erase(id);
41-
if (it->second.empty()) {
42-
Self->WorkersByHeartbeat.erase(it);
59+
const auto& prevVersion = worker.GetHeartbeat();
60+
if (version < prevVersion) {
61+
Self->PendingHeartbeats.erase(it);
62+
continue;
63+
}
64+
65+
auto jt = Self->WorkersByHeartbeat.find(prevVersion);
66+
if (jt != Self->WorkersByHeartbeat.end()) {
67+
jt->second.erase(id);
68+
if (jt->second.empty()) {
69+
Self->WorkersByHeartbeat.erase(jt);
4370
}
4471
}
4572
}
@@ -52,31 +79,43 @@ class TController::TTxHeartbeat: public TTxBase {
5279
NIceDb::TUpdate<Schema::Workers::HeartbeatVersionStep>(version.Step),
5380
NIceDb::TUpdate<Schema::Workers::HeartbeatVersionTxId>(version.TxId)
5481
);
82+
83+
Self->PendingHeartbeats.erase(it);
5584
}
5685

5786
if (Self->Workers.size() != Self->WorkersWithHeartbeat.size()) {
58-
return true;
87+
return true; // no quorum
5988
}
6089

61-
Y_ABORT_UNLESS(!Self->WorkersByHeartbeat.empty());
62-
const auto newMinVersion = Self->WorkersByHeartbeat.begin()->first;
90+
if (Self->CommittingTxId) {
91+
return true; // another commit in progress
92+
}
6393

64-
if (newMinVersion <= prevMinVersion.value_or(TRowVersion::Min())) {
65-
return true;
94+
if (Self->AssignedTxIds.empty()) {
95+
return true; // nothing to commit
6696
}
6797

68-
CLOG_N(ctx, "Min version has been changed"
69-
<< ": prev# " << prevMinVersion.value_or(TRowVersion::Min())
70-
<< ", new# " << newMinVersion);
98+
Y_ABORT_UNLESS(!Self->WorkersByHeartbeat.empty());
99+
if (Self->WorkersByHeartbeat.begin()->first < Self->AssignedTxIds.begin()->first) {
100+
return true; // version has not been changed
101+
}
102+
103+
Self->CommittingTxId = Self->AssignedTxIds.begin()->second;
104+
CommitProposal = MakeCommitProposal(Self->CommittingTxId, replication->GetTargetTablePaths());
71105

72-
// TODO: run commit
73106
return true;
74107
}
75108

76109
void Complete(const TActorContext& ctx) override {
77110
CLOG_D(ctx, "Complete"
78111
<< ": pending# " << Self->PendingHeartbeats.size());
79112

113+
if (auto& ev = CommitProposal) {
114+
CLOG_N(ctx, "Propose commit"
115+
<< ": writeTxId# " << Self->CommittingTxId);
116+
ctx.Send(MakeTxProxyID(), std::move(ev), 0, Self->CommittingTxId);
117+
}
118+
80119
if (Self->PendingHeartbeats) {
81120
Self->Execute(new TTxHeartbeat(Self), ctx);
82121
} else {
@@ -93,4 +132,85 @@ void TController::RunTxHeartbeat(const TActorContext& ctx) {
93132
}
94133
}
95134

135+
class TController::TTxCommitChanges: public TTxBase {
136+
TEvTxUserProxy::TEvProposeTransactionStatus::TPtr Status;
137+
THolder<TEvTxUserProxy::TEvProposeTransaction> CommitProposal;
138+
139+
public:
140+
explicit TTxCommitChanges(TController* self, TEvTxUserProxy::TEvProposeTransactionStatus::TPtr& ev)
141+
: TTxBase("TxCommitChanges", self)
142+
, Status(ev)
143+
{
144+
}
145+
146+
TTxType GetTxType() const override {
147+
return TXTYPE_COMMIT_CHANGES;
148+
}
149+
150+
bool Execute(TTransactionContext& txc, const TActorContext& ctx) override {
151+
CLOG_D(ctx, "Execute"
152+
<< ": writeTxId# " << Self->CommittingTxId);
153+
154+
auto replication = Self->GetSingle();
155+
if (!replication) {
156+
CLOG_E(ctx, "Ambiguous replication instance");
157+
return true;
158+
}
159+
160+
auto it = Self->AssignedTxIds.begin();
161+
Y_ABORT_UNLESS(it != Self->AssignedTxIds.end());
162+
Y_ABORT_UNLESS(it->second == Self->CommittingTxId);
163+
164+
const auto& record = Status->Get()->Record;
165+
const auto status = static_cast<TEvTxUserProxy::TEvProposeTransactionStatus::EStatus>(record.GetStatus());
166+
if (status != TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecComplete) {
167+
CommitProposal = MakeCommitProposal(Self->CommittingTxId, replication->GetTargetTablePaths());
168+
return true;
169+
}
170+
171+
NIceDb::TNiceDb db(txc.DB);
172+
173+
db.Table<Schema::TxIds>().Key(it->first.Step, it->first.TxId).Delete();
174+
it = Self->AssignedTxIds.erase(it);
175+
Self->CommittingTxId = 0;
176+
177+
if (it == Self->AssignedTxIds.end() || Self->WorkersByHeartbeat.empty()) {
178+
return true;
179+
}
180+
181+
if (Self->WorkersByHeartbeat.begin()->first < it->first) {
182+
return true;
183+
}
184+
185+
Self->CommittingTxId = Self->AssignedTxIds.begin()->second;
186+
CommitProposal = MakeCommitProposal(Self->CommittingTxId, replication->GetTargetTablePaths());
187+
188+
return true;
189+
}
190+
191+
void Complete(const TActorContext& ctx) override {
192+
CLOG_D(ctx, "Complete");
193+
194+
if (auto& ev = CommitProposal) {
195+
CLOG_N(ctx, "Propose commit"
196+
<< ": writeTxId# " << Self->CommittingTxId);
197+
ctx.Send(MakeTxProxyID(), std::move(ev), 0, Self->CommittingTxId);
198+
}
199+
}
200+
201+
}; // TTxCommitChanges
202+
203+
void TController::Handle(TEvTxUserProxy::TEvProposeTransactionStatus::TPtr& ev, const TActorContext& ctx) {
204+
CLOG_T(ctx, "Handle " << ev->Get()->ToString());
205+
206+
if (ev->Cookie != CommittingTxId) {
207+
CLOG_E(ctx, "Cookie mismatch"
208+
<< ": expected# " << CommittingTxId
209+
<< ", got# " << ev->Cookie);
210+
return;
211+
}
212+
213+
Execute(new TTxCommitChanges(this, ev), ctx);
214+
}
215+
96216
}

ydb/core/tx/replication/service/base_table_writer.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -465,6 +465,9 @@ class TLocalTableWriter
465465

466466
if (records) {
467467
EnqueueRecords(std::move(records));
468+
} else if (PendingTxId.empty()) {
469+
Y_ABORT_UNLESS(PendingRecords.empty());
470+
Send(Worker, new TEvWorker::TEvPoll());
468471
}
469472
}
470473

ydb/core/tx/replication/service/table_writer_ut.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,7 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) {
181181
TRecord(order++, R"({"resolved":[10,0]})"),
182182
}));
183183
UNIT_ASSERT_VALUES_EQUAL(TRowVersion::FromProto(ev->Get()->Record.GetVersion()), TRowVersion(10, 0));
184+
env.GetRuntime().GrabEdgeEvent<TEvWorker::TEvPoll>(env.GetSender());
184185
}
185186

186187
env.Send<TEvService::TEvGetTxId>(writer, new TEvWorker::TEvData("TestSource", {
@@ -206,6 +207,7 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) {
206207
env.Send<TEvService::TEvHeartbeat>(writer, new TEvWorker::TEvData("TestSource", {
207208
TRecord(order++, R"({"resolved":[30,0]})"),
208209
}));
210+
env.GetRuntime().GrabEdgeEvent<TEvWorker::TEvPoll>(env.GetSender());
209211
}
210212
}
211213

0 commit comments

Comments
 (0)