Skip to content

Commit 3a4bdf4

Browse files
committed
add integration tests
1 parent fd96d91 commit 3a4bdf4

File tree

3 files changed

+114
-0
lines changed

3 files changed

+114
-0
lines changed

test/persistent_workers.jl

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
include("testhelpers/PersistentWorkers.jl")
2+
using .PersistentWorkers
3+
using Test
4+
using Random
5+
using Distributed
6+
7+
@testset "PersistentWorkers.jl" begin
8+
cookie = randstring(16)
9+
port = rand(9128:9999) # TODO: make sure port is available?
10+
worker = run(
11+
`$(Base.julia_exename()) --startup=no --project=$(dirname(@__DIR__)) -L testhelpers/PersistentWorkers.jl
12+
-e "using .PersistentWorkers; wait(start_worker_loop($port; cluster_cookie=$(repr(cookie)))[1])"`;
13+
wait=false)
14+
try
15+
cluster_cookie(cookie)
16+
sleep(1)
17+
18+
p = addprocs(PersistentWorkerManager(port))[]
19+
@test procs() == [1, p]
20+
@test workers() == [p]
21+
@test remotecall_fetch(myid, p) == p
22+
rmprocs(p)
23+
@test procs() == [1]
24+
@test workers() == [1]
25+
@test process_running(worker)
26+
# this shouldn't error
27+
@everywhere 1+1
28+
29+
# try the same thing again for the same worker
30+
p = addprocs(PersistentWorkerManager(port))[]
31+
@test procs() == [1, p]
32+
@test workers() == [p]
33+
@test remotecall_fetch(myid, p) == p
34+
rmprocs(p)
35+
@test procs() == [1]
36+
@test workers() == [1]
37+
@test process_running(worker)
38+
# this shouldn't error
39+
@everywhere 1+1
40+
finally
41+
kill(worker)
42+
end
43+
end

test/runtests.jl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,3 +12,4 @@ if !success(pipeline(cmd; stdout=stdout, stderr=stderr)) && ccall(:jl_running_on
1212
end
1313

1414
include("managers.jl")
15+
include("persistent_workers.jl")

test/testhelpers/PersistentWorkers.jl

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
module PersistentWorkers
2+
3+
using Distributed: Distributed, ClusterManager, WorkerConfig, worker_from_id, set_worker_state, W_TERMINATED
4+
using Sockets: InetAddr, localhost
5+
6+
export PersistentWorkerManager, start_worker_loop
7+
8+
struct PersistentWorkerManager{IP} <: Distributed.ClusterManager
9+
addr::InetAddr{IP}
10+
end
11+
12+
PersistentWorkerManager(host, port::Integer) = PersistentWorkerManager(InetAddr(host, port))
13+
PersistentWorkerManager(port::Integer) = PersistentWorkerManager(localhost, port)
14+
15+
function Distributed.launch(cm::PersistentWorkerManager, ::Dict, launched::Array, launch_ntfy::Base.GenericCondition{Base.AlwaysLockedST})
16+
(; host, port) = cm.addr
17+
wc = WorkerConfig()
18+
wc.io = nothing
19+
wc.host = string(host)
20+
wc.bind_addr = string(host)
21+
wc.port = Int(port)
22+
push!(launched, wc)
23+
notify(launch_ntfy)
24+
return nothing
25+
end
26+
27+
function Distributed.manage(::PersistentWorkerManager, ::Int, ::WorkerConfig, ::Symbol) end
28+
29+
# don't actually kill the worker, just close the streams
30+
function Base.kill(::PersistentWorkerManager, pid::Int, ::WorkerConfig)
31+
w = worker_from_id(pid)
32+
close(w.r_stream)
33+
close(w.w_stream)
34+
set_worker_state(w, W_TERMINATED)
35+
return nothing
36+
end
37+
38+
using Distributed: LPROC, init_worker, process_messages, cluster_cookie
39+
using Sockets: IPAddr, listen, listenany, accept
40+
41+
function start_worker_loop(host::IPAddr, port::Union{Nothing, Integer}; cluster_cookie=cluster_cookie())
42+
init_worker(cluster_cookie)
43+
LPROC.bind_addr = string(host)
44+
if port === nothing
45+
port_hint = 9000 + (getpid() % 1000)
46+
port, sock = listenany(host, UInt16(port_hint))
47+
else
48+
sock = listen(host, port)
49+
end
50+
LPROC.bind_port = port
51+
t = let sock=sock
52+
@async while isopen(sock)
53+
client = accept(sock)
54+
process_messages(client, client, true)
55+
end
56+
end
57+
errormonitor(t)
58+
@info "Listening on $host:$port, cluster_cookie=$cluster_cookie"
59+
return t, host, port
60+
end
61+
62+
function start_worker_loop((; host, port)::InetAddr; cluster_cookie=cluster_cookie())
63+
return start_worker_loop(host, port; cluster_cookie)
64+
end
65+
66+
function start_worker_loop(port::Union{Nothing, Integer}=nothing; cluster_cookie=cluster_cookie())
67+
return start_worker_loop(localhost, port; cluster_cookie)
68+
end
69+
70+
end

0 commit comments

Comments
 (0)