Skip to content

Commit 42da2fe

Browse files
authored
Tpcc import with arenas (#17333) (#20756)
1 parent dd776b0 commit 42da2fe

File tree

3 files changed

+144
-43
lines changed

3 files changed

+144
-43
lines changed

ydb/library/workload/tpcc/import.cpp

Lines changed: 107 additions & 32 deletions
Original file line number | Diff line number | Diff line change
@@ -18,6 +18,8 @@
1818
#include <util/random/shuffle.h>
1919
#include <util/string/printf.h>
2020

21+
#include <google/protobuf/arena.h>
22+
2123
#include <contrib/libs/ftxui/include/ftxui/component/component.hpp>
2224
#include <contrib/libs/ftxui/include/ftxui/component/component_base.hpp>
2325
#include <contrib/libs/ftxui/include/ftxui/component/screen_interactive.hpp>
@@ -120,10 +122,15 @@ int GetRandomCount(int warehouseId, int customerId, int districtId) {
120122

121123
//-----------------------------------------------------------------------------
122124

123-
NTable::TBulkUpsertResult LoadItems(NTable::TTableClient& client, const TString& tableFullPath, TLog* Log) {
125+
NTable::TBulkUpsertResult LoadItems(
126+
NTable::TTableClient& client,
127+
const TString& tableFullPath,
128+
google::protobuf::Arena& arena,
129+
TLog* Log)
130+
{
124131
LOG_T("Loading " << ITEM_COUNT << " items...");
125132

126-
auto valueBuilder = TValueBuilder();
133+
auto valueBuilder = TValueBuilder(&arena);
127134
valueBuilder.BeginList();
128135

129136
for (int i = 1; i <= ITEM_COUNT; ++i) {
@@ -151,7 +158,12 @@ NTable::TBulkUpsertResult LoadItems(NTable::TTableClient& client, const TString&
151158
}
152159

153160
valueBuilder.EndList();
154-
return client.BulkUpsert(tableFullPath, valueBuilder.Build()).ExtractValueSync();
161+
162+
NTable::TBulkUpsertSettings bulkSettings;
163+
bulkSettings.Arena(&arena);
164+
auto result = client.BulkUpsert(tableFullPath, valueBuilder.Build(), bulkSettings).ExtractValueSync();
165+
arena.Reset();
166+
return result;
155167
}
156168

157169
//-----------------------------------------------------------------------------
@@ -161,11 +173,12 @@ NTable::TBulkUpsertResult LoadWarehouses(
161173
const TString& tableFullPath,
162174
int startId,
163175
int lastId,
176+
google::protobuf::Arena& arena,
164177
TLog* Log)
165178
{
166179
LOG_T("Loading warehouses " << startId << " to " << lastId);
167180

168-
auto valueBuilder = TValueBuilder();
181+
auto valueBuilder = TValueBuilder(&arena);
169182
valueBuilder.BeginList();
170183

171184
for (int warehouseId = startId; warehouseId <= lastId; ++warehouseId) {
@@ -184,7 +197,12 @@ NTable::TBulkUpsertResult LoadWarehouses(
184197
}
185198

186199
valueBuilder.EndList();
187-
return client.BulkUpsert(tableFullPath, valueBuilder.Build()).ExtractValueSync();
200+
201+
NTable::TBulkUpsertSettings bulkSettings;
202+
bulkSettings.Arena(&arena);
203+
auto result = client.BulkUpsert(tableFullPath, valueBuilder.Build(), bulkSettings).ExtractValueSync();
204+
arena.Reset();
205+
return result;
188206
}
189207

190208
//-----------------------------------------------------------------------------
@@ -195,11 +213,12 @@ NTable::TBulkUpsertResult LoadStock(
195213
int wh,
196214
int itemId,
197215
int itemsToLoad,
216+
google::protobuf::Arena& arena,
198217
TLog* Log)
199218
{
200219
LOG_T("Loading stock for warehouse " << wh << " items " << itemId << " to " << (itemId + itemsToLoad - 1));
201220

202-
auto valueBuilder = TValueBuilder();
221+
auto valueBuilder = TValueBuilder(&arena);
203222
valueBuilder.BeginList();
204223

205224
for (int i = 0; i < itemsToLoad; ++i) {
@@ -242,7 +261,12 @@ NTable::TBulkUpsertResult LoadStock(
242261
}
243262

244263
valueBuilder.EndList();
245-
return client.BulkUpsert(tableFullPath, valueBuilder.Build()).ExtractValueSync();
264+
265+
NTable::TBulkUpsertSettings bulkSettings;
266+
bulkSettings.Arena(&arena);
267+
auto result = client.BulkUpsert(tableFullPath, valueBuilder.Build(), bulkSettings).ExtractValueSync();
268+
arena.Reset();
269+
return result;
246270
}
247271

248272
//-----------------------------------------------------------------------------
@@ -252,11 +276,12 @@ NTable::TBulkUpsertResult LoadDistricts(
252276
const TString& tableFullPath,
253277
int startId,
254278
int lastId,
279+
google::protobuf::Arena& arena,
255280
TLog* Log)
256281
{
257282
LOG_T("Loading districts for warehouses " << startId << " to " << lastId);
258283

259-
auto valueBuilder = TValueBuilder();
284+
auto valueBuilder = TValueBuilder(&arena);
260285
valueBuilder.BeginList();
261286

262287
for (int warehouseId = startId; warehouseId <= lastId; ++warehouseId) {
@@ -279,7 +304,12 @@ NTable::TBulkUpsertResult LoadDistricts(
279304
}
280305

281306
valueBuilder.EndList();
282-
return client.BulkUpsert(tableFullPath, valueBuilder.Build()).ExtractValueSync();
307+
308+
NTable::TBulkUpsertSettings bulkSettings;
309+
bulkSettings.Arena(&arena);
310+
auto result = client.BulkUpsert(tableFullPath, valueBuilder.Build(), bulkSettings).ExtractValueSync();
311+
arena.Reset();
312+
return result;
283313
}
284314

285315
//-----------------------------------------------------------------------------
@@ -289,11 +319,12 @@ NTable::TBulkUpsertResult LoadCustomers(
289319
const TString& tableFullPath,
290320
int wh,
291321
int district,
322+
google::protobuf::Arena& arena,
292323
TLog* Log)
293324
{
294325
LOG_T("Loading customers for warehouse " << wh << " district " << district);
295326

296-
auto valueBuilder = TValueBuilder();
327+
auto valueBuilder = TValueBuilder(&arena);
297328
valueBuilder.BeginList();
298329

299330
for (int customerId = 1; customerId <= CUSTOMERS_PER_DISTRICT; ++customerId) {
@@ -333,7 +364,12 @@ NTable::TBulkUpsertResult LoadCustomers(
333364
}
334365

335366
valueBuilder.EndList();
336-
return client.BulkUpsert(tableFullPath, valueBuilder.Build()).ExtractValueSync();
367+
368+
NTable::TBulkUpsertSettings bulkSettings;
369+
bulkSettings.Arena(&arena);
370+
auto result = client.BulkUpsert(tableFullPath, valueBuilder.Build(), bulkSettings).ExtractValueSync();
371+
arena.Reset();
372+
return result;
337373
}
338374

339375
//-----------------------------------------------------------------------------
@@ -343,11 +379,12 @@ NTable::TBulkUpsertResult LoadCustomerHistory(
343379
const TString& tableFullPath,
344380
int wh,
345381
int district,
382+
google::protobuf::Arena& arena,
346383
TLog* Log)
347384
{
348385
LOG_T("Loading customer history for warehouse " << wh << " district " << district);
349386

350-
auto valueBuilder = TValueBuilder();
387+
auto valueBuilder = TValueBuilder(&arena);
351388
valueBuilder.BeginList();
352389

353390
i64 prevTs = 0;
@@ -375,7 +412,12 @@ NTable::TBulkUpsertResult LoadCustomerHistory(
375412
}
376413

377414
valueBuilder.EndList();
378-
return client.BulkUpsert(tableFullPath, valueBuilder.Build()).ExtractValueSync();
415+
416+
NTable::TBulkUpsertSettings bulkSettings;
417+
bulkSettings.Arena(&arena);
418+
auto result = client.BulkUpsert(tableFullPath, valueBuilder.Build(), bulkSettings).ExtractValueSync();
419+
arena.Reset();
420+
return result;
379421
}
380422

381423
//-----------------------------------------------------------------------------
@@ -385,11 +427,12 @@ NTable::TBulkUpsertResult LoadOpenOrders(
385427
const TString& tableFullPath,
386428
int wh,
387429
int district,
430+
google::protobuf::Arena& arena,
388431
TLog* Log)
389432
{
390433
LOG_T("Loading open orders for warehouse " << wh << " district " << district);
391434

392-
auto valueBuilder = TValueBuilder();
435+
auto valueBuilder = TValueBuilder(&arena);
393436
valueBuilder.BeginList();
394437

395438
// TPC-C 4.3.3.1: o_c_id must be a permutation of [1, customersPerDistrict]
@@ -419,7 +462,12 @@ NTable::TBulkUpsertResult LoadOpenOrders(
419462
}
420463

421464
valueBuilder.EndList();
422-
return client.BulkUpsert(tableFullPath, valueBuilder.Build()).ExtractValueSync();
465+
466+
NTable::TBulkUpsertSettings bulkSettings;
467+
bulkSettings.Arena(&arena);
468+
auto result = client.BulkUpsert(tableFullPath, valueBuilder.Build(), bulkSettings).ExtractValueSync();
469+
arena.Reset();
470+
return result;
423471
}
424472

425473
//-----------------------------------------------------------------------------
@@ -429,11 +477,12 @@ NTable::TBulkUpsertResult LoadNewOrders(
429477
const TString& tableFullPath,
430478
int wh,
431479
int district,
480+
google::protobuf::Arena& arena,
432481
TLog* Log)
433482
{
434483
LOG_T("Loading new orders for warehouse " << wh << " district " << district);
435484

436-
auto valueBuilder = TValueBuilder();
485+
auto valueBuilder = TValueBuilder(&arena);
437486
valueBuilder.BeginList();
438487

439488
static_assert(FIRST_UNPROCESSED_O_ID < CUSTOMERS_PER_DISTRICT,
@@ -450,7 +499,12 @@ NTable::TBulkUpsertResult LoadNewOrders(
450499
}
451500

452501
valueBuilder.EndList();
453-
return client.BulkUpsert(tableFullPath, valueBuilder.Build()).ExtractValueSync();
502+
503+
NTable::TBulkUpsertSettings bulkSettings;
504+
bulkSettings.Arena(&arena);
505+
auto result = client.BulkUpsert(tableFullPath, valueBuilder.Build(), bulkSettings).ExtractValueSync();
506+
arena.Reset();
507+
return result;
454508
}
455509

456510
//-----------------------------------------------------------------------------
@@ -460,11 +514,12 @@ NTable::TBulkUpsertResult LoadOrderLines(
460514
const TString& tableFullPath,
461515
int wh,
462516
int district,
517+
google::protobuf::Arena& arena,
463518
TLog* Log)
464519
{
465520
LOG_T("Loading order lines for warehouse " << wh << " district " << district);
466521

467-
auto valueBuilder = TValueBuilder();
522+
auto valueBuilder = TValueBuilder(&arena);
468523
valueBuilder.BeginList();
469524

470525
for (int orderId = 1; orderId <= CUSTOMERS_PER_DISTRICT; ++orderId) {
@@ -503,7 +558,12 @@ NTable::TBulkUpsertResult LoadOrderLines(
503558
}
504559

505560
valueBuilder.EndList();
506-
return client.BulkUpsert(tableFullPath, valueBuilder.Build()).ExtractValueSync();
561+
562+
NTable::TBulkUpsertSettings bulkSettings;
563+
bulkSettings.Arena(&arena);
564+
auto result = client.BulkUpsert(tableFullPath, valueBuilder.Build(), bulkSettings).ExtractValueSync();
565+
arena.Reset();
566+
return result;
507567
}
508568

509569
//-----------------------------------------------------------------------------
@@ -545,21 +605,27 @@ void ExecuteWithRetry(const TString& operationName, LoadFunc loadFunc, TLog* Log
545605

546606
//-----------------------------------------------------------------------------
547607

548-
void LoadSmallTables(TDriver& driver, const TString& path, int warehouseCount, TLog* Log) {
608+
void LoadSmallTables(
609+
TDriver& driver,
610+
const TString& path,
611+
int warehouseCount,
612+
google::protobuf::Arena& arena,
613+
TLog* Log)
614+
{
549615
NTable::TTableClient tableClient(driver);
550616

551617
TString itemTablePath = path + "/" + TABLE_ITEM;
552618
TString warehouseTablePath = path + "/" + TABLE_WAREHOUSE;
553619
TString districtTablePath = path + "/" + TABLE_DISTRICT;
554620

555621
ExecuteWithRetry("LoadItems", [&]() {
556-
return LoadItems(tableClient, itemTablePath, Log);
622+
return LoadItems(tableClient, itemTablePath, arena, Log);
557623
}, Log);
558624
ExecuteWithRetry("LoadWarehouses", [&]() {
559-
return LoadWarehouses(tableClient, warehouseTablePath, 1, warehouseCount, Log);
625+
return LoadWarehouses(tableClient, warehouseTablePath, 1, warehouseCount, arena, Log);
560626
}, Log);
561627
ExecuteWithRetry("LoadDistricts", [&]() {
562-
return LoadDistricts(tableClient, districtTablePath, 1, warehouseCount, Log);
628+
return LoadDistricts(tableClient, districtTablePath, 1, warehouseCount, arena, Log);
563629
}, Log);
564630
}
565631

@@ -609,7 +675,15 @@ struct TLoadState {
609675

610676
//-----------------------------------------------------------------------------
611677

612-
void LoadRange(TDriver& driver, const TString& path, int whStart, int whEnd, TLoadState& state, TLog* Log) {
678+
void LoadRange(
679+
TDriver& driver,
680+
const TString& path,
681+
int whStart,
682+
int whEnd,
683+
TLoadState& state,
684+
google::protobuf::Arena& arena,
685+
TLog* Log)
686+
{
613687
NTable::TTableClient tableClient(driver);
614688

615689
static_assert(ITEM_COUNT % 10 == 0, "ITEM_COUNT must be divisible by 10");
@@ -638,10 +712,10 @@ void LoadRange(TDriver& driver, const TString& path, int whStart, int whEnd, TLo
638712

639713
for (int district = DISTRICT_LOW_ID; district <= DISTRICT_HIGH_ID; ++district) {
640714
ExecuteWithRetry("LoadCustomers", [&]() {
641-
return LoadCustomers(tableClient, customerTablePath, wh, district, Log);
715+
return LoadCustomers(tableClient, customerTablePath, wh, district, arena, Log);
642716
}, Log);
643717
ExecuteWithRetry("LoadOpenOrders", [&]() {
644-
return LoadOpenOrders(tableClient, oorderTablePath, wh, district, Log);
718+
return LoadOpenOrders(tableClient, oorderTablePath, wh, district, arena, Log);
645719
}, Log);
646720
}
647721
state.DataSizeLoaded.fetch_add(indexedPerWh, std::memory_order_relaxed);
@@ -661,19 +735,19 @@ void LoadRange(TDriver& driver, const TString& path, int whStart, int whEnd, TLo
661735
int startItemId = batch * itemBatchSize + 1;
662736
int itemsToLoad = itemBatchSize;
663737
ExecuteWithRetry("LoadStock", [&]() {
664-
return LoadStock(tableClient, stockTablePath, wh, startItemId, itemsToLoad, Log);
738+
return LoadStock(tableClient, stockTablePath, wh, startItemId, itemsToLoad, arena, Log);
665739
}, Log);
666740
}
667741

668742
for (int district = DISTRICT_LOW_ID; district <= DISTRICT_HIGH_ID; ++district) {
669743
ExecuteWithRetry("LoadOrderLines", [&]() {
670-
return LoadOrderLines(tableClient, orderLineTablePath, wh, district, Log);
744+
return LoadOrderLines(tableClient, orderLineTablePath, wh, district, arena, Log);
671745
}, Log);
672746
ExecuteWithRetry("LoadCustomerHistory", [&]() {
673-
return LoadCustomerHistory(tableClient, historyTablePath, wh, district, Log);
747+
return LoadCustomerHistory(tableClient, historyTablePath, wh, district, arena, Log);
674748
}, Log);
675749
ExecuteWithRetry("LoadNewOrders", [&]() {
676-
return LoadNewOrders(tableClient, newOrderTablePath, wh, district, Log);
750+
return LoadNewOrders(tableClient, newOrderTablePath, wh, district, arena, Log);
677751
}, Log);
678752
}
679753

@@ -856,13 +930,14 @@ class TPCCLoader {
856930
int whEnd = (threadId + 1) * Config.WarehouseCount / threadCount;
857931

858932
threads.emplace_back([threadId, &drivers, driverCount, this, whStart, whEnd]() {
933+
google::protobuf::Arena arena;
859934
auto& driver = drivers[threadId % driverCount];
860935
if (threadId == 0) {
861-
LoadSmallTables(driver, Config.Path, Config.WarehouseCount, Log.get());
936+
LoadSmallTables(driver, Config.Path, Config.WarehouseCount, arena, Log.get());
862937
} else {
863938
std::this_thread::sleep_for(std::chrono::milliseconds(threadId));
864939
}
865-
LoadRange(driver, Config.Path, whStart, whEnd, LoadState, Log.get());
940+
LoadRange(driver, Config.Path, whStart, whEnd, LoadState, arena, Log.get());
866941
});
867942
}
868943

ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/value/value.h

Lines changed: 4 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -4,6 +4,8 @@
44

55
#include <util/datetime/base.h>
66

7+
#include <google/protobuf/arena.h>
8+
79
#include <optional>
810
#include <memory>
911

@@ -528,7 +530,7 @@ class TValueBuilderBase : public TMoveOnly {
528530
protected:
529531
TValueBuilderBase(TValueBuilderBase&&);
530532

531-
TValueBuilderBase();
533+
TValueBuilderBase(google::protobuf::Arena* arena = nullptr);
532534

533535
TValueBuilderBase(const TType& type);
534536

@@ -544,7 +546,7 @@ class TValueBuilderBase : public TMoveOnly {
544546

545547
class TValueBuilder : public TValueBuilderBase<TValueBuilder> {
546548
public:
547-
TValueBuilder();
549+
TValueBuilder(google::protobuf::Arena* arena = nullptr);
548550

549551
TValueBuilder(const TType& type);
550552

0 commit comments

Comments
 (0)