src/parallelism.rs

branch
dev
changeset 197
1f301affeae3
parent 45
ad1f3705c3fc
--- a/src/parallelism.rs	Thu Mar 19 18:21:40 2026 -0500
+++ b/src/parallelism.rs	Wed Apr 22 23:41:59 2026 -0500
@@ -7,24 +7,24 @@
 For actually spawning scoped tasks in a thread pool, it currently uses [`rayon`].
 */
 
-use std::sync::Once;
+pub use rayon::{Scope, ThreadPool, ThreadPoolBuilder};
 use std::num::NonZeroUsize;
-use std::thread::available_parallelism;
-pub use rayon::{Scope, ThreadPoolBuilder, ThreadPool};
 use std::sync::atomic::{
     AtomicUsize,
-    Ordering::{Release, Relaxed},
+    Ordering::{Relaxed, Release},
 };
+use std::sync::Once;
+use std::thread::available_parallelism;
 
 #[cfg(feature = "use_custom_thread_pool")]
 type Pool = ThreadPool;
 #[cfg(not(feature = "use_custom_thread_pool"))]
 type Pool = GlobalPool;
 
-const ONE : NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(1) };
-static mut TASK_OVERBUDGETING : AtomicUsize = AtomicUsize::new(1);
-static mut N_THREADS : NonZeroUsize = ONE;
-static mut POOL : Option<Pool> = None;
+const ONE: NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(1) };
+static mut TASK_OVERBUDGETING: AtomicUsize = AtomicUsize::new(1);
+static mut N_THREADS: NonZeroUsize = ONE;
+static mut POOL: Option<Pool> = None;
 static INIT: Once = Once::new();
 
 #[cfg(not(feature = "use_custom_thread_pool"))]
