Skip to content

[WIP][feat][client] Support collect fs metrics to mdsv2 #224

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions src/blockaccess/block_accesser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ static bvar::Adder<uint64_t> block_get_async_num("block_get_async_num");
static bvar::Adder<uint64_t> block_get_sync_num("block_get_sync_num");

using dingofs::utils::kMB;
using stub::metric::ObjectMetric;

Status BlockAccesserImpl::Init() {
if (options_.type == AccesserType::kS3) {
Expand Down Expand Up @@ -106,6 +107,11 @@ Status BlockAccesserImpl::Put(const std::string& key, const char* buffer,
(s.ok() ? "ok" : "fail"));
});

// object storage write metrics
BlockAccessMetricGuard metric_guard(&s,
&ObjectMetric::GetInstance().write_object,
length, butil::cpuwide_time_us());

block_put_sync_num << 1;

auto dec = ::absl::MakeCleanup([&]() { block_put_sync_num << -1; });
Expand All @@ -131,6 +137,11 @@ void BlockAccesserImpl::AsyncPut(
return absl::StrFormat("async_put_block (%s, %d) : %d", ctx->key,
ctx->buffer_size, ctx->ret_code);
});
// object storage write metrics
Status s = (ctx->ret_code == 0 ? Status::OK() : Status::Unknown(""));
BlockAccessMetricGuard metric_guard(
&s, &ObjectMetric::GetInstance().write_object, ctx->buffer_size,
start_us);

block_put_async_num << -1;
inflight_bytes_throttle_->OnComplete(ctx->buffer_size);
Expand Down Expand Up @@ -173,6 +184,11 @@ Status BlockAccesserImpl::Get(const std::string& key, std::string* data) {
(s.ok() ? "ok" : "fail"));
});

// object storage read metrics
BlockAccessMetricGuard metric_guard(&s,
&ObjectMetric::GetInstance().read_object,
data->length(), butil::cpuwide_time_us());

block_get_sync_num << 1;
auto dec = ::absl::MakeCleanup([&]() { block_get_sync_num << -1; });

Expand All @@ -198,6 +214,11 @@ void BlockAccesserImpl::AsyncGet(
ctx->offset, ctx->len, ctx->ret_code);
});

// object storage read metrics
Status s = (ctx->ret_code == 0 ? Status::OK() : Status::Unknown(""));
BlockAccessMetricGuard metric_guard(
&s, &ObjectMetric::GetInstance().read_object, ctx->len, start_us);

block_get_async_num << -1;
inflight_bytes_throttle_->OnComplete(ctx->len);

Expand All @@ -223,6 +244,11 @@ Status BlockAccesserImpl::Range(const std::string& key, off_t offset,
(s.ok() ? "ok" : "fail"));
});

// object storage read metrics
BlockAccessMetricGuard metric_guard(&s,
&ObjectMetric::GetInstance().read_object,
length, butil::cpuwide_time_us());

block_get_sync_num << 1;
auto dec = ::absl::MakeCleanup([&]() { block_get_sync_num << -1; });

Expand Down
25 changes: 24 additions & 1 deletion src/blockaccess/block_accesser.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "blockaccess/accesser.h"
#include "blockaccess/accesser_common.h"
#include "common/status.h"
#include "stub/metric/metric.h"
#include "utils/throttle.h"

