Skip to content

Commit e93caa6

Browse files
authored
Support an emergency one-to-one split/merge in SchemeShard (#17642)
1 parent 26da617 commit e93caa6

File tree

6 files changed

+313
-6
lines changed

6 files changed

+313
-6
lines changed

ydb/core/protos/flat_scheme_op.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1477,6 +1477,7 @@ message TSplitMergeTablePartitions {
14771477
repeated TSplitBoundary SplitBoundary = 5; // Points of split (there will be N+1 parts)
14781478
optional uint64 SchemeshardId = 6; // Only needed if TableId is used instead of path
14791479
optional uint64 TableOwnerId = 7;
1480+
optional bool AllowOneToOneSplitMerge = 8; // Allow a special 1-to-1 split/merge for emergencies
14801481
}
14811482

14821483
message TUserAttribute {

ydb/core/tx/schemeshard/schemeshard__monitoring.cpp

Lines changed: 148 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,19 @@
55
#include <ydb/core/tx/datashard/range_ops.h>
66
#include <ydb/core/tx/tx_proxy/proxy.h>
77

8+
#include <library/cpp/protobuf/json/proto2json.h>
9+
810
#include <library/cpp/html/pcdata/pcdata.h>
911
#include <util/string/cast.h>
1012

1113
// Parses a tablet id from a CGI parameter value.
//
// Accepts either a hexadecimal number prefixed with "0x" or a plain decimal
// number. Returns NKikimr::NSchemeShard::InvalidTabletId when the value cannot
// be parsed (including an empty value or a bare "0x" prefix); never throws.
static ui64 TryParseTabletId(TStringBuf tabletIdParam) {
    ui64 tabletId = 0;
    // Use the parsers' boolean result explicitly instead of relying on the
    // out-parameter being left untouched when parsing fails.
    const bool parsed = tabletIdParam.StartsWith("0x")
        ? TryIntFromString<16>(tabletIdParam.substr(2), tabletId)
        : TryFromString(tabletIdParam, tabletId);
    return parsed ? tabletId : ui64(NKikimr::NSchemeShard::InvalidTabletId);
}
1722

1823
namespace NKikimr {
@@ -79,6 +84,7 @@ struct TCgi {
7984
static const TParam BuildIndexId;
8085
static const TParam UpdateCoordinatorsConfig;
8186
static const TParam UpdateCoordinatorsConfigDryRun;
87+
static const TParam Action;
8288

8389
struct TPages {
8490
static constexpr TStringBuf MainPage = "Main";
@@ -91,6 +97,10 @@ struct TCgi {
9197
static constexpr TStringBuf ShardInfoByShardIdx = "ShardInfoByShardIdx";
9298
static constexpr TStringBuf BuildIndexInfo = "BuildIndexInfo";
9399
};
100+
101+
// Values recognized for the "Action" CGI parameter (compared against it in HandleAction).
struct TActions {
102+
static constexpr TStringBuf SplitOneToOne = "SplitOneToOne";
103+
};
94104
};
95105

96106
const TCgi::TParam TCgi::TabletID = TStringBuf("TabletID");
@@ -111,6 +121,7 @@ const TCgi::TParam TCgi::Page = TStringBuf("Page");
111121
const TCgi::TParam TCgi::BuildIndexId = TStringBuf("BuildIndexId");
112122
const TCgi::TParam TCgi::UpdateCoordinatorsConfig = TStringBuf("UpdateCoordinatorsConfig");
113123
const TCgi::TParam TCgi::UpdateCoordinatorsConfigDryRun = TStringBuf("UpdateCoordinatorsConfigDryRun");
124+
const TCgi::TParam TCgi::Action = TStringBuf("Action");
114125

115126

116127
class TUpdateCoordinatorsConfigActor : public TActorBootstrapped<TUpdateCoordinatorsConfigActor> {
@@ -231,6 +242,93 @@ class TUpdateCoordinatorsConfigActor : public TActorBootstrapped<TUpdateCoordina
231242
THashMap<ui64, const TItem*> InFlight;
232243
};
233244

245+
// Helper actor serving the SplitOneToOne monitoring action.
//
// It allocates a transaction id, proposes an internal
// ESchemeOpSplitMergeTablePartitions transaction with AllowOneToOneSplitMerge
// set for a single source shard, and replies to the original HTTP request with
// the proposal result rendered as JSON (or an HTTP error on failure).
// Exactly one reply is sent before the actor passes away.
class TMonitoringShardSplitOneToOne : public TActorBootstrapped<TMonitoringShardSplitOneToOne> {
public:
    // ev            - the original monitoring request; its Sender receives the reply
    // schemeShardId - tablet id of the SchemeShard that receives the proposal
    // pathId        - path id of the table owning the shard
    // shardId       - tablet id of the shard to split one-to-one
    TMonitoringShardSplitOneToOne(NMon::TEvRemoteHttpInfo::TPtr&& ev, ui64 schemeShardId, const TPathId& pathId, TTabletId shardId)
        : Ev(std::move(ev))
        , SchemeShardId(schemeShardId)
        , PathId(pathId)
        , ShardId(shardId)
    {}

    // Requests a transaction id from the tx proxy and waits for the answer.
    void Bootstrap() {
        Send(MakeTxProxyID(), new TEvTxUserProxy::TEvAllocateTxId);
        Become(&TThis::StateWaitTxId);
    }

    STFUNC(StateWaitTxId) {
        switch (ev->GetTypeRewrite()) {
            hFunc(TEvTxUserProxy::TEvAllocateTxIdResult, Handle);
        }
    }

    // Builds the one-to-one split proposal and forwards it to the SchemeShard
    // through the per-node leader pipe cache, subscribing to delivery problems.
    void Handle(TEvTxUserProxy::TEvAllocateTxIdResult::TPtr& ev) {
        TxId = ev->Get()->TxId;

        auto propose = MakeHolder<TEvSchemeShard::TEvModifySchemeTransaction>(TxId, SchemeShardId);

        auto& modifyScheme = *propose->Record.AddTransaction();
        modifyScheme.SetOperationType(NKikimrSchemeOp::ESchemeOpSplitMergeTablePartitions);
        modifyScheme.SetInternal(true);

        auto& info = *modifyScheme.MutableSplitMergeTablePartitions();
        info.SetTableOwnerId(PathId.OwnerId);
        info.SetTableLocalId(PathId.LocalPathId);
        info.AddSourceTabletId(ui64(ShardId));
        info.SetAllowOneToOneSplitMerge(true);

        PipeCache = MakePipePerNodeCacheID(EPipePerNodeCache::Leader);
        Send(PipeCache, new TEvPipeCache::TEvForward(propose.Release(), SchemeShardId, /* subscribe */ true));
        Become(&TThis::StateWaitProposed);
    }

    STFUNC(StateWaitProposed) {
        switch (ev->GetTypeRewrite()) {
            hFunc(TEvSchemeShard::TEvModifySchemeTransactionResult, Handle);
            hFunc(TEvPipeCache::TEvDeliveryProblem, Handle);
        }
    }

    // Replies with the proposal result serialized to JSON, or with HTTP 500
    // when serialization throws.
    void Handle(TEvSchemeShard::TEvModifySchemeTransactionResult::TPtr& ev) {
        TString text;
        try {
            NProtobufJson::Proto2Json(ev->Get()->Record, text, {
                .EnumMode = NProtobufJson::TProto2JsonConfig::EnumName,
                .FieldNameMode = NProtobufJson::TProto2JsonConfig::FieldNameSnakeCaseDense,
                .MapAsObject = true,
            });
        } catch (const std::exception&) {
            Send(Ev->Sender, new NMon::TEvRemoteBinaryInfoRes(
                "HTTP/1.1 500 Internal Error\r\nConnection: Close\r\n\r\nUnexpected failure to serialize the response\r\n"));
            PassAway();
            // Stop here: without this return a second (empty JSON) reply would
            // be sent and PassAway() would be called twice.
            return;
        }

        Send(Ev->Sender, new NMon::TEvRemoteJsonInfoRes(text));
        PassAway();
    }

    // Replies with HTTP 502 when the pipe cache reports a delivery problem.
    void Handle(TEvPipeCache::TEvDeliveryProblem::TPtr&) {
        Send(Ev->Sender, new NMon::TEvRemoteBinaryInfoRes(
            TStringBuilder() << "HTTP/1.1 502 Bad Gateway\r\nConnection: Close\r\n\r\nSchemeShard tablet disconnected\r\n"));
        PassAway();
    }

    // Unlinks from the pipe cache (if it was ever used) before terminating.
    void PassAway() override {
        if (PipeCache) {
            Send(PipeCache, new TEvPipeCache::TEvUnlink(0));
        }
        TActorBootstrapped::PassAway();
    }

private:
    NMon::TEvRemoteHttpInfo::TPtr Ev;
    ui64 SchemeShardId;
    TPathId PathId;
    TTabletId ShardId;
    ui64 TxId = 0;
    // Stays default-constructed until the proposal is forwarded.
    TActorId PipeCache;
};
331+
234332
struct TSchemeShard::TTxMonitoring : public NTabletFlatExecutor::TTransactionBase<TSchemeShard> {
235333
NMon::TEvRemoteHttpInfo::TPtr Ev;
236334
TStringStream Answer;
@@ -242,11 +340,18 @@ struct TSchemeShard::TTxMonitoring : public NTabletFlatExecutor::TTransactionBas
242340
{
243341
}
244342

343+
// Reports TXTYPE_MONITORING as the type of this transaction.
TTxType GetTxType() const override { return TXTYPE_MONITORING; }
344+
245345
bool Execute(NTabletFlatExecutor::TTransactionContext &txc, const TActorContext &ctx) override {
246346
Y_UNUSED(txc);
247347

248348
const TCgiParameters& cgi = Ev->Get()->Cgi();
249349

350+
if (cgi.Has(TCgi::Action)) {
351+
HandleAction(cgi.Get(TCgi::Action), cgi, ctx);
352+
return true;
353+
}
354+
250355
const TString page = cgi.Has(TCgi::Page) ? cgi.Get(TCgi::Page) : ToString(TCgi::TPages::MainPage);
251356

252357
if (page == TCgi::TPages::AdminRequest) {
@@ -311,7 +416,7 @@ struct TSchemeShard::TTxMonitoring : public NTabletFlatExecutor::TTransactionBas
311416
}
312417

313418
void Complete(const TActorContext &ctx) override {
314-
if (Answer) {
419+
if (Ev && Answer) {
315420
ctx.Send(Ev->Sender, new NMon::TEvRemoteHttpInfoRes(Answer.Str()));
316421
}
317422
}
@@ -1360,7 +1465,44 @@ struct TSchemeShard::TTxMonitoring : public NTabletFlatExecutor::TTransactionBas
13601465
}
13611466
}
13621467

1363-
TTxType GetTxType() const override { return TXTYPE_MONITORING; }
1468+
private:
1469+
void SendBadRequest(const TString& details, const TActorContext& ctx) {
1470+
ctx.Send(Ev->Sender, new NMon::TEvRemoteBinaryInfoRes(
1471+
TStringBuilder() << "HTTP/1.1 400 Bad Request\r\nConnection: Close\r\n\r\n" << details << "\r\n"));
1472+
}
1473+
1474+
private:
1475+
void HandleAction(const TString& action, const TCgiParameters& cgi, const TActorContext& ctx) {
1476+
if (Ev->Get()->Method != HTTP_METHOD_POST) {
1477+
SendBadRequest("Action requires a POST method", ctx);
1478+
return;
1479+
}
1480+
1481+
if (action == TCgi::TActions::SplitOneToOne) {
1482+
TTabletId tabletId = TTabletId(TryParseTabletId(cgi.Get(TCgi::ShardID)));
1483+
TShardIdx shardIdx = Self->GetShardIdx(tabletId);
1484+
if (!shardIdx) {
1485+
SendBadRequest("Cannot find the specified shard", ctx);
1486+
return;
1487+
}
1488+
auto* info = Self->ShardInfos.FindPtr(shardIdx);
1489+
if (!info) {
1490+
SendBadRequest("Cannot find the specified shard info", ctx);
1491+
return;
1492+
}
1493+
TPathId pathId = info->PathId;
1494+
auto* table = Self->Tables.FindPtr(pathId);
1495+
if (!table) {
1496+
SendBadRequest("Cannot find the specified shard's table", ctx);
1497+
return;
1498+
}
1499+
1500+
ctx.Register(new TMonitoringShardSplitOneToOne(std::move(Ev), Self->TabletID(), pathId, tabletId));
1501+
return;
1502+
}
1503+
1504+
SendBadRequest("Action not supported", ctx);
1505+
}
13641506
};
13651507

13661508
bool TSchemeShard::OnRenderAppHtmlPage(NMon::TEvRemoteHttpInfo::TPtr ev, const TActorContext &ctx) {

ydb/core/tx/schemeshard/schemeshard__operation_split_merge.cpp

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -705,6 +705,75 @@ class TSplitMerge: public TSubOperation {
705705
return true;
706706
}
707707

708+
bool AllocateDstForOneToOne(
709+
const NKikimrSchemeOp::TSplitMergeTablePartitions& info,
710+
TTxId txId,
711+
const TPathId& pathId,
712+
const TVector<ui64>& srcPartitionIdxs,
713+
const TTableInfo::TCPtr tableInfo,
714+
TTxState& op,
715+
const TChannelsBindings& channels,
716+
TString& errStr,
717+
TOperationContext& context)
718+
{
719+
Y_UNUSED(errStr);
720+
721+
// 1 source shard is split/merged into 1 shard
722+
Y_ABORT_UNLESS(srcPartitionIdxs.size() == 1);
723+
Y_ABORT_UNLESS(info.SplitBoundarySize() == 0);
724+
725+
TString firstRangeBegin;
726+
if (srcPartitionIdxs[0] != 0) {
727+
// Take the end of previous shard
728+
firstRangeBegin = tableInfo->GetPartitions()[srcPartitionIdxs[0]-1].EndOfRange;
729+
} else {
730+
TVector<TCell> firstKey;
731+
ui32 keyColCount = 0;
732+
for (const auto& col : tableInfo->Columns) {
733+
if (col.second.IsKey()) {
734+
++keyColCount;
735+
}
736+
}
737+
// Or start from (NULL, NULL, .., NULL)
738+
firstKey.resize(keyColCount);
739+
firstRangeBegin = TSerializedCellVec::Serialize(firstKey);
740+
}
741+
742+
op.SplitDescription = std::make_shared<NKikimrTxDataShard::TSplitMergeDescription>();
743+
// Fill src shards
744+
TString prevRangeEnd = firstRangeBegin;
745+
for (ui64 pi : srcPartitionIdxs) {
746+
auto* srcRange = op.SplitDescription->AddSourceRanges();
747+
auto shardIdx = tableInfo->GetPartitions()[pi].ShardIdx;
748+
srcRange->SetShardIdx(ui64(shardIdx.GetLocalId()));
749+
srcRange->SetTabletID(ui64(context.SS->ShardInfos[shardIdx].TabletID));
750+
srcRange->SetKeyRangeBegin(prevRangeEnd);
751+
TString rangeEnd = tableInfo->GetPartitions()[pi].EndOfRange;
752+
srcRange->SetKeyRangeEnd(rangeEnd);
753+
prevRangeEnd = rangeEnd;
754+
}
755+
756+
// Fill dst shard
757+
TShardInfo datashardInfo = TShardInfo::DataShardInfo(txId, pathId);
758+
datashardInfo.BindedChannels = channels;
759+
760+
auto idx = context.SS->RegisterShardInfo(datashardInfo);
761+
762+
ui64 lastSrcPartition = srcPartitionIdxs.back();
763+
TString lastRangeEnd = tableInfo->GetPartitions()[lastSrcPartition].EndOfRange;
764+
765+
TTxState::TShardOperation dstShardOp(idx, ETabletType::DataShard, TTxState::CreateParts);
766+
dstShardOp.RangeEnd = lastRangeEnd;
767+
op.Shards.push_back(dstShardOp);
768+
769+
auto* dstRange = op.SplitDescription->AddDestinationRanges();
770+
dstRange->SetShardIdx(ui64(idx.GetLocalId()));
771+
dstRange->SetKeyRangeBegin(firstRangeBegin);
772+
dstRange->SetKeyRangeEnd(lastRangeEnd);
773+
774+
return true;
775+
}
776+
708777
THolder<TProposeResponse> Propose(const TString&, TOperationContext& context) override {
709778
const TTabletId ssId = context.SS->SelfTabletId();
710779

@@ -942,6 +1011,12 @@ class TSplitMerge: public TSubOperation {
9421011
setResultError(NKikimrScheme::StatusInvalidParameter, errStr);
9431012
return result;
9441013
}
1014+
} else if (srcPartitionIdxs.size() == 1 && dstCount == 1 && info.GetAllowOneToOneSplitMerge()) {
1015+
// This is one-to-one split/merge
1016+
if (!AllocateDstForOneToOne(info, OperationId.GetTxId(), path.Base()->PathId, srcPartitionIdxs, tableInfo, op, channelsBinding, errStr, context)) {
1017+
setResultError(NKikimrScheme::StatusInvalidParameter, errStr);
1018+
return result;
1019+
}
9451020
} else {
9461021
setResultError(NKikimrScheme::StatusInvalidParameter, "Invalid request: only 1->N or N->1 are supported");
9471022
return result;

ydb/core/tx/schemeshard/ut_split_merge/ut_split_merge.cpp

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,53 @@ Y_UNIT_TEST_SUITE(TSchemeShardSplitBySizeTest) {
8787

8888
}
8989

90+
// Scenario: a one-to-one split is proposed for a table shard, the SchemeShard is
// rebooted while TEvHive::TEvCreateTablet events are being collected in
// `suppressed`, and a second identical split is expected to be rejected with
// StatusMultipleModifications. The table is expected to report the same
// partition keys before and after.
Y_UNIT_TEST(ConcurrentSplitOneToOne) {
91+
TTestBasicRuntime runtime;
92+
93+
TTestEnvOptions opts;
94+
opts.EnableBackgroundCompaction(false);
95+
96+
TTestEnv env(runtime, opts);
97+
98+
ui64 txId = 100;
99+
100+
// Create the table and check its initial partition keys.
TestCreateTable(runtime, ++txId, "/MyRoot", R"(
101+
Name: "Table"
102+
Columns { Name: "Key" Type: "Utf8"}
103+
Columns { Name: "Value" Type: "Utf8"}
104+
KeyColumnNames: ["Key", "Value"]
105+
)");
106+
env.TestWaitNotification(runtime, txId);
107+
TestDescribeResult(DescribePath(runtime, "/MyRoot/Table", true),
108+
{NLs::PartitionKeys({""})});
109+
110+
// NOTE(review): presumably holds back TEvCreateTablet events so the split cannot progress - confirm against SetSuppressObserver.
TVector<THolder<IEventHandle>> suppressed;
111+
auto prevObserver = SetSuppressObserver(runtime, suppressed, TEvHive::TEvCreateTablet::EventType);
112+
113+
// First one-to-one split request for the source tablet.
TestSplitTable(runtime, ++txId, "/MyRoot/Table", R"(
114+
SourceTabletId: 72075186233409546
115+
AllowOneToOneSplitMerge: true
116+
)");
117+
118+
RebootTablet(runtime, TTestTxConfig::SchemeShard, runtime.AllocateEdgeActor());
119+
120+
// Same request again: expected to fail with StatusMultipleModifications.
TestSplitTable(runtime, ++txId, "/MyRoot/Table", R"(
121+
SourceTabletId: 72075186233409546
122+
AllowOneToOneSplitMerge: true
123+
)",
124+
{NKikimrScheme::StatusMultipleModifications});
125+
126+
// NOTE(review): presumably waits for 2 suppressed events and restores prevObserver - confirm against WaitForSuppressed.
WaitForSuppressed(runtime, suppressed, 2, prevObserver);
127+
128+
RebootTablet(runtime, TTestTxConfig::SchemeShard, runtime.AllocateEdgeActor());
129+
130+
// Wait for both transactions, then for the tablet deletion.
env.TestWaitNotification(runtime, {txId-1, txId});
131+
env.TestWaitTabletDeletion(runtime, TTestTxConfig::FakeHiveTablets); //delete src
132+
133+
// The reported partition keys are expected to be the same as before the split.
TestDescribeResult(DescribePath(runtime, "/MyRoot/Table", true),
134+
{NLs::PartitionKeys({""})});
135+
}
136+
90137
Y_UNIT_TEST(Split10Shards) {
91138
TTestBasicRuntime runtime;
92139

0 commit comments

Comments
 (0)