Skip to content

Commit 30e4c7d

Browse files
committed
KqpSinkTx.OlapInvalidateOnError has been fixed (#18950)
1 parent 55be93b commit 30e4c7d

File tree

5 files changed

+37
-37
lines changed

5 files changed

+37
-37
lines changed

ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4825,12 +4825,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
48254825
result = session.ExecuteQuery(fmt::format(R"(
48264826
INSERT INTO `/Root/DataShard` (Col1, Col2) VALUES ({}u, 0);
48274827
)", index), NYdb::NQuery::TTxControl::Tx(tx->GetId()).CommitTx()).ExtractValueSync();
4828-
if (GetIsOlap()) {
4829-
// https://github.com/ydb-platform/ydb/issues/14383
4830-
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::BAD_REQUEST, result.GetIssues().ToString());
4831-
} else {
4832-
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::PRECONDITION_FAILED, result.GetIssues().ToString());
4833-
}
4828+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::PRECONDITION_FAILED, result.GetIssues().ToString());
48344829
}
48354830
}
48364831
};

ydb/core/tx/columnshard/columnshard_private_events.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,8 @@ struct TEvPrivate {
225225
public:
226226
enum EErrorClass {
227227
Internal,
228-
Request
228+
Request,
229+
ConstraintViolation
229230
};
230231

231232
private:
@@ -241,6 +242,8 @@ struct TEvPrivate {
241242
return NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR;
242243
case EErrorClass::Request:
243244
return NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST;
245+
case EErrorClass::ConstraintViolation:
246+
return NKikimrDataEvents::TEvWriteResult::STATUS_CONSTRAINT_VIOLATION;
244247
}
245248
}
246249

ydb/core/tx/columnshard/operations/batch_builder/merger.cpp

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,18 +4,18 @@
44

