src/full_sampling.py

changeset 1
a4137aedcb3a
child 3
c3a4f4bb87f7
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/full_sampling.py	Thu Feb 26 09:32:12 2026 -0500
@@ -0,0 +1,97 @@
+import numpy as np
+from dolfinx import fem
+from petsc4py import PETSc
+from pointsource_pde.dolfinx_extras import get_mass_matrix
+from slepc4py import SLEPc
+
+
+class FullSampling:
+    """
+    A linear operator class for full sampling of a concentration (fem.Function)
+    """
+
+    def __init__(
+        self,
+        V,
+    ):
+        """
+        M: number of laser beams
+        num_points: number of discrete points per beam
+        domain_size: [xmin, xmax, ymin, ymax]
+        """
+        self.V = V
+        self.M = V.dofmap.index_map.size_global
+        A = get_mass_matrix(V)
+        solver = PETSc.KSP().create(A.comm)
+        solver.setOperators(A)
+        solver.setType(PETSc.KSP.Type.PREONLY)
+        solver.getPC().setType(PETSc.PC.Type.LU)
+        self.solver = solver
+
+        # ‖x‖^2 = ‖Ax.array‖^2
+        # ‖Rx‖^2 = ‖x.array‖^2 = ‖A^{-1}Ax.array‖^2 ≤ ‖A^{-1}‖‖x‖^2,
+        # so we need the inverse of the minimal eigenvalue.
+        E = SLEPc.EPS().create(A.comm)
+        E.setOperators(A)
+        E.setProblemType(SLEPc.EPS.ProblemType.NHEP)
+        E.setWhichEigenpairs(SLEPc.EPS.Which.SMALLEST_MAGNITUDE)
+        # E.setWhichEigenpairs(SLEPc.EPS.Which.LARGEST_MAGNITUDE)
+        E.setFromOptions()
+        E.solve()
+
+        sigma = abs(E.getEigenvalue(0))
+        self._opnorm = 1.0 / sigma
+        print("Full sampling opnorm = %f" % self._opnorm)
+
+    def apply(self, x):
+        """
+        This does not work with MPI
+        """
+        return x.x.array
+
+    def diff_adjdir(self, j, _x):
+        """
+        This does not work with MPI
+        """
+        # We need ⟨Rx,v⟩_{ℝ^n} = ⟨x,R^*v⟩_V
+        # We have  ⟨x,R^*v⟩_V = ⟨Ax.array,[R^*v].array⟩_{ℝ^m}
+        # But Rx = x.array, so we need
+        # ⟨x.array,v⟩_{ℝ^n} = ⟨Ax.array,[R^*v].array⟩_{ℝ^m}
+        # Taking [R^*v].array=A^{-1}v, the conjugate works correctly.
+        tmp = fem.Function(self.V)
+        tmp.x.array[:] = j
+        tmp.x.scatter_forward()
+        return tmp
+        # TODO: is convection_diffusion actualy based on norms in ℝ^n?
+        # res = fem.Function(self.V)
+        # res.x.array[:] = 0.0
+        # self.solver.solve(tmp.x.petsc_vec, res.x.petsc_vec)
+        # res.x.scatter_forward()
+        # return res
+
+    def opnorm(self, *args):
+        # raise NotImplementedError("Need mesh weighting?")
+        return self._opnorm
+
+    def lipschitz_factor(self, *args):
+        return self.opnorm(*args)
+
+    def diff_adj_lipschitz_factor(self, *args):
+        return self.opnorm(*args)
+
+    def diff_bound(self, *args):
+        return self.opnorm(*args)
+
+    def bound(self, *args):
+        return self.opnorm(*args)
+
+    # Construct observation noise
+    def noise(self, noise_level):
+        M = self.M
+        # Create diagonal covariance matrix Gamma_v_t of shape (M, M)
+        Gamma_v_t = np.diag((noise_level * np.random.rand(M)) ** 2)
+        # Use average variance from diagonal as scalar variance
+        std = np.sqrt(np.mean(np.diag(Gamma_v_t)))
+        # Generate noise with shape (M, 1) using scalar standard deviation
+        noise = np.random.normal(0, std, size=(M,))
+        return noise

mercurial