# HG changeset patch # User Tuomo Valkonen # Date 1667287485 -7200 # Node ID 4e09b7829b5165f80415825202fc222ea1dd633f # Parent 860a54fca7bc29f800f4b9197bc3192481e726e6 Multithreaded bisection tree operations diff -r 860a54fca7bc -r 4e09b7829b51 .hgtags --- a/.hgtags Wed Oct 26 22:16:57 2022 +0300 +++ b/.hgtags Tue Nov 01 09:24:45 2022 +0200 @@ -1,1 +1,2 @@ d80b87b8acd08e71cabdfad7ad0429934b12ab95 unthreaded +f4eca695149f28f78055e2dabfb35905e98591ee non-scoped-threading diff -r 860a54fca7bc -r 4e09b7829b51 Cargo.lock --- a/Cargo.lock Wed Oct 26 22:16:57 2022 +0300 +++ b/Cargo.lock Tue Nov 01 09:24:45 2022 +0200 @@ -14,6 +14,7 @@ "num", "num-traits", "numeric_literals", + "rayon", "serde", "serde_json", "trait-set", @@ -64,6 +65,12 @@ checksum = "2f5715e491b5a1598fc2bef5a606847b5dc1d48ea625bd3c02c00de8285591da" [[package]] +name = "cfg-if" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" + +[[package]] name = "colored" version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -85,6 +92,49 @@ ] [[package]] +name = "crossbeam-channel" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2dd04ddaf88237dc3b8d8f9a3c1004b506b54b3313403944054d23c0870c521" +dependencies = [ + "cfg-if", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-deque" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "715e8152b692bba2d374b53d4875445368fdf21a94751410af607a5ac677d1fc" +dependencies = [ + "cfg-if", + "crossbeam-epoch", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-epoch" +version = "0.9.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f916dfc5d356b0ed9dae65f1db9fc9770aa2851d2662b988ccf4fe3516e86348" +dependencies = [ + "autocfg", + "cfg-if", + "crossbeam-utils", + "memoffset", + "scopeguard", +] + 
+[[package]] +name = "crossbeam-utils" +version = "0.8.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "edbafec5fa1f196ca66527c1b12c2ec4745ca14b50f1ad8f9f6f720b55d11fac" +dependencies = [ + "cfg-if", +] + +[[package]] name = "csv" version = "1.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -170,6 +220,15 @@ checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d" [[package]] +name = "memoffset" +version = "0.6.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5aa361d4faea93603064a027415f07bd8e1d5c88c9fbf68bf56a285428fd79ce" +dependencies = [ + "autocfg", +] + +[[package]] name = "nalgebra" version = "0.31.1" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -273,6 +332,16 @@ ] [[package]] +name = "num_cpus" +version = "1.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19e64526ebdee182341572e50e9ad03965aa510cd94427a4549448f285e957a1" +dependencies = [ + "hermit-abi", + "libc", +] + +[[package]] name = "numeric_literals" version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -313,6 +382,30 @@ checksum = "60a357793950651c4ed0f3f52338f53b2f809f32d83a07f72909fa13e4c6c1e3" [[package]] +name = "rayon" +version = "1.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd99e5772ead8baa5215278c9b15bf92087709e9c1b2d1f97cdb5a183c933a7d" +dependencies = [ + "autocfg", + "crossbeam-deque", + "either", + "rayon-core", +] + +[[package]] +name = "rayon-core" +version = "1.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "258bcdb5ac6dad48491bb2992db6b7cf74878b0384908af124823d118c99683f" +dependencies = [ + "crossbeam-channel", + "crossbeam-deque", + "crossbeam-utils", + "num_cpus", +] + +[[package]] name = "regex-automata" version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -334,6 +427,12 
@@ ] [[package]] +name = "scopeguard" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" + +[[package]] name = "serde" version = "1.0.145" source = "registry+https://github.com/rust-lang/crates.io-index" diff -r 860a54fca7bc -r 4e09b7829b51 Cargo.toml --- a/Cargo.toml Wed Oct 26 22:16:57 2022 +0300 +++ b/Cargo.toml Tue Nov 01 09:24:45 2022 +0200 @@ -16,6 +16,7 @@ numeric_literals = "~0.2.0" cpu-time = "~1.0.0" serde_json = "~1.0.85" +rayon = "1.5.3" [package.metadata.docs.rs] rustdoc-args = [ "--html-in-header", "katex-header.html" ] diff -r 860a54fca7bc -r 4e09b7829b51 src/bisection_tree/aggregator.rs --- a/src/bisection_tree/aggregator.rs Wed Oct 26 22:16:57 2022 +0300 +++ b/src/bisection_tree/aggregator.rs Tue Nov 01 09:24:45 2022 +0200 @@ -18,7 +18,7 @@ /// of a function on a greater domain from bounds on subdomains /// (in practise [`Cube`][crate::sets::Cube]s). /// -pub trait Aggregator : Clone + std::fmt::Debug { +pub trait Aggregator : Clone + Sync + Send + 'static + std::fmt::Debug { /// Aggregate a new data to current state. fn aggregate(&mut self, aggregates : I) where I : Iterator; @@ -36,8 +36,6 @@ pub struct NullAggregator; impl Aggregator for NullAggregator { - // TODO: these should probabably also take a Cube as argument to - // allow integrating etc. 
fn aggregate(&mut self, _aggregates : I) where I : Iterator {} @@ -75,6 +73,32 @@ let &Bounds(lower, upper) = self; lower.abs().max(upper.abs()) } + + /// Construct bounds, ensuring that the `lower` bound is at most the `upper` bound + #[inline] + pub fn corrected(lower : F, upper : F) -> Self { + if lower <= upper { + Bounds(lower, upper) + } else { + Bounds(upper, lower) + } + } + + /// Refine the lower bound + #[inline] + pub fn refine_lower(&self, lower : F) -> Self { + let &Bounds(l, u) = self; + debug_assert!(l <= u); + Bounds(l.max(lower), u.max(lower)) + } + + /// Refine the upper bound + #[inline] + pub fn refine_upper(&self, upper : F) -> Self { + let &Bounds(l, u) = self; + debug_assert!(l <= u); + Bounds(l.min(upper), u.min(upper)) + } } impl<'a, F : Float> std::ops::Add for Bounds { @@ -143,6 +167,7 @@ debug_assert!(l1 <= u1 && l2 <= u2); let l = l1.max(l2); let u = u1.min(u2); + debug_assert!(l <= u); if l < u { Some(Bounds(l, u)) } else { diff -r 860a54fca7bc -r 4e09b7829b51 src/bisection_tree/bt.rs --- a/src/bisection_tree/bt.rs Wed Oct 26 22:16:57 2022 +0300 +++ b/src/bisection_tree/bt.rs Tue Nov 01 09:24:45 2022 +0200 @@ -3,15 +3,15 @@ Bisection tree basics, [`BT`] type and the [`BTImpl`] trait. */ -use std::slice::{Iter,IterMut}; +use std::slice::IterMut; use std::iter::once; -use std::rc::Rc; +use std::sync::Arc; use serde::{Serialize, Deserialize}; pub(super) use nalgebra::Const; use itertools::izip; -use crate::iter::{MapF,Mappable}; use crate::types::{Float, Num}; +use crate::parallelism::{with_task_budget, TaskBudget}; use crate::coefficients::pow; use crate::maputil::{ array_init, @@ -35,15 +35,15 @@ Uninitialised, /// Indicates a leaf node containing a copy-on-write reference-counted vector /// of data of type `D`. - Leaf(Rc>), + Leaf(Vec), /// Indicates a branch node, cotaning a copy-on-write reference to the [`Branches`]. - Branches(Rc>), + Branches(Arc>), } /// Node of a [`BT`] bisection tree.
/// /// For the type and const parameteres, see the [module level documentation][super]. -#[derive(Clone,Debug)] +#[derive(Clone, Debug)] pub struct Node { /// The data or branches under the node. pub(super) data : NodeOption, @@ -54,7 +54,7 @@ /// Branching information of a [`Node`] of a [`BT`] bisection tree into `P` subnodes. /// /// For the type and const parameters, see the [module level documentation][super]. -#[derive(Clone,Debug)] +#[derive(Clone, Debug)] pub(super) struct Branches { /// Point for subdivision of the (unstored) [`Cube`] corresponding to the node. pub(super) branch_at : Loc, @@ -68,10 +68,11 @@ fn drop(&mut self) { use NodeOption as NO; - let process = |brc : Rc>, - to_drop : &mut Vec>>| { + let process = |brc : Arc>, + to_drop : &mut Vec>>| { // We only drop Branches if we have the only strong reference. - Rc::try_unwrap(brc).ok().map(|branches| branches.nodes.map(|mut node| { + // FIXME: update the RwLocks on Nodes. + Arc::try_unwrap(brc).ok().map(|branches| branches.nodes.map(|mut node| { if let NO::Branches(brc2) = std::mem::replace(&mut node.data, NO::Uninitialised) { to_drop.push(brc2) } @@ -81,7 +82,7 @@ // We mark Self as NodeOption::Uninitialised, extracting the real contents. // If we have subprocess, we need to process them. if let NO::Branches(brc1) = std::mem::replace(&mut self.data, NO::Uninitialised) { - // We store a queue of Rc to drop into a vector + // We store a queue of Arc to drop into a vector let mut to_drop = Vec::new(); process(brc1, &mut to_drop); @@ -97,13 +98,18 @@ /// Trait for the depth of a [`BT`]. /// /// This will generally be either a runtime [`DynamicDepth`] or compile-time [`Const`] depth. -pub trait Depth : 'static + Copy + std::fmt::Debug { +pub trait Depth : 'static + Copy + Send + Sync + std::fmt::Debug { /// Lower depth type. type Lower : Depth; + /// Returns a lower depth, if there still is one. fn lower(&self) -> Option; + /// Returns a lower depth or self if this is the lowest depth. 
fn lower_or(&self) -> Self::Lower; + + /// Returns the numeric value of the depth + fn value(&self) -> usize; } /// Dynamic (runtime) [`Depth`] for a [`BT`]. @@ -123,16 +129,23 @@ None } } + #[inline] fn lower_or(&self) -> Self { DynamicDepth(if self.0>0 { self.0 - 1 } else { 0 }) } + + #[inline] + fn value(&self) -> usize { + self.0 as usize + } } impl Depth for Const<0> { type Lower = Self; fn lower(&self) -> Option { None } fn lower_or(&self) -> Self::Lower { Const } + fn value(&self) -> usize { 0 } } macro_rules! impl_constdepth { @@ -141,6 +154,7 @@ type Lower = Const<{$n-1}>; fn lower(&self) -> Option { Some(Const) } fn lower_or(&self) -> Self::Lower { Const } + fn value(&self) -> usize { $n } } )* }; } @@ -183,19 +197,6 @@ } } -/// An iterator over the data `D` in a [`BT`]. -pub struct BTIter<'a, D> { - iter : Iter<'a, D>, -} - -impl<'a, D> Iterator for BTIter<'a,D> { - type Item = &'a D; - #[inline] - fn next(&mut self) -> Option<&'a D> { - self.iter.next() - } -} - /// An iterator over the $P=2^N$ subcubes of a [`Cube`] subdivided at a point `d`. pub(super) struct SubcubeIter<'b, F : Float, const N : usize, const P : usize> { domain : &'b Cube, @@ -234,10 +235,11 @@ } } -impl +impl Branches where Const

