Skip to content

Commit 025e86e

Browse files
authored
Small refactor for Distributed:managers.jl (JuliaLang/julia#34556)
- update `Sockets.bind` to handle client sockets too - use `Sockets.bind` in `Distributed.bind_client_port` - make it compatible to IPv6
1 parent 9e4ba6e commit 025e86e

File tree

3 files changed

+34
-24
lines changed

3 files changed

+34
-24
lines changed

src/managers.jl

Lines changed: 19 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -489,57 +489,52 @@ end
489489

490490
const client_port = Ref{Cushort}(0)
491491

492-
function socket_reuse_port()
492+
function socket_reuse_port(iptype)
493493
if ccall(:jl_has_so_reuseport, Int32, ()) == 1
494-
s = TCPSocket(delay = false)
494+
sock = TCPSocket(delay = false)
495495

496496
# Some systems (e.g. Linux) require the port to be bound before setting REUSEPORT
497497
bind_early = Sys.islinux()
498498

499-
bind_early && bind_client_port(s)
500-
rc = ccall(:jl_tcp_reuseport, Int32, (Ptr{Cvoid},), s.handle)
499+
bind_early && bind_client_port(sock, iptype)
500+
rc = ccall(:jl_tcp_reuseport, Int32, (Ptr{Cvoid},), sock.handle)
501501
if rc < 0
502502
# This is an issue only on systems with lots of client connections, hence delay the warning
503503
nworkers() > 128 && @warn "Error trying to reuse client port number, falling back to regular socket" maxlog=1
504504

505505
# provide a clean new socket
506506
return TCPSocket()
507507
end
508-
bind_early || bind_client_port(s)
509-
return s
508+
bind_early || bind_client_port(sock, iptype)
509+
return sock
510510
else
511511
return TCPSocket()
512512
end
513513
end
514514

515-
# TODO: this doesn't belong here, it belongs in Sockets
516-
function bind_client_port(s::TCPSocket)
517-
Sockets.iolock_begin()
518-
@assert s.status == Sockets.StatusInit
519-
host_in = Ref(hton(UInt32(0))) # IPv4 0.0.0.0
520-
err = ccall(:jl_tcp_bind, Int32, (Ptr{Cvoid}, UInt16, Ptr{Cvoid}, Cuint, Cint),
521-
s, hton(client_port[]), host_in, 0, false)
522-
Sockets.iolock_end()
523-
uv_error("tcp_bind", err)
524-
525-
_addr, port = getsockname(s)
515+
function bind_client_port(sock::TCPSocket, iptype)
516+
bind_host = iptype(0)
517+
Sockets.bind(sock, bind_host, client_port[])
518+
_addr, port = getsockname(sock)
526519
client_port[] = port
527-
return s
520+
return sock
528521
end
529522

530523
function connect_to_worker(host::AbstractString, port::Integer)
531-
s = socket_reuse_port()
532-
connect(s, host, UInt16(port))
533-
534524
# Avoid calling getaddrinfo if possible - involves a DNS lookup
535525
# host may be a stringified ipv4 / ipv6 address or a dns name
536526
bind_addr = nothing
537527
try
538-
bind_addr = string(parse(IPAddr,host))
528+
bind_addr = parse(IPAddr,host)
539529
catch
540-
bind_addr = string(getaddrinfo(host))
530+
bind_addr = getaddrinfo(host)
541531
end
542-
(s, bind_addr)
532+
533+
iptype = typeof(bind_addr)
534+
sock = socket_reuse_port(iptype)
535+
connect(sock, bind_addr, UInt16(port))
536+
537+
(sock, string(bind_addr))
543538
end
544539

545540

test/managers.jl

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
using Test
2+
3+
using Distributed: bind_client_port
4+
using Sockets
5+
6+
sock = bind_client_port(TCPSocket(), typeof(IPv4(0)))
7+
addr, port = getsockname(sock)
8+
@test addr == ip"0.0.0.0"
9+
10+
11+
sock = bind_client_port(TCPSocket(), typeof(IPv6(0)))
12+
addr, port = getsockname(sock)
13+
@test addr == ip"::"

test/runtests.jl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,3 +10,5 @@ cmd = `$test_exename $test_exeflags $disttestfile`
1010
if !success(pipeline(cmd; stdout=stdout, stderr=stderr)) && ccall(:jl_running_on_valgrind,Cint,()) == 0
1111
error("Distributed test failed, cmd : $cmd")
1212
end
13+
14+
include("managers.jl")

0 commit comments

Comments
 (0)