Skip to content

Commit 0df3fe7

Browse files
NHDalyJeffBezanson
authored andcommitted
Adds interpolation syntax to Threads.@spawn, to evaluate arguments immediately (#33119)
Adds $-interpolation syntax to `@async` and `Threads.@spawn`, to evaluate arguments immediately. Add the ability to evaluate some parts of a `@spawn`/`@async` immediately, in the current thread context. This prevents variables being "boxed" in order to capture them in the closure, exactly the same as wrapping them in a let-block locally. For example, `$x` expands like this: ```julia julia> @macroexpand @async $x + 2 quote #= task.jl:361 =# let var"##454" = x #= task.jl:362 =# local var"#9#task" = Base.Task((()->begin #= task.jl:358 =# var"##454" + 2 end)) #= task.jl:363 =# if $(Expr(:islocal, Symbol("##sync#95"))) #= task.jl:364 =# Base.push!(var"##sync#95", var"#9#task") end #= task.jl:366 =# Base.schedule(var"#9#task") #= task.jl:367 =# var"#9#task" end end ```
1 parent 5da74be commit 0df3fe7

File tree

4 files changed

+145
-11
lines changed

4 files changed

+145
-11
lines changed

NEWS.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ Language changes
2929
Multi-threading changes
3030
-----------------------
3131

32+
* Values can now be interpolated into `@async` and `@spawn` via `$`, which copies the value directly into the constructed
33+
underlying closure. ([#33119])
3234

3335
Build system changes
3436
--------------------

base/task.jl

Lines changed: 43 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -344,20 +344,58 @@ end
344344
@async
345345
346346
Wrap an expression in a [`Task`](@ref) and add it to the local machine's scheduler queue.
347+
348+
Values can be interpolated into `@async` via `\$`, which copies the value directly into the
349+
constructed underlying closure. This allows you to insert the _value_ of a variable,
350+
isolating the aysnchronous code from changes to the variable's value in the current task.
351+
352+
!!! compat "Julia 1.4"
353+
Interpolating values via `\$` is available as of Julia 1.4.
347354
"""
348355
macro async(expr)
356+
letargs = Base._lift_one_interp!(expr)
357+
349358
thunk = esc(:(()->($expr)))
350359
var = esc(sync_varname)
351360
quote
352-
local task = Task($thunk)
353-
if $(Expr(:islocal, var))
354-
push!($var, task)
361+
let $(letargs...)
362+
local task = Task($thunk)
363+
if $(Expr(:islocal, var))
364+
push!($var, task)
365+
end
366+
schedule(task)
367+
task
368+
end
369+
end
370+
end
371+
372+
# Capture interpolated variables in $() and move them to let-block
373+
function _lift_one_interp!(e)
374+
letargs = Any[] # store the new gensymed arguments
375+
_lift_one_interp_helper(e, false, letargs) # Start out _not_ in a quote context (false)
376+
letargs
377+
end
378+
_lift_one_interp_helper(v, _, _) = v
379+
function _lift_one_interp_helper(expr::Expr, in_quote_context, letargs)
380+
if expr.head == :$
381+
if in_quote_context # This $ is simply interpolating out of the quote
382+
# Now, we're out of the quote, so any _further_ $ is ours.
383+
in_quote_context = false
384+
else
385+
newarg = gensym()
386+
push!(letargs, :($(esc(newarg)) = $(esc(expr.args[1]))))
387+
return newarg # Don't recurse into the lifted $() exprs
355388
end
356-
schedule(task)
357-
task
389+
elseif expr.head == :quote
390+
in_quote_context = true # Don't try to lift $ directly out of quotes
391+
end
392+
for (i,e) in enumerate(expr.args)
393+
expr.args[i] = _lift_one_interp_helper(e, in_quote_context, letargs)
358394
end
395+
expr
359396
end
360397

398+
361399
# add a wait-able object to the sync pool
362400
macro sync_add(expr)
363401
var = esc(sync_varname)

base/threadingconstructs.jl

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -107,22 +107,33 @@ Create and run a [`Task`](@ref) on any available thread. To wait for the task to
107107
finish, call [`wait`](@ref) on the result of this macro, or call [`fetch`](@ref)
108108
to wait and then obtain its return value.
109109
110+
Values can be interpolated into `@spawn` via `\$`, which copies the value directly into the
111+
constructed underlying closure. This allows you to insert the _value_ of a variable,
112+
isolating the aysnchronous code from changes to the variable's value in the current task.
113+
110114
!!! note
111115
This feature is currently considered experimental.
112116
113117
!!! compat "Julia 1.3"
114118
This macro is available as of Julia 1.3.
119+
120+
!!! compat "Julia 1.4"
121+
Interpolating values via `\$` is available as of Julia 1.4.
115122
"""
116123
macro spawn(expr)
124+
letargs = Base._lift_one_interp!(expr)
125+
117126
thunk = esc(:(()->($expr)))
118127
var = esc(Base.sync_varname)
119128
quote
120-
local task = Task($thunk)
121-
task.sticky = false
122-
if $(Expr(:islocal, var))
123-
push!($var, task)
129+
let $(letargs...)
130+
local task = Task($thunk)
131+
task.sticky = false
132+
if $(Expr(:islocal, var))
133+
push!($var, task)
134+
end
135+
schedule(task)
136+
task
124137
end
125-
schedule(task)
126-
task
127138
end
128139
end

test/threads_exec.jl

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -703,3 +703,86 @@ catch ex
703703
@test ex isa LoadError
704704
@test ex.error isa ArgumentError
705705
end
706+
707+
@testset "@spawn interpolation" begin
708+
# Issue #30896: evaluating argumentss immediately
709+
begin
710+
outs = zeros(5)
711+
@sync begin
712+
local i = 1
713+
while i <= 5
714+
Threads.@spawn setindex!(outs, $i, $i)
715+
i += 1
716+
end
717+
end
718+
@test outs == 1:5
719+
end
720+
721+
# Args
722+
@test fetch(Threads.@spawn 2+$2) == 4
723+
@test fetch(Threads.@spawn Int($(2.0))) == 2
724+
a = 2
725+
@test fetch(Threads.@spawn *($a,$a)) == a^2
726+
# kwargs
727+
@test fetch(Threads.@spawn sort($([3 2; 1 0]), dims=2)) == [2 3; 0 1]
728+
@test fetch(Threads.@spawn sort([3 $2; 1 $0]; dims=$2)) == [2 3; 0 1]
729+
730+
# Supports multiple levels of interpolation
731+
@test fetch(Threads.@spawn "$($a)") == "$a"
732+
let a = 1
733+
# Interpolate the current value of `a` vs the value of `a` in the closure
734+
t = Threads.@spawn :(+($$a, $a, a))
735+
a = 2 # update `a` after spawning
736+
@test fetch(t) == Expr(:call, :+, 1, 2, :a)
737+
end
738+
739+
# Test the difference between different levels of interpolation
740+
let
741+
oneinterp = Vector{Any}(undef, 5)
742+
twointerps = Vector{Any}(undef, 5)
743+
@sync begin
744+
local i = 1
745+
while i <= 5
746+
Threads.@spawn setindex!(oneinterp, :($i), $i)
747+
Threads.@spawn setindex!(twointerps, :($($i)), $i)
748+
i += 1
749+
end
750+
end
751+
# The first definition _didn't_ escape i
752+
@test oneinterp == fill(6, 5)
753+
# The second definition _did_ escape i
754+
@test twointerps == 1:5
755+
end
756+
end
757+
758+
@testset "@async interpolation" begin
759+
# Args
760+
@test fetch(@async 2+$2) == 4
761+
@test fetch(@async Int($(2.0))) == 2
762+
a = 2
763+
@test fetch(@async *($a,$a)) == a^2
764+
# kwargs
765+
@test fetch(@async sort($([3 2; 1 0]), dims=2)) == [2 3; 0 1]
766+
@test fetch(@async sort([3 $2; 1 $0]; dims=$2)) == [2 3; 0 1]
767+
768+
# Supports multiple levels of interpolation
769+
@test fetch(@async :($a)) == a
770+
@test fetch(@async :($($a))) == a
771+
@test fetch(@async "$($a)") == "$a"
772+
end
773+
774+
# errors inside @threads
775+
function _atthreads_with_error(a, err)
776+
Threads.@threads for i in eachindex(a)
777+
if err
778+
error("failed")
779+
end
780+
a[i] = Threads.threadid()
781+
end
782+
a
783+
end
784+
@test_throws TaskFailedException _atthreads_with_error(zeros(nthreads()), true)
785+
let a = zeros(nthreads())
786+
_atthreads_with_error(a, false)
787+
@test a == [1:nthreads();]
788+
end

0 commit comments

Comments
 (0)