Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 26 additions & 10 deletions simpeg_drivers/utils/regularization.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,24 +221,29 @@ def rotate_xy_3d(mesh: TreeMesh, phi: np.ndarray) -> ssp.csr_matrix:
return z_rotation_matrix(phi)


def get_cell_normals(n_cells: int, axis: str, outward: bool) -> np.ndarray:
def get_cell_normals(n_cells: int, axis: str, outward: bool, dim: int) -> np.ndarray:
"""
Returns cell normals for given axis and all cells.

:param n_cells: Number of cells in the mesh.
:param axis: Cartesian axis (one of 'x', 'y', or 'z'
:param outward: Direction of the normal. True for outward facing,
False for inward facing normals.
:param dim: Dimension of the mesh. Either 2 for drape model or 3
for octree.
"""

ind = 1 if outward else -1

if axis == "x":
normals = np.kron(np.ones(n_cells), np.c_[ind, 0, 0])
n = np.c_[ind, 0] if dim == 2 else np.c_[ind, 0, 0]
normals = np.kron(np.ones(n_cells), n)
elif axis == "y":
normals = np.kron(np.ones(n_cells), np.c_[0, ind, 0])
n = np.c_[0, ind] if dim == 2 else np.c_[0, ind, 0]
normals = np.kron(np.ones(n_cells), n)
elif axis == "z":
normals = np.kron(np.ones(n_cells), np.c_[0, 0, ind])
n = np.c_[0, ind] if dim == 2 else np.c_[0, 0, ind]
normals = np.kron(np.ones(n_cells), n)
else:
raise ValueError("Axis must be one of 'x', 'y', or 'z'.")

