Skip to content

Commit d329a00

Browse files
incorporating last commits of the original version: af89e6c (Aug 14, 2024) -> 3b889ee (Apr 9, 2024)
1 parent 61bc078 commit d329a00

File tree

7 files changed

+55
-16
lines changed

7 files changed

+55
-16
lines changed

.github/workflows/ci.yml

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,12 @@ jobs:
3434
- os: macOS-latest
3535
arch: x86
3636
steps:
37-
- uses: actions/checkout@v2
38-
- uses: julia-actions/setup-julia@v1
37+
- uses: actions/checkout@v4
38+
- uses: julia-actions/setup-julia@v2
3939
with:
4040
version: ${{ matrix.version }}
4141
arch: ${{ matrix.arch }}
42-
- uses: actions/cache@v1
42+
- uses: actions/cache@v4
4343
env:
4444
cache-name: cache-artifacts
4545
with:
@@ -54,13 +54,14 @@ jobs:
5454
env:
5555
JULIA_DISTRIBUTED_TESTING_STANDALONE: 1
5656
- uses: julia-actions/julia-processcoverage@v1
57-
- uses: codecov/codecov-action@v1
57+
- uses: codecov/codecov-action@v4
5858
with:
5959
file: lcov.info
60+
token: ${{ secrets.CODECOV_TOKEN }}
6061
docs:
6162
runs-on: ubuntu-latest
6263
steps:
63-
- uses: actions/checkout@v2
64+
- uses: actions/checkout@v4
6465
- uses: julia-actions/setup-julia@latest
6566
with:
6667
# version: '1.6'

README.md

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# Distributed (with a multiscale parallelism extension)
22

3-
The `Distributed` package provides functionality for creating and controlling multiple Julia processes remotely, and for performing distributed and parallel computing. It uses network sockets or other supported interfaces to communicate between Julia processes, and relies on Julia's `Serialization` stdlib package to transform Julia objects into a format that can be transferred between processes efficiently. It provides a full set of utilities to create and destroy new Julia processes and add them to a "cluster" (a collection of Julia processes connected together), as well as functions to perform Remote Procedure Calls (RPC) between the processes within a cluster. See [`API`](@ref) for details.
3+
The `Distributed` package provides functionality for creating and controlling multiple Julia processes remotely, and for performing distributed and parallel computing. It uses network sockets or other supported interfaces to communicate between Julia processes, and relies on Julia's `Serialization` stdlib package to transform Julia objects into a format that can be transferred between processes efficiently. It provides a full set of utilities to create and destroy new Julia processes and add them to a "cluster" (a collection of Julia processes connected together), as well as functions to perform Remote Procedure Calls (RPC) between the processes within a cluster. See the `API` section for details.
44

55
This package ships as part of the Julia stdlib.
66

@@ -64,5 +64,4 @@ For controlling multiple processes at once:
6464

6565
Julia processes connected with `Distributed` are all assigned a cluster-unique `Int` identifier, starting from `1`. The first Julia process within a cluster is given ID `1`, while other processes added via `addprocs` get incrementing IDs (`2`, `3`, etc.). Functions and macros which communicate from one process to another usually take one or more identifiers to determine which process they target - for example, `remotecall_fetch(myid, 2)` calls `myid()` on process 2.
6666

67-
!!! note
68-
Only process 1 (often called the "head", "primary", or "master") may add or remove processes, and manages the rest of the cluster. Other processes (called "workers" or "worker processes") may still call functions on each other and send and receive data, but `addprocs`/`rmprocs` on worker processes will fail with an error.
67+
**Note:** Only process 1 (often called the "head", "primary", or "master") may add or remove processes, and manages the rest of the cluster. Other processes (called "workers" or "worker processes") may still call functions on each other and send and receive data, but `addprocs`/`rmprocs` on worker processes will fail with an error.

src/Distributed.jl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ using Base: Process, Semaphore, JLOptions, buffer_writes, @async_unwrap,
1515
julia_cmd, AsyncGenerator, acquire, release, invokelatest,
1616
shell_escape_posixly, shell_escape_csh,
1717
shell_escape_wincmd, escape_microsoft_c_args,
18-
uv_error, something, notnothing, isbuffered, mapany
18+
uv_error, something, notnothing, isbuffered, mapany, SizeUnknown
1919
using Base.Threads: Event
2020

2121
using Serialization, Sockets

src/clusterserialize.jl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -167,10 +167,11 @@ function deserialize_global_from_main(s::ClusterSerializer, sym)
167167
return nothing
168168
end
169169
end
170+
Core.eval(Main, Expr(:global, sym))
170171
if sym_isconst
171172
ccall(:jl_set_const, Cvoid, (Any, Any, Any), Main, sym, v)
172173
else
173-
setglobal!(Main, sym, v)
174+
invokelatest(setglobal!, Main, sym, v)
174175
end
175176
return nothing
176177
end

