| 14 use std::sync::atomic::{ |
14 use std::sync::atomic::{ |
| 15 AtomicUsize, |
15 AtomicUsize, |
| 16 Ordering::{Release, Relaxed}, |
16 Ordering::{Release, Relaxed}, |
| 17 }; |
17 }; |
| 18 |
18 |
| 19 #[cfg(use_custom_thread_pool)] |
19 #[cfg(feature = "use_custom_thread_pool")] |
| 20 type Pool = ThreadPool; |
20 type Pool = ThreadPool; |
| 21 #[cfg(not(use_custom_thread_pool))] |
21 #[cfg(not(feature = "use_custom_thread_pool"))] |
| 22 type Pool = GlobalPool; |
22 type Pool = GlobalPool; |
| 23 |
23 |
| 24 const ONE : NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(1) }; |
24 const ONE : NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(1) }; |
| 25 static mut TASK_OVERBUDGETING : AtomicUsize = AtomicUsize::new(1); |
25 static mut TASK_OVERBUDGETING : AtomicUsize = AtomicUsize::new(1); |
| 26 static mut N_THREADS : NonZeroUsize = ONE; |
26 static mut N_THREADS : NonZeroUsize = ONE; |
| 27 static mut POOL : Option<Pool> = None; |
27 static mut POOL : Option<Pool> = None; |
| 28 static INIT: Once = Once::new(); |
28 static INIT: Once = Once::new(); |
| 29 |
29 |
| 30 #[cfg(not(use_custom_thread_pool))] |
30 #[cfg(not(feature = "use_custom_thread_pool"))] |
| 31 mod global_pool { |
31 mod global_pool { |
| 32 /// This is a nicer way to use the global pool of [`rayon`]. |
32 /// This is a nicer way to use the global pool of [`rayon`]. |
| 33 pub struct GlobalPool; |
33 pub struct GlobalPool; |
| 34 |
34 |
| 35 #[cfg(not(use_custom_thread_pool))] |
35 #[cfg(not(feature = "use_custom_thread_pool"))] |
| 36 impl GlobalPool { |
36 impl GlobalPool { |
| 37 #[inline] |
37 #[inline] |
| 38 pub fn scope<'scope, OP, R>(&self, op: OP) -> R |
38 pub fn scope<'scope, OP, R>(&self, op: OP) -> R |
| 39 where |
39 where |
| 40 OP: FnOnce(&rayon::Scope<'scope>) -> R + Send, |
40 OP: FnOnce(&rayon::Scope<'scope>) -> R + Send, |
| 55 INIT.call_once(|| unsafe { |
55 INIT.call_once(|| unsafe { |
| 56 N_THREADS = n; |
56 N_THREADS = n; |
| 57 let n = n.get(); |
57 let n = n.get(); |
| 58 set_task_overbudgeting((n + 1) / 2); |
58 set_task_overbudgeting((n + 1) / 2); |
| 59 POOL = if n > 1 { |
59 POOL = if n > 1 { |
| 60 #[cfg(use_custom_thread_pool)] { |
60 #[cfg(feature = "use_custom_thread_pool")] { |
| 61 Some(ThreadPoolBuilder::new().num_threads(n).build().unwrap()) |
61 Some(ThreadPoolBuilder::new().num_threads(n).build().unwrap()) |
| 62 } |
62 } |
| 63 #[cfg(not(use_custom_thread_pool))] { |
63 #[cfg(not(feature = "use_custom_thread_pool"))] { |
| 64 ThreadPoolBuilder::new().num_threads(n).build_global().unwrap(); |
64 ThreadPoolBuilder::new().num_threads(n).build_global().unwrap(); |
| 65 Some(GlobalPool) |
65 Some(GlobalPool) |
| 66 } |
66 } |
| 67 } else { |
67 } else { |
| 68 None |
68 None |
| 73 /// Set overbudgeting count for [`TaskBudget`]. |
73 /// Set overbudgeting count for [`TaskBudget`]. |
| 74 /// |
74 /// |
| 75 /// The initial value is 1. Calling [`set_num_threads`] sets this to $m = (n + 1) / 2$, where |
75 /// The initial value is 1. Calling [`set_num_threads`] sets this to $m = (n + 1) / 2$, where |
| 76 /// $n$ is the number of threads. |
76 /// $n$ is the number of threads. |
| 77 pub fn set_task_overbudgeting(m : usize) { |
77 pub fn set_task_overbudgeting(m : usize) { |
| |
78 #[allow(static_mut_refs)] |
| 78 unsafe { TASK_OVERBUDGETING.store(m, Relaxed) } |
79 unsafe { TASK_OVERBUDGETING.store(m, Relaxed) } |
| 79 } |
80 } |
| 80 |
81 |
| 81 /// Set the number of threads to the minimum of `n` and [`available_parallelism`]. |
82 /// Set the number of threads to the minimum of `n` and [`available_parallelism`]. |
| 82 /// |
83 /// |
| 95 /// Get the thread pool. |
96 /// Get the thread pool. |
| 96 /// |
97 /// |
| 97 /// If the number of configured threads is less than 2, this is None. |
98 /// If the number of configured threads is less than 2, this is None. |
| 98 /// The pool has [`num_threads`]` - 1` threads. |
99 /// The pool has [`num_threads`]` - 1` threads. |
| 99 pub fn thread_pool() -> Option<&'static Pool> { |
100 pub fn thread_pool() -> Option<&'static Pool> { |
| |
101 #[allow(static_mut_refs)] |
| 100 unsafe { POOL.as_ref() } |
102 unsafe { POOL.as_ref() } |
| 101 } |
103 } |
| 102 |
104 |
| 103 /// Get the number of thread pool workers. |
105 /// Get the number of thread pool workers. |
| 104 /// |
106 /// |
| 144 /// The number of tasks [executed][TaskBudgetScope::execute] in [scopes][TaskBudget::zoom] |
146 /// The number of tasks [executed][TaskBudgetScope::execute] in [scopes][TaskBudget::zoom] |
| 145 /// created through the budget is limited to [`num_threads()`]` + overbudget`. If `overbudget` |
147 /// created through the budget is limited to [`num_threads()`]` + overbudget`. If `overbudget` |
| 146 /// is `None`, the [global setting][set_task_overbudgeting] is used.§ |
148 /// is `None`, the [global setting][set_task_overbudgeting] is used.§ |
| 147 pub fn init(overbudget : Option<usize>) -> Self { |
149 pub fn init(overbudget : Option<usize>) -> Self { |
| 148 let n = num_threads().get(); |
150 let n = num_threads().get(); |
| |
151 #[allow(static_mut_refs)] |
| 149 let m = overbudget.unwrap_or_else(|| unsafe { TASK_OVERBUDGETING.load(Relaxed) }); |
152 let m = overbudget.unwrap_or_else(|| unsafe { TASK_OVERBUDGETING.load(Relaxed) }); |
| 150 if n <= 1 { |
153 if n <= 1 { |
| 151 Self::SingleThreaded |
154 Self::SingleThreaded |
| 152 } else if let Some(pool) = thread_pool() { |
155 } else if let Some(pool) = thread_pool() { |
| 153 let budget = AtomicUsize::new(n + m); |
156 let budget = AtomicUsize::new(n + m); |