Skip to content

Commit b14a4fb

Browse files
committed
feature: proxy connection pool (#978)
1 parent 6519e86 commit b14a4fb

File tree

5 files changed

+206
-4
lines changed

5 files changed

+206
-4
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
# vim temp files
2+
*.swp
13
# Compiled Object files
24
*.slo
35
*.lo

include/pika_proxy_conn.h

Lines changed: 66 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,9 @@
77
#define PIKA_PROXY_CONN_H_
88

99
#include "pink/include/redis_conn.h"
10-
10+
#include "pink/include/backend_thread.h"
1111
#include "include/pika_client_conn.h"
12-
12+
#include <memory>
1313
class ProxyCli;
1414
class PikaProxyConn;
1515

@@ -27,14 +27,77 @@ class PikaProxyConn: public pink::RedisConn {
2727
pink::Thread *server_thread,
2828
pink::PinkEpoll* pink_epoll,
2929
std::shared_ptr<ProxyCli> proxy_cli);
30+
bool IsAuthed() { return isAuthed_; }
31+
bool IsSelected() { return isSelected_; }
32+
bool IsClosed() { return closed_; }
33+
void SetClose() { closed_ = true;}
3034
virtual ~PikaProxyConn() {}
3135

3236
private:
3337
int DealMessage(
3438
const pink::RedisCmdArgsType& argv,
3539
std::string* response) override;
36-
3740
std::shared_ptr<ProxyCli> proxy_cli_;
41+
std::string auth_;
42+
bool isAuthed_;
43+
int table_;
44+
bool isSelected_;
45+
bool closed_;
46+
};
47+
48+
struct ConnConfig {
49+
ConnConfig( int table, const std::string& auth, int parallel)
50+
: table_(table), auth_(auth), parallel_(parallel) {}
51+
int table_ = 0;
52+
std::string auth_;
53+
int parallel_ = 10;
54+
};
55+
56+
class ParallelConn {
57+
public:
58+
ParallelConn(const std::string& addr, ConnConfig& config,
59+
std::shared_ptr<pink::BackendThread> client);
60+
61+
Status Connect();
62+
Status Start();
63+
void Close();
64+
void Retain();
65+
bool Release();
66+
std::string Addr() { return addr_; }
67+
int GetTable() { return config_.table_; }
68+
Status PrepareConn();
69+
private:
70+
std::shared_ptr<pink::PinkConn> GetConn(int fd);
71+
void VerifyAuth(int fd);
72+
void SelectConn(int fd);
73+
void KeepAlive();
74+
void KeepAliveConn(int fd);
75+
76+
77+
//std::vector<std::shared_ptr<PikaClientConn>> parallelConn_;
78+
std::map<int, int> parallelConn_;
79+
std::set<int> tmpConns_;
80+
std::string addr_;
81+
ConnConfig config_;
82+
std::atomic<int> refCount_;
83+
std::shared_ptr<pink::BackendThread> client_;
3884
};
3985

86+
class ConnectionPool {
87+
public:
88+
ConnectionPool(const ConnConfig& config,
89+
std::shared_ptr<pink::BackendThread> client)
90+
: config_(config), client_(client) { }
91+
void Retain(std::string addr);
92+
void Release(std::string addr);
93+
void AddParallel(const std::string& addr);
94+
private:
95+
// addr and ptr
96+
ConnConfig config_;
97+
std::unordered_map<std::string, ParallelConn *> pool_;
98+
std::shared_ptr<pink::BackendThread> client_;
99+
};
100+
101+
102+
40103
#endif // PIKA_PROXY_CONN_H_

