Skip to content

Merge double accelerations into 24-3 #11740

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Nov 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions ydb/core/base/blobstorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,8 @@ struct TEvBlobStorage {
EvInplacePatch,
EvAssimilate,

EvGetQueuesInfo, // for debugging purposes

//
EvPutResult = EvPut + 512, /// 268 632 576
EvGetResult,
Expand All @@ -502,6 +504,8 @@ struct TEvBlobStorage {
EvInplacePatchResult,
EvAssimilateResult,

EvQueuesInfo, // for debugging purposes

// proxy <-> vdisk interface
EvVPut = EvPut + 2 * 512, /// 268 633 088
EvVGet,
Expand Down Expand Up @@ -869,6 +873,7 @@ struct TEvBlobStorage {
EvRunActor = EvPut + 15 * 512,
EvVMockCtlRequest,
EvVMockCtlResponse,
EvDelayedMessageWrapper,

// incremental huge blob keeper
EvIncrHugeInit = EvPut + 17 * 512,
Expand Down
36 changes: 36 additions & 0 deletions ydb/core/blobstorage/backpressure/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,39 @@
#define QLOG_DEBUG_S(marker, arg) QLOG_LOG_S(marker, NActors::NLog::PRI_DEBUG , arg)

LWTRACE_USING(BLOBSTORAGE_PROVIDER);

namespace NKikimr::NBsQueue {

// Special timer for debug purposes, which works with virtual time of TTestActorSystem
struct TActivationContextTimer {
TActivationContextTimer()
: CreationTimestamp(NActors::TActivationContext::Monotonic())
{}

double Passed() const {
return (NActors::TActivationContext::Monotonic() - CreationTimestamp).SecondsFloat();
}

TMonotonic CreationTimestamp;
};

struct TBSQueueTimer {
TBSQueueTimer(bool useActorSystemTime)
{
if (useActorSystemTime) {
Timer.emplace<TActivationContextTimer>();
} else {
Timer.emplace<THPTimer>();
}
}

std::variant<THPTimer, TActivationContextTimer> Timer;

double Passed() const {
return std::visit([](const auto& timer) -> double {
return timer.Passed();
}, Timer);
}
};

} // namespace NKikimr::NBsQueue
2 changes: 1 addition & 1 deletion ydb/core/blobstorage/backpressure/event.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ IEventBase *TEventHolder::MakeErrorReply(NKikimrProto::EReplyStatus status, cons

void TEventHolder::SendToVDisk(const TActorContext& ctx, const TActorId& remoteVDisk, ui64 queueCookie, ui64 msgId,
ui64 sequenceId, bool sendMeCostSettings, NWilson::TTraceId traceId, const NBackpressure::TQueueClientId& clientId,
const THPTimer& processingTimer) {
const TBSQueueTimer& processingTimer) {
// check that we are not discarded yet
Y_ABORT_UNLESS(Type != 0);

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/blobstorage/backpressure/event.h
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ class TEventHolder {

void SendToVDisk(const TActorContext& ctx, const TActorId& remoteVDisk, ui64 queueCookie, ui64 msgId, ui64 sequenceId,
bool sendMeCostSettings, NWilson::TTraceId traceId, const NBackpressure::TQueueClientId& clientId,
const THPTimer& processingTimer);
const TBSQueueTimer& processingTimer);

void Discard();
};
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/blobstorage/backpressure/queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ namespace NKikimr::NBsQueue {

TBlobStorageQueue::TBlobStorageQueue(const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters, TString& logPrefix,
const TBSProxyContextPtr& bspctx, const NBackpressure::TQueueClientId& clientId, ui32 interconnectChannel,
const TBlobStorageGroupType& gType, NMonitoring::TCountableBase::EVisibility visibility)
const TBlobStorageGroupType& gType, NMonitoring::TCountableBase::EVisibility visibility, bool useActorSystemTime)
: Queues(bspctx)
, WindowSize(0)
, InFlightCost(0)
Expand All @@ -16,6 +16,7 @@ TBlobStorageQueue::TBlobStorageQueue(const TIntrusivePtr<::NMonitoring::TDynamic
, ClientId(clientId)
, BytesWaiting(0)
, InterconnectChannel(interconnectChannel)
, UseActorSystemTime(useActorSystemTime)
// use parent group visibility
, QueueWaitingItems(counters->GetCounter("QueueWaitingItems", false, visibility))
, QueueWaitingBytes(counters->GetCounter("QueueWaitingBytes", false, visibility))
Expand Down
16 changes: 11 additions & 5 deletions ydb/core/blobstorage/backpressure/queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,16 @@ class TBlobStorageQueue {
const ui64 QueueCookie;
ui64 Cost;
bool DirtyCost;
THPTimer ProcessingTimer;
TBSQueueTimer ProcessingTimer;

TTrackableList<TItem>::iterator Iterator;

template<typename TEvent>
TItem(TAutoPtr<TEventHandle<TEvent>>& event, TInstant deadline,
const ::NMonitoring::TDynamicCounters::TCounterPtr& serItems,
const ::NMonitoring::TDynamicCounters::TCounterPtr& serBytes,
const TBSProxyContextPtr& bspctx, ui32 interconnectChannel,
bool local)
bool local, bool useActorSystemTime)
: Queue(EItemQueue::NotSet)
, CostEssence(*event->Get())
, Span(TWilson::VDiskTopLevel, std::move(event->TraceId), "Backpressure.InFlight")
Expand All @@ -70,6 +71,7 @@ class TBlobStorageQueue {
, QueueCookie(RandomNumber<ui64>())
, Cost(0)
, DirtyCost(true)
, ProcessingTimer(useActorSystemTime)
{
if (Span) {
Span
Expand Down Expand Up @@ -129,6 +131,8 @@ class TBlobStorageQueue {

const ui32 InterconnectChannel;

const bool UseActorSystemTime;

public:
::NMonitoring::TDynamicCounters::TCounterPtr QueueWaitingItems;
::NMonitoring::TDynamicCounters::TCounterPtr QueueWaitingBytes;
Expand Down Expand Up @@ -156,7 +160,8 @@ class TBlobStorageQueue {
TBlobStorageQueue(const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters, TString& logPrefix,
const TBSProxyContextPtr& bspctx, const NBackpressure::TQueueClientId& clientId, ui32 interconnectChannel,
const TBlobStorageGroupType &gType,
NMonitoring::TCountableBase::EVisibility visibility = NMonitoring::TCountableBase::EVisibility::Public);
NMonitoring::TCountableBase::EVisibility visibility = NMonitoring::TCountableBase::EVisibility::Public,
bool useActorSystemTime = false);

~TBlobStorageQueue();

Expand Down Expand Up @@ -213,7 +218,8 @@ class TBlobStorageQueue {
TItemList::iterator newIt;
if (Queues.Unused.empty()) {
newIt = Queues.Waiting.emplace(Queues.Waiting.end(), event, deadline,
QueueSerializedItems, QueueSerializedBytes, BSProxyCtx, InterconnectChannel, local);
QueueSerializedItems, QueueSerializedBytes, BSProxyCtx, InterconnectChannel, local,
UseActorSystemTime);
++*QueueSize;
} else {
newIt = Queues.Unused.begin();
Expand All @@ -222,7 +228,7 @@ class TBlobStorageQueue {
TItem& item = *newIt;
item.~TItem();
new(&item) TItem(event, deadline, QueueSerializedItems, QueueSerializedBytes, BSProxyCtx,
InterconnectChannel, local);
InterconnectChannel, local, UseActorSystemTime);
}

newIt->Iterator = newIt;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,13 @@ class TVDiskBackpressureClientActor : public TActorBootstrapped<TVDiskBackpressu
NKikimrBlobStorage::EVDiskQueueId queueId,const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters,
const TBSProxyContextPtr& bspctx, const NBackpressure::TQueueClientId& clientId, const TString& queueName,
ui32 interconnectChannel, bool /*local*/, TDuration watchdogTimeout,
TIntrusivePtr<NBackpressure::TFlowRecord> &flowRecord, NMonitoring::TCountableBase::EVisibility visibility)
TIntrusivePtr<NBackpressure::TFlowRecord> &flowRecord, NMonitoring::TCountableBase::EVisibility visibility,
bool useActorSystemTime)
: BSProxyCtx(bspctx)
, QueueName(queueName)
, Counters(counters->GetSubgroup("queue", queueName))
, Queue(Counters, LogPrefix, bspctx, clientId, interconnectChannel,
(info ? info->Type : TErasureType::ErasureNone), visibility)
(info ? info->Type : TErasureType::ErasureNone), visibility, useActorSystemTime)
, VDiskIdShort(vdiskId)
, QueueId(queueId)
, QueueWatchdogTimeout(watchdogTimeout)
Expand Down Expand Up @@ -975,9 +976,10 @@ IActor* CreateVDiskBackpressureClient(const TIntrusivePtr<TBlobStorageGroupInfo>
NKikimrBlobStorage::EVDiskQueueId queueId,const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters,
const TBSProxyContextPtr& bspctx, const NBackpressure::TQueueClientId& clientId, const TString& queueName,
ui32 interconnectChannel, bool local, TDuration watchdogTimeout,
TIntrusivePtr<NBackpressure::TFlowRecord> &flowRecord, NMonitoring::TCountableBase::EVisibility visibility) {
TIntrusivePtr<NBackpressure::TFlowRecord> &flowRecord, NMonitoring::TCountableBase::EVisibility visibility,
bool useActorSystemTime) {
return new NBsQueue::TVDiskBackpressureClientActor(info, vdiskId, queueId, counters, bspctx, clientId, queueName,
interconnectChannel, local, watchdogTimeout, flowRecord, visibility);
interconnectChannel, local, watchdogTimeout, flowRecord, visibility, useActorSystemTime);
}

} // NKikimr
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ namespace NKikimr {
NKikimrBlobStorage::EVDiskQueueId queueId,const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters,
const TBSProxyContextPtr& bspctx, const NBackpressure::TQueueClientId& clientId, const TString& queueName,
ui32 interconnectChannel, bool local, TDuration watchdogTimeout,
TIntrusivePtr<NBackpressure::TFlowRecord> &flowRecord, NMonitoring::TCountableBase::EVisibility visibility);
TIntrusivePtr<NBackpressure::TFlowRecord> &flowRecord, NMonitoring::TCountableBase::EVisibility visibility,
bool useActorSystemTime = false);

} // NKikimr
4 changes: 4 additions & 0 deletions ydb/core/blobstorage/common/defs.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
#pragma once

#include <ydb/core/blobstorage/defs.h>
#include <util/datetime/base.h>
14 changes: 14 additions & 0 deletions ydb/core/blobstorage/common/immediate_control_defaults.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#include "immediate_control_defaults.h"

namespace NKikimr {

TControlWrapper SlowDiskThresholdDefaultControl =
TControlWrapper(std::round(DefaultSlowDiskThreshold * 1000), 1, 1'000'000);

TControlWrapper PredictedDelayMultiplierDefaultControl =
TControlWrapper(std::round(DefaultPredictedDelayMultiplier * 1000), 0, 1'000'000);

TControlWrapper MaxNumOfSlowDisksDefaultControl =
TControlWrapper(DefaultMaxNumOfSlowDisks, 1, 2);

} // namespace NKikimr
18 changes: 18 additions & 0 deletions ydb/core/blobstorage/common/immediate_control_defaults.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#pragma once

#include "defs.h"
#include <ydb/core/control/immediate_control_board_wrapper.h>

namespace NKikimr {

constexpr bool DefaultEnablePutBatching = true;
constexpr bool DefaultEnableVPatch = false;

constexpr float DefaultSlowDiskThreshold = 2;
constexpr float DefaultPredictedDelayMultiplier = 1;
constexpr ui32 DefaultMaxNumOfSlowDisks = 2;

extern TControlWrapper SlowDiskThresholdDefaultControl;
extern TControlWrapper PredictedDelayMultiplierDefaultControl;
extern TControlWrapper MaxNumOfSlowDisksDefaultControl;
}
11 changes: 11 additions & 0 deletions ydb/core/blobstorage/common/ya.make
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
LIBRARY()

PEERDIR(
ydb/core/base
)

SRCS(
immediate_control_defaults.cpp
)

END()
Loading
Loading