Skip to content

Commit 83d7a3c

Browse files
authored
25-1: introduce VDisk synchronization broker (ydb-platform#15710) (ydb-platform#16861)
1 parent f288dcc commit 83d7a3c

File tree

14 files changed

+259
-13
lines changed

14 files changed

+259
-13
lines changed

ydb/core/base/blobstorage.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -757,6 +757,9 @@ struct TEvBlobStorage {
757757
EvHugeQueryForbiddenChunks,
758758
EvHugeForbiddenChunks,
759759
EvContinueShred,
760+
EvQuerySyncToken,
761+
EvSyncToken,
762+
EvReleaseSyncToken,
760763

761764
EvYardInitResult = EvPut + 9 * 512, /// 268 636 672
762765
EvLogResult,

ydb/core/base/services/blobstorage_service_id.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,11 @@ inline TActorId MakeBlobStorageReplBrokerID() {
7676
return TActorId(0, TStringBuf(x, 12));
7777
}
7878

79+
// Builds the service actor id of the VDisk sync broker: node id 0 combined
// with the fixed 12-character service name "bssyncbroker".
inline TActorId MakeBlobStorageSyncBrokerID() {
    const char serviceName[12] = {'b', 's', 's', 'y', 'n', 'c', 'b', 'r', 'o', 'k', 'e', 'r'};
    const TStringBuf nameBuf(serviceName, sizeof(serviceName));
    return TActorId(0, nameBuf);
}
83+
7984
inline TActorId MakeBlobStorageNodeWardenID(ui32 node) {
8085
char x[12] = {'b','s','n','o','d','e','c','n','t','r','l','r'};
8186
return TActorId(node, TStringBuf(x, 12));

ydb/core/blobstorage/nodewarden/node_warden_impl.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
#include <ydb/core/blobstorage/dsproxy/dsproxy_nodemonactor.h>
1111
#include <ydb/core/blobstorage/pdisk/drivedata_serializer.h>
1212
#include <ydb/core/blobstorage/vdisk/repl/blobstorage_replbroker.h>
13+
#include <ydb/core/blobstorage/vdisk/syncer/blobstorage_syncer_broker.h>
1314
#include <ydb/library/pdisk_io/file_params.h>
1415
#include <ydb/core/mind/bscontroller/yaml_config_helpers.h>
1516
#include <ydb/core/base/nameservice.h>
@@ -44,6 +45,7 @@ TNodeWarden::TNodeWarden(const TIntrusivePtr<TNodeWardenConfig> &cfg)
4445
, ThrottlingMaxOccupancyPerMille(950, 1, 1000)
4546
, ThrottlingMinLogChunkCount(100, 1, 1000)
4647
, ThrottlingMaxLogChunkCount(130, 1, 1000)
48+
, MaxInProgressSyncCount(0, 0, 1000)
4749
, MaxCommonLogChunksHDD(200, 1, 1'000'000)
4850
, MaxCommonLogChunksSSD(200, 1, 1'000'000)
4951
, CostMetricsParametersByMedia({
@@ -370,6 +372,8 @@ void TNodeWarden::Bootstrap() {
370372
icb->RegisterSharedControl(ThrottlingMinLogChunkCount, "VDiskControls.ThrottlingMinLogChunkCount");
371373
icb->RegisterSharedControl(ThrottlingMaxLogChunkCount, "VDiskControls.ThrottlingMaxLogChunkCount");
372374

375+
icb->RegisterSharedControl(MaxInProgressSyncCount, "VDiskControls.MaxInProgressSyncCount");
376+
373377
icb->RegisterSharedControl(MaxCommonLogChunksHDD, "PDiskControls.MaxCommonLogChunksHDD");
374378
icb->RegisterSharedControl(MaxCommonLogChunksSSD, "PDiskControls.MaxCommonLogChunksSSD");
375379

@@ -424,6 +428,9 @@ void TNodeWarden::Bootstrap() {
424428
const ui64 maxBytes = replBrokerConfig.GetMaxInFlightReadBytes();
425429
actorSystem->RegisterLocalService(MakeBlobStorageReplBrokerID(), Register(CreateReplBrokerActor(maxBytes)));
426430

431+
actorSystem->RegisterLocalService(MakeBlobStorageSyncBrokerID(), Register(
432+
CreateSyncBrokerActor(MaxInProgressSyncCount)));
433+
427434
// determine if we are running in 'mock' mode
428435
EnableProxyMock = Cfg->BlobStorageConfig.GetServiceSet().GetEnableProxyMock();
429436

ydb/core/blobstorage/nodewarden/node_warden_impl.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,8 @@ namespace NKikimr::NStorage {
216216
TControlWrapper ThrottlingMinLogChunkCount;
217217
TControlWrapper ThrottlingMaxLogChunkCount;
218218

219+
TControlWrapper MaxInProgressSyncCount;
220+
219221
TControlWrapper MaxCommonLogChunksHDD;
220222
TControlWrapper MaxCommonLogChunksSSD;
221223

ydb/core/blobstorage/nodewarden/node_warden_vdisk.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,8 @@ namespace NKikimr::NStorage {
212212
vdiskConfig->ThrottlingMinLogChunkCount = ThrottlingMinLogChunkCount;
213213
vdiskConfig->ThrottlingMaxLogChunkCount = ThrottlingMaxLogChunkCount;
214214

215+
vdiskConfig->MaxInProgressSyncCount = MaxInProgressSyncCount;
216+
215217
vdiskConfig->CostMetricsParametersByMedia = CostMetricsParametersByMedia;
216218

217219
vdiskConfig->FeatureFlags = Cfg->FeatureFlags;

ydb/core/blobstorage/vdisk/common/vdisk_config.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,9 @@ namespace NKikimr {
258258
TControlWrapper ThrottlingMinLogChunkCount;
259259
TControlWrapper ThrottlingMaxLogChunkCount;
260260

261+
///////////// SYNC SETTINGS //////////////////
262+
TControlWrapper MaxInProgressSyncCount;
263+
261264
///////////// FEATURE FLAGS ////////////////////////
262265
NKikimrConfig::TFeatureFlags FeatureFlags;
263266

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
#include "blobstorage_syncer_broker.h"
2+
3+
#include <ydb/core/blobstorage/vdisk/common/vdisk_log.h>
4+
#include <ydb/core/control/immediate_control_board_wrapper.h>
5+
6+
namespace NKikimr {
7+
8+
/**
 * Broker actor that limits how many VDisks may hold sync tokens simultaneously.
 *
 * Requesters send TEvQuerySyncToken naming a VDisk actor id and get TEvSyncToken
 * back once they are allowed to proceed; they give the token back with
 * TEvReleaseSyncToken. The limit is counted per VDisk (the number of keys in
 * Active), not per requesting actor: every requester of an already active VDisk
 * is granted a token immediately. A limit of 0 means "unlimited".
 *
 * Invariant maintained by this class: a VDisk is never present in both Active
 * and WaitQueue at the same time.
 */
class TSyncBroker : public TActorBootstrapped<TSyncBroker> {
    using TActorIdSet = std::unordered_set<TActorId>;

    // Runtime-adjustable limit on the number of active VDisks (0 disables the limit).
    TControlWrapper MaxInProgressSyncCount;

    // VDisk actor id -> actors currently holding a token on behalf of that VDisk.
    std::unordered_map<TActorId, TActorIdSet> Active;

    struct TWaitSync {
        TActorId VDiskActorId;
        TActorIdSet ActorIds;
    };
    // FIFO of VDisks waiting for a free slot, with all requesters of each VDisk.
    std::list<TWaitSync> WaitQueue; // TODO: better search

    // Returns the queue entry for the given VDisk, or WaitQueue.end() if it is not waiting.
    std::list<TWaitSync>::iterator FindWaiting(const TActorId& vDiskActorId) {
        return std::find_if(WaitQueue.begin(), WaitQueue.end(), [&vDiskActorId](const TWaitSync& item) {
            return item.VDiskActorId == vDiskActorId;
        });
    }

    // Sends a token to every actor in actorIds and records them as holders for the VDisk.
    // Holders already registered for the VDisk are kept (merged), never overwritten.
    void Grant(const TActorId& vDiskActorId, TActorIdSet&& actorIds) {
        for (const auto& actorId : actorIds) {
            Send(actorId, new TEvSyncToken);
        }
        // try_emplace leaves actorIds untouched when the key already exists,
        // so it is safe to read from it in the merge branch below.
        const auto [it, inserted] = Active.try_emplace(vDiskActorId, std::move(actorIds));
        if (!inserted) {
            it->second.insert(actorIds.begin(), actorIds.end());
        }
    }

    // Emits the common debug line with the current sizes of both containers.
    void LogState(const char *event, const TActorId& vDiskActorId, const TActorId& actorId, const char *outcome) {
        LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::BS_SYNCER,
            event << ", VDisk actor id: " << vDiskActorId <<
            ", actor id: " << actorId <<
            ", " << outcome << ", active: " << Active.size() <<
            ", waiting: " << WaitQueue.size());
    }

    // True when one more VDisk may become active under the current limit.
    bool HasFreeSlot() {
        const auto limit = static_cast<ui64>(MaxInProgressSyncCount);
        return !limit || Active.size() < limit;
    }

public:
    static constexpr auto ActorActivityType() {
        return NKikimrServices::TActivity::BS_SYNC_BROKER;
    }

    STRICT_STFUNC(StateFunc,
        hFunc(TEvQuerySyncToken, Handle)
        hFunc(TEvReleaseSyncToken, Handle)
        hFunc(TEvents::TEvWakeup, Handle)
    )

    explicit TSyncBroker(const TControlWrapper& maxInProgressSyncCount)
        : MaxInProgressSyncCount(maxInProgressSyncCount)
    {}

    // Enters the working state and arms the first periodic wakeup (100 ms).
    void Bootstrap() {
        Become(&TThis::StateFunc, TDuration::MilliSeconds(100), new TEvents::TEvWakeup);
    }

    // Token request: grant immediately if the VDisk is already active or a slot
    // is free, otherwise enqueue the sender behind the other waiting VDisks.
    void Handle(TEvQuerySyncToken::TPtr& ev) {
        const auto vDiskActorId = ev->Get()->VDiskActorId;
        const auto actorId = ev->Sender;

        // The VDisk already occupies a slot: additional requesters do not count against the limit.
        if (const auto it = Active.find(vDiskActorId); it != Active.end()) {
            it->second.insert(actorId);
            Send(actorId, new TEvSyncToken);
            LogState("TEvQuerySyncToken", vDiskActorId, actorId, "token sent");
            return;
        }

        if (HasFreeSlot()) {
            TActorIdSet actorIds{actorId};
            // The limit may have been raised since earlier requesters of this VDisk
            // were enqueued; promote them together with the sender so that the VDisk
            // never ends up both active and waiting.
            if (const auto waitIt = FindWaiting(vDiskActorId); waitIt != WaitQueue.end()) {
                actorIds.insert(waitIt->ActorIds.begin(), waitIt->ActorIds.end());
                WaitQueue.erase(waitIt);
            }
            Grant(vDiskActorId, std::move(actorIds));
            LogState("TEvQuerySyncToken", vDiskActorId, actorId, "token sent");
            return;
        }

        // No free slot: join the existing queue entry of this VDisk or append a new one.
        if (const auto waitIt = FindWaiting(vDiskActorId); waitIt != WaitQueue.end()) {
            waitIt->ActorIds.insert(actorId);
        } else {
            WaitQueue.emplace_back(TWaitSync{vDiskActorId, {actorId}});
        }
        LogState("TEvQuerySyncToken", vDiskActorId, actorId, "enqueued");
    }

    // Promotes waiting VDisks in FIFO order while free slots remain.
    void ProcessQueue() {
        while (!WaitQueue.empty() && HasFreeSlot()) {
            // Take ownership of the front entry before popping it so the set is moved, not copied.
            TWaitSync waitSync = std::move(WaitQueue.front());
            WaitQueue.pop_front();
            Grant(waitSync.VDiskActorId, std::move(waitSync.ActorIds));
        }
    }

    // Token release: drop the sender from the active set (freeing the slot when it was
    // the last holder) or, if the VDisk is still waiting, withdraw the sender from the queue.
    void Handle(TEvReleaseSyncToken::TPtr& ev) {
        const auto vDiskActorId = ev->Get()->VDiskActorId;
        const auto actorId = ev->Sender;

        if (const auto it = Active.find(vDiskActorId); it != Active.end()) {
            it->second.erase(actorId);
            if (it->second.empty()) {
                Active.erase(it);
                ProcessQueue();
            }
            LogState("TEvReleaseSyncToken", vDiskActorId, actorId, "token released");
            return;
        }

        if (const auto waitIt = FindWaiting(vDiskActorId); waitIt != WaitQueue.end()) {
            waitIt->ActorIds.erase(actorId);
            if (waitIt->ActorIds.empty()) {
                WaitQueue.erase(waitIt);
            }
            LogState("TEvReleaseSyncToken", vDiskActorId, actorId, "removed from queue");
        }
    }

    // Periodic tick: re-evaluates the queue (presumably so that a changed limit
    // takes effect without waiting for a release) and re-arms the timer.
    void Handle(TEvents::TEvWakeup::TPtr&) {
        ProcessQueue();
        Schedule(TDuration::MilliSeconds(100), new TEvents::TEvWakeup);
    }
};
148+
149+
// Factory for the sync broker actor; the caller owns the returned object.
// maxInProgressSyncCount is handed to the broker's constructor unchanged.
IActor *CreateSyncBrokerActor(const TControlWrapper& maxInProgressSyncCount) {
    IActor *broker = new TSyncBroker(maxInProgressSyncCount);
    return broker;
}
152+
153+
} // NKikimr
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
#pragma once
2+
3+
#include "defs.h"
4+
#include <ydb/core/base/blobstorage.h>
5+
#include <ydb/core/blobstorage/base/blobstorage_vdiskid.h>
6+
7+
namespace NKikimr {
8+
9+
// Local event asking the sync broker for a token on behalf of the given VDisk.
struct TEvQuerySyncToken : TEventLocal<TEvQuerySyncToken, TEvBlobStorage::EvQuerySyncToken> {
    // Actor id of the VDisk the token is requested for.
    TActorId VDiskActorId;

    explicit TEvQuerySyncToken(const TActorId& id)
        : VDiskActorId{id}
    {
    }
};
16+
17+
// Payload-free local event with type TEvBlobStorage::EvSyncToken.
struct TEvSyncToken : TEventLocal<TEvSyncToken, TEvBlobStorage::EvSyncToken> {
};
19+
20+
// Local event returning a previously requested sync token for the given VDisk.
struct TEvReleaseSyncToken : TEventLocal<TEvReleaseSyncToken, TEvBlobStorage::EvReleaseSyncToken> {
    // Actor id of the VDisk whose token is being released.
    TActorId VDiskActorId;

    explicit TEvReleaseSyncToken(const TActorId& id)
        : VDiskActorId{id}
    {
    }
};
27+
28+
class TControlWrapper;
29+
30+
extern IActor *CreateSyncBrokerActor(const TControlWrapper& maxInProgressSyncCount);
31+
32+
} // NKikimr

ydb/core/blobstorage/vdisk/syncer/blobstorage_syncer_recoverlostdata_proxy.cpp

Lines changed: 39 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#include "blobstorage_syncer_recoverlostdata_proxy.h"
2+
#include "blobstorage_syncer_broker.h"
23
#include "blobstorage_syncer_committer.h"
34
#include "blobstorage_syncer_data.h"
45
#include "blobstorage_syncquorum.h"
@@ -45,6 +46,39 @@ namespace NKikimr {
4546
TVDiskID TargetVDiskId;
4647
TActorId TargetActorId;
4748

49+
void Bootstrap(const TActorContext &ctx) {
50+
LOG_DEBUG(ctx, BS_SYNCER,
51+
VDISKP(SyncerCtx->VCtx->VDiskLogPrefix,
52+
"TSyncerRLDFullSyncProxyActor(%s): START",
53+
TargetVDiskId.ToString().data()));
54+
ctx.Send(MakeBlobStorageSyncBrokerID(), new TEvQuerySyncToken(SyncerCtx->VCtx->VDiskActorId),
55+
IEventHandle::FlagTrackDelivery);
56+
Become(&TThis::WaitForBrokerStateFunc);
57+
}
58+
59+
void Handle(TEvSyncToken::TPtr &ev, const TActorContext &ctx) {
60+
Y_UNUSED(ev);
61+
LOG_DEBUG(ctx, BS_SYNCER,
62+
VDISKP(SyncerCtx->VCtx->VDiskLogPrefix,
63+
"TSyncerRLDFullSyncProxyActor(%s): TEvSyncToken received",
64+
TargetVDiskId.ToString().data()));
65+
CreateAndRunTask(ctx);
66+
}
67+
68+
// The token request bounced back, i.e. there is no sync broker service:
// proceed with the task without a token. Other undelivered events are ignored.
void Handle(TEvents::TEvUndelivered::TPtr &ev, const TActorContext &ctx) {
    const bool tokenQueryBounced = ev->Get()->SourceType == TEvQuerySyncToken::EventType;
    if (!tokenQueryBounced) {
        return;
    }
    CreateAndRunTask(ctx);
}
74+
75+
// State entered after a TEvQuerySyncToken has been sent to the sync broker.
// Dispatches the broker's reply (TEvSyncToken), a bounced request
// (TEvUndelivered), poison pills and VDisk generation changes to their handlers.
STRICT_STFUNC(WaitForBrokerStateFunc,
76+
HFunc(TEvents::TEvPoisonPill, HandlePoison)
77+
HFunc(TEvSyncToken, Handle)
78+
HFunc(TEvents::TEvUndelivered, Handle)
79+
HFunc(TEvVGenerationChange, Handle)
80+
)
81+
4882
void CreateAndRunTask(const TActorContext &ctx) {
4983
// create task
5084
auto task = std::make_unique<TSyncerJobTask>(TSyncerJobTask::EFullRecover, TargetVDiskId, TargetActorId,
@@ -56,17 +90,8 @@ namespace NKikimr {
5690
Become(&TThis::WaitForSyncStateFunc);
5791
}
5892

59-
void Bootstrap(const TActorContext &ctx) {
60-
LOG_DEBUG(ctx, BS_SYNCER,
61-
VDISKP(SyncerCtx->VCtx->VDiskLogPrefix,
62-
"TSyncerRLDFullSyncProxyActor(%s): START",
63-
TargetVDiskId.ToString().data()));
64-
65-
// run job
66-
CreateAndRunTask(ctx);
67-
}
68-
6993
void Handle(TEvSyncerJobDone::TPtr &ev, const TActorContext &ctx) {
94+
ctx.Send(MakeBlobStorageSyncBrokerID(), new TEvReleaseSyncToken(SyncerCtx->VCtx->VDiskActorId));
7095
LOG_DEBUG(ctx, BS_SYNCER,
7196
VDISKP(SyncerCtx->VCtx->VDiskLogPrefix,
7297
"TSyncerRLDFullSyncProxyActor(%s): TEvSyncerJobDone; Task# %s",
@@ -103,8 +128,9 @@ namespace NKikimr {
103128

104129
void Handle(TEvSyncerRLDWakeup::TPtr &ev, const TActorContext &ctx) {
105130
Y_UNUSED(ev);
106-
// run job again
107-
CreateAndRunTask(ctx);
131+
ctx.Send(MakeBlobStorageSyncBrokerID(), new TEvQuerySyncToken(SyncerCtx->VCtx->VDiskActorId),
132+
IEventHandle::FlagTrackDelivery);
133+
Become(&TThis::WaitForBrokerStateFunc);
108134
}
109135

110136
STRICT_STFUNC(WaitForTimeoutStateFunc,
@@ -147,6 +173,7 @@ namespace NKikimr {
147173
// HandlePoison
148174
////////////////////////////////////////////////////////////////////////
149175
void HandlePoison(TEvents::TEvPoisonPill::TPtr &ev, const TActorContext &ctx) {
176+
ctx.Send(MakeBlobStorageSyncBrokerID(), new TEvReleaseSyncToken(SyncerCtx->VCtx->VDiskActorId));
150177
LOG_DEBUG(ctx, BS_SYNCER,
151178
VDISKP(SyncerCtx->VCtx->VDiskLogPrefix,
152179
"TSyncerRLDFullSyncProxyActor(%s): PoisonPill",

ydb/core/blobstorage/vdisk/syncer/ya.make

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,14 @@ PEERDIR(
1010
ydb/core/blobstorage/vdisk/anubis_osiris
1111
ydb/core/blobstorage/vdisk/common
1212
ydb/core/blobstorage/vdisk/hulldb/base
13+
ydb/core/control
1314
ydb/core/driver_lib/version
1415
)
1516

1617
SRCS(
1718
defs.h
19+
blobstorage_syncer_broker.cpp
20+
blobstorage_syncer_broker.h
1821
blobstorage_syncer_committer.cpp
1922
blobstorage_syncer_committer.h
2023
blobstorage_syncer.cpp

0 commit comments

Comments
 (0)