Skip to content

Commit 1d5aeb0

Browse files
committed
Simplify the code for sending data
1 parent b8d8ebf commit 1d5aeb0

File tree

8 files changed

+408
-390
lines changed

8 files changed

+408
-390
lines changed

CMakeLists.txt

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,8 @@ set(TRANTOR_SOURCES
104104
trantor/net/inner/Connector.cc
105105
trantor/net/inner/Poller.cc
106106
trantor/net/inner/Socket.cc
107+
trantor/net/inner/MemBufferNode.cc
108+
trantor/net/inner/StreamBufferNode.cc
107109
trantor/net/inner/TcpConnectionImpl.cc
108110
trantor/net/inner/Timer.cc
109111
trantor/net/inner/TimerQueue.cc
@@ -126,11 +128,16 @@ if(WIN32)
126128
set(TRANTOR_SOURCES
127129
${TRANTOR_SOURCES}
128130
third_party/wepoll/Wepoll.c
129-
trantor/utils/WindowsSupport.cc)
131+
trantor/utils/WindowsSupport.cc
132+
trantor/net/inner/FileBufferNodeWin.cc)
130133
set(private_headers
131134
${private_headers}
132135
third_party/wepoll/Wepoll.h
133136
trantor/utils/WindowsSupport.h)
137+
else(WIN32)
138+
set(TRANTOR_SOURCES
139+
${TRANTOR_SOURCES}
140+
trantor/net/inner/FileBufferNodeUnix.cc)
134141
endif(WIN32)
135142

136143
# Somehow the default value of TRANTOR_USE_TLS is OFF

