experiments/laser_and_mirrors_aux.py

Wed, 22 Apr 2026 23:46:40 -0500

author
Tuomo Valkonen <tuomov@iki.fi>
date
Wed, 22 Apr 2026 23:46:40 -0500
changeset 6
64bd740b12ed
parent 3
c3a4f4bb87f7
permissions
-rw-r--r--

Add packaging script, remove alg_tools, measures, and pointsource_pde installation instruction from README.

#
# Laser and mirrors convection-diffusion experiment with unknown diffusivity and convectivity;
# Includes `generic_setup` for the known case as well.
#

import numpy as np
from dolfinx import fem
from measures import DiscreteMeasure_2_f64
from numpy.linalg import norm
from pointsource_pde import Problem, RegTerm
from pointsource_pde.compose import (
    ComposeFnWithOperator,
    SumOfSeparableFunctions,
)
from pointsource_pde.convection_diffusion import (
    BoxedQuadraticRegularisation,
    ConvectionDiffusion,
    PlotFactory,
    XBound,
)
from pointsource_pde.laser_sampling import LaserSampling
from pointsource_pde.quadratic_dataterm import QuadraticDataTerm

# Give name to the problem
name = "laser_and_mirrors_aux"

# Fine-tune algorithms
alg_finetune = {
    "tau0": 0.99,
    "theta0": 100,
    "inner_method": "PP",
    "inner_pdps_τσ0": (19.8, 0.05),
    "inner_pp_τ": [1000.0, 1000.0],
    "merge": True,
    "fitness_merging": True,
    "merge_radius": 0.1,
    "merge_interp": False,
}

algorithm_overrides = {
    "RadonFB": alg_finetune,
    "RadonSlidingFB": alg_finetune,
    "RadonPDPS": alg_finetune,
    "RadonSlidingPDPS": alg_finetune,
}


