Fri, 08 May 2026 17:28:21 -0500
Change README title
# # Laser and mirrors convection-diffusion experiment with unknown diffusivity and convectivity; # Includes `generic_setup` for the known case as well. # import numpy as np from dolfinx import fem from measures import DiscreteMeasure_2_f64 from numpy.linalg import norm from pointsource_pde import Problem, RegTerm from pointsource_pde.compose import ( ComposeFnWithOperator, SumOfSeparableFunctions, ) from pointsource_pde.convection_diffusion import ( BoxedQuadraticRegularisation, ConvectionDiffusion, PlotFactory, XBound, ) from pointsource_pde.laser_sampling import LaserSampling from pointsource_pde.quadratic_dataterm import QuadraticDataTerm # Give name to the problem name = "laser_and_mirrors_aux" # Fine-tune algorithms alg_finetune = { "tau0": 0.99, "theta0": 100, "inner_method": "PP", "inner_pdps_τσ0": (19.8, 0.05), "inner_pp_τ": [1000.0, 1000.0], "merge": True, "fitness_merging": True, "merge_radius": 0.1, "merge_interp": False, } algorithm_overrides = { "RadonFB": alg_finetune, "RadonSlidingFB": alg_finetune, "RadonPDPS": alg_finetune, "RadonSlidingPDPS": alg_finetune, } # Setup shared between experiment with known/unknown conductivity and diffusivity def generic_setup( prefix, simple=True, rng=None, μ_true=DiscreteMeasure_2_f64( [([0.1, 0.3], 0.08), ([0.4, 0.25], 0.05), ([0.25, 0.13], 0.06)] ), k=0.01, r=0.5, θ=30 * np.pi / 180, ): # Create convection-diffusion PDE pde = ConvectionDiffusion( u0=lambda x: 0.0 * x[1], g=lambda x: 0.0 * x[1], domain_size=[0, 0.5, 0, 0.5], num_steps=50, # override_lipschitz=4.0, # override_lipschitz_pair=(4.0, 40.0), ) # Set up lasers and mirrors lasers = [ np.array([0.1, 0.1]), np.array([0.1, 0.4]), np.array([0.4, 0.1]), np.array([0.4, 0.1]), ] mirrors_per_side = 10 x0, x1, y0, y1 = ( pde.domain_size[0], pde.domain_size[1], pde.domain_size[2], pde.domain_size[3], ) beams = [ (src, np.array(dst)) for src in lasers for i in range(0, mirrors_per_side) for z in [(i + 1) / (mirrors_per_side + 1)] for dst in [ [z * (x1 - x0) + x0, y0], [z * (x1 - x0) + x0, y1], [x0, z * (y1 - y0) + y0], [x1, z * (y1 - y0) + y0], ] ] L_i = LaserSampling(pde.V, pde.domain, pde.nx, pde.ny, beams) v0 = 1.0 if simple: # Scalar convectivity and diffusivity c1 = r * np.cos(θ) c2 = r * np.sin(θ) # k = 0.001 # high wind print("True aux variables: k = %f, c1 = %f, c2 = %f" % (k, c1, c2)) else: # Convectivity and diffusivity fields def velocity_x(x): r = np.sqrt(x[0] ** 2 + x[1] ** 2 + 1e-10) return v0 * x[0] / r def velocity_y(x): r = np.sqrt(x[0] ** 2 + x[1] ** 2 + 1e-10) return v0 * x[1] / r c1 = fem.Function(pde.V) c1.interpolate(velocity_x) c2 = fem.Function(pde.V) c2.interpolate(velocity_y) k = fem.Function(pde.V) k0 = 1.0 k.interpolate(lambda x: k0 + 0.2 * np.sin(5 * x[0]) * np.cos(5 * x[1])) # k is auxtrue = (k, (c1, c2)) # Simulate measurements u_n_list = pde.apply((μ_true, (k, (c1, c2)))) b_true = [L_i.apply(u) for u in u_n_list] b = [ b_i + L_i.noise(0.01 * norm(b_i) / np.sqrt(np.size(b_i)), rng) for b_i in b_true ] # Estimate bounds on domains and values dat_bound = 2 * max([norm(b_i) for b_i in b]) μ_bound = 1.3 * sum([alpha for _point, alpha in μ_true.iter_padded()]) pde.xbound = XBound(mu_dual=μ_bound, k_min=k * 0.9, k_max=k * 1.1, c_Linf=r * 0.9) # Create data term dat = ComposeFnWithOperator( SumOfSeparableFunctions( [ QuadraticDataTerm( L_i, data, xbound=dat_bound, b_norm=norm(b), λ=pde.dt / len(beams), ) for data in b ], ), pde, ) # Save true and initial data plot_factory = PlotFactory(pde) plotter = plot_factory.produce(prefix) ω, _ = dat.diff((DiscreteMeasure_2_f64([]), auxtrue)) ω_true, _ = dat.diff((μ_true, auxtrue)) np.savez_compressed( "%s/omega_0.npz" % prefix, true_mu=[[point[0], point[1], alpha] for point, alpha in μ_true.iter_padded()], omega0=ω.x.array, true_omega=ω_true.x.array, k=k, c1=c1, c2=c2, true_u_n_list_array=[u_n_list[i].x.array for i in plotter.plot_frames], ) return dat, auxtrue, μ_bound, μ_true, plot_factory, pde def relnoise(v, σ, rng, level=None): if level is None: level = abs(v) return float(rng.normal(v, σ * level)) # Setup the problem def setup(prefix): rng = np.random.default_rng(seed=31337) dat, auxtrue, _μ_bound, μ_true, plot_factory, pde = generic_setup(prefix, rng=rng) # Override Lipschitz parameter pde.override_lipschitz = (4.0,) pde.override_lipschitz_pair = (4.0, 4.0) # print("diff_chain_lipschitz_factor (modified)", pde.diff_chain_lipschitz_factor()) # l1, l2 = pde.diff_chain_lipschitz_factor_pair() # print("diff_chain_lipschitz_factor_pair (modified) ", l1, ", ", l2) reg = RegTerm.NonnegRadon(1.5e-6) μ0 = DiscreteMeasure_2_f64([]) (k, (c1, c2)) = auxtrue aux0 = ( max(0.001, relnoise(k, 0.02, rng)), ( relnoise(c1, 0.2, rng), relnoise(c2, 0.2, rng), ), ) print("aux init ", aux0) aux = BoxedQuadraticRegularisation((0.001, -1.0), (1.0, 1.0), (3.0, 0.0005), aux0) ax = aux.apply(aux0) inival = dat.apply((μ0, aux0)) print("Initial data term value:", inival + ax) print("Data term value at true μ:", dat.apply((μ_true, auxtrue)) + ax) # No curvature bound given: θ is aboslute. dat.curvature_bound_components = lambda: (None, None) return Problem(dat, reg, aux, aux0, μ0, plot_factory=plot_factory)