trantor/net/inner/BufferNode.h

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/**
2+
*
3+
* @file BufferNode.h
4+
* @author An Tao
5+
*
6+
* Public header file in trantor lib.
7+
*
8+
* Copyright 2018, An Tao. All rights reserved.
9+
* Use of this source code is governed by a BSD-style license
10+
* that can be found in the License file.
11+
*
12+
*
13+
*/
14+
15+
#pragma once
16+
#ifdef _WIN32
17+
#include <stdio.h>
18+
#endif
19+
#include <trantor/utils/MsgBuffer.h>
20+
#include <trantor/utils/NonCopyable.h>
21+
#include <trantor/utils/Logger.h>
22+
#include <functional>
23+
#include <memory>
24+
#include <string>
25+
26+
namespace trantor
27+
{
28+
class BufferNode;
29+
using BufferNodePtr = std::shared_ptr<BufferNode>;
30+
using StreamCallback = std::function<std::size_t(char *, std::size_t)>;
31+
class BufferNode : public NonCopyable
32+
{
33+
public:
34+
virtual bool isFile() const
35+
{
36+
return false;
37+
}
38+
virtual ~BufferNode() = default;
39+
virtual bool isStream() const
40+
{
41+
return false;
42+
}
43+
virtual void getData(const char *&data, size_t &len) = 0;
44+
virtual void append(const char *data, size_t len)
45+
{
46+
LOG_FATAL << "Not a memory buffer node";
47+
}
48+
virtual void retrieve(size_t len) = 0;
49+
virtual size_t remainingBytes() const = 0;
50+
virtual int getFd() const
51+
{
52+
LOG_FATAL << "Not a file buffer node";
53+
return -1;
54+
}
55+
void done()
56+
{
57+
isDone_ = true;
58+
}
59+
static BufferNodePtr newMemBufferNode();
60+
61+
static BufferNodePtr newStreamBufferNode(StreamCallback &&cb);
62+
#ifdef _WIN32
63+
static BufferNodePtr newFileBufferNode(FILE *fp,
64+
long long offset,
65+
size_t length);
66+
#else
67+
static BufferNodePtr newFileBufferNode(int fd, off_t offset, size_t length);
68+
#endif
69+
protected:
70+
bool isDone_{false};
71+
};
72+
73+
} // namespace trantor
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
#include <trantor/net/inner/BufferNode.h>
2+
3+
namespace trantor
4+
{
5+
static const size_t kMaxSendFileBufferSize = 16 * 1024;
6+
class FileBufferNode : public BufferNode
7+
{
8+
public:
9+
FileBufferNode(int fd, off_t offset, size_t length)
10+
: sendFd_(fd), fileBytesToSend_(length)
11+
{
12+
assert(fd >= 0);
13+
lseek(sendFd_, offset, SEEK_SET);
14+
}
15+
bool isFile() const override
16+
{
17+
return true;
18+
}
19+
int getFd() const override
20+
{
21+
return sendFd_;
22+
}
23+
void getData(const char *&data, size_t &len) override
24+
{
25+
if (msgBuffer_.readableBytes() == 0)
26+
{
27+
msgBuffer_.ensureWritableBytes(
28+
std::min(kMaxSendFileBufferSize, fileBytesToSend_));
29+
auto n = read(sendFd_,
30+
msgBuffer_.beginWrite(),
31+
msgBuffer_.writableBytes());
32+
if (n > 0)
33+
{
34+
msgBuffer_.hasWritten(n);
35+
}
36+
else
37+
{
38+
LOG_SYSERR << "FileBufferNode::getData()";
39+
}
40+
}
41+
data = msgBuffer_.peek();
42+
len = msgBuffer_.readableBytes();
43+
}
44+
void retrieve(size_t len) override
45+
{
46+
msgBuffer_.retrieve(len);
47+
fileBytesToSend_ -= len;
48+
}
49+
size_t remainingBytes() const override
50+
{
51+
if (isDone_)
52+
return 0;
53+
return fileBytesToSend_;
54+
}
55+
~FileBufferNode() override;
56+
57+
private:
58+
int sendFd_{-1};
59+
ssize_t fileBytesToSend_{0};
60+
MsgBuffer msgBuffer_;
61+
};
62+
63+
BufferNodePtr BufferNode::newFileBufferNode(int fd, off_t offset, size_t length)
64+
{
65+
return std::make_shared<FileBufferNode>(fd, offset, length);
66+
}
67+
} // namespace trantor
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
#include <trantor/net/inner/BufferNode.h>
2+
namespace trantor
3+
{
4+
static const size_t kMaxSendFileBufferSize = 16 * 1024;
5+
class FileBufferNode : public BufferNode
6+
{
7+
public:
8+
FileBufferNode(FILE *fp, long long offset, size_t length)
9+
: sendFp_(fp), fileBytesToSend_(length)
10+
{
11+
assert(fp);
12+
_fseeki64(sendFp_, offset, SEEK_SET);
13+
}
14+
15+
bool isFile() const override
16+
{
17+
return true;
18+
}
19+
20+
void getData(const char *&data, size_t &len) override
21+
{
22+
if (msgBuffer_.readableBytes() == 0)
23+
{
24+
msgBuffer_.ensureWritableBytes(kMaxSendFileBufferSize <
25+
fileBytesToSend_
26+
? kMaxSendFileBufferSize
27+
: fileBytesToSend_);
28+
auto n = fread(msgBuffer_.beginWrite(),
29+
1,
30+
msgBuffer_.writableBytes(),
31+
sendFp_);
32+
if (n > 0)
33+
{
34+
msgBuffer_.hasWritten(n);
35+
}
36+
else
37+
{
38+
LOG_SYSERR << "FileBufferNode::getData()";
39+
}
40+
}
41+
data = msgBuffer_.peek();
42+
len = msgBuffer_.readableBytes();
43+
}
44+
void retrieve(size_t len) override
45+
{
46+
msgBuffer_.retrieve(len);
47+
fileBytesToSend_ -= len;
48+
}
49+
size_t remainingBytes() const override
50+
{
51+
if (isDone_)
52+
return 0;
53+
return fileBytesToSend_;
54+
}
55+
~FileBufferNode() override
56+
{
57+
if (sendFp_)
58+
{
59+
fclose(sendFp_);
60+
}
61+
}
62+
int getFd() const override
63+
{
64+
LOG_ERROR << "getFd() is not supported on Windows";
65+
return 0;
66+
}
67+
68+
private:
69+
FILE *sendFp_{nullptr};
70+
long long offset_{0};
71+
72+
ssize_t fileBytesToSend_{0};
73+
74+
MsgBuffer msgBuffer_;
75+
};
76+
BufferNodePtr BufferNode::newFileBufferNode(FILE *fp,
77+
long long offset,
78+
size_t length)
79+
{
80+
return std::make_shared<FileBufferNode>(fp, offset, length);
81+
}
82+
} // namespace trantor