: BranchCount, - A : Aggregator { + A : Aggregator, + D : 'static + Copy + Send + Sync { /// Creates a new node branching structure, subdividing `domain` based on the /// [hint][Support::support_hint] of `support`. @@ -255,10 +257,10 @@ } } - /// Returns an iterator over the aggregators of the nodes directly under this branch head. - #[inline] - pub(super) fn aggregators(&self) -> MapF>, &'_ A> { - self.nodes.iter().mapF(Node::get_aggregator) + /// Summarises the aggregators of these branches into `agg` + pub(super) fn summarise_into(&self, agg : &mut A) { + // We need to create an array of the aggregators clones due to the RwLock. + agg.summarise(self.nodes.iter().map(Node::get_aggregator)); } /// Returns an iterator over the subcubes of `domain` subdivided at the branching point @@ -290,6 +292,26 @@ self.nodes.iter_mut().zip(subcube_iter) } + /// Call `f` on all `(subnode, subcube)` pairs in multiple threads, if `guard` so deems. + #[inline] + fn recurse<'scope, 'smaller, 'refs>( + &'smaller mut self, + domain : &'smaller Cube, + task_budget : TaskBudget<'scope, 'refs>, + guard : impl Fn(&Node, &Cube) -> bool + Send + 'smaller, + mut f : impl for<'a> FnMut(&mut Node, &Cube, TaskBudget<'smaller, 'a>) + + Send + Copy + 'smaller + ) where 'scope : 'smaller { + let subs = self.nodes_and_cubes_mut(domain); + task_budget.zoom(move |s| { + for (node, subcube) in subs { + if guard(node, &subcube) { + s.execute(move |new_budget| f(node, &subcube, new_budget)) + } + } + }); + } + /// Insert data into the branch. /// /// The parameters are as follows: @@ -299,24 +321,19 @@ /// * `support` is the [`Support`] that is used determine with which subcubes of `domain` /// (at subdivision depth `new_leaf_depth`) the data `d` is to be associated with. 
/// - pub(super) fn insert>( + pub(super) fn insert<'refs, 'scope, M : Depth, S : LocalAnalysis>( &mut self, domain : &Cube, d : D, new_leaf_depth : M, - support : &S + support : &S, + task_budget : TaskBudget<'scope, 'refs>, ) { let support_hint = support.support_hint(); - for (node, subcube) in self.nodes_and_cubes_mut(&domain) { - if support_hint.intersects(&subcube) { - node.insert( - &subcube, - d, - new_leaf_depth, - support - ); - } - } + self.recurse(domain, task_budget, + |_, subcube| support_hint.intersects(&subcube), + move |node, subcube, new_budget| node.insert(subcube, d, new_leaf_depth, support, + new_budget)); } /// Construct a new instance of the branch for a different aggregator. @@ -336,8 +353,7 @@ let branch_at = self.branch_at; let subcube_iter = self.iter_subcubes(domain); let new_nodes = self.nodes.into_iter().zip(subcube_iter).map(|(node, subcube)| { - // TODO: avoid clone - node.convert_aggregator(generator, &subcube) + Node::convert_aggregator(node, generator, &subcube) }); Branches { branch_at : branch_at, @@ -349,22 +365,25 @@ /// /// The `generator` is used to convert the data of type `D` of the branch into corresponding /// [`Support`]s. The `domain` is the cube corresponding to `self`. - pub(super) fn refresh_aggregator( + pub(super) fn refresh_aggregator<'refs, 'scope, G>( &mut self, generator : &G, - domain : &Cube + domain : &Cube, + task_budget : TaskBudget<'scope, 'refs>, ) where G : SupportGenerator, G::SupportType : LocalAnalysis { - for (node, subcube) in self.nodes_and_cubes_mut(domain) { - node.refresh_aggregator(generator, &subcube) - } + self.recurse(domain, task_budget, + |_, _| true, + move |node, subcube, new_budget| node.refresh_aggregator(generator, subcube, + new_budget)); } } -impl +impl Node where Const

: BranchCount, - A : Aggregator { + A : Aggregator, + D : 'static + Copy + Send + Sync { /// Create a new node #[inline] @@ -375,6 +394,7 @@ } } + /* /// Get leaf data #[inline] pub(super) fn get_leaf_data(&self, x : &Loc) -> Option<&Vec> { @@ -383,6 +403,16 @@ NodeOption::Leaf(ref data) => Some(data), NodeOption::Branches(ref b) => b.get_node(x).get_leaf_data(x), } + }*/ + + /// Get leaf data iterator + #[inline] + pub(super) fn get_leaf_data_iter(&self, x : &Loc) -> Option> { + match self.data { + NodeOption::Uninitialised => None, + NodeOption::Leaf(ref data) => Some(data.iter()), + NodeOption::Branches(ref b) => b.get_node(x).get_leaf_data_iter(x), + } } /// Returns a reference to the aggregator of this node @@ -404,12 +434,13 @@ /// If `self` is a [`NodeOption::Branches`], the data is passed to branches whose subcubes /// `support` intersects. If an [`NodeOption::Uninitialised`] node is encountered, a new leaf is /// created at a minimum depth of `new_leaf_depth`. - pub(super) fn insert>( + pub(super) fn insert<'refs, 'scope, M : Depth, S : LocalAnalysis >( &mut self, domain : &Cube, d : D, new_leaf_depth : M, - support : &S + support : &S, + task_budget : TaskBudget<'scope, 'refs>, ) { match &mut self.data { NodeOption::Uninitialised => { @@ -422,27 +453,30 @@ // should add capacity as a parameter let mut vec = Vec::with_capacity(2*P+1); vec.push(d); - NodeOption::Leaf(Rc::new(vec)) + NodeOption::Leaf(vec) }, Some(lower) => { - let b = Rc::new({ + let b = Arc::new({ let mut b0 = Branches::new_with(domain, support); - b0.insert(domain, d, lower, support); + b0.insert(domain, d, lower, support, task_budget); b0 }); - self.aggregator.summarise(b.aggregators()); + b.summarise_into(&mut self.aggregator); NodeOption::Branches(b) } } }, NodeOption::Leaf(leaf) => { - Rc::make_mut(leaf).push(d); + leaf.push(d); let a = support.local_analysis(&domain); self.aggregator.aggregate(once(a)); }, NodeOption::Branches(b) => { - Rc::make_mut(b).insert(domain, d, 
new_leaf_depth.lower_or(), support); - self.aggregator.summarise(b.aggregators()); + // FIXME: recursion that may cause stack overflow if the tree becomes + // very deep, e.g. due to [`BTSearch::search_and_refine`]. + Arc::make_mut(b).insert(domain, d, new_leaf_depth.lower_or(), support, + task_budget); + b.summarise_into(&mut self.aggregator) }, } } @@ -481,10 +515,11 @@ } }, NodeOption::Branches(b) => { - // TODO: now with Rc, convert_aggregator should be reference-based. - let bnew = Rc::new(Rc::unwrap_or_clone(b).convert_aggregator(generator, domain)); + // FIXME: recursion that may cause stack overflow if the tree becomes + // very deep, e.g. due to [`BTSearch::search_and_refine`]. + let bnew = Arc::new(Arc::unwrap_or_clone(b).convert_aggregator(generator, domain)); let mut anew = ANew::new(); - anew.summarise(bnew.aggregators()); + bnew.summarise_into(&mut anew); Node { data : NodeOption::Branches(bnew), aggregator : anew, @@ -497,10 +532,11 @@ /// /// The `generator` is used to convert the data of type `D` of the node into corresponding /// [`Support`]s. The `domain` is the cube corresponding to `self`. - pub(super) fn refresh_aggregator( + pub(super) fn refresh_aggregator<'refs, 'scope, G>( &mut self, generator : &G, - domain : &Cube + domain : &Cube, + task_budget : TaskBudget<'scope, 'refs>, ) where G : SupportGenerator, G::SupportType : LocalAnalysis { match &mut self.data { @@ -513,9 +549,10 @@ })); }, NodeOption::Branches(ref mut b) => { - // TODO: now with Rc, convert_aggregator should be reference-based. - Rc::make_mut(b).refresh_aggregator(generator, domain); - self.aggregator.summarise(b.aggregators()); + // FIXME: recursion that may cause stack overflow if the tree becomes + // very deep, e.g. due to [`BTSearch::search_and_refine`]. 
+ Arc::make_mut(b).refresh_aggregator(generator, domain, task_budget); + b.summarise_into(&mut self.aggregator); } } } @@ -544,7 +581,7 @@ /// Further routines are provided by the [`BTSearch`][super::refine::BTSearch] trait. pub trait BTImpl : std::fmt::Debug + Clone + GlobalAnalysis { /// The data type stored in the tree - type Data : 'static + Copy; + type Data : 'static + Copy + Send + Sync; /// The depth type of the tree type Depth : Depth; /// The type for the [aggregate information][Aggregator] about the `Data` stored in each node @@ -582,8 +619,13 @@ where G : SupportGenerator, G::SupportType : LocalAnalysis; - /// Iterarate all [`Self::Data`] items at the point `x` of the domain. - fn iter_at<'a>(&'a self, x : &'a Loc) -> BTIter<'a, Self::Data>; + /// Returns an iterator over all [`Self::Data`] items at the point `x` of the domain. + fn iter_at(&self, x : &Loc) -> std::slice::Iter<'_, Self::Data>; + + /* + /// Returns all [`Self::Data`] items at the point `x` of the domain. + fn data_at(&self, x : &Loc) -> Arc>; + */ /// Create a new tree on `domain` of indicated `depth`. 
fn new(domain : Cube, depth : Self::Depth) -> Self; @@ -614,7 +656,7 @@ ($($n:literal)*) => { $( impl BTNode for BTNodeLookup where F : Float, - D : 'static + Copy + std::fmt::Debug, + D : 'static + Copy + Send + Sync + std::fmt::Debug, A : Aggregator { type Node = Node; } @@ -622,7 +664,7 @@ impl BTImpl for BT where M : Depth, F : Float, - D : 'static + Copy + std::fmt::Debug, + D : 'static + Copy + Send + Sync + std::fmt::Debug, A : Aggregator { type Data = D; type Depth = M; @@ -634,12 +676,15 @@ d : D, support : &S ) { - self.topnode.insert( - &self.domain, - d, - self.depth, - support - ); + with_task_budget(|task_budget| + self.topnode.insert( + &self.domain, + d, + self.depth, + support, + task_budget + ) + ) } fn convert_aggregator(self, generator : &G) -> Self::Converted @@ -647,6 +692,7 @@ G : SupportGenerator, G::SupportType : LocalAnalysis { let topnode = self.topnode.convert_aggregator(generator, &self.domain); + BT { depth : self.depth, domain : self.domain, @@ -657,14 +703,17 @@ fn refresh_aggregator(&mut self, generator : &G) where G : SupportGenerator, G::SupportType : LocalAnalysis { - self.topnode.refresh_aggregator(generator, &self.domain); + with_task_budget(|task_budget| + self.topnode.refresh_aggregator(generator, &self.domain, task_budget) + ) } - fn iter_at<'a>(&'a self, x : &'a Loc) -> BTIter<'a,D> { - match self.topnode.get_leaf_data(x) { - Some(data) => BTIter { iter : data.iter() }, - None => BTIter { iter : [].iter() } - } + /*fn data_at(&self, x : &Loc) -> Arc> { + self.topnode.get_leaf_data(x).unwrap_or_else(|| Arc::new(Vec::new())) + }*/ + + fn iter_at(&self, x : &Loc) -> std::slice::Iter<'_, D> { + self.topnode.get_leaf_data_iter(x).unwrap_or_else(|| [].iter()) } fn new(domain : Cube, depth : M) -> Self { @@ -679,7 +728,7 @@ impl GlobalAnalysis for BT where M : Depth, F : Float, - D : 'static + Copy + std::fmt::Debug, + D : 'static + Copy + Send + Sync + std::fmt::Debug, A : Aggregator { fn global_analysis(&self) -> A { 
self.topnode.get_aggregator().clone() diff -r 860a54fca7bc -r 4e09b7829b51 src/bisection_tree/btfn.rs --- a/src/bisection_tree/btfn.rs Wed Oct 26 22:16:57 2022 +0300 +++ b/src/bisection_tree/btfn.rs Tue Nov 01 09:24:45 2022 +0200 @@ -2,6 +2,7 @@ use numeric_literals::replace_float_literals; use std::iter::Sum; use std::marker::PhantomData; +use std::sync::Arc; use crate::types::Float; use crate::mapping::Mapping; use crate::linops::Linear; @@ -34,7 +35,7 @@ const N : usize > /*where G::SupportType : LocalAnalysis*/ { bt : BT, - generator : G, + generator : Arc, _phantoms : PhantomData, } @@ -52,6 +53,10 @@ /// /// See the documentation for [`BTFN`] on the role of the `generator`. pub fn new(bt : BT, generator : G) -> Self { + Self::new_arc(bt, Arc::new(generator)) + } + + fn new_arc(bt : BT, generator : Arc) -> Self { BTFN { bt : bt, generator : generator, @@ -81,11 +86,15 @@ /// /// See the documentation for [`BTFN`] on the role of the `generator`. pub fn construct(domain : Cube, depth : BT::Depth, generator : G) -> Self { + Self::construct_arc(domain, depth, Arc::new(generator)) + } + + fn construct_arc(domain : Cube, depth : BT::Depth, generator : Arc) -> Self { let mut bt = BT::new(domain, depth); for (d, support) in generator.all_data() { bt.insert(d, &support); } - Self::new(bt, generator) + Self::new_arc(bt, generator) } /// Convert the aggregator of the [`BTFN`] to a different one. @@ -96,7 +105,7 @@ where ANew : Aggregator, G : SupportGenerator, G::SupportType : LocalAnalysis { - BTFN::new(self.bt.convert_aggregator(&self.generator), self.generator) + BTFN::new_arc(self.bt.convert_aggregator(&*self.generator), self.generator) } /// Change the generator (after, e.g., a scaling of the latter). 
@@ -106,7 +115,7 @@ /// Refresh aggregator after updates to generator fn refresh_aggregator(&mut self) { - self.bt.refresh_aggregator(&self.generator); + self.bt.refresh_aggregator(&*self.generator); } } @@ -122,7 +131,7 @@ BTNew : BTImpl, > (self, domain : Cube, depth : BTNew::Depth) -> BTFN where G::SupportType : LocalAnalysis { - BTFN::construct(domain, depth, self.generator) + BTFN::construct_arc(domain, depth, self.generator) } } @@ -139,7 +148,7 @@ pub fn new_pre(generator : G) -> Self { BTFN { bt : (), - generator : generator, + generator : Arc::new(generator), _phantoms : std::marker::PhantomData, } } @@ -152,12 +161,12 @@ BT : BTImpl { /// Helper function for implementing [`std::ops::Add`]. - fn add_another(self, other : G2) -> BTFN, BT, N> + fn add_another(&self, g2 : Arc) -> BTFN, BT, N> where G2 : SupportGenerator, G2::SupportType : LocalAnalysis { - let mut bt = self.bt; - let both = BothGenerators(self.generator, other); + let mut bt = self.bt.clone(); + let both = BothGenerators(Arc::clone(&self.generator), g2); for (d, support) in both.all_right_data() { bt.insert(d, &support); @@ -165,7 +174,7 @@ BTFN { bt : bt, - generator : both, + generator : Arc::new(both), _phantoms : std::marker::PhantomData, } } @@ -193,7 +202,7 @@ $lhs where BT1 : BTImpl, G1 : SupportGenerator + $($extra_trait)?, - G2 : SupportGenerator + Clone, + G2 : SupportGenerator, G1::SupportType : LocalAnalysis, G2::SupportType : LocalAnalysis { @@ -207,7 +216,7 @@ } make_btfn_add!(BTFN, std::convert::identity, ); -make_btfn_add!(&'a BTFN, Clone::clone, Clone); +make_btfn_add!(&'a BTFN, Clone::clone, ); macro_rules! make_btfn_sub { ($lhs:ty, $preprocess:path, $($extra_trait:ident)?) 
=> { @@ -222,7 +231,11 @@ type Output = BTFN, BT1, N>; #[inline] fn sub(self, other : BTFN) -> Self::Output { - $preprocess(self).add_another(other.generator.neg()) + $preprocess(self).add_another(Arc::new( + Arc::try_unwrap(other.generator) + .unwrap_or_else(|arc| (*arc).clone()) + .neg() + )) } } @@ -239,14 +252,14 @@ type Output = BTFN, BT1, N>; #[inline] fn sub(self, other : &'b BTFN) -> Self::Output { - $preprocess(self).add_another(-&other.generator) + $preprocess(self).add_another(Arc::new((*other.generator).clone().neg())) } } } } make_btfn_sub!(BTFN, std::convert::identity, ); -make_btfn_sub!(&'a BTFN, Clone::clone, Clone); +make_btfn_sub!(&'a BTFN, std::convert::identity, ); macro_rules! make_btfn_scalarop_rhs { ($trait:ident, $fn:ident, $trait_assign:ident, $fn_assign:ident) => { @@ -258,7 +271,7 @@ G::SupportType : LocalAnalysis { #[inline] fn $fn_assign(&mut self, t : F) { - self.generator.$fn_assign(t); + Arc::make_mut(&mut self.generator).$fn_assign(t); self.refresh_aggregator(); } } @@ -272,7 +285,7 @@ type Output = Self; #[inline] fn $fn(mut self, t : F) -> Self::Output { - self.generator.$fn_assign(t); + Arc::make_mut(&mut self.generator).$fn_assign(t); self.refresh_aggregator(); self } @@ -308,7 +321,7 @@ type Output = BTFN<$f, G, BT, N>; #[inline] fn $fn(self, mut a : BTFN<$f, G, BT, N>) -> Self::Output { - a.generator.$fn_assign(self); + Arc::make_mut(&mut a.generator).$fn_assign(self); a.refresh_aggregator(); a } @@ -325,7 +338,7 @@ type Output = BTFN<$f, G, BT, N>; #[inline] fn $fn(self, a : &'a BTFN<$f, G, BT, N>) -> Self::Output { - let mut tmp = a.generator.clone(); + let mut tmp = (*a.generator).clone(); tmp.$fn_assign(self); a.new_generator(tmp) // FIXME: Prevented by the compiler overflow above. 
@@ -349,7 +362,7 @@ type Output = Self; #[inline] fn $fn(mut self) -> Self::Output { - self.generator = self.generator.$fn(); + self.generator = Arc::new(Arc::unwrap_or_clone(self.generator).$fn()); self.refresh_aggregator(); self } @@ -389,7 +402,8 @@ type Codomain = V; fn value(&self, x : &'a Loc) -> Self::Codomain { - self.bt.iter_at(x).map(|&d| self.generator.support_for(d).value(x)).sum() + self.bt.iter_at(x) + .map(|&d| self.generator.support_for(d).value(x)).sum() } } @@ -403,7 +417,8 @@ type Codomain = V; fn value(&self, x : Loc) -> Self::Codomain { - self.bt.iter_at(&x).map(|&d| self.generator.support_for(d).value(x)).sum() + self.bt.iter_at(&x) + .map(|&d| self.generator.support_for(d).value(x)).sum() } } @@ -562,19 +577,24 @@ generator : &G, step : usize ) -> RefinerResult, Self::Result> { + + if self.bound.map_or(false, |b| aggregator.upper() <= b + self.tolerance) { + // The upper bound is below the maximisation threshold. Don't bother with this cube. + return RefinerResult::Uncertain(*aggregator, None) + } + // g gives the negative of the value of the function presented by `data` and `generator`. let g = move |x : &Loc| { let f = move |&d| generator.support_for(d).value(x); -data.iter().map(f).sum::() }; // … so the negative of the minimum is the maximm we want. - let (x, neg_v) = cube.p2_minimise(g); - let v = -neg_v; + let (x, _neg_v) = cube.p2_minimise(g); + //let v = -neg_v; + let v = -g(&x); - if self.bound.map_or(false, |b| aggregator.upper() <= b + self.tolerance) { - // The upper bound is below the maximisation threshold. Don't bother with this cube. - RefinerResult::Uncertain(*aggregator, None) - } else if step < self.max_steps && (aggregator.upper() - v).abs() > self.tolerance { + if step < self.max_steps && (aggregator.upper() > v + self.tolerance + /*|| aggregator.lower() > v - self.tolerance*/) { // The function isn't refined enough in `cube`, so return None // to indicate that further subdivision is required. 
RefinerResult::NeedRefinement @@ -582,10 +602,18 @@ // The data is refined enough, so return new hopefully better bounds // and the maximiser. let res = (x, v); - let bounds = Bounds(aggregator.lower(), v); + let bounds = Bounds(v, v); RefinerResult::Uncertain(bounds, Some(res)) } } + + fn fuse_results(r1 : &mut Self::Result, r2 : Self::Result) { + match (*r1, r2) { + (Some((_, v1)), Some((_, v2))) => if v1 < v2 { *r1 = r2 } + (None, Some(_)) => *r1 = r2, + (_, _) => {}, + } + } } @@ -606,18 +634,23 @@ generator : &G, step : usize ) -> RefinerResult, Self::Result> { + + if self.bound.map_or(false, |b| aggregator.lower() >= b - self.tolerance) { + // The lower bound is above the minimisation threshold. Don't bother with this cube. + return RefinerResult::Uncertain(*aggregator, None) + } + // g gives the value of the function presented by `data` and `generator`. let g = move |x : &Loc| { let f = move |&d| generator.support_for(d).value(x); data.iter().map(f).sum::() }; // Minimise it. - let (x, v) = cube.p2_minimise(g); + let (x, _v) = cube.p2_minimise(g); + let v = g(&x); - if self.bound.map_or(false, |b| aggregator.lower() >= b - self.tolerance) { - // The lower bound is above the minimisation threshold. Don't bother with this cube. - RefinerResult::Uncertain(*aggregator, None) - } else if step < self.max_steps && (aggregator.lower() - v).abs() > self.tolerance { + if step < self.max_steps && (aggregator.lower() < v - self.tolerance + /*|| aggregator.upper() < v + self.tolerance*/) { // The function isn't refined enough in `cube`, so return None // to indicate that further subdivision is required. RefinerResult::NeedRefinement @@ -625,10 +658,18 @@ // The data is refined enough, so return new hopefully better bounds // and the minimiser. 
let res = (x, v); - let bounds = Bounds(v, aggregator.upper()); + let bounds = Bounds(v, v); RefinerResult::Uncertain(bounds, Some(res)) } } + + fn fuse_results(r1 : &mut Self::Result, r2 : Self::Result) { + match (*r1, r2) { + (Some((_, v1)), Some((_, v2))) => if v1 > v2 { *r1 = r2 } + (_, Some(_)) => *r1 = r2, + (_, _) => {}, + } + } } @@ -678,6 +719,10 @@ RefinerResult::Uncertain(*aggregator, false) } } + + fn fuse_results(r1 : &mut Self::Result, r2 : Self::Result) { + *r1 = *r1 && r2; + } } impl Refiner, G, N> @@ -708,6 +753,10 @@ RefinerResult::Uncertain(*aggregator, false) } } + + fn fuse_results(r1 : &mut Self::Result, r2 : Self::Result) { + *r1 = *r1 && r2; + } } impl BTFN @@ -723,7 +772,7 @@ /// Returns the approximate maximiser and the corresponding function value. pub fn maximise(&mut self, tolerance : F, max_steps : usize) -> (Loc, F) { let refiner = P2Refiner{ tolerance, max_steps, how : RefineMax, bound : None }; - self.bt.search_and_refine(&refiner, &self.generator).expect("Refiner failure.").unwrap() + self.bt.search_and_refine(refiner, &self.generator).expect("Refiner failure.").unwrap() } /// Maximise the `BTFN` within stated value `tolerance` subject to a lower bound. @@ -734,7 +783,7 @@ pub fn maximise_above(&mut self, bound : F, tolerance : F, max_steps : usize) -> Option<(Loc, F)> { let refiner = P2Refiner{ tolerance, max_steps, how : RefineMax, bound : Some(bound) }; - self.bt.search_and_refine(&refiner, &self.generator).expect("Refiner failure.") + self.bt.search_and_refine(refiner, &self.generator).expect("Refiner failure.") } /// Minimise the `BTFN` within stated value `tolerance`. @@ -743,7 +792,7 @@ /// Returns the approximate minimiser and the corresponding function value. 
pub fn minimise(&mut self, tolerance : F, max_steps : usize) -> (Loc, F) { let refiner = P2Refiner{ tolerance, max_steps, how : RefineMin, bound : None }; - self.bt.search_and_refine(&refiner, &self.generator).expect("Refiner failure.").unwrap() + self.bt.search_and_refine(refiner, &self.generator).expect("Refiner failure.").unwrap() } /// Minimise the `BTFN` within stated value `tolerance` subject to a lower bound. @@ -754,7 +803,7 @@ pub fn minimise_below(&mut self, bound : F, tolerance : F, max_steps : usize) -> Option<(Loc, F)> { let refiner = P2Refiner{ tolerance, max_steps, how : RefineMin, bound : Some(bound) }; - self.bt.search_and_refine(&refiner, &self.generator).expect("Refiner failure.") + self.bt.search_and_refine(refiner, &self.generator).expect("Refiner failure.") } /// Verify that the `BTFN` has a given upper `bound` within indicated `tolerance`. @@ -762,7 +811,7 @@ /// At most `max_steps` refinement steps are taken. pub fn has_upper_bound(&mut self, bound : F, tolerance : F, max_steps : usize) -> bool { let refiner = BoundRefiner{ bound, tolerance, max_steps, how : RefineMax }; - self.bt.search_and_refine(&refiner, &self.generator).expect("Refiner failure.") + self.bt.search_and_refine(refiner, &self.generator).expect("Refiner failure.") } /// Verify that the `BTFN` has a given lower `bound` within indicated `tolerance`. @@ -770,6 +819,6 @@ /// At most `max_steps` refinement steps are taken. 
pub fn has_lower_bound(&mut self, bound : F, tolerance : F, max_steps : usize) -> bool { let refiner = BoundRefiner{ bound, tolerance, max_steps, how : RefineMin }; - self.bt.search_and_refine(&refiner, &self.generator).expect("Refiner failure.") + self.bt.search_and_refine(refiner, &self.generator).expect("Refiner failure.") } } diff -r 860a54fca7bc -r 4e09b7829b51 src/bisection_tree/either.rs --- a/src/bisection_tree/either.rs Wed Oct 26 22:16:57 2022 +0300 +++ b/src/bisection_tree/either.rs Tue Nov 01 09:24:45 2022 +0200 @@ -1,5 +1,6 @@ use std::iter::Chain; +use std::sync::Arc; use crate::types::*; use crate::mapping::Mapping; @@ -15,8 +16,8 @@ /// This is needed to work with sums of different types of [`Support`]s. #[derive(Debug,Clone)] pub struct BothGenerators( - pub(super) A, - pub(super) B, + pub(super) Arc, + pub(super) Arc, ); /// A structure for a [`Support`] that can be either `A` or `B`. @@ -194,12 +195,12 @@ impl std::ops::$trait_assign for BothGenerators - where G1 : std::ops::$trait_assign, - G2 : std::ops::$trait_assign, { + where G1 : std::ops::$trait_assign + Clone, + G2 : std::ops::$trait_assign + Clone, { #[inline] fn $fn_assign(&mut self, t : F) { - self.0.$fn_assign(t); - self.1.$fn_assign(t); + Arc::make_mut(&mut self.0).$fn_assign(t); + Arc::make_mut(&mut self.1).$fn_assign(t); } } @@ -211,7 +212,8 @@ type Output = BothGenerators; #[inline] fn $fn(self, t : F) -> BothGenerators { - BothGenerators(self.0.$fn(t), self.1.$fn(t)) + BothGenerators(Arc::new(self.0.$fn(t)), + Arc::new(self.1.$fn(t))) } } } @@ -221,11 +223,13 @@ make_either_scalarop_rhs!(Div, div, DivAssign, div_assign); impl std::ops::Neg for BothGenerators -where G1 : std::ops::Neg, G2 : std::ops::Neg, { +where G1 : std::ops::Neg + Clone, + G2 : std::ops::Neg + Clone, { type Output = BothGenerators; #[inline] fn neg(self) -> Self::Output { - BothGenerators(self.0.neg(), self.1.neg()) + BothGenerators(Arc::new(Arc::unwrap_or_clone(self.0).neg()), + 
Arc::new(Arc::unwrap_or_clone(self.1).neg())) } } /* diff -r 860a54fca7bc -r 4e09b7829b51 src/bisection_tree/refine.rs --- a/src/bisection_tree/refine.rs Wed Oct 26 22:16:57 2022 +0300 +++ b/src/bisection_tree/refine.rs Tue Nov 01 09:24:45 2022 +0200 @@ -1,24 +1,26 @@ use std::collections::BinaryHeap; -use std::cmp::{PartialOrd,Ord,Ordering,Ordering::*,max}; -use std::rc::Rc; +use std::cmp::{PartialOrd, Ord, Ordering, Ordering::*, max}; use std::marker::PhantomData; +use std::sync::{Arc, Mutex, Condvar}; use crate::types::*; use crate::nanleast::NaNLeast; use crate::sets::Cube; +use crate::parallelism::{thread_pool_size, thread_pool}; use super::support::*; use super::bt::*; use super::aggregator::*; +use crate::parallelism::TaskBudget; /// Trait for sorting [`Aggregator`]s for [`BT`] refinement. /// /// The sorting involves two sorting keys, the “upper” and the “lower” key. Any [`BT`] nodes /// with upper key less the lower key of another are discarded from the refinement process. /// Nodes with the highest upper sorting key are picked for refinement. -pub trait AggregatorSorting { +pub trait AggregatorSorting : Sync + Send + 'static { // Priority type Agg : Aggregator; - type Sort : Ord + Copy + std::fmt::Debug; + type Sort : Ord + Copy + std::fmt::Debug + Sync + Send; /// Returns lower sorting key fn sort_lower(aggregator : &Self::Agg) -> Self::Sort; @@ -90,13 +92,13 @@ /// The `Refiner` is used to determine whether an [`Aggregator`] `A` stored in the [`BT`] is /// sufficiently refined within a [`Cube`], and in such a case, produce a desired result (e.g. /// a maximum value of a function). -pub trait Refiner +pub trait Refiner : Sync + Send + 'static where F : Num, A : Aggregator, G : SupportGenerator { /// The result type of the refiner - type Result : std::fmt::Debug; + type Result : std::fmt::Debug + Sync + Send + 'static; /// The sorting to be employed by [`BTSearch::search_and_refine`] on node aggregators /// to detemrine node priority. 
type Sorting : AggregatorSorting; @@ -128,12 +130,15 @@ generator : &G, step : usize, ) -> RefinerResult; + + /// Fuse two [`Self::Result`]s (needed in threaded refinement). + fn fuse_results(r1 : &mut Self::Result, r2 : Self::Result); } /// Structure for tracking the refinement process in a [`BinaryHeap`]. struct RefinementInfo<'a, F, D, A, S, RResult, const N : usize, const P : usize> where F : Float, - D : 'static +, + D : 'static, A : Aggregator, S : AggregatorSorting { cube : Cube, @@ -150,21 +155,21 @@ S : AggregatorSorting { #[inline] - fn aggregator(&self) -> &A { + fn with_aggregator(&self, f : impl FnOnce(&A) -> U) -> U { match self.refiner_info { - Some((ref agg, _)) => agg, - None => &self.node.aggregator, + Some((ref agg, _)) => f(agg), + None => f(&self.node.aggregator), } } #[inline] fn sort_lower(&self) -> S::Sort { - S::sort_lower(self.aggregator()) + self.with_aggregator(S::sort_lower) } #[inline] fn sort_upper(&self) -> S::Sort { - S::sort_upper(self.aggregator()) + self.with_aggregator(S::sort_upper) } } @@ -207,12 +212,12 @@ #[inline] fn cmp(&self, other : &Self) -> Ordering { - let agg1 = self.aggregator(); - let agg2 = other.aggregator(); - match S::sort_upper(agg1).cmp(&S::sort_upper(agg2)) { - Equal => S::sort_lower(agg1).cmp(&S::sort_lower(agg2)), - order => order, - } + self.with_aggregator(|agg1| other.with_aggregator(|agg2| { + match S::sort_upper(agg1).cmp(&S::sort_upper(agg2)) { + Equal => S::sort_lower(agg1).cmp(&S::sort_lower(agg2)), + order => order, + } + })) } } @@ -229,6 +234,9 @@ glb : S::Sort, glb_stale_counter : usize, stale_insert_counter : usize, + result : Option, + step : usize, + n_processing : usize } impl<'a, F, D, A, S, RResult, const N : usize, const P : usize> @@ -252,17 +260,19 @@ } } -impl +impl Branches where Const

: BranchCount, - A : Aggregator { + A : Aggregator, + D : 'static + Copy + Send + Sync { /// Stage all subnodes of `self` into the refinement queue [`container`]. fn stage_refine<'a, S, RResult>( &'a mut self, domain : Cube, - container : &mut HeapContainer<'a, F, D, A, S, RResult, N, P>, + container_arc : &Arc>>, ) where S : AggregatorSorting { + let mut container = container_arc.lock().unwrap(); // Insert all subnodes into the refinement heap. for (node, subcube) in self.nodes_and_cubes_mut(&domain) { container.push(RefinementInfo { @@ -276,10 +286,11 @@ } -impl +impl Node where Const

: BranchCount, - A : Aggregator { + A : Aggregator, + D : 'static + Copy + Send + Sync { /// If `self` is a leaf node, uses the `refiner` to determine whether further subdivision /// is required to get a sufficiently refined solution for the problem the refiner is used @@ -290,19 +301,17 @@ /// /// `domain`, as usual, indicates the spatial area corresponding to `self`. fn search_and_refine<'a, 'b, R, G>( - &'a mut self, + self : &'a mut Self, domain : Cube, refiner : &R, generator : &G, - container : &'b mut HeapContainer<'a, F, D, A, R::Sorting, R::Result, N, P>, + container_arc : &Arc>>, step : usize ) -> Option where R : Refiner, G : SupportGenerator, G::SupportType : LocalAnalysis { - // The “complex” repeated pattern matching here is forced by mutability requirements. - // Refine a leaf. let res = if let NodeOption::Leaf(ref v) = &mut self.data { let res = refiner.refine(&self.aggregator, &domain, v, generator, step); @@ -314,18 +323,18 @@ if let Some(&d) = it.next() { // Construct new Branches let support = generator.support_for(d); - let b = Rc::new({ + let b = Arc::new({ let mut b0 = Branches::new_with(&domain, &support); - b0.insert(&domain, d, Const::<1>, &support); + b0.insert(&domain, d, Const::<1>, &support, TaskBudget::none()); for &d in it { let support = generator.support_for(d); // TODO: can we be smarter than just refining one level? - b0.insert(&domain, d, Const::<1>, &support); + b0.insert(&domain, d, Const::<1>, &support, TaskBudget::none()); } b0 }); // Update current node - self.aggregator.summarise(b.aggregators()); + b.summarise_into(&mut self.aggregator); self.data = NodeOption::Branches(b); // The branches will be inserted into the refinement priority queue below. } @@ -340,6 +349,7 @@ // with the new refined aggregator and custom return value. It will be popped and // returned in the loop of [`BT::search_and_refine`] when there are no unrefined // candidates that could potentially be better according to their basic aggregator. 
+ let mut container = container_arc.lock().unwrap(); container.push(RefinementInfo { cube : domain, node : self, @@ -352,11 +362,12 @@ Some(val) } else if let NodeOption::Branches(ref mut b) = &mut self.data { // Insert branches into refinement priority queue. - Rc::make_mut(b).stage_refine(domain, container); + Arc::make_mut(b).stage_refine(domain, container_arc); None } else { None } + } } @@ -378,14 +389,118 @@ /// The `generator` converts [`BTImpl::Data`] stored in the bisection tree into a [`Support`]. fn search_and_refine<'b, R, G>( &'b mut self, - refiner : &R, - generator : &G, + refiner : R, + generator : &Arc, ) -> Option - where R : Refiner, - G : SupportGenerator, + where R : Refiner + Sync + Send + 'static, + G : SupportGenerator + Sync + Send + 'static, G::SupportType : LocalAnalysis; } +fn refinement_loop ( + condvar : Option>, + refiner : &R, + generator_arc : &Arc, + container_arc : &Arc>>, +) where A : Aggregator, + R : Refiner, + G : SupportGenerator, + G::SupportType : LocalAnalysis, + Const

: BranchCount, + D : 'static + Copy + Sync + Send + std::fmt::Debug { + + let mut did_park = true; + + 'main: loop { + let (ri, step) = { + let mut container = container_arc.lock().unwrap(); + let ri = 'get_next: loop { + if did_park { + container.n_processing += 1; + did_park = false; + } + + // Some thread has found a result, return + if container.result.is_some() { + container.n_processing -= 1; + break 'main + } + + let ri = match container.heap.pop() { + Some(ri) => ri, + None => { + debug_assert!(container.n_processing > 0); + container.n_processing -= 1; + if container.n_processing == 0 { + break 'main; + } else if let Some(ref c) = condvar { + //eprintln!("Sleeping {t:?} {n} {worker_counter}\n", t=thread::current(), n=container.n_processing); + did_park = true; + container = c.wait(container).unwrap(); + continue 'get_next; + } else { + break 'main + } + } + }; + break ri; + }; + + let step = container.step; + container.step += 1; + + if let Some((_, result)) = ri.refiner_info { + // Terminate based on a “best possible” result. + container.result = Some(result); + container.n_processing -= 1; + break 'main + } + + if ri.sort_lower() >= container.glb { + container.glb_stale_counter += 1; + if container.stale_insert_counter + container.glb_stale_counter + > container.heap.len()/2 { + // GLB propery no longer correct. 
+ match container.heap.iter().map(|ri| ri.sort_lower()).reduce(max) { + Some(glb) => { + container.glb = glb; + container.heap.retain(|ri| ri.sort_upper() >= glb); + }, + None => { + container.glb = R::Sorting::bottom() + } + } + container.glb_stale_counter = 0; + container.stale_insert_counter = 0; + } + } + + (ri, step) + }; + + let res = Node::search_and_refine(ri.node, ri.cube, refiner, &**generator_arc, + &container_arc, step); + if let Some(r) = res { + // Terminate based on a certain result from the refiner + let mut container = container_arc.lock().unwrap(); + if let &mut Some(ref mut r_prev) = &mut container.result { + R::fuse_results(r_prev, r); + } else { + container.result = Some(r); + } + break 'main + } + + if let Some(ref c) = condvar { + c.notify_one(); + } + } + + if let Some(ref c) = condvar { + c.notify_all(); + } +} + // Needed to get access to a Node without a trait interface. macro_rules! impl_btsearch { ($($n:literal)*) => { $( @@ -394,65 +509,60 @@ for BT where //Self : BTImpl, // <== automatically deduce to be implemented M : Depth, - F : Float, - A : 'a + Aggregator, - D : 'static + Copy + std::fmt::Debug { + F : Float + Send, + A : Aggregator, + D : 'static + Copy + Sync + Send + std::fmt::Debug { fn search_and_refine<'b, R, G>( &'b mut self, - refiner : &R, - generator : &G, + refiner : R, + generator : &Arc, ) -> Option where R : Refiner, G : SupportGenerator, G::SupportType : LocalAnalysis { - let mut container = HeapContainer { + let mut init_container /*: HeapContainer*/ + = HeapContainer { heap : BinaryHeap::new(), glb : R::Sorting::bottom(), glb_stale_counter : 0, stale_insert_counter : 0, + result : None, + step : 0, + n_processing : 0, }; - container.push(RefinementInfo { + init_container.push(RefinementInfo { cube : self.domain, node : &mut self.topnode, refiner_info : None, sorting : PhantomData, }); - let mut step = 0; - while let Some(ri) = container.heap.pop() { - if let Some((_, result)) = ri.refiner_info { - // Terminate 
based on a “best possible” result. - return Some(result) - } + // let n_workers = thread::available_parallelism() + // .map_or(1, core::num::NonZeroUsize::get); + let maybe_pool = thread_pool(); + let container_arc = Arc::new(Mutex::new(init_container)); + if let Some(pool) = maybe_pool { + let n = thread_pool_size(); + pool.scope(|s| { + let condvar = Arc::new(Condvar::new()); + for _ in 0..n{ + let refiner_ref = &refiner; + let container_t = Arc::clone(&container_arc); + let condvar_t = Arc::clone(&condvar); + s.spawn(move |_| { + refinement_loop(Some(condvar_t), refiner_ref, generator, + &container_t); + }); + } + refinement_loop(Some(condvar), &refiner, generator, &container_arc); + }); + } else { + refinement_loop(None, &refiner, generator, &container_arc); + } - if ri.sort_lower() >= container.glb { - container.glb_stale_counter += 1; - if container.stale_insert_counter + container.glb_stale_counter - > container.heap.len()/2 { - // GLB propery no longer correct. - match container.heap.iter().map(|ri| ri.sort_lower()).reduce(max) { - Some(glb) => { - container.glb = glb; - container.heap.retain(|ri| ri.sort_upper() >= glb); - }, - None => { - container.glb = R::Sorting::bottom() - } - } - container.glb_stale_counter = 0; - container.stale_insert_counter = 0; - } - } - - let res = ri.node.search_and_refine(ri.cube, refiner, generator, - &mut container, step); - if let Some(_) = res { - // Terminate based on a certain result from the refiner - return res - } - - step += 1; - } - None + Arc::try_unwrap(container_arc) + .map(|mtx| mtx.into_inner().unwrap().result) + .ok() + .flatten() } } )* } diff -r 860a54fca7bc -r 4e09b7829b51 src/bisection_tree/support.rs --- a/src/bisection_tree/support.rs Wed Oct 26 22:16:57 2022 +0300 +++ b/src/bisection_tree/support.rs Tue Nov 01 09:24:45 2022 +0200 @@ -13,7 +13,7 @@ use crate::norms::{Norm, L1, L2, Linfinity}; /// A trait for encoding constant [`Float`] values -pub trait Constant : Copy + 'static + std::fmt::Debug + Into 
{ +pub trait Constant : Copy + Sync + Send + 'static + std::fmt::Debug + Into { /// The type of the value type Type : Float; /// Returns the value of the constant @@ -30,7 +30,7 @@ /// A trait for working with the supports of [`Mapping`]s. /// /// Mapping is not a super-trait to allow more general use. -pub trait Support : Sized { +pub trait Support : Sized + Sync + Send + 'static { /// Return a cube containing the support of the function represented by `self`. /// /// The hint may be larger than the actual support, but must contain it. @@ -262,6 +262,7 @@ #[inline] fn global_analysis(&self) -> Bounds { let Bounds(lower, upper) = self.base_fn.global_analysis(); + debug_assert!(lower <= upper); match self.weight.value() { w if w < F::ZERO => Bounds(w * upper, w * lower), w => Bounds(w * lower, w * upper), @@ -275,6 +276,7 @@ #[inline] fn local_analysis(&self, cube : &Cube) -> Bounds { let Bounds(lower, upper) = self.base_fn.local_analysis(cube); + debug_assert!(lower <= upper); match self.weight.value() { w if w < F::ZERO => Bounds(w * upper, w * lower), w => Bounds(w * lower, w * upper), @@ -386,6 +388,7 @@ #[inline] fn global_analysis(&self) -> Bounds { let Bounds(lower, upper) = self.0.global_analysis(); + debug_assert!(lower <= upper); let w = self.0.norm(L1); debug_assert!(w >= F::ZERO); Bounds(w * lower, w * upper) @@ -397,6 +400,7 @@ #[inline] fn local_analysis(&self, cube : &Cube) -> Bounds { let Bounds(lower, upper) = self.0.local_analysis(cube); + debug_assert!(lower <= upper); let w = self.0.norm(L1); debug_assert!(w >= F::ZERO); Bounds(w * lower, w * upper) @@ -442,7 +446,7 @@ /// Generator of [`Support`]-implementing component functions based on low storage requirement /// [ids][`Self::Id`]. pub trait SupportGenerator -: MulAssign + DivAssign + Neg { +: MulAssign + DivAssign + Neg + Clone + Sync + Send + 'static { /// The identification type type Id : 'static + Copy; /// The type of the [`Support`] (often also a [`Mapping`]). 
diff -r 860a54fca7bc -r 4e09b7829b51 src/lib.rs --- a/src/lib.rs Wed Oct 26 22:16:57 2022 +0300 +++ b/src/lib.rs Tue Nov 01 09:24:45 2022 +0200 @@ -26,6 +26,7 @@ pub mod types; pub mod nanleast; pub mod error; +pub mod parallelism; pub mod maputil; pub mod tuple; pub mod euclidean; diff -r 860a54fca7bc -r 4e09b7829b51 src/lingrid.rs --- a/src/lingrid.rs Wed Oct 26 22:16:57 2022 +0300 +++ b/src/lingrid.rs Tue Nov 01 09:24:45 2022 +0200 @@ -1,13 +1,14 @@ /*! Linear grids. -These are multi-dimensional intervals $\prod_{i=1}^N [a_i, b_i]$ divided along each dimension -into n_i equally-spaced nodes, with $a_i$ the first node and $b_i$ the last node along each +These are multi-dimensional intervals $\prod\_{i=1}^N [a\_i, b\_i]$ divided along each dimension +into $n\_i$ equally-spaced nodes, with $a\_i$ the first node and $b\_i$ the last node along each dimension. The [`LinSpace`]s provided by this module are similar to [`num::range_step_inclusive`], but as an iterator they are [restartable][RestartableIterator] and parametrised by the number of nodes -instead of a step. This way it can be ensured that $a_i$ and $b_i$ are the last and the first node. +instead of a step. This way it can be ensured that $a\_i$ and $b\_i$ are the last and the first +node. The starting points for the use of this module are the [`linspace`], [`lingrid`], and [`lingrid_centered`] functions. They return a [`LinSpace`]s that implements [`IntoIterator`] for @@ -59,9 +60,9 @@ /// Create a multi-dimensional linear grid with centered nodes. /// /// There are `count` along each dimension and each node has equally-sized subcube surrounding it -/// inside `cube`. Thus, if $w_i$ is the width of the cube along dimension $i$, and $n_i$ the number -/// of nodes, the width of the subcube along this dimension is $h_i = w_i/(n_i+1)$, and the first -/// and last nodes are at a distance $h_i/2$ from the closest boundary. +/// inside `cube`. 
Thus, if $w\_i$ is the width of the cube along dimension $i$, and $n\_i$ the number +/// of nodes, the width of the subcube along this dimension is $h\_i = w\_i/(n\_i+1)$, and the first +/// and last nodes are at a distance $h\_i/2$ from the closest boundary. pub fn lingrid_centered( cube : &Cube, count : &[usize; N] diff -r 860a54fca7bc -r 4e09b7829b51 src/linops.rs --- a/src/linops.rs Wed Oct 26 22:16:57 2022 +0300 +++ b/src/linops.rs Tue Nov 01 09:24:45 2022 +0200 @@ -84,8 +84,8 @@ /// Trait for forming a preadjoint of an operator. /// -/// For an operator $A$ this is an operator $A_*$ -/// such that its adjoint $(A_*)^*=A$. The space `X` is the domain of the `Self` +/// For an operator $A$ this is an operator $A\_\*$ +/// such that its adjoint $(A\_\*)^\*=A$. The space `X` is the domain of the `Self` /// operator. The space `Ypre` is the predual of its codomain, and should be the /// domain of the adjointed operator. `Self::Preadjoint` should be /// [`Adjointable`]`<'a,Ypre,X>`. diff -r 860a54fca7bc -r 4e09b7829b51 src/parallelism.rs --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/parallelism.rs Tue Nov 01 09:24:45 2022 +0200 @@ -0,0 +1,220 @@ +/*! +Parallel computing helper routines. + +It maintains the number of threads to use, a thread pool, and provides a budgeting tool +to limit the number of threads spawned in recursive computations. + +For actually spawning scoped tasks in a thread pool, it currently uses [`rayon`].
+*/ + +use std::sync::Once; +use std::num::NonZeroUsize; +use std::thread::available_parallelism; +pub use rayon::{Scope, ThreadPoolBuilder, ThreadPool}; +use std::sync::atomic::{ + AtomicUsize, + Ordering::{Release, Relaxed}, +}; + +#[cfg(use_custom_thread_pool)] +type Pool = ThreadPool; +#[cfg(not(use_custom_thread_pool))] +type Pool = GlobalPool; + +const ONE : NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(1) }; +static mut TASK_OVERBUDGETING : AtomicUsize = AtomicUsize::new(1); +static mut N_THREADS : NonZeroUsize = ONE; +static mut POOL : Option = None; +static INIT: Once = Once::new(); + +#[cfg(not(use_custom_thread_pool))] +mod global_pool { + /// This is a nicer way to use the global pool of [`rayon`]. + pub struct GlobalPool; + + #[cfg(not(use_custom_thread_pool))] + impl GlobalPool { + #[inline] + pub fn scope<'scope, OP, R>(&self, op: OP) -> R + where + OP: FnOnce(&rayon::Scope<'scope>) -> R + Send, + R: Send { + rayon::scope(op) + } + } +} + +#[cfg(not(use_custom_thread_pool))] +pub use global_pool::GlobalPool; + +/// Set the number of threads. +/// +/// This routine can only be called once. +/// It also calls [`set_task_overbudgeting`] with $m = (n + 1) / 2$. +pub fn set_num_threads(n : NonZeroUsize) { + INIT.call_once(|| unsafe { + N_THREADS = n; + let n = n.get(); + set_task_overbudgeting((n + 1) / 2); + POOL = if n > 1 { + #[cfg(use_custom_thread_pool)] { + Some(ThreadPoolBuilder::new().num_threads(n).build().unwrap()) + } + #[cfg(not(use_custom_thread_pool))] { + ThreadPoolBuilder::new().num_threads(n).build_global().unwrap(); + Some(GlobalPool) + } + } else { + None + } + }); +} + +/// Set overbudgeting count for [`TaskBudget`]. +/// +/// The initial value is 1. Calling [`set_num_threads`] sets this to $m = (n + 1) / 2$, where +/// $n$ is the number of threads. +pub fn set_task_overbudgeting(m : usize) { + unsafe { TASK_OVERBUDGETING.store(m, Relaxed) } +} + +/// Set the number of threads to the minimum of `n` and [`available_parallelism`]. 
+/// +/// This routine can only be called once. +pub fn set_max_threads(n : NonZeroUsize) { + let available = available_parallelism().unwrap_or(ONE); + set_num_threads(available.min(n)); +} + + +/// Get the number of threads +pub fn num_threads() -> NonZeroUsize { + unsafe { N_THREADS } +} + +/// Get the thread pool. +/// +/// If the number of configured threads is less than 2, this is None. +/// The pool has [`num_threads`]` - 1` threads. +pub fn thread_pool() -> Option<&'static Pool> { + unsafe { POOL.as_ref() } +} + +/// Get the number of thread pool workers. +/// +/// This is [`num_threads`]` - 1`. +pub fn thread_pool_size() -> usize { + unsafe { N_THREADS }.get() - 1 +} + +/// Thread budgeting tool. +/// +/// This allows limiting the number of tasks inserted into the queue in nested computations. +/// Otherwise it wraps [`rayon`] when not single-threaded. +pub enum TaskBudget<'scope, 'scheduler> { + /// No threading performed. + SingleThreaded, + /// Initial multi-threaded state + MultiThreadedInitial { + /// Thread budget counter + budget : AtomicUsize, + /// Thread pool + pool : &'scheduler Pool, + }, + /// Nested multi-threaded state + MultiThreadedZoom { + /// Thread budget reference + budget : &'scope AtomicUsize, + scope : &'scheduler Scope<'scope>, + } +} + +/// Task execution scope for [`TaskBudget`]. +pub enum TaskBudgetScope<'scope, 'scheduler> { + SingleThreaded, + MultiThreaded { + budget : &'scope AtomicUsize, + scope : &'scheduler Scope<'scope>, + } +} + +impl<'scope, 'b> TaskBudget<'scope, 'b> { + /// Initialise a task budget in the main thread pool. + /// + /// The number of tasks [executed][TaskBudgetScope::execute] in [scopes][TaskBudget::zoom] + /// created through the budget is limited to [`num_threads()`]` + overbudget`. 
If `overbudget` + is `None`, the [global setting][set_task_overbudgeting] is used. + pub fn init(overbudget : Option) -> Self { + let n = num_threads().get(); + let m = overbudget.unwrap_or_else(|| unsafe { TASK_OVERBUDGETING.load(Relaxed) }); + if n <= 1 { + Self::SingleThreaded + } else if let Some(pool) = thread_pool() { + let budget = AtomicUsize::new(n + m); + Self::MultiThreadedInitial { budget, pool } + } else { + Self::SingleThreaded + } + } + + /// Initialise single-threaded thread budgeting. + pub fn none() -> Self { Self::SingleThreaded } +} + +impl<'scope, 'scheduler> TaskBudget<'scope, 'scheduler> { + /// Create a sub-scope for launching tasks + pub fn zoom<'smaller, F, R : Send>(&self, scheduler : F) -> R + where 'scope : 'smaller, + F : for<'a> FnOnce(TaskBudgetScope<'smaller, 'a>) -> R + Send + 'smaller { + match self { + &Self::SingleThreaded => scheduler(TaskBudgetScope::SingleThreaded), + &Self::MultiThreadedInitial { ref budget, pool } => { + let budget_ref = unsafe { + // Safe: scheduler is dropped when 'smaller becomes invalid. + std::mem::transmute::<&AtomicUsize, &'smaller AtomicUsize>(budget) + }; + pool.scope(move |scope| { + scheduler(TaskBudgetScope::MultiThreaded { scope, budget: budget_ref }) + }) + }, + &Self::MultiThreadedZoom { budget, .. /* scope */ } => { + rayon::scope(move |scope| { + scheduler(TaskBudgetScope::MultiThreaded { scope, budget }) + }) + }, + } + } +} + +impl<'scope, 'scheduler> TaskBudgetScope<'scope, 'scheduler> { + /// Queue a task or execute it in this thread if the thread budget is exhausted.
+ pub fn execute(&self, job : F) + where F : for<'b> FnOnce(TaskBudget<'scope, 'b>) + Send + 'scope { + match self { + Self::SingleThreaded => job(TaskBudget::SingleThreaded), + Self::MultiThreaded { scope, budget } => { + let spawn = budget.fetch_update(Release, + Relaxed, + |n| (n > 1).then_some(n - 1)) + .is_ok(); + if spawn { + scope.spawn(|scope| { + let task_budget = TaskBudget::MultiThreadedZoom { scope, budget }; + job(task_budget); + budget.fetch_add(1, Release); + }) + } else { + job(TaskBudget::MultiThreadedZoom { scope, budget }) + } + } + } + } +} + +/// Runs `scheduler` with a [`TaskBudget`]. +/// +/// This corresponds to calling `scheduler` with [`TaskBudget::init(None)`]. +pub fn with_task_budget<'scope, F, R>(scheduler : F) -> R +where F : for<'b> FnOnce(TaskBudget<'scope, 'b>) -> R + 'scope { + scheduler(TaskBudget::init(None)) +} diff -r 860a54fca7bc -r 4e09b7829b51 src/sets.rs --- a/src/sets.rs Wed Oct 26 22:16:57 2022 +0300 +++ b/src/sets.rs Tue Nov 01 09:24:45 2022 +0200 @@ -59,7 +59,7 @@ /// vector and $t$ the offset. /// /// `U` is the element type, `F` the floating point number type, and `A` the type of the -/// orthogonal (dual) vectors. They need implement [`Dot`]``. +/// orthogonal (dual) vectors. They need to implement [`Dot`].
#[derive(Clone,Copy,Debug,Serialize,Eq,PartialEq)] pub struct Halfspace where A : Dot, F : Float { pub orthogonal : A, diff -r 860a54fca7bc -r 4e09b7829b51 src/types.rs --- a/src/types.rs Wed Oct 26 22:16:57 2022 +0300 +++ b/src/types.rs Tue Nov 01 09:24:45 2022 +0200 @@ -50,7 +50,7 @@ f32 f64); /// Trait for general numeric types -pub trait Num : 'static + Copy + num::Num + num_traits::NumAssign +pub trait Num : 'static + Copy + Sync + Send + num::Num + num_traits::NumAssign + std::iter::Sum + std::iter::Product + std::fmt::Debug + std::fmt::Display + serde::Serialize + CastFrom + CastFrom + CastFrom + CastFrom @@ -62,9 +62,9 @@ const ZERO : Self; const ONE : Self; const TWO : Self; - /// Generic version of ::MAX; + /// Generic version of `Self::MAX` const RANGE_MAX : Self; - /// Generic version of ::MIN; + /// Generic version of `Self::MIN` const RANGE_MIN : Self; }