Skip to content

Commit d4c693f

Browse files
ivanmorozov333vlad-gogov
andauthored
fix error on start internal scanner (#11289)
Co-authored-by: vlad-gogov <vlad-gogov@ydb.tech>
1 parent 7fe34bf commit d4c693f

File tree

7 files changed

+43
-11
lines changed

7 files changed

+43
-11
lines changed

ydb/core/kqp/ut/olap/kqp_olap_ut.cpp

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2960,6 +2960,31 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
29602960
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
29612961
}
29622962
}
2963+
2964+
Y_UNIT_TEST(ScanFailedSnapshotTooOld) {
2965+
NKikimrConfig::TAppConfig appConfig;
2966+
appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true);
2967+
appConfig.MutableColumnShardConfig()->SetMaxReadStaleness_ms(5000);
2968+
auto settings = TKikimrSettings().SetAppConfig(appConfig).SetWithSampleTables(false);
2969+
TTestHelper testHelper(settings);
2970+
2971+
TTestHelper::TColumnTable cnt;
2972+
TVector<TTestHelper::TColumnSchema> schema = {
2973+
TTestHelper::TColumnSchema().SetName("key").SetType(NScheme::NTypeIds::Int32).SetNullable(false),
2974+
TTestHelper::TColumnSchema().SetName("c").SetType(NScheme::NTypeIds::Int32).SetNullable(true)
2975+
};
2976+
cnt.SetName("/Root/cnt").SetPrimaryKey({ "key" }).SetSchema(schema);
2977+
testHelper.CreateTable(cnt);
2978+
Sleep(TDuration::Seconds(10));
2979+
auto client = testHelper.GetKikimr().GetQueryClient();
2980+
auto result =
2981+
client
2982+
.ExecuteQuery(
2983+
TStringBuilder() << "$v = SELECT CAST(COUNT(*) AS INT32) FROM `/Root/cnt`; INSERT INTO `/Root/cnt` (key, c) values(1, $v);",
2984+
NYdb::NQuery::TTxControl::BeginTx().CommitTx())
2985+
.GetValueSync();
2986+
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS);
2987+
}
29632988
}
29642989

29652990
}

ydb/core/tx/columnshard/columnshard__write.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -283,9 +283,9 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
283283
writeData.MutableWriteMeta().SetWriteMiddle1StartInstant(TMonotonic::Now());
284284

285285
NOlap::TWritingContext context(TabletID(), SelfId(), snapshotSchema, StoragesManager,
286-
Counters.GetIndexationCounters().SplitterCounters, Counters.GetCSCounters().WritingCounters);
286+
Counters.GetIndexationCounters().SplitterCounters, Counters.GetCSCounters().WritingCounters, GetLastTxSnapshot());
287287
std::shared_ptr<NConveyor::ITask> task = std::make_shared<NOlap::TBuildBatchesTask>(
288-
BufferizationWriteActorId, std::move(writeData), GetLastTxSnapshot(), context);
288+
BufferizationWriteActorId, std::move(writeData), context);
289289
NConveyor::TInsertServiceOperator::AsyncTaskToExecute(task);
290290
}
291291
}
@@ -591,7 +591,7 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
591591
writeOperation->SetBehaviour(behaviour);
592592
NOlap::TWritingContext wContext(
593593
pathId, SelfId(), schema, StoragesManager, Counters.GetIndexationCounters().SplitterCounters,
594-
Counters.GetCSCounters().WritingCounters);
594+
Counters.GetCSCounters().WritingCounters, NOlap::TSnapshot::Max());
595595
arrowData->SetSeparationPoints(GetIndexAs<NOlap::TColumnEngineForLogs>().GetGranulePtrVerified(pathId)->GetBucketPositions());
596596
writeOperation->Start(*this, arrowData, source, wContext);
597597
}

ydb/core/tx/columnshard/data_reader/actor.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ void TActor::HandleExecute(NKqp::TEvKqpCompute::TEvScanData::TPtr& ev) {
2323
} else {
2424
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "restore_task_finished")("reason", status.GetErrorMessage());
2525
}
26+
PassAway();
2627
}
2728
}
2829

@@ -35,10 +36,11 @@ void TActor::HandleExecute(NKqp::TEvKqpCompute::TEvScanInitActor::TPtr& ev) {
3536
}
3637

