Skip to content

Commit 19e2682

Browse files
chuandewWine93
authored andcommitted
[feat][client] Add aws thread pool option and bthread work num option
1 parent c5da879 commit 19e2682

File tree

11 files changed

+69
-13
lines changed

11 files changed

+69
-13
lines changed

conf/client.conf

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,8 @@ fuseClient.warmupThreadsNum=10
9696
# if no disk cache this option will be ignored
9797
fuseClient.in_time_warmup=false
9898

99+
fuseClient.bthread_worker_num=0
100+
99101
# the write throttle bps of fuseClient, default no limit
100102
fuseClient.throttle.avgWriteBytes=0
101103
# the write burst bps of fuseClient, default no limit
@@ -194,7 +196,6 @@ data_stream.slice.stay_in_memory_max_second=5
194196
data_stream.page.size=65536
195197
data_stream.page.total_size_mb=1024
196198
data_stream.page.use_pool=true
197-
data_stream.s3.async_upload_workers=256
198199
# }
199200

200201
#### block cache
@@ -276,6 +277,9 @@ s3.maxReadRetryIntervalMs = 1000
276277
# retry interval
277278
s3.readRetryIntervalMs = 100
278279

280+
s3.use_thread_pool=true
281+
s3.async_thread_num_in_thread_pool=256
282+
279283
s3.enableTelemetry=false
280284

281285
#### common

conf/mds.conf

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ s3.requestTimeout=10000
123123
# Off = 0,Fatal = 1,Error = 2,Warn = 3,Info = 4,Debug = 5,Trace = 6
124124
s3.logLevel=4
125125
s3.logPrefix=/data/logs/dingofs/aws_ # __DINGOADM_TEMPLATE__ /dingofs/client/logs/aws_ __DINGOADM_TEMPLATE__
126-
data_stream.s3.async_upload_workers=30
126+
s3.async_thread_num_in_thread_pool=10
127127
# limit all inflight async requests' bytes, |0| means not limited
128128
s3.maxAsyncRequestInflightBytes=104857600
129129
# throttle

conf/metaserver.conf

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ s3.requestTimeout=10000
2020
s3.logLevel=4
2121
s3.logPrefix=/tmp/dingofs/metaserver/aws_
2222
data_stream.s3.async_upload_workers=10
23+
s3.async_thread_num_in_thread_pool=10
2324
# throttle
2425
s3.throttle.iopsTotalLimit=0
2526
s3.throttle.iopsReadLimit=0

