| |
1 import numpy as np |
| |
2 from dolfinx import fem |
| |
3 from petsc4py import PETSc |
| |
4 from pointsource_pde.dolfinx_extras import get_mass_matrix |
| |
5 from slepc4py import SLEPc |
| |
6 |
| |
7 |
| |
8 class FullSampling: |
| |
9 """ |
| |
10 A linear operator class for full sampling of a concentration (fem.Function) |
| |
11 """ |
| |
12 |
| |
13 def __init__( |
| |
14 self, |
| |
15 V, |
| |
16 ): |
| |
17 """ |
| |
18 M: number of laser beams |
| |
19 num_points: number of discrete points per beam |
| |
20 domain_size: [xmin, xmax, ymin, ymax] |
| |
21 """ |
| |
22 self.V = V |
| |
23 self.M = V.dofmap.index_map.size_global |
| |
24 A = get_mass_matrix(V) |
| |
25 solver = PETSc.KSP().create(A.comm) |
| |
26 solver.setOperators(A) |
| |
27 solver.setType(PETSc.KSP.Type.PREONLY) |
| |
28 solver.getPC().setType(PETSc.PC.Type.LU) |
| |
29 self.solver = solver |
| |
30 |
| |
31 # ‖x‖^2 = ‖Ax.array‖^2 |
| |
32 # ‖Rx‖^2 = ‖x.array‖^2 = ‖A^{-1}Ax.array‖^2 ≤ ‖A^{-1}‖‖x‖^2, |
| |
33 # so we need the inverse of the minimal eigenvalue. |
| |
34 E = SLEPc.EPS().create(A.comm) |
| |
35 E.setOperators(A) |
| |
36 E.setProblemType(SLEPc.EPS.ProblemType.NHEP) |
| |
37 E.setWhichEigenpairs(SLEPc.EPS.Which.SMALLEST_MAGNITUDE) |
| |
38 # E.setWhichEigenpairs(SLEPc.EPS.Which.LARGEST_MAGNITUDE) |
| |
39 E.setFromOptions() |
| |
40 E.solve() |
| |
41 |
| |
42 sigma = abs(E.getEigenvalue(0)) |
| |
43 self._opnorm = 1.0 / sigma |
| |
44 print("Full sampling opnorm = %f" % self._opnorm) |
| |
45 |
| |
46 def apply(self, x): |
| |
47 """ |
| |
48 This does not work with MPI |
| |
49 """ |
| |
50 return x.x.array |
| |
51 |
| |
52 def diff_adjdir(self, j, _x): |
| |
53 """ |
| |
54 This does not work with MPI |
| |
55 """ |
| |
56 # We need ⟨Rx,v⟩_{ℝ^n} = ⟨x,R^*v⟩_V |
| |
57 # We have ⟨x,R^*v⟩_V = ⟨Ax.array,[R^*v].array⟩_{ℝ^m} |
| |
58 # But Rx = x.array, so we need |
| |
59 # ⟨x.array,v⟩_{ℝ^n} = ⟨Ax.array,[R^*v].array⟩_{ℝ^m} |
| |
60 # Taking [R^*v].array=A^{-1}v, the conjugate works correctly. |
| |
61 tmp = fem.Function(self.V) |
| |
62 tmp.x.array[:] = j |
| |
63 tmp.x.scatter_forward() |
| |
64 return tmp |
| |
65 # TODO: is convection_diffusion actualy based on norms in ℝ^n? |
| |
66 # res = fem.Function(self.V) |
| |
67 # res.x.array[:] = 0.0 |
| |
68 # self.solver.solve(tmp.x.petsc_vec, res.x.petsc_vec) |
| |
69 # res.x.scatter_forward() |
| |
70 # return res |
| |
71 |
| |
72 def opnorm(self, *args): |
| |
73 # raise NotImplementedError("Need mesh weighting?") |
| |
74 return self._opnorm |
| |
75 |
| |
76 def lipschitz_factor(self, *args): |
| |
77 return self.opnorm(*args) |
| |
78 |
| |
79 def diff_adj_lipschitz_factor(self, *args): |
| |
80 return self.opnorm(*args) |
| |
81 |
| |
82 def diff_bound(self, *args): |
| |
83 return self.opnorm(*args) |
| |
84 |
| |
85 def bound(self, *args): |
| |
86 return self.opnorm(*args) |
| |
87 |
| |
88 # Construct observation noise |
| |
89 def noise(self, noise_level): |
| |
90 M = self.M |
| |
91 # Create diagonal covariance matrix Gamma_v_t of shape (M, M) |
| |
92 Gamma_v_t = np.diag((noise_level * np.random.rand(M)) ** 2) |
| |
93 # Use average variance from diagonal as scalar variance |
| |
94 std = np.sqrt(np.mean(np.diag(Gamma_v_t))) |
| |
95 # Generate noise with shape (M, 1) using scalar standard deviation |
| |
96 noise = np.random.normal(0, std, size=(M,)) |
| |
97 return noise |