Skip to content

Commit 8da2d40

Browse files
authored
Refactor TcpServer I/O loop logic. (#239)
1 parent 41fa158 commit 8da2d40

File tree

3 files changed

+77
-58
lines changed

3 files changed

+77
-58
lines changed

trantor/net/EventLoop.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,8 @@ void EventLoop::loop()
241241
catch (std::exception &e)
242242
{
243243
LOG_WARN << "Exception thrown from event loop, rethrowing after "
244-
"running functions on quit";
244+
"running functions on quit: "
245+
<< e.what();
245246
loopException = std::current_exception();
246247
}
247248

trantor/net/TcpServer.cc

Lines changed: 26 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -12,31 +12,33 @@
1212
*
1313
*/
1414

15-
#include "Acceptor.h"
16-
#include "inner/TcpConnectionImpl.h"
1715
#include <trantor/net/TcpServer.h>
1816
#include <trantor/utils/Logger.h>
1917
#include <functional>
2018
#include <vector>
19+
#include "Acceptor.h"
20+
#include "inner/TcpConnectionImpl.h"
2121
using namespace trantor;
2222
using namespace std::placeholders;
2323

2424
TcpServer::TcpServer(EventLoop *loop,
2525
const InetAddress &address,
26-
const std::string &name,
26+
std::string name,
2727
bool reUseAddr,
2828
bool reUsePort)
2929
: loop_(loop),
3030
acceptorPtr_(new Acceptor(loop, address, reUseAddr, reUsePort)),
31-
serverName_(name),
31+
serverName_(std::move(name)),
3232
recvMessageCallback_([](const TcpConnectionPtr &, MsgBuffer *buffer) {
3333
LOG_ERROR << "unhandled recv message [" << buffer->readableBytes()
3434
<< " bytes]";
3535
buffer->retrieveAll();
36-
})
36+
}),
37+
ioLoops_({loop}),
38+
numIoLoops_(1)
3739
{
3840
acceptorPtr_->setNewConnectionCallback(
39-
std::bind(&TcpServer::newConnection, this, _1, _2));
41+
[this](int fd, const InetAddress &peer) { newConnection(fd, peer); });
4042
}
4143

