Skip to content

Commit 04a9e0e

Browse files
committed
Add scan try-catch
1 parent e0df3b7 commit 04a9e0e

File tree

6 files changed

+178
-115
lines changed

6 files changed

+178
-115
lines changed

ydb/core/tx/datashard/build_index/common_helper.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,9 +128,9 @@ class TBatchRowsUploader
128128
}
129129

130130
void AddIssue(const std::exception& exc) {
131+
HasBuildError = true;
131132
UploadStatus.Issues.AddIssue(NYql::TIssue(TStringBuilder()
132133
<< "Scan failed " << exc.what()));
133-
HasBuildError = true;
134134
}
135135

136136
template<typename TResponse>

ydb/core/tx/datashard/build_index/local_kmeans.cpp

Lines changed: 52 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -275,65 +275,80 @@ class TLocalKMeansScan final : public TLocalKMeansScanBase {
275275

276276
EScan Seek(TLead& lead, ui64 seq) noexcept final
277277
{
278-
LOG_D("Seek " << seq << " " << Debug());
278+
try {
279+
LOG_D("Seek " << seq << " " << Debug());
279280

280-
if (IsExhausted) {
281-
return Uploader.CanFinish()
282-
? EScan::Final
283-
: EScan::Sleep;
284-
}
281+
if (IsExhausted) {
282+
return Uploader.CanFinish()
283+
? EScan::Final
284+
: EScan::Sleep;
285+
}
285286

286-
lead = Lead;
287+
lead = Lead;
287288

288-
return EScan::Feed;
289+
return EScan::Feed;
290+
} catch (const std::exception& exc) {
291+
Uploader.AddIssue(exc);
292+
return EScan::Final;
293+
}
289294
}
290295

291296
EScan Feed(TArrayRef<const TCell> key, const TRow& row) noexcept final
292297
{
293-
LOG_T("Feed " << Debug());
298+
try {
299+
LOG_T("Feed " << Debug());
294300

295-
++ReadRows;
296-
ReadBytes += CountBytes(key, row);
301+
++ReadRows;
302+
ReadBytes += CountBytes(key, row);
297303

298-
if (PrefixColumns && Prefix && !TCellVectorsEquals{}(Prefix.GetCells(), key.subspan(0, PrefixColumns))) {
299-
if (!FinishPrefix()) {
300-
// scan current prefix rows with a new state again
301-
return EScan::Reset;
304+
if (PrefixColumns && Prefix && !TCellVectorsEquals{}(Prefix.GetCells(), key.subspan(0, PrefixColumns))) {
305+
if (!FinishPrefix()) {
306+
// scan current prefix rows with a new state again
307+
return EScan::Reset;
308+
}
302309
}
303-
}
304310

305-
if (PrefixColumns && !Prefix) {
306-
Prefix = TSerializedCellVec{key.subspan(0, PrefixColumns)};
307-
auto newParent = key.at(0).template AsValue<ui64>();
308-
Child += (newParent - Parent) * Clusters.GetK();
309-
Parent = newParent;
310-
}
311+
if (PrefixColumns && !Prefix) {
312+
Prefix = TSerializedCellVec{key.subspan(0, PrefixColumns)};
313+
auto newParent = key.at(0).template AsValue<ui64>();
314+
Child += (newParent - Parent) * Clusters.GetK();
315+
Parent = newParent;
316+
}
311317

312-
if (IsFirstPrefixFeed && IsPrefixRowsValid) {
313-
PrefixRows.AddRow(TSerializedCellVec{key}, TSerializedCellVec::Serialize(*row));
314-
if (HasReachedLimits(PrefixRows, ScanSettings)) {
315-
PrefixRows.Clear();
316-
IsPrefixRowsValid = false;
318+
if (IsFirstPrefixFeed && IsPrefixRowsValid) {
319+
PrefixRows.AddRow(TSerializedCellVec{key}, TSerializedCellVec::Serialize(*row));
320+
if (HasReachedLimits(PrefixRows, ScanSettings)) {
321+
PrefixRows.Clear();
322+
IsPrefixRowsValid = false;
323+
}
317324
}
318-
}
319325

320-
Feed(key, *row);
326+
Feed(key, *row);
321327

322-
return Uploader.ShouldWaitUpload() ? EScan::Sleep : EScan::Feed;
328+
return Uploader.ShouldWaitUpload() ? EScan::Sleep : EScan::Feed;
329+
} catch (const std::exception& exc) {
330+
Uploader.AddIssue(exc);
331+
return EScan::Final;
332+
}
323333
}
324334

325335
EScan Exhausted() noexcept final
326336
{
327-
LOG_D("Exhausted " << Debug());
337+
try {
338+
LOG_D("Exhausted " << Debug());
328339

329-
if (!FinishPrefix()) {
340+
if (!FinishPrefix()) {
341+
return EScan::Reset;
342+
}
343+
344+
IsExhausted = true;
345+
346+
// call Seek to wait uploads
330347
return EScan::Reset;
348+
} catch (const std::exception& exc) {
349+
Uploader.AddIssue(exc);
350+
return EScan::Final;
331351
}
332-
333-
IsExhausted = true;
334-
335-
// call Seek to wait uploads
336-
return EScan::Reset;
337352
}
338353

339354
private:

ydb/core/tx/datashard/build_index/prefix_kmeans.cpp

Lines changed: 55 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -306,69 +306,84 @@ class TPrefixKMeansScan final : public TPrefixKMeansScanBase {
306306

307307
EScan Seek(TLead& lead, ui64 seq) noexcept final
308308
{
309-
LOG_D("Seek " << seq << " " << Debug());
309+
try {
310+
LOG_D("Seek " << seq << " " << Debug());
310311

311-
if (IsExhausted) {
312-
return Uploader.CanFinish()
313-
? EScan::Final
314-
: EScan::Sleep;
315-
}
312+
if (IsExhausted) {
313+
return Uploader.CanFinish()
314+
? EScan::Final
315+
: EScan::Sleep;
316+
}
316317

317-
lead = Lead;
318+
lead = Lead;
318319

319-
return EScan::Feed;
320+
return EScan::Feed;
321+
} catch (const std::exception& exc) {
322+
Uploader.AddIssue(exc);
323+
return EScan::Final;
324+
}
320325
}
321326

322327
EScan Feed(TArrayRef<const TCell> key, const TRow& row) noexcept final
323328
{
324-
LOG_T("Feed " << Debug());
329+
try {
330+
LOG_T("Feed " << Debug());
325331

326-
++ReadRows;
327-
ReadBytes += CountBytes(key, row);
332+
++ReadRows;
333+
ReadBytes += CountBytes(key, row);
328334

329-
if (Prefix && !TCellVectorsEquals{}(Prefix.GetCells(), key.subspan(0, PrefixColumns))) {
330-
if (!FinishPrefix()) {
331-
// scan current prefix rows with a new state again
332-
return EScan::Reset;
335+
if (Prefix && !TCellVectorsEquals{}(Prefix.GetCells(), key.subspan(0, PrefixColumns))) {
336+
if (!FinishPrefix()) {
337+
// scan current prefix rows with a new state again
338+
return EScan::Reset;
339+
}
333340
}
334-
}
335341

336-
if (!Prefix) {
337-
Prefix = TSerializedCellVec{key.subspan(0, PrefixColumns)};
342+
if (!Prefix) {
343+
Prefix = TSerializedCellVec{key.subspan(0, PrefixColumns)};
338344

339-
// write {Prefix..., Parent} row to PrefixBuf:
340-
auto pk = TSerializedCellVec::Serialize(Prefix.GetCells());
341-
std::array<TCell, 1> cells;
342-
cells[0] = TCell::Make(Parent);
343-
TSerializedCellVec::UnsafeAppendCells(cells, pk);
344-
PrefixBuf->AddRow(TSerializedCellVec{std::move(pk)}, TSerializedCellVec::Serialize({}));
345-
}
345+
// write {Prefix..., Parent} row to PrefixBuf:
346+
auto pk = TSerializedCellVec::Serialize(Prefix.GetCells());
347+
std::array<TCell, 1> cells;
348+
cells[0] = TCell::Make(Parent);
349+
TSerializedCellVec::UnsafeAppendCells(cells, pk);
350+
PrefixBuf->AddRow(TSerializedCellVec{std::move(pk)}, TSerializedCellVec::Serialize({}));
351+
}
346352

347-
if (IsFirstPrefixFeed && IsPrefixRowsValid) {
348-
PrefixRows.AddRow(TSerializedCellVec{key}, TSerializedCellVec::Serialize(*row));
349-
if (HasReachedLimits(PrefixRows, ScanSettings)) {
350-
PrefixRows.Clear();
351-
IsPrefixRowsValid = false;
353+
if (IsFirstPrefixFeed && IsPrefixRowsValid) {
354+
PrefixRows.AddRow(TSerializedCellVec{key}, TSerializedCellVec::Serialize(*row));
355+
if (HasReachedLimits(PrefixRows, ScanSettings)) {
356+
PrefixRows.Clear();
357+
IsPrefixRowsValid = false;
358+
}
352359
}
353-
}
354360

355-
Feed(key, *row);
361+
Feed(key, *row);
356362

357-
return Uploader.ShouldWaitUpload() ? EScan::Sleep : EScan::Feed;
363+
return Uploader.ShouldWaitUpload() ? EScan::Sleep : EScan::Feed;
364+
} catch (const std::exception& exc) {
365+
Uploader.AddIssue(exc);
366+
return EScan::Final;
367+
}
358368
}
359369

360370
EScan Exhausted() noexcept final
361371
{
362-
LOG_D("Exhausted " << Debug());
372+
try {
373+
LOG_D("Exhausted " << Debug());
363374

364-
if (!FinishPrefix()) {
375+
if (!FinishPrefix()) {
376+
return EScan::Reset;
377+
}
378+
379+
IsExhausted = true;
380+
381+
// call Seek to wait uploads
365382
return EScan::Reset;
383+
} catch (const std::exception& exc) {
384+
Uploader.AddIssue(exc);
385+
return EScan::Final;
366386
}
367-
368-
IsExhausted = true;
369-
370-
// call Seek to wait uploads
371-
return EScan::Reset;
372387
}
373388

374389
private:

ydb/core/tx/datashard/build_index/reshuffle_kmeans.cpp

Lines changed: 34 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -234,39 +234,54 @@ class TReshuffleKMeansScan final : public TReshuffleKMeansScanBase {
234234

235235
EScan Seek(TLead& lead, ui64 seq) noexcept final
236236
{
237-
LOG_D("Seek " << seq << " " << Debug());
237+
try {
238+
LOG_D("Seek " << seq << " " << Debug());
238239

239-
if (IsExhausted) {
240-
return Uploader.CanFinish()
241-
? EScan::Final
242-
: EScan::Sleep;
243-
}
240+
if (IsExhausted) {
241+
return Uploader.CanFinish()
242+
? EScan::Final
243+
: EScan::Sleep;
244+
}
244245

245-
lead = Lead;
246+
lead = Lead;
246247

247-
return EScan::Feed;
248+
return EScan::Feed;
249+
} catch (const std::exception& exc) {
250+
Uploader.AddIssue(exc);
251+
return EScan::Final;
252+
}
248253
}
249254

250255
EScan Feed(TArrayRef<const TCell> key, const TRow& row) noexcept final
251256
{
252-
LOG_T("Feed " << Debug());
257+
try {
258+
LOG_T("Feed " << Debug());
253259

254-
++ReadRows;
255-
ReadBytes += CountBytes(key, row);
260+
++ReadRows;
261+
ReadBytes += CountBytes(key, row);
256262

257-
Feed(key, *row);
263+
Feed(key, *row);
258264

259-
return Uploader.ShouldWaitUpload() ? EScan::Sleep : EScan::Feed;
265+
return Uploader.ShouldWaitUpload() ? EScan::Sleep : EScan::Feed;
266+
} catch (const std::exception& exc) {
267+
Uploader.AddIssue(exc);
268+
return EScan::Final;
269+
}
260270
}
261271

262272
EScan Exhausted() noexcept final
263273
{
264-
LOG_D("Exhausted " << Debug());
265-
266-
IsExhausted = true;
267-
268-
// call Seek to wait uploads
269-
return EScan::Reset;
274+
try {
275+
LOG_D("Exhausted " << Debug());
276+
277+
IsExhausted = true;
278+
279+
// call Seek to wait uploads
280+
return EScan::Reset;
281+
} catch (const std::exception& exc) {
282+
Uploader.AddIssue(exc);
283+
return EScan::Final;
284+
}
270285
}
271286

272287
private:

ydb/core/tx/datashard/build_index/sample_k.cpp

Lines changed: 35 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -115,35 +115,56 @@ class TSampleKScan final: public TActor<TSampleKScan>, public NTable::IScan {
115115
}
116116

117117
EScan Seek(TLead& lead, ui64 seq) noexcept final {
118-
LOG_D("Seek " << seq << " " << Debug());
118+
try {
119+
LOG_D("Seek " << seq << " " << Debug());
119120

120-
lead = Lead;
121+
lead = Lead;
121122

122-
return EScan::Feed;
123+
return EScan::Feed;
124+
} catch (const std::exception& exc) {
125+
HasBuildError = true;
126+
Issues.AddIssue(NYql::TIssue(TStringBuilder()
127+
<< "Scan failed " << exc.what()));
128+
return EScan::Final;
129+
}
123130
}
124131

125132
EScan Feed(TArrayRef<const TCell> key, const TRow& row) noexcept final {
126-
LOG_T("Feed " << Debug());
133+
try {
134+
LOG_T("Feed " << Debug());
127135

128-
++ReadRows;
129-
ReadBytes += CountBytes(key, row);
136+
++ReadRows;
137+
ReadBytes += CountBytes(key, row);
130138

131-
Sampler.Add([&row](){
132-
return TSerializedCellVec::Serialize(*row);
133-
});
139+
Sampler.Add([&row](){
140+
return TSerializedCellVec::Serialize(*row);
141+
});
142+
143+
if (Sampler.GetMaxProbability() == 0) {
144+
return EScan::Final;
145+
}
134146

135-
if (Sampler.GetMaxProbability() == 0) {
147+
return EScan::Feed;
148+
} catch (const std::exception& exc) {
149+
HasBuildError = true;
150+
Issues.AddIssue(NYql::TIssue(TStringBuilder()
151+
<< "Scan failed " << exc.what()));
136152
return EScan::Final;
137153
}
138-
139-
return EScan::Feed;
140154
}
141155

142156
EScan Exhausted() noexcept final
143157
{
144-
LOG_D("Exhausted " << Debug());
158+
try {
159+
LOG_D("Exhausted " << Debug());
145160

146-
return EScan::Final;
161+
return EScan::Final;
162+
} catch (const std::exception& exc) {
163+
HasBuildError = true;
164+
Issues.AddIssue(NYql::TIssue(TStringBuilder()
165+
<< "Scan failed " << exc.what()));
166+
return EScan::Final;
167+
}
147168
}
148169

149170
TAutoPtr<IDestructable> Finish(EAbort abort) noexcept final {

0 commit comments

Comments
 (0)