Skip to content

Commit e92e224

Browse files
blob writing error processing for portion-write-mode (#12029)
1 parent ae1af5f commit e92e224

File tree

14 files changed

+155
-33
lines changed

14 files changed

+155
-33
lines changed

ydb/core/kqp/ut/olap/write_ut.cpp

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,29 @@
1414
namespace NKikimr::NKqp {
1515

1616
Y_UNIT_TEST_SUITE(KqpOlapWrite) {
17+
// Checks that a write into an OLAP table is answered with INTERNAL_ERROR when the test
// controller is asked to override the blob put result on write with BLOCKED.
Y_UNIT_TEST(WriteFails) {
    // Controller setup: the last setter installs the BLOCKED override for blob puts on write.
    auto controller = NKikimr::NYDBTest::TControllers::RegisterCSControllerGuard<NKikimr::NOlap::TWaitCompactionController>();
    controller->SetSmallSizeDetector(1000000);
    controller->SetIndexWriteControllerEnabled(false);
    controller->SetOverridePeriodicWakeupActivationPeriod(TDuration::Seconds(1));
    controller->SetOverrideBlobPutResultOnWriteValue(NKikimrProto::EReplyStatus::BLOCKED);
    Singleton<NKikimr::NWrappers::NExternalStorage::TFakeExternalStorage>()->ResetWriteCounters();

    auto runnerSettings = TKikimrSettings().SetWithSampleTables(false);
    TKikimrRunner kikimr(runnerSettings);

    // Enable immediate bulk-upsert writing and portion writing on insert.
    kikimr.GetTestServer().GetRuntime()->GetAppData().FeatureFlags.SetEnableImmediateWritingOnBulkUpsert(true);
    kikimr.GetTestServer().GetRuntime()->GetAppData().FeatureFlags.SetEnableWritePortionsOnInsert(true);

    TLocalHelper(kikimr).CreateTestOlapTable();

    Tests::NCommon::TLoggerInit(kikimr)
        .SetComponents({ NKikimrServices::TX_COLUMNSHARD }, "CS")
        .SetPriority(NActors::NLog::PRI_DEBUG)
        .Initialize();

    {
        // The helper is given INTERNAL_ERROR as the expected status of the write.
        auto recordBatch = TLocalHelper(kikimr).TestArrowBatch(30000, 1000000, 11000);
        TLocalHelper(kikimr).SendDataViaActorSystem("/Root/olapStore/olapTable", recordBatch, Ydb::StatusIds::INTERNAL_ERROR);
    }
}
39+
1740
Y_UNIT_TEST(TierDraftsGC) {
1841
auto csController = NKikimr::NYDBTest::TControllers::RegisterCSControllerGuard<NKikimr::NOlap::TWaitCompactionController>();
1942
csController->SetSmallSizeDetector(1000000);

ydb/core/protos/counters_columnshard.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,4 +203,6 @@ enum ETxTypes {
203203
TXTYPE_START_INTERNAL_SCAN = 36 [(TxTypeOpts) = {Name: "TxStartInternalScan"}];
204204
TXTYPE_DATA_SHARING_START_SOURCE_CURSOR = 37 [(TxTypeOpts) = {Name: "TxDataSharingStartSourceCursor"}];
205205
TXTYPE_ASK_PORTION_METADATA = 38 [(TxTypeOpts) = {Name: "TxAskPortionMetadata"}];
206+
TXTYPE_WRITE_PORTIONS_FINISHED = 39 [(TxTypeOpts) = {Name: "TxWritePortionsFinished"}];
207+
TXTYPE_WRITE_PORTIONS_FAILED = 40 [(TxTypeOpts) = {Name: "TxWritePortionsFailed"}];
206208
}

ydb/core/testlib/cs_helper.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ class THelperSchemaless : public NCommon::THelper {
1515
void CreateTestOlapStore(TActorId sender, TString scheme);
1616
void CreateTestOlapTable(TActorId sender, TString storeOrDirName, TString scheme);
1717
void SendDataViaActorSystem(TString testTable, ui64 pathIdBegin, ui64 tsBegin, size_t rowCount, const ui32 tsStepUs = 1) const;
18-
void SendDataViaActorSystem(TString testTable, std::shared_ptr<arrow::RecordBatch> batch, const Ydb::StatusIds_StatusCode& expectedStatus = Ydb::StatusIds::SUCCESS) const;
18+
void SendDataViaActorSystem(TString testTable, std::shared_ptr<arrow::RecordBatch> batch, const Ydb::StatusIds_StatusCode& expectedStatus = Ydb::StatusIds::SUCCESS) const;
1919

2020
virtual std::shared_ptr<arrow::RecordBatch> TestArrowBatch(ui64 pathIdBegin, ui64 tsBegin, size_t rowCount, const ui32 tsStepUs = 1) const = 0;
2121
};

ydb/core/tx/columnshard/blobs_action/transaction/tx_blobs_written.cpp

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -105,27 +105,52 @@ void TTxBlobsWritingFinished::DoComplete(const TActorContext& ctx) {
105105
AFL_VERIFY(CommitSnapshot);
106106
Self->OperationsManager->AddTemporaryTxLink(op->GetLockId());
107107
Self->OperationsManager->CommitTransactionOnComplete(*Self, op->GetLockId(), *CommitSnapshot);
108+
Self->Counters.GetTabletCounters()->IncCounter(COUNTER_IMMEDIATE_TX_COMPLETED);
108109
}
109110
Self->Counters.GetCSCounters().OnWriteTxComplete(now - writeMeta.GetWriteStartInstant());
110111
Self->Counters.GetCSCounters().OnSuccessWriteResponse();
111112
}
112113
Self->SetupCompaction(pathIds);
113-
Self->Counters.GetTabletCounters()->IncCounter(COUNTER_IMMEDIATE_TX_COMPLETED);
114114
}
115115

116116
// Builds the "blobs written" transaction.
// Takes ownership of the inserted packs, keeps the writing action, and prepares a
// "completed" write result for every write listed in noDataWrites.
TTxBlobsWritingFinished::TTxBlobsWritingFinished(TColumnShard* self, const NKikimrProto::EReplyStatus writeStatus,
    const std::shared_ptr<NOlap::IBlobsWritingAction>& writingActions, std::vector<TInsertedPortions>&& packs,
    const std::vector<TNoDataWrite>& noDataWrites)
    : TBase(self, "TTxBlobsWritingFinished")
    , Packs(std::move(packs))
    , WritingActions(writingActions)
{
    for (auto&& noDataWrite : noDataWrites) {
        const auto& writeMeta = noDataWrite.GetWriteMeta();
        auto completedEvent = NEvents::TDataEvents::TEvWriteResult::BuildCompleted(Self->TabletID());
        // The operation supplies the cookie that is attached to the reply.
        auto operation = Self->GetOperationsManager().GetOperationVerified((TOperationWriteId)writeMeta.GetWriteId());
        Results.emplace_back(std::move(completedEvent), writeMeta.GetSource(), operation->GetCookie());
    }
}
130128

129+
// Execute stage of the failed-write transaction.
// For every pack: aborts the transaction bound to the operation's lock and queues an
// INTERNAL_ERROR write result that is sent later from DoComplete. Always returns true.
bool TTxBlobsWritingFailed::DoExecute(TTransactionContext& txc, const TActorContext& ctx) {
    for (auto&& failedPack : Packs) {
        const auto& meta = failedPack.GetWriteMeta();
        AFL_VERIFY(!meta.HasLongTxId());

        auto operation = Self->GetOperationsManager().GetOperationVerified((TOperationWriteId)meta.GetWriteId());
        Self->OperationsManager->AddTemporaryTxLink(operation->GetLockId());
        Self->OperationsManager->AbortTransactionOnExecute(*Self, operation->GetLockId(), txc);

        // The reply carries the textual form of the blob put status.
        auto errorEvent = NEvents::TDataEvents::TEvWriteResult::BuildError(
            Self->TabletID(),
            operation->GetLockId(),
            NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR,
            "cannot write blob: " + ::ToString(PutBlobResult));
        Results.emplace_back(std::move(errorEvent), meta.GetSource(), operation->GetCookie());
    }
    return true;
}
143+
144+
void TTxBlobsWritingFailed::DoComplete(const TActorContext& ctx) {
145+
for (auto&& i : Results) {
146+
i.DoSendReply(ctx);
147+
Self->Counters.GetCSCounters().OnFailedWriteResponse(EWriteFailReason::PutBlob);
148+
}
149+
for (auto&& pack : Packs) {
150+
const auto& writeMeta = pack.GetWriteMeta();
151+
auto op = Self->GetOperationsManager().GetOperationVerified((TOperationWriteId)writeMeta.GetWriteId());
152+
Self->OperationsManager->AbortTransactionOnComplete(*Self, op->GetLockId());
153+
}
154+
}
155+
131156
} // namespace NKikimr::NColumnShard

ydb/core/tx/columnshard/blobs_action/transaction/tx_blobs_written.h

Lines changed: 42 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ class TColumnShard;
1515
class TTxBlobsWritingFinished: public NOlap::NDataSharing::TExtendedTransactionBase<TColumnShard> {
1616
private:
1717
using TBase = NOlap::NDataSharing::TExtendedTransactionBase<TColumnShard>;
18-
const NKikimrProto::EReplyStatus PutBlobResult;
1918
std::vector<TInsertedPortions> Packs;
2019
const std::shared_ptr<NOlap::IBlobsWritingAction> WritingActions;
2120
std::optional<NOlap::TSnapshot> CommitSnapshot;
@@ -43,12 +42,52 @@ class TTxBlobsWritingFinished: public NOlap::NDataSharing::TExtendedTransactionB
4342
public:
4443
TTxBlobsWritingFinished(TColumnShard* self, const NKikimrProto::EReplyStatus writeStatus,
4544
const std::shared_ptr<NOlap::IBlobsWritingAction>& writingActions, std::vector<TInsertedPortions>&& packs,
46-
const std::vector<TFailedWrite>& fails);
45+
const std::vector<TNoDataWrite>& noDataWrites);
46+
47+
virtual bool DoExecute(TTransactionContext& txc, const TActorContext& ctx) override;
48+
virtual void DoComplete(const TActorContext& ctx) override;
49+
TTxType GetTxType() const override {
50+
return TXTYPE_WRITE_PORTIONS_FINISHED;
51+
}
52+
};
53+
54+
// Tablet transaction run when the blob put for written portions did not succeed.
// It keeps the failed blob put status and the affected packs; DoExecute/DoComplete
// (defined in the .cpp) use them to abort the related transactions and reply to writers.
class TTxBlobsWritingFailed: public NOlap::NDataSharing::TExtendedTransactionBase<TColumnShard> {
private:
    using TBase = NOlap::NDataSharing::TExtendedTransactionBase<TColumnShard>;
    // Status of the failed blob put; included into the error message sent to writers.
    const NKikimrProto::EReplyStatus PutBlobResult;
    // Packs whose blobs were not written.
    std::vector<TInsertedPortions> Packs;