4244
TcpServer::~TcpServer()
@@ -49,21 +51,12 @@ void TcpServer::newConnection(int sockfd, const InetAddress &peer)
4951
{
5052
LOG_TRACE << "new connection:fd=" << sockfd
5153
<< " address=" << peer.toIpPort();
52-
// test code for blocking or nonblocking
53-
// std::vector<char> str(1024*1024*100);
54-
// for(int i=0;i<str.size();i++)
55-
// str[i]='A';
56-
// LOG_TRACE<<"vector size:"<<str.size();
57-
// size_t n=write(sockfd,&str[0],str.size());
58-
// LOG_TRACE<<"write "<<n<<" bytes";
5954
loop_->assertInLoopThread();
60-
EventLoop *ioLoop = NULL;
61-
if (loopPoolPtr_ && loopPoolPtr_->size() > 0)
55+
EventLoop *ioLoop = ioLoops_[nextLoopIdx_];
56+
if (++nextLoopIdx_ >= numIoLoops_)
6257
{
63-
ioLoop = loopPoolPtr_->getNextLoop();
58+
nextLoopIdx_ = 0;
6459
}
65-
if (ioLoop == NULL)
66-
ioLoop = loop_;
6760
std::shared_ptr<TcpConnectionImpl> newPtr;
6861
if (sslCtxPtr_)
6962
{
@@ -102,7 +95,10 @@ void TcpServer::newConnection(int sockfd, const InetAddress &peer)
10295
if (writeCompleteCallback_)
10396
writeCompleteCallback_(connectionPtr);
10497
});
105-
newPtr->setCloseCallback(std::bind(&TcpServer::connectionClosed, this, _1));
98+
99+
newPtr->setCloseCallback([this](const TcpConnectionPtr &closeConnPtr) {
100+
connectionClosed(closeConnPtr);
101+
});
106102
connSet_.insert(newPtr);
107103
newPtr->connectEstablished();
108104
}
@@ -114,29 +110,15 @@ void TcpServer::start()
114110
started_ = true;
115111
if (idleTimeout_ > 0)
116112
{
117-
timingWheelMap_[loop_] =
118-
std::make_shared<TimingWheel>(loop_,
119-
idleTimeout_,
120-
1.0F,
121-
idleTimeout_ < 500
122-
? idleTimeout_ + 1
123-
: 100);
124-
if (loopPoolPtr_)
113+
for (EventLoop *loop : ioLoops_)
125114
{
126-
auto loopNum = loopPoolPtr_->size();
127-
while (loopNum > 0)
128-
{
129-
// LOG_TRACE << "new Wheel loopNum=" << loopNum;
130-
auto poolLoop = loopPoolPtr_->getNextLoop();
131-
timingWheelMap_[poolLoop] =
132-
std::make_shared<TimingWheel>(poolLoop,
133-
idleTimeout_,
134-
1.0F,
135-
idleTimeout_ < 500
136-
? idleTimeout_ + 1
137-
: 100);
138-
--loopNum;
139-
}
115+
timingWheelMap_[loop] =
116+
std::make_shared<TimingWheel>(loop,
117+
idleTimeout_,
118+
1.0F,
119+
idleTimeout_ < 500
120+
? idleTimeout_ + 1
121+
: 100);
140122
}
141123
}
142124
LOG_TRACE << "map size=" << timingWheelMap_.size();
@@ -156,7 +138,7 @@ void TcpServer::stop()
156138
{
157139
connPtrs.push_back(conn);
158140
}
159-
for (auto connection : connPtrs)
141+
for (auto &connection : connPtrs)
160142
{
161143
connection->forceClose();
162144
}
@@ -173,7 +155,7 @@ void TcpServer::stop()
173155
{
174156
connPtrs.push_back(conn);
175157
}
176-
for (auto connection : connPtrs)
158+
for (auto &connection : connPtrs)
177159
{
178160
connection->forceClose();
179161
}
@@ -223,7 +205,7 @@ void TcpServer::connectionClosed(const TcpConnectionPtr &connectionPtr)
223205
}
224206
}
225207

226-
const std::string TcpServer::ipPort() const
208+
std::string TcpServer::ipPort() const
227209
{
228210
return acceptorPtr_->addr().toIpPort();
229211
}

trantor/net/TcpServer.h

Lines changed: 49 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -13,18 +13,19 @@
1313
*/
1414

1515
#pragma once
16-
#include <trantor/net/callbacks.h>
17-
#include <trantor/utils/NonCopyable.h>
18-
#include <trantor/utils/Logger.h>
16+
#include <trantor/exports.h>
1917
#include <trantor/net/EventLoopThreadPool.h>
2018
#include <trantor/net/InetAddress.h>
2119
#include <trantor/net/TcpConnection.h>
20+
#include <trantor/net/callbacks.h>
21+
#include <trantor/utils/Logger.h>
22+
#include <trantor/utils/NonCopyable.h>
2223
#include <trantor/utils/TimingWheel.h>
23-
#include <trantor/exports.h>
24-
#include <string>
24+
#include <csignal>
2525
#include <memory>
2626
#include <set>
27-
#include <signal.h>
27+
#include <string>
28+
2829
namespace trantor
2930
{
3031
class Acceptor;
@@ -48,7 +49,7 @@ class TRANTOR_EXPORT TcpServer : NonCopyable
4849
*/
4950
TcpServer(EventLoop *loop,
5051
const InetAddress &address,
51-
const std::string &name,
52+
std::string name,
5253
bool reUseAddr = true,
5354
bool reUsePort = true);
5455
~TcpServer();
@@ -68,6 +69,7 @@ class TRANTOR_EXPORT TcpServer : NonCopyable
6869
/**
6970
* @brief Set the number of event loops in which the I/O of connections to
7071
* the server is handled.
72+
* An EventLoopThreadPool is created and managed by TcpServer.
7173
*
7274
* @param num
7375
*/
@@ -76,11 +78,14 @@ class TRANTOR_EXPORT TcpServer : NonCopyable
7678
assert(!started_);
7779
loopPoolPtr_ = std::make_shared<EventLoopThreadPool>(num);
7880
loopPoolPtr_->start();
81+
ioLoops_ = loopPoolPtr_->getLoops();
82+
numIoLoops_ = ioLoops_.size();
7983
}
8084

