--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/full_sampling.py Thu Feb 26 09:32:12 2026 -0500 @@ -0,0 +1,97 @@ +import numpy as np +from dolfinx import fem +from petsc4py import PETSc +from pointsource_pde.dolfinx_extras import get_mass_matrix +from slepc4py import SLEPc + + +class FullSampling: + """ + A linear operator class for full sampling of a concentration (fem.Function) + """ + + def __init__( + self, + V, + ): + """ + M: number of laser beams + num_points: number of discrete points per beam + domain_size: [xmin, xmax, ymin, ymax] + """ + self.V = V + self.M = V.dofmap.index_map.size_global + A = get_mass_matrix(V) + solver = PETSc.KSP().create(A.comm) + solver.setOperators(A) + solver.setType(PETSc.KSP.Type.PREONLY) + solver.getPC().setType(PETSc.PC.Type.LU) + self.solver = solver + + # ‖x‖^2 = ‖Ax.array‖^2 + # ‖Rx‖^2 = ‖x.array‖^2 = ‖A^{-1}Ax.array‖^2 ≤ ‖A^{-1}‖‖x‖^2, + # so we need the inverse of the minimal eigenvalue. + E = SLEPc.EPS().create(A.comm) + E.setOperators(A) + E.setProblemType(SLEPc.EPS.ProblemType.NHEP) + E.setWhichEigenpairs(SLEPc.EPS.Which.SMALLEST_MAGNITUDE) + # E.setWhichEigenpairs(SLEPc.EPS.Which.LARGEST_MAGNITUDE) + E.setFromOptions() + E.solve() + + sigma = abs(E.getEigenvalue(0)) + self._opnorm = 1.0 / sigma + print("Full sampling opnorm = %f" % self._opnorm) + + def apply(self, x): + """ + This does not work with MPI + """ + return x.x.array + + def diff_adjdir(self, j, _x): + """ + This does not work with MPI + """ + # We need ⟨Rx,v⟩_{ℝ^n} = ⟨x,R^*v⟩_V + # We have ⟨x,R^*v⟩_V = ⟨Ax.array,[R^*v].array⟩_{ℝ^m} + # But Rx = x.array, so we need + # ⟨x.array,v⟩_{ℝ^n} = ⟨Ax.array,[R^*v].array⟩_{ℝ^m} + # Taking [R^*v].array=A^{-1}v, the conjugate works correctly. + tmp = fem.Function(self.V) + tmp.x.array[:] = j + tmp.x.scatter_forward() + return tmp + # TODO: is convection_diffusion actualy based on norms in ℝ^n? + # res = fem.Function(self.V) + # res.x.array[:] = 0.0 + # self.solver.solve(tmp.x.petsc_vec, res.x.petsc_vec) + # res.x.scatter_forward() + # return res + + def opnorm(self, *args): + # raise NotImplementedError("Need mesh weighting?") + return self._opnorm + + def lipschitz_factor(self, *args): + return self.opnorm(*args) + + def diff_adj_lipschitz_factor(self, *args): + return self.opnorm(*args) + + def diff_bound(self, *args): + return self.opnorm(*args) + + def bound(self, *args): + return self.opnorm(*args) + + # Construct observation noise + def noise(self, noise_level): + M = self.M + # Create diagonal covariance matrix Gamma_v_t of shape (M, M) + Gamma_v_t = np.diag((noise_level * np.random.rand(M)) ** 2) + # Use average variance from diagonal as scalar variance + std = np.sqrt(np.mean(np.diag(Gamma_v_t))) + # Generate noise with shape (M, 1) using scalar standard deviation + noise = np.random.normal(0, std, size=(M,)) + return noise