diff --git a/Project.toml b/Project.toml
index f3bdee7..58f1d1b 100644
--- a/Project.toml
+++ b/Project.toml
@@ -1,10 +1,10 @@
 name = "ParallelProcessingTools"
 uuid = "8e8a01fc-6193-5ca1-a2f1-20776dae4199"
-version = "0.4.7"
+version = "0.4.8"
 
 [deps]
 ArgCheck = "dce04be8-c92d-5529-be00-80e4d2c0e197"
-ClusterManagers = "34f1f09b-3a8b-5176-ab39-66d58a4d544e"
+ElasticClusterManager = "547eee1f-27c8-4193-bfd6-9e092c8e3331"
 Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b"
 LinearAlgebra = "37e2e46d-f89d-539d-b4ee-838fcccc9c8e"
 Logging = "56ddb016-857b-54e1-b83d-db4d58db5568"
@@ -21,7 +21,7 @@ ParallelProcessingToolsThreadPinningExt = "ThreadPinning"
 
 [compat]
 ArgCheck = "1, 2"
-ClusterManagers = "0.4.6, 1"
+ElasticClusterManager = "2"
 Distributed = "1"
 LinearAlgebra = "1"
 Logging = "1"
diff --git a/src/ParallelProcessingTools.jl b/src/ParallelProcessingTools.jl
index 21c49cf..210fc59 100644
--- a/src/ParallelProcessingTools.jl
+++ b/src/ParallelProcessingTools.jl
@@ -12,19 +12,18 @@ import Pkg
 import Random # Required by ThreadPinning extention
 import Sockets
 
-import ClusterManagers
-
 using Base: Process
 using Logging: @logmsg, LogLevel, Info, Debug
 
 using ArgCheck: @argcheck
 using Parameters: @with_kw
 
-# # ToDo: Remove CustomClusterManagers once changes to ElasticManager have
-# # have been upstreamed.
-#using CustomClusterManagers: ElasticManager
-include("custom_cluster_managers.jl")
-using .CustomClusterManagers: ElasticManager
+using ElasticClusterManager: ElasticManager, elastic_worker
+
+# For backward compatibility:
+module CustomClusterManagers
+    import ElasticClusterManager.ElasticManager
+end
 
 include("memory.jl")
 include("display.jl")
