src/full_sampling.py

Fri, 08 May 2026 17:16:34 -0500

author
Tuomo Valkonen <tuomov@iki.fi>
date
Fri, 08 May 2026 17:16:34 -0500
changeset 4
49b062acace9
parent 3
c3a4f4bb87f7
permissions
-rw-r--r--

Do not directly depend on ndarray, but through numpy

import numpy as np
from dolfinx import fem
from petsc4py import PETSc
from pointsource_pde.dolfinx_extras import get_mass_matrix
from slepc4py import SLEPc


class FullSampling:
    """
    A linear operator class for full sampling of a concentration (fem.Function)
    """

    def __init__(
        self,
        V,
    ):
        """
        M: number of laser beams
        num_points: number of discrete points per beam
        domain_size: [xmin, xmax, ymin, ymax]
        """
        self.V = V
        self.M = V.dofmap.index_map.size_global
        A = get_mass_matrix(V)
        solver = PETSc.KSP().create(A.comm)
        solver.setOperators(A)
        solver.setType(PETSc.KSP.Type.PREONLY)
        solver.getPC().setType(PETSc.PC.Type.LU)
        self.solver = solver

        # ‖x‖^2 = ‖Ax.array‖^2
        # ‖Rx‖^2 = ‖x.array‖^2 = ‖A^{-1}Ax.array‖^2 ≤ ‖A^{-1}‖‖x‖^2,
        # so we need the inverse of the minimal eigenvalue.
        E = SLEPc.EPS().create(A.comm)
        E.setOperators(A)
        E.setProblemType(SLEPc.EPS.ProblemType.NHEP)
        E.setWhichEigenpairs(SLEPc.EPS.Which.SMALLEST_MAGNITUDE)
        # E.setWhichEigenpairs(SLEPc.EPS.Which.LARGEST_MAGNITUDE)
        E.setFromOptions()
        E.solve()

        sigma = abs(E.getEigenvalue(0))
        self._opnorm = 1.0 / sigma
        print("Full sampling opnorm = %f" % self._opnorm)

    def apply(self, x):
        """
        This does not work with MPI
        """
        return x.x.array

    def diff_adjdir(self, j, _x):
        """
        This does not work with MPI
        """
        # We need ⟨Rx,v⟩_{ℝ^n} = ⟨x,R^*v⟩_V
        # We have  ⟨x,R^*v⟩_V = ⟨Ax.array,[R^*v].array⟩_{ℝ^m}
        # But Rx = x.array, so we need
        # ⟨x.array,v⟩_{ℝ^n} = ⟨Ax.array,[R^*v].array⟩_{ℝ^m}
        # Taking [R^*v].array=A^{-1}v, the conjugate works correctly.
        tmp = fem.Function(self.V)
        tmp.x.array[:] = j
        tmp.x.scatter_forward()
        return tmp
        # TODO: is convection_diffusion actualy based on norms in ℝ^n?
        # res = fem.Function(self.V)
        # res.x.array[:] = 0.0
        # self.solver.solve(tmp.x.petsc_vec, res.x.petsc_vec)
        # res.x.scatter_forward()
        # return res

    def opnorm(self, *args):
        # raise NotImplementedError("Need mesh weighting?")
        return self._opnorm

    def lipschitz_factor(self, *args):
        return self.opnorm(*args)

    def diff_chain_lipschitz_factor(self, *args):
        return self.opnorm(*args)

    def diff_bound(self, *args):
        return self.opnorm(*args)

    def bound(self, *args):
        return self.opnorm(*args)

    # Construct observation noise
    def noise(self, noise_level):
        M = self.M
        # Create diagonal covariance matrix Gamma_v_t of shape (M, M)
        Gamma_v_t = np.diag((noise_level * np.random.rand(M)) ** 2)
        # Use average variance from diagonal as scalar variance
        std = np.sqrt(np.mean(np.diag(Gamma_v_t)))
        # Generate noise with shape (M, 1) using scalar standard deviation
        noise = np.random.normal(0, std, size=(M,))
        return noise

mercurial