Thu, 26 Feb 2026 09:32:12 -0500
Initial working version for known convectivity and diffusivity
# # Laser and mirrors convection-diffusion experiment with unknown diffusivity and convectivity; # Includes `generic_setup` for the known case as well. # import numpy as np from dolfinx import fem from measures import DiscreteMeasure_2_f64 from numpy.linalg import norm from pointsource_pde import Problem, RegTerm from pointsource_pde.compose import ( ComposeFnWithOperator, SumOfSeparableFunctions, ) from pointsource_pde.convection_diffusion import ( ConvectionDiffusion, PlotFactory, QuadraticRegularisation, XBound, ) from pointsource_pde.laser_sampling import LaserSampling from pointsource_pde.quadratic_dataterm import QuadraticDataTerm # Give name to the problem name = "laser_and_mirrors_aux" # Fine-tune algorithms alg_finetune = { "tau0": 0.99, "theta0": 0.01, "inner_method": "PP", "inner_pdps_τσ0": (19.8, 0.05), "inner_pp_τ": [1000.0, 1000.0], "merge": True, "fitness_merging": True, } algorithm_overrides = { "RadonFB": alg_finetune, "RadonSlidingFB": alg_finetune, "RadonPDPS": alg_finetune, "RadonSlidingPDPS": alg_finetune, } # Setup shared between experiment with known/unknown conductivity and diffusivity def generic_setup(prefix, simple=True): # Create true source μ_true = DiscreteMeasure_2_f64( [([0.1, 0.3], 0.08), ([0.4, 0.25], 0.05), ([0.25, 0.13], 0.06)] ) # Create convection-diffusion PDE pde = ConvectionDiffusion( u0=lambda x: 0.0 * x[1], g=lambda x: 0.0 * x[1], domain_size=[0, 0.5, 0, 0.5], num_steps=50, ) # Set up lasers and mirrors lasers = [ np.array([0.1, 0.1]), np.array([0.1, 0.4]), np.array([0.4, 0.1]), np.array([0.4, 0.1]), ] mirrors_per_side = 10 x0, x1, y0, y1 = ( pde.domain_size[0], pde.domain_size[1], pde.domain_size[2], pde.domain_size[3], ) beams = [ (src, np.array(dst)) for src in lasers for i in range(0, mirrors_per_side) for z in [(i + 1) / (mirrors_per_side + 1)] for dst in [ [z * (x1 - x0) + x0, y0], [z * (x1 - x0) + x0, y1], [x0, z * (y1 - y0) + y0], [x1, z * (y1 - y0) + y0], ] ] L_i = LaserSampling(pde.V, pde.domain, pde.nx, pde.ny, beams) v0 = 1.0 if simple: # Scalar convectivity and diffusivity r = 0.5 θ = 30 * np.pi / 180 c1 = r * np.cos(θ) c2 = r * np.sin(θ) k = 0.01 # diffusive # k = 0.001 # high wind else: # Convectivity and diffusivity fields def velocity_x(x): r = np.sqrt(x[0] ** 2 + x[1] ** 2 + 1e-10) return v0 * x[0] / r def velocity_y(x): r = np.sqrt(x[0] ** 2 + x[1] ** 2 + 1e-10) return v0 * x[1] / r c1 = fem.Function(pde.V) c1.interpolate(velocity_x) c2 = fem.Function(pde.V) c2.interpolate(velocity_y) k = fem.Function(pde.V) k0 = 1.0 k.interpolate(lambda x: k0 + 0.2 * np.sin(5 * x[0]) * np.cos(5 * x[1])) # k is auxtrue = (k, (c1, c2)) # Simulate measurements u_n_list = pde.apply((μ_true, (k, (c1, c2)))) b_true = [L_i.apply(u) for u in u_n_list] b = [b_i + L_i.noise(0.01 * norm(b_i) / np.sqrt(np.size(b_i))) for b_i in b_true] # Estimate bounds on domains and values dat_bound = 10 * max([norm(b_i) for b_i in b]) μ_bound = 1.3 * sum([alpha for _point, alpha in μ_true.iter_padded()]) xbound = XBound(mu_dual=μ_bound, k_min=k, k_max=k, c_Linf=r) # Create data term dat = ComposeFnWithOperator( SumOfSeparableFunctions( [ QuadraticDataTerm( L_i, data, xbound=dat_bound, b_norm=norm(b), λ=pde.dt / len(beams), ) for data in b ] ), pde, xbound=[xbound], xbound_pair=[xbound], ) # Plot true and initial data plot_factory = PlotFactory(pde) plotter = plot_factory.produce(prefix) ω, _ = dat.diff((DiscreteMeasure_2_f64([]), auxtrue)) ω_true, _ = dat.diff((μ_true, auxtrue)) np.savez_compressed( "%s/omega_0.npz" % prefix, true_mu=[[point[0], point[1], alpha] for point, alpha in μ_true.iter_padded()], omega0=ω.x.array, true_omega=ω_true.x.array, true_u_n_list_array=[u_n_list[i].x.array for i in plotter.plot_frames], ) return dat, auxtrue, μ_bound, μ_true, plot_factory def setup(prefix): dat, auxtrue, _μ_bound, μ_true, plot_factory = generic_setup(prefix) reg = RegTerm.NonnegRadon(10e-7) μ0 = DiscreteMeasure_2_f64([]) aux0 = auxtrue aux = QuadraticRegularisation(0.001, aux0) ax = aux.apply(aux0) inival = dat.apply((μ0, aux0)) print("Initial value:", inival + ax, ax) print("Value at true μ:", dat.apply((μ_true, auxtrue)) + ax) # auxinit = np.c_[ # np.zeros_like(k), # np.zeros_like(c1), # np.zeros_like(c2), # ] # print(np.shape(auxinit)) # We need Θ² such that ⟨F'(μ+Δ, aux-F'(μ, aux)|Δ⟩ ≤ Θ²|γ|(c_2), where Δ=(π_♯^1-π_♯^0)γ. # We have # # ⟨F'(μ+Δ, aux)-F'(μ, aux)|Δ⟩ ≤ ‖⟨F'(μ+Δ, aux)-F'(μ, aux)‖ ‖Δ‖ # ≤ L_{F'(·, aux)} ‖Δ‖², # # so Θ² ≥ L_{F'(·, aux)} ‖Δ‖² / |γ|(c_2). Thsi doesn't give anything useful if the # transport is very small in distance. dat.curvature_bound_components = lambda: (None, None) # 1.0 return Problem(dat, reg, aux, aux0, μ0, plot_factory=plot_factory)