    // A prepared reply: the event, its destination actor and the cookie to send it with.
    class TReplyInfo {
    private:
        std::unique_ptr<NActors::IEventBase> Event;
        TActorId DestinationForReply;
        const ui64 Cookie;

    public:
        TReplyInfo(std::unique_ptr<NActors::IEventBase>&& ev, const TActorId& destinationForReply, const ui64 cookie)
            : Event(std::move(ev))
            , DestinationForReply(destinationForReply)
            , Cookie(cookie) {
        }

        // Hands the event over to ctx.Send; Event is empty after this call.
        void DoSendReply(const TActorContext& ctx) {
            ctx.Send(DestinationForReply, Event.release(), 0, Cookie);
        }
    };

    // Replies collected in DoExecute and sent in DoComplete.
    std::vector<TReplyInfo> Results;

public:
    // self        - owning column shard
    // writeStatus - status returned by the failed blob put
    // packs       - packs whose writing failed (moved into the transaction)
    TTxBlobsWritingFailed(TColumnShard* self, const NKikimrProto::EReplyStatus writeStatus, std::vector<TInsertedPortions>&& packs)
        // Pass the transaction name to the base, as TTxBlobsWritingFinished does.
        : TBase(self, "TTxBlobsWritingFailed")
        , PutBlobResult(writeStatus)
        , Packs(std::move(packs)) {
    }

