src/parallelism.rs

Tue, 20 Feb 2024 12:33:16 -0500

author
Tuomo Valkonen <tuomov@iki.fi>
date
Tue, 20 Feb 2024 12:33:16 -0500
changeset 25
d14c877e14b7
parent 8
4e09b7829b51
child 11
fc50a2d39053
permissions
-rw-r--r--

Logarithmic logging base correction

/*!
Parallel computing helper routines.

It mains the number of threads to use, a thread pool, and provides a budgeting tool
to limit the number of threads spawned in recursive computations.

For actually spawning scoped tasks in a thread pool, it currently uses [`rayon`].
*/

use std::sync::Once;
use std::num::NonZeroUsize;
use std::thread::available_parallelism;
pub use rayon::{Scope, ThreadPoolBuilder, ThreadPool};
use std::sync::atomic::{
    AtomicUsize,
    Ordering::{Release, Relaxed},
};

#[cfg(use_custom_thread_pool)]
type Pool = ThreadPool;
#[cfg(not(use_custom_thread_pool))]
type Pool = GlobalPool;

const ONE : NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(1) };
static mut TASK_OVERBUDGETING : AtomicUsize = AtomicUsize::new(1);
static mut N_THREADS : NonZeroUsize = ONE;
static mut POOL : Option<Pool> = None;
static INIT: Once = Once::new();

#[cfg(not(use_custom_thread_pool))]
mod global_pool {
    /// This is a nicer way to use the global pool of [`rayon`].
    pub struct GlobalPool;

    #[cfg(not(use_custom_thread_pool))]
    impl GlobalPool {
        #[inline]
        pub fn scope<'scope, OP, R>(&self, op: OP) -> R
        where
            OP: FnOnce(&rayon::Scope<'scope>) -> R + Send,
            R: Send {
            rayon::scope(op)
        }
    }
}

#[cfg(not(use_custom_thread_pool))]
pub use global_pool::GlobalPool;

/// Set the number of threads.
///
/// This routine can only be called once.
/// It also calls [`set_task_overbudgeting`] with $m = (n + 1) / 2$.
pub fn set_num_threads(n : NonZeroUsize) {
    INIT.call_once(|| unsafe {
        N_THREADS = n;
        let n = n.get();
        set_task_overbudgeting((n + 1) / 2);
        POOL = if n > 1 {
            #[cfg(use_custom_thread_pool)] {
                Some(ThreadPoolBuilder::new().num_threads(n).build().unwrap())
            }
            #[cfg(not(use_custom_thread_pool))] {
                ThreadPoolBuilder::new().num_threads(n).build_global().unwrap();
                Some(GlobalPool)
            }
        } else {
            None
        }
    });
}

/// Set overbudgeting count for [`TaskBudget`].
///
/// The initial value is 1. Calling [`set_num_threads`] sets this to  $m = (n + 1) / 2$, where
/// $n$ is the number of threads.
pub fn set_task_overbudgeting(m : usize) {
    unsafe { TASK_OVERBUDGETING.store(m, Relaxed) }
}

/// Set the number of threads to the minimum of `n` and [`available_parallelism`].
///
/// This routine can only be called once.
pub fn set_max_threads(n : NonZeroUsize) {
    let available = available_parallelism().unwrap_or(ONE);
    set_num_threads(available.min(n));
}


/// Get the number of threads
pub fn num_threads() -> NonZeroUsize {
    unsafe { N_THREADS }
}

/// Get the thread pool.
///
/// If the number of configured threads is less than 2, this is None.
/// The pool has [`num_threads`]` - 1` threads.
pub fn thread_pool() -> Option<&'static Pool> {
    unsafe { POOL.as_ref() }
}

/// Get the number of thread pool workers.
///
/// This is [`num_threads`]` - 1`.
pub fn thread_pool_size() -> usize {
    unsafe { N_THREADS }.get() - 1
}

