Skip to content

Tpcc import with arenas (#17333) #20756

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 107 additions & 32 deletions ydb/library/workload/tpcc/import.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
#include <util/random/shuffle.h>
#include <util/string/printf.h>

#include <google/protobuf/arena.h>

#include <contrib/libs/ftxui/include/ftxui/component/component.hpp>
#include <contrib/libs/ftxui/include/ftxui/component/component_base.hpp>
#include <contrib/libs/ftxui/include/ftxui/component/screen_interactive.hpp>
Expand Down Expand Up @@ -120,10 +122,15 @@ int GetRandomCount(int warehouseId, int customerId, int districtId) {

//-----------------------------------------------------------------------------

NTable::TBulkUpsertResult LoadItems(NTable::TTableClient& client, const TString& tableFullPath, TLog* Log) {
NTable::TBulkUpsertResult LoadItems(
NTable::TTableClient& client,
const TString& tableFullPath,
google::protobuf::Arena& arena,
TLog* Log)
{
LOG_T("Loading " << ITEM_COUNT << " items...");

auto valueBuilder = TValueBuilder();
auto valueBuilder = TValueBuilder(&arena);
valueBuilder.BeginList();

for (int i = 1; i <= ITEM_COUNT; ++i) {
Expand Down Expand Up @@ -151,7 +158,12 @@ NTable::TBulkUpsertResult LoadItems(NTable::TTableClient& client, const TString&
}

valueBuilder.EndList();
return client.BulkUpsert(tableFullPath, valueBuilder.Build()).ExtractValueSync();

NTable::TBulkUpsertSettings bulkSettings;
bulkSettings.Arena(&arena);
auto result = client.BulkUpsert(tableFullPath, valueBuilder.Build(), bulkSettings).ExtractValueSync();
arena.Reset();
return result;
}

//-----------------------------------------------------------------------------
Expand All @@ -161,11 +173,12 @@ NTable::TBulkUpsertResult LoadWarehouses(
const TString& tableFullPath,
int startId,
int lastId,
google::protobuf::Arena& arena,
TLog* Log)
{
LOG_T("Loading warehouses " << startId << " to " << lastId);

auto valueBuilder = TValueBuilder();
auto valueBuilder = TValueBuilder(&arena);
valueBuilder.BeginList();

for (int warehouseId = startId; warehouseId <= lastId; ++warehouseId) {
Expand All @@ -184,7 +197,12 @@ NTable::TBulkUpsertResult LoadWarehouses(
}

valueBuilder.EndList();
return client.BulkUpsert(tableFullPath, valueBuilder.Build()).ExtractValueSync();

NTable::TBulkUpsertSettings bulkSettings;
bulkSettings.Arena(&arena);
auto result = client.BulkUpsert(tableFullPath, valueBuilder.Build(), bulkSettings).ExtractValueSync();
arena.Reset();
return result;
}

//-----------------------------------------------------------------------------
Expand All @@ -195,11 +213,12 @@ NTable::TBulkUpsertResult LoadStock(
int wh,
int itemId,
int itemsToLoad,
google::protobuf::Arena& arena,
TLog* Log)
{
LOG_T("Loading stock for warehouse " << wh << " items " << itemId << " to " << (itemId + itemsToLoad - 1));

auto valueBuilder = TValueBuilder();
auto valueBuilder = TValueBuilder(&arena);
valueBuilder.BeginList();

for (int i = 0; i < itemsToLoad; ++i) {
Expand Down Expand Up @@ -242,7 +261,12 @@ NTable::TBulkUpsertResult LoadStock(
}

valueBuilder.EndList();
return client.BulkUpsert(tableFullPath, valueBuilder.Build()).ExtractValueSync();

NTable::TBulkUpsertSettings bulkSettings;
bulkSettings.Arena(&arena);
auto result = client.BulkUpsert(tableFullPath, valueBuilder.Build(), bulkSettings).ExtractValueSync();
arena.Reset();
return result;
}

//-----------------------------------------------------------------------------
Expand All @@ -252,11 +276,12 @@ NTable::TBulkUpsertResult LoadDistricts(
const TString& tableFullPath,
int startId,
int lastId,
google::protobuf::Arena& arena,
TLog* Log)
{
LOG_T("Loading districts for warehouses " << startId << " to " << lastId);

auto valueBuilder = TValueBuilder();
auto valueBuilder = TValueBuilder(&arena);
valueBuilder.BeginList();

for (int warehouseId = startId; warehouseId <= lastId; ++warehouseId) {
Expand All @@ -279,7 +304,12 @@ NTable::TBulkUpsertResult LoadDistricts(
}

valueBuilder.EndList();
return client.BulkUpsert(tableFullPath, valueBuilder.Build()).ExtractValueSync();

NTable::TBulkUpsertSettings bulkSettings;
bulkSettings.Arena(&arena);
auto result = client.BulkUpsert(tableFullPath, valueBuilder.Build(), bulkSettings).ExtractValueSync();
arena.Reset();
return result;
}

//-----------------------------------------------------------------------------
Expand All @@ -289,11 +319,12 @@ NTable::TBulkUpsertResult LoadCustomers(
const TString& tableFullPath,
int wh,
int district,
google::protobuf::Arena& arena,
TLog* Log)
{
LOG_T("Loading customers for warehouse " << wh << " district " << district);

auto valueBuilder = TValueBuilder();
auto valueBuilder = TValueBuilder(&arena);
valueBuilder.BeginList();

for (int customerId = 1; customerId <= CUSTOMERS_PER_DISTRICT; ++customerId) {
Expand Down Expand Up @@ -333,7 +364,12 @@ NTable::TBulkUpsertResult LoadCustomers(
}

valueBuilder.EndList();
return client.BulkUpsert(tableFullPath, valueBuilder.Build()).ExtractValueSync();

NTable::TBulkUpsertSettings bulkSettings;
bulkSettings.Arena(&arena);
auto result = client.BulkUpsert(tableFullPath, valueBuilder.Build(), bulkSettings).ExtractValueSync();
arena.Reset();
return result;
}

//-----------------------------------------------------------------------------
Expand All @@ -343,11 +379,12 @@ NTable::TBulkUpsertResult LoadCustomerHistory(
const TString& tableFullPath,
int wh,
int district,
google::protobuf::Arena& arena,
TLog* Log)
{
LOG_T("Loading customer history for warehouse " << wh << " district " << district);

auto valueBuilder = TValueBuilder();
auto valueBuilder = TValueBuilder(&arena);
valueBuilder.BeginList();

i64 prevTs = 0;
Expand Down Expand Up @@ -375,7 +412,12 @@ NTable::TBulkUpsertResult LoadCustomerHistory(
}

valueBuilder.EndList();
return client.BulkUpsert(tableFullPath, valueBuilder.Build()).ExtractValueSync();

NTable::TBulkUpsertSettings bulkSettings;
bulkSettings.Arena(&arena);
auto result = client.BulkUpsert(tableFullPath, valueBuilder.Build(), bulkSettings).ExtractValueSync();
arena.Reset();
return result;
}

//-----------------------------------------------------------------------------
Expand All @@ -385,11 +427,12 @@ NTable::TBulkUpsertResult LoadOpenOrders(
const TString& tableFullPath,
int wh,
int district,
google::protobuf::Arena& arena,
TLog* Log)
{
LOG_T("Loading open orders for warehouse " << wh << " district " << district);

auto valueBuilder = TValueBuilder();
auto valueBuilder = TValueBuilder(&arena);
valueBuilder.BeginList();

// TPC-C 4.3.3.1: o_c_id must be a permutation of [1, customersPerDistrict]
Expand Down Expand Up @@ -419,7 +462,12 @@ NTable::TBulkUpsertResult LoadOpenOrders(
}

valueBuilder.EndList();
return client.BulkUpsert(tableFullPath, valueBuilder.Build()).ExtractValueSync();

NTable::TBulkUpsertSettings bulkSettings;
bulkSettings.Arena(&arena);
auto result = client.BulkUpsert(tableFullPath, valueBuilder.Build(), bulkSettings).ExtractValueSync();
arena.Reset();
return result;
}

//-----------------------------------------------------------------------------
Expand All @@ -429,11 +477,12 @@ NTable::TBulkUpsertResult LoadNewOrders(
const TString& tableFullPath,
int wh,
int district,
google::protobuf::Arena& arena,
TLog* Log)
{
LOG_T("Loading new orders for warehouse " << wh << " district " << district);

auto valueBuilder = TValueBuilder();
auto valueBuilder = TValueBuilder(&arena);
valueBuilder.BeginList();

static_assert(FIRST_UNPROCESSED_O_ID < CUSTOMERS_PER_DISTRICT,
Expand All @@ -450,7 +499,12 @@ NTable::TBulkUpsertResult LoadNewOrders(
}

valueBuilder.EndList();
return client.BulkUpsert(tableFullPath, valueBuilder.Build()).ExtractValueSync();

NTable::TBulkUpsertSettings bulkSettings;
bulkSettings.Arena(&arena);
auto result = client.BulkUpsert(tableFullPath, valueBuilder.Build(), bulkSettings).ExtractValueSync();
arena.Reset();
return result;
}

//-----------------------------------------------------------------------------
Expand All @@ -460,11 +514,12 @@ NTable::TBulkUpsertResult LoadOrderLines(
const TString& tableFullPath,
int wh,
int district,
google::protobuf::Arena& arena,
TLog* Log)
{
LOG_T("Loading order lines for warehouse " << wh << " district " << district);

auto valueBuilder = TValueBuilder();
auto valueBuilder = TValueBuilder(&arena);
valueBuilder.BeginList();

for (int orderId = 1; orderId <= CUSTOMERS_PER_DISTRICT; ++orderId) {
Expand Down Expand Up @@ -503,7 +558,12 @@ NTable::TBulkUpsertResult LoadOrderLines(
}

valueBuilder.EndList();
return client.BulkUpsert(tableFullPath, valueBuilder.Build()).ExtractValueSync();

NTable::TBulkUpsertSettings bulkSettings;
bulkSettings.Arena(&arena);
auto result = client.BulkUpsert(tableFullPath, valueBuilder.Build(), bulkSettings).ExtractValueSync();
arena.Reset();
return result;
}

//-----------------------------------------------------------------------------
Expand Down Expand Up @@ -545,21 +605,27 @@ void ExecuteWithRetry(const TString& operationName, LoadFunc loadFunc, TLog* Log

//-----------------------------------------------------------------------------

void LoadSmallTables(TDriver& driver, const TString& path, int warehouseCount, TLog* Log) {
void LoadSmallTables(
TDriver& driver,
const TString& path,
int warehouseCount,
google::protobuf::Arena& arena,
TLog* Log)
{
NTable::TTableClient tableClient(driver);

TString itemTablePath = path + "/" + TABLE_ITEM;
TString warehouseTablePath = path + "/" + TABLE_WAREHOUSE;
TString districtTablePath = path + "/" + TABLE_DISTRICT;

ExecuteWithRetry("LoadItems", [&]() {
return LoadItems(tableClient, itemTablePath, Log);
return LoadItems(tableClient, itemTablePath, arena, Log);
}, Log);
ExecuteWithRetry("LoadWarehouses", [&]() {
return LoadWarehouses(tableClient, warehouseTablePath, 1, warehouseCount, Log);
return LoadWarehouses(tableClient, warehouseTablePath, 1, warehouseCount, arena, Log);
}, Log);
ExecuteWithRetry("LoadDistricts", [&]() {
return LoadDistricts(tableClient, districtTablePath, 1, warehouseCount, Log);
return LoadDistricts(tableClient, districtTablePath, 1, warehouseCount, arena, Log);
}, Log);
}

Expand Down Expand Up @@ -609,7 +675,15 @@ struct TLoadState {

//-----------------------------------------------------------------------------

void LoadRange(TDriver& driver, const TString& path, int whStart, int whEnd, TLoadState& state, TLog* Log) {
void LoadRange(
TDriver& driver,
const TString& path,
int whStart,
int whEnd,
TLoadState& state,
google::protobuf::Arena& arena,
TLog* Log)
{
NTable::TTableClient tableClient(driver);

static_assert(ITEM_COUNT % 10 == 0, "ITEM_COUNT must be divisible by 10");
Expand Down Expand Up @@ -638,10 +712,10 @@ void LoadRange(TDriver& driver, const TString& path, int whStart, int whEnd, TLo

for (int district = DISTRICT_LOW_ID; district <= DISTRICT_HIGH_ID; ++district) {
ExecuteWithRetry("LoadCustomers", [&]() {
return LoadCustomers(tableClient, customerTablePath, wh, district, Log);
return LoadCustomers(tableClient, customerTablePath, wh, district, arena, Log);
}, Log);
ExecuteWithRetry("LoadOpenOrders", [&]() {
return LoadOpenOrders(tableClient, oorderTablePath, wh, district, Log);
return LoadOpenOrders(tableClient, oorderTablePath, wh, district, arena, Log);
}, Log);
}
state.DataSizeLoaded.fetch_add(indexedPerWh, std::memory_order_relaxed);
Expand All @@ -661,19 +735,19 @@ void LoadRange(TDriver& driver, const TString& path, int whStart, int whEnd, TLo
int startItemId = batch * itemBatchSize + 1;
int itemsToLoad = itemBatchSize;
ExecuteWithRetry("LoadStock", [&]() {
return LoadStock(tableClient, stockTablePath, wh, startItemId, itemsToLoad, Log);
return LoadStock(tableClient, stockTablePath, wh, startItemId, itemsToLoad, arena, Log);
}, Log);
}

for (int district = DISTRICT_LOW_ID; district <= DISTRICT_HIGH_ID; ++district) {
ExecuteWithRetry("LoadOrderLines", [&]() {
return LoadOrderLines(tableClient, orderLineTablePath, wh, district, Log);
return LoadOrderLines(tableClient, orderLineTablePath, wh, district, arena, Log);
}, Log);
ExecuteWithRetry("LoadCustomerHistory", [&]() {
return LoadCustomerHistory(tableClient, historyTablePath, wh, district, Log);
return LoadCustomerHistory(tableClient, historyTablePath, wh, district, arena, Log);
}, Log);
ExecuteWithRetry("LoadNewOrders", [&]() {
return LoadNewOrders(tableClient, newOrderTablePath, wh, district, Log);
return LoadNewOrders(tableClient, newOrderTablePath, wh, district, arena, Log);
}, Log);
}

Expand Down Expand Up @@ -856,13 +930,14 @@ class TPCCLoader {
int whEnd = (threadId + 1) * Config.WarehouseCount / threadCount;

threads.emplace_back([threadId, &drivers, driverCount, this, whStart, whEnd]() {
google::protobuf::Arena arena;
auto& driver = drivers[threadId % driverCount];
if (threadId == 0) {
LoadSmallTables(driver, Config.Path, Config.WarehouseCount, Log.get());
LoadSmallTables(driver, Config.Path, Config.WarehouseCount, arena, Log.get());
} else {
std::this_thread::sleep_for(std::chrono::milliseconds(threadId));
}
LoadRange(driver, Config.Path, whStart, whEnd, LoadState, Log.get());
LoadRange(driver, Config.Path, whStart, whEnd, LoadState, arena, Log.get());
});
}

Expand Down
6 changes: 4 additions & 2 deletions ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/value/value.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

#include <util/datetime/base.h>

#include <google/protobuf/arena.h>

#include <optional>
#include <memory>

Expand Down Expand Up @@ -528,7 +530,7 @@ class TValueBuilderBase : public TMoveOnly {
protected:
TValueBuilderBase(TValueBuilderBase&&);

TValueBuilderBase();
TValueBuilderBase(google::protobuf::Arena* arena = nullptr);

TValueBuilderBase(const TType& type);

Expand All @@ -544,7 +546,7 @@ class TValueBuilderBase : public TMoveOnly {

class TValueBuilder : public TValueBuilderBase<TValueBuilder> {
public:
TValueBuilder();
TValueBuilder(google::protobuf::Arena* arena = nullptr);

TValueBuilder(const TType& type);

Expand Down
Loading
Loading