namespace dingofs {
Expand All @@ -38,7 +39,7 @@ enum class RetryStrategy : uint8_t {
};

using RetryCallback = std::function<RetryStrategy(int code)>;

using ::dingofs::stub::metric::InterfaceMetric;
// BlockAccesser is a class that provides a way to access block from a data
// source. It is a base class for all data access classes.
class BlockAccesser {
Expand Down Expand Up @@ -153,6 +154,28 @@ class BlockAccesserImpl : public BlockAccesser {
std::unique_ptr<AsyncRequestInflightBytesThrottle> inflight_bytes_throttle_;
};

struct BlockAccessMetricGuard {
explicit BlockAccessMetricGuard(Status* status, InterfaceMetric* metric,
size_t count, uint64_t start)
: status_(status), metric_(metric), count_(count), start_(start) {}
~BlockAccessMetricGuard() {
if (status_->ok()) {
metric_->bps.count << count_;
metric_->qps.count << 1;
auto duration = butil::cpuwide_time_us() - start_;
metric_->latency << duration;
metric_->latTotal << duration;
} else {
metric_->eps.count << 1;
}
}

Status* status_;
InterfaceMetric* metric_;
size_t count_;
uint64_t start_;
};

using BlockAccesserSPtr = std::shared_ptr<BlockAccesser>;
using BlockAccesserUPtr = std::unique_ptr<BlockAccesser>;

Expand Down
27 changes: 3 additions & 24 deletions src/blockaccess/s3/s3_accesser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,6 @@
namespace dingofs {
namespace blockaccess {

using stub::metric::MetricGuard;
using stub::metric::S3Metric;

bool S3Accesser::Init() {
const auto& s3_info = options_.s3_info;
LOG(INFO) << fmt::format(
Expand Down Expand Up @@ -57,12 +54,7 @@ bool S3Accesser::ContainerExist() { return client_->BucketExist(); }

Status S3Accesser::Put(const std::string& key, const char* buffer,
size_t length) {
int rc = 0;
// write s3 metrics
auto start = butil::cpuwide_time_us();
MetricGuard guard(&rc, &S3Metric::GetInstance().write_s3, length, start);

rc = client_->PutObject(S3Key(key), buffer, length);
int rc = client_->PutObject(S3Key(key), buffer, length);
if (rc < 0) {
LOG(ERROR) << fmt::format("[accesser] put object({}) fail, retcode: {}.",
key, rc);
Expand All @@ -77,21 +69,14 @@ void S3Accesser::AsyncPut(std::shared_ptr<PutObjectAsyncContext> context) {
auto start_time = butil::cpuwide_time_us();
context->cb = [&, start_time,
origin_cb](const std::shared_ptr<PutObjectAsyncContext>& ctx) {
MetricGuard guard(&ctx->ret_code, &S3Metric::GetInstance().write_s3,
ctx->buffer_size, start_time);
ctx->cb = origin_cb;
ctx->cb(ctx);
};
client_->PutObjectAsync(context);
}

Status S3Accesser::Get(const std::string& key, std::string* data) {
int rc; // read s3 metrics
auto start = butil::cpuwide_time_us();
MetricGuard guard(&rc, &S3Metric::GetInstance().read_s3, data->length(),
start);

rc = client_->GetObject(S3Key(key), data);
int rc = client_->GetObject(S3Key(key), data);
if (rc < 0) {
if (!client_->ObjectExist(S3Key(key))) { // TODO: more efficient
LOG(WARNING) << fmt::format("[accesser] object({}) not found.", key);
Expand All @@ -108,11 +93,7 @@ Status S3Accesser::Get(const std::string& key, std::string* data) {

Status S3Accesser::Range(const std::string& key, off_t offset, size_t length,
char* buffer) {
int rc; // read s3 metrics
auto start = butil::cpuwide_time_us();
MetricGuard guard(&rc, &S3Metric::GetInstance().read_s3, length, start);

rc = client_->RangeObject(S3Key(key), buffer, offset, length);
int rc = client_->RangeObject(S3Key(key), buffer, offset, length);
if (rc < 0) {
if (!client_->ObjectExist(S3Key(key))) { // TODO: more efficient
LOG(WARNING) << fmt::format("[accesser] object({}) not found.", key);
Expand All @@ -132,8 +113,6 @@ void S3Accesser::AsyncGet(std::shared_ptr<GetObjectAsyncContext> context) {
auto start_time = butil::cpuwide_time_us();
context->cb = [&, start_time,
origin_cb](const std::shared_ptr<GetObjectAsyncContext>& ctx) {
MetricGuard guard(&ctx->ret_code, &S3Metric::GetInstance().read_s3,
ctx->len, start_time);
ctx->cb = origin_cb;
ctx->cb(ctx);
};
Expand Down
2 changes: 2 additions & 0 deletions src/client/vfs/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ add_subdirectory(data)
add_subdirectory(handle)
add_subdirectory(hub)
add_subdirectory(meta)
add_subdirectory(statistics)

add_library(vfs_lib
vfs_impl.cpp
Expand All @@ -30,6 +31,7 @@ target_link_libraries(vfs_lib
vfs_data
vfs_hub
vfs_handle
vfs_statistics
glog::glog
gflags::gflags
fmt::fmt
Expand Down
29 changes: 26 additions & 3 deletions src/client/vfs/meta/v2/filesystem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@
#include <fcntl.h>

#include <cstdint>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "client/common/dynamic_config.h"
#include "client/vfs/meta/v2/client_id.h"
#include "client/vfs/statistics/fs_stats_manager.h"
#include "client/vfs/vfs_meta.h"
#include "common/status.h"
#include "dingofs/error.pb.h"
Expand All @@ -36,6 +39,10 @@ namespace client {
namespace vfs {
namespace v2 {

USING_FLAG(push_metric_interval_millsecond)

using FsStatsManager = dingofs::client::vfs::FsStatsManager;

const uint32_t kMaxHostNameLength = 255;

const uint32_t kMaxXAttrNameLength = 255;
Expand Down Expand Up @@ -133,12 +140,14 @@ std::string FileSessionMap::Get(uint64_t fh) {
MDSV2FileSystem::MDSV2FileSystem(mdsv2::FsInfoPtr fs_info,
const ClientId& client_id,
MDSDiscoveryPtr mds_discovery,
MDSClientPtr mds_client)
MDSClientPtr mds_client,
FsStatsManagerUPtr fs_stats_manager)
: name_(fs_info->GetName()),
client_id_(client_id),
fs_info_(fs_info),
mds_discovery_(mds_discovery),
mds_client_(mds_client) {}
mds_client_(mds_client),
fs_stats_manager_(std::move(fs_stats_manager)) {}

MDSV2FileSystem::~MDSV2FileSystem() {} // NOLINT

Expand Down Expand Up @@ -271,6 +280,8 @@ void MDSV2FileSystem::Heartbeat() {
}
}

void MDSV2FileSystem::PushFsStatsToMDS() { fs_stats_manager_->PushFsStats(); }

bool MDSV2FileSystem::InitCrontab() {
// Add heartbeat crontab
crontab_configs_.push_back({
Expand All @@ -280,6 +291,14 @@ bool MDSV2FileSystem::InitCrontab() {
[this](void*) { this->Heartbeat(); },
});

// Add push fs stats crontab
crontab_configs_.push_back({
"PUSH_FSSTATS",
FLAGS_push_metric_interval_millsecond,
true,
[this](void*) { this->PushFsStatsToMDS(); },
});

crontab_manager_.AddCrontab(crontab_configs_);

return true;
Expand Down Expand Up @@ -646,8 +665,12 @@ MDSV2FileSystemUPtr MDSV2FileSystem::Build(const std::string& fs_name,
return nullptr;
}

// create fs stats manager
auto fs_stats_manager = FsStatsManager::New(fs_name, mds_client);

// create filesystem
return MDSV2FileSystem::New(fs_info, client_id, mds_discovery, mds_client);
return MDSV2FileSystem::New(fs_info, client_id, mds_discovery, mds_client,
std::move(fs_stats_manager));
}

} // namespace v2
Expand Down
14 changes: 11 additions & 3 deletions src/client/vfs/meta/v2/filesystem.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "client/vfs/meta/v2/client_id.h"
#include "client/vfs/meta/v2/mds_client.h"
#include "client/vfs/meta/v2/mds_discovery.h"
#include "client/vfs/statistics/fs_stats_manager.h"
#include "client/vfs/vfs_meta.h"
#include "common/status.h"
#include "dingofs/mdsv2.pb.h"
Expand Down Expand Up @@ -77,15 +78,18 @@ class FileSessionMap {
class MDSV2FileSystem : public vfs::MetaSystem {
public:
MDSV2FileSystem(mdsv2::FsInfoPtr fs_info, const ClientId& client_id,
MDSDiscoveryPtr mds_discovery, MDSClientPtr mds_client);
MDSDiscoveryPtr mds_discovery, MDSClientPtr mds_client,
FsStatsManagerUPtr fs_stats_manager);
~MDSV2FileSystem() override;

static MDSV2FileSystemUPtr New(mdsv2::FsInfoPtr fs_info,
const ClientId& client_id,
MDSDiscoveryPtr mds_discovery,
MDSClientPtr mds_client) {
MDSClientPtr mds_client,
FsStatsManagerUPtr fs_stats_manager) {
return std::make_unique<MDSV2FileSystem>(fs_info, client_id, mds_discovery,
mds_client);
mds_client,
std::move(fs_stats_manager));
}

static MDSV2FileSystemUPtr Build(const std::string& fs_name,
Expand Down Expand Up @@ -155,6 +159,8 @@ class MDSV2FileSystem : public vfs::MetaSystem {

void Heartbeat();

void PushFsStatsToMDS();

bool InitCrontab();

const std::string name_;
Expand All @@ -172,6 +178,8 @@ class MDSV2FileSystem : public vfs::MetaSystem {
std::vector<mdsv2::CrontabConfig> crontab_configs_;
// This is manage crontab, like heartbeat.
mdsv2::CrontabManager crontab_manager_;
// Fs stats manager
FsStatsManagerUPtr fs_stats_manager_;
};

} // namespace v2
Expand Down
19 changes: 19 additions & 0 deletions src/client/vfs/meta/v2/mds_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -875,6 +875,25 @@ Status MDSClient::GetFsQuota(FsStat& fs_stat) {
return Status::OK();
}

Status MDSClient::PushFsStatsToMDS(const std::string& fs_name,
const pb::mdsv2::FsStatsData& fs_stat_data) {
CHECK(fs_name != "") << "fs_name is invalid.";

pb::mdsv2::SetFsStatsRequest request;
pb::mdsv2::SetFsStatsResponse response;

request.set_fs_name(fs_name);
request.mutable_stats()->CopyFrom(fs_stat_data);

auto status =
rpc_->SendRequest("MDSService", "SetFsStats", request, response);
if (!status.ok()) {
return status;
}

return Status::OK();
}

bool MDSClient::UpdateRouter() {
pb::mdsv2::FsInfo new_fs_info;
auto status = MDSClient::GetFsInfo(rpc_, fs_info_->GetName(), new_fs_info);
Expand Down
3 changes: 3 additions & 0 deletions src/client/vfs/meta/v2/mds_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ class MDSClient {

Status GetFsQuota(FsStat& fs_stat);

Status PushFsStatsToMDS(const std::string& fs_name,
const pb::mdsv2::FsStatsData& fs_stat_data);

private:
EndPoint GetEndpoint(Ino ino);
EndPoint GetEndpointByParent(int64_t parent);
Expand Down
23 changes: 23 additions & 0 deletions src/client/vfs/statistics/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Copyright (c) 2025 dingodb.com, Inc. All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

add_library(vfs_statistics
fs_stats_manager.cpp
)

target_link_libraries(vfs_statistics
dingofs_common
stub_metric
PROTO_OBJS
)
Loading