diff --git a/Project.toml b/Project.toml index bb30760..48d85f2 100644 --- a/Project.toml +++ b/Project.toml @@ -3,6 +3,7 @@ uuid = "8ba89e20-285c-5b6f-9357-94700520ee1b" version = "1.11.0" [deps] +Logging = "56ddb016-857b-54e1-b83d-db4d58db5568" Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c" Serialization = "9e88b42a-f829-5b0c-bbe9-9e923198166b" Sockets = "6462fe0b-24de-5631-8697-dd941f90decc" diff --git a/README.md b/README.md index da2c75a..427c65c 100644 --- a/README.md +++ b/README.md @@ -38,6 +38,7 @@ For controlling other processes via RPC: For communicating between processes in the style of a channel or stream: - `RemoteChannel` - a `Channel`-like object that can be `put!` to or `take!` from any process +- `RemoteLogger` - an `AbstractLogger` forwarding logs to a given worker For controlling multiple processes at once: diff --git a/docs/src/index.md b/docs/src/index.md index 22d63ce..88fccd9 100644 --- a/docs/src/index.md +++ b/docs/src/index.md @@ -48,6 +48,7 @@ Distributed.channel_from_id Distributed.worker_id_from_socket Distributed.cluster_cookie() Distributed.cluster_cookie(::Any) +Distributed.RemoteLogger ``` ## Cluster Manager Interface diff --git a/src/Distributed.jl b/src/Distributed.jl index dc2d9d4..b7e8059 100644 --- a/src/Distributed.jl +++ b/src/Distributed.jl @@ -21,6 +21,7 @@ using Base.Threads: Event using Serialization, Sockets import Serialization: serialize, deserialize import Sockets: connect, wait_connected +import Logging # NOTE: clusterserialize.jl imports additional symbols from Serialization for use @@ -60,6 +61,7 @@ export WorkerConfig, RemoteException, ProcessExitedException, + RemoteLogger, process_messages, remoteref_id, @@ -107,6 +109,7 @@ include("messages.jl") include("process_messages.jl") # process incoming messages include("remotecall.jl") # the remotecall* api include("macros.jl") # @spawn and friends +include("logger.jl") include("workerpool.jl") include("pmap.jl") include("managers.jl") # LocalManager and SSHManager diff --git a/src/logger.jl b/src/logger.jl new file mode 100644 index 0000000..3673cd4 --- /dev/null +++ b/src/logger.jl @@ -0,0 +1,27 @@ +""" + RemoteLogger(pid=1, min_level=Info) + +Logger that forwards all logging to worker `pid` via `remote_do` along with +adding the current worker `id` as a `pid` kwarg. +""" +struct RemoteLogger <: Logging.AbstractLogger + pid::Int + min_level::Logging.LogLevel +end +function RemoteLogger(pid=1) + RemoteLogger(pid, Logging.Info) +end + +Logging.min_enabled_level(logger::RemoteLogger) = logger.min_level +Logging.shouldlog(logger::RemoteLogger, level, _module, group, id) = true + +# TODO: probably should live in base/logging.jl? +function logmsg(level::Logging.LogLevel, message, _module, _group, _id, _file, _line; kwargs...) + Logging.@logmsg level message _module = _module _group = _group _id = _id _file = _file _line = _line kwargs... +end + +function Logging.handle_message(logger::RemoteLogger, level::Logging.LogLevel, message, _module, _group, _id, + _file, _line; kwargs...) + @nospecialize + remote_do(logmsg, logger.pid, level, message, _module, _group, _id, _file, _line; pid=myid(), kwargs...) +end diff --git a/test/distributed_exec.jl b/test/distributed_exec.jl index 71b6b36..159c5d3 100644 --- a/test/distributed_exec.jl +++ b/test/distributed_exec.jl @@ -1,6 +1,6 @@ # This file is a part of Julia. License is MIT: https://julialang.org/license -using Test, Distributed, Random, Serialization, Sockets +using Test, Distributed, Random, Serialization, Sockets, Logging import Distributed: launch, manage sharedir = normpath(joinpath(Sys.BINDIR, "..", "share")) @@ -1956,6 +1956,21 @@ begin end end +# test logging +w = only(addprocs(1)) +@everywhere using Logging +@test_logs (:info, "from pid $w") begin + prev_logger = global_logger(current_logger()) + try + wait(@spawnat w with_logger(RemoteLogger(1)) do + @info("from pid $(myid())") + end) + finally + global_logger(prev_logger) + end +end +wait(rmprocs([w])) + # Run topology tests last after removing all workers, since a given # cluster at any time only supports a single topology. nprocs() > 1 && rmprocs(workers())