    virtual bool DoExecute(TTransactionContext& txc, const TActorContext& ctx) override;
    virtual void DoComplete(const TActorContext& ctx) override;
    TTxType GetTxType() const override {
        return TXTYPE_WRITE_PORTIONS_FAILED;
    }
};
5493

ydb/core/tx/columnshard/columnshard__write.cpp

Lines changed: 25 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -91,20 +91,34 @@ void TColumnShard::Handle(NPrivateEvents::NWrite::TEvWritePortionResult::TPtr& e
9191
TMemoryProfileGuard mpg("TEvWritePortionResult");
9292
NActors::TLogContextGuard gLogging =
9393
NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", TabletID())("event", "TEvWritePortionResult");
94-
AFL_VERIFY(ev->Get()->GetWriteStatus() == NKikimrProto::OK);
95-
std::vector<TInsertedPortions> writtenPacks = ev->Get()->DetachInsertedPacks();
96-
std::vector<TFailedWrite> fails = ev->Get()->DetachFails();
97-
const TMonotonic now = TMonotonic::Now();
98-
for (auto&& i : writtenPacks) {
99-
Counters.OnWritePutBlobsSuccess(now - i.GetWriteMeta().GetWriteStartInstant(), i.GetRecordsCount());
94+
std::vector<TNoDataWrite> noDataWrites = ev->Get()->DetachNoDataWrites();
95+
for (auto&& i : noDataWrites) {
10096
Counters.GetWritesMonitor()->OnFinishWrite(i.GetDataSize(), 1);
10197
}
102-
for (auto&& i : fails) {
103-
Counters.GetWritesMonitor()->OnFinishWrite(i.GetDataSize(), 1);
98+
if (ev->Get()->GetWriteStatus() == NKikimrProto::OK) {
99+
std::vector<TInsertedPortions> writtenPacks = ev->Get()->DetachInsertedPacks();
100+
const TMonotonic now = TMonotonic::Now();
101+
for (auto&& i : writtenPacks) {
102+
Counters.OnWritePutBlobsSuccess(now - i.GetWriteMeta().GetWriteStartInstant(), i.GetRecordsCount());
103+
Counters.GetWritesMonitor()->OnFinishWrite(i.GetDataSize(), 1);
104+
}
105+
Execute(new TTxBlobsWritingFinished(
106+
this, ev->Get()->GetWriteStatus(), ev->Get()->GetWriteAction(), std::move(writtenPacks), std::move(noDataWrites)),
107+
ctx);
108+
} else {
109+
if (noDataWrites.size()) {
110+
Execute(new TTxBlobsWritingFinished(this, NKikimrProto::OK, ev->Get()->GetWriteAction(), {}, std::move(noDataWrites)), ctx);
111+
}
112+
113+
std::vector<TInsertedPortions> writtenPacks = ev->Get()->DetachInsertedPacks();
114+
const TMonotonic now = TMonotonic::Now();
115+
// Account every pack whose blobs failed to be written.
// Counters.OnWritePutBlobsFailed already reports the duration to
// CSCounters.OnWritePutBlobsFail, so that counter must not be bumped again here
// (doing so counted every failed pack twice).
for (auto&& i : writtenPacks) {
    Counters.OnWritePutBlobsFailed(now - i.GetWriteMeta().GetWriteStartInstant(), i.GetRecordsCount());
    Counters.GetWritesMonitor()->OnFinishWrite(i.GetDataSize(), 1);
}
120+
Execute(new TTxBlobsWritingFailed(this, ev->Get()->GetWriteStatus(), std::move(writtenPacks)), ctx);
104121
}
105-
Execute(
106-
new TTxBlobsWritingFinished(this, ev->Get()->GetWriteStatus(), ev->Get()->GetWriteAction(), std::move(writtenPacks), std::move(fails)),
107-
ctx);
108122
}
109123

110124
void TColumnShard::Handle(TEvPrivate::TEvWriteBlobsResult::TPtr& ev, const TActorContext& ctx) {

ydb/core/tx/columnshard/columnshard_impl.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ class TTxRemoveSharedBlobs;
9999
class TOperationsManager;
100100
class TWaitEraseTablesTxSubscriber;
101101
class TTxBlobsWritingFinished;
102+
class TTxBlobsWritingFailed;
102103

103104
namespace NLoading {
104105
class TInsertTableInitializer;
@@ -165,6 +166,7 @@ class TColumnShard: public TActor<TColumnShard>, public NTabletFlatExecutor::TTa
165166
friend class TTxPlanStep;
166167
friend class TTxWrite;
167168
friend class TTxBlobsWritingFinished;
169+
friend class TTxBlobsWritingFailed;
168170
friend class TTxReadBase;
169171
friend class TTxRead;
170172
friend class TTxWriteIndex;

ydb/core/tx/columnshard/counters/counters_manager.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,11 @@ class TCountersManager {
9292
TabletCounters->OnWritePutBlobsSuccess(rowsWritten);
9393
CSCounters.OnWritePutBlobsSuccess(d);
9494
}
95+
96+
// Records a failed blob put for a write: notifies the tablet counters about the
// write failure and passes the elapsed duration `d` to CSCounters.OnWritePutBlobsFail.
// The rows count parameter is accepted but not used.
void OnWritePutBlobsFailed(const TDuration d, const ui64 /*rowsWritten*/) const {
97+
TabletCounters->OnWriteFailure();
98+
CSCounters.OnWritePutBlobsFail(d);
99+
}
95100
};
96101

97102
} // namespace NKikimr::NColumnShard

ydb/core/tx/columnshard/hooks/abstract/abstract.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,10 @@ class ICSController {
149149
const std::set<NOlap::TSnapshot>& /*snapshotsToSave*/, const std::set<NOlap::TSnapshot>& /*snapshotsToRemove*/) {
150150
}
151151

152+
// Hook that lets a controller substitute the blob put status on write.
// The default implementation returns originalStatus unchanged.
virtual NKikimrProto::EReplyStatus OverrideBlobPutResultOnWrite(const NKikimrProto::EReplyStatus originalStatus) const {
153+
return originalStatus;
154+
}
155+
152156
// Returns the scan-portion memory limit: the configured value passed through the
// DoGetMemoryLimitScanPortion hook.
ui64 GetMemoryLimitScanPortion() const {
153157
return DoGetMemoryLimitScanPortion(GetConfig().GetMemoryLimitScanPortion());
154158
}

ydb/core/tx/columnshard/hooks/testing/controller.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ class TController: public TReadOnlyController {
2323
YDB_ACCESSOR_DEF(std::optional<TDuration>, OverrideTasksActualizationLag);
2424
YDB_ACCESSOR_DEF(std::optional<TDuration>, OverrideReadTimeoutClean);
2525
YDB_ACCESSOR(std::optional<ui64>, OverrideMemoryLimitForPortionReading, 100);
26+
YDB_ACCESSOR_DEF(std::optional<NKikimrProto::EReplyStatus>, OverrideBlobPutResultOnWriteValue);
27+
2628
EOptimizerCompactionWeightControl CompactionControl = EOptimizerCompactionWeightControl::Force;
2729

2830
YDB_ACCESSOR(std::optional<ui64>, OverrideReduceMemoryIntervalLimit, 1024);
@@ -202,6 +204,10 @@ class TController: public TReadOnlyController {
202204
}
203205

204206
public:
207+
// Returns the status stored in OverrideBlobPutResultOnWriteValue when it is set;
// otherwise returns originalStatus unchanged.
virtual NKikimrProto::EReplyStatus OverrideBlobPutResultOnWrite(const NKikimrProto::EReplyStatus originalStatus) const override {
208+
return OverrideBlobPutResultOnWriteValue.value_or(originalStatus);
209+
}
210+
205211
// Read-only access to the IndexWriteControllerBrokeCount counter.
const TAtomicCounter& GetIndexWriteControllerBrokeCount() const {
206212
return IndexWriteControllerBrokeCount;
207213
}

0 commit comments

Comments
 (0)