3738
void TActor::HandleExecute(NKqp::TEvKqpCompute::TEvScanError::TPtr& ev) {
38-
SwitchStage(EStage::WaitData, EStage::Finished);
39+
SwitchStage(std::nullopt, EStage::Finished);
3940
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "problem_on_restore_data")(
4041
"reason", NYql::IssuesFromMessageAsString(ev->Get()->Record.GetIssues()));
4142
RestoreTask->OnError(NYql::IssuesFromMessageAsString(ev->Get()->Record.GetIssues()));
43+
PassAway();
4244
}
4345

4446
void TActor::Bootstrap(const TActorContext& /*ctx*/) {

ydb/core/tx/columnshard/data_reader/actor.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,10 @@ class TActor: public NActors::TActorBootstrapped<TActor> {
5959

6060
EStage Stage = EStage::Initialization;
6161
static inline const ui64 FreeSpace = ((ui64)8) << 20;
62-
void SwitchStage(const EStage from, const EStage to) {
63-
AFL_VERIFY(Stage == from)("from", (ui32)from)("real", (ui32)Stage)("to", (ui32)to);
62+
void SwitchStage(const std::optional<EStage> from, const EStage to) {
63+
if (from) {
64+
AFL_VERIFY(Stage == *from)("from", (ui32)*from)("real", (ui32)Stage)("to", (ui32)to);
65+
}
6466
Stage = to;
6567
}
6668

ydb/core/tx/columnshard/operations/batch_builder/builder.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,10 @@ class TBuildBatchesTask: public NConveyor::ITask {
2626
}
2727

2828
TBuildBatchesTask(
29-
const NActors::TActorId bufferActorId, NEvWrite::TWriteData&& writeData, const TSnapshot& actualSnapshot, const TWritingContext& context)
29+
const NActors::TActorId bufferActorId, NEvWrite::TWriteData&& writeData, const TWritingContext& context)
3030
: WriteData(std::move(writeData))
3131
, BufferActorId(bufferActorId)
32-
, ActualSnapshot(actualSnapshot)
32+
, ActualSnapshot(context.GetApplyToSnapshot())
3333
, Context(context) {
3434
}
3535
};

ydb/core/tx/columnshard/operations/common/context.h

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,17 +14,20 @@ class TWritingContext {
1414
YDB_READONLY_DEF(std::shared_ptr<IStoragesManager>, StoragesManager);
1515
YDB_READONLY_DEF(std::shared_ptr<NColumnShard::TSplitterCounters>, SplitterCounters);
1616
YDB_READONLY_DEF(std::shared_ptr<NColumnShard::TWriteCounters>, WritingCounters);
17+
YDB_READONLY(TSnapshot, ApplyToSnapshot, TSnapshot::Zero());
1718

1819
public:
1920
TWritingContext(const ui64 tabletId, const NActors::TActorId& tabletActorId, const std::shared_ptr<ISnapshotSchema>& actualSchema,
2021
const std::shared_ptr<IStoragesManager>& operators, const std::shared_ptr<NColumnShard::TSplitterCounters>& splitterCounters,
21-
const std::shared_ptr<NColumnShard::TWriteCounters>& writingCounters)
22+
const std::shared_ptr<NColumnShard::TWriteCounters>& writingCounters, const TSnapshot& applyToSnapshot)
2223
: TabletId(tabletId)
2324
, TabletActorId(tabletActorId)
2425
, ActualSchema(actualSchema)
2526
, StoragesManager(operators)
2627
, SplitterCounters(splitterCounters)
27-
, WritingCounters(writingCounters) {
28+
, WritingCounters(writingCounters)
29+
, ApplyToSnapshot(applyToSnapshot)
30+
{
2831
}
2932
};
3033
} // namespace NKikimr::NOlap

ydb/core/tx/columnshard/operations/write.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ void TWriteOperation::Start(
3737
NEvWrite::TWriteData writeData(writeMeta, data, owner.TablesManager.GetPrimaryIndex()->GetReplaceKey(),
3838
owner.StoragesManager->GetInsertOperator()->StartWritingAction(NOlap::NBlobOperations::EConsumer::WRITING_OPERATOR), WritePortions);
3939
std::shared_ptr<NConveyor::ITask> task =
40-
std::make_shared<NOlap::TBuildBatchesTask>(owner.BufferizationWriteActorId, std::move(writeData), owner.GetLastTxSnapshot(), context);
40+
std::make_shared<NOlap::TBuildBatchesTask>(owner.BufferizationWriteActorId, std::move(writeData), context);
4141
NConveyor::TInsertServiceOperator::AsyncTaskToExecute(task);
4242

4343
Status = EOperationStatus::Started;

0 commit comments

Comments
 (0)