| 5 to limit the number of threads spawned in recursive computations. |
5 to limit the number of threads spawned in recursive computations. |
| 6 |
6 |
| 7 For actually spawning scoped tasks in a thread pool, it currently uses [`rayon`]. |
7 For actually spawning scoped tasks in a thread pool, it currently uses [`rayon`]. |
| 8 */ |
8 */ |
| 9 |
9 |
| 10 use std::sync::Once; |
10 pub use rayon::{Scope, ThreadPool, ThreadPoolBuilder}; |
| 11 use std::num::NonZeroUsize; |
11 use std::num::NonZeroUsize; |
| 12 use std::thread::available_parallelism; |
|
| 13 pub use rayon::{Scope, ThreadPoolBuilder, ThreadPool}; |
|
| 14 use std::sync::atomic::{ |
12 use std::sync::atomic::{ |
| 15 AtomicUsize, |
13 AtomicUsize, |
| 16 Ordering::{Release, Relaxed}, |
14 Ordering::{Relaxed, Release}, |
| 17 }; |
15 }; |
| |
16 use std::sync::Once; |
| |
17 use std::thread::available_parallelism; |
| 18 |
18 |
| 19 #[cfg(feature = "use_custom_thread_pool")] |
19 #[cfg(feature = "use_custom_thread_pool")] |
| 20 type Pool = ThreadPool; |
20 type Pool = ThreadPool; |
| 21 #[cfg(not(feature = "use_custom_thread_pool"))] |
21 #[cfg(not(feature = "use_custom_thread_pool"))] |
| 22 type Pool = GlobalPool; |
22 type Pool = GlobalPool; |
| 23 |
23 |
| 24 const ONE : NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(1) }; |
24 const ONE: NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(1) }; |
| 25 static mut TASK_OVERBUDGETING : AtomicUsize = AtomicUsize::new(1); |
25 static mut TASK_OVERBUDGETING: AtomicUsize = AtomicUsize::new(1); |
| 26 static mut N_THREADS : NonZeroUsize = ONE; |
26 static mut N_THREADS: NonZeroUsize = ONE; |
| 27 static mut POOL : Option<Pool> = None; |
27 static mut POOL: Option<Pool> = None; |
| 28 static INIT: Once = Once::new(); |
28 static INIT: Once = Once::new(); |
| 29 |
29 |
| 30 #[cfg(not(feature = "use_custom_thread_pool"))] |
30 #[cfg(not(feature = "use_custom_thread_pool"))] |
| 31 mod global_pool { |
31 mod global_pool { |
| 32 /// This is a nicer way to use the global pool of [`rayon`]. |
32 /// This is a nicer way to use the global pool of [`rayon`]. |
| 49 |
50 |
| 50 /// Set the number of threads. |
51 /// Set the number of threads. |
| 51 /// |
52 /// |
| 52 /// This routine can only be called once. |
53 /// This routine can only be called once. |
| 53 /// It also calls [`set_task_overbudgeting`] with $m = (n + 1) / 2$. |
54 /// It also calls [`set_task_overbudgeting`] with $m = (n + 1) / 2$. |
| 54 pub fn set_num_threads(n : NonZeroUsize) { |
55 pub fn set_num_threads(n: NonZeroUsize) { |
| 55 INIT.call_once(|| unsafe { |
56 INIT.call_once(|| unsafe { |
| 56 N_THREADS = n; |
57 N_THREADS = n; |
| 57 let n = n.get(); |
58 let n = n.get(); |
| 58 set_task_overbudgeting((n + 1) / 2); |
59 set_task_overbudgeting((n + 1) / 2); |
| 59 POOL = if n > 1 { |
60 POOL = if n > 1 { |
| 60 #[cfg(feature = "use_custom_thread_pool")] { |
61 #[cfg(feature = "use_custom_thread_pool")] |
| |
62 { |
| 61 Some(ThreadPoolBuilder::new().num_threads(n).build().unwrap()) |
63 Some(ThreadPoolBuilder::new().num_threads(n).build().unwrap()) |
| 62 } |
64 } |
| 63 #[cfg(not(feature = "use_custom_thread_pool"))] { |
65 #[cfg(not(feature = "use_custom_thread_pool"))] |
| 64 ThreadPoolBuilder::new().num_threads(n).build_global().unwrap(); |
66 { |
| |
67 ThreadPoolBuilder::new() |
| |
68 .num_threads(n) |
| |
69 .build_global() |
| |
70 .unwrap(); |
| 65 Some(GlobalPool) |
71 Some(GlobalPool) |
| 66 } |
72 } |
| 67 } else { |
73 } else { |
| 68 None |
74 None |
| 69 } |
75 } |
| 72 |
78 |
| 73 /// Set overbudgeting count for [`TaskBudget`]. |
79 /// Set overbudgeting count for [`TaskBudget`]. |
| 74 /// |
80 /// |
| 75 /// The initial value is 1. Calling [`set_num_threads`] sets this to $m = (n + 1) / 2$, where |
81 /// The initial value is 1. Calling [`set_num_threads`] sets this to $m = (n + 1) / 2$, where |
| 76 /// $n$ is the number of threads. |
82 /// $n$ is the number of threads. |
| 77 pub fn set_task_overbudgeting(m : usize) { |
83 pub fn set_task_overbudgeting(m: usize) { |
| 78 #[allow(static_mut_refs)] |
84 #[allow(static_mut_refs)] |
| 79 unsafe { TASK_OVERBUDGETING.store(m, Relaxed) } |
85 unsafe { |
| |
86 TASK_OVERBUDGETING.store(m, Relaxed) |
| |
87 } |
| 80 } |
88 } |
| 81 |
89 |
| 82 /// Set the number of threads to the minimum of `n` and [`available_parallelism`]. |
90 /// Set the number of threads to the minimum of `n` and [`available_parallelism`]. |
| 83 /// |
91 /// |
| 84 /// This routine can only be called once. |
92 /// This routine can only be called once. |
| 85 pub fn set_max_threads(n : NonZeroUsize) { |
93 pub fn set_max_threads(n: NonZeroUsize) { |
| 86 let available = available_parallelism().unwrap_or(ONE); |
94 let available = available_parallelism().unwrap_or(ONE); |
| 87 set_num_threads(available.min(n)); |
95 set_num_threads(available.min(n)); |
| 88 } |
96 } |
| 89 |
|
| 90 |
97 |
| 91 /// Get the number of threads |
98 /// Get the number of threads |
| 92 pub fn num_threads() -> NonZeroUsize { |
99 pub fn num_threads() -> NonZeroUsize { |
| 93 unsafe { N_THREADS } |
100 unsafe { N_THREADS } |
| 94 } |
101 } |
| 97 /// |
104 /// |
| 98 /// If the number of configured threads is less than 2, this is None. |
105 /// If the number of configured threads is less than 2, this is None. |
| 99 /// The pool has [`num_threads`]` - 1` threads. |
106 /// The pool has [`num_threads`]` - 1` threads. |
| 100 pub fn thread_pool() -> Option<&'static Pool> { |
107 pub fn thread_pool() -> Option<&'static Pool> { |
| 101 #[allow(static_mut_refs)] |
108 #[allow(static_mut_refs)] |
| 102 unsafe { POOL.as_ref() } |
109 unsafe { |
| |
110 POOL.as_ref() |
| |
111 } |
| 103 } |
112 } |
| 104 |
113 |
| 105 /// Get the number of thread pool workers. |
114 /// Get the number of thread pool workers. |
| 106 /// |
115 /// |
| 107 /// This is [`num_threads`]` - 1`. |
116 /// This is [`num_threads`]` - 1`. |
| 117 /// No threading performed. |
126 /// No threading performed. |
| 118 SingleThreaded, |
127 SingleThreaded, |
| 119 /// Initial multi-threaded state |
128 /// Initial multi-threaded state |
| 120 MultiThreadedInitial { |
129 MultiThreadedInitial { |
| 121 /// Thread budget counter |
130 /// Thread budget counter |
| 122 budget : AtomicUsize, |
131 budget: AtomicUsize, |
| 123 /// Thread pool |
132 /// Thread pool |
| 124 pool : &'scheduler Pool, |
133 pool: &'scheduler Pool, |
| 125 }, |
134 }, |
| 126 /// Nested multi-threaded state |
135 /// Nested multi-threaded state |
| 127 MultiThreadedZoom { |
136 MultiThreadedZoom { |
| 128 /// Thread budget reference |
137 /// Thread budget reference |
| 129 budget : &'scope AtomicUsize, |
138 budget: &'scope AtomicUsize, |
| 130 scope : &'scheduler Scope<'scope>, |
139 scope: &'scheduler Scope<'scope>, |
| 131 } |
140 }, |
| 132 } |
141 } |
| 133 |
142 |
| 134 /// Task execution scope for [`TaskBudget`]. |
143 /// Task execution scope for [`TaskBudget`]. |
| 135 pub enum TaskBudgetScope<'scope, 'scheduler> { |
144 pub enum TaskBudgetScope<'scope, 'scheduler> { |
| 136 SingleThreaded, |
145 SingleThreaded, |
| 137 MultiThreaded { |
146 MultiThreaded { |
| 138 budget : &'scope AtomicUsize, |
147 budget: &'scope AtomicUsize, |
| 139 scope : &'scheduler Scope<'scope>, |
148 scope: &'scheduler Scope<'scope>, |
| 140 } |
149 }, |
| 141 } |
150 } |
| 142 |
151 |
| 143 impl<'scope, 'b> TaskBudget<'scope, 'b> { |
152 impl<'scope, 'b> TaskBudget<'scope, 'b> { |
| 144 /// Initialise a task budget in the main thread pool. |
153 /// Initialise a task budget in the main thread pool. |
| 145 /// |
154 /// |
| 146 /// The number of tasks [executed][TaskBudgetScope::execute] in [scopes][TaskBudget::zoom] |
155 /// The number of tasks [executed][TaskBudgetScope::execute] in [scopes][TaskBudget::zoom] |
| 147 /// created through the budget is limited to [`num_threads()`]` + overbudget`. If `overbudget` |
156 /// created through the budget is limited to [`num_threads()`]` + overbudget`. If `overbudget` |
| 148 /// is `None`, the [global setting][set_task_overbudgeting] is used.§ |
157 /// is `None`, the [global setting][set_task_overbudgeting] is used.§ |
| 149 pub fn init(overbudget : Option<usize>) -> Self { |
158 pub fn init(overbudget: Option<usize>) -> Self { |
| 150 let n = num_threads().get(); |
159 let n = num_threads().get(); |
| 151 #[allow(static_mut_refs)] |
160 #[allow(static_mut_refs)] |
| 152 let m = overbudget.unwrap_or_else(|| unsafe { TASK_OVERBUDGETING.load(Relaxed) }); |
161 let m = overbudget.unwrap_or_else(|| unsafe { TASK_OVERBUDGETING.load(Relaxed) }); |
| 153 if n <= 1 { |
162 if n <= 1 { |
| 154 Self::SingleThreaded |
163 Self::SingleThreaded |
| 159 Self::SingleThreaded |
168 Self::SingleThreaded |
| 160 } |
169 } |
| 161 } |
170 } |
| 162 |
171 |
| 163 /// Initialise single-threaded thread budgeting. |
172 /// Initialise single-threaded thread budgeting. |
| 164 pub fn none() -> Self { Self::SingleThreaded } |
173 pub fn none() -> Self { |
| |
174 Self::SingleThreaded |
| |
175 } |
| 165 } |
176 } |
| 166 |
177 |
| 167 impl<'scope, 'scheduler> TaskBudget<'scope, 'scheduler> { |
178 impl<'scope, 'scheduler> TaskBudget<'scope, 'scheduler> { |
| 168 /// Create a sub-scope for launching tasks |
179 /// Create a sub-scope for launching tasks |
| 169 pub fn zoom<'smaller, F, R : Send>(&self, scheduler : F) -> R |
180 pub fn zoom<'smaller, F, R: Send>(&self, scheduler: F) -> R |
| 170 where 'scope : 'smaller, |
181 where |
| 171 F : for<'a> FnOnce(TaskBudgetScope<'smaller, 'a>) -> R + Send + 'smaller { |
182 'scope: 'smaller, |
| |
183 F: for<'a> FnOnce(TaskBudgetScope<'smaller, 'a>) -> R + Send + 'smaller, |
| |
184 { |
| 172 match self { |
185 match self { |
| 173 &Self::SingleThreaded => scheduler(TaskBudgetScope::SingleThreaded), |
186 &Self::SingleThreaded => scheduler(TaskBudgetScope::SingleThreaded), |
| 174 &Self::MultiThreadedInitial { ref budget, pool } => { |
187 &Self::MultiThreadedInitial { ref budget, pool } => { |
| 175 let budget_ref = unsafe { |
188 let budget_ref = unsafe { |
| 176 // Safe: scheduler is dropped when 'smaller becomes invalid. |
189 // Safe: scheduler is dropped when 'smaller becomes invalid. |
| 189 } |
202 } |
| 190 } |
203 } |
| 191 |
204 |
| 192 impl<'scope, 'scheduler> TaskBudgetScope<'scope, 'scheduler> { |
205 impl<'scope, 'scheduler> TaskBudgetScope<'scope, 'scheduler> { |
| 193 /// Queue a task or execute it in this thread if the thread budget is exhausted. |
206 /// Queue a task or execute it in this thread if the thread budget is exhausted. |
| 194 pub fn execute<F>(&self, job : F) |
207 pub fn execute<F>(&self, job: F) |
| 195 where F : for<'b> FnOnce(TaskBudget<'scope, 'b>) + Send + 'scope { |
208 where |
| |
209 F: for<'b> FnOnce(TaskBudget<'scope, 'b>) + Send + 'scope, |
| |
210 { |
| 196 match self { |
211 match self { |
| 197 Self::SingleThreaded => job(TaskBudget::SingleThreaded), |
212 Self::SingleThreaded => job(TaskBudget::SingleThreaded), |
| 198 Self::MultiThreaded { scope, budget } => { |
213 Self::MultiThreaded { scope, budget } => { |
| 199 let spawn = budget.fetch_update(Release, |
214 let spawn = budget |
| 200 Relaxed, |
215 .fetch_update(Release, Relaxed, |n| (n > 1).then_some(n - 1)) |
| 201 |n| (n > 1).then_some(n - 1)) |
216 .is_ok(); |
| 202 .is_ok(); |
|
| 203 if spawn { |
217 if spawn { |
| 204 scope.spawn(|scope| { |
218 scope.spawn(|scope| { |
| 205 let task_budget = TaskBudget::MultiThreadedZoom { scope, budget }; |
219 let task_budget = TaskBudget::MultiThreadedZoom { scope, budget }; |
| 206 job(task_budget); |
220 job(task_budget); |
| 207 budget.fetch_add(1, Release); |
221 budget.fetch_add(1, Release); |
| 214 } |
228 } |
| 215 } |
229 } |
| 216 |
230 |
| 217 /// Runs `scheduler` with a [`TaskBudget`]. |
231 /// Runs `scheduler` with a [`TaskBudget`]. |
| 218 /// |
232 /// |
| 219 /// This corresponds to calling `scheduler` with [`TaskBudget::init(None)`]. |
233 /// This corresponds to calling `scheduler` with [`TaskBudget::init`]`(None)`. |
| 220 pub fn with_task_budget<'scope, F, R>(scheduler : F) -> R |
234 pub fn with_task_budget<'scope, F, R>(scheduler: F) -> R |
| 221 where F : for<'b> FnOnce(TaskBudget<'scope, 'b>) -> R + 'scope { |
235 where |
| |
236 F: for<'b> FnOnce(TaskBudget<'scope, 'b>) -> R + 'scope, |
| |
237 { |
| 222 scheduler(TaskBudget::init(None)) |
238 scheduler(TaskBudget::init(None)) |
| 223 } |
239 } |