Skip to content

Commit 5522a96

Browse files
fix race on scan start with indexation cleaning blobs (#7968)
1 parent 274bfdd commit 5522a96

File tree

7 files changed

+149
-170
lines changed

7 files changed

+149
-170
lines changed

ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/constructor.cpp

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
#include "constructor.h"
2-
#include "resolver.h"
32
#include "read_metadata.h"
3+
#include "resolver.h"
4+
45
#include <ydb/core/tx/columnshard/columnshard_impl.h>
56

67
namespace NKikimr::NOlap::NReader::NPlain {
78

8-
NKikimr::TConclusionStatus TIndexScannerConstructor::ParseProgram(const TVersionedIndex* vIndex, const NKikimrTxDataShard::TEvKqpScan& proto, TReadDescription& read) const {
9+
NKikimr::TConclusionStatus TIndexScannerConstructor::ParseProgram(
10+
const TVersionedIndex* vIndex, const NKikimrTxDataShard::TEvKqpScan& proto, TReadDescription& read) const {
911
AFL_VERIFY(vIndex);
1012
auto& indexInfo = vIndex->GetSchema(Snapshot)->GetIndexInfo();
1113
TIndexColumnResolver columnResolver(indexInfo);
@@ -17,15 +19,17 @@ std::vector<TNameTypeInfo> TIndexScannerConstructor::GetPrimaryKeyScheme(const N
1719
return indexInfo.GetPrimaryKeyColumns();
1820
}
1921

20-
NKikimr::TConclusion<std::shared_ptr<TReadMetadataBase>> TIndexScannerConstructor::DoBuildReadMetadata(const NColumnShard::TColumnShard* self, const TReadDescription& read) const {
22+
NKikimr::TConclusion<std::shared_ptr<TReadMetadataBase>> TIndexScannerConstructor::DoBuildReadMetadata(
23+
const NColumnShard::TColumnShard* self, const TReadDescription& read) const {
2124
auto& insertTable = self->InsertTable;
2225
auto& index = self->TablesManager.GetPrimaryIndex();
2326
if (!insertTable || !index) {
2427
return std::shared_ptr<TReadMetadataBase>();
2528
}
2629

2730
if (read.GetSnapshot().GetPlanInstant() < self->GetMinReadSnapshot().GetPlanInstant()) {
28-
return TConclusionStatus::Fail(TStringBuilder() << "Snapshot too old: " << read.GetSnapshot());
31+
return TConclusionStatus::Fail(TStringBuilder() << "Snapshot too old: " << read.GetSnapshot() << ". CS min read snapshot: "
32+
<< self->GetMinReadSnapshot() << ". now: " << TInstant::Now());
2933
}
3034

3135
TDataStorageAccessor dataAccessor(insertTable, index);
@@ -39,4 +43,4 @@ NKikimr::TConclusion<std::shared_ptr<TReadMetadataBase>> TIndexScannerConstructo
3943
return static_pointer_cast<TReadMetadataBase>(readMetadata);
4044
}
4145

42-
}
46+
} // namespace NKikimr::NOlap::NReader::NPlain
Lines changed: 53 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -1,94 +1,86 @@
11
#include "tx_internal_scan.h"
2-
#include <ydb/core/tx/columnshard/engines/reader/actor/actor.h>
3-
#include <ydb/core/tx/columnshard/engines/reader/sys_view/constructor/constructor.h>
4-
#include <ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/constructor.h>
2+
53
#include <ydb/core/formats/arrow/arrow_batch_builder.h>
64
#include <ydb/core/sys_view/common/schema.h>
5+
#include <ydb/core/tx/columnshard/engines/reader/actor/actor.h>
6+
#include <ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/constructor.h>
77
#include <ydb/core/tx/columnshard/engines/reader/sys_view/abstract/policy.h>
8+
#include <ydb/core/tx/columnshard/engines/reader/sys_view/constructor/constructor.h>
89

910
namespace NKikimr::NOlap::NReader {
1011

11-
bool TTxInternalScan::Execute(TTransactionContext& /*txc*/, const TActorContext& /*ctx*/) {
12-
TMemoryProfileGuard mpg("TTxInternalScan::Execute");
12+
void TTxInternalScan::SendError(const TString& problem, const TString& details, const TActorContext& ctx) const {
13+
AFL_WARN(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "TTxScan failed")("problem", problem)("details", details);
1314
auto& request = *InternalScanEvent->Get();
14-
const TSnapshot snapshot = request.ReadToSnapshot.value_or(NOlap::TSnapshot(Self->LastPlannedStep, Self->LastPlannedTxId));
15+
auto scanComputeActor = InternalScanEvent->Sender;
1516

16-
TReadDescription read(snapshot, request.GetReverse());
17-
read.PathId = request.GetPathId();
18-
read.ReadNothing = !Self->TablesManager.HasTable(read.PathId);
19-
std::unique_ptr<IScannerConstructor> scannerConstructor(new NPlain::TIndexScannerConstructor(snapshot, request.GetItemsLimit(), request.GetReverse()));
20-
read.ColumnIds = request.GetColumnIds();
21-
read.ColumnNames = request.GetColumnNames();
22-
if (request.RangesFilter) {
23-
read.PKRangesFilter = std::move(*request.RangesFilter);
24-
}
17+
auto ev = MakeHolder<NKqp::TEvKqpCompute::TEvScanError>(ScanGen, Self->TabletID());
18+
ev->Record.SetStatus(Ydb::StatusIds::BAD_REQUEST);
19+
auto issue = NYql::YqlIssue({}, NYql::TIssuesIds::KIKIMR_BAD_REQUEST,
20+
TStringBuilder() << "Table " << request.GetPathId() << " (shard " << Self->TabletID() << ") scan failed, reason: " << problem << "/"
21+
<< details);
22+
NYql::IssueToMessage(issue, ev->Record.MutableIssues()->Add());
2523

26-
const TVersionedIndex* vIndex = Self->GetIndexOptional() ? &Self->GetIndexOptional()->GetVersionedIndex() : nullptr;
27-
AFL_VERIFY(vIndex);
28-
{
29-
TProgramContainer pContainer;
30-
pContainer.OverrideProcessingColumns(read.ColumnNames);
31-
read.SetProgram(std::move(pContainer));
32-
}
24+
ctx.Send(scanComputeActor, ev.Release());
25+
}
3326

34-
{
35-
auto newRange = scannerConstructor->BuildReadMetadata(Self, read);
36-
if (!newRange) {
37-
ErrorDescription = newRange.GetErrorMessage();
38-
ReadMetadataRange = nullptr;
39-
return true;
40-
}
41-
ReadMetadataRange = newRange.DetachResult();
42-
}
43-
AFL_VERIFY(ReadMetadataRange);
27+
bool TTxInternalScan::Execute(TTransactionContext& /*txc*/, const TActorContext& /*ctx*/) {
4428
return true;
4529
}
4630

4731
void TTxInternalScan::Complete(const TActorContext& ctx) {
4832
TMemoryProfileGuard mpg("TTxInternalScan::Complete");
33+
4934
auto& request = *InternalScanEvent->Get();
5035
auto scanComputeActor = InternalScanEvent->Sender;
51-
const NActors::TLogContextGuard gLogging = NActors::TLogContextBuilder::Build()("tablet", Self->TabletID());
52-
53-
if (!ReadMetadataRange) {
54-
AFL_WARN(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "TTxScan failed")("reason", "no metadata")("error", ErrorDescription);
36+
const TSnapshot snapshot = request.ReadToSnapshot.value_or(NOlap::TSnapshot(Self->LastPlannedStep, Self->LastPlannedTxId));
37+
const NActors::TLogContextGuard gLogging =
38+
NActors::TLogContextBuilder::Build()("tablet", Self->TabletID())("snapshot", snapshot.DebugString());
39+
TReadMetadataPtr readMetadataRange;
40+
{
41+
TReadDescription read(snapshot, request.GetReverse());
42+
read.PathId = request.GetPathId();
43+
read.ReadNothing = !Self->TablesManager.HasTable(read.PathId);
44+
std::unique_ptr<IScannerConstructor> scannerConstructor(
45+
new NPlain::TIndexScannerConstructor(snapshot, request.GetItemsLimit(), request.GetReverse()));
46+
read.ColumnIds = request.GetColumnIds();
47+
read.ColumnNames = request.GetColumnNames();
48+
if (request.RangesFilter) {
49+
read.PKRangesFilter = std::move(*request.RangesFilter);
50+
}
5551

56-
auto ev = MakeHolder<NKqp::TEvKqpCompute::TEvScanError>(ScanGen, Self->TabletID());
57-
ev->Record.SetStatus(Ydb::StatusIds::BAD_REQUEST);
58-
auto issue = NYql::YqlIssue({}, NYql::TIssuesIds::KIKIMR_BAD_REQUEST, TStringBuilder()
59-
<< "Table " << request.GetPathId() << " (shard " << Self->TabletID() << ") scan failed, reason: " << ErrorDescription ? ErrorDescription : "no metadata ranges");
60-
NYql::IssueToMessage(issue, ev->Record.MutableIssues()->Add());
52+
const TVersionedIndex* vIndex = Self->GetIndexOptional() ? &Self->GetIndexOptional()->GetVersionedIndex() : nullptr;
53+
AFL_VERIFY(vIndex);
54+
{
55+
TProgramContainer pContainer;
56+
pContainer.OverrideProcessingColumns(read.ColumnNames);
57+
read.SetProgram(std::move(pContainer));
58+
}
6159

62-
ctx.Send(scanComputeActor, ev.Release());
63-
return;
60+
{
61+
auto newRange = scannerConstructor->BuildReadMetadata(Self, read);
62+
if (!newRange) {
63+
return SendError("cannot create read metadata", newRange.GetErrorMessage(), ctx);
64+
}
65+
readMetadataRange = TValidator::CheckNotNull(newRange.DetachResult());
66+
}
6467
}
68+
6569
TStringBuilder detailedInfo;
6670
if (IS_LOG_PRIORITY_ENABLED(NActors::NLog::PRI_TRACE, NKikimrServices::TX_COLUMNSHARD)) {
67-
detailedInfo << " read metadata: (" << *ReadMetadataRange << ")";
71+
detailedInfo << " read metadata: (" << *readMetadataRange << ")";
6872
}
6973

7074
const TVersionedIndex* index = nullptr;
7175
if (Self->HasIndex()) {
7276
index = &Self->GetIndexAs<TColumnEngineForLogs>().GetVersionedIndex();
7377
}
74-
const TConclusion<ui64> requestCookie = Self->InFlightReadsTracker.AddInFlightRequest(ReadMetadataRange, index);
75-
if (!requestCookie) {
76-
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "TTxScan failed")("reason", requestCookie.GetErrorMessage())("trace_details", detailedInfo);
77-
auto ev = MakeHolder<NKqp::TEvKqpCompute::TEvScanError>(ScanGen, Self->TabletID());
78-
79-
ev->Record.SetStatus(Ydb::StatusIds::INTERNAL_ERROR);
80-
auto issue = NYql::YqlIssue({}, NYql::TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE, TStringBuilder()
81-
<< "Table " << request.GetPathId() << " (shard " << Self->TabletID() << ") scan failed, reason: " << requestCookie.GetErrorMessage());
82-
NYql::IssueToMessage(issue, ev->Record.MutableIssues()->Add());
83-
Self->Counters.GetScanCounters().OnScanFinished(NColumnShard::TScanCounters::EStatusFinish::CannotAddInFlight, TDuration::Zero());
84-
ctx.Send(scanComputeActor, ev.Release());
85-
return;
86-
}
87-
auto scanActor = ctx.Register(new TColumnShardScan(Self->SelfId(), scanComputeActor, Self->GetStoragesManager(),
88-
TComputeShardingPolicy(), ScanId, TxId, ScanGen, *requestCookie, Self->TabletID(), TDuration::Max(), ReadMetadataRange,
89-
NKikimrDataEvents::FORMAT_ARROW, Self->Counters.GetScanCounters()));
78+
const ui64 requestCookie = Self->InFlightReadsTracker.AddInFlightRequest(readMetadataRange, index);
79+
auto scanActor = ctx.Register(new TColumnShardScan(Self->SelfId(), scanComputeActor, Self->GetStoragesManager(), TComputeShardingPolicy(),
80+
ScanId, TxId, ScanGen, requestCookie, Self->TabletID(), TDuration::Max(), readMetadataRange, NKikimrDataEvents::FORMAT_ARROW,
81+
Self->Counters.GetScanCounters()));
9082

9183
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "TTxInternalScan started")("actor_id", scanActor)("trace_detailed", detailedInfo);
9284
}
9385

94-
}
86+
} // namespace NKikimr::NOlap::NReader

ydb/core/tx/columnshard/engines/reader/transaction/tx_internal_scan.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ class TTxInternalScan: public NTabletFlatExecutor::TTransactionBase<NColumnShard
1010
const ui32 ScanGen = 1;
1111
const ui32 TxId = 1;
1212
const ui32 ScanId = 1;
13+
void SendError(const TString& problem, const TString& details, const TActorContext& ctx) const;
14+
1315
public:
1416
using TReadMetadataPtr = TReadMetadataBase::TConstPtr;
1517

@@ -23,9 +25,7 @@ class TTxInternalScan: public NTabletFlatExecutor::TTransactionBase<NColumnShard
2325
TTxType GetTxType() const override { return NColumnShard::TXTYPE_START_INTERNAL_SCAN; }
2426

2527
private:
26-
TString ErrorDescription;
2728
TEvColumnShard::TEvInternalScan::TPtr InternalScanEvent;
28-
TReadMetadataPtr ReadMetadataRange;
2929
};
3030

3131
}

0 commit comments

Comments
 (0)