14 use std::sync::atomic::{ |
14 use std::sync::atomic::{ |
15 AtomicUsize, |
15 AtomicUsize, |
16 Ordering::{Release, Relaxed}, |
16 Ordering::{Release, Relaxed}, |
17 }; |
17 }; |
18 |
18 |
19 #[cfg(use_custom_thread_pool)] |
19 #[cfg(feature = "use_custom_thread_pool")] |
20 type Pool = ThreadPool; |
20 type Pool = ThreadPool; |
21 #[cfg(not(use_custom_thread_pool))] |
21 #[cfg(not(feature = "use_custom_thread_pool"))] |
22 type Pool = GlobalPool; |
22 type Pool = GlobalPool; |
23 |
23 |
24 const ONE : NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(1) }; |
24 const ONE : NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(1) }; |
25 static mut TASK_OVERBUDGETING : AtomicUsize = AtomicUsize::new(1); |
25 static mut TASK_OVERBUDGETING : AtomicUsize = AtomicUsize::new(1); |
26 static mut N_THREADS : NonZeroUsize = ONE; |
26 static mut N_THREADS : NonZeroUsize = ONE; |
27 static mut POOL : Option<Pool> = None; |
27 static mut POOL : Option<Pool> = None; |
28 static INIT: Once = Once::new(); |
28 static INIT: Once = Once::new(); |
29 |
29 |
30 #[cfg(not(use_custom_thread_pool))] |
30 #[cfg(not(feature = "use_custom_thread_pool"))] |
31 mod global_pool { |
31 mod global_pool { |
32 /// This is a nicer way to use the global pool of [`rayon`]. |
32 /// This is a nicer way to use the global pool of [`rayon`]. |
33 pub struct GlobalPool; |
33 pub struct GlobalPool; |
34 |
34 |
35 #[cfg(not(use_custom_thread_pool))] |
35 #[cfg(not(feature = "use_custom_thread_pool"))] |
36 impl GlobalPool { |
36 impl GlobalPool { |
37 #[inline] |
37 #[inline] |
38 pub fn scope<'scope, OP, R>(&self, op: OP) -> R |
38 pub fn scope<'scope, OP, R>(&self, op: OP) -> R |
39 where |
39 where |
40 OP: FnOnce(&rayon::Scope<'scope>) -> R + Send, |
40 OP: FnOnce(&rayon::Scope<'scope>) -> R + Send, |
55 INIT.call_once(|| unsafe { |
55 INIT.call_once(|| unsafe { |
56 N_THREADS = n; |
56 N_THREADS = n; |
57 let n = n.get(); |
57 let n = n.get(); |
58 set_task_overbudgeting((n + 1) / 2); |
58 set_task_overbudgeting((n + 1) / 2); |
59 POOL = if n > 1 { |
59 POOL = if n > 1 { |
60 #[cfg(use_custom_thread_pool)] { |
60 #[cfg(feature = "use_custom_thread_pool")] { |
61 Some(ThreadPoolBuilder::new().num_threads(n).build().unwrap()) |
61 Some(ThreadPoolBuilder::new().num_threads(n).build().unwrap()) |
62 } |
62 } |
63 #[cfg(not(use_custom_thread_pool))] { |
63 #[cfg(not(feature = "use_custom_thread_pool"))] { |
64 ThreadPoolBuilder::new().num_threads(n).build_global().unwrap(); |
64 ThreadPoolBuilder::new().num_threads(n).build_global().unwrap(); |
65 Some(GlobalPool) |
65 Some(GlobalPool) |
66 } |
66 } |
67 } else { |
67 } else { |
68 None |
68 None |
73 /// Set overbudgeting count for [`TaskBudget`]. |
73 /// Set overbudgeting count for [`TaskBudget`]. |
74 /// |
74 /// |
75 /// The initial value is 1. Calling [`set_num_threads`] sets this to $m = (n + 1) / 2$, where |
75 /// The initial value is 1. Calling [`set_num_threads`] sets this to $m = (n + 1) / 2$, where |
76 /// $n$ is the number of threads. |
76 /// $n$ is the number of threads. |
77 pub fn set_task_overbudgeting(m : usize) { |
77 pub fn set_task_overbudgeting(m : usize) { |
|
78 #[allow(static_mut_refs)] |
78 unsafe { TASK_OVERBUDGETING.store(m, Relaxed) } |
79 unsafe { TASK_OVERBUDGETING.store(m, Relaxed) } |
79 } |
80 } |
80 |
81 |
81 /// Set the number of threads to the minimum of `n` and [`available_parallelism`]. |
82 /// Set the number of threads to the minimum of `n` and [`available_parallelism`]. |
82 /// |
83 /// |
95 /// Get the thread pool. |
96 /// Get the thread pool. |
96 /// |
97 /// |
97 /// If the number of configured threads is less than 2, this is None. |
98 /// If the number of configured threads is less than 2, this is None. |
98 /// The pool has [`num_threads`]` - 1` threads. |
99 /// The pool has [`num_threads`]` - 1` threads. |
99 pub fn thread_pool() -> Option<&'static Pool> { |
100 pub fn thread_pool() -> Option<&'static Pool> { |
|
101 #[allow(static_mut_refs)] |
100 unsafe { POOL.as_ref() } |
102 unsafe { POOL.as_ref() } |
101 } |
103 } |
102 |
104 |
103 /// Get the number of thread pool workers. |
105 /// Get the number of thread pool workers. |
104 /// |
106 /// |
144 /// The number of tasks [executed][TaskBudgetScope::execute] in [scopes][TaskBudget::zoom] |
146 /// The number of tasks [executed][TaskBudgetScope::execute] in [scopes][TaskBudget::zoom] |
145 /// created through the budget is limited to [`num_threads()`]` + overbudget`. If `overbudget` |
147 /// created through the budget is limited to [`num_threads()`]` + overbudget`. If `overbudget` |
146 /// is `None`, the [global setting][set_task_overbudgeting] is used. |
148 /// is `None`, the [global setting][set_task_overbudgeting] is used. |
147 pub fn init(overbudget : Option<usize>) -> Self { |
149 pub fn init(overbudget : Option<usize>) -> Self { |
148 let n = num_threads().get(); |
150 let n = num_threads().get(); |
|
151 #[allow(static_mut_refs)] |
149 let m = overbudget.unwrap_or_else(|| unsafe { TASK_OVERBUDGETING.load(Relaxed) }); |
152 let m = overbudget.unwrap_or_else(|| unsafe { TASK_OVERBUDGETING.load(Relaxed) }); |
150 if n <= 1 { |
153 if n <= 1 { |
151 Self::SingleThreaded |
154 Self::SingleThreaded |
152 } else if let Some(pool) = thread_pool() { |
155 } else if let Some(pool) = thread_pool() { |
153 let budget = AtomicUsize::new(n + m); |
156 let budget = AtomicUsize::new(n + m); |