Skip to content

Commit e33dd7d

Browse files
an-taotripleslashmbleis
authored
Remove mutex (#312)
Co-authored-by: tripleslash <development.slash@gmail.com> Co-authored-by: Michael Bleis <michael.bleis@aracom.de>
1 parent 609f150 commit e33dd7d

20 files changed

+1208
-882
lines changed

CMakeLists.txt

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,9 @@ set(TRANTOR_SOURCES
104104
trantor/net/inner/Connector.cc
105105
trantor/net/inner/Poller.cc
106106
trantor/net/inner/Socket.cc
107+
trantor/net/inner/MemBufferNode.cc
108+
trantor/net/inner/StreamBufferNode.cc
109+
trantor/net/inner/AsyncStreamBufferNode.cc
107110
trantor/net/inner/TcpConnectionImpl.cc
108111
trantor/net/inner/Timer.cc
109112
trantor/net/inner/TimerQueue.cc
@@ -126,11 +129,16 @@ if(WIN32)
126129
set(TRANTOR_SOURCES
127130
${TRANTOR_SOURCES}
128131
third_party/wepoll/Wepoll.c
129-
trantor/utils/WindowsSupport.cc)
132+
trantor/utils/WindowsSupport.cc
133+
trantor/net/inner/FileBufferNodeWin.cc)
130134
set(private_headers
131135
${private_headers}
132136
third_party/wepoll/Wepoll.h
133137
trantor/utils/WindowsSupport.h)
138+
else(WIN32)
139+
set(TRANTOR_SOURCES
140+
${TRANTOR_SOURCES}
141+
trantor/net/inner/FileBufferNodeUnix.cc)
134142
endif(WIN32)
135143

136144
# Somehow the default value of TRANTOR_USE_TLS is OFF
@@ -283,6 +291,7 @@ set(public_net_headers
283291
trantor/net/TcpClient.h
284292
trantor/net/TcpConnection.h
285293
trantor/net/TcpServer.h
294+
trantor/net/AsyncStream.h
286295
trantor/net/callbacks.h
287296
trantor/net/Resolver.h
288297
trantor/net/Channel.h

trantor/net/AsyncStream.h

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/**
2+
*
3+
* @file AsyncStream.h
4+
* @author An Tao
5+
*
6+
* Public header file in trantor lib.
7+
*
8+
* Copyright 2023, An Tao. All rights reserved.
9+
* Use of this source code is governed by a BSD-style license
10+
* that can be found in the License file.
11+
*
12+
*
13+
*/
14+
15+
#pragma once
16+
17+
#include <trantor/utils/NonCopyable.h>
18+
#include <memory>
19+
20+
namespace trantor
21+
{
22+
/**
23+
* @brief This class represents a data stream that can be sent asynchronously.
24+
* The data is sent in chunks, and the chunks are sent in order, and all the
25+
* chunks are sent continuously.
26+
*/
27+
class TRANTOR_EXPORT AsyncStream : public NonCopyable
28+
{
29+
public:
30+
virtual ~AsyncStream() = default;
31+
/**
32+
* @brief Send data asynchronously.
33+
*
34+
* @param data The data to be sent
35+
* @param len The length of the data
36+
* @return true if the data is sent successfully or at least is put in the
37+
* send buffer.
38+
* @return false if the connection is closed.
39+
*/
40+
virtual bool send(const char *data, size_t len) = 0;
41+
bool send(const std::string &data)
42+
{
43+
return send(data.data(), data.length());
44+
}
45+
/**
46+
* @brief Terminate the stream.
47+
*/
48+
virtual void close() = 0;
49+
};
50+
using AsyncStreamPtr = std::unique_ptr<AsyncStream>;
51+
} // namespace trantor

trantor/net/TcpConnection.h

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
#include <trantor/net/callbacks.h>
2222
#include <trantor/net/Certificate.h>
2323
#include <trantor/net/TLSPolicy.h>
24+
#include <trantor/net/AsyncStream.h>
2425
#include <memory>
2526
#include <functional>
2627
#include <string>
@@ -69,8 +70,8 @@ class TRANTOR_EXPORT TcpConnection
6970
* @param length
7071
*/
7172
virtual void sendFile(const char *fileName,
72-
size_t offset = 0,
73-
size_t length = 0) = 0;
73+
long long offset = 0,
74+
long long length = 0) = 0;
7475
/**
7576
* @brief Send a file to the peer.
7677
*
@@ -79,8 +80,8 @@ class TRANTOR_EXPORT TcpConnection
7980
* @param length
8081
*/
8182
virtual void sendFile(const wchar_t *fileName,
82-
size_t offset = 0,
83-
size_t length = 0) = 0;
83+
long long offset = 0,
84+
long long length = 0) = 0;
8485
/**
8586
* @brief Send a stream to the peer.
8687
*
@@ -95,11 +96,20 @@ class TRANTOR_EXPORT TcpConnection
9596
callback) = 0; // (buffer, buffer size) -> size
9697
// of data put in buffer
9798

99+
/**
100+
* @brief Send a stream to the peer asynchronously.
101+
* @param disableKickoff Disable the kickoff mechanism. If this parameter is
102+
* enabled, the connection will not be closed after the inactive timeout.
103+
* @note The subsequent data sent after the async stream will be sent after
104+
* the stream is closed.
105+
*/
106+
virtual AsyncStreamPtr sendAsyncStream(bool disableKickoff = false) = 0;
98107
/**
99108
* @brief Get the local address of the connection.
100109
*
101110
* @return const InetAddress&
102111
*/
112+
103113
virtual const InetAddress &localAddr() const = 0;
104114