@@ -38,7 +38,8 @@
         pub fn scope<'scope, OP, R>(&self, op: OP) -> R
         where
             OP: FnOnce(&rayon::Scope<'scope>) -> R + Send,
-            R: Send {
+            R: Send,
+        {
             rayon::scope(op)
         }
     }
@@ -51,17 +52,22 @@
 ///
 /// This routine can only be called once.
 /// It also calls [`set_task_overbudgeting`] with $m = (n + 1) / 2$.
-pub fn set_num_threads(n : NonZeroUsize) {
+pub fn set_num_threads(n: NonZeroUsize) {
     INIT.call_once(|| unsafe {
         N_THREADS = n;
         let n = n.get();
         set_task_overbudgeting((n + 1) / 2);
         POOL = if n > 1 {
-            #[cfg(feature = "use_custom_thread_pool")] {
+            #[cfg(feature = "use_custom_thread_pool")]
+            {
                 Some(ThreadPoolBuilder::new().num_threads(n).build().unwrap())
             }
-            #[cfg(not(feature = "use_custom_thread_pool"))] {
-                ThreadPoolBuilder::new().num_threads(n).build_global().unwrap();
+            #[cfg(not(feature = "use_custom_thread_pool"))]
+            {
+                ThreadPoolBuilder::new()
+                    .num_threads(n)
+                    .build_global()
+                    .unwrap();
                 Some(GlobalPool)
             }
         } else {
@@ -74,20 +80,21 @@
 ///
 /// The initial value is 1. Calling [`set_num_threads`] sets this to  $m = (n + 1) / 2$, where
 /// $n$ is the number of threads.
-pub fn set_task_overbudgeting(m : usize) {
+pub fn set_task_overbudgeting(m: usize) {
     #[allow(static_mut_refs)]
-    unsafe { TASK_OVERBUDGETING.store(m, Relaxed) }
+    unsafe {
+        TASK_OVERBUDGETING.store(m, Relaxed)
+    }
 }
 
 /// Set the number of threads to the minimum of `n` and [`available_parallelism`].
 ///
 /// This routine can only be called once.
-pub fn set_max_threads(n : NonZeroUsize) {
+pub fn set_max_threads(n: NonZeroUsize) {
     let available = available_parallelism().unwrap_or(ONE);
     set_num_threads(available.min(n));
 }
 
-
 /// Get the number of threads
 pub fn num_threads() -> NonZeroUsize {
     unsafe { N_THREADS }
@@ -99,7 +106,9 @@
 /// The pool has [`num_threads`]` - 1` threads.
 pub fn thread_pool() -> Option<&'static Pool> {
     #[allow(static_mut_refs)]
-    unsafe { POOL.as_ref() }
+    unsafe {
+        POOL.as_ref()
+    }
 }
 
 /// Get the number of thread pool workers.
@@ -119,25 +128,25 @@
     /// Initial multi-threaded state
     MultiThreadedInitial {
         /// Thread budget counter
-        budget : AtomicUsize,
+        budget: AtomicUsize,
         /// Thread pool
-        pool : &'scheduler Pool,
+        pool: &'scheduler Pool,
     },
     /// Nested multi-threaded state
     MultiThreadedZoom {
         /// Thread budget reference
-        budget : &'scope AtomicUsize,
-        scope : &'scheduler Scope<'scope>,
-    }
+        budget: &'scope AtomicUsize,
+        scope: &'scheduler Scope<'scope>,
+    },
 }
 
 /// Task execution scope for [`TaskBudget`].
 pub enum TaskBudgetScope<'scope, 'scheduler> {
     SingleThreaded,
     MultiThreaded {
-        budget : &'scope AtomicUsize,
-        scope : &'scheduler Scope<'scope>,
-    }
+        budget: &'scope AtomicUsize,
+        scope: &'scheduler Scope<'scope>,
+    },
 }
 
 impl<'scope, 'b> TaskBudget<'scope, 'b> {
@@ -146,7 +155,7 @@
     /// The number of tasks [executed][TaskBudgetScope::execute] in [scopes][TaskBudget::zoom]
     /// created through the budget is limited to [`num_threads()`]` + overbudget`. If `overbudget`
     /// is `None`, the [global setting][set_task_overbudgeting] is used.ยง
-    pub fn init(overbudget : Option<usize>) -> Self {
+    pub fn init(overbudget: Option<usize>) -> Self {
         let n = num_threads().get();
         #[allow(static_mut_refs)]
         let m = overbudget.unwrap_or_else(|| unsafe { TASK_OVERBUDGETING.load(Relaxed) });
@@ -161,14 +170,18 @@
     }
 
     /// Initialise single-threaded thread budgeting.
-    pub fn none() -> Self { Self::SingleThreaded }
+    pub fn none() -> Self {
+        Self::SingleThreaded
+    }
 }
 
 impl<'scope, 'scheduler> TaskBudget<'scope, 'scheduler> {
     /// Create a sub-scope for launching tasks
-    pub fn zoom<'smaller, F, R : Send>(&self, scheduler : F) -> R
-    where 'scope : 'smaller,
-          F : for<'a> FnOnce(TaskBudgetScope<'smaller, 'a>) -> R + Send + 'smaller {
+    pub fn zoom<'smaller, F, R: Send>(&self, scheduler: F) -> R
+    where
+        'scope: 'smaller,
+        F: for<'a> FnOnce(TaskBudgetScope<'smaller, 'a>) -> R + Send + 'smaller,
+    {
         match self {
             &Self::SingleThreaded => scheduler(TaskBudgetScope::SingleThreaded),
             &Self::MultiThreadedInitial { ref budget, pool } => {
@@ -191,15 +204,16 @@
 
 impl<'scope, 'scheduler> TaskBudgetScope<'scope, 'scheduler> {
     /// Queue a task or execute it in this thread if the thread budget is exhausted.
-    pub fn execute<F>(&self, job : F)
-    where F : for<'b> FnOnce(TaskBudget<'scope, 'b>) + Send + 'scope {
+    pub fn execute<F>(&self, job: F)
+    where
+        F: for<'b> FnOnce(TaskBudget<'scope, 'b>) + Send + 'scope,
+    {
         match self {
             Self::SingleThreaded => job(TaskBudget::SingleThreaded),
             Self::MultiThreaded { scope, budget } => {
-                let spawn = budget.fetch_update(Release,
-                                                Relaxed,
-                                                |n| (n > 1).then_some(n - 1))
-                                  .is_ok();
+                let spawn = budget
+                    .fetch_update(Release, Relaxed, |n| (n > 1).then_some(n - 1))
+                    .is_ok();
                 if spawn {
                     scope.spawn(|scope| {
                         let task_budget = TaskBudget::MultiThreadedZoom { scope, budget };
@@ -216,8 +230,10 @@
 
 /// Runs `scheduler` with a [`TaskBudget`].
 ///
-/// This corresponds to calling `scheduler` with [`TaskBudget::init(None)`].
-pub fn with_task_budget<'scope, F, R>(scheduler : F) -> R
-where F : for<'b> FnOnce(TaskBudget<'scope, 'b>) -> R + 'scope {
+/// This corresponds to calling `scheduler` with [`TaskBudget::init`]`(None)`.
+pub fn with_task_budget<'scope, F, R>(scheduler: F) -> R
+where
+    F: for<'b> FnOnce(TaskBudget<'scope, 'b>) -> R + 'scope,
+{
     scheduler(TaskBudget::init(None))
 }

mercurial