--- a/src/parallelism.rs Thu Mar 19 18:21:40 2026 -0500 +++ b/src/parallelism.rs Wed Apr 22 23:41:59 2026 -0500 @@ -7,24 +7,24 @@ For actually spawning scoped tasks in a thread pool, it currently uses [`rayon`]. */ -use std::sync::Once; +pub use rayon::{Scope, ThreadPool, ThreadPoolBuilder}; use std::num::NonZeroUsize; -use std::thread::available_parallelism; -pub use rayon::{Scope, ThreadPoolBuilder, ThreadPool}; use std::sync::atomic::{ AtomicUsize, - Ordering::{Release, Relaxed}, + Ordering::{Relaxed, Release}, }; +use std::sync::Once; +use std::thread::available_parallelism; #[cfg(feature = "use_custom_thread_pool")] type Pool = ThreadPool; #[cfg(not(feature = "use_custom_thread_pool"))] type Pool = GlobalPool; -const ONE : NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(1) }; -static mut TASK_OVERBUDGETING : AtomicUsize = AtomicUsize::new(1); -static mut N_THREADS : NonZeroUsize = ONE; -static mut POOL : Option<Pool> = None; +const ONE: NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(1) }; +static mut TASK_OVERBUDGETING: AtomicUsize = AtomicUsize::new(1); +static mut N_THREADS: NonZeroUsize = ONE; +static mut POOL: Option<Pool> = None; static INIT: Once = Once::new(); #[cfg(not(feature = "use_custom_thread_pool"))] @@ -38,7 +38,8 @@ pub fn scope<'scope, OP, R>(&self, op: OP) -> R where OP: FnOnce(&rayon::Scope<'scope>) -> R + Send, - R: Send { + R: Send, + { rayon::scope(op) } } @@ -51,17 +52,22 @@ /// /// This routine can only be called once. /// It also calls [`set_task_overbudgeting`] with $m = (n + 1) / 2$. -pub fn set_num_threads(n : NonZeroUsize) { +pub fn set_num_threads(n: NonZeroUsize) { INIT.call_once(|| unsafe { N_THREADS = n; let n = n.get(); set_task_overbudgeting((n + 1) / 2); POOL = if n > 1 { - #[cfg(feature = "use_custom_thread_pool")] { + #[cfg(feature = "use_custom_thread_pool")] + { Some(ThreadPoolBuilder::new().num_threads(n).build().unwrap()) } - #[cfg(not(feature = "use_custom_thread_pool"))] { - ThreadPoolBuilder::new().num_threads(n).build_global().unwrap(); + #[cfg(not(feature = "use_custom_thread_pool"))] + { + ThreadPoolBuilder::new() + .num_threads(n) + .build_global() + .unwrap(); Some(GlobalPool) } } else { @@ -74,20 +80,21 @@ /// /// The initial value is 1. Calling [`set_num_threads`] sets this to $m = (n + 1) / 2$, where /// $n$ is the number of threads. -pub fn set_task_overbudgeting(m : usize) { +pub fn set_task_overbudgeting(m: usize) { #[allow(static_mut_refs)] - unsafe { TASK_OVERBUDGETING.store(m, Relaxed) } + unsafe { + TASK_OVERBUDGETING.store(m, Relaxed) + } } /// Set the number of threads to the minimum of `n` and [`available_parallelism`]. /// /// This routine can only be called once. -pub fn set_max_threads(n : NonZeroUsize) { +pub fn set_max_threads(n: NonZeroUsize) { let available = available_parallelism().unwrap_or(ONE); set_num_threads(available.min(n)); } - /// Get the number of threads pub fn num_threads() -> NonZeroUsize { unsafe { N_THREADS } @@ -99,7 +106,9 @@ /// The pool has [`num_threads`]` - 1` threads. pub fn thread_pool() -> Option<&'static Pool> { #[allow(static_mut_refs)] - unsafe { POOL.as_ref() } + unsafe { + POOL.as_ref() + } } /// Get the number of thread pool workers. @@ -119,25 +128,25 @@ /// Initial multi-threaded state MultiThreadedInitial { /// Thread budget counter - budget : AtomicUsize, + budget: AtomicUsize, /// Thread pool - pool : &'scheduler Pool, + pool: &'scheduler Pool, }, /// Nested multi-threaded state MultiThreadedZoom { /// Thread budget reference - budget : &'scope AtomicUsize, - scope : &'scheduler Scope<'scope>, - } + budget: &'scope AtomicUsize, + scope: &'scheduler Scope<'scope>, + }, } /// Task execution scope for [`TaskBudget`]. pub enum TaskBudgetScope<'scope, 'scheduler> { SingleThreaded, MultiThreaded { - budget : &'scope AtomicUsize, - scope : &'scheduler Scope<'scope>, - } + budget: &'scope AtomicUsize, + scope: &'scheduler Scope<'scope>, + }, } impl<'scope, 'b> TaskBudget<'scope, 'b> { @@ -146,7 +155,7 @@ /// The number of tasks [executed][TaskBudgetScope::execute] in [scopes][TaskBudget::zoom] /// created through the budget is limited to [`num_threads()`]` + overbudget`. If `overbudget` /// is `None`, the [global setting][set_task_overbudgeting] is used.ยง - pub fn init(overbudget : Option<usize>) -> Self { + pub fn init(overbudget: Option<usize>) -> Self { let n = num_threads().get(); #[allow(static_mut_refs)] let m = overbudget.unwrap_or_else(|| unsafe { TASK_OVERBUDGETING.load(Relaxed) }); @@ -161,14 +170,18 @@ } /// Initialise single-threaded thread budgeting. - pub fn none() -> Self { Self::SingleThreaded } + pub fn none() -> Self { + Self::SingleThreaded + } } impl<'scope, 'scheduler> TaskBudget<'scope, 'scheduler> { /// Create a sub-scope for launching tasks - pub fn zoom<'smaller, F, R : Send>(&self, scheduler : F) -> R - where 'scope : 'smaller, - F : for<'a> FnOnce(TaskBudgetScope<'smaller, 'a>) -> R + Send + 'smaller { + pub fn zoom<'smaller, F, R: Send>(&self, scheduler: F) -> R + where + 'scope: 'smaller, + F: for<'a> FnOnce(TaskBudgetScope<'smaller, 'a>) -> R + Send + 'smaller, + { match self { &Self::SingleThreaded => scheduler(TaskBudgetScope::SingleThreaded), &Self::MultiThreadedInitial { ref budget, pool } => { @@ -191,15 +204,16 @@ impl<'scope, 'scheduler> TaskBudgetScope<'scope, 'scheduler> { /// Queue a task or execute it in this thread if the thread budget is exhausted. - pub fn execute<F>(&self, job : F) - where F : for<'b> FnOnce(TaskBudget<'scope, 'b>) + Send + 'scope { + pub fn execute<F>(&self, job: F) + where + F: for<'b> FnOnce(TaskBudget<'scope, 'b>) + Send + 'scope, + { match self { Self::SingleThreaded => job(TaskBudget::SingleThreaded), Self::MultiThreaded { scope, budget } => { - let spawn = budget.fetch_update(Release, - Relaxed, - |n| (n > 1).then_some(n - 1)) - .is_ok(); + let spawn = budget + .fetch_update(Release, Relaxed, |n| (n > 1).then_some(n - 1)) + .is_ok(); if spawn { scope.spawn(|scope| { let task_budget = TaskBudget::MultiThreadedZoom { scope, budget }; @@ -216,8 +230,10 @@ /// Runs `scheduler` with a [`TaskBudget`]. /// -/// This corresponds to calling `scheduler` with [`TaskBudget::init(None)`]. -pub fn with_task_budget<'scope, F, R>(scheduler : F) -> R -where F : for<'b> FnOnce(TaskBudget<'scope, 'b>) -> R + 'scope { +/// This corresponds to calling `scheduler` with [`TaskBudget::init`]`(None)`. +pub fn with_task_budget<'scope, F, R>(scheduler: F) -> R +where + F: for<'b> FnOnce(TaskBudget<'scope, 'b>) -> R + 'scope, +{ scheduler(TaskBudget::init(None)) }