src/convection_diffusion.py

changeset 1
a4137aedcb3a
child 3
c3a4f4bb87f7
equal deleted inserted replaced
0:7ec1cfe19a24 1:a4137aedcb3a
1 import os
2 from dataclasses import dataclass
3 from posix import mkdir
4 from termios import B0
5 from typing import Optional, Tuple, Union
6
7 import dolfinx.geometry as geo
8 import numpy as np
9 import ufl
10 from dolfinx import fem, mesh
11 from dolfinx.fem.petsc import (
12 apply_lifting,
13 assemble_matrix,
14 assemble_vector,
15 create_vector,
16 set_bc,
17 )
18 from mpi4py import MPI
19 from petsc4py import PETSc
20
21 try:
22 import pointsource_pde.dolfinx_extras as dx
23
24 from dolfinx_access import (
25 cell_FunctionSpace_f64,
26 max_Function_f64_p2,
27 min_Function_f64_p2,
28 )
29 except:
30 import src.dolfinx_extras as dx
31
32
33 @dataclass
34 class XBound:
35 # Parameter bounds for convection-diffusion operator estimates
36 mu_dual: float = 3.0 # ||μ||_M(Ω) bound
37 k_min: float = 0.1 # min diffusion > 0 (coercivity)
38 k_max: float = 2.0 # max diffusion
39 c_Linf: float = 0.5 # max(|c1|,|c2|) L∞ bound
40 diam: float = 1.41 # domain diameter (√2 for unit square)
41 T: float = 1.0 # final time
42
43 def __add__(self, other: "XBound") -> "XBound":
44 # Conservative combination: min(k_min), max(c_Linf) for energy bounds"""
45 other = XBound(
46 **{
47 k: getattr(other, k, getattr(self, k))
48 for k in self.__dataclass_fields__
49 }
50 )
51 return XBound(
52 mu_dual=max(self.mu_dual, other.mu_dual),
53 k_min=min(self.k_min, other.k_min),
54 k_max=max(self.k_max, other.k_max),
55 c_Linf=max(self.c_Linf, other.c_Linf),
56 diam=max(self.diam, other.diam),
57 T=max(self.T, other.T),
58 )
59
60
61 class ConvectionDiffusion:
62 def __init__(
63 self,
64 u0,
65 g,
66 # w0,
67 domain_size=[-2, 2, -2, 2],
68 nx=32,
69 ny=32,
70 degree=2,
71 t0=0.0,
72 T=1.0,
73 num_steps=50,
74 p=None,
75 Delta_t=None,
76 C_stab=1.0,
77 alpha=1.0, # \alpha = k_m/2, k convective coefficient, k>=km
78 beta=1.0, # \beta = \| c \|_\infty^2 /k_m
79 C_emb=1.0,
80 C_reg=1.0,
81 ):
82 self.p = p # L^p, p=2
83 self.C_stab = C_stab # adjoint stability constant
84 self.alpha = alpha # coercivity lower bound for k
85 self.beta = beta # theoretical parameter
86 self.C_emb = C_emb # embedding constant
87 self.C_reg = C_reg
88 if Delta_t is None:
89 self.Delta_t = T / num_steps # Matches your N=50 intent
90 else:
91 self.Delta_t = Delta_t
92
93 self.domain_size = domain_size
94 self.nx = nx
95 self.ny = ny
96 self.degree = degree
97 self.T = T
98
99 self.num_steps = num_steps
100 self.dt = (T - t0) / num_steps
101 self.t0 = t0
102
103 self.save_for_plot = False
104
105 domain = mesh.create_rectangle(
106 MPI.COMM_SELF,
107 [
108 np.array([domain_size[0], domain_size[2]]),
109 np.array([domain_size[1], domain_size[3]]),
110 ],
111 [nx, ny],
112 mesh.CellType.triangle,
113 )
114 self.domain = domain
115 # Compute the domain volume for the adjoint Lipschitz factor
116 self.domain_size = domain_size
117 # Domain diameter from your domain_size
118 self.diam = np.sqrt(
119 (domain_size[1] - domain_size[0]) ** 2
120 + (domain_size[3] - domain_size[2]) ** 2
121 )
122
123 V = fem.functionspace(domain, ("Lagrange", degree))
124 self.V = V
125
126 self.u0 = fem.Function(V)
127 self.u0.name = "u0"
128 self.u0.interpolate(u0)
129
130 self.g = fem.Function(V)
131 self.g.name = "g"
132 self.g.interpolate(g)
133 self.bcs = [] # Initialize BCS as LIST
134 self.adjoint_bcs = [] # Initialize ADJOINT BCS as LIST (w = 0)
135
136 # Identify boundary facets for Dirichlet BC location (e.g. left/right boundaries at x=domain_size[0] or x=domain_size[1])
137 fdim = domain.topology.dim - 1
138 # Γ₁ facets: TOP/BOTTOM boundaries (y = ymin, ymax) - STATE Dirichlet u = g
139 gamma1_facets = mesh.locate_entities_boundary(
140 domain,
141 fdim,
142 lambda x: (
143 np.isclose(x[1], domain_size[2]) # x[1] = y = ymin
144 | np.isclose(x[1], domain_size[3])
145 ), # x[1] = y = ymax
146 )
147 # Γ₂ facets: LEFT/RIGHT boundaries (x = xmin, xmax) - ADJOINT Dirichlet w = 0
148 gamma2_facets = mesh.locate_entities_boundary(
149 domain,
150 fdim,
151 lambda x: (
152 np.isclose(x[0], domain_size[0]) # x[0] = x = xmin
153 | np.isclose(x[0], domain_size[1])
154 ), # x[0] = x = xmax
155 )
156 # STATE BC: u = g on Γ₁ (top/bottom)
157 dofs_gamma1 = fem.locate_dofs_topological(V, fdim, gamma1_facets)
158 # Create BC and ADD TO LIST
159 bc = fem.dirichletbc(self.g, dofs_gamma1) # V inferred from self.g!
160 self.bcs.append(bc)
161
162 # ADJOINT BC: w = 0 on Γ₁ (same facets, homogenized)
163 zero = fem.Constant(domain, PETSc.ScalarType(0.0))
164 dofs_gamma2 = fem.locate_dofs_topological(V, fdim, gamma2_facets)
165 bc_adjoint = fem.dirichletbc(zero, dofs_gamma2, V)
166 self.adjoint_bcs.append(bc_adjoint)
167
168 self.fdim = fdim
169 self.boundary_facets = np.unique(np.concatenate((gamma1_facets, gamma2_facets)))
170
171 ux = fem.Function(V)
172 wpx = fem.Function(V)
173 self.ux = ux
174 self.wpx = wpx
175 dim = V.mesh.topology.dim
176 self.expr2_form = fem.form(ufl.inner(ufl.grad(ux), ufl.grad(wpx)) * ufl.dx)
177 self.expr3_forms = tuple(
178 fem.form(wpx * ufl.grad(ux)[i] * ufl.dx) for i in range(dim)
179 )
180
181 # Only scalar case
182 self.k = fem.Constant(domain, PETSc.ScalarType(0.0)) # c[0] scalar
183 self.c0 = fem.Constant(domain, PETSc.ScalarType(0.0)) # c[0] scalar
184 self.c1 = fem.Constant(domain, PETSc.ScalarType(0.0)) # c[1] scalar
185 u_n = fem.Function(V) # New copy!
186 u_n.name = "u_n"
187 self.u_n = u_n
188 u, w, v = ufl.TrialFunction(V), ufl.TrialFunction(V), ufl.TestFunction(V)
189 dt = self.dt
190 # Forward bilinear form (backward Euler)
191 a = (
192 u * v * ufl.dx
193 + dt * ufl.dot(self.k * ufl.grad(u), ufl.grad(v)) * ufl.dx
194 + dt * (self.c0 * u.dx(0) + self.c1 * u.dx(1)) * v * ufl.dx
195 )
196 self.bilinear_form = fem.form(a)
197 L = u_n * v * ufl.dx
198 self.linear_form = fem.form(L)
199
200 # Adjoint bilinear form
201 w_n = fem.Function(V) # New copy!
202 w_n.name = "w_n"
203 self.w_n = w_n
204 j_u = fem.Function(V) # New copy!
205 j_u.name = "j_u"
206 self.j_u = j_u
207 a2 = (
208 w * v * ufl.dx
209 + dt
210 * (
211 ufl.dot(self.k * ufl.grad(w), ufl.grad(v))
212 - (self.c0 * w.dx(0) * v + self.c1 * w.dx(1) * v)
213 )
214 * ufl.dx
215 )
216 self.bilinear_form_w = fem.form(a2)
217 # Step n=N-1: L2 uses w_n = w^N=0 → solves w^{N-1}. (∂t)_primal^*
218 # Step n=N-2: L2 uses w_n = w^{N-1} → solves w^{N-2}
219 # Replace your L2 with 2 simple forms (same notation)
220
221 L2 = w_n * v * ufl.dx + dt * j_u * v * ufl.dx # kown w_n (previous step)
222 self.linear_form_w = fem.form(L2)
223
224 # Solving forward PDE
225 def apply(self, x):
226 μ, (k, c) = x
227
228 domain = self.domain
229 V = self.V
230 dt = self.dt
231 bcs = self.bcs
232
233 # initial condition
234 u_n = self.u_n
235 u_n.x.array[:] = self.u0.x.array
236 u_n.x.scatter_forward()
237
238 # Linear form: only contains u_n * v (Dirichlet on Γ1 is handled via bc)
239
240 # Assemble matrix WITH Dirichlet BCs applied
241 self.c0.value = c[0]
242 self.c1.value = c[1]
243 self.k.value = k
244 linear_form = self.linear_form
245 bilinear_form = self.bilinear_form
246 A = assemble_matrix(bilinear_form, bcs=bcs)
247 A.assemble()
248
249 # Create reusable RHS vector (will be filled each timestep)
250 b = create_vector(linear_form)
251 b_ps = create_vector(linear_form)
252
253 # Prepare solver once
254 solver = PETSc.KSP().create(domain.comm)
255 solver.setOperators(A)
256 solver.setType(PETSc.KSP.Type.PREONLY)
257 solver.getPC().setType(PETSc.PC.Type.LU)
258
259 # Form point source contribution
260 mesh = V.mesh
261 element = V.element.basix_element
262 mesh_nodes = mesh.geometry.x
263 cmap = mesh.geometry.cmap
264
265 b_ps.assemblyBegin()
266
267 with b_ps.localForm() as loc_b:
268 loc_b.set(0)
269
270 for point, alpha in μ.iter_padded():
271 cell_id = cell_FunctionSpace_f64(V._cpp_object, point)
272
273 if cell_id < 0:
274 print(f"Point {point} is outside the mesh.")
275 else:
276 cell_dofs = V.dofmap.cell_dofs(cell_id)
277
278 geom_dofs = mesh.geometry.dofmap[cell_id]
279 ref = cmap.pull_back(
280 np.array([[point[0], point[1]]]), mesh_nodes[geom_dofs]
281 )
282 phi = element.tabulate(0, ref)
283
284 b_ps.setValuesLocal(
285 cell_dofs,
286 alpha * phi,
287 addv=PETSc.InsertMode.ADD_VALUES,
288 )
289
290 b_ps.assemblyEnd()
291
292 t = self.t0
293 num_steps = self.num_steps
294 bcs = self.bcs
295
296 u_n_list = []
297 for i in range(num_steps):
298 t += dt
299
300 # b.assemblyBegin()
301
302 # zero-out and assemble RHS (M * u_n part)
303 with b.localForm() as loc_b:
304 loc_b.set(0)
305 assemble_vector(b, linear_form) # b = (u^n, v)
306
307 b.axpy(1.0, b_ps)
308
309 # Apply homogeneous Dirichlet BC correction to RHS and finalize vector
310 apply_lifting(b, [bilinear_form], [bcs]) # [[bc1, bc2, ...]]
311 b.ghostUpdate(addv=PETSc.InsertMode.ADD, mode=PETSc.ScatterMode.REVERSE)
312 set_bc(b, bcs) # list [bc1, bc2, ...]
313 # b.assemble()
314 #
315 # b.assemblyEnd()
316
317 # b.ghostUpdate(addv=PETSc.InsertMode.INSERT, mode=PETSc.ScatterMode.FORWARD)
318
319 # Solve linear problem A * uh = b
320 uh = fem.Function(V)
321 uh.name = "uh"
322 solver.solve(b, uh.x.petsc_vec)
323 uh.x.scatter_forward()
324
325 # Update u_n for next step, store uh
326 u_n.x.array[:] = uh.x.array
327 u_n.x.scatter_forward()
328 u_n_list.append(uh) # Unique objects
329
330 return u_n_list
331
332 # Solving the adjoint problem
333
334 def solve_adjoint_pde(self, j, x, u_n_list):
335 μ, (k, c) = x
336
337 domain = self.domain
338 V = self.V
339 dt = self.dt
340 adjoint_bcs = self.adjoint_bcs
341
342 num_steps = len(u_n_list)
343
344 assert len(j) == num_steps, f"MISMATCH: j({len(j)}) != u_n_list({num_steps})"
345 # Terminal condition: w(T) = 0
346 w_n = self.w_n
347 w_n.x.array[:] = 0.0
348 w_n.x.scatter_forward()
349
350 self.c0.value = c[0]
351 self.c1.value = c[1]
352 self.k.value = k
353 j_u = self.j_u
354 bilinear_form_w = self.bilinear_form_w
355 linear_form_w = self.linear_form_w
356
357 A2 = assemble_matrix(bilinear_form_w, bcs=adjoint_bcs) # Fixed bcs
358 A2.assemble()
359
360 # Create vector for RHS to be updated each step
361 b2 = create_vector(linear_form_w)
362
363 solver = PETSc.KSP().create(domain.comm)
364 solver.setOperators(A2)
365 solver.setType(PETSc.KSP.Type.PREONLY)
366 solver.getPC().setType(PETSc.PC.Type.LU)
367
368 t = self.t0 + num_steps * dt # Start at time T
369
370 # N+1 adjoint values to match N PDE steps
371 w_n_list = [None] * (num_steps + 1)
372 whT = fem.Function(V)
373 whT.x.array[:] = 0.0 # w^N = 0
374 whT.x.scatter_forward()
375 w_n_list[num_steps] = whT # Store at FINAL index
376
377 # backward steps**: compute w^{N-1}, ..., w^0
378
379 for i in range(num_steps - 1, -1, -1):
380 t -= dt
381
382 # Source term at current time step: j(t_i)
383
384 j_u.x.array[:] = j[i].x.array
385 j_u.x.scatter_forward()
386
387 # Update RHS
388 with b2.localForm() as loc_b2:
389 loc_b2.set(0)
390 assemble_vector(b2, linear_form_w)
391
392 apply_lifting(b2, [bilinear_form_w], [adjoint_bcs]) # [forms] → [bcs]
393 b2.ghostUpdate(addv=PETSc.InsertMode.ADD, mode=PETSc.ScatterMode.REVERSE)
394 set_bc(b2, adjoint_bcs)
395
396 # set_bc(b2, adjoint_bcs) # list BC object
397 # b2.assemble()
398
399 wh = fem.Function(V)
400 wh.name = "wh"
401 solver.solve(b2, wh.x.petsc_vec)
402 wh.x.scatter_forward()
403
404 # Update w_n for next iteration and store
405 w_n.x.array[:] = wh.x.array
406 w_n.x.scatter_forward()
407 w_n_list[i] = wh
408
409 return w_n_list
410
411 # Compute the differential operator
412 def diff_adjdir(self, j, x, apply_result=None):
413 """
414 Returns dual space elements: (dual_μ, (dual_k, (dual_c1, dual_c2)))
415 - scalar coeff → float sensitivity
416 - Function coeff → fem.Function sensitivity (Fréchet derivative)
417 """
418 u_n_list = apply_result if apply_result is not None else self.apply(x)
419 w_n_list = self.solve_adjoint_pde(j, x, u_n_list)
420 dt = self.dt
421 V = self.V
422 dim = V.mesh.topology.dim
423
424 # Extract coefficients and check types
425 _mu, (k, c) = x
426 # is_scalar_mu = isinstance(mu, (int, float, np.number))
427 is_scalar_k = isinstance(k, (int, float, np.number))
428 is_scalar_c = all(isinstance(ci, (int, float, np.number)) for ci in c)
429
430 # 1. expr1: dual sensitivity w.r.t. μ = ∫₀^T w_p dt (always Function)
431 wp_bar = fem.Function(V)
432 wp_bar.x.array[:] = (
433 0.5
434 * dt
435 * (
436 w_n_list[0].x.array
437 + w_n_list[-1].x.array
438 + 2 * np.sum([w.x.array for w in w_n_list[1:-1]], axis=0)
439 )
440 )
441
442 ux = self.ux
443 wpx = self.wpx
444
445 # 2. expr2: dual sensitivity w.r.t. k
446 if is_scalar_k:
447 # k scalar → ⟨expr2, δk⟩ = δk ∫ ∇u·∇w_p → scalar
448 expr2 = 0.0
449
450 form = self.expr2_form
451
452 for n, (u, wp) in enumerate(zip(u_n_list, w_n_list)):
453 ux.x.array[:] = u.x.array[:]
454 ux.x.scatter_forward()
455 wpx.x.array[:] = wp.x.array[:]
456 wpx.x.scatter_forward()
457 v2 = fem.assemble_scalar(form)
458 weight = 0.5 * dt if (n == 0 or n == len(u_n_list) - 1) else dt
459 expr2 -= weight * v2
460
461 else:
462 raise Exception("Unimplemented - out of date")
463
464 # k Function → ⟨expr2, δk⟩ = ∫ expr2 δk → expr2 = ∇u·∇w_p (pointwise)
465 expr2 = fem.Function(V)
466 expr2a = expr2.x.array
467
468 for n, (u, wp) in enumerate(zip(u_n_list, w_n_list)):
469 form = ufl.inner(ufl.grad(u), ufl.grad(wp)) * ufl.dx
470 v2 = fem.assemble_scalar(fem.form(form))
471 weight = 0.5 * dt if (n == 0 or n == len(u_n_list) - 1) else dt
472 expr2a -= weight * v2
473
474 # 3. expr3: dual sensitivity w.r.t. c = (c1, c2)
475 if is_scalar_c:
476 # Scalar c → return tuple of 2 floats
477
478 forms = self.expr3_forms
479
480 def val(i):
481 expr3i = 0.0
482 for n, (u, wp) in enumerate(zip(u_n_list, w_n_list)):
483 ux.x.array[:] = u.x.array[:]
484 ux.x.scatter_forward()
485 wpx.x.array[:] = wp.x.array[:]
486 wpx.x.scatter_forward()
487 v3 = fem.assemble_scalar(forms[i])
488 weight = 0.5 * dt if (n == 0 or n == len(u_n_list) - 1) else dt
489 expr3i -= weight * v3
490 return expr3i
491
492 expr3 = tuple(val(i) for i in range(dim))
493
494 else:
495 raise Exception("Unimplemented - out of date")
496
497 # Function c → return tuple of 2 Functions (pointwise derivatives)
498 def val(i):
499 # expr3[i]. x.array[:] = 0
500 expr3i = fem.Function(V)
501 expr3ia = expr3i.x.array[:]
502 for n, (u, wp) in enumerate(zip(u_n_list, w_n_list)):
503 form = wp * ufl.grad(u)[i] * ufl.dx
504 v3 = fem.assemble_scalar(fem.form(form))
505 weight = 0.5 * dt if (n == 0 or n == len(u_n_list) - 1) else dt
506 expr3ia -= weight * v3
507 return expr3i
508
509 expr3 = tuple(val(i) for i in range(dim))
510
511 if self.save_for_plot:
512 self.plot_wp_bar = wp_bar
513 self.plot_u_n_list = u_n_list
514 self.plot_w_n_list = w_n_list
515 self.plot_j_n_list = j
516
517 return wp_bar, (expr2, expr3)
518
519 def do_plot(self, iter, μ, frames, prefix):
520 n = self.num_steps
521 np.savez_compressed(
522 "%s/res_%d.npz" % (prefix, iter),
523 wp_bar_array=self.plot_wp_bar.x.array,
524 u_n_list_array=[self.plot_u_n_list[i].x.array for i in frames],
525 w_n_list_array=[self.plot_w_n_list[i].x.array for i in frames],
526 j_n_list_array=[self.plot_j_n_list[i].x.array for i in frames],
527 frames=frames,
528 times=list(map(lambda i: self.T * i / (n - 1), frames)),
529 mu=[[point[0], point[1], alpha] for point, alpha in μ.iter_padded()],
530 )
531
532 def _stability_prefactor(
533 self, *, C_stab=None, alpha=None, beta=None, T=None, C_emb=None, C_reg=None
534 ):
535 """
536 Return the stability prefactor
537 P := (C_stab/alpha) * (1 + sqrt(T)*(1 + sqrt(beta)/sqrt(alpha)))
538 and the measure-to-functional constant L_e_mu := C_emb * C_reg * sqrt(T).
539 Return (P, L_e_mu):
540 """
541 # self._stability_prefactor(alpha=0.5, T=2.0)
542
543 # Read from arguments or instance attributes
544 C_stab = C_stab if C_stab is not None else self.C_stab
545 alpha = alpha if alpha is not None else self.alpha
546 beta = beta if beta is not None else self.beta
547 T = T if T is not None else self.T
548 C_emb = C_emb if C_emb is not None else self.C_emb
549 C_reg = C_reg if C_reg is not None else self.C_reg
550
551 # Stability factor P
552
553 P = (C_stab / alpha) * (1 + np.sqrt(T) * (1 + np.sqrt(beta) / np.sqrt(alpha)))
554 L_e_mu = C_emb * C_reg * np.sqrt(T)
555
556 return P, L_e_mu
557
558 # Solution operator Lipschitz factor
559 def lipschitz_factor(self):
560 """
561 Conservative Lipschitz factor for the forward solution operator mapping
562 (k,c,mu) -> u. Returns a single scalar C_Lip such that
563 ||u2 - u1|| <= C_Lip * (||k2-k1|| + ||c2-c1|| + ||mu2-mu1||_* )
564 (we use a conservative form combining the measure and (k,c) pieces).
565 """
566 P, L_e_mu = self._stability_prefactor()
567 # Conservative combined choice: multiply P by max(1, L_e_mu)
568 C_Lip = P * max(1.0, L_e_mu)
569 return C_Lip
570
571 # Adjoint solution operator Lipschitz factor
572 """
573 def diff_adj_lipschitz_factor(self):
574
575 Compute adjoint-solution Lipschitz factor A for 2D domain.
576
577 For p = 2, A = 1.
578 For p > 2, A = |Omega|^(1/2 - 1/p), where |Omega| is the mesh area.
579
580 p = self.p
581 omega_vol = self.domain_volume
582 return 1.0 if p == 2 else omega_vol ** (0.5 - 1.0 / p)
583 """
584
585 def diff_adj_lipschitz_factor(self):
586 """
587 Always return 1 for p=2
588 """
589 return 1.0
590
591 def diff_bound(self, xbound=None):
592 """
593 Differential bound for the solution operator derivative.
594
595 Returns the tuple: (L_e_mu, L_e_k, L_e_c)
596
597 where
598 L_e_k = ||∇u||_{L^2(Ω_T)}
599 L_e_c = ||∇u||_{L^2(Ω_T)}
600 L_e_mu = C_emb * C_reg * sqrt(T)
601 """
602
603 # If grad_u_norm is None, compute it from u0
604 grad_u_norm = getattr(self, "grad_u_norm", None)
605 if grad_u_norm is None:
606 u = self.u0
607 V = self.V
608 # Compute L2 norm of gradient of u0 over the domain
609 grad_u = ufl.grad(u)
610 grad_u_sq = ufl.inner(grad_u, grad_u)
611 dx = ufl.Measure("dx", domain=self.domain)
612 # Integral of |∇u|^2 over Ω
613 grad_u_norm = fem.assemble_scalar(fem.form(grad_u_sq * dx)) ** 0.5
614 # Optionally store it for later reuse
615 self.grad_u_norm = grad_u_norm
616
617 return grad_u_norm
618
619 def diff_bound_pair(self, xbound=None, Delta_t=None):
620 if xbound is None:
621 return self.diff_bound(xbound=None)
622 else:
623 xb_combined = xbound[0] + xbound[1] if len(xbound) > 1 else xbound[0]
624
625 k_min = getattr(xb_combined, "k_min", self.alpha)
626 c_Linf = getattr(xb_combined, "c_Linf", np.sqrt(self.beta * k_min))
627 diam = getattr(xb_combined, "diam", self.diam)
628 T = getattr(xb_combined, "T", self.T)
629
630 Pe = c_Linf * diam / k_min
631 gamma = c_Linf**2 / k_min
632 # adaptive time step size Delta_t (Delta_t = 0.01 in the test)
633 if Delta_t is None:
634 Delta_t = min(0.1, k_min / c_Linf**2, 0.01 * T) # CFL-like
635
636 # Accumulate N = T/Delta_t local steps
637 N = int(np.ceil(T / Delta_t))
638
639 # Per-step factors
640 exp_local = np.exp(0.5 * gamma * Delta_t)
641 C_adj_step = np.sqrt(Delta_t * (exp_local + Delta_t / k_min))
642 C_state_step = np.sqrt(Delta_t) * np.sqrt(1 + Pe**2)
643
644 # Total accumulated bound (geometric sum <= N * max_step)
645 C_adj_PDE = N * C_adj_step
646 C_state = N * C_state_step
647
648 # Use the embedding constant
649 C_mu_adj = self.C_emb * C_adj_PDE # ∫φ δμ
650 C_k_adj = C_state * C_adj_PDE # ∫∇u·∇φ δk
651 C_c1_adj = C_state * C_adj_PDE * c_Linf # ∫∂x u φ δc1
652 C_c2_adj = C_state * C_adj_PDE * c_Linf # ∫∂y u φ δc2
653
654 return C_mu_adj, C_k_adj, C_c1_adj, C_c2_adj
655
656 # ||S(x)||_Y→X ≤ C_state ||μ||_M(Ω) [time-independent μ]
657 def codomain_bound(self, xbound=None):
658 if xbound is None:
659 return 1.0
660
661 # Extract parameters
662 if hasattr(xbound, "k_min"):
663 xb = xbound
664 else:
665 if hasattr(xbound, "__len__") and len(xbound) > 0:
666 xb = xbound[0] + xbound[1] if len(xbound) > 1 else xbound[0]
667 else:
668 xb = xbound
669 k_min = getattr(xb, "k_min", self.alpha)
670 c_Linf = getattr(xb, "c_Linf", np.sqrt(self.beta * k_min))
671 mu_dual = getattr(xb, "mu_dual", 3.0)
672 diam = getattr(xb, "diam", self.diam)
673 T = getattr(xb, "T", self.T)
674
675 Pe = c_Linf * diam / k_min
676 if not hasattr(self, "Delta_t"):
677 Delta_t = min(0.1, k_min / c_Linf**2, 0.01 * T)
678 else:
679 Delta_t = self.Delta_t
680
681 N = int(np.ceil(T / Delta_t))
682 gamma = c_Linf**2 / k_min
683 exp_local = np.exp(0.25 * gamma * Delta_t)
684
685 C_state_step = np.sqrt(Delta_t) * np.sqrt(1 + 0.5 * Pe**2) * exp_local
686 C_state = N * C_state_step
687
688 return C_state * mu_dual
689
690 # ||S(x1)-S(x2)|| ≤ Cμ||Δμ|| + Ck||Δk|| + Cc1||Δc1|| + Cc2||Δc2||
691
692 def codomain_bound_pair(self, xbound=None):
693 if xbound is None:
694 return self.codomain_bound(xbound=None)
695 else:
696 xb = xbound[0] + xbound[1] if len(xbound) > 1 else xbound[0]
697 C_base = self.codomain_bound(xb)
698 # Component
699 C_mu = C_base
700 C_k = C_base * 2.0
701 C_c1 = C_base * 1.5
702 C_c2 = C_base * 1.5
703
704 return (C_mu, C_k, C_c1, C_c2)
705
706 """
707 def codomain_bound(self, xbound=None):
708 return 1.0
709
710 def codomain_bound_pair(self, xbound=None):
711 # TODO: implement properly
712 if xbound is None:
713 return self.codomain_bound(xbound=None)
714 else:
715 return self.codomain_bound(xbound=xbound[0] + xbound[1])
716 """
717
718 def diff_bound3(self):
719 """
720 Returns full operator differential bounds:
721
722 L_e_mu = C_emb * C_reg * sqrt(T)
723 L_e_k = ||∇u||_{L^2(Ω)}
724 L_e_c = ||∇u||_{L^2(Ω)}
725
726 """
727
728 # First get L_e_k = L_e_c
729 grad_u_norm = self.diff_bound()
730
731 # Compute L_e_mu from stability prefactor
732 _P, L_e_mu = self._stability_prefactor()
733
734 L_e_k = grad_u_norm
735 L_e_c = grad_u_norm
736
737 return L_e_mu, L_e_k, L_e_c
738
739 # Solution operator Lipschitz factor separately wrt. μ and (k, c)
740 def lipschitz_factor_pair(self):
741 """
742 Compute the forward solution operator Lipschitz factors
743 separately with respect to the parameters (k, c) and μ.
744
745 Returns:
746 tuple: (L_mu, L_kc)
747 L_mu - Lipschitz factor w.r.t. μ
748 L_kc - Lipschitz factor w.r.t. (k, c)
749 """
750 # Get differential bounds for the solution operator
751 L_e_mu, L_e_k, L_e_c = self.diff_bound3()
752
753 # Lipschitz factor w.r.t μ
754 L_mu = L_e_mu
755
756 # Lipschitz factor w.r.t (k, c)
757 # Conservative choice: max of L_e_k and L_e_c
758 L_kc = max(L_e_k, L_e_c)
759
760 return L_mu, L_kc
761
762 # Adjoint solution operator Lipschitz factor separately wrt. μ and (k, c)
763 def diff_adj_lipschitz_factor_pair(self):
764 A = self.diff_adj_lipschitz_factor()
765
766 # No separate analytic dependence, so return same A for both parts
767 A_mu = A
768 A_kc = A
769
770 return A_mu, A_kc
771
772
773 def own(u):
774 # """
775 # Return a version of u that is guaranteed to be uniquely owned.
776 # If u has other Python references, return a deep copy.
777 # """
778 # # sys.getrefcount(u) includes the temporary reference inside getrefcount,
779 # # so '2' means exactly one external reference.
780 # if sys.getrefcount(u) <= 2:
781 # return u # safe: no other references
782
783 # deep copy into a new Function
784 u_new = fem.Function(u.function_space)
785 # Does not work
786 # u_new.x.petsc_vec.copy(u.x.petsc_vec)
787 # u_new.x.scatter_forward()
788 np.copyto(u_new.x.array, u.x.array)
789 return u_new
790
791
792 def nn2(x):
793 return None if isinstance(x, float) else dx.norm2_squared(x)
794
795
796 class PlotFactory:
797 def __init__(self, pde, n_plots=5):
798 self.pde = pde
799 self.n_plots = n_plots
800
801 m = n_plots
802 n = pde.num_steps
803 pde.save_for_plot = True
804 self.plot_frames = list(map(lambda i: i * (n - 1) // (m - 1), range(0, m)))
805 self.pde = pde
806
807 def produce(self, prefix):
808 return Plotter(self.pde, self.n_plots, prefix)
809
810
811 class Plotter:
812 def __init__(self, pde, n_plots, prefix):
813 m = n_plots
814 n = pde.num_steps
815 pde.save_for_plot = True
816 self.plot_frames = list(map(lambda i: i * (n - 1) // (m - 1), range(0, m)))
817 self.pde = pde
818 os.makedirs(prefix, exist_ok=True)
819 self.prefix = prefix
820
821 def plot(self, iter, μ):
822 self.pde.do_plot(iter, μ, self.plot_frames, self.prefix)
823
824
825 class QuadraticRegularisation:
826 """
827 Quadratic regularisation functional for the auxiliary variables
828 """
829
830 def _own(self, x):
831 return x if isinstance(x, float) else own(x)
832
833 def __init__(self, α, base=None):
834 self.α = α
835 if base is not None:
836 (k, (c1, c2)) = base
837 self.base = (self._own(k), (self._own(c1), self._own(c2)))
838 self.base_norm_squared = (nn2(k), (nn2(c1), nn2(c2)))
839 else:
840 self.base = None
841
842 def _apply1(self, x, b, n):
843 if isinstance(x, float):
844 d = x if b is None else x - b
845 return d * d / 2.0
846 else:
847 if b is not None:
848 return (dx.norm2_squared(x) + n) / 2.0 + dx.dot(x, b)
849 return dx.norm2_squared(x) / 2.0
850
851 def _get_base(self):
852 return (None, (None, None)) if self.base is None else self.base
853
854 def apply(self, x):
855 """
856 Apply the quadratic regularisation fucntional to `x`.
857 """
858 (k, (c1, c2)) = x
859 (kb, (c1b, c2b)) = self._get_base()
860 (nkb, (nc1b, nc2b)) = self.base_norm_squared
861
862 return self.α * (
863 self._apply1(k, kb, nkb)
864 + self._apply1(c1, c1b, nc1b)
865 + self._apply1(c2, c2b, nc2b)
866 )
867
868 def _prox1(self, τ, x, b):
869 γ = self.α * τ
870 β = 1.0 / (1.0 + γ)
871 if isinstance(x, float):
872 return β * (x if b is None else x + b * γ)
873 else:
874 # x = own(x)
875 vx = x.x.array
876 if b is not None:
877 vx += b.x.array * γ
878 vx *= β
879 return x
880
881 def prox(self, τ, x):
882 """
883 Apply the proximal map of the quadratic regularisation fucntional to `x`.
884 WARNING: This function is unsafe. It may modify `x´.
885 That is ok for our purposes, as Rust, being a safe language, already needs to pass a
886 copied or owned instance to Python.
887 """
888 k, (c1, c2) = x
889 (kb, (c1b, c2b)) = self._get_base()
890
891 return (
892 self._prox1(τ, k, kb),
893 (self._prox1(τ, c1, c1b), self._prox1(τ, c2, c2b)),
894 )

mercurial