55
namespace NKikimr::NOlap {
66

7-
NKikimr::TConclusionStatus IMerger::Finish() {
7+
NKikimr::NOlap::IMerger::TYdbConclusionStatus IMerger::Finish() {
88
while (!IncomingFinished) {
99
auto result = OnIncomingOnly(IncomingPosition);
1010
if (result.IsFail()) {
1111
return result;
1212
}
1313
IncomingFinished = !IncomingPosition.NextPosition(1);
1414
}
15-
return TConclusionStatus::Success();
15+
return TYdbConclusionStatus::Success();
1616
}
1717

18-
NKikimr::TConclusionStatus IMerger::AddExistsDataOrdered(const std::shared_ptr<arrow::Table>& data) {
18+
NKikimr::NOlap::IMerger::TYdbConclusionStatus IMerger::AddExistsDataOrdered(const std::shared_ptr<arrow::Table>& data) {
1919
AFL_VERIFY(data);
2020
NArrow::NMerger::TRWSortableBatchPosition existsPosition(data, 0, Schema->GetPKColumnNames(),
2121
Schema->GetIndexInfo().GetColumnSTLNames(false), false);
@@ -40,10 +40,10 @@ NKikimr::TConclusionStatus IMerger::AddExistsDataOrdered(const std::shared_ptr<a
4040
}
4141
}
4242
AFL_VERIFY(exsistFinished);
43-
return TConclusionStatus::Success();
43+
return TYdbConclusionStatus::Success();
4444
}
4545

46-
NKikimr::TConclusionStatus TUpdateMerger::OnEqualKeys(const NArrow::NMerger::TSortableBatchPosition& exists, const NArrow::NMerger::TSortableBatchPosition& incoming) {
46+
NKikimr::NOlap::IMerger::TYdbConclusionStatus TUpdateMerger::OnEqualKeys(const NArrow::NMerger::TSortableBatchPosition& exists, const NArrow::NMerger::TSortableBatchPosition& incoming) {
4747
auto rGuard = Builder.StartRecord();
4848
AFL_VERIFY(Schema->GetIndexInfo().GetColumnIds(false).size() == exists.GetData().GetColumns().size())
4949
("index", Schema->GetIndexInfo().GetColumnIds(false).size())("exists", exists.GetData().GetColumns().size());
@@ -57,7 +57,7 @@ NKikimr::TConclusionStatus TUpdateMerger::OnEqualKeys(const NArrow::NMerger::TSo
5757
rGuard.Add(*exists.GetData().GetPositionAddress(columnIdx).GetArray(), idxChunk);
5858
}
5959
}
60-
return TConclusionStatus::Success();
60+
return TYdbConclusionStatus::Success();
6161
}
6262

6363
TUpdateMerger::TUpdateMerger(const NArrow::TContainerWithIndexes<arrow::RecordBatch>& incoming,

ydb/core/tx/columnshard/operations/batch_builder/merger.h

Lines changed: 19 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,18 @@
44
#include <ydb/core/formats/arrow/reader/result_builder.h>
55
#include <ydb/core/tx/columnshard/engines/scheme/versions/abstract_scheme.h>
66
#include <ydb/library/conclusion/status.h>
7+
#include <ydb/public/api/protos/ydb_status_codes.pb.h>
78

89
namespace NKikimr::NOlap {
910

1011
class IMerger {
12+
public:
13+
using TYdbConclusionStatus = TConclusionSpecialStatus<Ydb::StatusIds::StatusCode, Ydb::StatusIds::SUCCESS, Ydb::StatusIds::BAD_REQUEST>;
1114
private:
1215
NArrow::NMerger::TRWSortableBatchPosition IncomingPosition;
1316

14-
virtual TConclusionStatus OnEqualKeys(const NArrow::NMerger::TSortableBatchPosition& exists, const NArrow::NMerger::TSortableBatchPosition& incoming) = 0;
15-
virtual TConclusionStatus OnIncomingOnly(const NArrow::NMerger::TSortableBatchPosition& incoming) = 0;
17+
virtual TYdbConclusionStatus OnEqualKeys(const NArrow::NMerger::TSortableBatchPosition& exists, const NArrow::NMerger::TSortableBatchPosition& incoming) = 0;
18+
virtual TYdbConclusionStatus OnIncomingOnly(const NArrow::NMerger::TSortableBatchPosition& incoming) = 0;
1619
protected:
1720
std::shared_ptr<ISnapshotSchema> Schema;
1821
NArrow::TContainerWithIndexes<arrow::RecordBatch> IncomingData;
@@ -29,19 +32,19 @@ class IMerger {
2932

3033
virtual NArrow::TContainerWithIndexes<arrow::RecordBatch> BuildResultBatch() = 0;
3134

32-
TConclusionStatus Finish();
35+
TYdbConclusionStatus Finish();
3336

34-
TConclusionStatus AddExistsDataOrdered(const std::shared_ptr<arrow::Table>& data);
37+
TYdbConclusionStatus AddExistsDataOrdered(const std::shared_ptr<arrow::Table>& data);
3538
};
3639

3740
class TInsertMerger: public IMerger {
3841
private:
3942
using TBase = IMerger;
40-
virtual TConclusionStatus OnEqualKeys(const NArrow::NMerger::TSortableBatchPosition& exists, const NArrow::NMerger::TSortableBatchPosition& /*incoming*/) override {
41-
return TConclusionStatus::Fail("Conflict with existing key. " + exists.GetSorting()->DebugJson(exists.GetPosition()).GetStringRobust());
43+
virtual TYdbConclusionStatus OnEqualKeys(const NArrow::NMerger::TSortableBatchPosition& exists, const NArrow::NMerger::TSortableBatchPosition& /*incoming*/) override {
44+
return TYdbConclusionStatus::Fail(Ydb::StatusIds::PRECONDITION_FAILED, "Conflict with existing key. " + exists.GetSorting()->DebugJson(exists.GetPosition()).GetStringRobust());
4245
}
43-
virtual TConclusionStatus OnIncomingOnly(const NArrow::NMerger::TSortableBatchPosition& /*incoming*/) override {
44-
return TConclusionStatus::Success();
46+
virtual TYdbConclusionStatus OnIncomingOnly(const NArrow::NMerger::TSortableBatchPosition& /*incoming*/) override {
47+
return TYdbConclusionStatus::Success();
4548
}
4649
public:
4750
using TBase::TBase;
@@ -54,13 +57,13 @@ class TReplaceMerger: public IMerger {
5457
private:
5558
using TBase = IMerger;
5659
NArrow::TColumnFilter Filter = NArrow::TColumnFilter::BuildDenyFilter();
57-
virtual TConclusionStatus OnEqualKeys(const NArrow::NMerger::TSortableBatchPosition& /*exists*/, const NArrow::NMerger::TSortableBatchPosition& /*incoming*/) override {
60+
virtual TYdbConclusionStatus OnEqualKeys(const NArrow::NMerger::TSortableBatchPosition& /*exists*/, const NArrow::NMerger::TSortableBatchPosition& /*incoming*/) override {
5861
Filter.Add(true);
59-
return TConclusionStatus::Success();
62+
return TYdbConclusionStatus::Success();
6063
}
61-
virtual TConclusionStatus OnIncomingOnly(const NArrow::NMerger::TSortableBatchPosition& /*incoming*/) override {
64+
virtual TYdbConclusionStatus OnIncomingOnly(const NArrow::NMerger::TSortableBatchPosition& /*incoming*/) override {
6265
Filter.Add(false);
63-
return TConclusionStatus::Success();
66+
return TYdbConclusionStatus::Success();
6467
}
6568
public:
6669
using TBase::TBase;
@@ -80,13 +83,13 @@ class TUpdateMerger: public IMerger {
8083
std::vector<std::shared_ptr<arrow::BooleanArray>> HasIncomingDataFlags;
8184
const std::optional<NArrow::NMerger::TSortableBatchPosition> DefaultExists;
8285
const TString InsertDenyReason;
83-
virtual TConclusionStatus OnEqualKeys(const NArrow::NMerger::TSortableBatchPosition& exists, const NArrow::NMerger::TSortableBatchPosition& incoming) override;
84-
virtual TConclusionStatus OnIncomingOnly(const NArrow::NMerger::TSortableBatchPosition& incoming) override {
86+
virtual TYdbConclusionStatus OnEqualKeys(const NArrow::NMerger::TSortableBatchPosition& exists, const NArrow::NMerger::TSortableBatchPosition& incoming) override;
87+
virtual TYdbConclusionStatus OnIncomingOnly(const NArrow::NMerger::TSortableBatchPosition& incoming) override {
8588
if (!!InsertDenyReason) {
86-
return TConclusionStatus::Fail("insertion is impossible: " + InsertDenyReason);
89+
return TYdbConclusionStatus::Fail("insertion is impossible: " + InsertDenyReason);
8790
}
8891
if (!DefaultExists) {
89-
return TConclusionStatus::Success();
92+
return TYdbConclusionStatus::Success();
9093
} else {
9194
return OnEqualKeys(*DefaultExists, incoming);
9295
}

ydb/core/tx/columnshard/operations/batch_builder/restore.cpp

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,10 @@ TConclusionStatus TModificationRestoreTask::DoOnDataChunk(const std::shared_ptr<
2525
if (result.IsFail()) {
2626
AFL_WARN(NKikimrServices::TX_COLUMNSHARD_RESTORE)("event", "merge_data_problems")("write_id", WriteData.GetWriteMeta().GetWriteId())(
2727
"tablet_id", GetTabletId())("message", result.GetErrorMessage());
28-
SendErrorMessage(result.GetErrorMessage(), NColumnShard::TEvPrivate::TEvWriteBlobsResult::EErrorClass::Request);
28+
SendErrorMessage(result.GetErrorMessage(), result.GetStatus() == Ydb::StatusIds::PRECONDITION_FAILED ? NColumnShard::TEvPrivate::TEvWriteBlobsResult::EErrorClass::ConstraintViolation : NColumnShard::TEvPrivate::TEvWriteBlobsResult::EErrorClass::Request);
29+
return TConclusionStatus::Fail(result.GetErrorMessage());
2930
}
30-
return result;
31+
return TConclusionStatus::Success();
3132
}
3233

3334
void TModificationRestoreTask::DoOnError(const TString& errorMessage) {
@@ -37,12 +38,10 @@ void TModificationRestoreTask::DoOnError(const TString& errorMessage) {
3738
}
3839

3940
NKikimr::TConclusionStatus TModificationRestoreTask::DoOnFinished() {
40-
{
41-
auto result = Merger->Finish();
42-
if (result.IsFail()) {
43-
OnError("cannot finish merger: " + result.GetErrorMessage());
44-
return result;
45-
}
41+
auto result = Merger->Finish();
42+
if (result.IsFail()) {
43+
OnError("cannot finish merger: " + result.GetErrorMessage());
44+
return TConclusionStatus::Fail(result.GetErrorMessage());
4645
}
4746

4847
auto batchResult = Merger->BuildResultBatch();

0 commit comments

Comments
 (0)