diff --git a/src/festim/__init__.py b/src/festim/__init__.py index 0be6a96c6..b2095f861 100644 --- a/src/festim/__init__.py +++ b/src/festim/__init__.py @@ -26,6 +26,8 @@ ) from .exports.average_surface import AverageSurface from .exports.average_volume import AverageVolume +from .exports.custom_quantity import CustomQuantity +from .exports.derived_quantity import DerivedQuantity from .exports.maximum_surface import MaximumSurface from .exports.maximum_volume import MaximumVolume from .exports.minimum_surface import MinimumSurface diff --git a/src/festim/exports/__init__.py b/src/festim/exports/__init__.py index 28c309bca..ad3988734 100644 --- a/src/festim/exports/__init__.py +++ b/src/festim/exports/__init__.py @@ -1,5 +1,7 @@ from .average_surface import AverageSurface from .average_volume import AverageVolume +from .custom_quantity import CustomQuantity +from .derived_quantity import DerivedQuantity from .maximum_surface import MaximumSurface from .maximum_volume import MaximumVolume from .minimum_surface import MinimumSurface @@ -23,6 +25,8 @@ "AverageSurface", "AverageVolume", "CustomFieldExport", + "CustomQuantity", + "DerivedQuantity", "ExportBaseClass", "MaximumSurface", "MaximumVolume", diff --git a/src/festim/exports/custom_quantity.py b/src/festim/exports/custom_quantity.py new file mode 100644 index 000000000..4d1df7679 --- /dev/null +++ b/src/festim/exports/custom_quantity.py @@ -0,0 +1,131 @@ +from collections.abc import Callable + +import ufl +from dolfinx import fem +from scifem import assemble_scalar + +from festim.exports.derived_quantity import DerivedQuantity +from festim.subdomain.surface_subdomain import SurfaceSubdomain +from festim.subdomain.volume_subdomain import VolumeSubdomain + + +class CustomQuantity(DerivedQuantity): + r""" + Export CustomQuantity. + + Args: + expr: function that returns a UFL expression + subdomain: subdomain on which the quantity is evaluated + title: title of the exported quantity + filename: name of the file to which the quantity is exported + + Attributes: + expr: function that returns a UFL expression + subdomain: subdomain on which the quantity is evaluated + title: title of the exported quantity + filename: name of the file to which the quantity is exported + t: list of time values + data: list of values of the quantity + + Usage: + + .. testcode:: + + import numpy as np + import festim as F + + material = F.Material(D_0=1, E_D=0) + volume = F.VolumeSubdomain(id=1, material=material) + surface = F.SurfaceSubdomain(id=1, locator=lambda x: np.isclose(x[1], 1)) + + def total_concentration(**kwargs): + return kwargs["A"] + kwargs["B"] + + quantity = F.CustomQuantity( + expr=total_concentration, + subdomain=volume, + title="Total quantity", + ) + + surface_quantity = F.CustomQuantity( + expr=lambda **kwargs: -kwargs["D_A"] * ufl.dot( + ufl.grad(kwargs["A"]), kwargs["n"] + ), + subdomain=surface, + title="Surface flux", + ) + + The callable passed to ``expr`` receives keyword arguments assembled by the + problem class. Common entries are: + + ``A``, ``B``, ... + Concentrations of the species present in the problem (here A and B). + ``n`` + The facet normal on the selected surface subdomain. + ``T`` + The temperature field. + ``D_A``, ``D_B``, ... + Species-specific diffusion coefficients. + ``D`` + The diffusion coefficient data, either a single field for one species or + a dictionary keyed by species name when several species are present. + ``x`` + The spatial coordinate (x[0], x[1], x[2]). + + For a surface quantity, the returned UFL expression can represent a flux such as + + .. math:: + + q = -D\,\nabla c \cdot n + + and FESTIM will assemble + + .. math:: + + Q = \int_{\Gamma} q\,\mathrm{d}\Gamma + + over the selected surface subdomain. + + The expression returned by ``expr`` is treated as an integrand and assembled over + the chosen subdomain. + + .. math:: + + Q = \int_{\Omega} q\,\mathrm{d}\Omega + + where ``q`` is the UFL expression returned by ``expr`` and ``\Omega`` is either a + surface or a volume subdomain. + """ + + def __init__( + self, + expr: Callable, + subdomain: SurfaceSubdomain | VolumeSubdomain, + title: str = "Custom Quantity", + filename: str | None = None, + ) -> None: + super().__init__(filename=filename) + self.expr = expr + self.subdomain = subdomain + self._title = title + self.ufl_expr = None + + @property + def title(self): + return self._title + + def compute(self, measure: ufl.Measure, entity_maps: dict | None = None): + """Computes the value of the custom quantity and appends it to the data list. + + Args: + measure: volume or surface measure of the model + entity_maps: entity maps relating parent mesh and submesh + """ + if self.ufl_expr is None: + raise ValueError("The UFL expression has not been evaluated yet.") + + form = fem.form( + self.ufl_expr * measure(self.subdomain.id), entity_maps=entity_maps + ) + self.value = assemble_scalar(form) + self.data.append(self.value) diff --git a/src/festim/exports/derived_quantity.py b/src/festim/exports/derived_quantity.py new file mode 100644 index 000000000..8b2625209 --- /dev/null +++ b/src/festim/exports/derived_quantity.py @@ -0,0 +1,60 @@ +import csv +from abc import ABC, abstractmethod + + +class DerivedQuantity(ABC): + """Base class for all derived quantities. + + Attributes: + filename: name of the file to which the quantity is exported + t: list of time values + data: list of values of the quantity + """ + + filename: str | None + t: list[float] + data: list[float] + + def __init__(self, filename: str | None = None) -> None: + self.filename = filename + self.t = [] + self.data = [] + self._first_time_export = True + + @property + @abstractmethod + def title(self): + pass + + @abstractmethod + def compute(self, *args, **kwargs): + pass + + @property + def filename(self): + return self._filename + + @filename.setter + def filename(self, value): + if value is None: + self._filename = None + elif not isinstance(value, str): + raise TypeError("filename must be of type str") + elif not value.endswith(".csv") and not value.endswith(".txt"): + raise ValueError("filename must end with .csv or .txt") + self._filename = value + + def write(self, t): + """If the filename doesnt exist yet, create it and write the header, then append + the time and value to the file.""" + + if self.filename is not None: + if self._first_time_export: + header = ["t(s)", f"{self.title}"] + with open(self.filename, mode="w+", newline="") as file: + writer = csv.writer(file) + writer.writerow(header) + self._first_time_export = False + with open(self.filename, mode="a", newline="") as file: + writer = csv.writer(file) + writer.writerow([t, self.value]) diff --git a/src/festim/exports/maximum_surface.py b/src/festim/exports/maximum_surface.py index 1a29f5d7e..a65ca92be 100644 --- a/src/festim/exports/maximum_surface.py +++ b/src/festim/exports/maximum_surface.py @@ -19,6 +19,8 @@ class MaximumSurface(SurfaceQuantity): see `festim.SurfaceQuantity` """ + facet_meshtags: dolfinx.mesh.MeshTags + @property def title(self): return f"Maximum {self.field.name} surface {self.surface.id}" diff --git a/src/festim/exports/maximum_volume.py b/src/festim/exports/maximum_volume.py index d3bb257e3..2e3aa35a5 100644 --- a/src/festim/exports/maximum_volume.py +++ b/src/festim/exports/maximum_volume.py @@ -19,6 +19,8 @@ class MaximumVolume(VolumeQuantity): see `festim.VolumeQuantity` """ + volume_meshtags: dolfinx.mesh.MeshTags + @property def title(self): return f"Maximum {self.field.name} volume {self.volume.id}" diff --git a/src/festim/exports/minimum_surface.py b/src/festim/exports/minimum_surface.py index 1d2e934d5..468532c5f 100644 --- a/src/festim/exports/minimum_surface.py +++ b/src/festim/exports/minimum_surface.py @@ -19,6 +19,8 @@ class MinimumSurface(SurfaceQuantity): see `festim.SurfaceQuantity` """ + facet_meshtags: dolfinx.mesh.MeshTags + @property def title(self): return f"Minimum {self.field.name} surface {self.surface.id}" diff --git a/src/festim/exports/minimum_volume.py b/src/festim/exports/minimum_volume.py index e17ce5aff..a2cfc560c 100644 --- a/src/festim/exports/minimum_volume.py +++ b/src/festim/exports/minimum_volume.py @@ -19,6 +19,8 @@ class MinimumVolume(VolumeQuantity): see `festim.VolumeQuantity` """ + volume_meshtags: dolfinx.mesh.MeshTags + @property def title(self): return f"Minimum {self.field.name} volume {self.volume.id}" diff --git a/src/festim/exports/surface_quantity.py b/src/festim/exports/surface_quantity.py index 515ab368e..1c2d0b06e 100644 --- a/src/festim/exports/surface_quantity.py +++ b/src/festim/exports/surface_quantity.py @@ -1,10 +1,11 @@ -import csv +from abc import abstractmethod +from festim.exports.derived_quantity import DerivedQuantity from festim.species import Species from festim.subdomain.surface_subdomain import SurfaceSubdomain -class SurfaceQuantity: +class SurfaceQuantity(DerivedQuantity): """Export SurfaceQuantity. Args: @@ -28,29 +29,23 @@ class SurfaceQuantity: data: list[float] def __init__( - self, field: Species, surface: SurfaceSubdomain, filename: str | None = None + self, + field: Species | str, + surface: SurfaceSubdomain | int, + filename: str | None = None, ) -> None: + super().__init__(filename=filename) self.field = field self.surface = surface - self.filename = filename - - self.t = [] - self.data = [] - self._first_time_export = True @property - def filename(self): - return self._filename - - @filename.setter - def filename(self, value): - if value is None: - self._filename = None - elif not isinstance(value, str): - raise TypeError("filename must be of type str") - elif not value.endswith(".csv") and not value.endswith(".txt"): - raise ValueError("filename must end with .csv or .txt") - self._filename = value + @abstractmethod + def title(self): + pass + + @abstractmethod + def compute(self, *args, **kwargs): + pass @property def surface(self): @@ -60,6 +55,7 @@ def surface(self): def surface(self, value): if not isinstance(value, int | SurfaceSubdomain) or isinstance(value, bool): raise TypeError("surface should be an int or F.SurfaceSubdomain") + self._surface = value @property @@ -68,23 +64,7 @@ def field(self): @field.setter def field(self, value): - # check that field is festim.Species if not isinstance(value, Species | str): - raise TypeError("field must be of type festim.Species") + raise TypeError("field must be of type F.Species or str") self._field = value - - def write(self, t): - """If the filename doesnt exist yet, create it and write the header, then append - the time and value to the file.""" - - if self.filename is not None: - if self._first_time_export: - header = ["t(s)", f"{self.title}"] - with open(self.filename, mode="w+", newline="") as file: - writer = csv.writer(file) - writer.writerow(header) - self._first_time_export = False - with open(self.filename, mode="a", newline="") as file: - writer = csv.writer(file) - writer.writerow([t, self.value]) diff --git a/src/festim/exports/volume_quantity.py b/src/festim/exports/volume_quantity.py index 1daaaac6a..7ebcace4c 100644 --- a/src/festim/exports/volume_quantity.py +++ b/src/festim/exports/volume_quantity.py @@ -1,10 +1,11 @@ -import csv +from abc import abstractmethod +from festim.exports.derived_quantity import DerivedQuantity from festim.species import Species from festim.subdomain.volume_subdomain import VolumeSubdomain -class VolumeQuantity: +class VolumeQuantity(DerivedQuantity): """Export VolumeQuantity. Args: @@ -27,28 +28,24 @@ class VolumeQuantity: t: list[float] data: list[float] - def __init__(self, field, volume, filename: str | None = None) -> None: + def __init__( + self, + field: Species | str, + volume: VolumeSubdomain | int, + filename: str | None = None, + ) -> None: + super().__init__(filename=filename) self.field = field self.volume = volume - self.filename = filename - - self.t = [] - self.data = [] - self._first_time_export = True @property - def filename(self): - return self._filename + @abstractmethod + def title(self): + pass - @filename.setter - def filename(self, value): - if value is None: - self._filename = None - elif not isinstance(value, str): - raise TypeError("filename must be of type str") - elif not value.endswith(".csv") and not value.endswith(".txt"): - raise ValueError("filename must end with .csv or .txt") - self._filename = value + @abstractmethod + def compute(self, *args, **kwargs): + pass @property def field(self): @@ -56,23 +53,7 @@ def field(self): @field.setter def field(self, value): - # check that field is festim.Species if not isinstance(value, Species | str): - raise TypeError("field must be of type festim.Species") + raise TypeError("field must be of type F.Species or str") self._field = value - - def write(self, t): - """If the filename doesnt exist yet, create it and write the header, then append - the time and value to the file.""" - - if self.filename is not None: - if self._first_time_export: - header = ["t(s)", f"{self.title}"] - with open(self.filename, mode="w+", newline="") as file: - writer = csv.writer(file) - writer.writerow(header) - self._first_time_export = False - with open(self.filename, mode="a", newline="") as file: - writer = csv.writer(file) - writer.writerow([t, self.value]) diff --git a/src/festim/hydrogen_transport_problem.py b/src/festim/hydrogen_transport_problem.py index 2a83bd75a..7b181366e 100644 --- a/src/festim/hydrogen_transport_problem.py +++ b/src/festim/hydrogen_transport_problem.py @@ -135,6 +135,8 @@ class HydrogenTransportProblem(problem.ProblemBase): """ _temperature_as_function: fem.Function + _species_to_D_global: dict[_species.Species, fem.Function] + _species_to_D_global_expr: dict[_species.Species, fem.Expression] def __init__( self, @@ -190,6 +192,8 @@ def __init__( self._element_immobile = element_immobile self._temperature_as_function = None + self._species_to_D_global = None + self._species_to_D_global_expr = None @property def temperature(self): @@ -463,7 +467,7 @@ def initialise_exports(self): ) continue - elif isinstance(export, exports.SurfaceQuantity | exports.VolumeQuantity): + elif isinstance(export, exports.DerivedQuantity): # raise not implemented error if the derived quantity don't match the # type of mesh eg. SurfaceFlux is used with cylindrical mesh if self.mesh.coordinate_system != CoordinateSystem.CARTESIAN: @@ -473,16 +477,17 @@ def initialise_exports(self): ) # if name of species is given then replace with species object - if isinstance(export.field, list): - for idx, field in enumerate(export.field): - if isinstance(field, str): - export.field[idx] = _species.find_species_from_name( - field, self.species - ) - elif isinstance(export.field, str): - export.field = _species.find_species_from_name( - export.field, self.species - ) + if hasattr(export, "field"): + if isinstance(export.field, list): + for idx, field in enumerate(export.field): + if isinstance(field, str): + export.field[idx] = _species.find_species_from_name( + field, self.species + ) + elif isinstance(export.field, str): + export.field = _species.find_species_from_name( + export.field, self.species + ) # Initialize XDMFFile for writer if isinstance(export, exports.XDMFExport): @@ -494,38 +499,53 @@ def initialise_exports(self): export.t = [] # compute diffusivity function for surface fluxes - spe_to_D_global = {} # links species to global D function - spe_to_D_global_expr = {} # links species to D expression + # TODO: probably a better way to handle things would be to follow what's done in + # https://jsdokken.com/dolfinx-tutorial/chapter3/subdomains.html + spe_to_D_global_func_expr = { + spe: self.define_D_global(spe) for spe in self.species if spe.mobile + } + self._species_to_D_global_expr = { + k: v[1] for k, v in spe_to_D_global_func_expr.items() + } # links species to D expression + self._species_to_D_global = { + k: v[0] for k, v in spe_to_D_global_func_expr.items() + } # links species to global D function for export in self.exports: if isinstance(export, exports.SurfaceQuantity): - if export.field in spe_to_D_global: - # if already computed then use the same D - D = spe_to_D_global[export.field] - D_expr = spe_to_D_global_expr[export.field] - else: - # compute D and add it to the dict - D, D_expr = self.define_D_global(export.field) - spe_to_D_global[export.field] = D - spe_to_D_global_expr[export.field] = D_expr - # add the global D to the export - export.D = D - export.D_expr = D_expr - if isinstance( - export, - exports.MaximumVolume - | exports.MaximumSurface - | exports.MinimumVolume - | exports.MinimumSurface, - ): + export.D = self._species_to_D_global.get(export.field) + export.D_expr = self._species_to_D_global_expr.get(export.field) + if isinstance(export, exports.MaximumVolume | exports.MinimumVolume): export.volume_meshtags = self.volume_meshtags + if isinstance(export, exports.MaximumSurface | exports.MinimumSurface): export.facet_meshtags = self.facet_meshtags + # reset the data and time for SurfaceQuantity and VolumeQuantity - if isinstance(export, exports.SurfaceQuantity | exports.VolumeQuantity): + if isinstance(export, exports.DerivedQuantity): export.t = [] export.data = [] + if isinstance(export, exports.CustomQuantity): + kwargs = { + species.name: species.post_processing_solution + for species in self.species + } + kwargs["n"] = ufl.FacetNormal(self.mesh.mesh) + kwargs["t"] = self.t + kwargs["T"] = self.temperature_fenics + + # NOTE we need to change our D_global approach + D_kwargs = { + f"D_{sp.name}": self._species_to_D_global[sp] for sp in self.species + } + kwargs.update(D_kwargs) + kwargs["D"] = {sp.name: D_kwargs[f"D_{sp.name}"] for sp in self.species} + if len(self.species) == 1: + kwargs["D"] = kwargs[f"D_{self.species[0].name}"] + kwargs["x"] = ufl.SpatialCoordinate(self.mesh.mesh) + export.ufl_expr = export.expr(**kwargs) + def _get_temperature_field_as_function(self) -> dolfinx.fem.Function: """Based on the type of the temperature_fenics attribute, converts it as a Function to be used in VTX export. @@ -959,13 +979,11 @@ def post_processing(self): if self.temperature_time_dependent: # update global D if temperature time dependent or internal # variables time dependent - species_not_updated = self.species.copy() # make a copy of the species - for export in self.exports: - if isinstance(export, exports.SurfaceFlux): - # if the D of the species has not been updated yet - if export.field in species_not_updated: - export.D.interpolate(export.D_expr) - species_not_updated.remove(export.field) + # TODO: honestly, we probably don't need to do this at all + # SurfaceFlux quantities should use ufl.Expr for D instead of a fem.Function + + for spe, D_global in self._species_to_D_global.items(): + D_global.interpolate(self._species_to_D_global_expr[spe]) for export in self.exports: # skip if it isn't time to export @@ -1032,6 +1050,18 @@ def post_processing(self): # if filename given write export data to file if export.filename is not None: export.write(t=float(self.t)) + elif isinstance(export, exports.CustomQuantity): + is_surface = isinstance(export.subdomain, _subdomain.SurfaceSubdomain) + measure = self.ds if is_surface else self.dx + export.compute(measure) + + # update export data + export.t.append(float(self.t)) + + # if filename given write export data to file + if export.filename is not None: + export.write(t=float(self.t)) + if isinstance(export, exports.XDMFExport): export.write(float(self.t)) @@ -1728,10 +1758,46 @@ def initialise_exports(self): export.D = D # reset the data and time for SurfaceQuantity and VolumeQuantity - if isinstance(export, exports.SurfaceQuantity | exports.VolumeQuantity): + if isinstance(export, exports.DerivedQuantity): export.t = [] export.data = [] + if isinstance(export, exports.CustomQuantity): + volume = ( + export.subdomain + if not isinstance(export.subdomain, _subdomain.SurfaceSubdomain) + else self.surface_to_volume[ + export.subdomain + if isinstance(export.subdomain, _subdomain.SurfaceSubdomain) + else next( + s + for s in self.surface_subdomains + if s.id == export.subdomain + ) + ] + ) + + kwargs = { + species.name: species.subdomain_to_post_processing_solution[volume] + for species in self.species + } + kwargs["n"] = ufl.FacetNormal(self.mesh.mesh) + kwargs["t"] = self.t + kwargs["T"] = self.temperature_fenics + + D_kwargs = { + f"D_{sp.name}": volume.material.get_diffusion_coefficient( + self.mesh.mesh, self.temperature_fenics, sp + ) + for sp in self.species + } + kwargs.update(D_kwargs) + kwargs["D"] = {sp.name: D_kwargs[f"D_{sp.name}"] for sp in self.species} + if len(self.species) == 1: + kwargs["D"] = kwargs[f"D_{self.species[0].name}"] + kwargs["x"] = ufl.SpatialCoordinate(self.mesh.mesh) + export.ufl_expr = export.expr(**kwargs) + def post_processing(self): # update post-processing solutions (for each species in each subdomain) # with new solution @@ -1816,7 +1882,7 @@ def post_processing(self): elif isinstance(export, exports.VolumeQuantity): if isinstance(export, exports.TotalVolume | exports.AverageVolume): try: - from dolfinx.mesh import EntityMap # noqa: F401 + from dolfinx.mesh import EntityMap entity_maps = [sd.cell_map for sd in self.volume_subdomains] except ImportError: @@ -1835,7 +1901,23 @@ def post_processing(self): else: export.compute() - if isinstance(export, exports.SurfaceQuantity | exports.VolumeQuantity): + elif isinstance(export, exports.CustomQuantity): + is_surface = isinstance(export.subdomain, _subdomain.SurfaceSubdomain) + measure = self.ds if is_surface else self.dx + + # getting entity_maps + try: + from dolfinx.mesh import EntityMap # noqa: F401 + + entity_maps = [sd.cell_map for sd in self.volume_subdomains] + except ImportError: + entity_maps = { + sd.submesh: sd.parent_to_submesh + for sd in self.volume_subdomains + } + export.compute(measure, entity_maps=entity_maps) + + if isinstance(export, exports.DerivedQuantity): # update export data export.t.append(float(self.t)) diff --git a/test/system_tests/test_surface_reactions.py b/test/system_tests/test_surface_reactions.py index 7efaa2bad..dedc4013c 100644 --- a/test/system_tests/test_surface_reactions.py +++ b/test/system_tests/test_surface_reactions.py @@ -9,7 +9,7 @@ class FluxFromSurfaceReaction(F.SurfaceFlux): def __init__(self, reaction: F.SurfaceReactionBC): super().__init__( - F.Species(), # just a dummy species here + F.Species(mobile=False), # just a dummy species here reaction.subdomain, ) self.reaction = reaction.flux_bcs[0] diff --git a/test/test_derived_quantities.py b/test/test_derived_quantities.py index edfd6ae37..fd6945b29 100644 --- a/test/test_derived_quantities.py +++ b/test/test_derived_quantities.py @@ -1,6 +1,10 @@ import os +from mpi4py import MPI + +import numpy as np import pytest +import ufl import festim as F @@ -22,8 +26,6 @@ max_surface = F.MaximumSurface(mobile_H, surface=surf_2, filename=results) avg_surface = F.AverageSurface(mobile_D, surface=surf_1, filename=results) avg_vol = F.AverageVolume(mobile_H, volume=vol_2, filename=results) -surf_quant = F.SurfaceQuantity(mobile_H, surface=surf_1, filename=results) -vol_quant = F.VolumeQuantity(mobile_H, volume=vol_1, filename=results) @pytest.mark.parametrize( @@ -46,3 +48,466 @@ def test_title(quantity, expected_title, tmp_path): quantity.value = 1 assert quantity.title == expected_title + + +class TestCustomQuantity: + """Test suite for CustomQuantity export""" + + def test_custom_quantity_title_volume(self): + """Test that CustomQuantity has correct title for volume subdomain""" + + def expr(**kwargs): + return kwargs.get("A", 0) + + quantity = F.CustomQuantity( + expr=expr, subdomain=vol_1, title="My Custom Quantity" + ) + assert quantity.title == "My Custom Quantity" + + def test_custom_quantity_title_surface(self): + """Test that CustomQuantity has correct title for surface subdomain""" + + def expr(**kwargs): + return kwargs.get("A", 0) + + quantity = F.CustomQuantity(expr=expr, subdomain=surf_1, title="Surface Custom") + assert quantity.title == "Surface Custom" + + def test_custom_quantity_subdomain_volume(self): + """Test that CustomQuantity stores volume subdomain correctly""" + + def expr(**kwargs): + return 1 + + quantity = F.CustomQuantity(expr=expr, subdomain=vol_1) + assert quantity.subdomain == vol_1 + assert quantity.subdomain.id == 1 + + def test_custom_quantity_subdomain_surface(self): + """Test that CustomQuantity stores surface subdomain correctly""" + + def expr(**kwargs): + return 1 + + quantity = F.CustomQuantity(expr=expr, subdomain=surf_2) + assert quantity.subdomain == surf_2 + assert quantity.subdomain.id == 2 + + def test_custom_quantity_accepts_kwargs(self): + """Test that expression callable receives kwargs""" + received_kwargs = {} + + def expr(**kwargs): + received_kwargs.update(kwargs) + return 1 + + quantity = F.CustomQuantity(expr=expr, subdomain=vol_1) + quantity.expr(**{"A": 1, "B": 2, "n": None}) + assert received_kwargs == {"A": 1, "B": 2, "n": None} + + def test_custom_quantity_data_append(self): + """Test that values are appended to data list""" + + def expr(**kwargs): + return 1 + + quantity = F.CustomQuantity(expr=expr, subdomain=vol_1) + assert quantity.data == [] + quantity.value = 5 + quantity.data.append(quantity.value) + assert quantity.data == [5] + quantity.value = 10 + quantity.data.append(quantity.value) + assert quantity.data == [5, 10] + + def test_custom_quantity_t_list(self): + """Test that time values are stored in t list""" + + def expr(**kwargs): + return 1 + + quantity = F.CustomQuantity(expr=expr, subdomain=vol_1) + assert quantity.t == [] + quantity.t.append(0.0) + quantity.t.append(0.1) + assert quantity.t == [0.0, 0.1] + + def test_custom_quantity_filename(self, tmp_path): + """Test that filename can be set and retrieved""" + + def expr(**kwargs): + return 1 + + filename = os.path.join(tmp_path, "custom.csv") + quantity = F.CustomQuantity(expr=expr, subdomain=vol_1, filename=filename) + assert quantity.filename == filename + + def test_custom_quantity_expr_stored(self): + """Test that expression callable is stored correctly""" + + def my_expr(**kwargs): + return kwargs.get("value", 0) + + quantity = F.CustomQuantity(expr=my_expr, subdomain=vol_1) + assert quantity.expr == my_expr + assert quantity.expr(value=42) == 42 + + +class TestCustomQuantityWithHydrogenTransportProblem: + """Integration tests for CustomQuantity with HydrogenTransportProblem""" + + def test_custom_quantity_volume_integration(self): + """Test CustomQuantity with volume subdomain in HydrogenTransportProblem""" + # BUILD + material = F.Material(D_0=1, E_D=0, K_S_0=1, E_K_S=0) + vol = F.VolumeSubdomain(id=1, material=material) + top = F.SurfaceSubdomain(id=1, locator=lambda x: np.isclose(x[1], 1)) + bottom = F.SurfaceSubdomain(id=2, locator=lambda x: np.isclose(x[1], 0)) + + from dolfinx.mesh import create_unit_square + + mesh = create_unit_square(MPI.COMM_WORLD, 5, 5) + + my_model = F.HydrogenTransportProblem() + my_model.mesh = F.Mesh(mesh) + my_model.subdomains = [vol, top, bottom] + my_model.temperature = 300 + my_model.settings = F.Settings( + final_time=0.02, atol=1e-6, rtol=1e-6, stepsize=0.01 + ) + + species = F.Species("A") + my_model.species = [species] + + my_model.boundary_conditions = [ + F.FixedConcentrationBC(species=species, subdomain=top, value=1), + F.FixedConcentrationBC(species=species, subdomain=bottom, value=0), + ] + + # Custom quantity that computes the total concentration + def total_conc(**kwargs): + return kwargs["A"] + + custom_qty = F.CustomQuantity( + expr=total_conc, subdomain=vol, title="Total concentration" + ) + + my_model.exports = [custom_qty] + + # RUN + my_model.initialise() + my_model.run() + + # TEST + assert len(custom_qty.data) > 0 + assert len(custom_qty.t) > 0 + assert len(custom_qty.t) == len(custom_qty.data) + # Check that values are non-negative (concentration) + assert all(v >= 0 for v in custom_qty.data) + + def test_custom_quantity_surface_integration(self): + """Test CustomQuantity with surface subdomain in HydrogenTransportProblem""" + # BUILD + material = F.Material(D_0=1, E_D=0, K_S_0=1, E_K_S=0) + vol = F.VolumeSubdomain(id=1, material=material) + top = F.SurfaceSubdomain(id=1, locator=lambda x: np.isclose(x[1], 1)) + bottom = F.SurfaceSubdomain(id=2, locator=lambda x: np.isclose(x[1], 0)) + + from dolfinx.mesh import create_unit_square + + mesh = create_unit_square(MPI.COMM_WORLD, 5, 5) + + my_model = F.HydrogenTransportProblem() + my_model.mesh = F.Mesh(mesh) + my_model.subdomains = [vol, top, bottom] + my_model.temperature = 300 + my_model.settings = F.Settings( + final_time=0.02, atol=1e-6, rtol=1e-6, stepsize=0.01 + ) + + species = F.Species("A") + my_model.species = [species] + + my_model.boundary_conditions = [ + F.FixedConcentrationBC(species=species, subdomain=top, value=1), + F.FixedConcentrationBC(species=species, subdomain=bottom, value=0), + ] + + # Custom quantity on surface + def surface_flux(**kwargs): + D = kwargs["D_A"] + c = kwargs["A"] + n = kwargs["n"] + return -D * ufl.dot(ufl.grad(c), n) + + custom_qty = F.CustomQuantity( + expr=surface_flux, subdomain=top, title="Surface flux" + ) + + my_model.exports = [custom_qty] + + # RUN + my_model.initialise() + my_model.run() + + # TEST + assert len(custom_qty.data) > 0 + assert len(custom_qty.t) > 0 + assert len(custom_qty.t) == len(custom_qty.data) + + def test_custom_quantity_with_multiple_species(self): + """Test CustomQuantity with multiple species""" + # BUILD + material = F.Material(D_0=1, E_D=0, K_S_0=1, E_K_S=0) + vol = F.VolumeSubdomain(id=1, material=material) + top = F.SurfaceSubdomain(id=1, locator=lambda x: np.isclose(x[1], 1)) + bottom = F.SurfaceSubdomain(id=2, locator=lambda x: np.isclose(x[1], 0)) + + from dolfinx.mesh import create_unit_square + + mesh = create_unit_square(MPI.COMM_WORLD, 5, 5) + + my_model = F.HydrogenTransportProblem() + my_model.mesh = F.Mesh(mesh) + my_model.subdomains = [vol, top, bottom] + my_model.temperature = 300 + my_model.settings = F.Settings( + final_time=0.02, atol=1e-6, rtol=1e-6, stepsize=0.01 + ) + + speciesA = F.Species("A") + speciesB = F.Species("B") + my_model.species = [speciesA, speciesB] + + my_model.boundary_conditions = [ + F.FixedConcentrationBC(species=speciesA, subdomain=top, value=1), + F.FixedConcentrationBC(species=speciesA, subdomain=bottom, value=0), + F.FixedConcentrationBC(species=speciesB, subdomain=top, value=0), + F.FixedConcentrationBC(species=speciesB, subdomain=bottom, value=1), + ] + + # Custom quantity combining both species + def combined_conc(**kwargs): + return kwargs["A"] + kwargs["B"] + + custom_qty = F.CustomQuantity( + expr=combined_conc, subdomain=vol, title="Combined concentration" + ) + + my_model.exports = [custom_qty] + + # RUN + my_model.initialise() + my_model.run() + + # TEST + assert len(custom_qty.data) > 0 + assert len(custom_qty.t) > 0 + # Combined concentration should be <= 2 (max of each species) + assert all(0 <= v <= 2 for v in custom_qty.data) + + +class TestDerivedQuantityFilename: + """Test suite for DerivedQuantity filename property validation""" + + def test_filename_none(self): + """Test that filename can be set to None""" + + def expr(**kwargs): + return 1 + + quantity = F.CustomQuantity(expr=expr, subdomain=vol_1, filename=None) + assert quantity.filename is None + + def test_filename_csv(self, tmp_path): + """Test that filename accepts .csv extension""" + + def expr(**kwargs): + return 1 + + filename = os.path.join(tmp_path, "output.csv") + quantity = F.CustomQuantity(expr=expr, subdomain=vol_1, filename=filename) + assert quantity.filename == filename + + def test_filename_txt(self, tmp_path): + """Test that filename accepts .txt extension""" + + def expr(**kwargs): + return 1 + + filename = os.path.join(tmp_path, "output.txt") + quantity = F.CustomQuantity(expr=expr, subdomain=vol_1, filename=filename) + assert quantity.filename == filename + + def test_filename_invalid_extension(self): + """Test that filename rejects invalid extensions""" + + def expr(**kwargs): + return 1 + + with pytest.raises(ValueError, match="filename must end with .csv or .txt"): # noqa: RUF043 + F.CustomQuantity(expr=expr, subdomain=vol_1, filename="output.dat") + + def test_filename_invalid_type(self): + """Test that filename rejects non-string types""" + + def expr(**kwargs): + return 1 + + with pytest.raises(TypeError, match="filename must be of type str"): + F.CustomQuantity(expr=expr, subdomain=vol_1, filename=123) + + def test_filename_property_setter_none(self): + """Test setting filename property to None after initialization""" + + def expr(**kwargs): + return 1 + + quantity = F.CustomQuantity(expr=expr, subdomain=vol_1) + quantity.filename = None + assert quantity.filename is None + + def test_filename_property_setter_csv(self, tmp_path): + """Test setting filename property to a .csv file""" + + def expr(**kwargs): + return 1 + + quantity = F.CustomQuantity(expr=expr, subdomain=vol_1) + filename = os.path.join(tmp_path, "new_output.csv") + quantity.filename = filename + assert quantity.filename == filename + + def test_filename_property_setter_txt(self, tmp_path): + """Test setting filename property to a .txt file""" + + def expr(**kwargs): + return 1 + + quantity = F.CustomQuantity(expr=expr, subdomain=vol_1) + filename = os.path.join(tmp_path, "new_output.txt") + quantity.filename = filename + assert quantity.filename == filename + + def test_filename_property_setter_invalid_extension(self): + """Test that setting invalid extension raises ValueError""" + + def expr(**kwargs): + return 1 + + quantity = F.CustomQuantity(expr=expr, subdomain=vol_1) + with pytest.raises(ValueError, match="filename must end with .csv or .txt"): # noqa: RUF043 + quantity.filename = "output.json" + + def test_filename_property_setter_invalid_type(self): + """Test that setting non-string filename raises TypeError""" + + def expr(**kwargs): + return 1 + + quantity = F.CustomQuantity(expr=expr, subdomain=vol_1) + with pytest.raises(TypeError, match="filename must be of type str"): + quantity.filename = 42 + + +class TestCustomQuantityWithDiscontinuousProblem: + """ + Integration tests for CustomQuantity with HydrogenTransportProblemDiscontinuous + """ + + def test_custom_quantity_discontinuous_volume(self): + """Test CustomQuantity with volume subdomain in discontinuous problem""" + # BUILD + material = F.Material(D_0=1, E_D=0, K_S_0=1, E_K_S=0) + vol = F.VolumeSubdomain(id=1, material=material) + top = F.SurfaceSubdomain(id=1, locator=lambda x: np.isclose(x[1], 1)) + bottom = F.SurfaceSubdomain(id=2, locator=lambda x: np.isclose(x[1], 0)) + + from dolfinx.mesh import create_unit_square + + mesh = create_unit_square(MPI.COMM_WORLD, 5, 5) + + my_model = F.HydrogenTransportProblemDiscontinuous() + my_model.mesh = F.Mesh(mesh) + my_model.subdomains = [vol, top, bottom] + my_model.temperature = 300 + my_model.settings = F.Settings( + final_time=0.02, atol=1e-6, rtol=1e-6, stepsize=0.01 + ) + + species = F.Species("A", subdomains=[vol]) + my_model.species = [species] + + my_model.boundary_conditions = [ + F.FixedConcentrationBC(species=species, subdomain=top, value=1), + F.FixedConcentrationBC(species=species, subdomain=bottom, value=0), + ] + + # Custom quantity + def total_conc(**kwargs): + return kwargs["A"] + + custom_qty = F.CustomQuantity( + expr=total_conc, subdomain=vol, title="Total concentration" + ) + + my_model.exports = [custom_qty] + + # RUN + my_model.initialise() + my_model.run() + + # TEST + assert len(custom_qty.data) > 0 + assert len(custom_qty.t) > 0 + assert len(custom_qty.t) == len(custom_qty.data) + + def test_custom_quantity_discontinuous_surface(self): + """Test CustomQuantity with surface subdomain in discontinuous problem""" + # BUILD + material = F.Material(D_0=1, E_D=0, K_S_0=1, E_K_S=0) + vol = F.VolumeSubdomain(id=1, material=material) + top = F.SurfaceSubdomain(id=1, locator=lambda x: np.isclose(x[1], 1)) + bottom = F.SurfaceSubdomain(id=2, locator=lambda x: np.isclose(x[1], 0)) + + from dolfinx.mesh import create_unit_square + + mesh = create_unit_square(MPI.COMM_WORLD, 5, 5) + + my_model = F.HydrogenTransportProblemDiscontinuous() + my_model.mesh = F.Mesh(mesh) + my_model.subdomains = [vol, top, bottom] + my_model.temperature = 300 + my_model.settings = F.Settings( + final_time=0.02, atol=1e-6, rtol=1e-6, stepsize=0.01 + ) + + species = F.Species("A", subdomains=[vol]) + my_model.species = [species] + + my_model.boundary_conditions = [ + F.FixedConcentrationBC(species=species, subdomain=top, value=1), + F.FixedConcentrationBC(species=species, subdomain=bottom, value=0), + ] + + # Custom quantity on surface + def surface_flux(**kwargs): + D = kwargs["D_A"] + c = kwargs["A"] + n = kwargs["n"] + return -D * ufl.dot(ufl.grad(c), n) + + custom_qty = F.CustomQuantity( + expr=surface_flux, subdomain=top, title="Surface flux" + ) + + my_model.exports = [custom_qty] + + # RUN + my_model.initialise() + my_model.run() + + # TEST + assert len(custom_qty.data) > 0 + assert len(custom_qty.t) > 0 + assert len(custom_qty.t) == len(custom_qty.data) diff --git a/test/test_surface_quantity.py b/test/test_surface_quantity.py index c2943bcaf..a94378d44 100644 --- a/test/test_surface_quantity.py +++ b/test/test_surface_quantity.py @@ -101,29 +101,6 @@ def test_write_overwrite(tmp_path): assert file_length == expected_length -def test_filename_setter_raises_TypeError(): - """Test that a TypeError is raised when the filename is not a string.""" - - with pytest.raises(TypeError, match="filename must be of type str"): - F.SurfaceQuantity( - filename=1, - field=F.Species("test"), - surface=F.SurfaceSubdomain1D(id=1, x=0), - ) - - -def test_filename_setter_raises_ValueError(tmp_path): - """Test that a ValueError is raised when the filename does not end with .csv or - .txt.""" - - with pytest.raises(ValueError): - F.SurfaceQuantity( - filename=os.path.join(tmp_path, "my_export.xdmf"), - field=F.Species("test"), - surface=F.SurfaceSubdomain1D(id=1, x=0), - ) - - def test_field_setter_raises_TypeError(): """Test that a TypeError is raised when the field is not a F.Species.""" @@ -151,15 +128,3 @@ def test_writer(tmp_path, value): expected_length = i + 2 assert file_length == expected_length - - -def test_surface_setter_raises_TypeError(): - """Test that a TypeError is raised when the surface is not a F.SurfaceSubdomain.""" - - with pytest.raises( - TypeError, match=r"surface should be an int or F.SurfaceSubdomain" - ): - F.SurfaceQuantity( - field=F.Species("H"), - surface="1", - ) diff --git a/test/test_volume_quantity.py b/test/test_volume_quantity.py index 7f530ef58..eb3a44478 100644 --- a/test/test_volume_quantity.py +++ b/test/test_volume_quantity.py @@ -6,29 +6,6 @@ dummy_volume = F.VolumeSubdomain1D(id=1, borders=[0, 1], material=dummy_mat) -def test_filename_setter_raises_TypeError(): - """Test that a TypeError is raised when the filename is not a string.""" - - with pytest.raises(TypeError, match="filename must be of type str"): - F.VolumeQuantity( - filename=1, - field=F.Species("test"), - volume=dummy_volume, - ) - - -def test_filename_setter_raises_ValueError(): - """Test that a ValueError is raised when the filename does not end with .csv or - .txt.""" - - with pytest.raises(ValueError): - F.VolumeQuantity( - filename="my_export.xdmf", - field=F.Species("test"), - volume=dummy_volume, - ) - - def test_field_setter_raises_TypeError(): """Test that a TypeError is raised when the field is not a F.Species."""