Files
julia/base/threadingconstructs.jl
Andy Dienes b5a9c2aa62 (mostly) NFC: make Base dogfood its own APIs more sometimes (#61713)
`Base` is, of course, allowed to depend on its own internals. but that
doesn't mean it always should. when possible I think we should prefer to
use public alternatives vs private when equivalents of each are
available.

includes an unintentional bugfix to `help?>` of any intrinsic, which on
master leads you to the docstring for `Core.IntrinsicFunction` itself.

candidates generated by claude, which I then audited
2026-05-05 18:01:32 -04:00

919 lines
32 KiB
Julia
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# This file is a part of Julia. License is MIT: https://julialang.org/license
export threadid, nthreads, @threads, @spawn,
threadpool, nthreadpools
public Condition, threadpoolsize, ngcthreads
"""
Threads.threadid([t::Task])::Int
Get the ID number of the current thread of execution, or the thread of task
`t`. The master thread has ID `1`.
# Examples
```julia-repl
julia> Threads.threadid()
1
julia> Threads.@threads for i in 1:4
println(Threads.threadid())
end
4
2
5
4
julia> Threads.threadid(Threads.@spawn "foo")
2
```
!!! note
The thread that a task runs on may change if the task yields, which is known as [`Task Migration`](@ref man-task-migration).
For this reason in most cases it is not safe to use `threadid([task])` to index into, say, a vector of buffers or stateful
objects.
"""
function threadid()
    # The runtime numbers threads from 0; the public API numbers them from 1.
    raw = ccall(:jl_threadid, Int16, ())
    return Int(raw) + 1
end
# lower bound on the largest threadid()
"""
Threads.maxthreadid()::Int
Get a lower bound on the number of threads (across all thread pools) available
to the Julia process, with atomic-acquire semantics. The result will always be
greater than or equal to [`threadid()`](@ref) as well as `threadid(task)` for
any task you were able to observe before calling `maxthreadid`.
"""
function maxthreadid()
    # Acquire ordering: any thread id observed before this call is covered.
    nthreads_seen = unsafe_load(cglobal(:jl_n_threads, Cint), :acquire)
    return Int(nthreads_seen)
end
"""
Threads.nthreads(:default | :interactive)::Int
Get the current number of threads within the specified thread pool. The threads in `:interactive`
have id numbers `1:nthreads(:interactive)`, and the threads in `:default` have id numbers in
`nthreads(:interactive) .+ (1:nthreads(:default))`.
See also `BLAS.get_num_threads` and `BLAS.set_num_threads` in the [`LinearAlgebra`](@ref
man-linalg) standard library, and `nprocs()` in the [`Distributed`](@ref man-distributed)
standard library and [`Threads.maxthreadid()`](@ref).
"""
function nthreads(pool::Symbol)
    # Alias: per-pool thread counts are computed by `threadpoolsize`.
    return threadpoolsize(pool)
end
function _nthreads_in_pool(tpid::Int8)
    # `jl_n_threads_per_pool` points at per-pool thread counts indexed by the
    # 0-based threadpool id, so shift to Julia's 1-based `unsafe_load` index.
    counts = unsafe_load(cglobal(:jl_n_threads_per_pool, Ptr{Cint}))
    n = unsafe_load(counts, Int(tpid) + 1)
    return Int(n)
end
function _tpid_to_sym(tpid::Int8)
    # Map a runtime threadpool id to its public name; -1 denotes a foreign thread.
    tpid == 0 && return :interactive
    tpid == 1 && return :default
    tpid == -1 && return :foreign
    throw(ArgumentError(LazyString("Unrecognized threadpool id ", tpid)))
end
function _sym_to_tpid(tp::Symbol)
    # Inverse of `_tpid_to_sym`: public threadpool name -> runtime id.
    tp === :interactive && return Int8(0)
    tp === :default && return Int8(1)
    tp === :foreign && return Int8(-1)
    throw(ArgumentError(LazyString("Unrecognized threadpool name `", tp, "`")))
end
"""
Threads.threadpool(tid = threadid())::Symbol
Return the specified thread's threadpool; either `:default`, `:interactive`, or `:foreign`.
"""
function threadpool(tid = threadid())
    # `jl_threadpoolid` takes the runtime's 0-based thread id, hence `tid - 1`.
    return _tpid_to_sym(ccall(:jl_threadpoolid, Int8, (Int16,), tid - 1))
end
"""
Threads.threadpooldescription(tid = threadid())::String
Return the specified thread's threadpool name with extended description where appropriate.
"""
function threadpooldescription(tid = threadid())
    pool = threadpool(tid)
    pool === :foreign || return string(pool)
    # TODO: extend tls to include a field to add a description to a foreign thread and make this more general
    # Assumes GC threads come first in the foreign thread pool, i.e. they occupy
    # the ids immediately after all interactive and default threads.
    first_gc = nthreads(:interactive) + nthreads(:default) + 1
    last_gc = first_gc + ngcthreads() - 1
    return first_gc <= tid <= last_gc ? "foreign: gc" : string(pool)
end
"""
Threads.nthreadpools()::Int
Return the number of threadpools currently configured.
"""
function nthreadpools()
    npools = unsafe_load(cglobal(:jl_n_threadpools, Cint))
    return Int(npools)
end
"""
Threads.threadpoolsize(pool::Symbol = :default)::Int
Get the number of threads available to the default thread pool (or to the
specified thread pool).
See also: `BLAS.get_num_threads` and `BLAS.set_num_threads` in the
[`LinearAlgebra`](@ref man-linalg) standard library, and `nprocs()` in the
[`Distributed`](@ref man-distributed) standard library.
"""
function threadpoolsize(pool::Symbol = :default)
    # Only the `:default` and `:interactive` pools have a fixed, queryable size.
    if pool === :foreign
        error("Threadpool size of `:foreign` is indeterminant")
    elseif !(pool === :default || pool === :interactive)
        error("invalid threadpool specified")
    end
    return _nthreads_in_pool(_sym_to_tpid(pool))
end
"""
threadpooltids(pool::Symbol)
Return a vector of IDs of threads in the given pool.
"""
function threadpooltids(pool::Symbol)
    # Interactive threads take ids 1:n_interactive; default threads follow them.
    n_interactive = _nthreads_in_pool(Int8(0))
    pool === :interactive && return collect(1:n_interactive)
    pool === :default || error("invalid threadpool specified")
    n_default = _nthreads_in_pool(Int8(1))
    return collect(n_interactive .+ (1:n_default))
end
"""
Threads.ngcthreads()::Int
Return the number of GC threads currently configured.
This includes both mark threads and concurrent sweep threads.
"""
function ngcthreads()
    # NOTE(review): the `+ 1` presumably counts one thread not included in the
    # runtime's `jl_n_gcthreads` counter — TODO confirm against the GC runtime.
    counted = unsafe_load(cglobal(:jl_n_gcthreads, Cint))
    return 1 + Int(counted)
end
function threading_run(fun, static)
    # Run `fun(i)` for every i in 1:threadpoolsize() as one task each, wait for
    # all of them, and throw a CompositeException of TaskFailedExceptions if any
    # task failed. When `static` is true each task is sticky and pinned to a
    # specific thread; otherwise tasks are only restricted to the `:default` pool.
    # Enter a threaded region for the duration of the run; it is always exited in
    # the `finally` below. `_threading_run_expr` checks `jl_in_threaded_region`
    # to reject nested/concurrent `:static` loops.
    ccall(:jl_enter_threaded_region, Cvoid, ())
    n = threadpoolsize()
    # `:default` thread ids come after the interactive ones.
    tid_offset = threadpoolsize(:interactive)
    tasks = Vector{Task}(undef, n)
    try
        for i = 1:n
            t = Task(() -> fun(i)) # pass in tid
            t.sticky = static
            if static
                # Pin task i to the i-th `:default` thread (the id passed here is
                # presumably 0-based, like `jl_threadid` — TODO confirm).
                ccall(:jl_set_task_tid, Cint, (Any, Cint), t, tid_offset + i-1)
            else
                # TODO: this should be the current pool (except interactive) if there
                # are ever more than two pools.
                _result = ccall(:jl_set_task_threadpoolid, Cint, (Any, Int8), t, _sym_to_tpid(:default))
                @assert _result == 1 "_result != 1"
            end
            tasks[i] = t
            schedule(t)
        end
        for i = 1:n
            # Failures are inspected after the region is exited, below.
            Base._wait(tasks[i])
        end
    finally
        ccall(:jl_exit_threaded_region, Cvoid, ())
    end
    # Note: `filter!` reuses (and shrinks) the `tasks` vector.
    failed_tasks = filter!(istaskfailed, tasks)
    if !isempty(failed_tasks)
        throw(CompositeException(map(TaskFailedException, failed_tasks)))
    end
end
# Helper to generate threading run code with schedule checking
function _threading_run_expr(schedule)
    # Every schedule except `:static` runs non-sticky tasks. The choice is made at
    # macro-expansion time and spliced in as a literal `true`/`false`; only
    # `:static` needs the runtime "already in a threaded region" check.
    nonsticky = schedule === :greedy || schedule === :dynamic || schedule === :default
    return quote
        if $nonsticky
            threading_run(threadsfor_fun, false)
        elseif ccall(:jl_in_threaded_region, Cint, ()) != 0 # :static
            error("`@threads :static` cannot be used concurrently or nested")
        else # :static
            threading_run(threadsfor_fun, true)
        end
    end
end
function _threadsfor(iter, lbody, schedule)
    # Expand `@threads [schedule] for var = range; body; end`. `iter` is the
    # loop's `var = range` clause; the expansion evaluates to `nothing`.
    loopvar, looprange = iter.args[1], iter.args[2]
    make_func = schedule === :greedy ? greedy_func : default_func
    func = make_func(esc(looprange), loopvar, lbody)
    return quote
        local threadsfor_fun
        $func
        $(_threading_run_expr(schedule))
        nothing
    end
end
function _threadsfor_multi_iterator(body, iterators, condition, schedule, dims, result_type)
    # Rewrite a comprehension with several `var = range` clauses into a single
    # iteration over `Iterators.product(ranges...)`. Each product tuple is
    # destructured back into the user's loop variables before `body` (and the
    # filter `condition`, unless it is `true`) runs.
    vars = [iter.args[1] for iter in iterators]
    ranges = [iter.args[2] for iter in iterators]
    tuple_var = gensym("iter_tuple")
    assignments = [:($(vars[i]) = $(tuple_var)[$i]) for i in eachindex(vars)]
    # Use let blocks so destructured variables are local to each iteration,
    # avoiding data races when multiple threads execute the body concurrently.
    new_body = Expr(:let, Expr(:block, assignments...), body)
    new_condition = if condition === true
        true
    else
        Expr(:let, Expr(:block, assignments...), condition)
    end
    # The synthetic range is escaped wholesale into the caller's scope by
    # `_threadsfor_single_iterator`. A bare `Iterators.product` would therefore
    # look up `Iterators` in the caller's module, which may shadow it or, for a
    # `baremodule`, not define it. A GlobalRef always resolves to Base's.
    product_expr = Expr(:call, GlobalRef(Base.Iterators, :product), ranges...)
    synthetic_iter = :($(tuple_var) = $(product_expr))
    return _threadsfor_single_iterator(new_body, synthetic_iter, new_condition, schedule, dims; result_type)
end
function _threadsfor_comprehension(gen::Expr, schedule, result_type=nothing)
    # Expand a threaded comprehension. `gen` is the generator from
    # `[body for ...]` and `result_type` is the `T` of `T[...]` (or `nothing`).
    # Dispatch on whether there is an `if` filter and how many clauses there are.
    #
    # `gen` comes straight from user syntax, so report unsupported shapes with an
    # ArgumentError (like the other @threads syntax checks) rather than an
    # assertion. Nested `for` clauses, e.g. `[x for y in ys for x in y]`, parse as
    # `:flatten` instead of `:generator`.
    if gen.head !== :generator
        throw(ArgumentError("nested `for` clauses in comprehensions are not currently supported by @threads"))
    end
    body = gen.args[1]
    # Check if the second arg is a filter (handles both single and multi-loop with filters)
    iter_or_filter = gen.args[2]
    if isa(iter_or_filter, Expr) && iter_or_filter.head === :filter
        # `:filter` args are (condition, clauses...). Filtered results are flat
        # vectors, so no `dims` are passed on.
        condition = iter_or_filter.args[1]
        iterators = iter_or_filter.args[2:end]
        if length(iterators) == 1
            return _threadsfor_single_iterator(body, iterators[1], condition, schedule; result_type)
        else
            return _threadsfor_multi_iterator(body, iterators, condition, schedule, nothing, result_type)
        end
    elseif length(gen.args) > 2
        iterators = gen.args[2:end]
        ranges = [iter.args[2] for iter in iterators]
        # Use axes to preserve offset index spaces (e.g. OffsetArrays)
        # NOTE(review): each range expression is evaluated here and again inside
        # the product built by `_threadsfor_multi_iterator`, so side-effecting
        # range expressions run twice.
        dims_expr = :(tuple($([:(axes($(esc(r)), 1)) for r in ranges]...)))
        return _threadsfor_multi_iterator(body, iterators, true, schedule, dims_expr, result_type)
    else
        return _threadsfor_single_iterator(body, iter_or_filter, true, schedule; result_type)
    end
end
function _threadsfor_single_iterator(body, iterator, condition, schedule, dims=nothing; result_type=nothing)
    # Expand a threaded comprehension over one `var = range` clause.
    # `condition === true` means "no filter". `dims`, if not `nothing`, is an
    # expression for the axes the flat result is reshaped to. `result_type` is
    # the `T` of `T[...]`, or `nothing` for an untyped comprehension.
    lidx = iterator.args[1]
    range = iterator.args[2]
    esc_range = esc(range)
    esc_lidx = esc(lidx)
    esc_body = esc(body)
    esc_condition = condition === true ? true : esc(condition)
    # Fast path: no filter and not greedy — pre-allocate and write directly
    if condition === true && schedule !== :greedy
        return _threadsfor_comprehension_fast(esc_range, esc_lidx, esc_body, schedule, dims, result_type)
    end
    func = if schedule === :greedy
        greedy_comprehension_func(esc_range, esc_lidx, esc_body, esc_condition)
    else
        default_comprehension_func(esc_range, esc_lidx, esc_body, esc_condition, result_type)
    end
    result_expr = if schedule === :greedy
        # Greedy: collect values in arrival order (no ordering guarantee)
        if result_type !== nothing
            esc_result_type = esc(result_type)
            quote
                close(result_channel)
                vals = collect(result_channel)
                isempty(vals) ? $esc_result_type[] : $esc_result_type[v for v in vals]
            end
        else
            quote
                close(result_channel)
                # The results channel is a `Channel{Any}`, so a plain `collect`
                # always yields `Vector{Any}`. Drain it through `grow_to!` instead.
                # It starts from the first value's type and widens with
                # promote_typejoin, the same rule as the default/static path below
                # and serial comprehensions. An empty result stays `Any[]`.
                Base.grow_to!(Any[], result_channel)
            end
        end
    else
        # Default/static: thread-local buffers, vcat in tid order preserves iteration order.
        # For the untyped case, we try vcat first (fast bulk copies). If the buffers have
        # a Union or Any element type — indicating a type-unstable body — we fall back to
        # grow_to! which replicates serial's promote_typejoin widening.
        if result_type !== nothing
            esc_result_type = esc(result_type)
            quote
                vcat(result_channel...)::Vector{$esc_result_type}
            end
        else
            quote
                let _bufs = result_channel
                    _ET = eltype(eltype(_bufs))
                    if isconcretetype(_ET)
                        # All buffers have a concrete element type (from promote_op inference
                        # or a uniform body). Use bulk vcat — same result as grow_to! here.
                        vcat(_bufs...)
                    else
                        # Type-unstable body: grow_to! discovers the correct widened type,
                        # matching the return type of the equivalent serial comprehension.
                        Base.grow_to!(Any[], Iterators.flatten(_bufs))
                    end
                end
            end
        end
    end
    # If dims is provided, reshape the result to match original comprehension dimensions
    if dims !== nothing
        result_expr = quote
            let flat_result = $result_expr
                reshape(flat_result, $(dims))
            end
        end
    end
    quote
        local threadsfor_fun
        local result_channel = $func
        $(_threading_run_expr(schedule))
        $result_expr
    end
end
# Fast path for non-filtered, non-greedy comprehensions: pre-allocate and write directly.
# Non-AbstractArray iterators (e.g. Iterators.flatten, or the Iterators.product built for
# multi-clause comprehensions) are collected into an Array because the parallel work
# distribution indexes into items with r[i]; a collected product is N-dimensional.
# AbstractArrays and Tuples are used directly to preserve their index space (e.g. OffsetArrays).
# The result is allocated with `similar(Array{T}, axes(items))`, as Base `collect` does. It
# therefore has the same dimensionality and axes as `items`; `Vector{T}` can only represent
# one-dimensional shapes.
function _threadsfor_comprehension_fast(esc_range, esc_lidx, esc_body, schedule, dims, result_type)
    work_dist = _work_distribution_code()
    wrap_final = dims !== nothing ? (x -> :(reshape($x, $dims))) : identity
    if result_type !== nothing
        # Typed path: pre-allocate with known element type
        esc_result_type = esc(result_type)
        return quote
            let iter = $esc_range
                local items = iter isa Union{Tuple, AbstractArray} ? iter : collect(iter)
                local niter = length(items)
                local result = similar(Array{$esc_result_type}, axes(items))
                if niter > 0
                    let items = items, result = result
                        local threadsfor_fun
                        function threadsfor_fun(tid = 1)
                            # Reads: items, tid. Defines: r, loop_first, loop_last.
                            $work_dist
                            for i = loop_first:loop_last
                                local $esc_lidx = @inbounds r[i]
                                @inbounds result[i] = $esc_body
                            end
                        end
                        $(_threading_run_expr(schedule))
                    end
                end
                $(wrap_final(:(result)))
            end
        end
    else
        # Untyped path: evaluate first element to determine result type,
        # then fill in parallel with the body expression inlined directly
        # in the closure. This avoids boxing that occurs when calling a
        # lambda whose return type is a Union across closure boundaries.
        return quote
            let iter = $esc_range
                local items = iter isa Union{Tuple, AbstractArray} ? iter : collect(iter)
                local niter = length(items)
                if niter == 0
                    $(wrap_final(:(similar(Array{Any}, axes(items)))))
                else
                    # Probe the first element serially to pick the element type.
                    local _skip = firstindex(items)
                    local $esc_lidx = @inbounds items[_skip]
                    local _probe_val = $esc_body
                    local result = similar(Array{typeof(_probe_val)}, axes(items))
                    @inbounds result[_skip] = _probe_val
                    if niter > 1
                        local _npool = threadpoolsize()
                        # One buffer per task id for values that do not fit eltype(result).
                        local _widen_buffers = [Pair{Int,Any}[] for _ in 1:_npool]
                        let items = items, result = result, _widen_buffers = _widen_buffers,
                            _skip = _skip
                            local threadsfor_fun
                            function threadsfor_fun(tid = 1)
                                # Reads: items, tid. Defines: r, loop_first, loop_last.
                                $work_dist
                                local _T = eltype(result)
                                local _my_widen = _widen_buffers[tid]
                                for i = loop_first:loop_last
                                    i == _skip && continue
                                    local $esc_lidx = @inbounds r[i]
                                    local _val = $esc_body
                                    if _val isa _T
                                        @inbounds result[i] = _val
                                    else
                                        push!(_my_widen, i => _val)
                                    end
                                end
                            end
                            $(_threading_run_expr(schedule))
                        end
                        # NOTE(review): assumes setindices_widen_up_to accepts an
                        # N-dimensional `result`; any flattened return is covered by
                        # the final reshape when `dims` is given — confirm.
                        result = Base.setindices_widen_up_to(result, _widen_buffers)
                    end
                    $(wrap_final(:(result)))
                end
            end
        end
    end
end
function greedy_func(itr, lidx, lbody)
    # Build the `:greedy` expansion of a `for` loop. A producer task feeds the
    # iterated values into a channel, and every `threadsfor_fun(tid)` call drains
    # values from it and runs the (escaped) body with the loop variable bound.
    #
    # `itr` is the already-escaped iterator expression. It is evaluated exactly
    # once, and both the channel's element type and the producer loop use that
    # same object. Previously it was evaluated twice, so e.g. a call that opens a
    # file or builds a channel ran twice.
    quote
        let iter = $itr
            local c = Channel{eltype(iter)}(threadpoolsize(), spawn=true) do ch
                for item in iter
                    put!(ch, item)
                end
            end
            function threadsfor_fun(tid)
                for item in c
                    local $(esc(lidx)) = item
                    $(esc(lbody))
                end
            end
        end
    end
end
function greedy_comprehension_func(itr, esc_lidx, esc_body, esc_condition)
    # Build the `:greedy` expansion of a comprehension. A producer task feeds
    # iterated values into a channel, and each `threadsfor_fun(tid)` call drains
    # it, pushing `body` for every value whose (escaped) condition holds into an
    # unbounded `Channel{Any}`. The generated `let` evaluates to that results
    # channel. Arrival order is not iteration order.
    #
    # `itr` (already escaped) is evaluated exactly once, so side-effecting
    # iterator expressions do not run twice. The results channel is let-local and
    # assigned once, so the closure does not capture a reassigned variable.
    quote
        let iter = $itr
            local c = Channel{eltype(iter)}(threadpoolsize(), spawn=true) do ch
                for item in iter
                    put!(ch, item)
                end
            end
            local results = Channel{Any}(Inf)
            function threadsfor_fun(tid)
                for item in c
                    local $esc_lidx = item
                    if $esc_condition
                        put!(results, $esc_body)
                    end
                end
            end
            results
        end
    end
end
# Helper function to generate work distribution code
function _work_distribution_code()
    # Returns a code snippet (an Expr) to be spliced into a generated
    # `threadsfor_fun(tid)` body. The snippet reads `items` and `tid` from the
    # surrounding generated code and defines `r`, `loop_first` and `loop_last`:
    # the contiguous index range of `r` handled by task `tid`. If task `tid`
    # gets no iterations, the snippet `return`s from the enclosing function.
    quote
        r = items # Load into local variable
        lenr = length(r)
        # divide loop iterations among threads
        # (one chunk per task; `threading_run` also starts threadpoolsize() tasks)
        len, rem = divrem(lenr, threadpoolsize())
        # not enough iterations for all the threads?
        if len == 0
            # Only tasks 1:rem get work, one iteration each.
            if tid > rem
                return
            end
            len, rem = 1, 0
        end
        # compute this thread's iterations
        loop_first = firstindex(r) + ((tid-1) * len)
        loop_last = loop_first + len - 1
        # distribute remaining iterations evenly
        # (tasks 1:rem take one extra iteration; later tasks shift right by rem)
        if rem > 0
            if tid <= rem
                loop_first = loop_first + (tid-1)
                loop_last = loop_last + tid
            else
                loop_first = loop_first + rem
                loop_last = loop_last + rem
            end
        end
    end
end
function default_func(itr, lidx, lbody)
    # Build the `:dynamic`/`:static` expansion of a `for` loop: each
    # `threadsfor_fun(tid)` call runs the body over one contiguous chunk of
    # `items`. The distribution snippet reads `items` and `tid` and defines
    # `r`, `loop_first` and `loop_last`.
    chunk_bounds = _work_distribution_code()
    user_var = esc(lidx)
    user_body = esc(lbody)
    return quote
        let items = $itr
            function threadsfor_fun(tid = 1)
                $chunk_bounds
                for i = loop_first:loop_last
                    local $user_var = @inbounds r[i]
                    $user_body
                end
            end
        end
    end
end
function default_comprehension_func(itr, esc_lidx, esc_body, esc_condition, result_type=nothing)
    # Build the `:dynamic`/`:static` expansion used for filtered comprehensions.
    # Each `threadsfor_fun(tid)` call pushes `body` for every element of its
    # chunk whose condition holds into its own buffer. The generated `let`
    # evaluates to the vector of per-task buffers, which the caller concatenates.
    # `itr`, `esc_lidx`, `esc_body` and `esc_condition` arrive already escaped.
    work_dist = _work_distribution_code()
    if result_type !== nothing
        # Typed comprehension: element type known at macro expansion time.
        buf_init = :($(esc(result_type))[])
        # `:()` splices in a no-op (an empty tuple).
        buf_type_setup = :()
    else
        # Untyped comprehension: use promote_op to pre-type the per-task buffers,
        # avoiding boxing in the parallel phase for type-stable bodies.
        # The result is flattened through grow_to! so the final element type matches
        # serial's runtime promote_typejoin widening rather than the static promote_op type.
        _ET = gensym(:ET)
        buf_type_setup = :(local $_ET = Base.promote_op($esc_lidx -> $esc_body, eltype(items)))
        buf_init = :($_ET[])
    end
    quote
        let iter = $itr
            # Indexable inputs are used as-is; other iterators are materialized.
            local items = iter isa Union{Tuple, AbstractArray} ? iter : collect(iter)
            local _npool = threadpoolsize()
            $buf_type_setup
            # One buffer per task-id; tasks process contiguous ranges so concatenating
            # in tid order preserves iteration order without a sort step.
            local local_bufs = [$buf_init for _ in 1:_npool]
            function threadsfor_fun(tid = 1)
                # Reads: items, tid. Defines: r, loop_first, loop_last.
                $work_dist
                local buf = local_bufs[tid]
                for i = loop_first:loop_last
                    local $esc_lidx = @inbounds r[i]
                    if $esc_condition
                        push!(buf, $esc_body)
                    end
                end
            end
            local_bufs # Return per-task buffers to be vcat'd after threading_run
        end
    end
end
"""
Threads.@threads [schedule] for ... end
Threads.@threads [schedule] [expr for ... end]
Threads.@threads [schedule] T[expr for ... end]
A macro to execute a `for` loop or array comprehension in parallel. The iteration space is distributed to
coarse-grained tasks. This policy can be specified by the `schedule` argument. The
execution of the loop waits for the evaluation of all iterations.
For `for` loops, the macro executes the loop body in parallel but does not return a value.
For array comprehensions, the macro executes the comprehension in parallel and returns
the collected results as an array.
Tasks spawned by `@threads` are scheduled on the `:default` threadpool. This means that
`@threads` will not use threads from the `:interactive` threadpool, even if called from
the main thread or from a task in the interactive pool. The `:default` threadpool is
intended for compute-intensive parallel workloads.
See also: [`@spawn`](@ref Threads.@spawn) and
`pmap` in [`Distributed`](@ref man-distributed).
For more information on threadpools, see the chapter on [threadpools](@ref man-threadpools).
# Extended help
## Semantics
Unless stronger guarantees are specified by the scheduling option, the loop executed by
the `@threads` macro has the following semantics.
The `@threads` macro executes the loop body in an unspecified order and potentially
concurrently. It does not specify the exact assignments of the tasks and the worker threads.
The assignments can be different for each execution. The loop body code (including any code
transitively called from it) must not make any assumptions about the distribution of
iterations to tasks or the worker thread in which they are executed. The loop body for each
iteration must be able to make forward progress independent of other iterations and be free
from data races. As such, invalid synchronizations across iterations may deadlock while
unsynchronized memory accesses may result in undefined behavior.
For example, the above conditions imply that:
- A lock taken in an iteration *must* be released within the same iteration.
- Communicating between iterations using blocking primitives like `Channel`s is incorrect.
- Write only to locations not shared across iterations (unless a lock or atomic operation is
used).
- Unless the `:static` schedule is used, the value of [`threadid()`](@ref Threads.threadid)
may change even within a single iteration. See [`Task Migration`](@ref man-task-migration).
## Schedulers
Without the scheduler argument, the exact scheduling is unspecified and varies across Julia
releases. Currently, `:dynamic` is used when the scheduler is not specified.
!!! compat "Julia 1.5"
The `schedule` argument is available as of Julia 1.5.
### `:dynamic` (default)
The `:dynamic` scheduler assigns iterations dynamically to available worker threads. The current
implementation assumes that the workload for each iteration is uniform. However, this
assumption may be removed in the future.
This scheduling option is merely a hint to the underlying execution mechanism. However, a
few properties can be expected. The number of `Task`s used by `:dynamic` scheduler is
bounded by a small constant multiple of the number of available worker threads
([`Threads.threadpoolsize()`](@ref)). Each task processes contiguous regions of the
iteration space. Thus, `@threads :dynamic for x in xs; f(x); end` is typically more
efficient than `@sync for x in xs; @spawn f(x); end` if `length(xs)` is significantly
larger than the number of the worker threads and the run-time of `f(x)` is relatively
smaller than the cost of spawning and synchronizing a task (typically less than 10
microseconds).
!!! compat "Julia 1.8"
The `:dynamic` option for the `schedule` argument is available and the default as of Julia 1.8.
### `:greedy`
`:greedy` scheduler spawns up to [`Threads.threadpoolsize()`](@ref) tasks, each greedily working on
the given iterated values as they are produced. As soon as one task finishes its work, it takes
the next value from the iterator. Work done by any individual task is not necessarily on
contiguous values from the iterator. The given iterator may produce values forever, only the
iterator interface is required (no indexing).
This scheduling option is generally a good choice if the workload of individual iterations
is not uniform/has a large spread.
!!! compat "Julia 1.11"
The `:greedy` option for the `schedule` argument is available as of Julia 1.11.
### `:static`
`:static` scheduler creates one task per thread and divides the iterations equally among
them, assigning each task specifically to each thread. In particular, the value of
[`threadid()`](@ref Threads.threadid) is guaranteed to be constant within one iteration.
Specifying `:static` is an error if used from inside another `@threads` loop or from a
thread other than 1.
!!! note
`:static` scheduling exists for supporting transition of code written before Julia 1.3.
In newly written library functions, `:static` scheduling is discouraged because the
functions using this option cannot be called from arbitrary worker threads.
## Examples
### For loops
To illustrate the different scheduling strategies, consider the following function
`busywait` containing a non-yielding timed loop that runs for a given number of seconds.
```julia-repl
julia> function busywait(seconds)
tstart = time_ns()
while (time_ns() - tstart) / 1e9 < seconds
end
end
julia> @time begin
Threads.@spawn busywait(5)
Threads.@threads :static for i in 1:Threads.threadpoolsize()
busywait(1)
end
end
6.003001 seconds (16.33 k allocations: 899.255 KiB, 0.25% compilation time)
julia> @time begin
Threads.@spawn busywait(5)
Threads.@threads :dynamic for i in 1:Threads.threadpoolsize()
busywait(1)
end
end
2.012056 seconds (16.05 k allocations: 883.919 KiB, 0.66% compilation time)
```
The `:dynamic` example takes 2 seconds since one of the non-occupied threads is able
to run two of the 1-second iterations to complete the for loop.
### Array comprehensions
The `@threads` macro also supports array comprehensions, which return the collected results.
Array comprehensions preserve element order for `:static` and `:dynamic` (default) scheduling.
The `:greedy` scheduler does not guarantee element order, since tasks consume work items as
they become available. Multi-dimensional comprehensions preserve the dimensions of the
original comprehension (e.g., `[f(i,j) for i in 1:n, j in 1:m]` returns an `n×m` matrix).
Typed comprehensions (`T[expr for ...]`) are also supported and return an array with the
specified element type.
For non-filtered comprehensions with non-`:greedy` scheduling, a fast path is used that
pre-allocates the result array and writes directly by index, avoiding Channel overhead.
!!! tip "Performance tip"
For best performance, use typed comprehensions (`T[expr for ...]`) when the element type
is known. Untyped comprehensions infer the type from the first result; if later results
have incompatible types, a type-widening path is used which may be slower.
!!! warning
The body expression of a threaded comprehension may execute on any thread and may
migrate between threads. Do not rely on [`threadid()`](@ref Threads.threadid) or
[`task_local_storage()`](@ref) returning consistent values within the body expression.
```julia-repl
julia> Threads.@threads [i^2 for i in 1:5] # Simple comprehension
5-element Vector{Int64}:
1
4
9
16
25
julia> Threads.@threads [i^2 for i in 1:5 if iseven(i)] # Filtered comprehension
2-element Vector{Int64}:
4
16
julia> Threads.@threads [i + j for i in 1:3, j in 1:3] # Multiple loops
3×3 Matrix{Int64}:
2 3 4
3 4 5
4 5 6
julia> Threads.@threads Float64[i^2 for i in 1:5] # Typed comprehension
5-element Vector{Float64}:
1.0
4.0
9.0
16.0
25.0
```
When the iterator doesn't have a known length, such as a channel, the `:greedy` scheduling
option can be used.
```julia-repl
julia> c = Channel(5, spawn=true) do ch
foreach(i -> put!(ch, i), 1:5)
end;
julia> Threads.@threads :greedy [i^2 for i in c if iseven(i)]
2-element Vector{Int64}:
4
16
julia> # Non-indexable iterators are also supported
Threads.@threads [i for i in Iterators.flatten([1:3, 4:6])]
6-element Vector{Int64}:
1
2
3
4
5
6
```
"""
macro threads(args...)
    # Accepted forms: `@threads ex` or `@threads :sched ex`, where `ex` is a
    # `for` loop, a comprehension, or a typed comprehension.
    nargs = length(args)
    if nargs == 1
        sched = :default
        ex = args[1]
    elseif nargs == 2
        raw_sched, ex = args
        # Only quoted symbols (`:static`, `:dynamic`, `:greedy`) are allowed for
        # now; a bare identifier becomes `nothing` so it is rejected below.
        sched = if raw_sched isa QuoteNode
            raw_sched.value
        elseif raw_sched isa Symbol
            nothing
        else
            raw_sched
        end
        if !(sched === :static || sched === :dynamic || sched === :greedy)
            throw(ArgumentError("unsupported schedule argument in @threads"))
        end
    else
        throw(ArgumentError("wrong number of arguments in @threads"))
    end
    if ex isa Expr
        head = ex.head
        if head === :typed_comprehension
            # `T[body for ...]`: args[1] is the element type, args[2] the generator.
            return _threadsfor_comprehension(ex.args[2], sched, ex.args[1])
        elseif head === :comprehension
            return _threadsfor_comprehension(ex.args[1], sched)
        elseif head === :for
            # Exactly one `var = range` clause; `for i in a, j in b` parses as a block.
            loopspec = ex.args[1]
            if !(loopspec isa Expr && loopspec.head === :(=))
                throw(ArgumentError("nested outer loops are not currently supported by @threads"))
            end
            return _threadsfor(loopspec, ex.args[2], sched)
        end
    end
    throw(ArgumentError("@threads requires a `for` loop or comprehension expression"))
end
function _spawn_set_thrpool(t::Task, tp::Symbol)
    requested = _sym_to_tpid(tp)
    # The foreign pool (id -1) and empty pools are not used as spawn targets;
    # in either case the task goes to the `:default` pool instead.
    usable = requested != -1 && _nthreads_in_pool(requested) != 0
    tpid = usable ? requested : _sym_to_tpid(:default)
    _result = ccall(:jl_set_task_threadpoolid, Cint, (Any, Int8), t, tpid)
    @assert _result == 1 "_result != 1"
    return nothing
end
"""
Threads.@spawn [:default|:interactive|:samepool] expr
Create a [`Task`](@ref) and [`schedule`](@ref) it to run on any available
thread in the specified threadpool: `:default`, `:interactive`, or `:samepool`
to use the same as the caller. `:default` is used if unspecified. The task is
allocated to a thread once one becomes available. To wait for the task to
finish, call [`wait`](@ref) on the result of this macro, or call
[`fetch`](@ref) to wait and then obtain its return value.
Values can be interpolated into `@spawn` via `\$`, which copies the value
directly into the constructed underlying closure. This allows you to insert
the _value_ of a variable, isolating the asynchronous code from changes to
the variable's value in the current task.
!!! note
The thread that the task runs on may change if the task yields, therefore `threadid()` should not
be treated as constant for a task. See [`Task Migration`](@ref man-task-migration), and the broader
[multi-threading](@ref man-multithreading) manual for further important caveats.
See also the chapter on [threadpools](@ref man-threadpools).
!!! compat "Julia 1.3"
This macro is available as of Julia 1.3.
!!! compat "Julia 1.4"
Interpolating values via `\$` is available as of Julia 1.4.
!!! compat "Julia 1.9"
A threadpool may be specified as of Julia 1.9.
!!! compat "Julia 1.12"
The same threadpool may be specified as of Julia 1.12.
# Examples
```julia-repl
julia> t() = println("Hello from ", Threads.threadid());
julia> tasks = fetch.([Threads.@spawn t() for i in 1:4]);
Hello from 1
Hello from 1
Hello from 3
Hello from 4
```
"""
macro spawn(args...)
    # Expand `@spawn [pool] expr` into code that creates a non-sticky Task
    # running `expr`, assigns it to a threadpool, registers it with an enclosing
    # `@sync` if there is one, schedules it, and evaluates to the task.
    # Pool used when no threadpool argument is given.
    tp = QuoteNode(:default)
    na = length(args)
    if na == 2
        ttype, ex = args
        if ttype isa QuoteNode
            # Literal pool name: validated at macro-expansion time.
            ttype = ttype.value
            if !in(ttype, (:interactive, :default, :samepool))
                throw(ArgumentError(LazyString("unsupported threadpool in @spawn: ", ttype)))
            end
            tp = QuoteNode(ttype)
        else
            # Any other expression is evaluated at run time, in the caller's
            # scope (see `esc(tp)` below), to obtain the pool name.
            tp = ttype
        end
    elseif na == 1
        ex = args[1]
    else
        throw(ArgumentError("wrong number of arguments in @spawn"))
    end
    # Handles `$x` interpolation: presumably rewrites `ex` in place and returns
    # the bindings spliced into the `let` below, so the interpolated values are
    # captured when the task is created — TODO confirm against Base._lift_one_interp!.
    letargs = Base._lift_one_interp!(ex)
    # Zero-argument closure running the user's expression; line numbers are
    # replaced using the macro call site `__source__`.
    thunk = Base.replace_linenums!(:(()->($(esc(ex)))), __source__)
    # Caller-scope variable that an enclosing `@sync` uses to collect tasks.
    var = esc(Base.sync_varname)
    quote
        let $(letargs...)
            local task = Task($thunk)
            task.sticky = false
            local tp = $(esc(tp))
            # `:samepool` resolves to the calling thread's pool at run time.
            if tp == :samepool
                tp = Threads.threadpool()
            end
            _spawn_set_thrpool(task, tp)
            # Register with an enclosing `@sync` only if its variable is in scope.
            if $(Expr(:islocal, var))
                put!($var, task)
            end
            schedule(task)
            task
        end
    end
end
# This is a stub that can be overloaded for downstream structures like `Channel`.
# It declares the generic function `Threads.foreach` without defining any method
# here, so that other code can add methods to it.
function foreach end
# Scheduling traits that can be employed for downstream overloads.
# These are field-less marker types meant for dispatch. The names suggest a
# fixed (static) assignment versus a fair/dynamic one; no methods here use them,
# so that meaning is an assumption — TODO confirm against the downstream overloads.
abstract type AbstractSchedule end
struct StaticSchedule <: AbstractSchedule end
struct FairSchedule <: AbstractSchedule end