# Setup shared between experiment with known/unknown conductivity and diffusivity
def generic_setup(
    prefix,
    simple=True,
    rng=None,
    μ_true=DiscreteMeasure_2_f64(
        [([0.1, 0.3], 0.08), ([0.4, 0.25], 0.05), ([0.25, 0.13], 0.06)]
    ),
    k=0.01,
    r=0.5,
    θ=30 * np.pi / 180,
):

    # Create convection-diffusion PDE
    pde = ConvectionDiffusion(
        u0=lambda x: 0.0 * x[1],
        g=lambda x: 0.0 * x[1],
        domain_size=[0, 0.5, 0, 0.5],
        num_steps=50,
        # override_lipschitz=4.0,
        # override_lipschitz_pair=(4.0, 40.0),
    )

    # Set up lasers and mirrors
    lasers = [
        np.array([0.1, 0.1]),
        np.array([0.1, 0.4]),
        np.array([0.4, 0.1]),
        np.array([0.4, 0.1]),
    ]
    mirrors_per_side = 10
    x0, x1, y0, y1 = (
        pde.domain_size[0],
        pde.domain_size[1],
        pde.domain_size[2],
        pde.domain_size[3],
    )
    beams = [
        (src, np.array(dst))
        for src in lasers
        for i in range(0, mirrors_per_side)
        for z in [(i + 1) / (mirrors_per_side + 1)]
        for dst in [
            [z * (x1 - x0) + x0, y0],
            [z * (x1 - x0) + x0, y1],
            [x0, z * (y1 - y0) + y0],
            [x1, z * (y1 - y0) + y0],
        ]
    ]
    L_i = LaserSampling(pde.V, pde.domain, pde.nx, pde.ny, beams)

    v0 = 1.0

    if simple:
        # Scalar convectivity and diffusivity
        c1 = r * np.cos(θ)
        c2 = r * np.sin(θ)
        # k = 0.001  # high wind
        print("True aux variables: k = %f, c1 = %f, c2 = %f" % (k, c1, c2))
    else:
        # Convectivity and diffusivity fields
        def velocity_x(x):
            r = np.sqrt(x[0] ** 2 + x[1] ** 2 + 1e-10)
            return v0 * x[0] / r

        def velocity_y(x):
            r = np.sqrt(x[0] ** 2 + x[1] ** 2 + 1e-10)
            return v0 * x[1] / r

        c1 = fem.Function(pde.V)
        c1.interpolate(velocity_x)
        c2 = fem.Function(pde.V)
        c2.interpolate(velocity_y)
        k = fem.Function(pde.V)
        k0 = 1.0
        k.interpolate(lambda x: k0 + 0.2 * np.sin(5 * x[0]) * np.cos(5 * x[1]))  # k is

    auxtrue = (k, (c1, c2))

    # Simulate measurements
    u_n_list = pde.apply((μ_true, (k, (c1, c2))))
    b_true = [L_i.apply(u) for u in u_n_list]
    b = [
        b_i + L_i.noise(0.01 * norm(b_i) / np.sqrt(np.size(b_i)), rng) for b_i in b_true
    ]

    # Estimate bounds on domains and values
    dat_bound = 2 * max([norm(b_i) for b_i in b])
    μ_bound = 1.3 * sum([alpha for _point, alpha in μ_true.iter_padded()])
    pde.xbound = XBound(mu_dual=μ_bound, k_min=k * 0.9, k_max=k * 1.1, c_Linf=r * 0.9)

    # Create data term
    dat = ComposeFnWithOperator(
        SumOfSeparableFunctions(
            [
                QuadraticDataTerm(
                    L_i,
                    data,
                    xbound=dat_bound,
                    b_norm=norm(b),
                    λ=pde.dt / len(beams),
                )
                for data in b
            ],
        ),
        pde,
    )

    # Save true and initial data
    plot_factory = PlotFactory(pde)
    plotter = plot_factory.produce(prefix)
    ω, _ = dat.diff((DiscreteMeasure_2_f64([]), auxtrue))
    ω_true, _ = dat.diff((μ_true, auxtrue))
    np.savez_compressed(
        "%s/omega_0.npz" % prefix,
        true_mu=[[point[0], point[1], alpha] for point, alpha in μ_true.iter_padded()],
        omega0=ω.x.array,
        true_omega=ω_true.x.array,
        k=k,
        c1=c1,
        c2=c2,
        true_u_n_list_array=[u_n_list[i].x.array for i in plotter.plot_frames],
    )

    return dat, auxtrue, μ_bound, μ_true, plot_factory, pde


def relnoise(v, σ, rng, level=None):
    if level is None:
        level = abs(v)
    return float(rng.normal(v, σ * level))


# Setup the problem
def setup(prefix):
    rng = np.random.default_rng(seed=31337)

    dat, auxtrue, _μ_bound, μ_true, plot_factory, pde = generic_setup(prefix, rng=rng)

    # Override Lipschitz parameter
    pde.override_lipschitz = (4.0,)
    pde.override_lipschitz_pair = (4.0, 4.0)
    # print("diff_chain_lipschitz_factor (modified)", pde.diff_chain_lipschitz_factor())
    # l1, l2 = pde.diff_chain_lipschitz_factor_pair()
    # print("diff_chain_lipschitz_factor_pair (modified) ", l1, ", ", l2)

    reg = RegTerm.NonnegRadon(1.5e-6)

    μ0 = DiscreteMeasure_2_f64([])
    (k, (c1, c2)) = auxtrue
    aux0 = (
        max(0.001, relnoise(k, 0.02, rng)),
        (
            relnoise(c1, 0.2, rng),
            relnoise(c2, 0.2, rng),
        ),
    )
    print("aux init ", aux0)
    aux = BoxedQuadraticRegularisation((0.001, -1.0), (1.0, 1.0), (3.0, 0.0005), aux0)
    ax = aux.apply(aux0)
    inival = dat.apply((μ0, aux0))
    print("Initial data term value:", inival + ax)
    print("Data term value at true μ:", dat.apply((μ_true, auxtrue)) + ax)

    # No curvature bound given: θ is aboslute.
    dat.curvature_bound_components = lambda: (None, None)

    return Problem(dat, reg, aux, aux0, μ0, plot_factory=plot_factory)

mercurial