Skip to content

Commit b3c5268

Browse files
chejingecheniujhwangshao1
authored
atom->3.5 (#1887)
* Revised Timer Task Thread (#1862) * add TimertaskManager, removed TimedscanThread. * Rsync client support multi thread (#1866) * define rsync related header file and proto --------- Co-authored-by: cheniujh <41671101+cheniujh@users.noreply.github.com> Co-authored-by: wangshao1 <30471730+wangshao1@users.noreply.github.com>
1 parent 61aa47b commit b3c5268

16 files changed

+582
-238
lines changed

include/pika_define.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,9 @@ const std::string kPikaPidFile = "pika.pid";
4040
const std::string kPikaSecretFile = "rsync.secret";
4141
const std::string kDefaultRsyncAuth = "default";
4242

43+
/* Rsync */
44+
const int kMaxRsyncParallelNum = 4;
45+
4346
struct DBStruct {
4447
DBStruct(std::string tn, const uint32_t pn, std::set<uint32_t> pi)
4548
: db_name(std::move(tn)), slot_num(pn), slot_ids(std::move(pi)) {}

include/pika_rm.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,8 @@ class SyncSlaveSlot : public SyncSlot {
157157

158158
std::string LocalIp();
159159

160+
void StopRsync();
161+
160162
void ActivateRsync();
161163

162164
bool IsRsyncRunning() {return rsync_cli_->IsRunning();}

include/rsync_client.h

Lines changed: 93 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
#ifndef RSYNC_CLIENT_H_
77
#define RSYNC_CLIENT_H_
88

9-
#include <glog/logging.h>
109
#include <fcntl.h>
1110
#include <sys/stat.h>
1211
#include <sys/types.h>
@@ -16,13 +15,16 @@
1615
#include <thread>
1716
#include <condition_variable>
1817

18+
#include <glog/logging.h>
19+
1920
#include "net/include/bg_thread.h"
2021
#include "net/include/net_cli.h"
2122
#include "pstd/include/env.h"
2223
#include "pstd/include/pstd_status.h"
2324
#include "pstd/include/pstd_hash.h"
2425
#include "pstd/include/pstd_string.h"
2526
#include "pstd/include/pstd_status.h"
27+
#include "include/pika_define.h"
2628
#include "include/rsync_client_thread.h"
2729
#include "include/throttle.h"
2830
#include "rsync_service.pb.h"
@@ -35,6 +37,7 @@ namespace rsync {
3537
class RsyncWriter;
3638
class Session;
3739
class WaitObject;
40+
class WaitObjectManager;
3841

3942
class RsyncClient : public net::Thread {
4043
public:
@@ -45,41 +48,51 @@ class RsyncClient : public net::Thread {
4548
};
4649
RsyncClient(const std::string& dir, const std::string& db_name, const uint32_t slot_id);
4750
void* ThreadMain() override;
51+
void Copy(const std::set<std::string>& file_set, int index);
4852
bool Init();
4953
Status Start();
5054
Status Stop();
5155
bool IsRunning() {
5256
return state_.load() == RUNNING;
5357
}
58+
bool IsStop() {
59+
return state_.load() == STOP;
60+
}
5461
bool IsIdle() { return state_.load() == IDLE;}
5562
void OnReceive(RsyncService::RsyncResponse* resp);
5663

5764
private:
5865
bool Recover();
59-
Status Wait(RsyncService::RsyncResponse*& resp);
60-
Status CopyRemoteFile(const std::string& filename);
66+
Status CopyRemoteFile(const std::string& filename, int index);
6167
Status CopyRemoteMeta(std::string* snapshot_uuid, std::set<std::string>* file_set);
6268
Status LoadLocalMeta(std::string* snapshot_uuid, std::map<std::string, std::string>* file_map);
6369
std::string GetLocalMetaFilePath();
6470
Status FlushMetaTable();
6571
Status CleanUpExpiredFiles(bool need_reset_path, const std::set<std::string>& files);
66-
Status UpdateLocalMeta(const std::string& snapshot_uuid, const std::set<std::string>& expired_files, std::map<std::string, std::string>* localFileMap);
72+
Status UpdateLocalMeta(const std::string& snapshot_uuid, const std::set<std::string>& expired_files,
73+
std::map<std::string, std::string>* localFileMap);
6774
void HandleRsyncMetaResponse(RsyncService::RsyncResponse* response);
6875

6976
private:
77+
typedef std::unique_ptr<RsyncClientThread> NetThreadUPtr;
78+
7079
std::map<std::string, std::string> meta_table_;
71-
int flush_period_ = 10;
7280
std::set<std::string> file_set_;
7381
std::string snapshot_uuid_;
7482
std::string dir_;
7583
std::string db_name_;
7684
uint32_t slot_id_ = 0;
77-
std::unique_ptr<RsyncClientThread> client_thread_;
85+
86+
NetThreadUPtr client_thread_;
87+
std::vector<std::thread> work_threads_;
88+
std::atomic<int> finished_work_cnt_ = 0;
89+
7890
std::atomic<State> state_;
7991
int max_retries_ = 10;
80-
std::unique_ptr<WaitObject> wo_;
92+
std::unique_ptr<WaitObjectManager> wo_mgr_;
8193
std::condition_variable cond_;
8294
std::mutex mu_;
95+
8396
std::unique_ptr<Throttle> throttle_;
8497
std::string master_ip_;
8598
int master_port_;
@@ -129,22 +142,90 @@ class WaitObject {
129142
public:
130143
WaitObject() : filename_(""), type_(RsyncService::kRsyncMeta), offset_(0), resp_(nullptr) {}
131144
~WaitObject() {}
145+
132146
void Reset(const std::string& filename, RsyncService::Type t, size_t offset) {
147+
std::lock_guard<std::mutex> guard(mu_);
133148
resp_ = nullptr;
134149
filename_ = filename;
135150
type_ = t;
136151
offset_ = offset;
137152
}
138-
void Reset(RsyncService::Type t) {
139-
resp_ = nullptr;
140-
filename_ = "";
141-
type_ = t;
142-
offset_ = 0xFFFFFFFF;
153+
154+
pstd::Status Wait(RsyncService::RsyncResponse*& resp) {
155+
pstd::Status s = Status::Timeout("rsync timeout", "timeout");
156+
{
157+
std::unique_lock<std::mutex> lock(mu_);
158+
auto cv_s = cond_.wait_for(lock, std::chrono::seconds(3), [this] {
159+
return resp_ != nullptr;
160+
});
161+
if (!cv_s) {
162+
return s;
163+
}
164+
resp = resp_;
165+
s = Status::OK();
166+
}
167+
return s;
143168
}
169+
170+
void WakeUp(RsyncService::RsyncResponse* resp) {
171+
std::unique_lock<std::mutex> lock(mu_);
172+
resp_ = resp;
173+
cond_.notify_all();
174+
}
175+
176+
RsyncService::RsyncResponse* Response() {return resp_;}
177+
std::string Filename() {return filename_;}
178+
RsyncService::Type Type() {return type_;}
179+
size_t Offset() {return offset_;}
180+
private:
144181
std::string filename_;
145182
RsyncService::Type type_;
146183
size_t offset_ = 0xFFFFFFFF;
147184
RsyncService::RsyncResponse* resp_ = nullptr;
185+
std::condition_variable cond_;
186+
std::mutex mu_;
187+
};
188+
189+
class WaitObjectManager {
190+
public:
191+
WaitObjectManager() {
192+
wo_vec_.resize(kMaxRsyncParallelNum);
193+
for (int i = 0; i < kMaxRsyncParallelNum; i++) {
194+
wo_vec_[i] = new WaitObject();
195+
}
196+
}
197+
~WaitObjectManager() {
198+
for (int i = 0; i < wo_vec_.size(); i++) {
199+
delete wo_vec_[i];
200+
wo_vec_[i] = nullptr;
201+
}
202+
}
203+
204+
WaitObject* UpdateWaitObject(int worker_index, const std::string& filename,
205+
RsyncService::Type type, size_t offset) {
206+
std::lock_guard<std::mutex> guard(mu_);
207+
wo_vec_[worker_index]->Reset(filename, type, offset);
208+
return wo_vec_[worker_index];
209+
}
210+
211+
void WakeUp(RsyncService::RsyncResponse* resp) {
212+
std::lock_guard<std::mutex> guard(mu_);
213+
int index = resp->reader_index();
214+
if (wo_vec_[index] == nullptr || resp->type() != wo_vec_[index]->Type()) {
215+
delete resp;
216+
return;
217+
}
218+
if (resp->type() == RsyncService::kRsyncFile &&
219+
(resp->file_resp().filename() != wo_vec_[index]->Filename())) {
220+
delete resp;
221+
return;
222+
}
223+
wo_vec_[index]->WakeUp(resp);
224+
}
225+
226+
private:
227+
std::vector<WaitObject*> wo_vec_;
228+
std::mutex mu_;
148229
};
149230

150231
} // end namespace rsync

include/rsync_server.h

Lines changed: 94 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,11 @@
2121
#include "rsync_service.pb.h"
2222

2323
namespace rsync {
24+
class RsyncServerConn;
2425
struct RsyncServerTaskArg {
2526
std::shared_ptr<RsyncService::RsyncRequest> req;
26-
std::shared_ptr<net::PbConn> conn;
27-
RsyncServerTaskArg(std::shared_ptr<RsyncService::RsyncRequest> _req, std::shared_ptr<net::PbConn> _conn)
27+
std::shared_ptr<RsyncServerConn> conn;
28+
RsyncServerTaskArg(std::shared_ptr<RsyncService::RsyncRequest> _req, std::shared_ptr<RsyncServerConn> _conn)
2829
: req(std::move(_req)), conn(std::move(_conn)) {}
2930
};
3031
class RsyncReader;
@@ -52,6 +53,8 @@ class RsyncServerConn : public net::PbConn {
5253
static void HandleMetaRsyncRequest(void* arg);
5354
static void HandleFileRsyncRequest(void* arg);
5455
private:
56+
std::vector<std::shared_ptr<RsyncReader> > readers_;
57+
std::mutex mu_;
5558
void* data_ = nullptr;
5659
};
5760

@@ -86,6 +89,95 @@ class RsyncServerThread : public net::HolyThread {
8689
RsyncServerHandle handle_;
8790
};
8891

92+
class RsyncReader {
93+
public:
94+
RsyncReader() {
95+
block_data_ = new char[kBlockSize];
96+
}
97+
~RsyncReader() {
98+
if (!filepath_.empty()) {
99+
Reset();
100+
}
101+
delete []block_data_;
102+
}
103+
pstd::Status Read(const std::string filepath, const size_t offset,
104+
const size_t count, char* data, size_t* bytes_read,
105+
std::string* checksum, bool* is_eof) {
106+
std::lock_guard<std::mutex> guard(mu_);
107+
pstd::Status s = Seek(filepath, offset);
108+
if (!s.ok()) {
109+
return s;
110+
}
111+
size_t offset_in_block = offset % kBlockSize;
112+
size_t copy_count = count > (end_offset_ - offset) ? end_offset_ - offset : count;
113+
memcpy(data, block_data_ + offset_in_block, copy_count);
114+
*bytes_read = copy_count;
115+
*is_eof = (offset + copy_count == total_size_);
116+
return pstd::Status::OK();
117+
}
118+
private:
119+
pstd::Status Seek(const std::string filepath, const size_t offset) {
120+
if (filepath == filepath_ && offset >= start_offset_ && offset < end_offset_) {
121+
return pstd::Status::OK();
122+
}
123+
if (filepath != filepath_) {
124+
Reset();
125+
fd_ = open(filepath.c_str(), O_RDONLY);
126+
if (fd_ < 0) {
127+
return pstd::Status::IOError("fd open failed");
128+
}
129+
filepath_ = filepath;
130+
struct stat buf;
131+
stat(filepath.c_str(), &buf);
132+
total_size_ = buf.st_size;
133+
}
134+
start_offset_ = (offset / kBlockSize) * kBlockSize;
135+
136+
size_t read_offset = start_offset_;
137+
size_t read_count = kBlockSize > (total_size_ - read_offset) ? (total_size_ - read_offset) : kBlockSize;
138+
ssize_t bytesin = 0;
139+
char* ptr = block_data_;
140+
while ((bytesin = pread(fd_, ptr, read_count, read_offset)) > 0) {
141+
read_count -= bytesin;
142+
read_offset += bytesin;
143+
ptr += bytesin;
144+
if (read_count <= 0) {
145+
break;
146+
}
147+
}
148+
if (bytesin < 0) {
149+
LOG(ERROR) << "unable to read from " << filepath_;
150+
Reset();
151+
return pstd::Status::IOError("unable to read from " + filepath);
152+
}
153+
end_offset_ = start_offset_ + (ptr - block_data_);
154+
return pstd::Status::OK();
155+
}
156+
void Reset() {
157+
total_size_ = -1;
158+
start_offset_ = 0xFFFFFFFF;
159+
end_offset_ = 0xFFFFFFFF;
160+
memset(block_data_, 0, kBlockSize);
161+
md5_.reset(new pstd::MD5());
162+
filepath_ = "";
163+
close(fd_);
164+
fd_ = -1;
165+
}
166+
167+
private:
168+
std::mutex mu_;
169+
const size_t kBlockSize = 16 << 20;
170+
171+
char* block_data_;
172+
size_t start_offset_ = -1;
173+
size_t end_offset_ = -1;
174+
size_t total_size_ = -1;
175+
176+
int fd_ = -1;
177+
std::string filepath_;
178+
std::unique_ptr<pstd::MD5> md5_;
179+
};
180+
89181
} //end namespace rsync
90182
#endif
91183

src/net/include/server_thread.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,7 @@ class ServerThread : public Thread {
175175

176176
// process events in notify_queue
177177
virtual void ProcessNotifyEvents(const NetFiredEvent* pfe);
178+
178179

179180
const ServerHandle* handle_;
180181
bool own_handle_ = false;

src/net/src/dispatch_thread.cc

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ DispatchThread::DispatchThread(int port, int work_num, ConnFactory* conn_factory
2323
for (int i = 0; i < work_num_; i++) {
2424
worker_thread_.emplace_back(std::make_unique<WorkerThread>(conn_factory, this, queue_limit, cron_interval));
2525
}
26-
timed_scan_thread.SetTimedTask(0.3, [this]{this->ScanExpiredBlockedConnsOfBlrpop();});
2726
}
2827

2928
DispatchThread::DispatchThread(const std::string& ip, int port, int work_num, ConnFactory* conn_factory,
@@ -35,7 +34,6 @@ DispatchThread::DispatchThread(const std::string& ip, int port, int work_num, Co
3534
for (int i = 0; i < work_num_; i++) {
3635
worker_thread_.emplace_back(std::make_unique<WorkerThread>(conn_factory, this, queue_limit, cron_interval));
3736
}
38-
timed_scan_thread.SetTimedTask(0.3, [this]{this->ScanExpiredBlockedConnsOfBlrpop();});
3937
}
4038

4139
DispatchThread::DispatchThread(const std::set<std::string>& ips, int port, int work_num, ConnFactory* conn_factory,
@@ -47,7 +45,6 @@ DispatchThread::DispatchThread(const std::set<std::string>& ips, int port, int w
4745
for (int i = 0; i < work_num_; i++) {
4846
worker_thread_.emplace_back(std::make_unique<WorkerThread>(conn_factory, this, queue_limit, cron_interval));
4947
}
50-
timed_scan_thread.SetTimedTask(0.3, [this]{this->ScanExpiredBlockedConnsOfBlrpop();});
5148
}
5249

5350
DispatchThread::~DispatchThread() = default;
@@ -67,7 +64,13 @@ int DispatchThread::StartThread() {
6764
return ret;
6865
}
6966
}
70-
timed_scan_thread.StartThread();
67+
68+
// Adding timer tasks and run timertaskThread
69+
timerTaskThread_.AddTimerTask(
70+
"blrpop_blocking_info_scan", 250, true, [this] { this->ScanExpiredBlockedConnsOfBlrpop();});
71+
72+
73+
timerTaskThread_.StartThread();
7174
return ServerThread::StartThread();
7275
}
7376

@@ -88,7 +91,7 @@ int DispatchThread::StopThread() {
8891
worker_thread_[i]->private_data_ = nullptr;
8992
}
9093
}
91-
timed_scan_thread.StopThread();
94+
timerTaskThread_.StopThread();
9295
return ServerThread::StopThread();
9396
}
9497

@@ -258,6 +261,7 @@ void DispatchThread::ScanExpiredBlockedConnsOfBlrpop() {
258261

259262
void DispatchThread::SetQueueLimit(int queue_limit) { queue_limit_ = queue_limit; }
260263

264+
261265
extern ServerThread* NewDispatchThread(int port, int work_num, ConnFactory* conn_factory, int cron_interval,
262266
int queue_limit, const ServerHandle* handle) {
263267
return new DispatchThread(port, work_num, conn_factory, cron_interval, queue_limit, handle);

0 commit comments

Comments
 (0)