/// Thread budgeting tool.
///
/// This allows limiting the number of tasks inserted into the  queue in nested computations.
/// Otherwise it wraps [`rayon`] when not single-threaded.
pub enum TaskBudget<'scope, 'scheduler> {
    /// No threading performed.
    SingleThreaded,
    /// Initial multi-threaded state
    MultiThreadedInitial {
        /// Thread budget counter
        budget : AtomicUsize,
        /// Thread pool
        pool : &'scheduler Pool,
    },
    /// Nested multi-threaded state
    MultiThreadedZoom {
        /// Thread budget reference
        budget : &'scope AtomicUsize,
        scope : &'scheduler Scope<'scope>,
    }
}

/// Task execution scope for [`TaskBudget`].
pub enum TaskBudgetScope<'scope, 'scheduler> {
    SingleThreaded,
    MultiThreaded {
        budget : &'scope AtomicUsize,
        scope : &'scheduler Scope<'scope>,
    }
}

impl<'scope, 'b> TaskBudget<'scope, 'b> {
    /// Initialise a task budget in the main thread pool.
    ///
    /// The number of tasks [executed][TaskBudgetScope::execute] in [scopes][TaskBudget::zoom]
    /// created through the budget is limited to [`num_threads()`]` + overbudget`. If `overbudget`
    /// is `None`, the [global setting][set_task_overbudgeting] is used.ยง
    pub fn init(overbudget : Option<usize>) -> Self {
        let n = num_threads().get();
        let m = overbudget.unwrap_or_else(|| unsafe { TASK_OVERBUDGETING.load(Relaxed) });
        if n <= 1 {
            Self::SingleThreaded
        } else if let Some(pool) = thread_pool() {
            let budget = AtomicUsize::new(n + m);
            Self::MultiThreadedInitial { budget, pool }
        } else {
            Self::SingleThreaded
        }
    }

    /// Initialise single-threaded thread budgeting.
    pub fn none() -> Self { Self::SingleThreaded }
}

impl<'scope, 'scheduler> TaskBudget<'scope, 'scheduler> {
    /// Create a sub-scope for launching tasks
    pub fn zoom<'smaller, F, R : Send>(&self, scheduler : F) -> R
    where 'scope : 'smaller,
          F : for<'a> FnOnce(TaskBudgetScope<'smaller, 'a>) -> R + Send + 'smaller {
        match self {
            &Self::SingleThreaded => scheduler(TaskBudgetScope::SingleThreaded),
            &Self::MultiThreadedInitial { ref budget, pool } => {
                let budget_ref = unsafe {
                    // Safe: scheduler is dropped when 'smaller becomes invalid.
                    std::mem::transmute::<&AtomicUsize, &'smaller AtomicUsize>(budget)
                };
                pool.scope(move |scope| {
                    scheduler(TaskBudgetScope::MultiThreaded { scope, budget: budget_ref })
                })
            },
            &Self::MultiThreadedZoom { budget, .. /* scope */ } => {
                rayon::scope(move |scope| {
                    scheduler(TaskBudgetScope::MultiThreaded { scope, budget })
                })
            },
        }
    }
}

impl<'scope, 'scheduler> TaskBudgetScope<'scope, 'scheduler> {
    /// Queue a task or execute it in this thread if the thread budget is exhausted.
    pub fn execute<F>(&self, job : F)
    where F : for<'b> FnOnce(TaskBudget<'scope, 'b>) + Send + 'scope {
        match self {
            Self::SingleThreaded => job(TaskBudget::SingleThreaded),
            Self::MultiThreaded { scope, budget } => {
                let spawn = budget.fetch_update(Release,
                                                Relaxed,
                                                |n| (n > 1).then_some(n - 1))
                                  .is_ok();
                if spawn {
                    scope.spawn(|scope| {
                        let task_budget = TaskBudget::MultiThreadedZoom { scope, budget };
                        job(task_budget);
                        budget.fetch_add(1, Release);
                    })
                } else {
                    job(TaskBudget::MultiThreadedZoom { scope, budget })
                }
            }
        }
    }
}

/// Runs `scheduler` with a [`TaskBudget`].
///
/// This corresponds to calling `scheduler` with [`TaskBudget::init(None)`].
pub fn with_task_budget<'scope, F, R>(scheduler : F) -> R
where F : for<'b> FnOnce(TaskBudget<'scope, 'b>) -> R + 'scope {
    scheduler(TaskBudget::init(None))
}

mercurial