From 809eed2ddbf6237c2f45a453622659ea5b0366e1 Mon Sep 17 00:00:00 2001 From: Oliver Schulz Date: Fri, 28 Mar 2025 09:44:32 +0100 Subject: [PATCH 1/4] Change package imports in CustomClusterManagers --- src/custom_cluster_managers.jl | 40 +++++++++++++++++----------------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/src/custom_cluster_managers.jl b/src/custom_cluster_managers.jl index f93e5d8..5636fe7 100644 --- a/src/custom_cluster_managers.jl +++ b/src/custom_cluster_managers.jl @@ -7,11 +7,11 @@ module CustomClusterManagers # ================================================================== -using Distributed -using Sockets -using Pkg +import Distributed +import Sockets +import Pkg -import Distributed: launch, manage, kill, init_worker, connect +using Distributed: launch, manage, kill, init_worker, connect # ================================================================== @@ -23,9 +23,9 @@ export ElasticManager, elastic_worker const HDR_COOKIE_LEN = Distributed.HDR_COOKIE_LEN -struct ElasticManager <: ClusterManager - active::Dict{Int, WorkerConfig} # active workers - pending::Channel{TCPSocket} # to be added workers +struct ElasticManager <: Distributed.ClusterManager + active::Dict{Int, Distributed.WorkerConfig} # active workers + pending::Channel{Sockets.TCPSocket} # to be added workers terminated::Set{Int} # terminated worker ids topology::Symbol sockname @@ -37,24 +37,24 @@ struct ElasticManager <: ClusterManager topology=:all_to_all, manage_callback=elastic_no_op_callback, printing_kwargs=() ) Distributed.init_multi() - cookie !== nothing && cluster_cookie(cookie) + cookie !== nothing && Distributed.cluster_cookie(cookie) # Automatically check for the IP address of the local machine if addr == :auto try - addr = Sockets.getipaddr(IPv4) + addr = Sockets.getipaddr(Sockets.IPv4) catch error("Failed to automatically get host's IP address. Please specify `addr=` explicitly.") end end - l_sock = listen(addr, port) + l_sock = Distributed.listen(addr, port) - lman = new(Dict{Int, WorkerConfig}(), Channel{TCPSocket}(typemax(Int)), Set{Int}(), topology, getsockname(l_sock), manage_callback, printing_kwargs) + lman = new(Dict{Int, Distributed.WorkerConfig}(), Channel{Sockets.TCPSocket}(typemax(Int)), Set{Int}(), topology, Sockets.getsockname(l_sock), manage_callback, printing_kwargs) @async begin while true - let s = accept(l_sock) + let s = Sockets.accept(l_sock) @async process_worker_conn(lman, s) end end @@ -72,10 +72,10 @@ ElasticManager(addr, port, cookie) = ElasticManager(;addr=addr, port=port, cooki elastic_no_op_callback(::ElasticManager, ::Integer, ::Symbol) = nothing -function process_worker_conn(mgr::ElasticManager, s::TCPSocket) +function process_worker_conn(mgr::ElasticManager, s::Sockets.TCPSocket) @debug "ElasticManager got new worker connection" # Socket is the worker's STDOUT - wc = WorkerConfig() + wc = Distributed.WorkerConfig() wc.io = s # Validate cookie @@ -83,7 +83,7 @@ function process_worker_conn(mgr::ElasticManager, s::TCPSocket) if length(cookie) < HDR_COOKIE_LEN error("Cookie read failed. 
Connection closed by peer.") end - self_cookie = cluster_cookie() + self_cookie = Distributed.cluster_cookie() for i in 1:HDR_COOKIE_LEN if UInt8(self_cookie[i]) != cookie[i] println(i, " ", self_cookie[i], " ", cookie[i]) @@ -98,7 +98,7 @@ function process_pending_connections(mgr::ElasticManager) while true wait(mgr.pending) try - addprocs(mgr; topology=mgr.topology) + Distributed.addprocs(mgr; topology=mgr.topology) catch e showerror(stderr, e) Base.show_backtrace(stderr, Base.catch_backtrace()) @@ -106,11 +106,11 @@ function process_pending_connections(mgr::ElasticManager) end end -function launch(mgr::ElasticManager, params::Dict, launched::Array, c::Condition) +function Distributed.launch(mgr::ElasticManager, params::Dict, launched::Array, c::Condition) # The workers have already been started. while isready(mgr.pending) @debug "ElasticManager.launch new worker" - wc=WorkerConfig() + wc=Distributed.WorkerConfig() wc.io = take!(mgr.pending) push!(launched, wc) end @@ -118,7 +118,7 @@ function launch(mgr::ElasticManager, params::Dict, launched::Array, c::Condition notify(c) end -function manage(mgr::ElasticManager, id::Integer, config::WorkerConfig, op::Symbol) +function Distributed.manage(mgr::ElasticManager, id::Integer, config::Distributed.WorkerConfig, op::Symbol) if op == :register @debug "ElasticManager registering process id $id" mgr.active[id] = config @@ -173,7 +173,7 @@ function elastic_worker( c = connect(addr, port) write(c, rpad(cookie, HDR_COOKIE_LEN)[1:HDR_COOKIE_LEN]) stdout_to_master && redirect_stdout(c) - start_worker(c, cookie) + Distributed.start_worker(c, cookie) end From 3f0c358384ba180853c2f8651b8617b0ec30013e Mon Sep 17 00:00:00 2001 From: Oliver Schulz Date: Fri, 28 Mar 2025 10:02:53 +0100 Subject: [PATCH 2/4] Apply improvements from package ElasticClusterManager to CustomClusterManagers --- src/custom_cluster_managers.jl | 40 ++++++++++++++++++++++++++++------ 1 file changed, 33 insertions(+), 7 deletions(-) diff --git a/src/custom_cluster_managers.jl b/src/custom_cluster_managers.jl index 5636fe7..fe520e4 100644 --- a/src/custom_cluster_managers.jl +++ b/src/custom_cluster_managers.jl @@ -14,15 +14,22 @@ import Pkg using Distributed: launch, manage, kill, init_worker, connect # ================================================================== +export ElasticManager, elastic_worker + # The master process listens on a well-known port # Launched workers connect to the master and redirect their STDOUTs to the same # Workers can join and leave the cluster on demand. -export ElasticManager, elastic_worker - const HDR_COOKIE_LEN = Distributed.HDR_COOKIE_LEN +@static if Base.VERSION >= v"1.7-" + # Base.errormonitor() is only available in Julia 1.7+ + my_errormonitor(t) = Base.errormonitor(t) +else + my_errormonitor(t) = nothing +end + struct ElasticManager <: Distributed.ClusterManager active::Dict{Int, Distributed.WorkerConfig} # active workers pending::Channel{Sockets.TCPSocket} # to be added workers @@ -47,20 +54,23 @@ struct ElasticManager <: Distributed.ClusterManager error("Failed to automatically get host's IP address. 
Please specify `addr=` explicitly.") end end - + l_sock = Distributed.listen(addr, port) lman = new(Dict{Int, Distributed.WorkerConfig}(), Channel{Sockets.TCPSocket}(typemax(Int)), Set{Int}(), topology, Sockets.getsockname(l_sock), manage_callback, printing_kwargs) - @async begin + t1 = @async begin while true let s = Sockets.accept(l_sock) - @async process_worker_conn(lman, s) + t2 = @async process_worker_conn(lman, s) + my_errormonitor(t2) end end end + my_errormonitor(t1) - @async process_pending_connections(lman) + t3 = @async process_pending_connections(lman) + my_errormonitor(t3) lman end @@ -153,7 +163,7 @@ function Base.show(io::IO, mgr::ElasticManager) println(iob, " Worker connect command : ") print(iob, " ", get_connect_cmd(mgr; mgr.printing_kwargs...)) - + print(io, String(take!(iob))) end @@ -176,5 +186,21 @@ function elastic_worker( Distributed.start_worker(c, cookie) end +function get_connect_cmd(em::ElasticManager; absolute_exename=true, same_project=true, exeflags::Tuple=()) + ip = string(em.sockname[1]) + port = convert(Int,em.sockname[2]) + cookie = Distributed.cluster_cookie() + exename = absolute_exename ? joinpath(Sys.BINDIR, Base.julia_exename()) : "julia" + project = same_project ? ("--project=$(Pkg.API.Context().env.project_file)",) : () + + join([ + exename, + exeflags..., + project..., + "-e 'import ElasticClusterManager; ElasticClusterManager.elastic_worker(\"$cookie\",\"$ip\",$port)'" + ]," ") + +end + end # module CustomClusterManagers From cd5c63d3bac1acfd210c7b513d2ebc3e51f8e185 Mon Sep 17 00:00:00 2001 From: Oliver Schulz Date: Fri, 28 Mar 2025 10:15:15 +0100 Subject: [PATCH 3/4] Use package ElasticClusterManager, remove CustomClusterManagers Improvements in internal module CustomClusterManagers have been ported to package ElasticClusterManager. --- Project.toml | 4 +- src/ParallelProcessingTools.jl | 13 +-- src/custom_cluster_managers.jl | 206 --------------------------------- src/runworkers.jl | 4 +- 4 files changed, 10 insertions(+), 217 deletions(-) delete mode 100644 src/custom_cluster_managers.jl diff --git a/Project.toml b/Project.toml index f3bdee7..7ba9b8d 100644 --- a/Project.toml +++ b/Project.toml @@ -4,7 +4,7 @@ version = "0.4.7" [deps] ArgCheck = "dce04be8-c92d-5529-be00-80e4d2c0e197" -ClusterManagers = "34f1f09b-3a8b-5176-ab39-66d58a4d544e" +ElasticClusterManager = "547eee1f-27c8-4193-bfd6-9e092c8e3331" Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b" LinearAlgebra = "37e2e46d-f89d-539d-b4ee-838fcccc9c8e" Logging = "56ddb016-857b-54e1-b83d-db4d58db5568" @@ -21,7 +21,7 @@ ParallelProcessingToolsThreadPinningExt = "ThreadPinning" [compat] ArgCheck = "1, 2" -ClusterManagers = "0.4.6, 1" +ElasticClusterManager = "2" Distributed = "1" LinearAlgebra = "1" Logging = "1" diff --git a/src/ParallelProcessingTools.jl b/src/ParallelProcessingTools.jl index 21c49cf..210fc59 100644 --- a/src/ParallelProcessingTools.jl +++ b/src/ParallelProcessingTools.jl @@ -12,19 +12,18 @@ import Pkg import Random # Required by ThreadPinning extention import Sockets -import ClusterManagers - using Base: Process using Logging: @logmsg, LogLevel, Info, Debug using ArgCheck: @argcheck using Parameters: @with_kw -# # ToDo: Remove CustomClusterManagers once changes to ElasticManager have -# # have been upstreamed. 
-#using CustomClusterManagers: ElasticManager -include("custom_cluster_managers.jl") -using .CustomClusterManagers: ElasticManager +using ElasticClusterManager: ElasticManager, elastic_worker + +# For backward compatibility: +module CustomClusterManagers + import ElasticClusterManager.ElasticManager +end include("memory.jl") include("display.jl") diff --git a/src/custom_cluster_managers.jl b/src/custom_cluster_managers.jl deleted file mode 100644 index fe520e4..0000000 --- a/src/custom_cluster_managers.jl +++ /dev/null @@ -1,206 +0,0 @@ -# This code is a modified version of ClusterManagers.ElasticManager, both -# original code and modifications are licensed under the MIT License (MIT): -# https://github.com/JuliaParallel/ClusterManagers.jl/blob/master/LICENSE.md - -# Modifications are planned to be upstreamed, once tested in the field. - -module CustomClusterManagers - -# ================================================================== -import Distributed -import Sockets -import Pkg - -using Distributed: launch, manage, kill, init_worker, connect -# ================================================================== - -export ElasticManager, elastic_worker - - -# The master process listens on a well-known port -# Launched workers connect to the master and redirect their STDOUTs to the same -# Workers can join and leave the cluster on demand. - -const HDR_COOKIE_LEN = Distributed.HDR_COOKIE_LEN - -@static if Base.VERSION >= v"1.7-" - # Base.errormonitor() is only available in Julia 1.7+ - my_errormonitor(t) = Base.errormonitor(t) -else - my_errormonitor(t) = nothing -end - -struct ElasticManager <: Distributed.ClusterManager - active::Dict{Int, Distributed.WorkerConfig} # active workers - pending::Channel{Sockets.TCPSocket} # to be added workers - terminated::Set{Int} # terminated worker ids - topology::Symbol - sockname - manage_callback - printing_kwargs - - function ElasticManager(; - addr=IPv4("127.0.0.1"), port=9009, cookie=nothing, - topology=:all_to_all, manage_callback=elastic_no_op_callback, printing_kwargs=() - ) - Distributed.init_multi() - cookie !== nothing && Distributed.cluster_cookie(cookie) - - # Automatically check for the IP address of the local machine - if addr == :auto - try - addr = Sockets.getipaddr(Sockets.IPv4) - catch - error("Failed to automatically get host's IP address. Please specify `addr=` explicitly.") - end - end - - l_sock = Distributed.listen(addr, port) - - lman = new(Dict{Int, Distributed.WorkerConfig}(), Channel{Sockets.TCPSocket}(typemax(Int)), Set{Int}(), topology, Sockets.getsockname(l_sock), manage_callback, printing_kwargs) - - t1 = @async begin - while true - let s = Sockets.accept(l_sock) - t2 = @async process_worker_conn(lman, s) - my_errormonitor(t2) - end - end - end - my_errormonitor(t1) - - t3 = @async process_pending_connections(lman) - my_errormonitor(t3) - - lman - end -end - -ElasticManager(port) = ElasticManager(;port=port) -ElasticManager(addr, port) = ElasticManager(;addr=addr, port=port) -ElasticManager(addr, port, cookie) = ElasticManager(;addr=addr, port=port, cookie=cookie) - -elastic_no_op_callback(::ElasticManager, ::Integer, ::Symbol) = nothing - -function process_worker_conn(mgr::ElasticManager, s::Sockets.TCPSocket) - @debug "ElasticManager got new worker connection" - # Socket is the worker's STDOUT - wc = Distributed.WorkerConfig() - wc.io = s - - # Validate cookie - cookie = read(s, HDR_COOKIE_LEN) - if length(cookie) < HDR_COOKIE_LEN - error("Cookie read failed. 
Connection closed by peer.") - end - self_cookie = Distributed.cluster_cookie() - for i in 1:HDR_COOKIE_LEN - if UInt8(self_cookie[i]) != cookie[i] - println(i, " ", self_cookie[i], " ", cookie[i]) - error("Invalid cookie sent by remote worker.") - end - end - - put!(mgr.pending, s) -end - -function process_pending_connections(mgr::ElasticManager) - while true - wait(mgr.pending) - try - Distributed.addprocs(mgr; topology=mgr.topology) - catch e - showerror(stderr, e) - Base.show_backtrace(stderr, Base.catch_backtrace()) - end - end -end - -function Distributed.launch(mgr::ElasticManager, params::Dict, launched::Array, c::Condition) - # The workers have already been started. - while isready(mgr.pending) - @debug "ElasticManager.launch new worker" - wc=Distributed.WorkerConfig() - wc.io = take!(mgr.pending) - push!(launched, wc) - end - - notify(c) -end - -function Distributed.manage(mgr::ElasticManager, id::Integer, config::Distributed.WorkerConfig, op::Symbol) - if op == :register - @debug "ElasticManager registering process id $id" - mgr.active[id] = config - mgr.manage_callback(mgr, id, op) - elseif op == :deregister - @debug "ElasticManager deregistering process id $id" - mgr.manage_callback(mgr, id, op) - delete!(mgr.active, id) - push!(mgr.terminated, id) - end -end - -function Base.show(io::IO, mgr::ElasticManager) - iob = IOBuffer() - - println(iob, "ElasticManager:") - print(iob, " Active workers : [ ") - for id in sort(collect(keys(mgr.active))) - print(iob, id, ",") - end - seek(iob, position(iob)-1) - println(iob, "]") - - println(iob, " Number of workers to be added : ", Base.n_avail(mgr.pending)) - - print(iob, " Terminated workers : [ ") - for id in sort(collect(mgr.terminated)) - print(iob, id, ",") - end - seek(iob, position(iob)-1) - println(iob, "]") - - println(iob, " Worker connect command : ") - print(iob, " ", get_connect_cmd(mgr; mgr.printing_kwargs...)) - - print(io, String(take!(iob))) -end - -# Does not return. If executing from a REPL try -# @async elastic_worker(.....) -# addr, port that a ElasticManager on the master processes is listening on. -function elastic_worker( - cookie::AbstractString, addr::AbstractString="127.0.0.1", port::Integer = 9009; - stdout_to_master::Bool = true, - Base.@nospecialize(env::AbstractVector = [],) -) - @debug "ElasticManager.elastic_worker(cookie, $addr, $port; stdout_to_master=$stdout_to_master, env=$env)" - for (k, v) in env - ENV[k] = v - end - - c = connect(addr, port) - write(c, rpad(cookie, HDR_COOKIE_LEN)[1:HDR_COOKIE_LEN]) - stdout_to_master && redirect_stdout(c) - Distributed.start_worker(c, cookie) -end - -function get_connect_cmd(em::ElasticManager; absolute_exename=true, same_project=true, exeflags::Tuple=()) - ip = string(em.sockname[1]) - port = convert(Int,em.sockname[2]) - cookie = Distributed.cluster_cookie() - exename = absolute_exename ? joinpath(Sys.BINDIR, Base.julia_exename()) : "julia" - project = same_project ? 
("--project=$(Pkg.API.Context().env.project_file)",) : () - - join([ - exename, - exeflags..., - project..., - "-e 'import ElasticClusterManager; ElasticClusterManager.elastic_worker(\"$cookie\",\"$ip\",$port)'" - ]," ") - -end - - -end # module CustomClusterManagers diff --git a/src/runworkers.jl b/src/runworkers.jl index ae17c31..e45eb7b 100644 --- a/src/runworkers.jl +++ b/src/runworkers.jl @@ -141,7 +141,7 @@ function ppt_cluster_manager() end """ - ParallelProcessingTools.ppt_cluster_manager!(manager::CustomClusterManagers.ElasticManager) + ParallelProcessingTools.ppt_cluster_manager!(manager::ElasticClusterManager.ElasticManager) Set the default ParallelProcessingTools cluster manager. """ @@ -260,7 +260,7 @@ function _elastic_worker_startjl( socket_name = manager.sockname address = string(socket_name[1]) port = convert(Int, socket_name[2]) - """import ParallelProcessingTools; ParallelProcessingTools.CustomClusterManagers.elastic_worker("$cookie", "$address", $port, stdout_to_master=$redirect_output, env=$env_vec)""" + """import ParallelProcessingTools; ParallelProcessingTools.elastic_worker("$cookie", "$address", $port, forward_stdout=$redirect_output, env=$env_vec)""" end const _default_addprocs_params = Distributed.default_addprocs_params() From 893f0813ad245954629f35ff7d8aed973e3c7dec Mon Sep 17 00:00:00 2001 From: Oliver Schulz Date: Tue, 8 Apr 2025 19:52:20 +0200 Subject: [PATCH 4/4] Increase package version to v0.4.8 --- Project.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Project.toml b/Project.toml index 7ba9b8d..58f1d1b 100644 --- a/Project.toml +++ b/Project.toml @@ -1,6 +1,6 @@ name = "ParallelProcessingTools" uuid = "8e8a01fc-6193-5ca1-a2f1-20776dae4199" -version = "0.4.7" +version = "0.4.8" [deps] ArgCheck = "dce04be8-c92d-5529-be00-80e4d2c0e197"