Skip to content

Commit 8f553e4

Browse files
authored
Set position to state only if all data portions succesfully send (#11912)
1 parent b07b2ad commit 8f553e4

File tree

2 files changed

+31
-13
lines changed

2 files changed

+31
-13
lines changed

ydb/public/lib/ydb_cli/commands/ydb_workload_import.cpp

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -161,22 +161,34 @@ class TWorkloadCommandImport::TUploadCommand::TFileWriter: public IWriter {
161161
void TWorkloadCommandImport::TUploadCommand::ProcessDataGenerator(std::shared_ptr<NYdbWorkload::IBulkDataGenerator> dataGen) noexcept try {
162162
TAtomic counter = 0;
163163
for (auto portions = dataGen->GenerateDataPortion(); !portions.empty() && !AtomicGet(ErrorsCount); portions = dataGen->GenerateDataPortion()) {
164+
TVector<TAsyncStatus> sendingResults;
164165
for (const auto& data: portions) {
165166
AtomicIncrement(counter);
166-
Writer->WriteDataPortion(data).Apply(
167-
[data, this, &counter, g = MakeAtomicShared<TGuard<TFastSemaphore>>(*InFlightSemaphore)](const TAsyncStatus& result) {
168-
const auto& res = result.GetValueSync();
169-
data->SetSendResult(res);
170-
auto guard = Guard(Lock);
171-
if (!res.IsSuccess()) {
172-
Cerr << "Bulk upset to " << data->GetTable() << " failed, " << res.GetStatus() << ", " << res.GetIssues().ToString() << Endl;
173-
AtomicIncrement(ErrorsCount);
174-
} else if (data->GetSize()) {
175-
Bar->AddProgress(data->GetSize());
176-
}
177-
AtomicDecrement(counter);
178-
});
167+
sendingResults.emplace_back(Writer->WriteDataPortion(data).Apply([&counter, g = MakeAtomicShared<TGuard<TFastSemaphore>>(*InFlightSemaphore)](const TAsyncStatus& result) {
168+
AtomicDecrement(counter);
169+
return result.GetValueSync();
170+
}));
179171
}
172+
NThreading::WaitAll(sendingResults).Apply([this, sendingResults, portions](const NThreading::TFuture<void>&) {
173+
bool success = true;
174+
for (size_t i = 0; i < portions.size(); ++i) {
175+
const auto& data = portions[i];
176+
const auto& res = sendingResults[i].GetValueSync();
177+
auto guard = Guard(Lock);
178+
if (!res.IsSuccess()) {
179+
Cerr << "Bulk upset to " << data->GetTable() << " failed, " << res.GetStatus() << ", " << res.GetIssues().ToString() << Endl;
180+
AtomicIncrement(ErrorsCount);
181+
success = false;
182+
} else if (data->GetSize()) {
183+
Bar->AddProgress(data->GetSize());
184+
}
185+
}
186+
if (success) {
187+
for (size_t i = 0; i < portions.size(); ++i) {
188+
portions[i]->SetSendResult(sendingResults[i].GetValueSync());
189+
}
190+
}
191+
});
180192
if (AtomicGet(ErrorsCount)) {
181193
break;
182194
}

ydb/tests/olap/load/test_tpch.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,12 +71,18 @@ class TestTpch100(TpchSuiteBase):
7171

7272

7373
class TestTpch1000(TpchSuiteBase):
74+
tables_size: dict[str, int] = {
75+
'lineitem': 5999989709,
76+
}
7477
scale: int = 1000
7578
check_canonical: bool = False
7679
timeout = max(TpchSuiteBase.timeout, 3600.)
7780

7881

7982
class TestTpch10000(TpchSuiteBase):
83+
tables_size: dict[str, int] = {
84+
'lineitem': 59999994267,
85+
}
8086
scale: int = 10000
8187
iterations: int = 2
8288
check_canonical: bool = False

0 commit comments

Comments
 (0)