Skip to content

Commit 4b09967

Browse files
EvWrite codes unification with kqp (#9698)
1 parent 55fae2a commit 4b09967

File tree

11 files changed

+97
-37
lines changed

11 files changed

+97
-37
lines changed

ydb/core/kqp/executer_actor/kqp_data_executer.cpp

Lines changed: 6 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
#include <ydb/core/kqp/runtime/kqp_transport.h>
1919
#include <ydb/core/kqp/opt/kqp_query_plan.h>
2020
#include <ydb/core/tx/columnshard/columnshard.h>
21+
#include <ydb/core/tx/data_events/common/error_codes.h>
2122
#include <ydb/core/tx/datashard/datashard.h>
2223
#include <ydb/core/tx/long_tx_service/public/events.h>
2324
#include <ydb/core/tx/long_tx_service/public/lock_handle.h>
@@ -862,37 +863,12 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
862863
void ShardError(const NKikimrDataEvents::TEvWriteResult& result) {
863864
NYql::TIssues issues;
864865
NYql::IssuesFromMessage(result.GetIssues(), issues);
865-
866-
switch (result.GetStatus()) {
867-
case NKikimrDataEvents::TEvWriteResult::STATUS_UNSPECIFIED:
868-
case NKikimrDataEvents::TEvWriteResult::STATUS_PREPARED:
869-
case NKikimrDataEvents::TEvWriteResult::STATUS_COMPLETED: {
870-
YQL_ENSURE(false);
871-
}
872-
case NKikimrDataEvents::TEvWriteResult::STATUS_ABORTED: {
873-
return ReplyErrorAndDie(Ydb::StatusIds::ABORTED, issues);
874-
}
875-
case NKikimrDataEvents::TEvWriteResult::STATUS_DISK_SPACE_EXHAUSTED:
876-
case NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR: {
877-
return ReplyErrorAndDie(Ydb::StatusIds::INTERNAL_ERROR, issues);
878-
}
879-
case NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED: {
880-
return ReplyErrorAndDie(Ydb::StatusIds::OVERLOADED, issues);
881-
}
882-
case NKikimrDataEvents::TEvWriteResult::STATUS_CANCELLED: {
883-
return ReplyErrorAndDie(Ydb::StatusIds::CANCELLED, issues);
884-
}
885-
case NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST: {
886-
return ReplyErrorAndDie(Ydb::StatusIds::BAD_REQUEST, issues);
887-
}
888-
case NKikimrDataEvents::TEvWriteResult::STATUS_SCHEME_CHANGED: {
889-
return ReplyErrorAndDie(Ydb::StatusIds::SCHEME_ERROR, issues);
890-
}
891-
case NKikimrDataEvents::TEvWriteResult::STATUS_LOCKS_BROKEN: {
892-
issues.AddIssue(NYql::YqlIssue({}, TIssuesIds::KIKIMR_LOCKS_INVALIDATED, "Transaction locks invalidated."));
893-
return ReplyErrorAndDie(Ydb::StatusIds::ABORTED, issues);
894-
}
866+
auto statusConclusion = NEvWrite::NErrorCodes::TOperator::GetStatusInfo(result.GetStatus());
867+
AFL_ENSURE(statusConclusion.IsSuccess())("error", statusConclusion.GetErrorMessage());
868+
if (result.GetStatus() == NKikimrDataEvents::TEvWriteResult::STATUS_LOCKS_BROKEN) {
869+
issues.AddIssue(NYql::YqlIssue({}, statusConclusion->GetIssueCode(), statusConclusion->GetIssueGeneralText()));
895870
}
871+
return ReplyErrorAndDie(statusConclusion->GetYdbStatusCode(), issues);
896872
}
897873

898874
void PQTabletError(const NKikimrPQ::TEvProposeTransactionResult& result) {

ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7953,13 +7953,13 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) {
79537953
TTestHelper::TUpdatesBuilder tableInserter(testTable.GetArrowSchema(schemaWithNull));
79547954
tableInserter.AddRow().Add(1).Add("test_res_1").AddNull();
79557955
tableInserter.AddRow().Add(2).Add("test_res_2").Add(123);
7956-
testHelper.BulkUpsert(testTable, tableInserter, Ydb::StatusIds::GENERIC_ERROR);
7956+
testHelper.BulkUpsert(testTable, tableInserter, Ydb::StatusIds::BAD_REQUEST);
79577957
}
79587958
{
79597959
TTestHelper::TUpdatesBuilder tableInserter(testTable.GetArrowSchema(schemaWithNull));
79607960
tableInserter.AddRow().Add(1).Add("test_res_1").AddNull();
79617961
tableInserter.AddRow().Add(2).Add("test_res_2").Add(123);
7962-
testHelper.BulkUpsert(testTable, tableInserter, Ydb::StatusIds::GENERIC_ERROR);
7962+
testHelper.BulkUpsert(testTable, tableInserter, Ydb::StatusIds::BAD_REQUEST);
79637963
}
79647964

79657965
testHelper.ReadData("SELECT * FROM `/Root/ColumnTableTest` WHERE id=1", "[]");

ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@ bool TTxWrite::CommitOneBlob(TTransactionContext& txc, const NOlap::TWideSeriali
2525
NOlap::TDbWrapper dbTable(txc.DB, &dsGroupSelector);
2626
NOlap::TCommittedData commitData(userData, Self->GetLastPlannedSnapshot(), Self->Generation(), writeId);
2727
if (Self->TablesManager.HasTable(userData->GetPathId())) {
28-
Self->InsertTable->CommitEphemeral(dbTable, std::move(commitData));
28+
auto counters = Self->InsertTable->CommitEphemeral(dbTable, std::move(commitData));
29+
Self->Counters.GetTabletCounters()->OnWriteCommitted(counters);
2930
}
3031
Self->UpdateInsertTableCounters();
3132
return true;

ydb/core/tx/columnshard/engines/storage/optimizer/lbuckets/planner/optimizer.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1093,7 +1093,7 @@ class TPortionBuckets {
10931093
Y_UNUSED(LeftBucket->Actualize(currentInstant));
10941094
AddBucketToRating(LeftBucket);
10951095
for (auto&& i : Buckets) {
1096-
const i64 rating = i.second->GetWeight();
1096+
const i64 rating = i.second->GetLastWeight();
10971097
if (i.second->Actualize(currentInstant)) {
10981098
RemoveBucketFromRating(i.second, rating);
10991099
AddBucketToRating(i.second);

ydb/core/tx/columnshard/operations/batch_builder/builder.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ TConclusionStatus TBuildBatchesTask::DoExecute(const std::shared_ptr<ITask>& /*t
3232
ActualSchema->PrepareForModification(batchConclusion.DetachResult(), WriteData.GetWriteMeta().GetModificationType());
3333
if (preparedConclusion.IsFail()) {
3434
ReplyError("cannot prepare incoming batch: " + preparedConclusion.GetErrorMessage(),
35-
NColumnShard::TEvPrivate::TEvWriteBlobsResult::EErrorClass::Internal);
35+
NColumnShard::TEvPrivate::TEvWriteBlobsResult::EErrorClass::Request);
3636
return TConclusionStatus::Fail("cannot prepare incoming batch: " + preparedConclusion.GetErrorMessage());
3737
}
3838
auto batch = preparedConclusion.DetachResult();
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
#include "error_codes.h"
2+
3+
namespace NKikimr::NEvWrite::NErrorCodes {
4+
5+
TConclusion<NErrorCodes::TOperator::TYdbStatusInfo> TOperator::GetStatusInfo(
6+
const NKikimrDataEvents::TEvWriteResult::EStatus value) {
7+
switch (value) {
8+
case NKikimrDataEvents::TEvWriteResult::STATUS_UNSPECIFIED:
9+
case NKikimrDataEvents::TEvWriteResult::STATUS_PREPARED:
10+
case NKikimrDataEvents::TEvWriteResult::STATUS_COMPLETED:
11+
return TConclusionStatus::Fail("Incorrect status for interpretation to YdbStatus");
12+
case NKikimrDataEvents::TEvWriteResult::STATUS_ABORTED:
13+
return TYdbStatusInfo(Ydb::StatusIds::ABORTED, NYql::TIssuesIds::KIKIMR_OPERATION_ABORTED, "Request aborted");
14+
case NKikimrDataEvents::TEvWriteResult::STATUS_DISK_SPACE_EXHAUSTED:
15+
return TYdbStatusInfo(Ydb::StatusIds::INTERNAL_ERROR, NYql::TIssuesIds::KIKIMR_DISK_SPACE_EXHAUSTED, "Disk space exhausted");
16+
case NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR:
17+
return TYdbStatusInfo(Ydb::StatusIds::INTERNAL_ERROR, NYql::TIssuesIds::KIKIMR_INTERNAL_ERROR, "Request aborted");
18+
case NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED:
19+
return TYdbStatusInfo(Ydb::StatusIds::OVERLOADED, NYql::TIssuesIds::KIKIMR_OVERLOADED, "System overloaded");
20+
case NKikimrDataEvents::TEvWriteResult::STATUS_CANCELLED:
21+
return TYdbStatusInfo(Ydb::StatusIds::CANCELLED, NYql::TIssuesIds::KIKIMR_OPERATION_CANCELLED, "Request cancelled");
22+
case NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST:
23+
return TYdbStatusInfo(Ydb::StatusIds::BAD_REQUEST, NYql::TIssuesIds::KIKIMR_BAD_REQUEST, "Incorrect request");
24+
case NKikimrDataEvents::TEvWriteResult::STATUS_SCHEME_CHANGED:
25+
return TYdbStatusInfo(Ydb::StatusIds::SCHEME_ERROR, NYql::TIssuesIds::KIKIMR_SCHEMA_CHANGED, "Schema changed");
26+
case NKikimrDataEvents::TEvWriteResult::STATUS_LOCKS_BROKEN: {
27+
return TYdbStatusInfo(Ydb::StatusIds::ABORTED, NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED, "Transaction locks invalidated.");
28+
}
29+
}
30+
}
31+
32+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
#pragma once
2+
#include <ydb/core/protos/data_events.pb.h>
3+
#include <ydb/core/protos/tx_columnshard.pb.h>
4+
5+
#include <ydb/library/accessor/accessor.h>
6+
#include <ydb/library/conclusion/result.h>
7+
#include <ydb/library/yql/core/issue/protos/issue_id.pb.h>
8+
#include <ydb/public/api/protos/ydb_status_codes.pb.h>
9+
10+
namespace NKikimr::NEvWrite::NErrorCodes {
11+
12+
class TOperator {
13+
public:
14+
class TYdbStatusInfo {
15+
private:
16+
YDB_READONLY(Ydb::StatusIds::StatusCode, YdbStatusCode, Ydb::StatusIds::STATUS_CODE_UNSPECIFIED);
17+
YDB_READONLY(NYql::TIssuesIds::EIssueCode, IssueCode, NYql::TIssuesIds::UNEXPECTED);
18+
YDB_READONLY_DEF(TString, IssueGeneralText);
19+
20+
public:
21+
TYdbStatusInfo(const Ydb::StatusIds::StatusCode code, const NYql::TIssuesIds::EIssueCode issueCode, const TString& issueMessage)
22+
: YdbStatusCode(code)
23+
, IssueCode(issueCode)
24+
, IssueGeneralText(issueMessage) {
25+
}
26+
};
27+
28+
static TConclusion<TYdbStatusInfo> GetStatusInfo(const NKikimrDataEvents::TEvWriteResult::EStatus value);
29+
};
30+
31+
} // namespace NKikimr::NEvWrite::NErrorCodes

ydb/core/tx/data_events/common/ya.make

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,13 @@ LIBRARY()
22

33
PEERDIR(
44
ydb/core/protos
5+
ydb/library/yql/core/issue/protos
6+
ydb/public/api/protos
57
)
68

79
SRCS(
810
modification_type.cpp
11+
error_codes.cpp
912
)
1013

1114
END()

ydb/core/tx/data_events/shard_writer.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#include "shard_writer.h"
2+
#include "common/error_codes.h"
23

34
#include <ydb/core/base/tablet_pipe.h>
45
#include <ydb/core/base/tablet_pipecache.h>
@@ -86,8 +87,9 @@ namespace NKikimr::NEvWrite {
8687

8788
auto gPassAway = PassAwayGuard();
8889
if (ydbStatus != NKikimrDataEvents::TEvWriteResult::STATUS_COMPLETED) {
89-
ExternalController->OnFail(Ydb::StatusIds::GENERIC_ERROR,
90-
TStringBuilder() << "Cannot write data into shard " << ShardId << " in longTx " <<
90+
auto statusInfo = NEvWrite::NErrorCodes::TOperator::GetStatusInfo(ydbStatus).DetachResult();
91+
ExternalController->OnFail(statusInfo.GetYdbStatusCode(),
92+
TStringBuilder() << "Cannot write data into shard(" << statusInfo.GetIssueGeneralText() << ") " << ShardId << " in longTx " <<
9193
ExternalController->GetLongTxId().ToString());
9294
return;
9395
}

ydb/library/yql/core/issue/protos/issue_id.proto

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,9 @@ message TIssuesIds {
7979
KIKIMR_UNSUPPORTED = 2030;
8080
KIKIMR_BAD_COLUMN_TYPE = 2031;
8181
KIKIMR_NO_COLUMN_DEFAULT_VALUE = 2032;
82+
KIKIMR_DISK_SPACE_EXHAUSTED = 2033;
83+
KIKIMR_SCHEMA_CHANGED = 2034;
84+
KIKIMR_INTERNAL_ERROR = 2035;
8285

8386
// kikimr warnings
8487
KIKIMR_READ_MODIFIED_TABLE = 2500;

0 commit comments

Comments
 (0)