Skip to content

Commit bb4aab9

Browse files
committed
net: move message conversion to wire bytes from PushMessage to SocketSendData
This furthers transport abstraction by removing the assumption that a message can always immediately be converted to wire bytes. This assumption does not hold for the v2 transport proposed by BIP324, as no messages can be sent before the handshake completes. This is done by only keeping (complete) CSerializedNetMsg objects in vSendMsg, rather than the resulting bytes (for header and payload) that need to be sent. In SocketSendData, these objects are handed to the transport as permitted by it, and sending out the bytes the transport tells us to send. This also removes the nSendOffset member variable in CNode, as keeping track of how much has been sent is now a responsability of the transport. This is not a pure refactor, and has the following effects even for the current v1 transport: * Checksum calculation now happens in SocketSendData rather than PushMessage. For non-optimistic-send messages, that means this computation now happens in the network thread rather than the message handler thread (generally a good thing, as the message handler thread is more of a computational bottleneck). * Checksum calculation now happens while holding the cs_vSend lock. This is technically unnecessary for the v1 transport, as messages are encoded independent from one another, but is untenable for the v2 transport anyway. * Statistics updates about per-message sent bytes now happen when those bytes are actually handed to the OS, rather than at PushMessage time.
1 parent a1a1060 commit bb4aab9

File tree

6 files changed

+71
-58
lines changed

6 files changed

+71
-58
lines changed

src/net.cpp

