Fri, 16 Jan 2026 19:41:32 -0500
pointsource_algs step length estimation support
import numpy as np from dolfinx import fem from petsc4py import PETSc from pointsource_pde.dolfinx_extras import get_mass_matrix from slepc4py import SLEPc class FullSampling: """ A linear operator class for full sampling of a concentration (fem.Function) """ def __init__( self, V, ): """ M: number of laser beams num_points: number of discrete points per beam domain_size: [xmin, xmax, ymin, ymax] """ self.V = V self.M = V.dofmap.index_map.size_global A = get_mass_matrix(V) solver = PETSc.KSP().create(A.comm) solver.setOperators(A) solver.setType(PETSc.KSP.Type.PREONLY) solver.getPC().setType(PETSc.PC.Type.LU) self.solver = solver # ‖x‖^2 = ‖Ax.array‖^2 # ‖Rx‖^2 = ‖x.array‖^2 = ‖A^{-1}Ax.array‖^2 ≤ ‖A^{-1}‖‖x‖^2, # so we need the inverse of the minimal eigenvalue. E = SLEPc.EPS().create(A.comm) E.setOperators(A) E.setProblemType(SLEPc.EPS.ProblemType.NHEP) E.setWhichEigenpairs(SLEPc.EPS.Which.SMALLEST_MAGNITUDE) # E.setWhichEigenpairs(SLEPc.EPS.Which.LARGEST_MAGNITUDE) E.setFromOptions() E.solve() sigma = abs(E.getEigenvalue(0)) self._opnorm = 1.0 / sigma print("Full sampling opnorm = %f" % self._opnorm) def apply(self, x): """ This does not work with MPI """ return x.x.array def diff_adjdir(self, j, _x): """ This does not work with MPI """ # We need ⟨Rx,v⟩_{ℝ^n} = ⟨x,R^*v⟩_V # We have ⟨x,R^*v⟩_V = ⟨Ax.array,[R^*v].array⟩_{ℝ^m} # But Rx = x.array, so we need # ⟨x.array,v⟩_{ℝ^n} = ⟨Ax.array,[R^*v].array⟩_{ℝ^m} # Taking [R^*v].array=A^{-1}v, the conjugate works correctly. tmp = fem.Function(self.V) tmp.x.array[:] = j tmp.x.scatter_forward() return tmp # TODO: is convection_diffusion actualy based on norms in ℝ^n? # res = fem.Function(self.V) # res.x.array[:] = 0.0 # self.solver.solve(tmp.x.petsc_vec, res.x.petsc_vec) # res.x.scatter_forward() # return res def opnorm(self, *args): # raise NotImplementedError("Need mesh weighting?") return self._opnorm def lipschitz_factor(self, *args): return self.opnorm(*args) def diff_adj_lipschitz_factor(self, *args): return self.opnorm(*args) def diff_bound(self, *args): return self.opnorm(*args) def bound(self, *args): return self.opnorm(*args) # Construct observation noise def noise(self, noise_level): M = self.M # Create diagonal covariance matrix Gamma_v_t of shape (M, M) Gamma_v_t = np.diag((noise_level * np.random.rand(M)) ** 2) # Use average variance from diagonal as scalar variance std = np.sqrt(np.mean(np.diag(Gamma_v_t))) # Generate noise with shape (M, 1) using scalar standard deviation noise = np.random.normal(0, std, size=(M,)) return noise