105115
/**
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
#include <trantor/net/inner/BufferNode.h>
2+
3+
namespace trantor
4+
{
5+
class AsyncBufferNode : public BufferNode
6+
{
7+
public:
8+
AsyncBufferNode() = default;
9+
~AsyncBufferNode() override = default;
10+
bool isAsync() const override
11+
{
12+
return true;
13+
}
14+
bool isStream() const override
15+
{
16+
return true;
17+
}
18+
long long remainingBytes() const override
19+
{
20+
if (msgBufferPtr_)
21+
return static_cast<long long>(msgBufferPtr_->readableBytes());
22+
return 0;
23+
}
24+
bool available() const override
25+
{
26+
return !isDone_;
27+
}
28+
void getData(const char *&data, size_t &len) override
29+
{
30+
if (msgBufferPtr_)
31+
{
32+
data = msgBufferPtr_->peek();
33+
len = msgBufferPtr_->readableBytes();
34+
}
35+
else
36+
{
37+
data = nullptr;
38+
len = 0;
39+
}
40+
}
41+
void retrieve(size_t len) override
42+
{
43+
assert(msgBufferPtr_);
44+
if (msgBufferPtr_)
45+
{
46+
msgBufferPtr_->retrieve(len);
47+
}
48+
}
49+
void append(const char *data, size_t len) override
50+
{
51+
if (!msgBufferPtr_)
52+
{
53+
msgBufferPtr_ = std::make_unique<MsgBuffer>(len);
54+
}
55+
msgBufferPtr_->append(data, len);
56+
}
57+
58+
private:
59+
std::unique_ptr<MsgBuffer> msgBufferPtr_;
60+
};
61+
BufferNodePtr BufferNode::newAsyncStreamBufferNode()
62+
{
63+
return std::make_shared<AsyncBufferNode>();
64+
}
65+
} // namespace trantor

trantor/net/inner/BufferNode.h

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/**
2+
*
3+
* @file BufferNode.h
4+
* @author An Tao
5+
*
6+
* Public header file in trantor lib.
7+
*
8+
* Copyright 2018, An Tao. All rights reserved.
9+
* Use of this source code is governed by a BSD-style license
10+
* that can be found in the License file.
11+
*
12+
*
13+
*/
14+
15+
#pragma once
16+
#ifdef _WIN32
17+
#include <stdio.h>
18+
#endif
19+
#include <trantor/utils/MsgBuffer.h>
20+
#include <trantor/utils/NonCopyable.h>
21+
#include <trantor/utils/Logger.h>
22+
#include <functional>
23+
#include <memory>
24+
#include <string>
25+
26+
namespace trantor
27+
{
28+
class BufferNode;
29+
using BufferNodePtr = std::shared_ptr<BufferNode>;
30+
using StreamCallback = std::function<std::size_t(char *, std::size_t)>;
31+
class BufferNode : public NonCopyable
32+
{
33+
public:
34+
virtual bool isFile() const
35+
{
36+
return false;
37+
}
38+
virtual ~BufferNode() = default;
39+
virtual bool isStream() const
40+
{
41+
return false;
42+
}
43+
virtual void getData(const char *&data, size_t &len) = 0;
44+
virtual void append(const char *, size_t)
45+
{
46+
LOG_FATAL << "Not a memory buffer node";
47+
}
48+
virtual void retrieve(size_t len) = 0;
49+
virtual long long remainingBytes() const = 0;
50+
virtual int getFd() const
51+
{
52+
LOG_FATAL << "Not a file buffer node";
53+
return -1;
54+
}
55+
virtual bool available() const
56+
{
57+
return true;
58+
}
59+
virtual bool isAsync() const
60+
{
61+
return false;
62+
}
63+
64+
void done()
65+
{
66+
isDone_ = true;
67+
}
68+
static BufferNodePtr newMemBufferNode();
69+
70+
static BufferNodePtr newStreamBufferNode(StreamCallback &&cb);
71+
#ifdef _WIN32
72+
static BufferNodePtr newFileBufferNode(const wchar_t *fileName,
73+
long long offset,
74+
long long length);
75+
#else
76+
static BufferNodePtr newFileBufferNode(const char *fileName,
77+
long long offset,
78+
long long length);
79+
#endif
80+
static BufferNodePtr newAsyncStreamBufferNode();
81+
82+
protected:
83+
bool isDone_{false};
84+
};
85+
86+
} // namespace trantor

0 commit comments

Comments
 (0)