work-units-sketching draft

Wed, 02 Nov 2022 00:11:49 +0200

author
Tuomo Valkonen <tuomov@iki.fi>
date
Wed, 02 Nov 2022 00:11:49 +0200
changeset 11
fc50a2d39053
parent 9
f40dfaf2166d

work-units-sketching

src/parallelism.rs file | annotate | diff | comparison | revisions
--- a/src/parallelism.rs	Fri Nov 18 10:29:50 2022 +0200
+++ b/src/parallelism.rs	Wed Nov 02 00:11:49 2022 +0200
@@ -107,6 +107,12 @@
     unsafe { N_THREADS }.get() - 1
 }
 
+#[derive(Debug, Copy, Clone)]
+struct WorkCounter {
+    current_units : usize,
+    min_units : usize,
+}
+
 /// Thread budgeting tool.
 ///
 /// This allows limiting the number of tasks inserted into the  queue in nested computations.
@@ -120,12 +126,15 @@
         budget : AtomicUsize,
         /// Thread pool
         pool : &'scheduler Pool,
+        /// Counter for work performed,
+        min_units : usize,
     },
     /// Nested multi-threaded state
     MultiThreadedZoom {
         /// Thread budget reference
         budget : &'scope AtomicUsize,
         scope : &'scheduler Scope<'scope>,
+        work : WorkCounter,
     }
 }
 
@@ -135,6 +144,7 @@
     MultiThreaded {
         budget : &'scope AtomicUsize,
         scope : &'scheduler Scope<'scope>,
+        work : WorkCounter,
     }
 }
 
@@ -143,15 +153,18 @@
     ///
     /// The number of tasks [executed][TaskBudgetScope::execute] in [scopes][TaskBudget::zoom]
     /// created through the budget is limited to [`num_threads()`]` + overbudget`. If `overbudget`
-    /// is `None`, the [global setting][set_task_overbudgeting] is used.§
-    pub fn init(overbudget : Option<usize>) -> Self {
+    /// is `None`, the [global setting][set_task_overbudgeting] is used.
+    ///
+    /// The parameter `min_units` specifies the minimum number of work units to perform
+    /// in each task before off-loading work to other threads.
+    pub fn init(min_units : usize, overbudget : Option<usize>) -> Self {
         let n = num_threads().get();
         let m = overbudget.unwrap_or_else(|| unsafe { TASK_OVERBUDGETING.load(Relaxed) });
         if n <= 1 {
             Self::SingleThreaded
         } else if let Some(pool) = thread_pool() {
             let budget = AtomicUsize::new(n + m);
-            Self::MultiThreadedInitial { budget, pool }
+            Self::MultiThreadedInitial { budget, pool, min_units }
         } else {
             Self::SingleThreaded
         }
@@ -168,19 +181,39 @@
           F : for<'a> FnOnce(TaskBudgetScope<'smaller, 'a>) -> R + Send + 'smaller {
         match self {
             &Self::SingleThreaded => scheduler(TaskBudgetScope::SingleThreaded),
-            &Self::MultiThreadedInitial { ref budget, pool } => {
+            &Self::MultiThreadedInitial { ref budget, pool, min_units } => {
                 let budget_ref = unsafe {
                     // Safe: scheduler is dropped when 'smaller becomes invalid.
                     std::mem::transmute::<&AtomicUsize, &'smaller AtomicUsize>(budget)
                 };
                 pool.scope(move |scope| {
-                    scheduler(TaskBudgetScope::MultiThreaded { scope, budget: budget_ref })
+                    scheduler(TaskBudgetScope::MultiThreaded {
+                        scope,
+                        budget: budget_ref,
+                        work : WorkCounter { min_units, current_units : 0 },
+                    })
                 })
             },
-            &Self::MultiThreadedZoom { budget, .. /* scope */ } => {
-                rayon::scope(move |scope| {
-                    scheduler(TaskBudgetScope::MultiThreaded { scope, budget })
-                })
+            &Self::MultiThreadedZoom { budget, work, scope } => {
+                if work.current_units < work.min_units {
+                    // Not enough work performed in this task, stay in the same thread.
+                    scheduler(TaskBudgetScope::MultiThreaded {
+                        scope,
+                        budget,
+                        work,
+                    })
+                } else {
+                    rayon::scope(move |new_scope| {
+                        scheduler(TaskBudgetScope::MultiThreaded {
+                            scope : new_scope,
+                            budget,
+                            work : WorkCounter {
+                                min_units : work.min_units,
+                                current_units : 0
+                            },
+                        })
+                    })
+                }
             },
         }
     }
@@ -188,24 +221,33 @@
 
 impl<'scope, 'scheduler> TaskBudgetScope<'scope, 'scheduler> {
     /// Queue a task or execute it in this thread if the thread budget is exhausted.
-    pub fn execute<F>(&self, job : F)
+    pub fn execute<F>(&mut self, job : F)
     where F : for<'b> FnOnce(TaskBudget<'scope, 'b>) + Send + 'scope {
         match self {
             Self::SingleThreaded => job(TaskBudget::SingleThreaded),
-            Self::MultiThreaded { scope, budget } => {
-                let spawn = budget.fetch_update(Release,
-                                                Relaxed,
-                                                |n| (n > 1).then_some(n - 1))
-                                  .is_ok();
+            Self::MultiThreaded { scope, budget, work } => {
+                let spawn = work.current_units >= work.min_units
+                            && budget.fetch_update(Release,
+                                                   Relaxed,
+                                                   |n| (n > 1).then_some(n - 1))
+                                     .is_ok();
                 if spawn {
                     scope.spawn(|scope| {
-                        let task_budget = TaskBudget::MultiThreadedZoom { scope, budget };
+                        let task_budget = TaskBudget::MultiThreadedZoom {
+                            scope,
+                            budget,
+                            work :  WorkCounter {
+                                min_units : work.min_units,
+                                current_units : 0
+                            },
+                        };
                         job(task_budget);
                         budget.fetch_add(1, Release);
                     })
                 } else {
-                    job(TaskBudget::MultiThreadedZoom { scope, budget })
-                }
+                    let task_budget = TaskBudget::MultiThreadedZoom { scope, budget, work };
+                    job(task_budget)
+§                }
             }
         }
     }

mercurial