experiments/laser_and_mirrors_aux.py

Fri, 16 Jan 2026 19:41:32 -0500

author
Tuomo Valkonen <tuomov@iki.fi>
date
Fri, 16 Jan 2026 19:41:32 -0500
changeset 2
69002abe5dcb
parent 1
a4137aedcb3a
child 3
c3a4f4bb87f7
permissions
-rw-r--r--

pointsource_algs step length estimation support

#
# Laser and mirrors convection-diffusion experiment with unknown diffusivity and convectivity;
# Includes `generic_setup` for the known case as well.
#

import numpy as np
from dolfinx import fem
from measures import DiscreteMeasure_2_f64
from numpy.linalg import norm
from pointsource_pde import Problem, RegTerm
from pointsource_pde.compose import (
    ComposeFnWithOperator,
    SumOfSeparableFunctions,
)
from pointsource_pde.convection_diffusion import (
    ConvectionDiffusion,
    PlotFactory,
    QuadraticRegularisation,
    XBound,
)
from pointsource_pde.laser_sampling import LaserSampling
from pointsource_pde.quadratic_dataterm import QuadraticDataTerm

# Give name to the problem
name = "laser_and_mirrors_aux"

# Fine-tune algorithms
alg_finetune = {
    "tau0": 0.99,
    "theta0": 0.01,
    "inner_method": "PP",
    "inner_pdps_τσ0": (19.8, 0.05),
    "inner_pp_τ": [1000.0, 1000.0],
    "merge": True,
    "fitness_merging": True,
}

algorithm_overrides = {
    "RadonFB": alg_finetune,
    "RadonSlidingFB": alg_finetune,
    "RadonPDPS": alg_finetune,
    "RadonSlidingPDPS": alg_finetune,
}


# Setup shared between experiment with known/unknown conductivity and diffusivity
def generic_setup(prefix, simple=True):

    # Create true source
    μ_true = DiscreteMeasure_2_f64(
        [([0.1, 0.3], 0.08), ([0.4, 0.25], 0.05), ([0.25, 0.13], 0.06)]
    )

    # Create convection-diffusion PDE
    pde = ConvectionDiffusion(
        u0=lambda x: 0.0 * x[1],
        g=lambda x: 0.0 * x[1],
        domain_size=[0, 0.5, 0, 0.5],
        num_steps=50,
    )

    # Set up lasers and mirrors
    lasers = [
        np.array([0.1, 0.1]),
        np.array([0.1, 0.4]),
        np.array([0.4, 0.1]),
        np.array([0.4, 0.1]),
    ]
    mirrors_per_side = 10
    x0, x1, y0, y1 = (
        pde.domain_size[0],
        pde.domain_size[1],
        pde.domain_size[2],
        pde.domain_size[3],
    )
    beams = [
        (src, np.array(dst))
        for src in lasers
        for i in range(0, mirrors_per_side)
        for z in [(i + 1) / (mirrors_per_side + 1)]
        for dst in [
            [z * (x1 - x0) + x0, y0],
            [z * (x1 - x0) + x0, y1],
            [x0, z * (y1 - y0) + y0],
            [x1, z * (y1 - y0) + y0],
        ]
    ]
    L_i = LaserSampling(pde.V, pde.domain, pde.nx, pde.ny, beams)

    v0 = 1.0

    if simple:
        # Scalar convectivity and diffusivity
        r = 0.5
        θ = 30 * np.pi / 180
        c1 = r * np.cos(θ)
        c2 = r * np.sin(θ)
        k = 0.01  # diffusive
        # k = 0.001  # high wind
    else:
        # Convectivity and diffusivity fields
        def velocity_x(x):
            r = np.sqrt(x[0] ** 2 + x[1] ** 2 + 1e-10)
            return v0 * x[0] / r

        def velocity_y(x):
            r = np.sqrt(x[0] ** 2 + x[1] ** 2 + 1e-10)
            return v0 * x[1] / r

        c1 = fem.Function(pde.V)
        c1.interpolate(velocity_x)
        c2 = fem.Function(pde.V)
        c2.interpolate(velocity_y)
        k = fem.Function(pde.V)
        k0 = 1.0
        k.interpolate(lambda x: k0 + 0.2 * np.sin(5 * x[0]) * np.cos(5 * x[1]))  # k is

    auxtrue = (k, (c1, c2))

    # Simulate measurements
    u_n_list = pde.apply((μ_true, (k, (c1, c2))))
    b_true = [L_i.apply(u) for u in u_n_list]
    b = [b_i + L_i.noise(0.01 * norm(b_i) / np.sqrt(np.size(b_i))) for b_i in b_true]

    # Estimate bounds on domains and values
    dat_bound = 10 * max([norm(b_i) for b_i in b])
    μ_bound = 1.3 * sum([alpha for _point, alpha in μ_true.iter_padded()])
    xbound = XBound(mu_dual=μ_bound, k_min=k, k_max=k, c_Linf=r)

    # Create data term
    dat = ComposeFnWithOperator(
        SumOfSeparableFunctions(
            [
                QuadraticDataTerm(
                    L_i,
                    data,
                    xbound=dat_bound,
                    b_norm=norm(b),
                    λ=pde.dt / len(beams),
                )
                for data in b
            ]
        ),
        pde,
        xbound=[xbound],
        xbound_pair=[xbound],
    )

    # Plot true and initial data
    plot_factory = PlotFactory(pde)
    plotter = plot_factory.produce(prefix)
    ω, _ = dat.diff((DiscreteMeasure_2_f64([]), auxtrue))
    ω_true, _ = dat.diff((μ_true, auxtrue))
    np.savez_compressed(
        "%s/omega_0.npz" % prefix,
        true_mu=[[point[0], point[1], alpha] for point, alpha in μ_true.iter_padded()],
        omega0=ω.x.array,
        true_omega=ω_true.x.array,
        true_u_n_list_array=[u_n_list[i].x.array for i in plotter.plot_frames],
    )

    return dat, auxtrue, μ_bound, μ_true, plot_factory


def setup(prefix):
    dat, auxtrue, _μ_bound, μ_true, plot_factory = generic_setup(prefix)

    reg = RegTerm.NonnegRadon(10e-7)

    μ0 = DiscreteMeasure_2_f64([])
    aux0 = auxtrue
    aux = QuadraticRegularisation(0.001, aux0)
    ax = aux.apply(aux0)
    inival = dat.apply((μ0, aux0))
    print("Initial value:", inival + ax, ax)
    print("Value at true μ:", dat.apply((μ_true, auxtrue)) + ax)

    # auxinit = np.c_[
    #     np.zeros_like(k),
    #     np.zeros_like(c1),
    #     np.zeros_like(c2),
    # ]
    # print(np.shape(auxinit))

    # We need Θ² such that ⟨F'(μ+Δ, aux-F'(μ, aux)|Δ⟩ ≤ Θ²|γ|(c_2), where Δ=(π_♯^1-π_♯^0)γ.
    # We have
    #
    #   ⟨F'(μ+Δ, aux)-F'(μ, aux)|Δ⟩ ≤ ‖⟨F'(μ+Δ, aux)-F'(μ, aux)‖ ‖Δ‖
    #                               ≤ L_{F'(·, aux)} ‖Δ‖²,
    #
    # so Θ² ≥ L_{F'(·, aux)} ‖Δ‖² / |γ|(c_2). Thsi doesn't give anything useful if the
    # transport is very small in distance.

    dat.curvature_bound_components = lambda: (None, None)  # 1.0

    return Problem(dat, reg, aux, aux0, μ0, plot_factory=plot_factory)

mercurial