src/parallelism.rs

branch
dev
changeset 197
1f301affeae3
parent 45
ad1f3705c3fc
equal deleted inserted replaced
196:3697375f4ee9 197:1f301affeae3
5 to limit the number of threads spawned in recursive computations. 5 to limit the number of threads spawned in recursive computations.
6 6
7 For actually spawning scoped tasks in a thread pool, it currently uses [`rayon`]. 7 For actually spawning scoped tasks in a thread pool, it currently uses [`rayon`].
8 */ 8 */
9 9
10 use std::sync::Once; 10 pub use rayon::{Scope, ThreadPool, ThreadPoolBuilder};
11 use std::num::NonZeroUsize; 11 use std::num::NonZeroUsize;
12 use std::thread::available_parallelism;
13 pub use rayon::{Scope, ThreadPoolBuilder, ThreadPool};
14 use std::sync::atomic::{ 12 use std::sync::atomic::{
15 AtomicUsize, 13 AtomicUsize,
16 Ordering::{Release, Relaxed}, 14 Ordering::{Relaxed, Release},
17 }; 15 };
16 use std::sync::Once;
17 use std::thread::available_parallelism;
18 18
19 #[cfg(feature = "use_custom_thread_pool")] 19 #[cfg(feature = "use_custom_thread_pool")]
20 type Pool = ThreadPool; 20 type Pool = ThreadPool;
21 #[cfg(not(feature = "use_custom_thread_pool"))] 21 #[cfg(not(feature = "use_custom_thread_pool"))]
22 type Pool = GlobalPool; 22 type Pool = GlobalPool;
23 23
24 const ONE : NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(1) }; 24 const ONE: NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(1) };
25 static mut TASK_OVERBUDGETING : AtomicUsize = AtomicUsize::new(1); 25 static mut TASK_OVERBUDGETING: AtomicUsize = AtomicUsize::new(1);
26 static mut N_THREADS : NonZeroUsize = ONE; 26 static mut N_THREADS: NonZeroUsize = ONE;
27 static mut POOL : Option<Pool> = None; 27 static mut POOL: Option<Pool> = None;
28 static INIT: Once = Once::new(); 28 static INIT: Once = Once::new();
29 29
30 #[cfg(not(feature = "use_custom_thread_pool"))] 30 #[cfg(not(feature = "use_custom_thread_pool"))]
31 mod global_pool { 31 mod global_pool {
32 /// This is a nicer way to use the global pool of [`rayon`]. 32 /// This is a nicer way to use the global pool of [`rayon`].
36 impl GlobalPool { 36 impl GlobalPool {
37 #[inline] 37 #[inline]
38 pub fn scope<'scope, OP, R>(&self, op: OP) -> R 38 pub fn scope<'scope, OP, R>(&self, op: OP) -> R
39 where 39 where
40 OP: FnOnce(&rayon::Scope<'scope>) -> R + Send, 40 OP: FnOnce(&rayon::Scope<'scope>) -> R + Send,
41 R: Send { 41 R: Send,
42 {
42 rayon::scope(op) 43 rayon::scope(op)
43 } 44 }
44 } 45 }
45 } 46 }
46 47
49 50
50 /// Set the number of threads. 51 /// Set the number of threads.
51 /// 52 ///
52 /// This routine can only be called once. 53 /// This routine can only be called once.
53 /// It also calls [`set_task_overbudgeting`] with $m = (n + 1) / 2$. 54 /// It also calls [`set_task_overbudgeting`] with $m = (n + 1) / 2$.
54 pub fn set_num_threads(n : NonZeroUsize) { 55 pub fn set_num_threads(n: NonZeroUsize) {
55 INIT.call_once(|| unsafe { 56 INIT.call_once(|| unsafe {
56 N_THREADS = n; 57 N_THREADS = n;
57 let n = n.get(); 58 let n = n.get();
58 set_task_overbudgeting((n + 1) / 2); 59 set_task_overbudgeting((n + 1) / 2);
59 POOL = if n > 1 { 60 POOL = if n > 1 {
60 #[cfg(feature = "use_custom_thread_pool")] { 61 #[cfg(feature = "use_custom_thread_pool")]
62 {
61 Some(ThreadPoolBuilder::new().num_threads(n).build().unwrap()) 63 Some(ThreadPoolBuilder::new().num_threads(n).build().unwrap())
62 } 64 }
63 #[cfg(not(feature = "use_custom_thread_pool"))] { 65 #[cfg(not(feature = "use_custom_thread_pool"))]
64 ThreadPoolBuilder::new().num_threads(n).build_global().unwrap(); 66 {
67 ThreadPoolBuilder::new()
68 .num_threads(n)
69 .build_global()
70 .unwrap();
65 Some(GlobalPool) 71 Some(GlobalPool)
66 } 72 }
67 } else { 73 } else {
68 None 74 None
69 } 75 }
72 78
73 /// Set overbudgeting count for [`TaskBudget`]. 79 /// Set overbudgeting count for [`TaskBudget`].
74 /// 80 ///
75 /// The initial value is 1. Calling [`set_num_threads`] sets this to $m = (n + 1) / 2$, where 81 /// The initial value is 1. Calling [`set_num_threads`] sets this to $m = (n + 1) / 2$, where
76 /// $n$ is the number of threads. 82 /// $n$ is the number of threads.
77 pub fn set_task_overbudgeting(m : usize) { 83 pub fn set_task_overbudgeting(m: usize) {
78 #[allow(static_mut_refs)] 84 #[allow(static_mut_refs)]
79 unsafe { TASK_OVERBUDGETING.store(m, Relaxed) } 85 unsafe {
86 TASK_OVERBUDGETING.store(m, Relaxed)
87 }
80 } 88 }
81 89
82 /// Set the number of threads to the minimum of `n` and [`available_parallelism`]. 90 /// Set the number of threads to the minimum of `n` and [`available_parallelism`].
83 /// 91 ///
84 /// This routine can only be called once. 92 /// This routine can only be called once.
85 pub fn set_max_threads(n : NonZeroUsize) { 93 pub fn set_max_threads(n: NonZeroUsize) {
86 let available = available_parallelism().unwrap_or(ONE); 94 let available = available_parallelism().unwrap_or(ONE);
87 set_num_threads(available.min(n)); 95 set_num_threads(available.min(n));
88 } 96 }
89
90 97
91 /// Get the number of threads 98 /// Get the number of threads
92 pub fn num_threads() -> NonZeroUsize { 99 pub fn num_threads() -> NonZeroUsize {
93 unsafe { N_THREADS } 100 unsafe { N_THREADS }
94 } 101 }
97 /// 104 ///
98 /// If the number of configured threads is less than 2, this is None. 105 /// If the number of configured threads is less than 2, this is None.
99 /// The pool has [`num_threads`]` - 1` threads. 106 /// The pool has [`num_threads`]` - 1` threads.
100 pub fn thread_pool() -> Option<&'static Pool> { 107 pub fn thread_pool() -> Option<&'static Pool> {
101 #[allow(static_mut_refs)] 108 #[allow(static_mut_refs)]
102 unsafe { POOL.as_ref() } 109 unsafe {
110 POOL.as_ref()
111 }
103 } 112 }
104 113
105 /// Get the number of thread pool workers. 114 /// Get the number of thread pool workers.
106 /// 115 ///
107 /// This is [`num_threads`]` - 1`. 116 /// This is [`num_threads`]` - 1`.
117 /// No threading performed. 126 /// No threading performed.
118 SingleThreaded, 127 SingleThreaded,
119 /// Initial multi-threaded state 128 /// Initial multi-threaded state
120 MultiThreadedInitial { 129 MultiThreadedInitial {
121 /// Thread budget counter 130 /// Thread budget counter
122 budget : AtomicUsize, 131 budget: AtomicUsize,
123 /// Thread pool 132 /// Thread pool
124 pool : &'scheduler Pool, 133 pool: &'scheduler Pool,
125 }, 134 },
126 /// Nested multi-threaded state 135 /// Nested multi-threaded state
127 MultiThreadedZoom { 136 MultiThreadedZoom {
128 /// Thread budget reference 137 /// Thread budget reference
129 budget : &'scope AtomicUsize, 138 budget: &'scope AtomicUsize,
130 scope : &'scheduler Scope<'scope>, 139 scope: &'scheduler Scope<'scope>,
131 } 140 },
132 } 141 }
133 142
134 /// Task execution scope for [`TaskBudget`]. 143 /// Task execution scope for [`TaskBudget`].
135 pub enum TaskBudgetScope<'scope, 'scheduler> { 144 pub enum TaskBudgetScope<'scope, 'scheduler> {
136 SingleThreaded, 145 SingleThreaded,
137 MultiThreaded { 146 MultiThreaded {
138 budget : &'scope AtomicUsize, 147 budget: &'scope AtomicUsize,
139 scope : &'scheduler Scope<'scope>, 148 scope: &'scheduler Scope<'scope>,
140 } 149 },
141 } 150 }
142 151
143 impl<'scope, 'b> TaskBudget<'scope, 'b> { 152 impl<'scope, 'b> TaskBudget<'scope, 'b> {
144 /// Initialise a task budget in the main thread pool. 153 /// Initialise a task budget in the main thread pool.
145 /// 154 ///
146 /// The number of tasks [executed][TaskBudgetScope::execute] in [scopes][TaskBudget::zoom] 155 /// The number of tasks [executed][TaskBudgetScope::execute] in [scopes][TaskBudget::zoom]
147 /// created through the budget is limited to [`num_threads()`]` + overbudget`. If `overbudget` 156 /// created through the budget is limited to [`num_threads()`]` + overbudget`. If `overbudget`
148 /// is `None`, the [global setting][set_task_overbudgeting] is used.§ 157 /// is `None`, the [global setting][set_task_overbudgeting] is used.§
149 pub fn init(overbudget : Option<usize>) -> Self { 158 pub fn init(overbudget: Option<usize>) -> Self {
150 let n = num_threads().get(); 159 let n = num_threads().get();
151 #[allow(static_mut_refs)] 160 #[allow(static_mut_refs)]
152 let m = overbudget.unwrap_or_else(|| unsafe { TASK_OVERBUDGETING.load(Relaxed) }); 161 let m = overbudget.unwrap_or_else(|| unsafe { TASK_OVERBUDGETING.load(Relaxed) });
153 if n <= 1 { 162 if n <= 1 {
154 Self::SingleThreaded 163 Self::SingleThreaded
159 Self::SingleThreaded 168 Self::SingleThreaded
160 } 169 }
161 } 170 }
162 171
163 /// Initialise single-threaded thread budgeting. 172 /// Initialise single-threaded thread budgeting.
164 pub fn none() -> Self { Self::SingleThreaded } 173 pub fn none() -> Self {
174 Self::SingleThreaded
175 }
165 } 176 }
166 177
167 impl<'scope, 'scheduler> TaskBudget<'scope, 'scheduler> { 178 impl<'scope, 'scheduler> TaskBudget<'scope, 'scheduler> {
168 /// Create a sub-scope for launching tasks 179 /// Create a sub-scope for launching tasks
169 pub fn zoom<'smaller, F, R : Send>(&self, scheduler : F) -> R 180 pub fn zoom<'smaller, F, R: Send>(&self, scheduler: F) -> R
170 where 'scope : 'smaller, 181 where
171 F : for<'a> FnOnce(TaskBudgetScope<'smaller, 'a>) -> R + Send + 'smaller { 182 'scope: 'smaller,
183 F: for<'a> FnOnce(TaskBudgetScope<'smaller, 'a>) -> R + Send + 'smaller,
184 {
172 match self { 185 match self {
173 &Self::SingleThreaded => scheduler(TaskBudgetScope::SingleThreaded), 186 &Self::SingleThreaded => scheduler(TaskBudgetScope::SingleThreaded),
174 &Self::MultiThreadedInitial { ref budget, pool } => { 187 &Self::MultiThreadedInitial { ref budget, pool } => {
175 let budget_ref = unsafe { 188 let budget_ref = unsafe {
176 // Safe: scheduler is dropped when 'smaller becomes invalid. 189 // Safe: scheduler is dropped when 'smaller becomes invalid.
189 } 202 }
190 } 203 }
191 204
192 impl<'scope, 'scheduler> TaskBudgetScope<'scope, 'scheduler> { 205 impl<'scope, 'scheduler> TaskBudgetScope<'scope, 'scheduler> {
193 /// Queue a task or execute it in this thread if the thread budget is exhausted. 206 /// Queue a task or execute it in this thread if the thread budget is exhausted.
194 pub fn execute<F>(&self, job : F) 207 pub fn execute<F>(&self, job: F)
195 where F : for<'b> FnOnce(TaskBudget<'scope, 'b>) + Send + 'scope { 208 where
209 F: for<'b> FnOnce(TaskBudget<'scope, 'b>) + Send + 'scope,
210 {
196 match self { 211 match self {
197 Self::SingleThreaded => job(TaskBudget::SingleThreaded), 212 Self::SingleThreaded => job(TaskBudget::SingleThreaded),
198 Self::MultiThreaded { scope, budget } => { 213 Self::MultiThreaded { scope, budget } => {
199 let spawn = budget.fetch_update(Release, 214 let spawn = budget
200 Relaxed, 215 .fetch_update(Release, Relaxed, |n| (n > 1).then_some(n - 1))
201 |n| (n > 1).then_some(n - 1)) 216 .is_ok();
202 .is_ok();
203 if spawn { 217 if spawn {
204 scope.spawn(|scope| { 218 scope.spawn(|scope| {
205 let task_budget = TaskBudget::MultiThreadedZoom { scope, budget }; 219 let task_budget = TaskBudget::MultiThreadedZoom { scope, budget };
206 job(task_budget); 220 job(task_budget);
207 budget.fetch_add(1, Release); 221 budget.fetch_add(1, Release);
214 } 228 }
215 } 229 }
216 230
217 /// Runs `scheduler` with a [`TaskBudget`]. 231 /// Runs `scheduler` with a [`TaskBudget`].
218 /// 232 ///
219 /// This corresponds to calling `scheduler` with [`TaskBudget::init(None)`]. 233 /// This corresponds to calling `scheduler` with [`TaskBudget::init`]`(None)`.
220 pub fn with_task_budget<'scope, F, R>(scheduler : F) -> R 234 pub fn with_task_budget<'scope, F, R>(scheduler: F) -> R
221 where F : for<'b> FnOnce(TaskBudget<'scope, 'b>) -> R + 'scope { 235 where
236 F: for<'b> FnOnce(TaskBudget<'scope, 'b>) -> R + 'scope,
237 {
222 scheduler(TaskBudget::init(None)) 238 scheduler(TaskBudget::init(None))
223 } 239 }

mercurial