Skip to content

Commit 9e8a41b

Browse files
authored
Merge pull request #253 from JuliaInterop/multipart
Implement send_multipart() and recv_multipart()
2 parents 66cb119 + ab5a73f commit 9e8a41b

File tree

5 files changed

+77
-14
lines changed

5 files changed

+77
-14
lines changed

Project.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
name = "ZMQ"
22
uuid = "c2297ded-f4af-51ae-bb23-16f91089e4e1"
3-
version = "1.3.0"
3+
version = "1.4.0"
44

55
[deps]
66
FileWatching = "7b1f6079-737a-58dc-b8bc-7a2ca5c1b5ee"

docs/src/_changelog.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,12 @@ CurrentModule = ZMQ
77
This documents notable changes in ZMQ.jl. The format is based on [Keep a
88
Changelog](https://keepachangelog.com).
99

10+
## [v1.4.0] - 2024-11-30
11+
12+
### Added
13+
- Implemented [`send_multipart()`](@ref) and [`recv_multipart()`](@ref) for
14+
working with multipart messages ([#253]).
15+
1016
## [v1.3.0] - 2024-08-03
1117

1218
### Added

docs/src/reference.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,9 @@ close
2727
bind
2828
connect
2929
recv
30+
recv_multipart
3031
send
32+
send_multipart
3133
```
3234

3335
ZMQ socket types (note: some of these are aliases; e.g. `XREQ = DEALER`):

src/comm.jl

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,19 @@ function Sockets.send(f::Function, socket::Socket; more::Bool=false)
6060
send(socket, take!(io); more=more)
6161
end
6262

63+
"""
64+
send_multipart(socket::Socket, parts)
65+
66+
Send a multipart message composed of the elements in `parts`. `parts` may be any
67+
object that supports `getindex()`, `eachindex()`, and `lastindex()`.
68+
"""
69+
function send_multipart(socket::Socket, parts)
70+
for i in eachindex(parts)
71+
is_last = i == lastindex(parts)
72+
send(socket, parts[i]; more=!is_last)
73+
end
74+
end
75+
6376
############################################################################
6477

6578
function _recv!(socket::Socket, zmsg)
@@ -107,3 +120,29 @@ function Sockets.recv(socket::Socket, ::Type{T}) where {T}
107120
close(zmsg)
108121
end
109122
end
123+
124+
# Specialization so that recv(::Socket, ::Message) works
125+
Sockets.recv(socket::Socket, ::Type{Message}) = recv(socket)
126+
127+
"""
128+
recv_multipart(socket::Socket, ::Type{T}) -> Vector{T}
129+
130+
Receive a multipart message of a specific type `T`. This behaves in the same way
131+
as [`recv(::Socket, ::Type)`](@ref).
132+
"""
133+
function recv_multipart(socket::Socket, ::Type{T}) where {T}
134+
parts = T[recv(socket, T)]
135+
while socket.rcvmore
136+
push!(parts, recv(socket, T))
137+
end
138+
139+
return parts
140+
end
141+
142+
"""
143+
recv_multipart(socket::Socket) -> Vector{Message}
144+
145+
Receive a multipart message as a sequence of zero-copy [`Message`](@ref)'s. See
146+
[`recv(::Socket)`](@ref).
147+
"""
148+
recv_multipart(socket::Socket) = recv_multipart(socket, Message)

test/runtests.jl

Lines changed: 29 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,22 @@ end
137137
finalize(m)
138138
end
139139

140+
# Test multipart messages
141+
data = ["foo", "bar", "baz"]
142+
ZMQ.send_multipart(s2, data)
143+
144+
# Test receiving Message's
145+
msgs = ZMQ.recv_multipart(s1)
146+
@test msgs isa Vector{Message}
147+
@test String.(msgs) == data
148+
149+
# Test receiving a specific type
150+
data = Int[1, 2, 3]
151+
ZMQ.send_multipart(s1, data)
152+
msgs = ZMQ.recv_multipart(s2, Int)
153+
@test msgs isa Vector{Int}
154+
@test msgs == data
155+
140156
# ZMQ.close(s1); ZMQ.close(s2) # should happen when context is closed
141157
ZMQ.close(ZMQ._context) # immediately close global context rather than waiting for exit
142158
@test !isopen(s1)
@@ -223,29 +239,29 @@ end
223239
@testset "ZMQ resource management" begin
224240
local leaked_req_socket, leaked_rep_socket
225241
ZMQ.Socket(ZMQ.REQ) do req_socket
226-
leaked_req_socket = req_socket
242+
leaked_req_socket = req_socket
227243

228-
ZMQ.Socket(ZMQ.REP) do rep_socket
229-
leaked_rep_socket = rep_socket
244+
ZMQ.Socket(ZMQ.REP) do rep_socket
245+
leaked_rep_socket = rep_socket
230246

231-
ZMQ.bind(rep_socket, "inproc://tester")
232-
ZMQ.connect(req_socket, "inproc://tester")
247+
ZMQ.bind(rep_socket, "inproc://tester")
248+
ZMQ.connect(req_socket, "inproc://tester")
233249

234-
ZMQ.send(req_socket, "Mr. Watson, come here, I want to see you.")
235-
@test unsafe_string(ZMQ.recv(rep_socket)) == "Mr. Watson, come here, I want to see you."
236-
ZMQ.send(rep_socket, "Coming, Mr. Bell.")
237-
@test unsafe_string(ZMQ.recv(req_socket)) == "Coming, Mr. Bell."
238-
end
250+
ZMQ.send(req_socket, "Mr. Watson, come here, I want to see you.")
251+
@test unsafe_string(ZMQ.recv(rep_socket)) == "Mr. Watson, come here, I want to see you."
252+
ZMQ.send(rep_socket, "Coming, Mr. Bell.")
253+
@test unsafe_string(ZMQ.recv(req_socket)) == "Coming, Mr. Bell."
254+
end
239255

240-
@test !ZMQ.isopen(leaked_rep_socket)
256+
@test !ZMQ.isopen(leaked_rep_socket)
241257
end
242258
@test !ZMQ.isopen(leaked_req_socket)
243259

244260
local leaked_ctx
245261
ZMQ.Context() do ctx
246-
leaked_ctx = ctx
262+
leaked_ctx = ctx
247263

248-
@test isopen(ctx)
264+
@test isopen(ctx)
249265
end
250266
@test !isopen(leaked_ctx)
251267
end

0 commit comments

Comments
 (0)