trantor/net/inner/MemBufferNode.cc

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
#include <trantor/net/inner/BufferNode.h>
2+
namespace trantor
3+
{
4+
class MemBufferNode : public BufferNode
5+
{
6+
public:
7+
MemBufferNode() = default;
8+
9+
void getData(const char *&data, size_t &len) override
10+
{
11+
data = buffer_.peek();
12+
len = buffer_.readableBytes();
13+
}
14+
void retrieve(size_t len) override
15+
{
16+
buffer_.retrieve(len);
17+
}
18+
size_t remainingBytes() const override
19+
{
20+
if (isDone_)
21+
return 0;
22+
return buffer_.readableBytes();
23+
}
24+
void append(const char *data, size_t len) override
25+
{
26+
buffer_.append(data, len);
27+
}
28+
29+
private:
30+
trantor::MsgBuffer buffer_;
31+
};
32+
BufferNodePtr BufferNode::newMemBufferNode()
33+
{
34+
return std::make_shared<MemBufferNode>();
35+
}
36+
} // namespace trantor

trantor/net/inner/StreamBufferNode.cc

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
#include <trantor/net/inner/BufferNode.h>
2+
namespace trantor
3+
{
4+
static const size_t kMaxSendFileBufferSize = 16 * 1024;
5+
class StreamBufferNode : public BufferNode
6+
{
7+
public:
8+
StreamBufferNode(std::function<std::size_t(char *, std::size_t)> &&callback)
9+
: streamCallback_(std::move(callback))
10+
{
11+
}
12+
bool isStream() const override
13+
{
14+
return true;
15+
}
16+
void getData(const char *&data, size_t &len) override
17+
{
18+
if (msgBuffer_.readableBytes() == 0)
19+
{
20+
msgBuffer_.ensureWritableBytes(kMaxSendFileBufferSize);
21+
auto n = streamCallback_(msgBuffer_.beginWrite(),
22+
msgBuffer_.writableBytes());
23+
if (n > 0)
24+
{
25+
msgBuffer_.hasWritten(n);
26+
}
27+
else
28+
{
29+
isDone_ = true;
30+
}
31+
}
32+
data = msgBuffer_.peek();
33+
len = msgBuffer_.readableBytes();
34+
}
35+
void retrieve(size_t len) override
36+
{
37+
msgBuffer_.retrieve(len);
38+
#ifndef NDEBUG
39+
dataWritten_ += len;
40+
LOG_TRACE << "send stream in loop: bytes written: " << dataWritten_
41+
<< " / total bytes written: " << dataWritten_;
42+
#endif
43+
}
44+
size_t remainingBytes() const override
45+
{
46+
if (isDone_)
47+
return 0;
48+
return 1;
49+
}
50+
~StreamBufferNode() override
51+
{
52+
if (streamCallback_)
53+
streamCallback_(nullptr, 0); // cleanup callback internals
54+
}
55+
56+
private:
57+
std::function<std::size_t(char *, std::size_t)> streamCallback_;
58+
#ifndef NDEBUG // defined by CMake for release build
59+
std::size_t dataWritten_{0};
60+
#endif
61+
MsgBuffer msgBuffer_;
62+
};
63+
BufferNodePtr BufferNode::newStreamBufferNode(StreamCallback &&callback)
64+
{
65+
return std::make_shared<StreamBufferNode>(std::move(callback));
66+
}
67+
} // namespace trantor

0 commit comments

Comments
 (0)