src/aws/s3_adapter.cpp

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,20 @@ void InitS3AdaptorOptionExceptS3InfoOption(Configuration* conf,
100100
!conf->GetIntValue("s3.connectTimeout", &s3_opt->connectTimeout));
101101
LOG_IF(FATAL,
102102
!conf->GetIntValue("s3.requestTimeout", &s3_opt->requestTimeout));
103-
LOG_IF(FATAL, !conf->GetIntValue("data_stream.s3.async_upload_workers",
103+
104+
if (!conf->GetBoolValue("s3.use_thread_pool", &s3_opt->use_thread_pool)) {
105+
LOG(INFO) << "Not found s3.use_thread_pool in conf, use default "
106+
<< (s3_opt->use_thread_pool ? "true" : "false");
107+
}
108+
109+
if (!conf->GetIntValue("s3.async_thread_num_in_thread_pool",
110+
&s3_opt->asyncThreadNum)) {
111+
LOG(INFO)
112+
<< "Not found s3.async_thread_num_in_thread_pool in conf, use default"
113+
<< s3_opt->asyncThreadNum;
114+
}
115+
116+
LOG_IF(FATAL, !conf->GetIntValue("s3.async_thread_num_in_thread_pool",
104117
&s3_opt->asyncThreadNum));
105118
LOG_IF(FATAL, !conf->GetUInt64Value("s3.throttle.iopsTotalLimit",
106119
&s3_opt->iopsTotalLimit));
@@ -173,11 +186,14 @@ void S3Adapter::Init(const S3AdapterOption& option) {
173186
clientCfg_->connectTimeoutMs = option.connectTimeout;
174187
clientCfg_->requestTimeoutMs = option.requestTimeout;
175188
clientCfg_->endpointOverride = s3Address_;
176-
int async_thread_num = option.asyncThreadNum;
177-
LOG(INFO) << "S3Adapter init thread num = " << async_thread_num << std::endl;
178-
clientCfg_->executor =
179-
Aws::MakeShared<Aws::Utils::Threading::PooledThreadExecutor>(
180-
"S3Adapter.S3Client", async_thread_num);
189+
190+
if (option.use_thread_pool) {
191+
LOG(INFO) << "S3Adapter init async thread pool thread num = "
192+
<< option.asyncThreadNum;
193+
clientCfg_->executor =
194+
Aws::MakeShared<Aws::Utils::Threading::PooledThreadExecutor>(
195+
"S3Adapter.S3Client", option.asyncThreadNum);
196+
}
181197

182198
if (option.enableTelemetry) {
183199
LOG(INFO) << "Enable telemetry for aws s3 adapter";

src/aws/s3_adapter.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,8 @@ struct S3AdapterOption {
8282
int maxConnections;
8383
int connectTimeout;
8484
int requestTimeout;
85-
int asyncThreadNum;
85+
bool use_thread_pool{true};
86+
int asyncThreadNum{256};
8687
uint64_t maxAsyncRequestInflightBytes;
8788
uint64_t iopsTotalLimit;
8889
uint64_t iopsReadLimit;

src/client/common/config.cpp

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -429,13 +429,19 @@ void InitFuseClientOption(Configuration* conf, FuseClientOption* clientOption) {
429429
conf->GetValueFatalIfFail("fuseClient.warmupThreadsNum",
430430
&clientOption->warmupThreadsNum);
431431

432-
if (!conf->GetBoolValue("fuseClient.in_time_warmup",
433-
&FLAGS_in_time_warmup)) {
432+
if (!conf->GetBoolValue("fuseClient.in_time_warmup", &FLAGS_in_time_warmup)) {
434433
LOG(INFO) << "Not found `fuseClient.in_time_warmup` in conf, default to "
435434
"false";
436435
FLAGS_in_time_warmup = false;
437436
}
438437

438+
if (!conf->GetIntValue("fuseClient.bthread_worker_num",
439+
&FLAGS_bthread_worker_num)) {
440+
FLAGS_bthread_worker_num = 0;
441+
LOG(INFO)
442+
<< "Not found `fuseClient.bthread_worker_num` in conf, default to 0";
443+
}
444+
439445
LOG_IF(WARNING, conf->GetBoolValue("fuseClient.enableSplice",
440446
&clientOption->enableFuseSplice))
441447
<< "Not found `fuseClient.enableSplice` in conf, use default value `"

src/client/common/dynamic_config.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,9 @@ DEFINE_validator(s3_prefetch, &PassBool);
125125
DEFINE_bool(in_time_warmup, false, "in time warmup inode when enable");
126126
DEFINE_validator(in_time_warmup, &PassBool);
127127

128+
129+
DEFINE_int32(bthread_worker_num, 0, "bthread worker num");
130+
128131
} // namespace common
129132
} // namespace client
130133
} // namespace dingofs

src/client/common/dynamic_config.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
#define DINGOFS_SRC_CLIENT_COMMON_DYNAMIC_CONFIG_H_
2525

2626
#include <gflags/gflags.h>
27+
#include <gflags/gflags_declare.h>
2728

2829
namespace dingofs {
2930
namespace client {
@@ -80,6 +81,8 @@ DECLARE_bool(s3_prefetch);
8081

8182
DECLARE_bool(in_time_warmup);
8283

84+
DECLARE_int32(bthread_worker_num);
85+
8386
} // namespace common
8487
} // namespace client
8588
} // namespace dingofs

src/client/dingo_fuse_op.cpp

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,18 @@
2424

2525
#include <glog/logging.h>
2626

27+
#include <cstdint>
2728
#include <cstring>
2829
#include <memory>
2930
#include <string>
3031
#include <vector>
3132

3233
#include "aws/s3_access_log.h"
34+
#include "bthread/bthread.h"
3335
#include "client/blockcache/log.h"
3436
#include "client/common/common.h"
3537
#include "client/common/config.h"
38+
#include "client/common/dynamic_config.h"
3639
#include "client/filesystem/access_log.h"
3740
#include "client/filesystem/error.h"
3841
#include "client/filesystem/meta.h"
@@ -82,6 +85,10 @@ static FuseClient* g_client_instance = nullptr;
8285
static FuseClientOption* g_fuse_client_option = nullptr;
8386
static ClientOpMetric* g_clientOpMetric = nullptr;
8487

88+
namespace bthread {
89+
DECLARE_int32(bthread_concurrency);
90+
} // namespace bthread
91+
8592
namespace {
8693

8794
void InitFuseConnInfo(struct fuse_conn_info* conn, const FuseConnInfo& option) {
@@ -174,8 +181,8 @@ int InitLog(const char* conf_path, const char* argv0) {
174181

175182
bool succ = InitAccessLog(FLAGS_log_dir) &&
176183
InitBlockCacheLog(FLAGS_log_dir) &&
177-
dingofs::aws::InitS3AccessLog(FLAGS_log_dir)
178-
&& dingofs::stub::InitMetaAccessLog(FLAGS_log_dir);
184+
dingofs::aws::InitS3AccessLog(FLAGS_log_dir) &&
185+
dingofs::stub::InitMetaAccessLog(FLAGS_log_dir);
179186
if (!succ) {
180187
return -1;
181188
}
@@ -199,6 +206,13 @@ int InitFuseClient(const struct MountOption* mount_option) {
199206
g_fuse_client_option = new FuseClientOption();
200207
dingofs::client::common::InitFuseClientOption(&conf, g_fuse_client_option);
201208

209+
int32_t bthread_worker_num =
210+
dingofs::client::common::FLAGS_bthread_worker_num;
211+
if (bthread_worker_num > 0) {
212+
bthread::FLAGS_bthread_concurrency = bthread_worker_num;
213+
LOG(INFO) << "set bthread concurrency to " << bthread_worker_num;
214+
}
215+
202216
auto fs_info = std::make_shared<FsInfo>();
203217
if (GetFsInfo(mount_option->fsName, fs_info.get()) != 0) {
204218
return -1;

src/client/fuse_s3_client.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626

2727
#include "client/blockcache/block_cache.h"
2828
#include "client/blockcache/s3_client.h"
29+
#include "client/common/dynamic_config.h"
2930
#include "client/datastream/data_stream.h"
3031
#include "client/filesystem/meta.h"
3132
#include "client/kvclient/memcache_client.h"
@@ -566,6 +567,9 @@ DINGOFS_ERROR FuseS3Client::InitBrpcServer() {
566567
}
567568

568569
brpc::ServerOptions brpc_server_options;
570+
if (common::FLAGS_bthread_worker_num > 0) {
571+
brpc_server_options.num_threads = common::FLAGS_bthread_worker_num;
572+
}
569573

570574
uint32_t listen_port = 0;
571575
if (!StartBrpcServer(server_, &brpc_server_options,

0 commit comments

Comments
 (0)