Lines changed: 46 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -915,35 +915,49 @@ std::pair<size_t, bool> CConnman::SocketSendData(CNode& node) const
915915
{
916916
auto it = node.vSendMsg.begin();
917917
size_t nSentSize = 0;
918-
919-
while (it != node.vSendMsg.end()) {
920-
const auto& data = *it;
921-
assert(data.size() > node.nSendOffset);
918+
bool data_left{false}; //!< second return value (whether unsent data remains)
919+
920+
while (true) {
921+
if (it != node.vSendMsg.end()) {
922+
// If possible, move one message from the send queue to the transport. This fails when
923+
// there is an existing message still being sent.
924+
size_t memusage = it->GetMemoryUsage();
925+
if (node.m_transport->SetMessageToSend(*it)) {
926+
// Update memory usage of send buffer (as *it will be deleted).
927+
node.m_send_memusage -= memusage;
928+
++it;
929+
}
930+
}
931+
const auto& [data, more, msg_type] = node.m_transport->GetBytesToSend();
932+
data_left = !data.empty(); // will be overwritten on next loop if all of data gets sent
922933
int nBytes = 0;
923-
{
934+
if (!data.empty()) {
924935
LOCK(node.m_sock_mutex);
936+
// There is no socket in case we've already disconnected, or in test cases without
937+
// real connections. In these cases, we bail out immediately and just leave things
938+
// in the send queue and transport.
925939
if (!node.m_sock) {
926940
break;
927941
}
928942
int flags = MSG_NOSIGNAL | MSG_DONTWAIT;
929943
#ifdef MSG_MORE
930-
if (it + 1 != node.vSendMsg.end()) {
944+
// We have more to send if either the transport itself has more, or if we have more
945+
// messages to send.
946+
if (more || it != node.vSendMsg.end()) {
931947
flags |= MSG_MORE;
932948
}
933949
#endif
934-
nBytes = node.m_sock->Send(reinterpret_cast<const char*>(data.data()) + node.nSendOffset, data.size() - node.nSendOffset, flags);
950+
nBytes = node.m_sock->Send(reinterpret_cast<const char*>(data.data()), data.size(), flags);
935951
}
936952
if (nBytes > 0) {
937953
node.m_last_send = GetTime<std::chrono::seconds>();
938954
node.nSendBytes += nBytes;
939-
node.nSendOffset += nBytes;
955+
// Notify transport that bytes have been processed.
956+
node.m_transport->MarkBytesSent(nBytes);
957+
// Update statistics per message type.
958+
node.AccountForSentBytes(msg_type, nBytes);
940959
nSentSize += nBytes;
941-
if (node.nSendOffset == data.size()) {
942-
node.nSendOffset = 0;
943-
// Update memory usage of send buffer (as *it will be deleted).
944-
node.m_send_memusage -= sizeof(data) + memusage::DynamicUsage(data);
945-
it++;
946-
} else {
960+
if ((size_t)nBytes != data.size()) {
947961
// could not send full message; stop sending more
948962
break;
949963
}
@@ -956,19 +970,17 @@ std::pair<size_t, bool> CConnman::SocketSendData(CNode& node) const
956970
node.CloseSocketDisconnect();
957971
}
958972
}
959-
// couldn't send anything at all
960973
break;
961974
}
962975
}
963976

964977
node.fPauseSend = node.m_send_memusage + node.m_transport->GetSendMemoryUsage() > nSendBufferMaxSize;
965978

966979
if (it == node.vSendMsg.end()) {
967-
assert(node.nSendOffset == 0);
968980
assert(node.m_send_memusage == 0);
969981
}
970982
node.vSendMsg.erase(node.vSendMsg.begin(), it);
971-
return {nSentSize, !node.vSendMsg.empty()};
983+
return {nSentSize, data_left};
972984
}
973985

974986
/** Try to find a connection to evict when the node is full.
@@ -1307,7 +1319,14 @@ Sock::EventsPerSock CConnman::GenerateWaitSockets(Span<CNode* const> nodes)
13071319

13081320
for (CNode* pnode : nodes) {
13091321
bool select_recv = !pnode->fPauseRecv;
1310-
bool select_send = WITH_LOCK(pnode->cs_vSend, return !pnode->vSendMsg.empty());
1322+
bool select_send;
1323+
{
1324+
LOCK(pnode->cs_vSend);
1325+
// Sending is possible if either there are bytes to send right now, or if there will be
1326+
// once a potential message from vSendMsg is handed to the transport.
1327+
const auto& [to_send, _more, _msg_type] = pnode->m_transport->GetBytesToSend();
1328+
select_send = !to_send.empty() || !pnode->vSendMsg.empty();
1329+
}
13111330
if (!select_recv && !select_send) continue;
13121331

13131332
LOCK(pnode->m_sock_mutex);
@@ -2988,42 +3007,19 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg)
29883007
size_t nBytesSent = 0;
29893008
{
29903009
LOCK(pnode->cs_vSend);
2991-
const bool queue_was_empty{pnode->vSendMsg.empty()};
2992-
2993-
// Give the message to the transport, and add all bytes it wants us to send out as byte
2994-
// vectors to vSendMsg. This is temporary code that exists to support the new transport
2995-
// sending interface using the old way of queueing data. In a future commit vSendMsg will
2996-
// be replaced with a queue of CSerializedNetMsg objects to be sent instead, and this code
2997-
// will disappear.
2998-
bool queued = pnode->m_transport->SetMessageToSend(msg);
2999-
assert(queued);
3000-
// In the current transport (V1Transport), GetBytesToSend first returns a header to send,
3001-
// and then the payload data (if any), necessitating a loop.
3002-
while (true) {
3003-
const auto& [bytes, _more, msg_type] = pnode->m_transport->GetBytesToSend();
3004-
if (bytes.empty()) break;
3005-
// Update statistics per message type.
3006-
pnode->AccountForSentBytes(msg_type, bytes.size());
3007-
pnode->vSendMsg.push_back({bytes.begin(), bytes.end()});
3008-
// Update memory usage of send buffer. For now, use static + dynamic memory usage of
3009-
// byte vectors in vSendMsg as send memory. In a future commit, vSendMsg will be
3010-
// replaced with a queue of CSerializedNetMsg objects, and we'll use their memory usage
3011-
// instead.
3012-
pnode->m_send_memusage += sizeof(pnode->vSendMsg.back()) + memusage::DynamicUsage(pnode->vSendMsg.back());
3013-
// Notify transport that bytes have been processed (they're not actually sent yet,
3014-
// but pushed onto the vSendMsg queue of bytes to send).
3015-
pnode->m_transport->MarkBytesSent(bytes.size());
3016-
}
3017-
// At this point, m_transport->GetSendMemoryUsage() isn't very interesting as the
3018-
// transport's message is fully flushed (and converted to byte arrays). It's still included
3019-
// here for correctness, and will become relevant in a future commit when a queued message
3020-
// inside the transport may survive PushMessage calls.
3010+
const auto& [to_send, _more, _msg_type] = pnode->m_transport->GetBytesToSend();
3011+
const bool queue_was_empty{to_send.empty() && pnode->vSendMsg.empty()};
3012+
3013+
// Update memory usage of send buffer.
3014+
pnode->m_send_memusage += msg.GetMemoryUsage();
30213015
if (pnode->m_send_memusage + pnode->m_transport->GetSendMemoryUsage() > nSendBufferMaxSize) pnode->fPauseSend = true;
3016+
// Move message to vSendMsg queue.
3017+
pnode->vSendMsg.push_back(std::move(msg));
30223018

3023-
// If the write queue was empty before and isn't now, attempt "optimistic write":
3019+
// If there was nothing to send before, attempt "optimistic write":
30243020
// because the poll/select loop may pause for SELECT_TIMEOUT_MILLISECONDS before actually
30253021
// doing a send, try sending from the calling thread if the queue was empty before.
3026-
if (queue_was_empty && !pnode->vSendMsg.empty()) {
3022+
if (queue_was_empty) {
30273023
std::tie(nBytesSent, std::ignore) = SocketSendData(*pnode);
30283024
}
30293025
}

src/net.h

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -436,13 +436,12 @@ class CNode
436436
*/
437437
std::shared_ptr<Sock> m_sock GUARDED_BY(m_sock_mutex);
438438

439-
/** Total memory usage of vSendMsg (counting the vectors and their dynamic usage, but not the
440-
* deque overhead). */
439+
/** Sum of GetMemoryUsage of all vSendMsg entries. */
441440
size_t m_send_memusage GUARDED_BY(cs_vSend){0};
442-
/** Offset inside the first vSendMsg already sent */
443-
size_t nSendOffset GUARDED_BY(cs_vSend){0};
441+
/** Total number of bytes sent on the wire to this peer. */
444442
uint64_t nSendBytes GUARDED_BY(cs_vSend){0};
445-
std::deque<std::vector<unsigned char>> vSendMsg GUARDED_BY(cs_vSend);
443+
/** Messages still to be fed to m_transport->SetMessageToSend. */
444+
std::deque<CSerializedNetMsg> vSendMsg GUARDED_BY(cs_vSend);
446445
Mutex cs_vSend;
447446
Mutex m_sock_mutex;
448447
Mutex cs_vRecv;

src/test/denialofservice_tests.cpp

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -86,17 +86,19 @@ BOOST_AUTO_TEST_CASE(outbound_slow_chain_eviction)
8686

8787
{
8888
LOCK(dummyNode1.cs_vSend);
89-
BOOST_CHECK(dummyNode1.vSendMsg.size() > 0);
90-
dummyNode1.vSendMsg.clear();
89+
const auto& [to_send, _more, _msg_type] = dummyNode1.m_transport->GetBytesToSend();
90+
BOOST_CHECK(!to_send.empty());
9191
}
92+
connman.FlushSendBuffer(dummyNode1);
9293

9394
int64_t nStartTime = GetTime();
9495
// Wait 21 minutes
9596
SetMockTime(nStartTime+21*60);
9697
BOOST_CHECK(peerman.SendMessages(&dummyNode1)); // should result in getheaders
9798
{
9899
LOCK(dummyNode1.cs_vSend);
99-
BOOST_CHECK(dummyNode1.vSendMsg.size() > 0);
100+
const auto& [to_send, _more, _msg_type] = dummyNode1.m_transport->GetBytesToSend();
101+
BOOST_CHECK(!to_send.empty());
100102
}
101103
// Wait 3 more minutes
102104
SetMockTime(nStartTime+24*60);

src/test/fuzz/process_messages.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ FUZZ_TARGET(process_messages, .init = initialize_process_messages)
6767

6868
CNode& random_node = *PickValue(fuzzed_data_provider, peers);
6969

70+
connman.FlushSendBuffer(random_node);
7071
(void)connman.ReceiveMsgFrom(random_node, std::move(net_msg));
7172
random_node.fPauseSend = false;
7273

src/test/util/net.cpp

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ void ConnmanTestMsg::Handshake(CNode& node,
2525
const CNetMsgMaker mm{0};
2626

2727
peerman.InitializeNode(node, local_services);
28+
FlushSendBuffer(node); // Drop the version message added by InitializeNode.
2829

2930
CSerializedNetMsg msg_version{
3031
mm.Make(NetMsgType::VERSION,
@@ -45,6 +46,7 @@ void ConnmanTestMsg::Handshake(CNode& node,
4546
node.fPauseSend = false;
4647
connman.ProcessMessagesOnce(node);
4748
peerman.SendMessages(&node);
49+
FlushSendBuffer(node); // Drop the verack message added by SendMessages.
4850
if (node.fDisconnect) return;
4951
assert(node.nVersion == version);
5052
assert(node.GetCommonVersion() == std::min(version, PROTOCOL_VERSION));
@@ -70,6 +72,18 @@ void ConnmanTestMsg::NodeReceiveMsgBytes(CNode& node, Span<const uint8_t> msg_by
7072
}
7173
}
7274

75+
void ConnmanTestMsg::FlushSendBuffer(CNode& node) const
76+
{
77+
LOCK(node.cs_vSend);
78+
node.vSendMsg.clear();
79+
node.m_send_memusage = 0;
80+
while (true) {
81+
const auto& [to_send, _more, _msg_type] = node.m_transport->GetBytesToSend();
82+
if (to_send.empty()) break;
83+
node.m_transport->MarkBytesSent(to_send.size());
84+
}
85+
}
86+
7387
bool ConnmanTestMsg::ReceiveMsgFrom(CNode& node, CSerializedNetMsg&& ser_msg) const
7488
{
7589
bool queued = node.m_transport->SetMessageToSend(ser_msg);

src/test/util/net.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ struct ConnmanTestMsg : public CConnman {
5555
void NodeReceiveMsgBytes(CNode& node, Span<const uint8_t> msg_bytes, bool& complete) const;
5656

5757
bool ReceiveMsgFrom(CNode& node, CSerializedNetMsg&& ser_msg) const;
58+
void FlushSendBuffer(CNode& node) const;
5859
};
5960

6061
constexpr ServiceFlags ALL_SERVICE_FLAGS[]{

0 commit comments

Comments
 (0)