src/pika_proxy_cli.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ Status ProxyCli::ForwardToBackend(ProxyTask* task) {
126126
std::string ip_port = node.Ip() + ":" + std::to_string(node.Port());
127127
backend_task_queue_[ip_port].push_back(cli_task);
128128
}
129+
//TODO to be pipline
129130
std::string ip_port = conn_ptr->ip_port();
130131
if (task_queue_.find(ip_port) != task_queue_.end()) {
131132
ProxyTask* tmp_task = task_queue_[ip_port];

src/pika_proxy_conn.cc

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,4 +33,140 @@ int PikaProxyConn::DealMessage(
3333
return 0;
3434
}
3535

36+
ParallelConn::ParallelConn(const std::string& addr, ConnConfig& config,
37+
std::shared_ptr<pink::BackendThread> client)
38+
: addr_(addr), config_(config), client_(client) {
39+
refCount_ = 1;
40+
}
41+
42+
43+
Status ParallelConn::Connect() {
44+
int num = parallelConn_.size() + tmpConns_.size();
45+
if (num > config_.parallel_) {
46+
return Status::OK();
47+
}
48+
for (int i = 0; i < num; i++) {
49+
std::string ip;
50+
int port, fd;
51+
if (!slash::ParseIpPortString(addr_, ip, port)) {
52+
LOG(INFO) << "parser addr " << addr_ << " error";
53+
return Status::InvalidArgument("paser addr error, addr: ", addr_);
54+
}
55+
Status s = client_->Connect(ip, port, &fd);
56+
if (!s.ok()) {
57+
LOG(INFO) << "connect addr: " << addr_ << "error: " << s.ToString();
58+
return s;
59+
}
60+
LOG(INFO) << "connect addr: " << addr_ << " fd: " << std::to_string(fd);
61+
tmpConns_.insert(fd);
62+
}
63+
return Status::OK();
64+
}
65+
66+
std::shared_ptr<pink::PinkConn> ParallelConn::GetConn(int fd) {
67+
return client_->GetConn(fd);
68+
}
69+
70+
void ParallelConn::VerifyAuth(int fd) {
71+
72+
}
73+
74+
void ParallelConn::SelectConn(int fd) {
75+
76+
}
77+
78+
void ParallelConn::KeepAlive() {
79+
80+
}
81+
82+
void ParallelConn::KeepAliveConn(int fd) {
83+
84+
}
85+
86+
Status ParallelConn::PrepareConn() {
87+
for(auto item : tmpConns_) {
88+
auto conn = std::dynamic_pointer_cast<PikaProxyConn>(GetConn(item));
89+
if (conn->IsAuthed()) {
90+
SelectConn(item);
91+
} else {
92+
VerifyAuth(item);
93+
SelectConn(item);
94+
}
95+
}
96+
return Status::OK();
97+
}
3698

99+
Status ParallelConn::Start() {
100+
Status s = Connect();
101+
if (!s.ok()) {
102+
return s;
103+
}
104+
s = PrepareConn();
105+
if (!s.ok()) {
106+
return s;
107+
}
108+
return Status::OK();
109+
}
110+
111+
void ParallelConn::Close() {
112+
for (auto item : parallelConn_) {
113+
client_->Close(item.second);
114+
}
115+
parallelConn_.clear();
116+
for (auto item : tmpConns_) {
117+
client_->Close(item);
118+
}
119+
tmpConns_.clear();
120+
}
121+
122+
void ParallelConn::Retain() {
123+
int expect = 0;
124+
if (refCount_.compare_exchange_strong(expect, -1)) {
125+
LOG(INFO) << "retain parallel conn ref count error";
126+
return;
127+
}
128+
refCount_++;
129+
}
130+
131+
bool ParallelConn::Release() {
132+
int expect = 0;
133+
if (refCount_.compare_exchange_strong(expect, -1)) {
134+
LOG(INFO) << "release parallel conn ref count error";
135+
return true;
136+
}
137+
refCount_--;
138+
if (refCount_.compare_exchange_strong(expect, -1)) {
139+
return true;
140+
}
141+
return false;
142+
}
143+
144+
145+
void ConnectionPool::Release(std::string addr) {
146+
if (pool_.find(addr) == pool_.end()) {
147+
return;
148+
}
149+
auto parallel = pool_.find(addr)->second;
150+
if (parallel->Release()) {
151+
parallel->Close();
152+
delete parallel;
153+
pool_.erase(addr);
154+
LOG(INFO) << "release parallel conn :" << parallel->Addr() << " table :"
155+
<< std::to_string(parallel->GetTable());
156+
}
157+
}
158+
159+
void ConnectionPool::AddParallel(const std::string& addr) {
160+
auto conns = new ParallelConn(addr, config_, client_);
161+
pool_.insert(make_pair(addr, conns));
162+
conns->Start();
163+
}
164+
165+
void ConnectionPool::Retain(std::string addr) {
166+
auto iter = pool_.find(addr);
167+
if (iter != pool_.end()) {
168+
iter->second->Retain();
169+
return;
170+
}
171+
AddParallel(addr);
172+
}

0 commit comments

Comments
 (0)