Skip to content

Commit 4e677a5

Browse files
timeout for shard data writing (#10275)
1 parent 82aa854 commit 4e677a5

File tree

3 files changed

+61
-26
lines changed

3 files changed

+61
-26
lines changed

ydb/core/tx/data_events/shard_writer.cpp

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,9 @@ namespace NKikimr::NEvWrite {
4242
}
4343

4444
TShardWriter::TShardWriter(const ui64 shardId, const ui64 tableId, const ui64 schemaVersion, const TString& dedupId, const IShardInfo::TPtr& data,
45-
const NWilson::TProfileSpan& parentSpan, TWritersController::TPtr externalController, const ui32 writePartIdx, const EModificationType mType, const bool immediateWrite)
45+
const NWilson::TProfileSpan& parentSpan, TWritersController::TPtr externalController, const ui32 writePartIdx, const EModificationType mType, const bool immediateWrite,
46+
const std::optional<TDuration> timeout
47+
)
4648
: ShardId(shardId)
4749
, WritePartIdx(writePartIdx)
4850
, TableId(tableId)
@@ -54,6 +56,7 @@ namespace NKikimr::NEvWrite {
5456
, ActorSpan(parentSpan.BuildChildrenSpan("ShardWriter"))
5557
, ModificationType(mType)
5658
, ImmediateWrite(immediateWrite)
59+
, Timeout(timeout)
5760
{
5861
}
5962

@@ -71,6 +74,9 @@ namespace NKikimr::NEvWrite {
7174

7275
void TShardWriter::Bootstrap() {
7376
SendWriteRequest();
77+
if (Timeout) {
78+
Schedule(*Timeout, new TEvents::TEvWakeup(1));
79+
}
7480
Become(&TShardWriter::StateMain);
7581
}
7682

@@ -138,16 +144,25 @@ namespace NKikimr::NEvWrite {
138144
}
139145
}
140146

141-
void TShardWriter::HandleTimeout(const TActorContext& /*ctx*/) {
142-
RetryWriteRequest(false);
147+
void TShardWriter::Handle(NActors::TEvents::TEvWakeup::TPtr& ev) {
148+
if (ev->Get()->Tag) {
149+
auto gPassAway = PassAwayGuard();
150+
ExternalController->OnFail(Ydb::StatusIds::TIMEOUT, TStringBuilder()
151+
<< "Cannot write data (TIMEOUT) into shard " << ShardId << " in longTx "
152+
<< ExternalController->GetLongTxId().ToString());
153+
ExternalController->GetCounters()->OnGlobalTimeout();
154+
} else {
155+
ExternalController->GetCounters()->OnRetryTimeout();
156+
RetryWriteRequest(false);
157+
}
143158
}
144159

145160
bool TShardWriter::RetryWriteRequest(const bool delayed) {
146161
if (NumRetries >= MaxRetriesPerShard) {
147162
return false;
148163
}
149164
if (delayed) {
150-
Schedule(OverloadTimeout(), new TEvents::TEvWakeup());
165+
Schedule(OverloadTimeout(), new TEvents::TEvWakeup(0));
151166
} else {
152167
++NumRetries;
153168
SendWriteRequest();

ydb/core/tx/data_events/shard_writer.h

Lines changed: 32 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,17 @@
11
#pragma once
22

3-
#include "common/modification_type.h"
43
#include "events.h"
54
#include "shards_splitter.h"
65

7-
#include <ydb/library/accessor/accessor.h>
6+
#include "common/modification_type.h"
7+
88
#include <ydb/core/base/tablet_pipecache.h>
9+
#include <ydb/core/tx/columnshard/counters/common/owner.h>
910
#include <ydb/core/tx/long_tx_service/public/events.h>
11+
12+
#include <ydb/library/accessor/accessor.h>
1013
#include <ydb/library/actors/core/actor_bootstrapped.h>
1114
#include <ydb/library/actors/wilson/wilson_profile_span.h>
12-
#include <ydb/core/tx/columnshard/counters/common/owner.h>
13-
1415

1516
namespace NKikimr::NEvWrite {
1617

@@ -19,6 +20,7 @@ class TWriteIdForShard {
1920
YDB_READONLY(ui64, ShardId, 0);
2021
YDB_READONLY(ui64, WriteId, 0);
2122
YDB_READONLY(ui64, WritePartId, 0);
23+
2224
public:
2325
TWriteIdForShard() = default;
2426
TWriteIdForShard(const ui64 shardId, const ui64 writeId, const ui32 writePartId)
@@ -40,6 +42,9 @@ class TCSUploadCounters: public NColumnShard::TCommonCountersOwner {
4042
NMonitoring::TDynamicCounters::TCounterPtr RowsCount;
4143
NMonitoring::TDynamicCounters::TCounterPtr BytesCount;
4244
NMonitoring::TDynamicCounters::TCounterPtr FailsCount;
45+
NMonitoring::TDynamicCounters::TCounterPtr GlobalTimeoutCount;
46+
NMonitoring::TDynamicCounters::TCounterPtr RetryTimeoutCount;
47+
4348
public:
4449
TCSUploadCounters()
4550
: TBase("CSUpload")
@@ -51,8 +56,18 @@ class TCSUploadCounters: public NColumnShard::TCommonCountersOwner {
5156
, RowsDistribution(TBase::GetHistogram("Requests/Rows", NMonitoring::ExponentialHistogram(15, 2, 16)))
5257
, RowsCount(TBase::GetDeriviative("Rows"))
5358
, BytesCount(TBase::GetDeriviative("Bytes"))
54-
, FailsCount(TBase::GetDeriviative("Fails")) {
59+
, FailsCount(TBase::GetDeriviative("Fails"))
60+
, GlobalTimeoutCount(TBase::GetDeriviative("GlobalTimeouts"))
61+
, RetryTimeoutCount(TBase::GetDeriviative("RetryTimeouts"))
62+
{
63+
}
64+
65+
void OnGlobalTimeout() const {
66+
GlobalTimeoutCount->Inc();
67+
}
5568

69+
void OnRetryTimeout() const {
70+
RetryTimeoutCount->Inc();
5671
}
5772

5873
void OnRequest(const ui64 rows, const ui64 bytes) const {
@@ -110,6 +125,7 @@ class TWritersController {
110125
LongTxActorId.Send(NLongTxService::MakeLongTxServiceID(LongTxActorId.NodeId()), req.Release());
111126
}
112127
}
128+
113129
public:
114130
using TPtr = std::shared_ptr<TWritersController>;
115131

@@ -131,10 +147,10 @@ class TWritersController {
131147
, Issues(issues) {
132148
}
133149
};
134-
135150
};
136151

137-
TWritersController(const ui32 writesCount, const NActors::TActorIdentity& longTxActorId, const NLongTxService::TLongTxId& longTxId, const bool immediateWrite);
152+
TWritersController(const ui32 writesCount, const NActors::TActorIdentity& longTxActorId, const NLongTxService::TLongTxId& longTxId,
153+
const bool immediateWrite);
138154
void OnSuccess(const ui64 shardId, const ui64 writeId, const ui32 writePartId);
139155
void OnFail(const Ydb::StatusIds::StatusCode code, const TString& message);
140156
};
@@ -158,40 +174,43 @@ class TShardWriter: public NActors::TActorBootstrapped<TShardWriter> {
158174
NWilson::TProfileSpan ActorSpan;
159175
EModificationType ModificationType;
160176
const bool ImmediateWrite = false;
177+
const std::optional<TDuration> Timeout;
161178

162179
void SendWriteRequest();
163180
static TDuration OverloadTimeout() {
164181
return TDuration::MilliSeconds(OverloadedDelayMs);
165182
}
166183
void SendToTablet(THolder<IEventBase> event) {
167-
Send(LeaderPipeCache, new TEvPipeCache::TEvForward(event.Release(), ShardId, true),
168-
IEventHandle::FlagTrackDelivery, 0, ActorSpan.GetTraceId());
184+
Send(LeaderPipeCache, new TEvPipeCache::TEvForward(event.Release(), ShardId, true), IEventHandle::FlagTrackDelivery, 0,
185+
ActorSpan.GetTraceId());
169186
}
170187
virtual void PassAway() override {
171188
Send(LeaderPipeCache, new TEvPipeCache::TEvUnlink(0));
172189
TBase::PassAway();
173190
}
191+
174192
public:
175193
TShardWriter(const ui64 shardId, const ui64 tableId, const ui64 schemaVersion, const TString& dedupId, const IShardInfo::TPtr& data,
176194
const NWilson::TProfileSpan& parentSpan, TWritersController::TPtr externalController, const ui32 writePartIdx,
177-
const EModificationType mType, const bool immediateWrite);
195+
const EModificationType mType, const bool immediateWrite, const std::optional<TDuration> timeout = std::nullopt);
178196

179197
STFUNC(StateMain) {
180198
switch (ev->GetTypeRewrite()) {
181199
hFunc(TEvColumnShard::TEvWriteResult, Handle);
182200
hFunc(TEvPipeCache::TEvDeliveryProblem, Handle);
183201
hFunc(NEvents::TDataEvents::TEvWriteResult, Handle);
184-
CFunc(TEvents::TSystem::Wakeup, HandleTimeout);
202+
hFunc(NActors::TEvents::TEvWakeup, Handle);
185203
}
186204
}
187205

188206
void Bootstrap();
189207

208+
void Handle(NActors::TEvents::TEvWakeup::TPtr& ev);
190209
void Handle(TEvColumnShard::TEvWriteResult::TPtr& ev);
191210
void Handle(TEvPipeCache::TEvDeliveryProblem::TPtr& ev);
192211
void Handle(NEvents::TDataEvents::TEvWriteResult::TPtr& ev);
193-
void HandleTimeout(const TActorContext& ctx);
212+
194213
private:
195214
bool RetryWriteRequest(const bool delayed = true);
196215
};
197-
}
216+
} // namespace NKikimr::NEvWrite

ydb/core/tx/tx_proxy/rpc_long_tx.cpp

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
#include <ydb/core/formats/arrow/size_calcer.h>
44
#include <ydb/core/tx/columnshard/columnshard.h>
5+
#include <ydb/core/tx/columnshard/counters/common/object_counter.h>
56
#include <ydb/core/tx/data_events/shard_writer.h>
67
#include <ydb/core/tx/long_tx_service/public/events.h>
78
#include <ydb/core/tx/schemeshard/schemeshard.h>
@@ -21,7 +22,8 @@ using namespace NLongTxService;
2122
// Common logic of LongTx Write that takes care of splitting the data according to the sharding scheme,
2223
// sending it to shards and collecting their responses
2324
template <class TLongTxWriteImpl>
24-
class TLongTxWriteBase: public TActorBootstrapped<TLongTxWriteImpl> {
25+
class TLongTxWriteBase: public TActorBootstrapped<TLongTxWriteImpl>,
26+
NColumnShard::TMonitoringObjectsCounter<TLongTxWriteBase<TLongTxWriteImpl>> {
2527
using TBase = TActorBootstrapped<TLongTxWriteImpl>;
2628
static inline TAtomicCounter MemoryInFlight = 0;
2729

@@ -37,8 +39,7 @@ class TLongTxWriteBase: public TActorBootstrapped<TLongTxWriteImpl> {
3739
, Path(path)
3840
, DedupId(dedupId)
3941
, LongTxId(longTxId)
40-
, ActorSpan(0, NWilson::TTraceId::NewTraceId(0, Max<ui32>()), "TLongTxWriteBase")
41-
{
42+
, ActorSpan(0, NWilson::TTraceId::NewTraceId(0, Max<ui32>()), "TLongTxWriteBase") {
4243
if (token) {
4344
UserToken.emplace(token);
4445
}
@@ -95,7 +96,8 @@ class TLongTxWriteBase: public TActorBootstrapped<TLongTxWriteImpl> {
9596
accessor.reset();
9697

9798
const auto& splittedData = shardsSplitter->GetSplitData();
98-
InternalController = std::make_shared<NEvWrite::TWritersController>(splittedData.GetShardRequestsCount(), this->SelfId(), LongTxId, NoTxWrite);
99+
InternalController =
100+
std::make_shared<NEvWrite::TWritersController>(splittedData.GetShardRequestsCount(), this->SelfId(), LongTxId, NoTxWrite);
99101
ui32 sumBytes = 0;
100102
ui32 rowsCount = 0;
101103
ui32 writeIdx = 0;
@@ -104,9 +106,9 @@ class TLongTxWriteBase: public TActorBootstrapped<TLongTxWriteImpl> {
104106
InternalController->GetCounters()->OnRequest(shardInfo->GetRowsCount(), shardInfo->GetBytes());
105107
sumBytes += shardInfo->GetBytes();
106108
rowsCount += shardInfo->GetRowsCount();
107-
this->Register(new NEvWrite::TShardWriter(shard, shardsSplitter->GetTableId(), shardsSplitter->GetSchemaVersion(), DedupId, shardInfo,
108-
ActorSpan, InternalController,
109-
++writeIdx, NEvWrite::EModificationType::Replace, NoTxWrite));
109+
this->Register(
110+
new NEvWrite::TShardWriter(shard, shardsSplitter->GetTableId(), shardsSplitter->GetSchemaVersion(), DedupId, shardInfo,
111+
ActorSpan, InternalController, ++writeIdx, NEvWrite::EModificationType::Replace, NoTxWrite, TDuration::Seconds(20)));
110112
}
111113
}
112114
pSpan.Attribute("affected_shards_count", (long)splittedData.GetShardsInfo().size());
@@ -235,8 +237,7 @@ class TLongTxWriteInternal: public TLongTxWriteBase<TLongTxWriteInternal> {
235237
, ReplyTo(replyTo)
236238
, NavigateResult(navigateResult)
237239
, Batch(batch)
238-
, Issues(issues)
239-
{
240+
, Issues(issues) {
240241
Y_ABORT_UNLESS(Issues);
241242
DataAccessor = std::make_unique<TParsedBatchData>(Batch);
242243
}

0 commit comments

Comments
 (0)