Skip to content

Commit be5166f

Browse files
authored
Add new data schema in NodeBroker (#16474)
1 parent 7aa367a commit be5166f

14 files changed

+2790
-214
lines changed

ydb/core/mind/node_broker.cpp

Lines changed: 589 additions & 86 deletions
Large diffs are not rendered by default.

ydb/core/mind/node_broker.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,16 @@ struct TEpochInfo {
7575
}
7676
};
7777

78+
struct TApproximateEpochStartInfo {
79+
ui64 Id = 0;
80+
ui64 Version = 0;
81+
82+
TString ToString() const
83+
{
84+
return TStringBuilder() << "#" << Id << "." << Version;
85+
}
86+
};
87+
7888
struct TEvNodeBroker {
7989
enum EEv {
8090
// requests

ydb/core/mind/node_broker__extend_lease.cpp

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -58,15 +58,15 @@ class TNodeBroker::TTxExtendLease : public TTransactionBase<TNodeBroker> {
5858
return Error(TStatus::WRONG_REQUEST, "Node ID is banned", ctx);
5959

6060
auto &node = it->second;
61-
if (!node.IsFixed()) {
62-
Self->Dirty.DbUpdateNodeLease(node, txc);
61+
if (node.Expire < Self->Dirty.Epoch.NextEnd) {
6362
Self->Dirty.ExtendLease(node);
64-
Response->Record.SetExpire(Self->Dirty.Epoch.NextEnd.GetValue());
63+
Self->Dirty.DbAddNode(node, txc);
64+
Self->Dirty.UpdateEpochVersion();
65+
Self->Dirty.DbUpdateEpochVersion(Self->Dirty.Epoch.Version, txc);
6566
Update = true;
66-
} else {
67-
Response->Record.SetExpire(TInstant::Max().GetValue());
6867
}
6968

69+
Response->Record.SetExpire(node.Expire.GetValue());
7070
Response->Record.MutableStatus()->SetCode(TStatus::OK);
7171
Self->Dirty.Epoch.Serialize(*Response->Record.MutableEpoch());
7272

@@ -82,8 +82,12 @@ class TNodeBroker::TTxExtendLease : public TTransactionBase<TNodeBroker> {
8282
"TTxExtendLease reply with: " << Response->ToString());
8383
ctx.Send(Event->Sender, Response.Release());
8484

85-
if (Update)
86-
Self->Committed.ExtendLease(Self->Committed.Nodes.at(Event->Get()->Record.GetNodeId()));
85+
if (Update) {
86+
auto& node = Self->Committed.Nodes.at(Event->Get()->Record.GetNodeId());
87+
Self->Committed.ExtendLease(node);
88+
Self->Committed.UpdateEpochVersion();
89+
Self->AddNodeToEpochCache(node);
90+
}
8791
}
8892

8993
private:

ydb/core/mind/node_broker__graceful_shutdown.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@ class TNodeBroker::TTxGracefulShutdown : public TTransactionBase<TNodeBroker> {
3030

3131
if (it != Self->Dirty.Nodes.end()) {
3232
auto& node = it->second;
33-
Self->Dirty.DbReleaseSlotIndex(node, txc);
3433
Self->Dirty.ReleaseSlotIndex(node);
34+
Self->Dirty.DbAddNode(node, txc);
3535

3636
Response->Record.MutableStatus()->SetCode(TStatus::OK);
3737

ydb/core/mind/node_broker__load_state.cpp

Lines changed: 4 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
#include "node_broker_impl.h"
2-
#include "node_broker__scheme.h"
32

4-
#include <ydb/core/base/appdata.h>
53
#include <ydb/core/protos/counters_node_broker.pb.h>
64

75
namespace NKikimr {
@@ -20,34 +18,18 @@ class TNodeBroker::TTxLoadState : public TTransactionBase<TNodeBroker> {
2018
{
2119
LOG_DEBUG(ctx, NKikimrServices::NODE_BROKER, "TTxLoadState Execute");
2220

23-
if (!Self->Dirty.DbLoadState(txc, ctx))
24-
return false;
25-
26-
// Move epoch if required.
27-
auto now = ctx.Now();
28-
while (now > Self->Dirty.Epoch.End) {
29-
TStateDiff diff;
30-
Self->Dirty.ComputeNextEpochDiff(diff);
31-
Self->Dirty.DbApplyStateDiff(diff, txc);
32-
Self->Dirty.ApplyStateDiff(diff);
33-
}
34-
35-
return true;
21+
DbChanges = Self->Dirty.DbLoadState(txc, ctx);
22+
return DbChanges.Ready;
3623
}
3724

3825
void Complete(const TActorContext &ctx) override
3926
{
4027
LOG_DEBUG(ctx, NKikimrServices::NODE_BROKER, "TTxLoadState Complete");
41-
42-
Self->Committed = Self->Dirty;
43-
Self->Become(&TNodeBroker::StateWork);
44-
Self->SubscribeForConfigUpdates(ctx);
45-
Self->ScheduleEpochUpdate(ctx);
46-
Self->PrepareEpochCache();
47-
Self->SignalTabletActive(ctx);
28+
Self->Execute(Self->CreateTxMigrateState(std::move(DbChanges)));
4829
}
4930

5031
private:
32+
TDbChanges DbChanges;
5133
};
5234

5335
ITransaction *TNodeBroker::CreateTxLoadState()
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
#include "node_broker_impl.h"
2+
3+
#include <ydb/core/protos/counters_node_broker.pb.h>
4+
5+
namespace NKikimr::NNodeBroker {
6+
7+
constexpr size_t MAX_NODES_BATCH_SIZE = 1000;
8+
9+
class TNodeBroker::TTxMigrateState : public TTransactionBase<TNodeBroker> {
10+
public:
11+
TTxMigrateState(TNodeBroker *self, TDbChanges&& dbChanges)
12+
: TBase(self)
13+
, DbChanges(std::move(dbChanges))
14+
{
15+
}
16+
17+
TTxType GetTxType() const override { return TXTYPE_MIGRATE_STATE; }
18+
19+
void FinalizeMigration(TTransactionContext &txc, const TActorContext &ctx)
20+
{
21+
LOG_DEBUG(ctx, NKikimrServices::NODE_BROKER, "TTxMigrateState FinalizeMigration");
22+
23+
if (DbChanges.UpdateEpoch) {
24+
Self->Dirty.DbUpdateEpoch(Self->Dirty.Epoch, txc);
25+
}
26+
27+
if (DbChanges.UpdateApproxEpochStart) {
28+
Self->Dirty.DbUpdateApproxEpochStart(Self->Dirty.ApproxEpochStart, txc);
29+
}
30+
31+
if (DbChanges.UpdateMainNodesTable) {
32+
Self->Dirty.DbUpdateMainNodesTable(txc);
33+
}
34+
35+
// Move epoch if required.
36+
auto now = ctx.Now();
37+
while (now > Self->Dirty.Epoch.End) {
38+
TStateDiff diff;
39+
Self->Dirty.ComputeNextEpochDiff(diff);
40+
Self->Dirty.ApplyStateDiff(diff);
41+
Self->Dirty.DbApplyStateDiff(diff, txc);
42+
}
43+
44+
Finalized = true;
45+
}
46+
47+
void ProcessMigrationBatch(TTransactionContext &txc, const TActorContext &ctx)
48+
{
49+
LOG_DEBUG(ctx, NKikimrServices::NODE_BROKER, TStringBuilder()
50+
<< "TTxMigrateState ProcessMigrationBatch"
51+
<< " UpdateNodes left " << DbChanges.UpdateNodes.size()
52+
<< ", NewVersionUpdateNodes left " << DbChanges.NewVersionUpdateNodes.size());
53+
54+
size_t nodesBatchSize = 0;
55+
while (nodesBatchSize < MAX_NODES_BATCH_SIZE && !DbChanges.UpdateNodes.empty()) {
56+
Self->Dirty.DbUpdateNode(DbChanges.UpdateNodes.back(), txc);
57+
DbChanges.UpdateNodes.pop_back();
58+
++nodesBatchSize;
59+
}
60+
61+
const bool newVersionInBatch = nodesBatchSize < MAX_NODES_BATCH_SIZE
62+
&& !DbChanges.NewVersionUpdateNodes.empty()
63+
&& DbChanges.UpdateEpoch;
64+
65+
if (newVersionInBatch) {
66+
Self->Dirty.DbUpdateEpoch(Self->Dirty.Epoch, txc);
67+
DbChanges.UpdateEpoch = false;
68+
// Changing version may affect uncommitted approximate epoch start
69+
if (DbChanges.UpdateApproxEpochStart) {
70+
Self->Dirty.DbUpdateApproxEpochStart(Self->Dirty.ApproxEpochStart, txc);
71+
DbChanges.UpdateApproxEpochStart = false;
72+
}
73+
}
74+
75+
while (nodesBatchSize < MAX_NODES_BATCH_SIZE && !DbChanges.NewVersionUpdateNodes.empty()) {
76+
Self->Dirty.DbUpdateNode(DbChanges.NewVersionUpdateNodes.back(), txc);
77+
DbChanges.NewVersionUpdateNodes.pop_back();
78+
++nodesBatchSize;
79+
}
80+
}
81+
82+
bool Execute(TTransactionContext &txc, const TActorContext &ctx) override
83+
{
84+
LOG_DEBUG(ctx, NKikimrServices::NODE_BROKER, "TTxMigrateState Execute");
85+
86+
ProcessMigrationBatch(txc, ctx);
87+
if (!DbChanges.HasNodeUpdates()) {
88+
FinalizeMigration(txc, ctx);
89+
}
90+
return true;
91+
}
92+
93+
void Complete(const TActorContext &ctx) override
94+
{
95+
LOG_DEBUG(ctx, NKikimrServices::NODE_BROKER, "TTxMigrateState Complete");
96+
97+
if (Finalized) {
98+
Self->Committed = Self->Dirty;
99+
Self->Become(&TNodeBroker::StateWork);
100+
Self->SubscribeForConfigUpdates(ctx);
101+
Self->ScheduleEpochUpdate(ctx);
102+
Self->PrepareEpochCache();
103+
Self->SignalTabletActive(ctx);
104+
} else {
105+
Self->Execute(Self->CreateTxMigrateState(std::move(DbChanges)));
106+
}
107+
}
108+
109+
private:
110+
TDbChanges DbChanges;
111+
bool Finalized = false;
112+
};
113+
114+
ITransaction *TNodeBroker::CreateTxMigrateState(TDbChanges&& dbChanges)
115+
{
116+
return new TTxMigrateState(this, std::move(dbChanges));
117+
}
118+
119+
} // namespace NKikimr::NNodeBroker

0 commit comments

Comments
 (0)