@@ -29,7 +29,13 @@ import cats.syntax.all._
29
29
30
30
import com .comcast .ip4s ._
31
31
32
+ import scala .concurrent .duration ._
33
+
32
34
class UdpSuite extends Fs2Suite with UdpSuitePlatform {
35
+ def sendAndReceive (socket : DatagramSocket [IO ], toSend : Datagram ): IO [Datagram ] =
36
+ socket
37
+ .write(toSend) >> socket.read.timeoutTo(1 .second, IO .defer(sendAndReceive(socket, toSend)))
38
+
33
39
group(" udp" ) {
34
40
test(" echo one" ) {
35
41
val msg = Chunk .array(" Hello, world!" .getBytes)
@@ -38,15 +44,11 @@ class UdpSuite extends Fs2Suite with UdpSuitePlatform {
38
44
.flatMap { serverSocket =>
39
45
Stream .eval(serverSocket.localAddress).map(_.port).flatMap { serverPort =>
40
46
val serverAddress = SocketAddress (ip " 127.0.0.1 " , serverPort)
41
- val server = serverSocket.reads
42
- .evalMap(packet => serverSocket.write(packet))
43
- .drain
44
- val client = Stream .resource(Network [IO ].openDatagramSocket()).flatMap { clientSocket =>
45
- Stream (Datagram (serverAddress, msg))
46
- .through(clientSocket.writes)
47
- .drain ++ Stream .eval(clientSocket.read)
47
+ val server = serverSocket.reads.foreach(packet => serverSocket.write(packet))
48
+ val client = Stream .resource(Network [IO ].openDatagramSocket()).evalMap { clientSocket =>
49
+ sendAndReceive(clientSocket, Datagram (serverAddress, msg))
48
50
}
49
- server.mergeHaltBoth(client )
51
+ client.concurrently(server )
50
52
}
51
53
}
52
54
.compile
@@ -69,21 +71,17 @@ class UdpSuite extends Fs2Suite with UdpSuitePlatform {
69
71
.flatMap { serverSocket =>
70
72
Stream .eval(serverSocket.localAddress).map(_.port).flatMap { serverPort =>
71
73
val serverAddress = SocketAddress (ip " 127.0.0.1 " , serverPort)
72
- val server = serverSocket.reads
73
- .evalMap(packet => serverSocket.write(packet))
74
- .drain
74
+ val server = serverSocket.reads.foreach(packet => serverSocket.write(packet))
75
75
val client = Stream .resource(Network [IO ].openDatagramSocket()).flatMap { clientSocket =>
76
76
Stream
77
77
.emits(msgs.map(msg => Datagram (serverAddress, msg)))
78
- .flatMap { msg =>
79
- Stream .exec(clientSocket.write(msg)) ++ Stream .eval(clientSocket.read)
80
- }
78
+ .evalMap(msg => sendAndReceive(clientSocket, msg))
81
79
}
82
80
val clients = Stream
83
81
.constant(client)
84
82
.take(numClients.toLong)
85
83
.parJoin(numParallelClients)
86
- server.mergeHaltBoth(clients )
84
+ clients.concurrently(server )
87
85
}
88
86
}
89
87
.compile
@@ -110,15 +108,13 @@ class UdpSuite extends Fs2Suite with UdpSuitePlatform {
110
108
.exec(
111
109
v4Interfaces.traverse_(interface => serverSocket.join(groupJoin, interface))
112
110
) ++
113
- serverSocket.reads
114
- .evalMap(packet => serverSocket.write(packet))
115
- .drain
111
+ serverSocket.reads.foreach(packet => serverSocket.write(packet))
116
112
val client = Stream .resource(Network [IO ].openDatagramSocket()).flatMap { clientSocket =>
117
113
Stream (Datagram (SocketAddress (group.address, serverPort), msg))
118
114
.through(clientSocket.writes)
119
115
.drain ++ Stream .eval(clientSocket.read)
120
116
}
121
- server.mergeHaltBoth(client )
117
+ client.concurrently(server )
122
118
}
123
119
}
124
120
.compile
0 commit comments