Expand Down Expand Up @@ -371,20 +376,31 @@ def rotated_gradient(
"""

n_cells = mesh.n_cells
dim = mesh.dim
if any(len(k) != n_cells for k in [dip, direction]):
raise ValueError(
"Input angle arrays are not the same size as the number of "
"cells in the mesh."
)

Rx = rotate_yz_3d(mesh, dip)
Rz = rotate_xy_3d(mesh, direction)
normals = get_cell_normals(n_cells, axis, forward)
rotated_normals = (Rz * (Rx * normals.T)).reshape(n_cells, mesh.dim)
volumes, neighbors = partial_volumes(mesh, neighbors, rotated_normals)
normals = get_cell_normals(n_cells, axis, forward, dim)
if dim == 3:
Rx = rotate_yz_3d(mesh, dip)
Rz = rotate_xy_3d(mesh, direction)
rotated_normals = (Rz * (Rx * normals.T)).reshape(n_cells, dim)
elif dim == 2:
Ry = rotate_xz_2d(mesh, dip)
rotated_normals = (Ry * normals.T).reshape(n_cells, dim)

volumes, neighbors = partial_volumes(
mesh,
neighbors,
rotated_normals, # pylint: disable=possibly-used-before-assignment
)

unit_grad = gradient_operator(neighbors, volumes, n_cells)
return sdiag(1 / mesh.h_gridded[:, "xyz".find(axis)]) @ unit_grad
axes = "xyz" if dim == 3 else "xz"
return sdiag(1 / mesh.h_gridded[:, axes.find(axis)]) @ unit_grad


def ensure_dip_direction_convention(
Expand Down
184 changes: 184 additions & 0 deletions tests/run_tests/driver_2d_rotated_gradients_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
# '''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''
# Copyright (c) 2025 Mira Geoscience Ltd. '
# '
# This file is part of simpeg-drivers package. '
# '
# simpeg-drivers is distributed under the terms and conditions of the MIT License '
# (see LICENSE file at the root of this source code package). '
# '
# '''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''

from __future__ import annotations

from pathlib import Path
from unittest.mock import patch

import numpy as np
from geoapps_utils.utils.importing import GeoAppsError
from geoh5py.groups.property_group import PropertyGroup
from geoh5py.workspace import Workspace
from pytest import raises

from simpeg_drivers.electricals.direct_current.two_dimensions.driver import (
DC2DForwardDriver,
DC2DInversionDriver,
)
from simpeg_drivers.electricals.direct_current.two_dimensions.options import (
DC2DForwardOptions,
DC2DInversionOptions,
)
from simpeg_drivers.options import (
ActiveCellsOptions,
DrapeModelOptions,
LineSelectionOptions,
)
from simpeg_drivers.utils.testing import check_target, setup_inversion_workspace
from simpeg_drivers.utils.utils import get_inversion_output


# To test the full run and validate the inversion.
# Move this file out of the test directory and run.

target_run = {"data_norm": 0.5963828772690819, "phi_d": 2870, "phi_m": 18.7}


def test_dc2d_rotated_grad_fwr_run(
tmp_path: Path,
n_electrodes=10,
n_lines=3,
refinement=(4, 6),
):
# Run the forward
geoh5, _, model, survey, topography = setup_inversion_workspace(
tmp_path,
background=0.01,
anomaly=10,
n_electrodes=n_electrodes,
n_lines=n_lines,
refinement=refinement,
inversion_type="dcip_2d",
drape_height=0.0,
flatten=False,
)
line_selection = LineSelectionOptions(
line_object=geoh5.get_entity("line_ids")[0],
line_id=101,
)
params = DC2DForwardOptions(
geoh5=geoh5,
data_object=survey,
line_selection=line_selection,
drape_model=DrapeModelOptions(
u_cell_size=5.0,
v_cell_size=5.0,
depth_core=100.0,
horizontal_padding=100.0,
vertical_padding=100.0,
expansion_factor=1.1,
),
starting_model=model,
active_cells=ActiveCellsOptions(topography_object=topography),
)
fwr_driver = DC2DForwardDriver(params)
fwr_driver.run()


def test_dc2d_rotated_grad_run(
tmp_path: Path,
max_iterations=1,
pytest=True,
):
workpath = tmp_path / "inversion_test.ui.geoh5"
if pytest:
workpath = (
tmp_path.parent
/ "test_dc2d_rotated_grad_fwr_run0"
/ "inversion_test.ui.geoh5"
)

with Workspace(workpath) as geoh5:
potential = geoh5.get_entity("Iteration_0_dc")[0]
topography = geoh5.get_entity("topography")[0]

orig_potential = potential.values.copy()
mesh = geoh5.get_entity("Models")[0]

# Create property group with orientation
dip = np.ones(mesh.n_cells) * 45
azimuth = np.ones(mesh.n_cells) * 90

data_list = mesh.add_data(
{
"azimuth": {"values": azimuth},
"dip": {"values": dip},
}
)
pg = PropertyGroup(
mesh, properties=data_list, property_group_type="Dip direction & dip"
)

# Run the inverse
params = DC2DInversionOptions(
geoh5=geoh5,
drape_model=DrapeModelOptions(
u_cell_size=5.0,
v_cell_size=5.0,
depth_core=100.0,
horizontal_padding=100.0,
vertical_padding=100.0,
expansion_factor=1.1,
),
active_cells=ActiveCellsOptions(topography_object=topography),
line_selection=LineSelectionOptions(
line_object=geoh5.get_entity("line_ids")[0],
line_id=101,
),
data_object=potential.parent,
gradient_rotation=pg,
potential_channel=potential,
potential_uncertainty=1e-3,
model_type="Resistivity (Ohm-m)",
starting_model=100.0,
reference_model=100.0,
s_norm=1.0,
x_norm=0.0,
z_norm=0.0,
gradient_type="components",
max_global_iterations=max_iterations,
initial_beta=None,
initial_beta_ratio=1e0,
percentile=100,
lower_bound=0.1,
cooling_rate=1,
)
params.write_ui_json(path=tmp_path / "Inv_run.ui.json")

driver = DC2DInversionDriver.start(str(tmp_path / "Inv_run.ui.json"))

with Workspace(driver.params.geoh5.h5file) as run_ws:
output = get_inversion_output(
driver.params.geoh5.h5file, driver.params.out_group.uid
)
output["data"] = orig_potential

if pytest:
check_target(output, target_run)
nan_ind = np.isnan(run_ws.get_entity("Iteration_0_model")[0].values)
inactive_ind = run_ws.get_entity("active_cells")[0].values == 0
assert np.all(nan_ind == inactive_ind)


if __name__ == "__main__":
# Full run
test_dc2d_rotated_grad_fwr_run(
Path("./"),
n_electrodes=20,
n_lines=3,
refinement=(4, 8),
)

test_dc2d_rotated_grad_run(
Path("./"),
max_iterations=20,
pytest=False,
)