src/full_sampling.py

changeset 1
a4137aedcb3a
child 3
c3a4f4bb87f7
equal deleted inserted replaced
0:7ec1cfe19a24 1:a4137aedcb3a
1 import numpy as np
2 from dolfinx import fem
3 from petsc4py import PETSc
4 from pointsource_pde.dolfinx_extras import get_mass_matrix
5 from slepc4py import SLEPc
6
7
8 class FullSampling:
9 """
10 A linear operator class for full sampling of a concentration (fem.Function)
11 """
12
13 def __init__(
14 self,
15 V,
16 ):
17 """
18 M: number of laser beams
19 num_points: number of discrete points per beam
20 domain_size: [xmin, xmax, ymin, ymax]
21 """
22 self.V = V
23 self.M = V.dofmap.index_map.size_global
24 A = get_mass_matrix(V)
25 solver = PETSc.KSP().create(A.comm)
26 solver.setOperators(A)
27 solver.setType(PETSc.KSP.Type.PREONLY)
28 solver.getPC().setType(PETSc.PC.Type.LU)
29 self.solver = solver
30
31 # ‖x‖^2 = ‖Ax.array‖^2
32 # ‖Rx‖^2 = ‖x.array‖^2 = ‖A^{-1}Ax.array‖^2 ≤ ‖A^{-1}‖‖x‖^2,
33 # so we need the inverse of the minimal eigenvalue.
34 E = SLEPc.EPS().create(A.comm)
35 E.setOperators(A)
36 E.setProblemType(SLEPc.EPS.ProblemType.NHEP)
37 E.setWhichEigenpairs(SLEPc.EPS.Which.SMALLEST_MAGNITUDE)
38 # E.setWhichEigenpairs(SLEPc.EPS.Which.LARGEST_MAGNITUDE)
39 E.setFromOptions()
40 E.solve()
41
42 sigma = abs(E.getEigenvalue(0))
43 self._opnorm = 1.0 / sigma
44 print("Full sampling opnorm = %f" % self._opnorm)
45
46 def apply(self, x):
47 """
48 This does not work with MPI
49 """
50 return x.x.array
51
52 def diff_adjdir(self, j, _x):
53 """
54 This does not work with MPI
55 """
56 # We need ⟨Rx,v⟩_{ℝ^n} = ⟨x,R^*v⟩_V
57 # We have ⟨x,R^*v⟩_V = ⟨Ax.array,[R^*v].array⟩_{ℝ^m}
58 # But Rx = x.array, so we need
59 # ⟨x.array,v⟩_{ℝ^n} = ⟨Ax.array,[R^*v].array⟩_{ℝ^m}
60 # Taking [R^*v].array=A^{-1}v, the conjugate works correctly.
61 tmp = fem.Function(self.V)
62 tmp.x.array[:] = j
63 tmp.x.scatter_forward()
64 return tmp
65 # TODO: is convection_diffusion actualy based on norms in ℝ^n?
66 # res = fem.Function(self.V)
67 # res.x.array[:] = 0.0
68 # self.solver.solve(tmp.x.petsc_vec, res.x.petsc_vec)
69 # res.x.scatter_forward()
70 # return res
71
72 def opnorm(self, *args):
73 # raise NotImplementedError("Need mesh weighting?")
74 return self._opnorm
75
76 def lipschitz_factor(self, *args):
77 return self.opnorm(*args)
78
79 def diff_adj_lipschitz_factor(self, *args):
80 return self.opnorm(*args)
81
82 def diff_bound(self, *args):
83 return self.opnorm(*args)
84
85 def bound(self, *args):
86 return self.opnorm(*args)
87
88 # Construct observation noise
89 def noise(self, noise_level):
90 M = self.M
91 # Create diagonal covariance matrix Gamma_v_t of shape (M, M)
92 Gamma_v_t = np.diag((noise_level * np.random.rand(M)) ** 2)
93 # Use average variance from diagonal as scalar variance
94 std = np.sqrt(np.mean(np.diag(Gamma_v_t)))
95 # Generate noise with shape (M, 1) using scalar standard deviation
96 noise = np.random.normal(0, std, size=(M,))
97 return noise

mercurial