diff -r f40dfaf2166d -r fc50a2d39053 src/parallelism.rs --- a/src/parallelism.rs Fri Nov 18 10:29:50 2022 +0200 +++ b/src/parallelism.rs Wed Nov 02 00:11:49 2022 +0200 @@ -107,6 +107,12 @@ unsafe { N_THREADS }.get() - 1 } +#[derive(Debug, Copy, Clone)] +struct WorkCounter { + current_units : usize, + min_units : usize, +} + /// Thread budgeting tool. /// /// This allows limiting the number of tasks inserted into the queue in nested computations. @@ -120,12 +126,15 @@ budget : AtomicUsize, /// Thread pool pool : &'scheduler Pool, + /// Counter for work performed, + min_units : usize, }, /// Nested multi-threaded state MultiThreadedZoom { /// Thread budget reference budget : &'scope AtomicUsize, scope : &'scheduler Scope<'scope>, + work : WorkCounter, } } @@ -135,6 +144,7 @@ MultiThreaded { budget : &'scope AtomicUsize, scope : &'scheduler Scope<'scope>, + work : WorkCounter, } } @@ -143,15 +153,18 @@ /// /// The number of tasks [executed][TaskBudgetScope::execute] in [scopes][TaskBudget::zoom] /// created through the budget is limited to [`num_threads()`]` + overbudget`. If `overbudget` - /// is `None`, the [global setting][set_task_overbudgeting] is used.§ - pub fn init(overbudget : Option) -> Self { + /// is `None`, the [global setting][set_task_overbudgeting] is used. + /// + /// The parameter `min_units` specifies the minimum number of work units to perform + /// in each task before off-loading work to other threads. + pub fn init(min_units : usize, overbudget : Option) -> Self { let n = num_threads().get(); let m = overbudget.unwrap_or_else(|| unsafe { TASK_OVERBUDGETING.load(Relaxed) }); if n <= 1 { Self::SingleThreaded } else if let Some(pool) = thread_pool() { let budget = AtomicUsize::new(n + m); - Self::MultiThreadedInitial { budget, pool } + Self::MultiThreadedInitial { budget, pool, min_units } } else { Self::SingleThreaded } @@ -168,19 +181,39 @@ F : for<'a> FnOnce(TaskBudgetScope<'smaller, 'a>) -> R + Send + 'smaller { match self { &Self::SingleThreaded => scheduler(TaskBudgetScope::SingleThreaded), - &Self::MultiThreadedInitial { ref budget, pool } => { + &Self::MultiThreadedInitial { ref budget, pool, min_units } => { let budget_ref = unsafe { // Safe: scheduler is dropped when 'smaller becomes invalid. std::mem::transmute::<&AtomicUsize, &'smaller AtomicUsize>(budget) }; pool.scope(move |scope| { - scheduler(TaskBudgetScope::MultiThreaded { scope, budget: budget_ref }) + scheduler(TaskBudgetScope::MultiThreaded { + scope, + budget: budget_ref, + work : WorkCounter { min_units, current_units : 0 }, + }) }) }, - &Self::MultiThreadedZoom { budget, .. /* scope */ } => { - rayon::scope(move |scope| { - scheduler(TaskBudgetScope::MultiThreaded { scope, budget }) - }) + &Self::MultiThreadedZoom { budget, work, scope } => { + if work.current_units < work.min_units { + // Not enough work performed in this task, stay in the same thread. + scheduler(TaskBudgetScope::MultiThreaded { + scope, + budget, + work, + }) + } else { + rayon::scope(move |new_scope| { + scheduler(TaskBudgetScope::MultiThreaded { + scope : new_scope, + budget, + work : WorkCounter { + min_units : work.min_units, + current_units : 0 + }, + }) + }) + } }, } } @@ -188,24 +221,33 @@ impl<'scope, 'scheduler> TaskBudgetScope<'scope, 'scheduler> { /// Queue a task or execute it in this thread if the thread budget is exhausted. - pub fn execute(&self, job : F) + pub fn execute(&mut self, job : F) where F : for<'b> FnOnce(TaskBudget<'scope, 'b>) + Send + 'scope { match self { Self::SingleThreaded => job(TaskBudget::SingleThreaded), - Self::MultiThreaded { scope, budget } => { - let spawn = budget.fetch_update(Release, - Relaxed, - |n| (n > 1).then_some(n - 1)) - .is_ok(); + Self::MultiThreaded { scope, budget, work } => { + let spawn = work.current_units >= work.min_units + && budget.fetch_update(Release, + Relaxed, + |n| (n > 1).then_some(n - 1)) + .is_ok(); if spawn { scope.spawn(|scope| { - let task_budget = TaskBudget::MultiThreadedZoom { scope, budget }; + let task_budget = TaskBudget::MultiThreadedZoom { + scope, + budget, + work : WorkCounter { + min_units : work.min_units, + current_units : 0 + }, + }; job(task_budget); budget.fetch_add(1, Release); }) } else { - job(TaskBudget::MultiThreadedZoom { scope, budget }) - } + let task_budget = TaskBudget::MultiThreadedZoom { scope, budget, work }; + job(task_budget) +§ } } } }