Skip to content

Commit b50302c

Browse files
authored
tools restore optimal defaults (#14398)
1 parent a5fc6d8 commit b50302c

File tree

8 files changed

+91
-44
lines changed

8 files changed

+91
-44
lines changed

ydb/public/lib/ydb_cli/commands/ydb_tools.cpp

Lines changed: 56 additions & 27 deletions
Original file line number | Diff line number | Diff line change
@@ -14,6 +14,7 @@
1414
#include <util/generic/serialized_enum.h>
1515
#include <util/stream/format.h>
1616
#include <util/string/split.h>
17+
#include <util/system/info.h>
1718

1819
namespace NYdb::NConsoleClient {
1920

@@ -142,18 +143,16 @@ void TCommandRestore::Config(TConfig& config) {
142143

143144
NDump::TRestoreSettings defaults;
144145

145-
config.Opts->AddLongOption("restore-data", "Whether to restore data or not")
146+
config.Opts->AddLongOption("restore-data", "Whether to restore data or not.")
146147
.DefaultValue(defaults.RestoreData_).StoreResult(&RestoreData);
147148

148-
config.Opts->AddLongOption("restore-indexes", "Whether to restore indexes or not")
149+
config.Opts->AddLongOption("restore-indexes", "Whether to restore indexes or not.")
149150
.DefaultValue(defaults.RestoreIndexes_).StoreResult(&RestoreIndexes);
150151

151-
config.Opts->AddLongOption("restore-acl", "Whether to restore ACL and owner or not")
152+
config.Opts->AddLongOption("restore-acl", "Whether to restore ACL and owner or not.")
152153
.DefaultValue(defaults.RestoreACL_).StoreResult(&RestoreACL);
153154

154-
config.Opts->AddLongOption("skip-document-tables", TStringBuilder()
155-
<< "Document API tables cannot be restored for now. "
156-
<< "Specify this option to skip such tables")
155+
config.Opts->AddLongOption("skip-document-tables", "Skip Document API tables.")
157156
.DefaultValue(defaults.SkipDocumentTables_).StoreResult(&SkipDocumentTables)
158157
.Hidden(); // Deprecated
159158

@@ -162,37 +161,60 @@ void TCommandRestore::Config(TConfig& config) {
162161
" will be reverted in case of error.")
163162
.StoreTrue(&SavePartialResult);
164163

165-
config.Opts->AddLongOption("bandwidth", "Limit data upload bandwidth, bytes per second (example: 2MiB)")
166-
.DefaultValue("0").StoreResult(&UploadBandwidth);
167-
168-
config.Opts->AddLongOption("rps", "Limit requests per second (example: 100)")
169-
.DefaultValue(defaults.RateLimiterSettings_.GetRps()).StoreResult(&UploadRps);
164+
config.Opts->AddLongOption("bandwidth", "Limit data upload bandwidth, bytes per second (example: 2MiB).")
165+
.DefaultValue("no limit")
166+
.Handler1T<TString>([this](const TString& arg) {
167+
UploadBandwidth = (arg == "no limit") ? "0" : arg;
168+
})
169+
.Hidden();
170+
171+
config.Opts->AddLongOption("rps", "Limit requests per second (example: 100).")
172+
.DefaultValue("no limit")
173+
.Handler1T<TString>([this](const TString& arg) {
174+
UploadRps = (arg == "no limit") ? "0" : arg;
175+
});
170176

171-
config.Opts->AddLongOption("upload-batch-rows", "Limit upload batch size in rows (example: 1K)")
172-
.DefaultValue(defaults.RowsPerRequest_).StoreResult(&RowsPerRequest);
177+
config.Opts->AddLongOption("upload-batch-rows", "Limit upload batch size in rows (example: 1K)."
178+
" Not applicable in ImportData mode.")
179+
.DefaultValue("no limit")
180+
.Handler1T<TString>([this](const TString& arg) {
181+
RowsPerRequest = (arg == "no limit") ? "0" : arg;
182+
});
173183

174-
config.Opts->AddLongOption("upload-batch-bytes", "Limit upload batch size in bytes (example: 1MiB)")
175-
.DefaultValue(HumanReadableSize(defaults.BytesPerRequest_, SF_BYTES)).StoreResult(&BytesPerRequest);
184+
config.Opts->AddLongOption("upload-batch-rus", "Limit upload batch size in request units (example: 100)."
185+
" Not applicable in ImportData mode.")
186+
.DefaultValue("no limit")
187+
.Handler1T<TString>([this](const TString& arg) {
188+
RequestUnitsPerRequest = (arg == "no limit") ? "0" : arg;
189+
});
176190

177-
config.Opts->AddLongOption("upload-batch-rus", "Limit upload batch size in request units (example: 100)")
178-
.DefaultValue(defaults.RequestUnitsPerRequest_).StoreResult(&RequestUnitsPerRequest);
191+
config.Opts->AddLongOption("upload-batch-bytes", "Limit upload batch size in bytes (example: 1MiB).")
192+
.DefaultValue("auto")
193+
.Handler1T<TString>([this](const TString& arg) {
194+
BytesPerRequest = (arg == "auto") ? "0" : arg;
195+
});
179196

180-
config.Opts->AddLongOption("in-flight", "Limit in-flight request count")
181-
.DefaultValue(defaults.InFly_).StoreResult(&InFly);
197+
config.Opts->AddLongOption("in-flight", "Limit in-flight request count.")
198+
.DefaultValue("auto")
199+
.Handler1T<TString>([this](const TString& arg) {
200+
InFlight = (arg == "auto") ? 0 : FromString<ui32>(arg);
201+
});
182202

183203
config.Opts->AddLongOption("bulk-upsert", "Use BulkUpsert - a more efficient way to upload data with lower consistency level."
184-
" Global secondary indexes are not supported in this mode.")
204+
" Global secondary indexes are not supported in this mode.")
185205
.StoreTrue(&UseBulkUpsert)
186206
.Hidden(); // Deprecated. Using ImportData should be more effective.
187207

188208
config.Opts->AddLongOption("import-data", "Use ImportData - a more efficient way to upload data."
189-
" ImportData will throw an error if you try to upload data into an existing table that has"
190-
" secondary indexes or is in the process of building them. If you need to restore a table"
191-
" with secondary indexes, make sure it's not already present in the scheme.")
209+
" ImportData will throw an error if you try to upload data into an existing table that has"
210+
" secondary indexes or is in the process of building them. If you need to restore a table"
211+
" with secondary indexes, make sure it's not already present in the scheme.")
192212
.StoreTrue(&UseImportData);
193213

194214
config.Opts->MutuallyExclusive("bandwidth", "rps");
195215
config.Opts->MutuallyExclusive("import-data", "bulk-upsert");
216+
config.Opts->MutuallyExclusive("import-data", "upload-batch-rows");
217+
config.Opts->MutuallyExclusive("import-data", "upload-batch-rus");
196218
}
197219

