Skip to content

Commit 74671b6

Browse files
authored
Fix potential data race (#14505)
1 parent 2559a8f commit 74671b6

File tree

1 file changed

+38
-34
lines changed

1 file changed

+38
-34
lines changed

ydb/public/lib/ydb_cli/commands/ydb_workload_import.cpp

Lines changed: 38 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,9 @@ int TWorkloadCommandImport::TUploadCommand::DoRun(NYdbWorkload::IWorkloadQueryGe
7272
}
7373
NThreading::WaitAll(sendings).Wait();
7474
const bool wereErrors = AtomicGet(ErrorsCount);
75-
Cout << "Fill table " << dataGen->GetName() << " "<< (wereErrors ? "Failed" : "OK" ) << " " << Bar->GetCurProgress() << " / " << Bar->GetCapacity() << " (" << (Now() - start) << ")" << Endl;
75+
with_lock(Lock) {
76+
Cout << "Fill table " << dataGen->GetName() << " "<< (wereErrors ? "Failed" : "OK" ) << " " << Bar->GetCurProgress() << " / " << Bar->GetCapacity() << " (" << (Now() - start) << ")" << Endl;
77+
}
7678
if (wereErrors) {
7779
break;
7880
}
@@ -232,48 +234,50 @@ class TWorkloadCommandImport::TUploadCommand::TFileWriter: public IWriter {
232234
TAdaptiveLock Lock;
233235
};
234236

235-
void TWorkloadCommandImport::TUploadCommand::ProcessDataGenerator(std::shared_ptr<NYdbWorkload::IBulkDataGenerator> dataGen) noexcept try {
237+
void TWorkloadCommandImport::TUploadCommand::ProcessDataGenerator(std::shared_ptr<NYdbWorkload::IBulkDataGenerator> dataGen) noexcept {
236238
TAtomic counter = 0;
237-
for (auto portions = dataGen->GenerateDataPortion(); !portions.empty() && !AtomicGet(ErrorsCount); portions = dataGen->GenerateDataPortion()) {
238-
TVector<TAsyncStatus> sendingResults;
239-
for (const auto& data: portions) {
240-
AtomicIncrement(counter);
241-
sendingResults.emplace_back(Writer->WriteDataPortion(data).Apply([&counter, g = MakeAtomicShared<TGuard<TFastSemaphore>>(*InFlightSemaphore)](const TAsyncStatus& result) {
242-
AtomicDecrement(counter);
243-
return result.GetValueSync();
244-
}));
245-
}
246-
NThreading::WaitAll(sendingResults).Apply([this, sendingResults, portions](const NThreading::TFuture<void>&) {
247-
bool success = true;
248-
for (size_t i = 0; i < portions.size(); ++i) {
249-
const auto& data = portions[i];
250-
const auto& res = sendingResults[i].GetValueSync();
251-
auto guard = Guard(Lock);
252-
if (!res.IsSuccess()) {
253-
Cerr << "Bulk upset to " << data->GetTable() << " failed, " << res.GetStatus() << ", " << res.GetIssues().ToString() << Endl;
254-
AtomicIncrement(ErrorsCount);
255-
success = false;
256-
} else if (data->GetSize()) {
257-
Bar->AddProgress(data->GetSize());
258-
}
239+
try {
240+
for (auto portions = dataGen->GenerateDataPortion(); !portions.empty() && !AtomicGet(ErrorsCount); portions = dataGen->GenerateDataPortion()) {
241+
TVector<TAsyncStatus> sendingResults;
242+
for (const auto& data: portions) {
243+
AtomicIncrement(counter);
244+
sendingResults.emplace_back(Writer->WriteDataPortion(data).Apply([&counter, g = MakeAtomicShared<TGuard<TFastSemaphore>>(*InFlightSemaphore)](const TAsyncStatus& result) {
245+
AtomicDecrement(counter);
246+
return result.GetValueSync();
247+
}));
259248
}
260-
if (success) {
249+
NThreading::WaitAll(sendingResults).Apply([this, sendingResults, portions](const NThreading::TFuture<void>&) {
250+
bool success = true;
261251
for (size_t i = 0; i < portions.size(); ++i) {
262-
portions[i]->SetSendResult(sendingResults[i].GetValueSync());
252+
const auto& data = portions[i];
253+
const auto& res = sendingResults[i].GetValueSync();
254+
auto guard = Guard(Lock);
255+
if (!res.IsSuccess()) {
256+
Cerr << "Bulk upset to " << data->GetTable() << " failed, " << res.GetStatus() << ", " << res.GetIssues().ToString() << Endl;
257+
AtomicIncrement(ErrorsCount);
258+
success = false;
259+
} else if (data->GetSize()) {
260+
Bar->AddProgress(data->GetSize());
261+
}
263262
}
263+
if (success) {
264+
for (size_t i = 0; i < portions.size(); ++i) {
265+
portions[i]->SetSendResult(sendingResults[i].GetValueSync());
266+
}
267+
}
268+
});
269+
if (AtomicGet(ErrorsCount)) {
270+
break;
264271
}
265-
});
266-
if (AtomicGet(ErrorsCount)) {
267-
break;
268272
}
273+
} catch (...) {
274+
auto g = Guard(Lock);
275+
Cerr << "Fill table " << dataGen->GetName() << " failed: " << CurrentExceptionMessage() << ", backtrace: ";
276+
PrintBackTrace();
277+
AtomicSet(ErrorsCount, 1);
269278
}
270279
while(AtomicGet(counter) > 0) {
271280
Sleep(TDuration::MilliSeconds(100));
272281
}
273-
} catch (...) {
274-
auto g = Guard(Lock);
275-
Cerr << "Fill table " << dataGen->GetName() << " failed: " << CurrentExceptionMessage() << ", backtrace: ";
276-
PrintBackTrace();
277-
AtomicSet(ErrorsCount, 1);
278282
}
279283
}

0 commit comments

Comments
 (0)