
Commit b949759

Merge pull request #13 from oschulz/clusters-and-workers

Add flexible cluster and worker management and lots of other stuff

2 parents eb1cd88 + b8ad012, commit b949759

28 files changed: +3206 -627 lines

Project.toml

Lines changed: 10 additions & 0 deletions
```diff
@@ -3,20 +3,30 @@ uuid = "8e8a01fc-6193-5ca1-a2f1-20776dae4199"
 version = "0.4.3"
 
 [deps]
+ArgCheck = "dce04be8-c92d-5529-be00-80e4d2c0e197"
 ClusterManagers = "34f1f09b-3a8b-5176-ab39-66d58a4d544e"
 Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b"
 LinearAlgebra = "37e2e46d-f89d-539d-b4ee-838fcccc9c8e"
 Logging = "56ddb016-857b-54e1-b83d-db4d58db5568"
 Parameters = "d96e819e-fc66-5662-9728-84c9c7592b0a"
 Pkg = "44cfe95a-1eb2-52ea-b672-e2afdf69b78f"
+Sockets = "6462fe0b-24de-5631-8697-dd941f90decc"
 ThreadPinning = "811555cd-349b-4f26-b7bc-1f208b848042"
 
+[weakdeps]
+ThreadPinning = "811555cd-349b-4f26-b7bc-1f208b848042"
+
+[extensions]
+ParallelProcessingToolsThreadPinningExt = "ThreadPinning"
+
 [compat]
+ArgCheck = "1, 2"
 ClusterManagers = "0.4.6"
 Distributed = "1"
 LinearAlgebra = "1"
 Logging = "1"
 Parameters = "0.12, 0.13"
 Pkg = "1"
+Sockets = "1"
 ThreadPinning = "0.7.22"
 julia = "1.6"
```
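
For context: `[weakdeps]` plus `[extensions]` is the Julia >= 1.9 mechanism for optional dependencies; the extension module is loaded automatically once both the parent package and the weak dependency are imported. ThreadPinning stays in `[deps]` as well, which, together with the `@static` include in src/ParallelProcessingTools.jl below, keeps the functionality available on older Julia versions. A minimal sketch of the general pattern, with hypothetical package names:

```julia
# Hypothetical extension module (MyPkg/ext/MyPkgFooExt.jl), paired with
#   [weakdeps]   Foo = "..."
#   [extensions] MyPkgFooExt = "Foo"
# in MyPkg/Project.toml. Illustration only, not part of this commit:
module MyPkgFooExt

import MyPkg
import Foo

# Add a Foo-specific method to a function owned by MyPkg; the method only
# comes into existence when the user loads Foo alongside MyPkg:
MyPkg.summarize(x::Foo.Sample) = "Foo sample with $(length(x.data)) entries"

end # module
```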

docs/src/index.md

Lines changed: 117 additions & 25 deletions
````diff
@@ -3,56 +3,112 @@
 This Julia package provides some tools to ease multithreaded and distributed programming.
 
 
-## Compute cluster management
+## Distributed computing
 
-ParallelProcessingTools helps spin-up Julia compute clusters. It currently has support for clusters on localhost and on SLURM (uses `ClusterManagers.ElasticManager` internally).
+Julia provides native support for distributed computing on multiple Julia processes that run in parallel on the same or on different machines. ParallelProcessingTools adds some machinery to make some aspects of this even easier.
 
-On SLURM, `addworkers` will automatically try to perform a sensible thread-pinning (using the [ThreadPinning](https://github.com/carstenbauer/ThreadPinning.jl) package internally).
+An internal elastic cluster manager ([`ppt_cluster_manager`](@ref), a modified version of `ParallelProcessingTools.ElasticManager`), started on demand, allows for starting ([`runworkers`](@ref)) and stopping ([`stopworkers`](@ref)) worker processes in a dynamic fashion. The worker processes can also be started outside of the Julia session ([`worker_start_command`](@ref) and [`write_worker_start_script`](@ref)); this can be useful to add workers to a running Julia session via manually controlled batch jobs, for example. Workers can be started locally ([`OnLocalhost`](@ref)) or via SLURM ([`SlurmRun`](@ref)). Other methods to start workers (e.g. via SSH) may be added in the future (contributions are very welcome).
+
+The elastic cluster manager automatically adds new workers to an automatically created dynamic worker pool ([`ppt_worker_pool`](@ref)) of type [`FlexWorkerPool`](@ref) that optionally supports oversubscription. Users can `take!` workers from the pool and `put!` them back, or use [`onworker`](@ref) to send work to workers in the pool without exceeding their maximum occupancy.
+
+Since workers can appear and disappear dynamically, initializing them (loading packages, etc.) via the standard `Distributed.@everywhere` macro is problematic, as workers added afterwards won't be initialized. ParallelProcessingTools provides the macro [`@always_everywhere`](@ref) to run code globally on all current processes, but also store the code so it can be run again on future new worker processes. Workers that are part of a [`FlexWorkerPool`](@ref) will be updated automatically on `take!` and `onworker`. You can also use [`ensure_procinit`](@ref) to manually update all workers
+with all `@always_everywhere` code used so far.
+
+The function [`pinthreads_auto`](@ref) (used inside `@always_everywhere`) provides a convenient way to perform some automatic thread pinning on all processes. Note that it needs to follow an [`import ThreadPinning`](https://github.com/carstenbauer/ThreadPinning.jl/), and that more complex use cases may require customized thread pinning for best performance.
+
+For example:
 
 ```julia
+ENV["JULIA_DEBUG"] = "ParallelProcessingTools"
+ENV["JULIA_WORKER_TIMEOUT"] = "120"
+
 using ParallelProcessingTools, Distributed
 
 @always_everywhere begin
-    using Distributions
+    using ParallelProcessingTools
+    using Statistics
+
+    import ThreadPinning
+    pinthreads_auto()
 end
 
-mode = ParallelProcessingTools.SlurmRun(slurm_flags = `--ntasks=4 --cpus-per-task=8 --mem-per-cpu=8G`)
-#ParallelProcessingTools.worker_start_command(mode)
+runmode = OnLocalhost(n = 4)
+# runmode = SlurmRun(slurm_flags = `--ntasks=4 --cpus-per-task=8 --mem-per-cpu=8G`)
+
+display(worker_start_command(runmode))
 
-# Add some workers:
-addworkers(mode)
+# Add some workers and initialize with all `@always_everywhere` code:
+old_nprocs = nprocs()
+_, n = runworkers(runmode)
+@wait_while nprocs() < old_nprocs + n
+ensure_procinit()
 
-# List resources:
-ParallelProcessingTools.worker_resources()
+# Show worker resources:
+pool = ppt_worker_pool()
+display(pool)
+display(worker_resources())
 
-# Confirm that Distributions is loaded on workers:
+# Confirm that Statistics is loaded on a worker:
 worker = last(workers())
-@fetchfrom worker Normal()
+@fetchfrom worker mean(rand(100))
 
-# Add some more workers:
-addworkers(mode)
-Table(ParallelProcessingTools.worker_resources())
+# Some more init code:
+@always_everywhere begin
+    X = rand(100)
+end
 
-# Add even more workers:
-addworkers(mode)
-Table(ParallelProcessingTools.worker_resources())
+# Add some more workers, we won't run `ensure_procinit()` manually this time:
+old_nprocs = nprocs()
+_, n = runworkers(runmode)
+@wait_while nprocs() < old_nprocs + n
+
+# The new worker hasn't run the `@always_everywhere` code yet, so it doesn't have `mean`:
+worker = last(workers())
+display(@return_exceptions @userfriendly_exceptions begin
+    @fetchfrom worker mean(X)
+end)
+
+# Using `take!` on a `FlexWorkerPool` automatically runs init code as necessary:
+pid = take!(pool)
+try
+    remotecall_fetch(() -> mean(X), pid)
+finally
+    put!(pool, pid)
+end
+
+# `onworker` (using the default `FlexWorkerPool` here) does the same:
+onworker(mean, X)
+
+# If we don't need worker processes for a while, let's stop them:
+stopworkers()
 ```
 
-And we can do SLURM batch scripts like this (e.g. "batchtest.jl"):
+We can also use SLURM batch scripts, like this (e.g. "batchtest.jl"):
 
 ```julia
-#!/usr/bin/env -S julia --project=@SOME_JULIA_ENVIRONMENT --threads=8
-#SBATCH --ntasks=4 --cpus-per-task=8 --mem-per-cpu=8G
+#!/usr/bin/env julia
+#SBATCH --ntasks=4 --cpus-per-task=8 --mem-per-cpu=8G --time=00:15:00
+
+using Pkg; pkg"activate @SOME_JULIA_ENVIRONMENT"
+
+ENV["JULIA_DEBUG"] = "ParallelProcessingTools"
+ENV["JULIA_WORKER_TIMEOUT"] = "120"
 
 using ParallelProcessingTools, Distributed
 
 @always_everywhere begin
     using ParallelProcessingTools
+    import ThreadPinning
+    pinthreads_auto()
 end
 
-addworkers(SlurmRun())
-resources = ParallelProcessingTools.worker_resources()
-show(stdout, MIME"text/plain"(), ParallelProcessingTools.worker_resources())
+_, n = runworkers(SlurmRun(slurm_flags = `--cpu-bind=cores --mem-bind=local`))
+@wait_while maxtime=240 nprocs() < n + 1
+
+resources = worker_resources()
+display(resources)
+
+stopworkers()
 ```
 
 This should run with a simple
@@ -61,4 +117,40 @@ This should run with a simple
 sbatch -o out.txt batchtest.jl
 ```
 
-and "out.txt" should then contain a list of the worker resources.
+and "out.txt" should then contain debugging output and a list of the worker
+resources.
+
+
+## Multithreading
+
+To test multithreading performance and help debug and optimize multithreaded
+code, ParallelProcessingTools provides the utility macro [`@onthreads`](@ref)
+to run code explicitly on the selected Julia threads (all threads can be
+listed using [`allthreads`](@ref)).
+
+You can use the macro [`@critical`](@ref) to protect code that may suffer from race conditions against running in parallel with other code fenced by `@critical`.
+
+The macro [`@mt_out_of_order`](@ref) is useful to run different code in parallel on Julia threads.
+
+
+## Waiting and sleeping
+
+In a parallel computing scenario, on threads, distributed processes or both, or when dealing with I/O operations, code often needs to wait, and a timeout mechanism is often necessary. Julia's standard `wait` function can only wait for a single object and has no timeout (`waitany`, which requires Julia >= v1.12, can be used to wait for multiple tasks).
+
+ParallelProcessingTools provides a very flexible macro [`@wait_while`](@ref) to wait for custom conditions with an optional timeout, as well as the functions [`wait_for_all`](@ref) and [`wait_for_any`](@ref) that can wait for different kinds of objects, also with an optional timeout.
+
+The functions [`sleep_ns`](@ref) and [`idle_sleep`](@ref) can be used to implement custom scenarios that require precise sleeping, for both very short and long intervals.
+
+
+## Exception handling
+
+Exceptions thrown during remote code execution can be complex, nested and sometimes hard to understand. You can use the functions [`inner_exception`](@ref), [`onlyfirst_exception`](@ref) and [`original_exception`](@ref) to get to the underlying reason of a failure more easily. The macro [`@userfriendly_exceptions`](@ref) automates this to some extent for a given piece of code.
+
+To get an exception "in hand" for further analysis, you can use the macro [`@return_exceptions`](@ref) to make (possibly failing) code return exceptions instead of throwing them.
+
+
+## File I/O
+
+File handling can become more challenging when working in a parallel and possibly distributed fashion. Code or whole workers can crash, resulting in corrupt files, or workers may become disconnected but still write files, clashing with restarted code (resulting in race conditions and possibly corrupt files).
+
+ParallelProcessingTools provides the functions [`create_files`](@ref), [`read_files`](@ref) and [`modify_files`](@ref) to implement atomic file operations, on a best-effort basis (depending on the operating system and underlying file systems).
````
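
A few illustrative sketches of the APIs documented above; they are not part of this commit, and any signature detail beyond what the diff shows is an assumption based on the documentation text.

`@onthreads` and `@critical`, as described in the new "Multithreading" section:

```julia
using ParallelProcessingTools, Base.Threads

# Run code explicitly on all Julia threads; each thread fills its own slot:
A = Vector{Int}(undef, nthreads())
@onthreads allthreads() begin
    A[threadid()] = threadid()
end

# Fence code that mutates shared state against other `@critical` sections:
total = Ref(0)
@onthreads allthreads() begin
    @critical begin
        total[] += 1
    end
end
```

`@wait_while` with a timeout, in the style of the batch script above (where `maxtime` is given in seconds):

```julia
using ParallelProcessingTools

t = Threads.@spawn (sleep(0.5); :done)
@wait_while maxtime=10 !istaskdone(t)
fetch(t)  # :done
```

Exception handling: `@return_exceptions` returns the exception instead of throwing it, and `inner_exception` should pass a plain, non-nested exception straight through:

```julia
using ParallelProcessingTools

result = @return_exceptions @userfriendly_exceptions begin
    error("simulated failure")
end
result isa Exception && println(inner_exception(result))
```

For the atomic file operations, the package documentation shows `create_files` taking a do-block that receives temporary paths, which are renamed to their final names only on success (treat the exact call shape as an assumption):

```julia
using ParallelProcessingTools

create_files("results.txt") do tmp
    write(tmp, "all done\n")  # appears as "results.txt" only if this succeeds
end
```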
ext/ParallelProcessingToolsThreadPinningExt.jl

Lines changed: 38 additions & 0 deletions

```diff
@@ -0,0 +1,38 @@
+module ParallelProcessingToolsThreadPinningExt
+
+import ParallelProcessingTools
+import LinearAlgebra
+import Distributed
+import ThreadPinning
+
+# ThreadPinning.jl does not support all operating systems, currently:
+const _threadpinning_supported = isdefined(ThreadPinning, :affinitymask2cpuids)
+
+@static if _threadpinning_supported
+
+
+function ParallelProcessingTools._pinthreads_auto_impl(::Val{true})
+    pid = Distributed.myid()
+    if Distributed.myid() == 1
+        @debug "On process $pid, leaving Julia threads unpinned"
+        let n_juliathreads = Threads.nthreads()
+            if n_juliathreads > 1
+                LinearAlgebra.BLAS.set_num_threads(n_juliathreads)
+            end
+        end
+    else
+        @debug "On process $pid, pinning threads according to affinity mask"
+        let available_cpus = ThreadPinning.affinitymask2cpuids(ThreadPinning.get_affinity_mask())
+            ThreadPinning.pinthreads(:affinitymask)
+            LinearAlgebra.BLAS.set_num_threads(length(available_cpus))
+        end
+    end
+end
+
+
+ParallelProcessingTools._getcpuids_impl(::Val{true}) = ThreadPinning.getcpuids()
+
+
+end # if _threadpinning_supported
+
+end # module ParallelProcessingToolsThreadPinningExt
```
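
The `::Val{true}` signatures above suggest that the parent package dispatches on whether thread pinning is available. A hypothetical sketch of the counterpart stubs in ParallelProcessingTools itself (not shown in this diff; the real mechanism may differ):

```julia
# Hypothetical fallback stubs in the parent package. Illustration only:
const _threadpinning_active = Ref(false)  # an extension could flip this on load

_pinthreads_auto_impl(::Val{false}) = nothing  # no-op without ThreadPinning
_getcpuids_impl(::Val{false}) = nothing

pinthreads_auto() = _pinthreads_auto_impl(Val(_threadpinning_active[]))
```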

src/ParallelProcessingTools.jl

Lines changed: 20 additions & 2 deletions
```diff
@@ -9,23 +9,41 @@ using Distributed
 
 import LinearAlgebra
 import Pkg
+import Sockets
 
 import ClusterManagers
-import ThreadPinning
 
+using Base: Process
 using Logging: @logmsg, LogLevel, Info, Debug
 
+using ArgCheck: @argcheck
 using Parameters: @with_kw
 
+# ToDo: Remove CustomClusterManagers once changes to ElasticManager
+# have been upstreamed.
+#using CustomClusterManagers: ElasticManager
+include("custom_cluster_managers.jl")
+using .CustomClusterManagers: ElasticManager
+
+include("display.jl")
+include("waiting.jl")
 include("exceptions.jl")
+include("states.jl")
 include("fileio.jl")
 include("threadsafe.jl")
 include("threadlocal.jl")
 include("onthreads.jl")
 include("onprocs.jl")
 include("workpartition.jl")
-include("addworkers.jl")
+include("procinit.jl")
+include("workerpool.jl")
+include("onworkers.jl")
+include("runworkers.jl")
 include("slurm.jl")
 include("deprecated.jl")
 
+@static if !isdefined(Base, :get_extension)
+    include("../ext/ParallelProcessingToolsThreadPinningExt.jl")
+end
+
 end # module
```
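
The `@static if !isdefined(Base, :get_extension)` block includes the extension file directly on Julia versions that predate package extensions; since ThreadPinning is also a regular dependency in `[deps]`, this works down to Julia 1.6. On Julia >= 1.9 you can check whether the extension is active, for example:

```julia
import ParallelProcessingTools, ThreadPinning  # loading both triggers the extension

# Base.get_extension returns the extension module, or `nothing` if it isn't loaded:
ext = Base.get_extension(ParallelProcessingTools, :ParallelProcessingToolsThreadPinningExt)
println(ext === nothing ? "extension not loaded" : "extension active")
```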