198220
void TCommandRestore::ExtractParams(TConfig& config) {
@@ -208,17 +230,24 @@ int TCommandRestore::Run(TConfig& config) {
208230
.RestoreACL(RestoreACL)
209231
.SkipDocumentTables(SkipDocumentTables)
210232
.SavePartialResult(SavePartialResult)
211-
.RowsPerRequest(NYdb::SizeFromString(RowsPerRequest))
212-
.InFly(InFly);
233+
.RowsPerRequest(NYdb::SizeFromString(RowsPerRequest));
234+
235+
if (InFlight) {
236+
settings.MaxInFlight(InFlight);
237+
} else if (!UseImportData) {
238+
settings.MaxInFlight(NSystemInfo::CachedNumberOfCpus());
239+
}
213240

214241
if (auto bytesPerRequest = NYdb::SizeFromString(BytesPerRequest)) {
215-
if (bytesPerRequest > NDump::TRestoreSettings::MaxBytesPerRequest) {
242+
if (UseImportData && bytesPerRequest > NDump::TRestoreSettings::MaxImportDataBytesPerRequest) {
216243
throw TMisuseException()
217244
<< "--upload-batch-bytes cannot be larger than "
218-
<< HumanReadableSize(NDump::TRestoreSettings::MaxBytesPerRequest, SF_BYTES);
245+
<< HumanReadableSize(NDump::TRestoreSettings::MaxImportDataBytesPerRequest, SF_BYTES);
219246
}
220247

221248
settings.BytesPerRequest(bytesPerRequest);
249+
} else if (UseImportData) {
250+
settings.BytesPerRequest(NDump::TRestoreSettings::MaxImportDataBytesPerRequest);
222251
}
223252

224253
if (RequestUnitsPerRequest) {

ydb/public/lib/ydb_cli/commands/ydb_tools.h

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -66,7 +66,7 @@ class TCommandRestore : public TToolsCommand, public TCommandWithPath {
6666
TString RowsPerRequest;
6767
TString BytesPerRequest;
6868
TString RequestUnitsPerRequest;
69-
ui32 InFly;
69+
ui32 InFlight;
7070
bool UseBulkUpsert = false;
7171
bool UseImportData = false;
7272
};

ydb/public/lib/ydb_cli/dump/dump.h

Lines changed: 4 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -64,7 +64,7 @@ class TDumpResult: public TStatus {
6464
struct TRateLimiterSettings {
6565
using TSelf = TRateLimiterSettings;
6666

67-
FLUENT_SETTING_DEFAULT(ui32, Rate, 30);
67+
FLUENT_SETTING_DEFAULT(ui32, Rate, Max<ui32>());
6868
FLUENT_SETTING_DEFAULT(TDuration, Interval, TDuration::Seconds(1));
6969
FLUENT_SETTING_DEFAULT(TDuration, ReactionTime, TDuration::MilliSeconds(50));
7070

@@ -91,7 +91,7 @@ struct TRestoreSettings: public TOperationRequestSettings<TRestoreSettings> {
9191
ImportData,
9292
};
9393

94-
static constexpr ui64 MaxBytesPerRequest = 16_MB;
94+
static constexpr ui64 MaxImportDataBytesPerRequest = 16_MB;
9595

9696
FLUENT_SETTING_DEFAULT(EMode, Mode, EMode::Yql);
9797
FLUENT_SETTING_DEFAULT(bool, DryRun, false);
@@ -105,9 +105,9 @@ struct TRestoreSettings: public TOperationRequestSettings<TRestoreSettings> {
105105
FLUENT_SETTING_DEFAULT(ui64, MemLimit, 32_MB);
106106
FLUENT_SETTING_DEFAULT(ui64, RowsPerRequest, 0);
107107
FLUENT_SETTING_DEFAULT(ui64, BytesPerRequest, 512_KB);
108-
FLUENT_SETTING_DEFAULT(ui64, RequestUnitsPerRequest, 30);
108+
FLUENT_SETTING_DEFAULT(ui64, RequestUnitsPerRequest, 0);
109109
FLUENT_SETTING_DEFAULT(ui64, FileBufferSize, 2_MB);
110-
FLUENT_SETTING_DEFAULT(ui32, InFly, 10);
110+
FLUENT_SETTING_DEFAULT(ui32, MaxInFlight, 0);
111111
FLUENT_SETTING_DEFAULT(TRateLimiterSettings, RateLimiterSettings, {});
112112

113113
}; // TRestoreSettings

ydb/public/lib/ydb_cli/dump/restore_compat.cpp

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -128,7 +128,7 @@ class TDataWriter: public NPrivate::IDataWriter {
128128
Y_ENSURE(dataAccumulator);
129129

130130
TUploader::TOptions opts;
131-
opts.InFly = settings.InFly_;
131+
opts.InFly = settings.MaxInFlight_;
132132
opts.Rate = settings.RateLimiterSettings_.Rate_;
133133
opts.Interval = settings.RateLimiterSettings_.Interval_;
134134
opts.ReactionTime = settings.RateLimiterSettings_.ReactionTime_;

ydb/public/lib/ydb_cli/dump/restore_impl.cpp

Lines changed: 14 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -22,6 +22,7 @@
2222
#include <util/generic/vector.h>
2323
#include <util/stream/file.h>
2424
#include <util/string/join.h>
25+
#include <util/system/info.h>
2526

2627
#include <google/protobuf/text_format.h>
2728

@@ -711,7 +712,8 @@ TRestoreResult TRestoreClient::RestoreTable(
711712
}
712713

713714
if (settings.RestoreData_) {
714-
auto result = RestoreData(fsPath, dbPath, settings, withoutIndexesDesc);
715+
const ui32 partitionCount = scheme.partition_at_keys().split_points().size() + 1;
716+
auto result = RestoreData(fsPath, dbPath, settings, withoutIndexesDesc, partitionCount);
715717
if (!result.IsSuccess()) {
716718
return result;
717719
}
@@ -797,6 +799,7 @@ THolder<NPrivate::IDataWriter> TRestoreClient::CreateDataWriter(
797799
const TString& dbPath,
798800
const TRestoreSettings& settings,
799801
const TTableDescription& desc,
802+
ui32 partitionCount,
800803
const TVector<THolder<NPrivate::IDataAccumulator>>& accumulators)
801804
{
802805
THolder<NPrivate::IDataWriter> writer;
@@ -809,7 +812,7 @@ THolder<NPrivate::IDataWriter> TRestoreClient::CreateDataWriter(
809812
}
810813

811814
case TRestoreSettings::EMode::ImportData: {
812-
writer.Reset(CreateImportDataWriter(dbPath, desc, ImportClient, TableClient, accumulators, settings, Log));
815+
writer.Reset(CreateImportDataWriter(dbPath, desc, partitionCount, ImportClient, TableClient, accumulators, settings, Log));
813816
break;
814817
}
815818
}
@@ -824,7 +827,11 @@ TRestoreResult TRestoreClient::CreateDataAccumulators(
824827
const TTableDescription& desc,
825828
ui32 dataFilesCount)
826829
{
827-
const ui32 accumulatorsCount = std::min(settings.InFly_, dataFilesCount);
830+
size_t accumulatorsCount = settings.MaxInFlight_;
831+
if (!accumulatorsCount) {
832+
accumulatorsCount = Min<size_t>(dataFilesCount, NSystemInfo::CachedNumberOfCpus());
833+
}
834+
828835
outAccumulators.resize(accumulatorsCount);
829836

830837
switch (settings.Mode_) {
@@ -855,7 +862,8 @@ TRestoreResult TRestoreClient::RestoreData(
855862
const TFsPath& fsPath,
856863
const TString& dbPath,
857864
const TRestoreSettings& settings,
858-
const TTableDescription& desc)
865+
const TTableDescription& desc,
866+
ui32 partitionCount)
859867
{
860868
// Threads can access memory owned by this vector through pointers during restore operation
861869
TVector<TFsPath> dataFiles = CollectDataFiles(fsPath);
@@ -870,7 +878,7 @@ TRestoreResult TRestoreClient::RestoreData(
870878
return res;
871879
}
872880

873-
THolder<NPrivate::IDataWriter> writer = CreateDataWriter(dbPath, settings, desc, accumulators);
881+
THolder<NPrivate::IDataWriter> writer = CreateDataWriter(dbPath, settings, desc, partitionCount, accumulators);
874882

875883
TVector<TFuture<TRestoreResult>> accumulatorResults(Reserve(accumulators.size()));
876884
TThreadPool accumulatorWorkers(TThreadPool::TParams().SetBlocking(true));
@@ -963,7 +971,7 @@ TRestoreResult TRestoreClient::RestoreData(
963971
}
964972

965973
if (dataFound) {
966-
writer = CreateDataWriter(dbPath, settings, desc, accumulators);
974+
writer = CreateDataWriter(dbPath, settings, desc, partitionCount, accumulators);
967975
for (auto& acc : accumulators) {
968976
while (acc->Ready(true)) {
969977
if (!writer->Push(acc->GetData(true))) {

ydb/public/lib/ydb_cli/dump/restore_impl.h

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -136,14 +136,14 @@ class TRestoreClient {
136136
TRestoreResult RestoreRateLimiter(const TFsPath& fsPath, const TString& coordinationNodePath, const TString& resourcePath);
137137

138138
TRestoreResult CheckSchema(const TString& dbPath, const NTable::TTableDescription& desc);
139-
TRestoreResult RestoreData(const TFsPath& fsPath, const TString& dbPath, const TRestoreSettings& settings, const NTable::TTableDescription& desc);
139+
TRestoreResult RestoreData(const TFsPath& fsPath, const TString& dbPath, const TRestoreSettings& settings, const NTable::TTableDescription& desc, ui32 partitionCount);
140140
TRestoreResult RestoreIndexes(const TString& dbPath, const NTable::TTableDescription& desc);
141141
TRestoreResult RestoreChangefeeds(const TFsPath& path, const TString& dbPath);
142142
TRestoreResult RestorePermissions(const TFsPath& fsPath, const TString& dbPath, const TRestoreSettings& settings, bool isAlreadyExisting);
143143
TRestoreResult RestoreConsumers(const TString& topicPath, const std::vector<NTopic::TConsumer>& consumers);
144144

145145
THolder<NPrivate::IDataWriter> CreateDataWriter(const TString& dbPath, const TRestoreSettings& settings,
146-
const NTable::TTableDescription& desc, const TVector<THolder<NPrivate::IDataAccumulator>>& accumulators);
146+
const NTable::TTableDescription& desc, ui32 partitionCount, const TVector<THolder<NPrivate::IDataAccumulator>>& accumulators);
147147
TRestoreResult CreateDataAccumulators(TVector<THolder<NPrivate::IDataAccumulator>>& outAccumulators,
148148
const TString& dbPath, const TRestoreSettings& settings, const NTable::TTableDescription& desc,
149149
ui32 dataFilesCount);

ydb/public/lib/ydb_cli/dump/restore_import_data.cpp

Lines changed: 12 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -18,6 +18,7 @@
1818
#include <util/stream/str.h>
1919
#include <util/string/builder.h>
2020
#include <util/string/cast.h>
21+
#include <util/system/info.h>
2122
#include <util/system/mutex.h>
2223
#include <util/thread/pool.h>
2324

@@ -903,6 +904,7 @@ class TDataWriter: public NPrivate::IDataWriter {
903904
explicit TDataWriter(
904905
const TString& path,
905906
const TTableDescription& desc,
907+
ui32 partitionCount,
906908
const TRestoreSettings& settings,
907909
TImportClient& importClient,
908910
TTableClient& tableClient,
@@ -925,11 +927,17 @@ class TDataWriter: public NPrivate::IDataWriter {
925927
}
926928

927929
TasksQueue = MakeHolder<TThreadPool>(TThreadPool::TParams().SetBlocking(true).SetCatching(true));
928-
TasksQueue->Start(settings.InFly_, settings.InFly_ + 1);
930+
931+
size_t threadCount = settings.MaxInFlight_;
932+
if (!threadCount) {
933+
threadCount = Min<size_t>(partitionCount, NSystemInfo::CachedNumberOfCpus());
934+
}
935+
936+
TasksQueue->Start(threadCount, threadCount + 1);
929937
}
930938

931939
bool Push(NPrivate::TBatch&& data) override {
932-
if (data.size() > TRestoreSettings::MaxBytesPerRequest) {
940+
if (data.size() > TRestoreSettings::MaxImportDataBytesPerRequest) {
933941
LOG_E("Too much data: " << data.GetLocation());
934942
return false;
935943
}
@@ -989,13 +997,14 @@ NPrivate::IDataAccumulator* CreateImportDataAccumulator(
989997
NPrivate::IDataWriter* CreateImportDataWriter(
990998
const TString& path,
991999
const TTableDescription& desc,
1000+
ui32 partitionCount,
9921001
TImportClient& importClient,
9931002
TTableClient& tableClient,
9941003
const TVector<THolder<NPrivate::IDataAccumulator>>& accumulators,
9951004
const TRestoreSettings& settings,
9961005
const std::shared_ptr<TLog>& log)
9971006
{
998-
return new TDataWriter(path, desc, settings, importClient, tableClient, accumulators, log);
1007+
return new TDataWriter(path, desc, partitionCount, settings, importClient, tableClient, accumulators, log);
9991008
}
10001009

10011010
} // NDump

ydb/public/lib/ydb_cli/dump/restore_import_data.h

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -16,6 +16,7 @@ NPrivate::IDataAccumulator* CreateImportDataAccumulator(
1616
NPrivate::IDataWriter* CreateImportDataWriter(
1717
const TString& path,
1818
const NTable::TTableDescription& desc,
19+
ui32 partitionCount,
1920
NImport::TImportClient& importClient,
2021
NTable::TTableClient& tableClient,
2122
const TVector<THolder<NPrivate::IDataAccumulator>>& accumulators,

0 commit comments

Comments
 (0)