Skip to content

Commit 24c8b75

Browse files
authored
KIKIMR-22403 fix s3 client thread pool creation (#12963)
1 parent aa12ad5 commit 24c8b75

File tree

1 file changed

+38
-37
lines changed

1 file changed

+38
-37
lines changed

ydb/core/wrappers/s3_storage_config.cpp

Lines changed: 38 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,42 @@
66
#ifndef KIKIMR_DISABLE_S3_OPS
77
namespace NKikimr::NWrappers::NExternalStorage {
88

9+
class TS3ThreadsPoolByEndpoint {
10+
private:
11+
class TPool {
12+
public:
13+
std::shared_ptr<Aws::Utils::Threading::PooledThreadExecutor> Executor;
14+
ui32 ThreadsCount = 0;
15+
16+
TPool(const std::shared_ptr<Aws::Utils::Threading::PooledThreadExecutor>& executor, const ui32 threadsCount)
17+
: Executor(executor)
18+
, ThreadsCount(threadsCount)
19+
{
20+
}
21+
};
22+
23+
THashMap<TString, TPool> Pools;
24+
TMutex Mutex;
25+
26+
std::shared_ptr<Aws::Utils::Threading::PooledThreadExecutor> GetPoolImpl(const TString& endpoint, const ui32 threadsCount) {
27+
TGuard<TMutex> g(Mutex);
28+
auto it = Pools.find(endpoint);
29+
if (it == Pools.end()) {
30+
TPool pool(std::make_shared<Aws::Utils::Threading::PooledThreadExecutor>(threadsCount), threadsCount);
31+
it = Pools.emplace(endpoint, std::move(pool)).first;
32+
} else if (it->second.ThreadsCount < threadsCount) {
33+
TPool pool(std::make_shared<Aws::Utils::Threading::PooledThreadExecutor>(threadsCount), threadsCount);
34+
it->second = std::move(pool);
35+
}
36+
return it->second.Executor;
37+
}
38+
39+
public:
40+
static std::shared_ptr<Aws::Utils::Threading::PooledThreadExecutor> GetPool(const TString& endpoint, const ui32 threadsCount) {
41+
return Singleton<TS3ThreadsPoolByEndpoint>()->GetPoolImpl(endpoint, threadsCount);
42+
}
43+
};
44+
945
namespace {
1046

1147
namespace NPrivate {
@@ -18,7 +54,8 @@ Aws::Client::ClientConfiguration ConfigFromSettings(const TSettings& settings) {
1854
auto threadsCount = NKikimrSchemeOp::TS3Settings::default_instance().GetExecutorThreadsCount();
1955

2056
config.endpointOverride = settings.endpoint();
21-
config.executor = std::make_shared<Aws::Utils::Threading::PooledThreadExecutor>(threadsCount);
57+
config.executor = TS3ThreadsPoolByEndpoint::GetPool(settings.endpoint(), threadsCount);
58+
config.enableTcpKeepAlive = true;
2259
config.verifySSL = false;
2360
config.connectTimeoutMs = 10000;
2461
config.maxConnections = threadsCount;
@@ -46,42 +83,6 @@ Aws::Auth::AWSCredentials CredentialsFromSettings(const TSettings& settings) {
4683

4784
} // anonymous
4885

49-
class TS3ThreadsPoolByEndpoint {
50-
private:
51-
class TPool {
52-
public:
53-
std::shared_ptr<Aws::Utils::Threading::PooledThreadExecutor> Executor;
54-
ui32 ThreadsCount = 0;
55-
56-
TPool(const std::shared_ptr<Aws::Utils::Threading::PooledThreadExecutor>& executor, const ui32 threadsCount)
57-
: Executor(executor)
58-
, ThreadsCount(threadsCount)
59-
{
60-
}
61-
};
62-
63-
THashMap<TString, TPool> Pools;
64-
TMutex Mutex;
65-
66-
std::shared_ptr<Aws::Utils::Threading::PooledThreadExecutor> GetPoolImpl(const TString& endpoint, const ui32 threadsCount) {
67-
TGuard<TMutex> g(Mutex);
68-
auto it = Pools.find(endpoint);
69-
if (it == Pools.end()) {
70-
TPool pool(std::make_shared<Aws::Utils::Threading::PooledThreadExecutor>(threadsCount), threadsCount);
71-
it = Pools.emplace(endpoint, std::move(pool)).first;
72-
} else if (it->second.ThreadsCount < threadsCount) {
73-
TPool pool(std::make_shared<Aws::Utils::Threading::PooledThreadExecutor>(threadsCount), threadsCount);
74-
it->second = std::move(pool);
75-
}
76-
return it->second.Executor;
77-
}
78-
79-
public:
80-
static std::shared_ptr<Aws::Utils::Threading::PooledThreadExecutor> GetPool(const TString& endpoint, const ui32 threadsCount) {
81-
return Singleton<TS3ThreadsPoolByEndpoint>()->GetPoolImpl(endpoint, threadsCount);
82-
}
83-
};
84-
8586
Aws::Client::ClientConfiguration TS3ExternalStorageConfig::ConfigFromSettings(const NKikimrSchemeOp::TS3Settings& settings) {
8687
Aws::Client::ClientConfiguration config;
8788

0 commit comments

Comments
 (0)