src/managers.jl

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,9 @@ addprocs([
111111
version is used on all remote machines because serialization and code distribution might
112112
fail otherwise.
113113
114-
* `exeflags`: additional flags passed to the worker processes.
114+
* `exeflags`: additional flags passed to the worker processes. It can either be a `Cmd`, a `String`
115+
holding one flag, or a collection of strings, with one element per flag.
116+
E.g. `\`--threads=auto project=.\``, `"--compile-trace=stderr"` or `["--threads=auto", "--compile=all"]`.
115117
116118
* `topology`: Specifies how the workers connect to each other. Sending a message between
117119
unconnected workers results in an error.
@@ -770,12 +772,12 @@ function kill(manager::LocalManager, pid::Int, config::WorkerConfig; exit_timeou
770772

771773
# Check to see if our child exited, and if not, send an actual kill signal
772774
if !process_exited(config.process)
773-
@warn("Failed to gracefully kill worker $(pid), sending SIGTERM")
774-
kill(config.process, Base.SIGTERM)
775+
@warn("Failed to gracefully kill worker $(pid), sending SIGQUIT")
776+
kill(config.process, Base.SIGQUIT)
775777

776778
sleep(term_timeout)
777779
if !process_exited(config.process)
778-
@warn("Worker $(pid) ignored SIGTERM, sending SIGKILL")
780+
@warn("Worker $(pid) ignored SIGQUIT, sending SIGKILL")
779781
kill(config.process, Base.SIGKILL)
780782
end
781783
end

src/workerpool.jl

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ An `AbstractWorkerPool` should implement:
88
- [`push!`](@ref) - add a new worker to the overall pool (available + busy)
99
- [`put!`](@ref) - put back a worker to the available pool
1010
- [`take!`](@ref) - take a worker from the available pool (to be used for remote function execution)
11+
- [`wait`](@ref) - block until a worker is available
1112
- [`length`](@ref) - number of workers available in the overall pool
1213
- [`isready`](@ref) - return false if a `take!` on the pool would block, else true
1314
@@ -120,6 +121,11 @@ function wp_local_take!(pool::AbstractWorkerPool; role= :default)
120121
return worker
121122
end
122123

124+
function wp_local_wait(pool::AbstractWorkerPool)
125+
wait(pool.channel)
126+
return nothing
127+
end
128+
123129
function remotecall_pool(rc_f, f, pool::AbstractWorkerPool, args...; role= :default, kwargs...)
124130
worker = take!(pool; role=role)
125131
try
@@ -133,7 +139,7 @@ end
133139
# NOTE: remotecall_fetch does it automatically, but this will be more efficient as
134140
# it avoids the overhead associated with a local remotecall.
135141

136-
for (func, rt) = ((:length, Int), (:isready, Bool), (:workers, Vector{Int}), (:nworkers, Int), (:take!, Int))
142+
for (func, rt) = ((:length, Int), (:isready, Bool), (:workers, Vector{Int}), (:nworkers, Int), (:take!, Int), (:wait, Nothing))
137143
func_local = Symbol(string("wp_local_", func))
138144
@eval begin
139145
function ($func)(pool::WorkerPool; role= :default)

test/distributed_exec.jl

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -518,6 +518,8 @@ test_iteration(RemoteChannel(() -> Channel(10)), RemoteChannel(() -> Channel(10)
518518
return count
519519
end
520520

521+
@everywhere test_iteration_collect(ch) = length(collect(ch))
522+
521523
@everywhere function test_iteration_put(ch, total)
522524
for i in 1:total
523525
put!(ch, i)
@@ -528,10 +530,16 @@ end
528530
let ch = RemoteChannel(() -> Channel(1))
529531
@async test_iteration_put(ch, 10)
530532
@test 10 == @fetchfrom id_other test_iteration_take(ch)
533+
ch = RemoteChannel(() -> Channel(1))
534+
@async test_iteration_put(ch, 10)
535+
@test 10 == @fetchfrom id_other test_iteration_collect(ch)
531536
# now reverse
532537
ch = RemoteChannel(() -> Channel(1))
533538
@spawnat id_other test_iteration_put(ch, 10)
534539
@test 10 == test_iteration_take(ch)
540+
ch = RemoteChannel(() -> Channel(1))
541+
@spawnat id_other test_iteration_put(ch, 10)
542+
@test 10 == test_iteration_collect(ch)
535543
end
536544

537545
# make sure exceptions propagate when waiting on Tasks
@@ -751,6 +759,28 @@ wp = WorkerPool(workers())
751759
wp = WorkerPool(2:3)
752760
@test sort(unique(pmap(_->myid(), wp, 1:100))) == [2,3]
753761

762+
# wait on worker pool
763+
wp = WorkerPool(2:2)
764+
w = take!(wp)
765+
766+
# local call to _wait
767+
@test !isready(wp)
768+
t = @async wait(wp)
769+
@test !istaskdone(t)
770+
put!(wp, w)
771+
status = timedwait(() -> istaskdone(t), 10)
772+
@test status == :ok
773+
774+
# remote call to _wait
775+
take!(wp)
776+
@test !isready(wp)
777+
f = @spawnat w wait(wp)
778+
@test !isready(f)
779+
put!(wp, w)
780+
status = timedwait(() -> isready(f), 10)
781+
@test status == :ok
782+
783+
754784
# CachingPool tests
755785
wp = CachingPool(workers())
756786
@test [1:100...] == pmap(x->x, wp, 1:100)
@@ -2037,7 +2067,7 @@ begin
20372067

20382068
# Next, ensure we get a log message when a worker does not cleanly exit
20392069
w = only(addprocs(1))
2040-
@test_logs (:warn, r"sending SIGTERM") begin
2070+
@test_logs (:warn, r"sending SIGQUIT") begin
20412071
remote_do(w) do
20422072
# Cause the 'exit()' message that `rmprocs()` sends to do nothing
20432073
Core.eval(Base, :(exit() = nothing))

0 commit comments

Comments
 (0)