8185
/**
8286
* @brief Set the event loops pool in which the I/O of connections to
8387
* the server is handled.
88+
* A shared_ptr of EventLoopThreadPool is copied.
8489
*
8590
* @param pool
8691
*/
@@ -89,7 +94,26 @@ class TRANTOR_EXPORT TcpServer : NonCopyable
8994
assert(pool->size() > 0);
9095
assert(!started_);
9196
loopPoolPtr_ = pool;
92-
loopPoolPtr_->start();
97+
loopPoolPtr_->start(); // TODO: should not start by TcpServer
98+
ioLoops_ = loopPoolPtr_->getLoops();
99+
numIoLoops_ = ioLoops_.size();
100+
}
101+
102+
/**
103+
* @brief Set the event loops in which the I/O of connections to
104+
* the server is handled.
105+
* The loops are managed by caller. Caller should ensure that ioLoops
106+
* lives longer than TcpServer.
107+
*
108+
* @param ioLoops
109+
*/
110+
void setIoLoops(const std::vector<trantor::EventLoop *> &ioLoops)
111+
{
112+
assert(!ioLoops.empty());
113+
assert(!started_);
114+
ioLoops_ = ioLoops;
115+
numIoLoops_ = ioLoops_.size();
116+
loopPoolPtr_.reset();
93117
}
94118

95119
/**
@@ -152,7 +176,7 @@ class TRANTOR_EXPORT TcpServer : NonCopyable
152176
*
153177
* @return const std::string
154178
*/
155-
const std::string ipPort() const;
179+
std::string ipPort() const;
156180

157181
/**
158182
* @brief Get the address of the server.
@@ -178,7 +202,7 @@ class TRANTOR_EXPORT TcpServer : NonCopyable
178202
*/
179203
std::vector<EventLoop *> getIoLoops() const
180204
{
181-
return loopPoolPtr_->getLoops();
205+
return ioLoops_;
182206
}
183207

184208
/**
@@ -215,10 +239,12 @@ class TRANTOR_EXPORT TcpServer : NonCopyable
215239
const std::string &caPath = "");
216240

217241
private:
218-
EventLoop *loop_;
219242
void handleCloseInLoop(const TcpConnectionPtr &connectionPtr);
220-
std::unique_ptr<Acceptor> acceptorPtr_;
221243
void newConnection(int fd, const InetAddress &peer);
244+
void connectionClosed(const TcpConnectionPtr &connectionPtr);
245+
246+
EventLoop *loop_;
247+
std::unique_ptr<Acceptor> acceptorPtr_;
222248
std::string serverName_;
223249
std::set<TcpConnectionPtr> connSet_;
224250

@@ -228,8 +254,18 @@ class TRANTOR_EXPORT TcpServer : NonCopyable
228254

229255
size_t idleTimeout_{0};
230256
std::map<EventLoop *, std::shared_ptr<TimingWheel>> timingWheelMap_;
231-
void connectionClosed(const TcpConnectionPtr &connectionPtr);
257+
258+
// `loopPoolPtr_` may and may not hold the internal thread pool.
259+
// We should not access it directly in codes.
260+
// Instead, we should use its delegation variable `ioLoops_`.
232261
std::shared_ptr<EventLoopThreadPool> loopPoolPtr_;
262+
// If one of `setIoLoopNum()`, `setIoLoopThreadPool()` and `setIoLoops()` is
263+
// called, `ioLoops_` will hold the loops passed in.
264+
// Otherwise, it should contain only one element, which is `loop_`.
265+
std::vector<EventLoop *> ioLoops_;
266+
size_t nextLoopIdx_{0};
267+
size_t numIoLoops_{0};
268+
233269
#ifndef _WIN32
234270
class IgnoreSigPipe
235271
{

0 commit comments

Comments
 (0)