src/parallelism.rs

branch
dev
changeset 45
ad1f3705c3fc
parent 8
4e09b7829b51
equal deleted inserted replaced
44:8de8a80852c2 45:ad1f3705c3fc
14 use std::sync::atomic::{ 14 use std::sync::atomic::{
15 AtomicUsize, 15 AtomicUsize,
16 Ordering::{Release, Relaxed}, 16 Ordering::{Release, Relaxed},
17 }; 17 };
18 18
19 #[cfg(use_custom_thread_pool)] 19 #[cfg(feature = "use_custom_thread_pool")]
20 type Pool = ThreadPool; 20 type Pool = ThreadPool;
21 #[cfg(not(use_custom_thread_pool))] 21 #[cfg(not(feature = "use_custom_thread_pool"))]
22 type Pool = GlobalPool; 22 type Pool = GlobalPool;
23 23
24 const ONE : NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(1) }; 24 const ONE : NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(1) };
25 static mut TASK_OVERBUDGETING : AtomicUsize = AtomicUsize::new(1); 25 static mut TASK_OVERBUDGETING : AtomicUsize = AtomicUsize::new(1);
26 static mut N_THREADS : NonZeroUsize = ONE; 26 static mut N_THREADS : NonZeroUsize = ONE;
27 static mut POOL : Option<Pool> = None; 27 static mut POOL : Option<Pool> = None;
28 static INIT: Once = Once::new(); 28 static INIT: Once = Once::new();
29 29
30 #[cfg(not(use_custom_thread_pool))] 30 #[cfg(not(feature = "use_custom_thread_pool"))]
31 mod global_pool { 31 mod global_pool {
32 /// This is a nicer way to use the global pool of [`rayon`]. 32 /// This is a nicer way to use the global pool of [`rayon`].
33 pub struct GlobalPool; 33 pub struct GlobalPool;
34 34
35 #[cfg(not(use_custom_thread_pool))] 35 #[cfg(not(feature = "use_custom_thread_pool"))]
36 impl GlobalPool { 36 impl GlobalPool {
37 #[inline] 37 #[inline]
38 pub fn scope<'scope, OP, R>(&self, op: OP) -> R 38 pub fn scope<'scope, OP, R>(&self, op: OP) -> R
39 where 39 where
40 OP: FnOnce(&rayon::Scope<'scope>) -> R + Send, 40 OP: FnOnce(&rayon::Scope<'scope>) -> R + Send,
42 rayon::scope(op) 42 rayon::scope(op)
43 } 43 }
44 } 44 }
45 } 45 }
46 46
47 #[cfg(not(use_custom_thread_pool))] 47 #[cfg(not(feature = "use_custom_thread_pool"))]
48 pub use global_pool::GlobalPool; 48 pub use global_pool::GlobalPool;
49 49
50 /// Set the number of threads. 50 /// Set the number of threads.
51 /// 51 ///
52 /// This routine can only be called once. 52 /// This routine can only be called once.
55 INIT.call_once(|| unsafe { 55 INIT.call_once(|| unsafe {
56 N_THREADS = n; 56 N_THREADS = n;
57 let n = n.get(); 57 let n = n.get();
58 set_task_overbudgeting((n + 1) / 2); 58 set_task_overbudgeting((n + 1) / 2);
59 POOL = if n > 1 { 59 POOL = if n > 1 {
60 #[cfg(use_custom_thread_pool)] { 60 #[cfg(feature = "use_custom_thread_pool")] {
61 Some(ThreadPoolBuilder::new().num_threads(n).build().unwrap()) 61 Some(ThreadPoolBuilder::new().num_threads(n).build().unwrap())
62 } 62 }
63 #[cfg(not(use_custom_thread_pool))] { 63 #[cfg(not(feature = "use_custom_thread_pool"))] {
64 ThreadPoolBuilder::new().num_threads(n).build_global().unwrap(); 64 ThreadPoolBuilder::new().num_threads(n).build_global().unwrap();
65 Some(GlobalPool) 65 Some(GlobalPool)
66 } 66 }
67 } else { 67 } else {
68 None 68 None
73 /// Set overbudgeting count for [`TaskBudget`]. 73 /// Set overbudgeting count for [`TaskBudget`].
74 /// 74 ///
75 /// The initial value is 1. Calling [`set_num_threads`] sets this to $m = (n + 1) / 2$, where 75 /// The initial value is 1. Calling [`set_num_threads`] sets this to $m = (n + 1) / 2$, where
76 /// $n$ is the number of threads. 76 /// $n$ is the number of threads.
77 pub fn set_task_overbudgeting(m : usize) { 77 pub fn set_task_overbudgeting(m : usize) {
78 #[allow(static_mut_refs)]
78 unsafe { TASK_OVERBUDGETING.store(m, Relaxed) } 79 unsafe { TASK_OVERBUDGETING.store(m, Relaxed) }
79 } 80 }
80 81
81 /// Set the number of threads to the minimum of `n` and [`available_parallelism`]. 82 /// Set the number of threads to the minimum of `n` and [`available_parallelism`].
82 /// 83 ///
95 /// Get the thread pool. 96 /// Get the thread pool.
96 /// 97 ///
97 /// If the number of configured threads is less than 2, this is None. 98 /// If the number of configured threads is less than 2, this is None.
98 /// The pool has [`num_threads`]` - 1` threads. 99 /// The pool has [`num_threads`]` - 1` threads.
99 pub fn thread_pool() -> Option<&'static Pool> { 100 pub fn thread_pool() -> Option<&'static Pool> {
101 #[allow(static_mut_refs)]
100 unsafe { POOL.as_ref() } 102 unsafe { POOL.as_ref() }
101 } 103 }
102 104
103 /// Get the number of thread pool workers. 105 /// Get the number of thread pool workers.
104 /// 106 ///
144 /// The number of tasks [executed][TaskBudgetScope::execute] in [scopes][TaskBudget::zoom] 146 /// The number of tasks [executed][TaskBudgetScope::execute] in [scopes][TaskBudget::zoom]
145 /// created through the budget is limited to [`num_threads()`]` + overbudget`. If `overbudget` 147 /// created through the budget is limited to [`num_threads()`]` + overbudget`. If `overbudget`
146 /// is `None`, the [global setting][set_task_overbudgeting] is used. 148 /// is `None`, the [global setting][set_task_overbudgeting] is used.
147 pub fn init(overbudget : Option<usize>) -> Self { 149 pub fn init(overbudget : Option<usize>) -> Self {
148 let n = num_threads().get(); 150 let n = num_threads().get();
151 #[allow(static_mut_refs)]
149 let m = overbudget.unwrap_or_else(|| unsafe { TASK_OVERBUDGETING.load(Relaxed) }); 152 let m = overbudget.unwrap_or_else(|| unsafe { TASK_OVERBUDGETING.load(Relaxed) });
150 if n <= 1 { 153 if n <= 1 {
151 Self::SingleThreaded 154 Self::SingleThreaded
152 } else if let Some(pool) = thread_pool() { 155 } else if let Some(pool) = thread_pool() {
153 let budget = AtomicUsize::new(n + m); 156 let budget = AtomicUsize::new(n + m);

mercurial