Skip to content

Commit a1936b3

Browse files
committed
Add error handling in scans (#18798)
1 parent 30de5db commit a1936b3

12 files changed: +50 -36 lines changed

ydb/core/tx/datashard/build_index/common_helper.h

Lines changed: 16 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -127,6 +127,12 @@ class TBatchRowsUploader
127127
return true;
128128
}
129129

130+
void AddIssue(const std::exception& exc) {
131+
UploadStatus.Issues.AddIssue(NYql::TIssue(TStringBuilder()
132+
<< "Scan failed " << exc.what()));
133+
HasBuildError = true;
134+
}
135+
130136
template<typename TResponse>
131137
void Finish(TResponse& response, NTable::EAbort abort) {
132138
if (UploaderId) {
@@ -136,18 +142,20 @@ class TBatchRowsUploader
136142

137143
response.SetUploadRows(UploadRows);
138144
response.SetUploadBytes(UploadBytes);
139-
if (abort != NTable::EAbort::None) {
145+
if (HasBuildError) {
146+
response.SetStatus(NKikimrIndexBuilder::EBuildStatus::BUILD_ERROR);
147+
} else if (abort != NTable::EAbort::None) {
140148
response.SetStatus(NKikimrIndexBuilder::EBuildStatus::ABORTED);
141149
} else if (UploadStatus.IsNone() || UploadStatus.IsSuccess()) {
142150
response.SetStatus(NKikimrIndexBuilder::EBuildStatus::DONE);
143151
if (UploadStatus.IsNone()) {
144152
UploadStatus.Issues.AddIssue(NYql::TIssue("Shard or requested range is empty"));
145153
}
146-
NYql::IssuesToMessage(UploadStatus.Issues, response.MutableIssues());
147154
} else {
148155
response.SetStatus(NKikimrIndexBuilder::EBuildStatus::BUILD_ERROR);
149-
NYql::IssuesToMessage(UploadStatus.Issues, response.MutableIssues());
150156
}
157+
158+
NYql::IssuesToMessage(UploadStatus.Issues, response.MutableIssues());
151159
}
152160

153161
const TUploadStatus& GetUploadStatus() const {
@@ -193,10 +201,10 @@ class TBatchRowsUploader
193201
void StartUploadRowsInternal() {
194202
LOG_D("TBatchRowsUploader StartUploadRowsInternal " << Debug());
195203

196-
Y_ASSERT(Uploading);
197-
Y_ASSERT(!Uploading.Buffer.IsEmpty());
198-
Y_ASSERT(!UploaderId);
199-
Y_ASSERT(Owner);
204+
Y_ENSURE(Uploading);
205+
Y_ENSURE(!Uploading.Buffer.IsEmpty());
206+
Y_ENSURE(!UploaderId);
207+
Y_ENSURE(Owner);
200208
auto actor = NTxProxy::CreateUploadRowsInternal(
201209
Owner, Uploading.Table, Uploading.Types, Uploading.Buffer.GetRowsData(),
202210
NTxProxy::EUploadRowsMode::WriteToTableShadow,
@@ -217,6 +225,7 @@ class TBatchRowsUploader
217225
ui64 UploadRows = 0;
218226
ui64 UploadBytes = 0;
219227
ui32 RetryCount = 0;
228+
bool HasBuildError = false;
220229
};
221230

222231
inline void StartScan(TDataShard* dataShard, TAutoPtr<NTable::IScan>&& scan, ui64 id,

ydb/core/tx/datashard/build_index/kmeans_helper.cpp

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -151,7 +151,7 @@ std::shared_ptr<NTxProxy::TUploadTypes> MakeOutputTypes(const TUserTable& table,
151151
break;
152152
}
153153
default:
154-
Y_ASSERT(false);
154+
Y_ENSURE(false);
155155

156156
}
157157
return result;

ydb/core/tx/datashard/build_index/kmeans_helper.h

Lines changed: 7 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -351,7 +351,7 @@ class TClusters {
351351
K = 1;
352352
Clusters.resize(K);
353353
}
354-
Y_ASSERT(Clusters.size() == K);
354+
Y_ENSURE(Clusters.size() == K);
355355
ClusterSizes.resize(K, 0);
356356
AggregatedClusters.resize(K);
357357
for (auto& aggregate : AggregatedClusters) {
@@ -363,7 +363,7 @@ class TClusters {
363363

364364
bool RecomputeClusters()
365365
{
366-
Y_ASSERT(K >= 1);
366+
Y_ENSURE(K >= 1);
367367
ui64 vectorCount = 0;
368368
ui64 reassignedCount = 0;
369369
for (size_t i = 0; auto& aggregate : AggregatedClusters) {
@@ -375,12 +375,12 @@ class TClusters {
375375

376376
if (aggregate.Size != 0) {
377377
this->Fill(Clusters[i], aggregate.Cluster.data(), aggregate.Size);
378-
Y_ASSERT(aggregate.Size == 0);
378+
Y_ENSURE(aggregate.Size == 0);
379379
}
380380
++i;
381381
}
382-
Y_ASSERT(vectorCount >= K);
383-
Y_ASSERT(reassignedCount <= vectorCount);
382+
Y_ENSURE(vectorCount >= K);
383+
Y_ENSURE(reassignedCount <= vectorCount);
384384
if (K == 1) {
385385
return true;
386386
}
@@ -410,7 +410,7 @@ class TClusters {
410410

411411
std::optional<ui32> FindCluster(TArrayRef<const TCell> row, NTable::TPos embeddingPos)
412412
{
413-
Y_ASSERT(embeddingPos < row.size());
413+
Y_ENSURE(embeddingPos < row.size());
414414
const auto embedding = row.at(embeddingPos).AsRef();
415415
if (!IsExpectedSize<TCoord>(embedding, Dimensions)) {
416416
return {};
@@ -452,7 +452,7 @@ class TClusters {
452452

453453
void Fill(TString& d, TSum* embedding, ui64& c)
454454
{
455-
Y_ASSERT(c > 0);
455+
Y_ENSURE(c > 0);
456456
const auto count = static_cast<TSum>(std::exchange(c, 0));
457457
auto data = GetData(d.MutRef().data());
458458
for (auto& coord : data) {

ydb/core/tx/datashard/build_index/local_kmeans.cpp

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -395,7 +395,7 @@ class TLocalKMeansScan final : public TLocalKMeansScanBase {
395395
return true;
396396
}
397397

398-
Y_ASSERT(false);
398+
Y_ENSURE(false);
399399
return true;
400400
}
401401

@@ -421,7 +421,7 @@ class TLocalKMeansScan final : public TLocalKMeansScanBase {
421421
FeedBuildToPosting(key, row);
422422
break;
423423
default:
424-
Y_ASSERT(false);
424+
Y_ENSURE(false);
425425
}
426426
}
427427

ydb/core/tx/datashard/build_index/prefix_kmeans.cpp

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -419,7 +419,7 @@ class TPrefixKMeansScan final : public TPrefixKMeansScanBase {
419419
return true;
420420
}
421421

422-
Y_ASSERT(false);
422+
Y_ENSURE(false);
423423
return true;
424424
}
425425

@@ -439,7 +439,7 @@ class TPrefixKMeansScan final : public TPrefixKMeansScanBase {
439439
FeedBuildToPosting(key, row);
440440
break;
441441
default:
442-
Y_ASSERT(false);
442+
Y_ENSURE(false);
443443
}
444444
}
445445

ydb/core/tx/datashard/build_index/reshuffle_kmeans.cpp

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -286,7 +286,7 @@ class TReshuffleKMeansScan final : public TReshuffleKMeansScanBase {
286286
FeedBuildToPosting(key, row);
287287
break;
288288
default:
289-
Y_ASSERT(false);
289+
Y_ENSURE(false);
290290
}
291291
}
292292

ydb/core/tx/datashard/build_index/sample_k.cpp

Lines changed: 7 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -60,6 +60,8 @@ class TSampleKScan final: public TActor<TSampleKScan>, public NTable::IScan {
6060
TSampler Sampler;
6161

6262
IDriver* Driver = nullptr;
63+
NYql::TIssues Issues;
64+
bool HasBuildError = false;
6365

6466
TLead Lead;
6567

@@ -149,13 +151,17 @@ class TSampleKScan final: public TActor<TSampleKScan>, public NTable::IScan {
149151
record.SetReadRows(ReadRows);
150152
record.SetReadBytes(ReadBytes);
151153

152-
if (abort != NTable::EAbort::None) {
154+
if (HasBuildError) {
155+
record.SetStatus(NKikimrIndexBuilder::EBuildStatus::BUILD_ERROR);
156+
} else if (abort != EAbort::None) {
153157
record.SetStatus(NKikimrIndexBuilder::EBuildStatus::ABORTED);
154158
} else {
155159
record.SetStatus(NKikimrIndexBuilder::EBuildStatus::DONE);
156160
FillResponse();
157161
}
158162

163+
NYql::IssuesToMessage(Issues, record.MutableIssues());
164+
159165
if (Response->Record.GetStatus() == NKikimrIndexBuilder::DONE) {
160166
LOG_N("Done " << Debug() << " " << Response->Record.ShortDebugString());
161167
} else {

ydb/core/tx/datashard/build_index/secondary_index.cpp

Lines changed: 4 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -127,6 +127,7 @@ class TBuildScanUpload: public TActor<TBuildScanUpload<Activity>>, public NTable
127127

128128
TUploadMonStats Stats = TUploadMonStats("tablets", "build_index_upload");
129129
TUploadStatus UploadStatus;
130+
bool HasBuildError = false;
130131

131132
TBuildScanUpload(ui64 buildIndexId,
132133
const TString& target,
@@ -236,11 +237,10 @@ class TBuildScanUpload: public TActor<TBuildScanUpload<Activity>>, public NTable
236237
progress->Record.SetRequestSeqNoGeneration(SeqNo.Generation);
237238
progress->Record.SetRequestSeqNoRound(SeqNo.Round);
238239

239-
if (abort != EAbort::None) {
240+
if (HasBuildError) {
241+
progress->Record.SetStatus(NKikimrIndexBuilder::EBuildStatus::BUILD_ERROR);
242+
} else if (abort != EAbort::None) {
240243
progress->Record.SetStatus(NKikimrIndexBuilder::EBuildStatus::ABORTED);
241-
UploadStatus.Issues.AddIssue(NYql::TIssue("Aborted by scan host env"));
242-
243-
LOG_W(Debug());
244244
} else if (!UploadStatus.IsSuccess()) {
245245
progress->Record.SetStatus(NKikimrIndexBuilder::EBuildStatus::BUILD_ERROR);
246246
} else {

ydb/core/tx/datashard/build_index/ut/ut_local_kmeans.cpp

Lines changed: 5 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -24,7 +24,7 @@ Y_UNIT_TEST_SUITE(TTxDataShardLocalKMeansScan) {
2424

2525
static void DoBadRequest(Tests::TServer::TPtr server, TActorId sender,
2626
std::function<void(NKikimrTxDataShard::TEvLocalKMeansRequest&)> setupRequest,
27-
TString expectedError, bool expectedErrorSubstring = false)
27+
TString expectedError, bool expectedErrorSubstring = false, NKikimrIndexBuilder::EBuildStatus expectedStatus = NKikimrIndexBuilder::EBuildStatus::BAD_REQUEST)
2828
{
2929
auto id = sId.fetch_add(1, std::memory_order_relaxed);
3030
auto& runtime = *server->GetRuntime();
@@ -77,7 +77,7 @@ Y_UNIT_TEST_SUITE(TTxDataShardLocalKMeansScan) {
7777

7878
TAutoPtr<IEventHandle> handle;
7979
auto reply = runtime.GrabEdgeEventRethrow<TEvDataShard::TEvLocalKMeansResponse>(handle);
80-
UNIT_ASSERT_VALUES_EQUAL(reply->Record.GetStatus(), NKikimrIndexBuilder::EBuildStatus::BAD_REQUEST);
80+
UNIT_ASSERT_VALUES_EQUAL(reply->Record.GetStatus(), expectedStatus);
8181

8282
NYql::TIssues issues;
8383
NYql::IssuesFromMessage(reply->Record.GetIssues(), issues);
@@ -351,10 +351,9 @@ Y_UNIT_TEST_SUITE(TTxDataShardLocalKMeansScan) {
351351
"(4, \"\x65\x65\3\", \"four\"),"
352352
"(5, \"\x75\x75\3\", \"five\");");
353353

354-
// TODO: https://github.com/ydb-platform/ydb/issues/18656
355-
// DoBadRequest(server, sender, [](NKikimrTxDataShard::TEvLocalKMeansRequest& request) {
356-
// request.SetChild(Max<ui64>() - 100);
357-
// }, TStringBuilder() << "");
354+
DoBadRequest(server, sender, [](NKikimrTxDataShard::TEvLocalKMeansRequest& request) {
355+
request.SetChild(Max<ui64>() - 100);
356+
}, "Condition violated: `(parent & PostingParentFlag) == 0'", true, NKikimrIndexBuilder::EBuildStatus::BUILD_ERROR);
358357
}
359358

360359
Y_UNIT_TEST (MainToPosting) {

ydb/core/tx/datashard/build_index/ut/ut_prefix_kmeans.cpp

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -50,7 +50,7 @@ Y_UNIT_TEST_SUITE (TTxDataShardPrefixKMeansScan) {
5050

5151
VectorIndexSettings settings;
5252
settings.set_vector_dimension(2);
53-
settings.set_vector_type(VectorIndexSettings::VECTOR_TYPE_FLOAT);
53+
settings.set_vector_type(VectorIndexSettings::VECTOR_TYPE_UINT8);
5454
settings.set_metric(VectorIndexSettings::DISTANCE_COSINE);
5555
*rec.MutableSettings() = settings;
5656

0 commit comments

Comments
 (0)