Tue, 06 Dec 2022 08:32:57 +0200
Fix broken links in doc comments after Mapping -> Apply change.
/*! Parallel computing helper routines. It mains the number of threads to use, a thread pool, and provides a budgeting tool to limit the number of threads spawned in recursive computations. For actually spawning scoped tasks in a thread pool, it currently uses [`rayon`]. */ use std::sync::Once; use std::num::NonZeroUsize; use std::thread::available_parallelism; pub use rayon::{Scope, ThreadPoolBuilder, ThreadPool}; use std::sync::atomic::{ AtomicUsize, Ordering::{Release, Relaxed}, }; #[cfg(use_custom_thread_pool)] type Pool = ThreadPool; #[cfg(not(use_custom_thread_pool))] type Pool = GlobalPool; const ONE : NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(1) }; static mut TASK_OVERBUDGETING : AtomicUsize = AtomicUsize::new(1); static mut N_THREADS : NonZeroUsize = ONE; static mut POOL : Option<Pool> = None; static INIT: Once = Once::new(); #[cfg(not(use_custom_thread_pool))] mod global_pool { /// This is a nicer way to use the global pool of [`rayon`]. pub struct GlobalPool; #[cfg(not(use_custom_thread_pool))] impl GlobalPool { #[inline] pub fn scope<'scope, OP, R>(&self, op: OP) -> R where OP: FnOnce(&rayon::Scope<'scope>) -> R + Send, R: Send { rayon::scope(op) } } } #[cfg(not(use_custom_thread_pool))] pub use global_pool::GlobalPool; /// Set the number of threads. /// /// This routine can only be called once. /// It also calls [`set_task_overbudgeting`] with $m = (n + 1) / 2$. pub fn set_num_threads(n : NonZeroUsize) { INIT.call_once(|| unsafe { N_THREADS = n; let n = n.get(); set_task_overbudgeting((n + 1) / 2); POOL = if n > 1 { #[cfg(use_custom_thread_pool)] { Some(ThreadPoolBuilder::new().num_threads(n).build().unwrap()) } #[cfg(not(use_custom_thread_pool))] { ThreadPoolBuilder::new().num_threads(n).build_global().unwrap(); Some(GlobalPool) } } else { None } }); } /// Set overbudgeting count for [`TaskBudget`]. /// /// The initial value is 1. Calling [`set_num_threads`] sets this to $m = (n + 1) / 2$, where /// $n$ is the number of threads. pub fn set_task_overbudgeting(m : usize) { unsafe { TASK_OVERBUDGETING.store(m, Relaxed) } } /// Set the number of threads to the minimum of `n` and [`available_parallelism`]. /// /// This routine can only be called once. pub fn set_max_threads(n : NonZeroUsize) { let available = available_parallelism().unwrap_or(ONE); set_num_threads(available.min(n)); } /// Get the number of threads pub fn num_threads() -> NonZeroUsize { unsafe { N_THREADS } } /// Get the thread pool. /// /// If the number of configured threads is less than 2, this is None. /// The pool has [`num_threads`]` - 1` threads. pub fn thread_pool() -> Option<&'static Pool> { unsafe { POOL.as_ref() } } /// Get the number of thread pool workers. /// /// This is [`num_threads`]` - 1`. pub fn thread_pool_size() -> usize { unsafe { N_THREADS }.get() - 1 } /// Thread budgeting tool. /// /// This allows limiting the number of tasks inserted into the queue in nested computations. /// Otherwise it wraps [`rayon`] when not single-threaded. pub enum TaskBudget<'scope, 'scheduler> { /// No threading performed. SingleThreaded, /// Initial multi-threaded state MultiThreadedInitial { /// Thread budget counter budget : AtomicUsize, /// Thread pool pool : &'scheduler Pool, }, /// Nested multi-threaded state MultiThreadedZoom { /// Thread budget reference budget : &'scope AtomicUsize, scope : &'scheduler Scope<'scope>, } } /// Task execution scope for [`TaskBudget`]. pub enum TaskBudgetScope<'scope, 'scheduler> { SingleThreaded, MultiThreaded { budget : &'scope AtomicUsize, scope : &'scheduler Scope<'scope>, } } impl<'scope, 'b> TaskBudget<'scope, 'b> { /// Initialise a task budget in the main thread pool. /// /// The number of tasks [executed][TaskBudgetScope::execute] in [scopes][TaskBudget::zoom] /// created through the budget is limited to [`num_threads()`]` + overbudget`. If `overbudget` /// is `None`, the [global setting][set_task_overbudgeting] is used.ยง pub fn init(overbudget : Option<usize>) -> Self { let n = num_threads().get(); let m = overbudget.unwrap_or_else(|| unsafe { TASK_OVERBUDGETING.load(Relaxed) }); if n <= 1 { Self::SingleThreaded } else if let Some(pool) = thread_pool() { let budget = AtomicUsize::new(n + m); Self::MultiThreadedInitial { budget, pool } } else { Self::SingleThreaded } } /// Initialise single-threaded thread budgeting. pub fn none() -> Self { Self::SingleThreaded } } impl<'scope, 'scheduler> TaskBudget<'scope, 'scheduler> { /// Create a sub-scope for launching tasks pub fn zoom<'smaller, F, R : Send>(&self, scheduler : F) -> R where 'scope : 'smaller, F : for<'a> FnOnce(TaskBudgetScope<'smaller, 'a>) -> R + Send + 'smaller { match self { &Self::SingleThreaded => scheduler(TaskBudgetScope::SingleThreaded), &Self::MultiThreadedInitial { ref budget, pool } => { let budget_ref = unsafe { // Safe: scheduler is dropped when 'smaller becomes invalid. std::mem::transmute::<&AtomicUsize, &'smaller AtomicUsize>(budget) }; pool.scope(move |scope| { scheduler(TaskBudgetScope::MultiThreaded { scope, budget: budget_ref }) }) }, &Self::MultiThreadedZoom { budget, .. /* scope */ } => { rayon::scope(move |scope| { scheduler(TaskBudgetScope::MultiThreaded { scope, budget }) }) }, } } } impl<'scope, 'scheduler> TaskBudgetScope<'scope, 'scheduler> { /// Queue a task or execute it in this thread if the thread budget is exhausted. pub fn execute<F>(&self, job : F) where F : for<'b> FnOnce(TaskBudget<'scope, 'b>) + Send + 'scope { match self { Self::SingleThreaded => job(TaskBudget::SingleThreaded), Self::MultiThreaded { scope, budget } => { let spawn = budget.fetch_update(Release, Relaxed, |n| (n > 1).then_some(n - 1)) .is_ok(); if spawn { scope.spawn(|scope| { let task_budget = TaskBudget::MultiThreadedZoom { scope, budget }; job(task_budget); budget.fetch_add(1, Release); }) } else { job(TaskBudget::MultiThreadedZoom { scope, budget }) } } } } } /// Runs `scheduler` with a [`TaskBudget`]. /// /// This corresponds to calling `scheduler` with [`TaskBudget::init(None)`]. pub fn with_task_budget<'scope, F, R>(scheduler : F) -> R where F : for<'b> FnOnce(TaskBudget<'scope, 'b>) -> R + 'scope { scheduler(TaskBudget::init(None)) }