Skip to content

Commit 13ce55b

Browse files
authored
Merge pull request #9 from JuliaParallel/default-interface
Default interfaces and other things
2 parents 2e52ffe + 915eb61 commit 13ce55b

File tree

7 files changed

+196
-17
lines changed

7 files changed

+196
-17
lines changed

docs/src/_changelog.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,3 +18,7 @@ This documents notable changes in DistributedNext.jl. The format is based on
1818
### Changed
1919
- Added a `project` argument to [`addprocs(::AbstractVector)`](@ref) to specify
2020
the project of a remote worker ([#2]).
21+
- Workers will now attempt to pick the fastest available interface to
22+
communicate over ([#9]).
23+
- The `SSHManager` now passes all `JULIA_*` environment variables by default to
24+
the workers, instead of only `JULIA_WORKER_TIMEOUT` ([#9]).

src/DistributedNext.jl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ end
102102
hash(r::RRID, h::UInt) = hash(r.whence, hash(r.id, h))
103103
==(r::RRID, s::RRID) = (r.whence==s.whence && r.id==s.id)
104104

105+
include("network_interfaces.jl")
105106
include("clusterserialize.jl")
106107
include("cluster.jl") # cluster setup and management, addprocs
107108
include("messages.jl")

src/cluster.jl

Lines changed: 31 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,7 @@ end
214214
mutable struct LocalProcess
215215
id::Int
216216
bind_addr::String
217-
bind_port::UInt16
217+
bind_port::Int
218218
cookie::String
219219
LocalProcess() = new(1)
220220
end
@@ -237,6 +237,11 @@ The function reads the cookie from stdin if required, and listens on a free por
237237
tasks to process incoming TCP connections and requests. It also (optionally)
238238
closes stdin and redirects stderr to stdout.
239239
240+
If a specific interface is not specified through `--bind-to` it will make a
241+
best-effort attempt to pick the fastest available network interface to listen
242+
on. The heuristics it uses for this depend on the system configuration and
243+
should not be relied upon to always pick the fastest interface.
244+
240245
It does not return.
241246
"""
242247
start_worker(cookie::AbstractString=readline(stdin); kwargs...) = start_worker(stdout, cookie; kwargs...)
@@ -253,8 +258,8 @@ function start_worker(out::IO, cookie::AbstractString=readline(stdin); close_std
253258
interface = IPv4(LPROC.bind_addr)
254259
if LPROC.bind_port == 0
255260
port_hint = 9000 + (getpid() % 1000)
256-
(port, sock) = listenany(interface, UInt16(port_hint))
257-
LPROC.bind_port = port
261+
(port, sock) = listenany(interface, port_hint)
262+
LPROC.bind_port = Int(port)
258263
else
259264
sock = listen(interface, LPROC.bind_port)
260265
end
@@ -263,7 +268,7 @@ function start_worker(out::IO, cookie::AbstractString=readline(stdin); close_std
263268
process_messages(client, client, true)
264269
end)
265270
print(out, "julia_worker:") # print header
266-
print(out, "$(string(LPROC.bind_port))#") # print port
271+
print(out, "$(LPROC.bind_port)#") # print port
267272
print(out, LPROC.bind_addr)
268273
print(out, '\n')
269274
flush(out)
@@ -1308,17 +1313,33 @@ function init_bind_addr()
13081313
end
13091314
else
13101315
bind_port = 0
1311-
try
1312-
bind_addr = string(getipaddr())
1313-
catch
1314-
# All networking is unavailable, initialize bind_addr to the loopback address
1315-
# Will cause an exception to be raised only when used.
1316+
1317+
interfaces = _get_interfaces(IPv4)
1318+
if isempty(interfaces)
1319+
# Include IPv6 interfaces if there are no IPv4 ones
1320+
interfaces = _get_interfaces()
1321+
end
1322+
1323+
if isempty(interfaces)
1324+
# All networking is unavailable, initialize bind_addr to the loopback address.
1325+
# An exception will be raised later if even that is unavailable.
13161326
bind_addr = "127.0.0.1"
1327+
else
1328+
# Pick the interface with the highest negotiated speed, if any
1329+
interfaces_with_speed = filter(x -> !isnothing(x.speed), interfaces)
1330+
if isempty(interfaces_with_speed)
1331+
# If none of them report speed just pick the first one
1332+
bind_addr = string(interfaces[1].ip)
1333+
else
1334+
idx = findmax(x -> x.speed, interfaces)[2]
1335+
bind_addr = string(interfaces[idx].ip)
1336+
end
13171337
end
13181338
end
1339+
13191340
global LPROC
13201341
LPROC.bind_addr = bind_addr
1321-
LPROC.bind_port = UInt16(bind_port)
1342+
LPROC.bind_port = bind_port
13221343
end
13231344

13241345
using Random: randstring

src/managers.jl

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -138,9 +138,8 @@ addprocs([
138138
139139
* `env`: provide an array of string pairs such as
140140
`env=["JULIA_DEPOT_PATH"=>"/depot"]` to request that environment variables
141-
are set on the remote machine. By default only the environment variable
142-
`JULIA_WORKER_TIMEOUT` is passed automatically from the local to the remote
143-
environment.
141+
are set on the remote machine. By default all `JULIA_*` environment variables
142+
are passed automatically from the local to the remote environment.
144143
145144
* `cmdline_cookie`: pass the authentication cookie via the `--worker` commandline
146145
option. The (more secure) default behaviour of passing the cookie via ssh stdio
@@ -176,7 +175,7 @@ default_addprocs_params(::SSHManager) =
176175
:tunnel => false,
177176
:multiplex => false,
178177
:max_parallel => 10,
179-
:project => Base.current_project()))
178+
:project => Base.active_project()))
180179

181180
function launch(manager::SSHManager, params::Dict, launched::Array, launch_ntfy::Condition)
182181
# Launch one worker on each unique host in parallel. Additional workers are launched later.
@@ -186,7 +185,7 @@ function launch(manager::SSHManager, params::Dict, launched::Array, launch_ntfy:
186185
@async try
187186
launch_on_machine(manager, $machine, $cnt, params, launched, launch_ntfy)
188187
catch e
189-
print(stderr, "exception launching on machine $(machine) : $(e)\n")
188+
@error "Exception launching on machine $(machine)" exception=(e, catch_backtrace())
190189
end
191190
end
192191
end
@@ -294,8 +293,9 @@ function launch_on_machine(manager::SSHManager, machine::AbstractString, cnt, pa
294293
# Build up the ssh command
295294

296295
# pass on some environment variables by default
297-
for var in ["JULIA_WORKER_TIMEOUT"]
298-
if !haskey(env, var) && haskey(ENV, var)
296+
julia_vars = filter(startswith("JULIA_"), keys(ENV))
297+
for var in julia_vars
298+
if !haskey(env, var)
299299
env[var] = ENV[var]
300300
end
301301
end

src/network_interfaces.jl

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
## This is a minimal version of NetworkInterfaceControllers.jl, licensed under MIT
2+
3+
# uv_interface_address_t has a few fields, but we don't support accessing all of
4+
# them because `name` is the first field and it's a pointer:
5+
# https://docs.libuv.org/en/v1.x/misc.html#c.uv_interface_address_t
6+
#
7+
# To safely access the other fields we would have to account for their
8+
# offset changing on 32/64bit platforms, which we are too lazy to do (and
9+
# don't need anyway since we only want the name).
10+
const uv_interface_address_t = Cvoid
11+
12+
const sizeof_uv_interface_address_t = @ccall jl_uv_sizeof_interface_address()::Cint
13+
14+
function uv_interface_addresses(addresses, count)
15+
@ccall jl_uv_interface_addresses(addresses::Ptr{Ptr{uv_interface_address_t}}, count::Ptr{Cint})::Cint
16+
end
17+
18+
function uv_free_interface_addresses(addresses, count)
19+
@ccall uv_free_interface_addresses(addresses::Ptr{uv_interface_address_t}, count::Cint)::Cvoid
20+
end
21+
22+
function _next(r::Base.RefValue{Ptr{uv_interface_address_t}})
23+
next_addr = r[] + sizeof_uv_interface_address_t
24+
Ref(Ptr{uv_interface_address_t}(next_addr))
25+
end
26+
27+
_is_loopback(addr) = 1 == @ccall jl_uv_interface_address_is_internal(addr::Ptr{uv_interface_address_t})::Cint
28+
29+
_sockaddr(addr) = @ccall jl_uv_interface_address_sockaddr(addr::Ptr{uv_interface_address_t})::Ptr{Cvoid}
30+
31+
_sockaddr_is_ip4(sockaddr::Ptr{Cvoid}) = 1 == @ccall jl_sockaddr_is_ip4(sockaddr::Ptr{Cvoid})::Cint
32+
33+
_sockaddr_is_ip6(sockaddr::Ptr{Cvoid}) = 1 == @ccall jl_sockaddr_is_ip6(sockaddr::Ptr{Cvoid})::Cint
34+
35+
_sockaddr_to_ip4(sockaddr::Ptr{Cvoid}) = IPv4(ntoh(@ccall jl_sockaddr_host4(sockaddr::Ptr{Cvoid})::Cuint))
36+
37+
function _sockaddr_to_ip6(sockaddr::Ptr{Cvoid})
38+
addr6 = Ref{UInt128}()
39+
@ccall jl_sockaddr_host6(sockaddr::Ptr{Cvoid}, addr6::Ptr{UInt128})::Cuint
40+
IPv6(ntoh(addr6[]))
41+
end
42+
43+
# Define a selection of hardware types that we're interested in. Values taken from:
44+
# https://github.com/torvalds/linux/blob/28eb75e178d389d325f1666e422bc13bbbb9804c/include/uapi/linux/if_arp.h#L29
45+
@enum ARPHardware begin
46+
ARPHardware_Ethernet = 1
47+
ARPHardware_Infiniband = 32
48+
ARPHardware_Loopback = 772
49+
end
50+
51+
struct Interface
52+
name::String
53+
version::Symbol
54+
ip::IPAddr
55+
56+
# These two fields are taken from the sysfs /type and /speed files if available:
57+
# https://www.kernel.org/doc/Documentation/ABI/testing/sysfs-class-net
58+
type::Union{ARPHardware, Nothing}
59+
speed::Union{Float64, Nothing}
60+
end
61+
62+
function _get_interfaces(
63+
::Type{T}=IPAddr; loopback::Bool=false
64+
) where T <: IPAddr
65+
addr_ref = Ref{Ptr{uv_interface_address_t}}(C_NULL)
66+
count_ref = Ref{Int32}(1)
67+
68+
err = uv_interface_addresses(addr_ref, count_ref)
69+
if err != 0
70+
error("Call to uv_interface_addresses() to list network interfaces failed: $(err)")
71+
end
72+
73+
interface_data = Interface[]
74+
current_addr = addr_ref
75+
for i = 0:(count_ref[]-1)
76+
# Skip loopback devices, if so required
77+
if (!loopback) && _is_loopback(current_addr[])
78+
# Don't don't forget to iterate the address pointer though!
79+
current_addr = _next(current_addr)
80+
continue
81+
end
82+
83+
# Interface name string. The name is the first field of the struct so we
84+
# just cast the struct pointer to a Ptr{Cstring} and load it.
85+
name_ptr = unsafe_load(Ptr{Cstring}(current_addr[]))
86+
name = unsafe_string(name_ptr)
87+
88+
# Sockaddr used to load IPv4, or IPv6 addresses
89+
sockaddr = _sockaddr(current_addr[])
90+
91+
# Load IP addresses
92+
(ip_type, ip_address) = if IPv4 <: T && _sockaddr_is_ip4(sockaddr)
93+
(:v4, _sockaddr_to_ip4(sockaddr))
94+
elseif IPv6 <: T && _sockaddr_is_ip6(sockaddr)
95+
(:v6, _sockaddr_to_ip6(sockaddr))
96+
else
97+
(:skip, nothing)
98+
end
99+
100+
type = nothing
101+
speed = nothing
102+
103+
@static if Sys.isunix()
104+
# Load sysfs info
105+
sysfs_path = "/sys/class/net/$(name)"
106+
type_path = "$(sysfs_path)/type"
107+
speed_path = "$(sysfs_path)/speed"
108+
109+
if isfile(type_path)
110+
try
111+
type_code = parse(Int, read(type_path, String))
112+
if type_code in Int.(instances(ARPHardware))
113+
type = ARPHardware(type_code)
114+
end
115+
catch
116+
# Do nothing on any failure to read or parse the file
117+
end
118+
end
119+
120+
if isfile(speed_path)
121+
try
122+
reported_speed = parse(Float64, read(speed_path, String))
123+
if reported_speed > 0
124+
speed = reported_speed
125+
end
126+
catch
127+
end
128+
end
129+
end
130+
131+
# Append to data vector and iterate address pointer
132+
if ip_type != :skip
133+
push!(interface_data, Interface(name, ip_type, ip_address, type, speed))
134+
end
135+
current_addr = _next(current_addr)
136+
end
137+
138+
uv_free_interface_addresses(addr_ref[], count_ref[])
139+
140+
return interface_data
141+
end

test/distributed_exec.jl

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,11 @@ include(joinpath(Sys.BINDIR, "..", "share", "julia", "test", "testenv.jl"))
1515
id_me = nothing
1616
id_other = nothing
1717

18+
@testset "Network interface info" begin
19+
# Smoke test
20+
@test !isempty(DistributedNext._get_interfaces(; loopback=true))
21+
end
22+
1823
# Test a few "remote" invocations when no workers are present
1924
@testset "Remote invocations with no workers" begin
2025
@test remote(myid)() == 1

test/sshmanager.jl

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,13 @@ end
7171
test_n_remove_pids(new_pids)
7272
@test :ok == timedwait(()->!issocket(controlpath), 10.0; pollint=0.5)
7373

74+
print("\nTest SSH addprocs() passing environment variables\n")
75+
withenv("JULIA_FOO" => "foo") do
76+
new_pids = addprocs_with_testenv(["localhost"]; sshflags)
77+
@test remotecall_fetch(() -> ENV["JULIA_FOO"], only(new_pids)) == "foo"
78+
test_n_remove_pids(new_pids)
79+
end
80+
7481
print("\nAll supported formats for hostname\n")
7582
h1 = "localhost"
7683
user = ENV["USER"]

0 commit comments

Comments
 (0)