Skip to content

Commit f3be45f

Browse files
authored
Merge pull request #73 from Jurrie/feature/Fix_parallel_read_and_write_for_UDPTransport
Fix parallel read/write for UDPTransport
2 parents 755995b + 01b2142 commit f3be45f

File tree

4 files changed

+247
-179
lines changed

4 files changed

+247
-179
lines changed

modules/core/src/main/java/com/illposed/osc/transport/channel/OSCDatagramChannel.java

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -52,58 +52,60 @@ public OSCDatagramChannel(
5252
this.serializerBuilder = serializerAndParserBuilder;
5353
}
5454

55-
public OSCPacket read(final ByteBuffer buffer) throws IOException, OSCParseException {
55+
public OSCPacket read(final ByteBuffer recvBuffer) throws IOException, OSCParseException {
5656

5757
boolean completed = false;
5858
OSCPacket oscPacket;
5959
try {
6060
begin();
6161

62-
buffer.clear();
62+
recvBuffer.clear();
6363
// NOTE From the doc of `read()` and `receive()`:
6464
// "If there are fewer bytes remaining in the buffer
6565
// than are required to hold the datagram
6666
// then the remainder of the datagram is silently discarded."
6767
if (underlyingChannel.isConnected()) {
68-
underlyingChannel.read(buffer);
68+
underlyingChannel.read(recvBuffer);
6969
} else {
70-
underlyingChannel.receive(buffer);
70+
underlyingChannel.receive(recvBuffer);
7171
}
7272
// final int readBytes = buffer.position();
7373
// if (readBytes == buffer.capacity()) {
7474
// // TODO In this case it is very likely that the buffer was actually too small, and the remainder of the datagram/packet was silently discarded. We might want to give a warning, like throw an exception in this case, but whether this happens should probably be user configurable.
7575
// }
76-
buffer.flip();
77-
if (buffer.limit() == 0) {
78-
throw new OSCParseException("Received a packet without any data", buffer);
76+
recvBuffer.flip();
77+
if (recvBuffer.limit() == 0) {
78+
throw new OSCParseException("Received a packet without any data", recvBuffer);
7979
} else {
80-
oscPacket = parser.convert(buffer);
80+
oscPacket = parser.convert(recvBuffer);
8181
completed = true;
8282
}
83+
recvBuffer.flip();
8384
} finally {
8485
end(completed);
8586
}
8687

8788
return oscPacket;
8889
}
8990

90-
public void send(final ByteBuffer buffer, final OSCPacket packet, final SocketAddress remoteAddress) throws IOException, OSCSerializeException {
91+
public void send(final ByteBuffer sendBuffer, final OSCPacket packet, final SocketAddress remoteAddress) throws IOException, OSCSerializeException {
9192

9293
boolean completed = false;
9394
try {
9495
begin();
9596

96-
final OSCSerializer serializer = serializerBuilder.buildSerializer(new BufferBytesReceiver(buffer));
97-
buffer.rewind();
97+
final OSCSerializer serializer = serializerBuilder.buildSerializer(new BufferBytesReceiver(sendBuffer));
98+
sendBuffer.rewind();
9899
serializer.write(packet);
99-
buffer.flip();
100+
sendBuffer.flip();
100101
if (underlyingChannel.isConnected()) {
101-
underlyingChannel.write(buffer);
102+
underlyingChannel.write(sendBuffer);
102103
} else if (remoteAddress == null) {
103104
throw new IllegalStateException("Not connected and no remote address is given");
104105
} else {
105-
underlyingChannel.send(buffer, remoteAddress);
106+
underlyingChannel.send(sendBuffer, remoteAddress);
106107
}
108+
sendBuffer.flip();
107109
completed = true;
108110
} finally {
109111
end(completed);

modules/core/src/main/java/com/illposed/osc/transport/udp/UDPTransport.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@ public class UDPTransport implements Transport {
3434
* incoming datagram data size.
3535
*/
3636
public static final int BUFFER_SIZE = 65507;
37-
private final ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
37+
private final ByteBuffer recvBuffer = ByteBuffer.allocate(BUFFER_SIZE);
38+
private final ByteBuffer sendBuffer = ByteBuffer.allocate(BUFFER_SIZE);
3839

3940
private final SocketAddress local;
4041
private final SocketAddress remote;
@@ -131,12 +132,12 @@ public void close() throws IOException {
131132

132133
@Override
133134
public void send(final OSCPacket packet) throws IOException, OSCSerializeException {
134-
oscChannel.send(buffer, packet, remote);
135+
oscChannel.send(sendBuffer, packet, remote);
135136
}
136137

137138
@Override
138139
public OSCPacket receive() throws IOException, OSCParseException {
139-
return oscChannel.read(buffer);
140+
return oscChannel.read(recvBuffer);
140141
}
141142

142143
@Override
Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
// SPDX-FileCopyrightText: 2020 C. Ramakrishnan / Illposed Software
2+
// SPDX-FileCopyrightText: 2021 Robin Vobruba <hoijui.quaero@gmail.com>
3+
//
4+
// SPDX-License-Identifier: BSD-3-Clause
5+
6+
package com.illposed.osc.transport.udp;
7+
8+
import com.illposed.osc.OSCSerializeException;
9+
import com.illposed.osc.transport.OSCPort;
10+
11+
import java.io.IOException;
12+
import java.net.Inet4Address;
13+
import java.net.Inet6Address;
14+
import java.net.InetAddress;
15+
import java.net.InetSocketAddress;
16+
import java.net.SocketAddress;
17+
import java.net.StandardProtocolFamily;
18+
import java.net.StandardSocketOptions;
19+
import java.nio.ByteBuffer;
20+
import java.nio.channels.DatagramChannel;
21+
import java.util.Random;
22+
23+
import org.junit.jupiter.api.Assertions;
24+
import org.junit.jupiter.api.Test;
25+
import org.slf4j.Logger;
26+
import org.slf4j.LoggerFactory;
27+
28+
public class DatagramChannelTest {
29+
30+
private static final long WAIT_FOR_SOCKET_CLOSE = 30;
31+
32+
private final Logger log = LoggerFactory.getLogger(DatagramChannelTest.class);
33+
34+
@Test
35+
public void readWriteReadData400() throws Exception {
36+
readWriteReadData(400);
37+
}
38+
39+
@Test
40+
public void readWriteReadData600() throws Exception {
41+
// common minimal maximum UDP buffer size (MTU) is 5xx Bytes
42+
readWriteReadData(600);
43+
}
44+
45+
@Test
46+
public void readWriteReadData1400() throws Exception {
47+
readWriteReadData(1400);
48+
}
49+
50+
@Test
51+
public void readWriteReadData2000() throws Exception {
52+
// default maximum UDP buffer size (MTU) is ~1500 Bytes
53+
readWriteReadData(2000);
54+
}
55+
56+
@Test
57+
public void readWriteReadData50000() throws Exception {
58+
readWriteReadData(50000);
59+
}
60+
61+
@Test
62+
public void readWriteReadData70000() throws Exception {
63+
// theoretical maximum UDP buffer size (MTU) is 2^16 - 1 = 65535 Bytes
64+
65+
Assertions.assertThrows(
66+
IOException.class,
67+
() -> readWriteReadData(70000)
68+
);
69+
}
70+
71+
private void readWriteReadData(final int sizeInBytes)
72+
throws Exception
73+
{
74+
final int portSender = 6666;
75+
final int portReceiver = 7777;
76+
77+
final SocketAddress senderSocket = new InetSocketAddress(InetAddress.getLocalHost(), portSender);
78+
final SocketAddress receiverSocket = new InetSocketAddress(InetAddress.getLocalHost(), portReceiver);
79+
80+
81+
DatagramChannel senderChannel = null;
82+
DatagramChannel receiverChannel = null;
83+
try {
84+
senderChannel = DatagramChannel.open();
85+
senderChannel.socket().bind(senderSocket);
86+
senderChannel.socket().setReuseAddress(true);
87+
senderChannel.socket().setSendBufferSize(UDPTransport.BUFFER_SIZE);
88+
89+
receiverChannel = DatagramChannel.open();
90+
receiverChannel.socket().bind(receiverSocket);
91+
receiverChannel.socket().setReuseAddress(true);
92+
93+
senderChannel.connect(receiverSocket);
94+
receiverChannel.connect(senderSocket);
95+
96+
final byte[] sourceArray = new byte[sizeInBytes];
97+
final byte[] targetArray = new byte[sizeInBytes];
98+
99+
new Random().nextBytes(sourceArray);
100+
101+
readWriteReadData(senderChannel, sourceArray, receiverChannel, targetArray, sizeInBytes);
102+
} finally {
103+
if (receiverChannel != null) {
104+
try {
105+
receiverChannel.close();
106+
} catch (final IOException ex) {
107+
log.error("Failed to close test OSC in channel", ex);
108+
}
109+
}
110+
if (senderChannel != null) {
111+
try {
112+
senderChannel.close();
113+
} catch (final IOException ex) {
114+
log.error("Failed to close test OSC out channel", ex);
115+
}
116+
}
117+
118+
// wait a bit after closing the receiver,
119+
// because (some) operating systems need some time
120+
// to actually close the underlying socket
121+
Thread.sleep(WAIT_FOR_SOCKET_CLOSE);
122+
}
123+
}
124+
125+
private void readWriteReadData(
126+
final DatagramChannel sender,
127+
final byte[] sourceArray,
128+
final DatagramChannel receiver,
129+
byte[] targetArray,
130+
final int dataSize)
131+
throws IOException
132+
{
133+
// write
134+
final ByteBuffer sourceBuf = ByteBuffer.wrap(sourceArray);
135+
Assertions.assertEquals(dataSize, sender.write(sourceBuf));
136+
137+
// read
138+
final ByteBuffer targetBuf = ByteBuffer.wrap(targetArray);
139+
140+
int count;
141+
int total = 0;
142+
final long beginTime = System.currentTimeMillis();
143+
while ((total < dataSize) && (((count = receiver.read(targetBuf))) != -1)) {
144+
total = total + count;
145+
// 3s timeout to avoid dead loop
146+
if ((System.currentTimeMillis() - beginTime) > 3000) {
147+
break;
148+
}
149+
}
150+
151+
Assertions.assertEquals(dataSize, total);
152+
Assertions.assertEquals(targetBuf.position(), total);
153+
targetBuf.flip();
154+
targetArray = targetBuf.array();
155+
for (int i = 0; i < targetArray.length; i++) {
156+
Assertions.assertEquals(sourceArray[i], targetArray[i]);
157+
}
158+
}
159+
160+
@Test
161+
public void testBindChannel() throws Exception {
162+
final InetSocketAddress bindAddress = new InetSocketAddress(OSCPort.defaultSCOSCPort());
163+
164+
final DatagramChannel channel;
165+
if (bindAddress.getAddress() instanceof Inet4Address) {
166+
channel = DatagramChannel.open(StandardProtocolFamily.INET);
167+
} else if (bindAddress.getAddress() instanceof Inet6Address) {
168+
channel = DatagramChannel.open(StandardProtocolFamily.INET6);
169+
} else {
170+
throw new IllegalArgumentException(
171+
"Unknown address type: "
172+
+ bindAddress.getAddress().getClass().getCanonicalName());
173+
}
174+
channel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
175+
channel.socket().bind(bindAddress);
176+
177+
Assertions.assertEquals(bindAddress, channel.getLocalAddress());
178+
}
179+
}

0 commit comments

Comments
 (0)