diff --git a/src/custom_cluster_managers.jl b/src/custom_cluster_managers.jl
deleted file mode 100644
index f93e5d8..0000000
--- a/src/custom_cluster_managers.jl
+++ /dev/null
@@ -1,180 +0,0 @@
-# This code is a modified version of ClusterManagers.ElasticManager, both
-# original code and modifications are licensed under the MIT License (MIT):
-# https://github.com/JuliaParallel/ClusterManagers.jl/blob/master/LICENSE.md
-
-# Modifications are planned to be upstreamed, once tested in the field.
-
-module CustomClusterManagers
-
-# ==================================================================
-using Distributed
-using Sockets
-using Pkg
-
-import Distributed: launch, manage, kill, init_worker, connect
-# ==================================================================
-
-
-# The master process listens on a well-known port
-# Launched workers connect to the master and redirect their STDOUTs to the same
-# Workers can join and leave the cluster on demand.
-
-export ElasticManager, elastic_worker
-
-const HDR_COOKIE_LEN = Distributed.HDR_COOKIE_LEN
-
-struct ElasticManager <: ClusterManager
-    active::Dict{Int, WorkerConfig}   # active workers
-    pending::Channel{TCPSocket}       # to be added workers
-    terminated::Set{Int}              # terminated worker ids
-    topology::Symbol
-    sockname
-    manage_callback
-    printing_kwargs
-
-    function ElasticManager(;
-        addr=IPv4("127.0.0.1"), port=9009, cookie=nothing,
-        topology=:all_to_all, manage_callback=elastic_no_op_callback, printing_kwargs=()
-    )
-        Distributed.init_multi()
-        cookie !== nothing && cluster_cookie(cookie)
-
-        # Automatically check for the IP address of the local machine
-        if addr == :auto
-            try
-                addr = Sockets.getipaddr(IPv4)
-            catch
-                error("Failed to automatically get host's IP address. Please specify `addr=` explicitly.")
-            end
-        end
-
-        l_sock = listen(addr, port)
-
-        lman = new(Dict{Int, WorkerConfig}(), Channel{TCPSocket}(typemax(Int)), Set{Int}(), topology, getsockname(l_sock), manage_callback, printing_kwargs)
-
-        @async begin
-            while true
-                let s = accept(l_sock)
-                    @async process_worker_conn(lman, s)
-                end
-            end
-        end
-
-        @async process_pending_connections(lman)
-
-        lman
-    end
-end
-
-ElasticManager(port) = ElasticManager(;port=port)
-ElasticManager(addr, port) = ElasticManager(;addr=addr, port=port)
-ElasticManager(addr, port, cookie) = ElasticManager(;addr=addr, port=port, cookie=cookie)
-
-elastic_no_op_callback(::ElasticManager, ::Integer, ::Symbol) = nothing
-
-function process_worker_conn(mgr::ElasticManager, s::TCPSocket)
-    @debug "ElasticManager got new worker connection"
-    # Socket is the worker's STDOUT
-    wc = WorkerConfig()
-    wc.io = s
-
-    # Validate cookie
-    cookie = read(s, HDR_COOKIE_LEN)
-    if length(cookie) < HDR_COOKIE_LEN
-        error("Cookie read failed. Connection closed by peer.")
-    end
-    self_cookie = cluster_cookie()
-    for i in 1:HDR_COOKIE_LEN
-        if UInt8(self_cookie[i]) != cookie[i]
-            println(i, " ", self_cookie[i], " ", cookie[i])
-            error("Invalid cookie sent by remote worker.")
-        end
-    end
-
-    put!(mgr.pending, s)
-end
-
-function process_pending_connections(mgr::ElasticManager)
-    while true
-        wait(mgr.pending)
-        try
-            addprocs(mgr; topology=mgr.topology)
-        catch e
-            showerror(stderr, e)
-            Base.show_backtrace(stderr, Base.catch_backtrace())
-        end
-    end
-end
-
-function launch(mgr::ElasticManager, params::Dict, launched::Array, c::Condition)
-    # The workers have already been started.
-    while isready(mgr.pending)
-        @debug "ElasticManager.launch new worker"
-        wc=WorkerConfig()
-        wc.io = take!(mgr.pending)
-        push!(launched, wc)
-    end
-
-    notify(c)
-end
-
-function manage(mgr::ElasticManager, id::Integer, config::WorkerConfig, op::Symbol)
-    if op == :register
-        @debug "ElasticManager registering process id $id"
-        mgr.active[id] = config
-        mgr.manage_callback(mgr, id, op)
-    elseif op == :deregister
-        @debug "ElasticManager deregistering process id $id"
-        mgr.manage_callback(mgr, id, op)
-        delete!(mgr.active, id)
-        push!(mgr.terminated, id)
-    end
-end
-
-function Base.show(io::IO, mgr::ElasticManager)
-    iob = IOBuffer()
-
-    println(iob, "ElasticManager:")
-    print(iob, "  Active workers : [ ")
-    for id in sort(collect(keys(mgr.active)))
-        print(iob, id, ",")
-    end
-    seek(iob, position(iob)-1)
-    println(iob, "]")
-
-    println(iob, "  Number of workers to be added  : ", Base.n_avail(mgr.pending))
-
-    print(iob, "  Terminated workers : [ ")
-    for id in sort(collect(mgr.terminated))
-        print(iob, id, ",")
-    end
-    seek(iob, position(iob)-1)
-    println(iob, "]")
-
-    println(iob, "  Worker connect command : ")
-    print(iob, "    ", get_connect_cmd(mgr; mgr.printing_kwargs...))
-
-    print(io, String(take!(iob)))
-end
-
-# Does not return. If executing from a REPL try
-# @async elastic_worker(.....)
-# addr, port that a ElasticManager on the master processes is listening on.
-function elastic_worker(
-    cookie::AbstractString, addr::AbstractString="127.0.0.1", port::Integer = 9009;
-    stdout_to_master::Bool = true,
-    Base.@nospecialize(env::AbstractVector = [],)
-)
-    @debug "ElasticManager.elastic_worker(cookie, $addr, $port; stdout_to_master=$stdout_to_master, env=$env)"
-    for (k, v) in env
-        ENV[k] = v
-    end
-
-    c = connect(addr, port)
-    write(c, rpad(cookie, HDR_COOKIE_LEN)[1:HDR_COOKIE_LEN])
-    stdout_to_master && redirect_stdout(c)
-    start_worker(c, cookie)
-end
-
-
-end # module CustomClusterManagers
diff --git a/src/runworkers.jl b/src/runworkers.jl
index ae17c31..e45eb7b 100644
--- a/src/runworkers.jl
+++ b/src/runworkers.jl
@@ -141,7 +141,7 @@ function ppt_cluster_manager()
 end
 
 """
-    ParallelProcessingTools.ppt_cluster_manager!(manager::CustomClusterManagers.ElasticManager)
+    ParallelProcessingTools.ppt_cluster_manager!(manager::ElasticClusterManager.ElasticManager)
 
 Set the default ParallelProcessingTools cluster manager.
 """
@@ -260,7 +260,7 @@ function _elastic_worker_startjl(
     socket_name = manager.sockname
     address = string(socket_name[1])
     port = convert(Int, socket_name[2])
-    """import ParallelProcessingTools; ParallelProcessingTools.CustomClusterManagers.elastic_worker("$cookie", "$address", $port, stdout_to_master=$redirect_output, env=$env_vec)"""
+    """import ParallelProcessingTools; ParallelProcessingTools.elastic_worker("$cookie", "$address", $port, forward_stdout=$redirect_output, env=$env_vec)"""
 end
 
 const _default_addprocs_params = Distributed.default_addprocs_params()