diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f7ca711..0f89826 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -27,10 +27,10 @@ jobs: - name: Run tests shell: bash -l {0} run: | - python -m pytest test + python -m pytest test --cov src/sparging --cov-report xml - # - name: Upload to codecov - # uses: codecov/codecov-action@v4 - # with: - # token: ${{ secrets.CODECOV_TOKEN }} - # files: ./coverage.xml \ No newline at end of file + - name: Upload to codecov + uses: codecov/codecov-action@v4 + with: + token: ${{ secrets.CODECOV_TOKEN }} + files: ./coverage.xml \ No newline at end of file diff --git a/.gitignore b/.gitignore index 60cb967..06849bf 100644 --- a/.gitignore +++ b/.gitignore @@ -56,7 +56,6 @@ cover/ *.pot # Django stuff: -*.log local_settings.py db.sqlite3 db.sqlite3-journal @@ -211,5 +210,6 @@ __marimo__/ *.csv mwe.py *.json +!standard_input.json *.ipynb *.code-workspace diff --git a/example2.py b/example2.py new file mode 100644 index 0000000..3382f5c --- /dev/null +++ b/example2.py @@ -0,0 +1,76 @@ +from sparging.config import ureg +from sparging import all_correlations +from sparging import animation +from sparging.model import Simulation +from sparging.inputs import ( + ColumnGeometry, + BreederMaterial, + OperatingParameters, + SpargingParameters, + SimulationInput, +) +import logging +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + import pint + +logger = logging.getLogger(__name__) +logging.basicConfig(level=logging.WARNING) + + +geom = ColumnGeometry( + area=0.2 * ureg.m**2, + height=1.0 * ureg.m, + nozzle_diameter=0.001 * ureg.m, + nb_nozzle=10 * ureg.dimensionless, +) + +flibe = BreederMaterial( + name="FLiBe", +) + +operating_params = OperatingParameters( + temperature=600 * ureg.celsius, + P_top=1 * ureg.atm, + flow_g_mol=400 * ureg.sccm, + tbr=0.1 * ureg("triton / neutron"), + n_gen_rate=1e9 * ureg("neutron / s"), +) + +sparging_params = SpargingParameters( + h_l=all_correlations("h_l_briggs"), +) + + +# class method from_parameters that takes in objects like ColumnGeometry, BreederMaterial, OperatingParameters and returns a SimulationInput object with the appropriate correlations for the given parameters. This method should be able to handle cases where some of the parameters are already provided as correlations and should not overwrite them. +my_input = SimulationInput.from_parameters( + geom, flibe, operating_params, sparging_params +) +logger.info(my_input) + + +def profile_source_T(z: pint.Quantity): + import numpy as np + + # return np.sin(np.pi / (1 * ureg.m) * z) + return 0.5 * (1 + np.cos(0.5 * np.pi / (1 * ureg.m) * z)) + + +my_simulation = Simulation( + my_input, + t_final=3 * ureg.days, + signal_irr=lambda t: 1 if t < 12 * ureg.hour else 0, + signal_sparging=lambda t: 1, +) +output = my_simulation.solve() + +# # save output to file +# output.profiles_to_csv(f"output_{tank_height}m.csv") + +# # plot results +# from sparging import plotting +# plotting.plot_animation(output) + + +animation.create_animation(output, show_activity=True) diff --git a/main.py b/main.py deleted file mode 100644 index b6e62df..0000000 --- a/main.py +++ /dev/null @@ -1,41 +0,0 @@ -import src.sparging.model as model -import sys -import os -from sparging.animation import create_animation -from sparging.helpers import get_input - - -ANIMATE = True -SHOW_ACTIVITY = True -YAML_INPUT_PATH = os.path.join(os.getcwd(), sys.argv[1]) -OUTPUT_FOLDER = os.path.join( - os.getcwd(), sys.argv[1].split(".")[0].replace("_input", "") -) - -if __name__ == "__main__": - params = get_input(YAML_INPUT_PATH) - sim_input = model.SimulationInput(params) - - # TODO integrate to input file - t_sparging_hr = [24, 1e20] # time interval when sparger is ON - t_irr_hr = [0, 96] # time interval when irradiation is ON - t_final = 6 * model.days_to_seconds - - results = model.solve( - sim_input, - t_final=t_final, - t_irr=[t * model.hours_to_seconds for t in t_irr_hr], - t_sparging=[t * model.hours_to_seconds for t in t_sparging_hr], - ) - - os.makedirs(OUTPUT_FOLDER, exist_ok=True) - results.to_yaml(os.path.join(OUTPUT_FOLDER, "restart.yaml")) - results.to_json(os.path.join(OUTPUT_FOLDER, "output.json")) - # results.profiles_to_csv(os.path.join(OUTPUT_FOLDER, "profiles")) - - if ANIMATE is True: - # Create interactive animation - try: - create_animation(results, show_activity=SHOW_ACTIVITY) - except KeyboardInterrupt: - pass diff --git a/pyproject.toml b/pyproject.toml index b4c3316..219edf5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -50,3 +50,9 @@ addopts = "-v" [tool.setuptools.package-data] sparging = ["*.yaml", "*.yml"] + +[tool.coverage.run] +source = ["src/sparging"] +omit = [ + "src/sparging/animation.py" +] diff --git a/src/sparging/__init__.py b/src/sparging/__init__.py index fc633f4..fd03b56 100644 --- a/src/sparging/__init__.py +++ b/src/sparging/__init__.py @@ -2,11 +2,10 @@ libra_sparging: A finite element model for sparging processes using FEniCSx/DOLFINX. """ -from .config import ureg, const_R, const_g, VERBOSE +from .config import ureg, const_R, const_g from .model import SimulationResults from .animation import ConcentrationAnimator -from .helpers import * -from .correlations import * +from .correlations import all_correlations, CorrelationGroup, Correlation __all__ = [ "SimulationInput", @@ -15,5 +14,4 @@ "ureg", "const_R", "const_g", - "VERBOSE", ] diff --git a/src/sparging/config.py b/src/sparging/config.py index c67e2a3..1ec8f5c 100644 --- a/src/sparging/config.py +++ b/src/sparging/config.py @@ -1,8 +1,6 @@ from pint import UnitRegistry import scipy.constants as const -VERBOSE = False - ureg = UnitRegistry( autoconvert_offset_to_baseunit=True ) # to deal with offset units (eg: degree celsius) diff --git a/src/sparging/correlations.py b/src/sparging/correlations.py index 394cb9d..9f8f09b 100644 --- a/src/sparging/correlations.py +++ b/src/sparging/correlations.py @@ -1,67 +1,399 @@ from __future__ import annotations -from sparging.config import ureg, const_R, const_g, VERBOSE +from sparging.config import ureg, const_R, const_g from typing import TYPE_CHECKING if TYPE_CHECKING: from sparging.model import SimulationInput + import pint import numpy as np import scipy.constants as const import warnings +from dataclasses import dataclass +import enum + U_G0_DEFAULT = 0.25 # m/s, typical bubble velocity according to Chavez 2021 -correlations_dict = { - "rho_l": lambda input: ureg.Quantity( - 2245 - 0.424 * input.T.to("celsius").magnitude, "kg/m**3" +class CorrelationType(enum.Enum): # TODO do we really use it ? + MASS_TRANSFER_COEFF = "h_l" + DENSITY = "rho_l" + DIFFUSIVITY = "D_l" + SOLUBILITY = "K_S" + VISCOSITY = "mu" + SURFACE_TENSION = "sigma" + GAS_VOID_FRACTION = "eps_g" + BUBBLE_DIAMETER = "d_b" + EOTVOS_NUMBER = "Eo" + MORTON_NUMBER = "Mo" + SCHMIDT_NUMBER = "Sc" + REYNOLDS_NUMBER = "Re" + BUBBLE_VELOCITY = "u_g0" + GAS_PHASE_DISPERSION = "E_g" + PRESSURE = "P" + FLOW_RATE = "flow_g_mol" + INTERFACIAL_AREA = "a" + TRITIUM_SOURCE = "source_T" + + +@dataclass +class Correlation: + identifier: str + function: callable + corr_type: CorrelationType + input_units: list[str] + source: str | None = None + description: str | None = None + output_units: str | None = None + + def __call__(self, **kwargs: pint.Quantity) -> pint.Quantity: + # check the dimensions are correct + for arg_name, expected_dimension in zip(kwargs, self.input_units): + arg = kwargs[arg_name] + if not isinstance(arg, ureg.Quantity): + raise ValueError( + f"Invalid input: expected a pint.Quantity with units of {expected_dimension}, got {arg} of type {type(arg)}" + ) + if not arg.dimensionality == ureg(expected_dimension).dimensionality: + raise ValueError( + f"Invalid input when resolving for {self.identifier}: expected dimensions of {expected_dimension}, got {arg.dimensionality}" + ) + result = self.function(**kwargs) + if self.output_units is not None: + return result.to(self.output_units) + else: + return result.to_base_units() + + # TODO add a method that checks the validity of the input parameters based on the range of validity of the correlation, if provided in the description or source. This method could be called before running the simulation to warn the user if they are using a correlation outside of its validated range. + # TODO add __post_init__ to check that user defined correlation has same number of input_units as the number of arguments in the function, and that the output of the function is a pint.Quantity with the correct units if output_units is provided + + +class CorrelationGroup(list[Correlation]): + def __call__(self, identifier: str) -> Correlation: + for corr in self: + if corr.identifier == identifier: + return corr + raise ValueError( + f"Correlation with identifier {identifier} not found in correlation group" + ) + + def __contains__(self, key: str | Correlation): + if isinstance(key, str): + for corr in self: + if corr.identifier == key: + return True + return False + elif isinstance(key, Correlation): + return super().__contains__(key) + else: + raise TypeError( + f"Type not valid for correlation group membership check, expected str or Correlation, got {type(key)}" + ) + + +all_correlations = CorrelationGroup([]) + + +rho_l = Correlation( + identifier="rho_l", + function=lambda temperature: ureg.Quantity( + 2245 - 0.424 * temperature.to("celsius").magnitude, "kg/m**3" ), # density of Li2BeF4, Vidrio 2022 - "mu_l": lambda input: ureg.Quantity( - 0.116e-3 * np.exp(3755 / input.T.to("kelvin").magnitude), "Pa*s" + corr_type=CorrelationType.DENSITY, + source="Vidrio 2022", + description="density of Li2BeF4 as a function of temperature", + input_units=["kelvin"], +) +all_correlations.append(rho_l) + +mu_l = Correlation( + identifier="mu_l", + function=lambda temperature: ureg.Quantity( + 0.116e-3 * np.exp(3755 / temperature.to("kelvin").magnitude), "Pa*s" ), # kinematic viscosity of Li2BeF4, Cantor 1968 - "sigma_l": lambda input: ureg.Quantity( - 260 - 0.12 * input.T.to("celsius").magnitude, "dyn/cm" + corr_type=CorrelationType.VISCOSITY, + source="Cantor 1968", + description="dynamic viscosity of Li2BeF4 as a function of temperature", + input_units=["kelvin"], +) +all_correlations.append(mu_l) + +nu_l = Correlation( + identifier="nu_l", + function=lambda mu_l, rho_l: ( + mu_l / rho_l + ), # kinematic viscosity of Li2BeF4 calculated from dynamic viscosity and density + corr_type=CorrelationType.VISCOSITY, + source="calculated from mu_l and rho_l", + description="kinematic viscosity of Li2BeF4 calculated from dynamic viscosity and density", + input_units=["Pa*s", "kg/m**3"], + output_units="m**2/s", +) +all_correlations.append(nu_l) + +sigma_l = Correlation( + identifier="sigma_l", + function=lambda temperature: ureg.Quantity( + 260 - 0.12 * temperature.to("celsius").magnitude, "dyn/cm" ).to("N/m"), # surface tension of Li2BeF4,Cantor 1968 - "D_l": lambda input: ureg.Quantity( - 9.3e-7 * np.exp(-42e3 / (const.R * input.T.to("kelvin").magnitude)), "m**2/s" - ), # diffusivity of T in FLiBe, Calderoni 2008 - "K_s": lambda input: ureg.Quantity( - 7.9e-2 * np.exp(-35e3 / (const.R * input.T.to("kelvin").magnitude)), - "mol/m**3/Pa", - ), # solubility of T in FLiBe, Calderoni 2008 - "d_b": lambda input: get_d_b( - flow_g_vol=input.flow_g_vol, - nozzle_diameter=input.nozzle_diameter, - nb_nozzle=input.nb_nozzle, + corr_type=CorrelationType.SURFACE_TENSION, + source="Cantor 1968", + description="surface tension of Li2BeF4 as a function of temperature", + input_units=["kelvin"], +) +all_correlations.append(sigma_l) + +# TODO this could leverage HTM +D_l = Correlation( + identifier="D_l", + function=lambda temperature: ( + 9.3e-7 + * ureg("m**2/s") + * np.exp(-42e3 * ureg("J/mol") / (const_R * temperature.to("kelvin"))) + ), + corr_type=CorrelationType.DIFFUSIVITY, + source="Calderoni 2008", + description="diffusivity of tritium in liquid FLiBe as a function of temperature", + input_units=["kelvin"], + output_units="m**2/s", +) +all_correlations.append(D_l) + +K_s = Correlation( + identifier="K_s", + function=lambda temperature: ( + 7.9e-2 + * ureg("mol/m**3/Pa") + * np.exp(-35e3 * ureg("J/mol") / (const_R * temperature.to("kelvin"))) + ), + corr_type=CorrelationType.SOLUBILITY, + source="Calderoni 2008", + description="solubility of tritium in liquid FLiBe as a function of temperature", + input_units=["kelvin"], + output_units="mol/m**3/Pa", +) +all_correlations.append(K_s) + +d_b = Correlation( + identifier="d_b", + function=lambda flow_g_vol, nozzle_diameter, nb_nozzle: get_d_b( + flow_g_vol=flow_g_vol, nozzle_diameter=nozzle_diameter, nb_nozzle=nb_nozzle ), # mean bubble diameter, Kanai 2017 - "Eo": lambda input: (const_g * input.drho * input.d_b**2 / input.sigma_l).to( + corr_type=CorrelationType.BUBBLE_DIAMETER, + input_units=["m**3/s", "m", "dimensionless"], + output_units="m", +) +all_correlations.append(d_b) + +E_o = Correlation( + identifier="Eo", + function=lambda drho, d_b, sigma_l: (const_g * drho * d_b**2 / sigma_l).to( "dimensionless" ), # Eotvos number - "Mo": lambda input: ( - input.drho * const_g * input.mu_l**4 / (input.rho_l**2 * input.sigma_l**3) + corr_type=CorrelationType.EOTVOS_NUMBER, + input_units=["kg/m**3", "m", "N/m"], +) +all_correlations.append(E_o) + +Mo = Correlation( + identifier="Mo", + function=lambda drho, mu_l, rho_l, sigma_l: ( + drho * const_g * mu_l**4 / (rho_l**2 * sigma_l**3) ).to("dimensionless"), # Morton number - "Sc": lambda input: (input.nu_l / input.D_l).to("dimensionless"), # Schmidt number - "Re": lambda input: get_Re(input), # Reynolds number - "u_g0": lambda input: get_u_g0(input), # initial gas velocity - "eps_g": lambda input: get_eps_g(input), # gas void fraction - "h_l_malara": lambda input: get_h_malara( - input.D_l, input.d_b + corr_type=CorrelationType.MORTON_NUMBER, + input_units=["kg/m**3", "Pa*s", "kg/m**3", "N/m"], +) +all_correlations.append(Mo) + +Sc = Correlation( + identifier="Sc", + function=lambda nu_l, D_l: (nu_l / D_l).to("dimensionless"), # Schmidt number + corr_type=CorrelationType.SCHMIDT_NUMBER, + input_units=["m**2/s", "m**2/s"], +) +all_correlations.append(Sc) + +Re = Correlation( + identifier="Re", + function=lambda rho_l, u_g0, d_b, mu_l: (rho_l * u_g0 * d_b / mu_l).to( + "dimensionless" + ), # Reynolds number + corr_type=CorrelationType.REYNOLDS_NUMBER, + input_units=["kg/m**3", "m/s", "m", "Pa*s"], +) +all_correlations.append(Re) + +u_g0 = Correlation( + identifier="u_g0", + function=lambda Eo, Mo, mu_l, rho_l, d_b: get_u_g0( + Eo=Eo, Mo=Mo, mu_l=mu_l, rho_l=rho_l, d_b=d_b + ), # initial gas velocity + corr_type=CorrelationType.BUBBLE_VELOCITY, + input_units=[ + "dimensionless", + "dimensionless", + "Pa*s", + "kg/m**3", + "m", + ], + output_units="m/s", +) +all_correlations.append(u_g0) + +eps_g = Correlation( + identifier="eps_g", + function=lambda temperature, P_bottom, sigma_l, d_b, flow_g_mol, tank_diameter, u_g0: ( + get_eps_g( + T=temperature, + P_0=P_bottom, + sigma_l=sigma_l, + d_b=d_b, + flow_g=flow_g_mol, + tank_diameter=tank_diameter, + u_g0=u_g0, + ) + ), # gas void fraction + corr_type=CorrelationType.GAS_VOID_FRACTION, + input_units=[ + "kelvin", + "Pa", + "N/m", + "m", + "mol/s", + "m", + "m/s", + ], + output_units="dimensionless", +) +all_correlations.append(eps_g) + +h_l_higbie = Correlation( + identifier="h_l_higbie", + function=lambda D_l, u_g, d_b: get_h_higbie( + D_l=D_l, u_g=u_g, d_b=d_b + ), # mass transfer coefficient with Higbie correlation + corr_type=CorrelationType.MASS_TRANSFER_COEFF, + source="Higbie 1935", + description="mass transfer coefficient for tritium in liquid FLiBe using Higbie penetration model", + input_units=["m**2/s", "m/s", "m"], + output_units="m/s", +) +all_correlations.append(h_l_higbie) + +h_l_malara = Correlation( + identifier="h_l_malara", + function=lambda D_l, d_b: get_h_malara( + D_l=D_l, d_b=d_b ), # mass transfer coefficient with Malara correlation - "h_l_briggs": lambda input: get_h_briggs( - input.Re, input.Sc, input.D_l, input.d_b + corr_type=CorrelationType.MASS_TRANSFER_COEFF, + source="Malara 1995", + description="mass transfer coefficient for tritium in liquid FLiBe using Malara 1995 correlation (used for inert gas stripping from breeder droplets, may not be valid here)", + input_units=["m**2/s", "m"], + output_units="m/s", +) +all_correlations.append(h_l_malara) + +h_l_briggs = Correlation( + identifier="h_l_briggs", + function=lambda Re, Sc, D_l, d_b: get_h_briggs( + Re=Re, Sc=Sc, D_l=D_l, d_b=d_b ), # mass transfer coefficient with Briggs correlation - "h_l_higbie": lambda input: get_h_higbie( - input.D_l, input.u_g0, input.d_b - ), # mass transfer coefficient with Higbie correlation - "E_g": lambda input: get_E_g( - input.tank_diameter, input.u_g0 + corr_type=CorrelationType.MASS_TRANSFER_COEFF, + source="Briggs 1970", + description="mass transfer coefficient for tritium in liquid FLiBe using Briggs 1970 correlation", + input_units=["dimensionless", "dimensionless", "m**2/s", "m"], + output_units="m/s", +) +all_correlations.append(h_l_briggs) + +E_g = Correlation( + identifier="E_g", + function=lambda tank_diameter, u_g0: get_E_g( + diameter=tank_diameter, u_g=u_g0 ), # gas phase axial dispersion coefficient - "source_T": lambda input: ( - input.tbr * input.n_gen_rate / (input.tank_volume) - ), # tritium generation source term -} + corr_type=CorrelationType.GAS_PHASE_DISPERSION, + source="Malara 1995", + description="gas phase axial dispersion coefficient [m2/s], Malara 1995 correlation models dispersion of the gas velocity distribution around the mean bubble velocity", + input_units=["m", "m/s"], + output_units="m**2/s", +) +all_correlations.append(E_g) + +drho = Correlation( + identifier="drho", + function=lambda rho_l, rho_g: (rho_l - rho_g).to( + "kg/m**3" + ), # density difference between liquid and gas + corr_type=CorrelationType.DENSITY, + input_units=["kg/m**3", "kg/m**3"], +) +all_correlations.append(drho) + +he_molar_mass = ureg("4.003e-3 kg/mol") +rho_g = Correlation( + identifier="rho_g", + function=lambda temperature, P_bottom: ureg.Quantity( + (P_bottom * he_molar_mass / (const_R * temperature.to("kelvin"))).to("kg/m**3") + ), # ideal gas law for density of gas phase + corr_type=CorrelationType.DENSITY, + description="density of gas phase calculated using ideal gas law", + input_units=["kelvin", "Pa"], +) +all_correlations.append(rho_g) -def get_d_b(flow_g_vol: float, nozzle_diameter: float, nb_nozzle: int) -> float: +P_bottom = Correlation( + identifier="P_bottom", + function=lambda P_top, rho_l, height: ( + P_top + rho_l * const_g * height + ), # convert pressure to Pascals + corr_type=CorrelationType.PRESSURE, + description="pressure at the bottom of the system, converted to Pascals", + input_units=["Pa", "kg/m**3", "m"], + output_units="Pa", +) +all_correlations.append(P_bottom) + +flow_g_vol = Correlation( + identifier="flow_g_vol", + function=lambda flow_g_mol, temperature, P_bottom: ( + flow_g_mol * const_R * temperature / P_bottom + ), # convert molar flow rate to volumetric flow rate using ideal gas law + corr_type=CorrelationType.FLOW_RATE, + description="volumetric flow rate of gas phase calculated from molar flow rate using ideal gas law", + input_units=["mol/s", "kelvin", "Pa"], + output_units="m**3/s", +) +all_correlations.append(flow_g_vol) + + +specific_interfacial_area = Correlation( + identifier="a", + function=lambda d_b, eps_g: ( + 6 * eps_g / d_b + ), # specific interfacial area for spherical bubbles + corr_type=CorrelationType.INTERFACIAL_AREA, + description="specific interfacial area calculated from bubble diameter and gas void fraction, assuming spherical bubbles", + input_units=["m", "dimensionless"], + output_units="1/m", +) +all_correlations.append(specific_interfacial_area) + +source_T_from_tbr = Correlation( + identifier="source_T", + function=lambda tbr, n_gen_rate, tank_volume: ( + tbr * n_gen_rate / tank_volume + ), # source term for tritium generation calculated from TBR and neutron generation rate + corr_type=CorrelationType.TRITIUM_SOURCE, + input_units=["triton/neutron", "neutron/s", "m**3"], + output_units="molT/m**3/s", +) +all_correlations.append(source_T_from_tbr) + + +def get_d_b( + flow_g_vol: pint.Quantity, nozzle_diameter: pint.Quantity, nb_nozzle: pint.Quantity +) -> float: """ mean bubble diameter [m], Kanai 2017 (reported by Evans 2026) """ @@ -81,62 +413,39 @@ def get_d_b(flow_g_vol: float, nozzle_diameter: float, nb_nozzle: int) -> float: ) -def get_Re(input: "SimulationInput") -> float: - try: - u = input.u_g0 - Re_old = input.Re - except AttributeError: - if VERBOSE: - print("in get_Re, use default bubble velocity") - u = U_G0_DEFAULT * ureg("m/s") - Re_old = 0 - - Re = input.rho_l * u * input.d_b / input.mu_l - if ( - Re_old != 0 and np.abs(np.log10(Re / Re_old)) > 1 - ): # check if Reynolds number changed by more than an order of magnitude - warnings.warn( - f"Re number changed significantly from {Re_old:.2e} to {Re:.2e}, bubble velocity u = {u} might be off. Check assumed bubble velocity in initial Reynolds number calculation" - ) - return Re.to("dimensionless") - - -def get_u_g0(input: "SimulationInput") -> float: # TODO move inside class ? +def get_u_g0(Eo, Mo, mu_l, rho_l, d_b) -> float: # TODO move inside class ? """ bubble initial velocity [m/s], correlation for terminal velocity from Clift 1978 """ - H = (4 / 3 * input.Eo.magnitude * input.Mo.magnitude**-0.149) * ( - input.mu_l.magnitude / 0.0009 + H = (4 / 3 * Eo.magnitude * Mo.magnitude**-0.149) * ( + mu_l.magnitude / 0.0009 ) ** -0.14 if H > 59.3: J = 3.42 * H**0.441 elif H > 2: J = 0.94 * H**0.757 else: - J = input.Re * input.Mo**0.149 + 0.857 - warnings.warn( - f"Warning: low Reynolds number {input.Re:.2e}, bubble size d_b = {input.d_b} m might be too small." - f"Clift correlation will use default value for bubble velocity u_g0 = {U_G0_DEFAULT} m/s" + raise ValueError( + f"Clift correlation is not valid for H = {H}, which is calculated based on the input parameters. Check the input parameters and the validity of the correlation for the given range of parameters." ) - u_g0 = input.mu_l / (input.rho_l * input.d_b) * input.Mo**-0.149 * (J - 0.857) + u_g0 = mu_l / (rho_l * d_b) * Mo**-0.149 * (J - 0.857) if u_g0 > ureg("1 m/s") or u_g0 < ureg("0.1 m/s"): warnings.warn(f"Warning: bubble velocity {u_g0} is out of the typical range") return u_g0 -def get_eps_g(input: "SimulationInput") -> float: - """computes gas void fraction from ideal gas law and Young-Laplace pressure in the bubbles (neglecting hydrostatic pressure variation)""" +def get_eps_g(T, P_0, sigma_l, d_b, flow_g, tank_diameter, u_g0) -> float: eps_g = ( const_R - * input.T - / (input.P_0 + 4 * input.sigma_l / input.d_b) - * input.flow_g - / (np.pi * (input.tank_diameter / 2) ** 2 * input.u_g0) + * T + / (P_0 + 4 * sigma_l / d_b) + * flow_g + / (np.pi * (tank_diameter / 2) ** 2 * u_g0) ) - if eps_g > 1 or eps_g < 0: + if eps_g > 1 * ureg("dimensionless") or eps_g < 0 * ureg("dimensionless"): warnings.warn(f"Warning: unphysical gas fraction: {eps_g}") - elif eps_g > 0.1: + elif eps_g > 0.1 * ureg("dimensionless"): warnings.warn( f"Warning: high gas fraction: {eps_g}, models assumptions may not hold" ) diff --git a/src/sparging/inputs.py b/src/sparging/inputs.py new file mode 100644 index 0000000..b61b7e4 --- /dev/null +++ b/src/sparging/inputs.py @@ -0,0 +1,215 @@ +from dataclasses import dataclass +from sparging.correlations import Correlation, all_correlations +import pint +from typing import List +import inspect +import numpy as np +import logging + +logger = logging.getLogger(__name__) + + +@dataclass +class ColumnGeometry: + area: pint.Quantity + height: pint.Quantity + nozzle_diameter: pint.Quantity + nb_nozzle: int + + @property + def tank_diameter(self): + return np.sqrt(4 * self.area / np.pi) + + @property + def tank_volume(self): + return self.area * self.height + + +@dataclass +class BreederMaterial: + name: str + D_l: pint.Quantity | Correlation | None = None + K_s: pint.Quantity | Correlation | None = None + density: pint.Quantity | Correlation | None = None + viscosity: pint.Quantity | Correlation | None = None + surface_tension: pint.Quantity | Correlation | None = None + + +@dataclass +class OperatingParameters: + temperature: pint.Quantity + flow_g_mol: pint.Quantity + P_top: pint.Quantity + # irradiation_signal: pint.Quantity # TODO implement + # t_sparging: pint.Quantity # TODO implement + flow_g_vol: pint.Quantity | None = None + P_bottom: pint.Quantity | Correlation | None = None + tbr: pint.Quantity | None = None + n_gen_rate: pint.Quantity | None = None + source_T: pint.Quantity | Correlation | None = None + + +@dataclass +class SpargingParameters: + h_l: pint.Quantity | Correlation + eps_g: pint.Quantity | Correlation | None = None + u_g0: pint.Quantity | Correlation | None = None + d_b: pint.Quantity | Correlation | None = None + rho_g: pint.Quantity | Correlation | None = None + E_g: pint.Quantity | Correlation | None = None + a: pint.Quantity | Correlation | None = None + + +@dataclass +class SimulationInput: + height: pint.Quantity + area: pint.Quantity + u_g0: pint.Quantity + temperature: pint.Quantity + a: pint.Quantity + h_l: pint.Quantity + K_s: pint.Quantity + P_bottom: pint.Quantity + eps_g: pint.Quantity + E_g: pint.Quantity + D_l: pint.Quantity + source_T: pint.Quantity + + @property + def volume(self): + return self.area * self.height + + def __post_init__(self): + # make sure there are only pint.Quantity or callables in the input, otherwise raise an error + for key in self.__dataclass_fields__.keys(): + value = getattr(self, key) + if not isinstance(value, pint.Quantity): + raise ValueError( + f"In {self.__class__.__name__}: Invalid type for '{key}': expected a pint.Quantity, got {value} of type {type(value)}" + ) + + def to_json(self, path: str): + import json + + with open(path, "w") as f: + json.dump( + {key: str(value) for key, value in self.__dict__.items()}, f, indent=2 + ) + + @classmethod + def from_parameters( + cls, + column_geometry: ColumnGeometry, + breeder_material: BreederMaterial, + operating_params: OperatingParameters, + sparging_params: SpargingParameters, + ): + input_objects = [ + column_geometry, + breeder_material, + operating_params, + sparging_params, + ] + resolved_parameters = {} + required_keys = ( + cls.__dataclass_fields__.keys() + ) # these parameters will be used to solve the model + + for required_key in required_keys: + find_in_graph(required_key, resolved_parameters, graph=input_objects) + + return cls(**{arg: resolved_parameters[arg] for arg in required_keys}) + + def __str__(self): + return "\n\t".join( + [ + f"{name}: {value}" + for name in self.__dataclass_fields__ + for value in [getattr(self, name)] + ] + ) + + +def find_in_graph( + required_node: str, + discovered_nodes: dict, + graph: List[ + SpargingParameters | OperatingParameters | BreederMaterial | ColumnGeometry + ], +) -> None: + """Abstracts SimulationInput construction as a graph search problem. "Correlation" object are seen as a path to the corresponding node + - required_node: parameter we want to obtain (e.g. h_l) + - discovered_nodes: already discovered parameters as pint.Quantity + - graph: list of objects in which to search + - returns the updated discovered_nodes with the required_node added + """ + # first check if the required node is already discovered + if required_node in discovered_nodes: + logger.info(f"Found required node '{required_node}' in discovered nodes...") + return + + # then check if the required node is given as input (either as a pint.Quantity or as a Correlation) + if (result := check_input(required_node, graph)) is None: + # if it's not, look for default correlation + if required_node in all_correlations: + result = all_correlations(required_node) + logger.info( + f"Found default correlation for required node '{required_node}': {result.identifier}" + ) + else: + raise ValueError( + f"Could not find path to required node '{required_node}' in the graph or in the default correlations" + ) + if isinstance(result, Correlation): + result = resolve_correlation( + corr=result, resolved_quantities=discovered_nodes, graph=graph + ) # also update discovered_nodes with the nodes possibly discovered during recursive search + + assert isinstance(result, pint.Quantity), ( + f"Result for required node '{required_node}' is not a pint.Quantity after resolution, got {result} of type {type(result)}" + ) + discovered_nodes.update({required_node: result}) + + +def check_input( + required_node: str, input_objs: list[object] +) -> pint.Quantity | Correlation | None: + """look for pint.Quantity or Correlation given in input objects""" + result = None + for object in input_objs: + # scan for the required node in the attributes of the object + if (result := getattr(object, required_node, None)) is not None: + if isinstance(result, pint.Quantity): + # required node was found + logger.info( + f"Found Quantity for required node '{required_node}' in graph: {result}" + ) + break + elif isinstance(result, Correlation): + logger.info( + f"Found correlation for required node '{required_node}' in graph: {result.identifier}" + ) + break + else: + raise ValueError( + f"In check_input: found result for '{required_node}': but expected a Correlation or a pint.Quantity, got {result} of type {type(result)}" + ) + return result + + +def resolve_correlation( + corr: Correlation, resolved_quantities: dict, graph: list[object] +) -> pint.Quantity: + corr_args = inspect.signature(corr.function).parameters.keys() + for arg in corr_args: + logger.info( + f"Resolving argument '{arg}' for correlation '{corr.identifier}'..." + ) + find_in_graph(arg, resolved_quantities, graph) + + assert all(arg in resolved_quantities for arg in corr_args), ( + f"Could not resolve all arguments for correlation '{corr.identifier}'. " + f"Missing arguments: {[arg for arg in corr_args if arg not in resolved_quantities]}" + ) + + return corr(**{arg: resolved_quantities[arg] for arg in corr_args}) diff --git a/src/sparging/model.py b/src/sparging/model.py index 8844b76..94fc5bd 100644 --- a/src/sparging/model.py +++ b/src/sparging/model.py @@ -8,18 +8,22 @@ from petsc4py import PETSc from dataclasses import dataclass -import warnings from datetime import datetime # from dolfinx import log import yaml import sparging.helpers as helpers import json -import pint -import inspect -import sparging.correlations as c +from pathlib import Path -from sparging.config import ureg, const_R, const_g, VERBOSE +from sparging.config import ureg + +from sparging.inputs import SimulationInput + +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + import pint hours_to_seconds = 3600 days_to_seconds = 24 * hours_to_seconds @@ -27,205 +31,11 @@ T_to_T2 = 1 / T2_to_T EPS = 1e-26 -# VERBOSE = False SEPARATOR_KEYWORD = "from" # log.set_log_level(log.LogLevel.INFO) -class SimulationInput: - P_0: pint.Quantity - eps_l: pint.Quantity - eps_g: pint.Quantity - E_g: pint.Quantity - D_l: pint.Quantity - h_l: pint.Quantity - K_s: pint.Quantity - tank_height: pint.Quantity - tank_diameter: pint.Quantity - nozzle_diameter: pint.Quantity - nb_nozzle: int - T: pint.Quantity - flow_g: pint.Quantity - d_b: pint.Quantity - Eo: pint.Quantity - Mo: pint.Quantity - Sc: pint.Quantity - Re: pint.Quantity - - def __init__(self, input_dict: dict): - self.input_dict = input_dict.copy() - self.quantities_dict = { - "inputed": {}, - "computed": {}, - } # to keep track of inputed and calculated quantities and the correlation used, for output purposes - # TODO useless, can use self.__dict__ - - # -- System parameters -- - self.tank_height = self._get("tank_height").to("metre") - self.tank_diameter = self._get("tank_diameter").to("metre") - self.tank_area = np.pi * (self.tank_diameter / 2) ** 2 - self.tank_volume = self.tank_area * self.tank_height - self.source_T = self._get("source_T").to( - "molT/s/m**3" - ) # tritium generation source term - self.nozzle_diameter = self._get("nozzle_diameter").to("millimetre") - self.nb_nozzle = self._get("nb_nozzle") - self.P_top = self._get("P_top").to("bar") - self.T = self._get("T").to("kelvin") # temperature - self.flow_g = self._get("flow_g").to("mol/s") # inlet gas flow rate [mol/s] - - # -- FLiBe and tritium physical properties -- - self.rho_l = self._get("rho_l").to("kg/m**3") - self.mu_l = self._get("mu_l").to("Pa*s") - self.sigma_l = self._get("sigma_l").to("N/m") - self.nu_l = self.mu_l / self.rho_l - self.D_l = self._get("D_l").to("m**2/s") - self.K_s = self._get("K_s").to("mol/m**3/Pa") - self.P_0 = (self.P_top + self.rho_l * const_g * self.tank_height).to("bar") - # gas inlet pressure [Pa] = hydrostatic pressure at the bottom of the tank (neglecting gas fraction) - self.flow_g_vol = (self.flow_g.to("mol/s") * const_R * self.T / self.P_0).to( - "m**3/s" - ) # inlet gas volumetric flow rate - - # -- bubbles properties -- - self.d_b = self._get("d_b").to("millimetre") # bubble diameter - he_molar_mass = ureg("4.003e-3 kg/mol") - self.rho_g = (self.P_0 * he_molar_mass / (const_R * self.T)).to( - "kg/m**3" - ) # bubbles density, using ideal gas law for He - self.drho = self.rho_l - self.rho_g # density difference between liquid and gas - - self.Eo = self._get("Eo") # Eotvos number - self.Mo = self._get("Mo") # Morton number - self.Sc = self._get("Sc") # Schmidt number - self.Re = self._get("Re") # Reynolds number - - self.u_g0 = self._get("u_g0").to("m/s") # mean bubble velocity - - self.Re = self._get("Re").to( - "dimensionless" - ) # update Reynolds number with the calculated bubble velocity - - self.eps_g = self._get("eps_g").to("dimensionless") # gas void fraction - self.eps_l = 1 - self.eps_g - self.a = (6 * self.eps_g / self.d_b).to("1/m") # specific interfacial area - - self.h_l = self._get("h_l", corr_name="h_l_briggs").to( - "m/s" - ) # mass transfer coefficient - - self.E_g = self._get("E_g").to( - "m**2/s" - ) # gas phase axial dispersion coefficient - - if input_dict: - warnings.warn(f"Unused input parameters:{input_dict}") - - if VERBOSE: - print(self) - - def _get( - self, - key: str, - corr_name: str = None, - ): - """get a parameter value from input_dict, or compute it from correlation if not specified in input_dict - - key: name of the parameter to get in the input_dict - - corr_name: name of the correlation to use if not specified in input_dict, will be the key itself by default - """ - # try keys (several names possible in input dictionary, for retrocompatibility) - if key in self.input_dict.keys(): - print(key, f"found {key} in input_dict") - quantity = get_quantity_or_correlation(self.input_dict, key) - if callable(quantity): - # if it's a correlation function, compute the quantity using the correlation - return quantity(self) - return quantity - - else: - quantity = self.get_quantity_from_default_correlation(key, corr_name) - - if VERBOSE: - print(f"{key} = {quantity} \t calculated using default correlation") - - return quantity - - # TODO simplify this - def get_quantity_from_default_correlation(self, key, corr_name=None): - # nothing found in input_dict -> use default correlation - while True: - # default corr_name is the name of the key itself - corr_name = key if corr_name is None else corr_name - # compute quantity using correlation, and if needed retrieve quantities it depends on - if corr_name not in c.correlations_dict.keys(): - raise KeyError( - f"Correlation for '{key}' not found in correlations dictionary. Missing a required input or wrongcorrelation name. Available correlations: {list(c.correlations_dict.keys())}" - ) - try: - # compute the quantity using the default correlation function - quantity = c.correlations_dict[corr_name](self) - break - except AttributeError as e: - missing_attr = str(e).split("attribute '")[1].split("'")[0] - print(f"AttributeError: missing attribute '{missing_attr}'") - setattr( - self, missing_attr, self._get(missing_attr) - ) # recursively get missing attributes - - return quantity - - def __str__(self): - members = inspect.getmembers(self) - return "\n".join( - [ - f"{name}: {value}" - for name, value in members - if not name.startswith("_") and not inspect.isfunction(value) - ] - ) - - -def find_correlation_from_library(corr_name) -> callable: - if corr_name not in c.correlations_dict: - raise KeyError( - f"Correlation '{corr_name}' not found in correlations dictionary. Available correlations: {list(c.correlations_dict.keys())}" - ) - - func = c.correlations_dict[corr_name] - assert callable(func), f"Correlation '{corr_name}' is not a function" - return func - - -def get_correlation_from_string(s) -> callable | None: - """ - If the input is a string that contains the keyword `SEPARATOR_KEYWORD`, we assume it's a correlation name - and return the corresponding function from the correlations library, otherwise return None - """ - # if the value is a string that contains keyword "from", we assume it's a correlation name - if isinstance(s, str) and SEPARATOR_KEYWORD in s: - return s.split(SEPARATOR_KEYWORD + " ")[1] - return None - - -def get_quantity_or_correlation(input_dict, key) -> callable | pint.Quantity: - value = input_dict[key] - input_dict.pop(key) # to keep track of unused input parameters, if any - - corr_name = get_correlation_from_string(value) - if corr_name: - corr_func = find_correlation_from_library(corr_name) - return corr_func - - else: - # The quantity is directly provided as a quantity - # TODO implement logger - if VERBOSE: - print(f"{key} = {value} \t provided by input") - quantity = ureg.parse_expression(str(value)) - return quantity - - @dataclass class SimulationResults: times: list @@ -250,13 +60,7 @@ class SimulationResults: "sim_input", ] - # FIXME - namespace = { - "ramp": lambda s, e: helpers.string_to_ramp(times, s, e), - "step": lambda s: helpers.string_to_step(times, s), - } - - def to_yaml(self, output_path: str): + def to_yaml(self, output_path: Path): sim_dict = self.sim_input.__dict__.copy() helpers.setup_yaml() @@ -267,13 +71,10 @@ def to_yaml(self, output_path: str): "date": datetime.now().isoformat(), }, } - sim_dict.pop( - "quantities_dict" - ) # remove quantities_dict from input for cleaner output - output["input"] = {} + output["simulation parameters"] = {} for key, value in sim_dict.items(): - output["input"][key] = str(value) + output["simulation parameters"][key] = str(value) output["results"] = self.__dict__.copy() # remove c_T2_solutions and y_T2_solutions from results to avoid dumping large arrays in yaml, they can be saved separately if needed @@ -283,8 +84,8 @@ def to_yaml(self, output_path: str): with open(output_path, "w") as f: yaml.dump(output, f, sort_keys=False) - def to_json(self, output_path: str): - sim_dict = self.sim_input.quantities_dict.copy() + def to_json(self, output_path: Path): + sim_dict = self.sim_input.__dict__.copy() # structure the output output = { @@ -293,10 +94,9 @@ def to_json(self, output_path: str): "date": datetime.now().isoformat(), }, } - if sim_dict.get("inputed"): - output["input parameters"] = sim_dict["inputed"] - if sim_dict.get("computed"): - output["calculated properties"] = sim_dict["computed"] + output["simulation parameters"] = {} + for key, value in sim_dict.items(): + output["simulation parameters"][key] = str(value) output["results"] = self.__dict__.copy() # remove c_T2_solutions and y_T2_solutions from results to avoid dumping large arrays in yaml, they can be saved separately if needed @@ -314,7 +114,7 @@ def to_json(self, output_path: str): with open(output_path, "w") as f: json.dump(output, f, indent=3) - def profiles_to_csv(self, output_path: str): + def profiles_to_csv(self, output_path: Path): """save c_T2 and y_T2 profiles at all time steps to csv files, one for c_T2 and one for y_T2, with columns for each time step""" import pandas as pd @@ -328,232 +128,238 @@ def profiles_to_csv(self, output_path: str): df_c_T2[f"c_T2_t{i}"] = c_T2_profile df_y_T2[f"y_T2_t{i}"] = y_T2_profile - df_c_T2.to_csv(output_path + "_c_T2.csv", index=False) - df_y_T2.to_csv(output_path + "_y_T2.csv", index=False) - - -def solve( - input: SimulationInput, t_final: float, t_irr: float | list, t_sparging: list = None -): - dt = 0.2 * ureg("hours").to("seconds").magnitude - # unpack parameters - tank_height = input.tank_height.to("m").magnitude - tank_area = input.tank_area.to("m**2").magnitude - tank_volume = input.tank_volume.to("m**3").magnitude - a = input.a.to("1/m").magnitude - h_l = input.h_l.to("m/s").magnitude - K_s = input.K_s.to("mol/m**3/Pa").magnitude - P_0 = input.P_0.to("Pa").magnitude - T = input.T.to("K").magnitude - eps_g = input.eps_g.to("dimensionless").magnitude - eps_l = input.eps_l.to("dimensionless").magnitude - E_g = input.E_g.to("m**2/s").magnitude - D_l = input.D_l.to("m**2/s").magnitude - u_g0 = input.u_g0.to("m/s").magnitude - # convert T generation rate to T2 generation rate for the gas phase [mol T2 /m3/s], - # assuming bred T immediately combines to T2 - source_T2 = input.source_T.to("molT2/s/m**3").magnitude - - # tank_area = np.pi * (params["D"] / 2) ** 2 - # tank_volume = tank_area * tank_height - - # MESH AND FUNCTION SPACES - mesh = dolfinx.mesh.create_interval( - MPI.COMM_WORLD, 1000, points=[0, input.tank_height.magnitude] - ) - fdim = mesh.topology.dim - 1 - cg_el = basix.ufl.element("Lagrange", mesh.basix_cell(), degree=1, shape=(2,)) - - V = dolfinx.fem.functionspace(mesh, cg_el) - - u = dolfinx.fem.Function(V) - u_n = dolfinx.fem.Function(V) - v_c, v_y = ufl.TestFunctions(V) - - c_T2, y_T2 = ufl.split(u) - c_T2_n, y_T2_n = ufl.split(u_n) - - vel_x = u_g0 # TODO velocity should vary with hydrostatic pressure - vel = dolfinx.fem.Constant(mesh, PETSc.ScalarType([vel_x])) - - """ vel = fem_func(U) - v,interpolate(lambda x: v0 + 2*x[0])""" - - h_l_const = dolfinx.fem.Constant(mesh, PETSc.ScalarType(h_l)) - - gen_T2 = dolfinx.fem.Constant( - mesh, PETSc.ScalarType(source_T2) - ) # generation term [mol T2 /m3/s] - - # VARIATIONAL FORMULATION - - # mass transfer rate - J_T2 = ( - a * h_l_const * (c_T2 - K_s * (P_0 * y_T2 + EPS)) - ) # TODO pressure shouldn't be a constant (use hydrostatic pressure profile), how to deal with this ? -> use fem.Expression ? - - F = 0 # variational formulation - - # transient terms - F += eps_l * ((c_T2 - c_T2_n) / dt) * v_c * ufl.dx - F += eps_g * 1 / (const.R * T) * (P_0 * (y_T2 - y_T2_n) / dt) * v_y * ufl.dx - - # diffusion/dispersion terms #TODO shouldn't use D_l, transport of T in liquid is dominated by dispersive effects due to gas sparging, find dispersion coeff for steady liquid in gas bubbles - F += eps_l * D_l * ufl.dot(ufl.grad(c_T2), ufl.grad(v_c)) * ufl.dx - - # NOTE remove diffusive term for gas for now for mass balance - # F += eps_g * E_g * ufl.dot(ufl.grad(P_0 * y_T2), ufl.grad(v_y)) * ufl.dx - - # mass exchange (coupling term) - F += J_T2 * v_c * ufl.dx - J_T2 * v_y * ufl.dx - - # Generation term in the breeder - F += -gen_T2 * v_c * ufl.dx - - # advection of gas - F += 1 / (const.R * T) * ufl.inner(ufl.dot(ufl.grad(P_0 * y_T2), vel), v_y) * ufl.dx - - # BOUNDARY CONDITIONS - gas_inlet_facets = dolfinx.mesh.locate_entities_boundary( - mesh, fdim, lambda x: np.isclose(x[0], 0.0) - ) - gas_outlet_facets = dolfinx.mesh.locate_entities_boundary( - mesh, fdim, lambda x: np.isclose(x[0], tank_height) - ) - bc1 = dolfinx.fem.dirichletbc( - dolfinx.fem.Constant(mesh, 0.0), - dolfinx.fem.locate_dofs_topological(V.sub(1), fdim, gas_inlet_facets), - V.sub(1), - ) - bc2 = dolfinx.fem.dirichletbc( - dolfinx.fem.Constant(mesh, 0.0), - dolfinx.fem.locate_dofs_topological(V.sub(0), fdim, gas_outlet_facets), - V.sub(0), - ) - - # Custom measure - all_facets = np.concatenate((gas_inlet_facets, gas_outlet_facets)) - all_tags = np.concatenate( - (np.full_like(gas_inlet_facets, 1), np.full_like(gas_outlet_facets, 2)) - ) - facet_markers = dolfinx.mesh.meshtags(mesh, fdim, all_facets, all_tags) - ds = ufl.Measure("ds", domain=mesh, subdomain_data=facet_markers) - - # set up problem - problem = NonlinearProblem( - F, - u, - bcs=[bc1, bc2], - petsc_options_prefix="librasparge", - # petsc_options={"snes_monitor": None}, - ) - - # initialise post processing - V0_ct, ct_dofs = u.function_space.sub(0).collapse() - coords = V0_ct.tabulate_dof_coordinates()[:, 0] - ct_sort_coords = np.argsort(coords) - x_ct = coords[ct_sort_coords] - - V0_y, y_dofs = u.function_space.sub(1).collapse() - coords = V0_y.tabulate_dof_coordinates()[:, 0] - y_sort_coords = np.argsort(coords) - x_y = coords[y_sort_coords] - - times = [] - c_T2_solutions = [] - y_T2_solutions = [] - sources_T2 = [] - fluxes_T2 = [] - inventories_T2_salt = [] - - # SOLVE - t = 0 - while t < t_final: - if isinstance(t_irr, (int, float)): - if t >= t_irr: - gen_T2.value = 0.0 - else: - if t >= t_irr[0] and t < t_irr[1]: - gen_T2.value = source_T2 - else: - gen_T2.value = 0.0 - """ utiliser ufl.conditional TODO""" - if t_sparging is not None: - if t >= t_sparging[0] and t < t_sparging[1]: - h_l_const.value = h_l - else: - h_l_const.value = 0.0 - - problem.solve() - - # update previous solution - u_n.x.array[:] = u.x.array[:] - - # post process - c_T2_post, y_T2_post = u.split() - - c_T2_vals = u.x.array[ct_dofs][ct_sort_coords] - y_T2_vals = u.x.array[y_dofs][y_sort_coords] - - # store time and solution - # TODO give units to the results - times.append(t) - c_T2_solutions.append(c_T2_vals.copy()) - y_T2_solutions.append(y_T2_vals.copy()) - sources_T2.append( - gen_T2.value.copy() * tank_volume - ) # total T generation rate in the tank [mol/s] - - flux_T2 = dolfinx.fem.assemble_scalar( - dolfinx.fem.form( - tank_area * vel_x * P_0 / (const.R * T) * y_T2_post * ds(2) + df_c_T2.to_csv(output_path.joinpath("_c_T2.csv"), index=False) + df_y_T2.to_csv(output_path.joinpath("_y_T2.csv"), index=False) + + +@dataclass +class Simulation: + sim_input: SimulationInput + t_final: pint.Quantity + signal_irr: callable[pint.Quantity] + signal_sparging: callable[pint.Quantity] + profile_source_T: callable[pint.Quantity] | None = None + + def solve(self, dt: pint.Quantity | None = None, dx: pint.Quantity | None = None): + # unpack pint.Quantities + t_final = self.t_final.to("seconds").magnitude + tank_height = self.sim_input.height.to("m").magnitude + tank_area = self.sim_input.area.to("m**2").magnitude + tank_volume = self.sim_input.volume.to("m**3").magnitude + a = self.sim_input.a.to("1/m").magnitude + h_l = self.sim_input.h_l.to("m/s").magnitude + K_s = self.sim_input.K_s.to("mol/m**3/Pa").magnitude + P_0 = self.sim_input.P_bottom.to("Pa").magnitude + T = self.sim_input.temperature.to("K").magnitude + eps_g = self.sim_input.eps_g.to("dimensionless").magnitude + E_g = self.sim_input.E_g.to("m**2/s").magnitude + D_l = self.sim_input.D_l.to("m**2/s").magnitude + u_g0 = self.sim_input.u_g0.to("m/s").magnitude + source_T2 = self.sim_input.source_T.to("molT2/s/m**3").magnitude + + dt = dt.to("seconds").magnitude if dt is not None else t_final / 1000 + dx = dx.to("m").magnitude if dx is not None else tank_height / 1000 + eps_l = 1 - eps_g + + # MESH AND FUNCTION SPACES + mesh = dolfinx.mesh.create_interval( + MPI.COMM_WORLD, int(tank_height / dx), points=[0, tank_height] + ) + fdim = mesh.topology.dim - 1 + cg_el = basix.ufl.element("Lagrange", mesh.basix_cell(), degree=1, shape=(2,)) + profile_el = basix.ufl.element("Lagrange", mesh.basix_cell(), degree=1) + + V = dolfinx.fem.functionspace(mesh, cg_el) + V_profile = dolfinx.fem.functionspace(mesh, profile_el) + + u = dolfinx.fem.Function(V) + u_n = dolfinx.fem.Function(V) + v_c, v_y = ufl.TestFunctions(V) + + c_T2, y_T2 = ufl.split(u) + c_T2_n, y_T2_n = ufl.split(u_n) + + vel_x = u_g0 # TODO velocity should vary with hydrostatic pressure + vel = dolfinx.fem.Constant(mesh, PETSc.ScalarType([vel_x])) + + h_l_const = dolfinx.fem.Constant(mesh, PETSc.ScalarType(h_l)) + + gen_T2_mag = dolfinx.fem.Constant( + mesh, source_T2 * self.signal_irr(0 * ureg.s) + ) # magnitude of the generation term + gen_T2 = None + if self.profile_source_T is not None: # spatially varying profile is provided + gen_T2_prof = dolfinx.fem.Function(V_profile) + gen_T2_prof.interpolate( + lambda x: x[0] * 0 + self.profile_source_T(x[0] * ureg.m) + ) + gen_T2 = gen_T2_mag * gen_T2_prof + else: # homogeneous generation + gen_T2 = gen_T2_mag + + # VARIATIONAL FORMULATION + + # mass transfer rate + J_T2 = ( + a * h_l_const * (c_T2 - K_s * (P_0 * y_T2 + EPS)) + ) # TODO pressure shouldn't be a constant (use hydrostatic pressure profile), how to deal with this ? -> use fem.Expression ? + + F = 0 # variational formulation + + # transient terms + F += eps_l * ((c_T2 - c_T2_n) / dt) * v_c * ufl.dx + F += eps_g * 1 / (const.R * T) * (P_0 * (y_T2 - y_T2_n) / dt) * v_y * ufl.dx + + # diffusion/dispersion terms #TODO shouldn't use D_l, transport of T in liquid is dominated by dispersive effects due to gas sparging, find dispersion coeff for steady liquid in gas bubbles + F += eps_l * D_l * ufl.dot(ufl.grad(c_T2), ufl.grad(v_c)) * ufl.dx + + # NOTE remove diffusive term for gas for now for mass balance + # F += eps_g * E_g * ufl.dot(ufl.grad(P_0 * y_T2), ufl.grad(v_y)) * ufl.dx + + # mass exchange (coupling term) + F += J_T2 * v_c * ufl.dx - J_T2 * v_y * ufl.dx + + # Generation term in the breeder + F += -gen_T2 * v_c * ufl.dx + + # advection of gas + F += ( + 1 + / (const.R * T) + * ufl.inner(ufl.dot(ufl.grad(P_0 * y_T2), vel), v_y) + * ufl.dx + ) + + # BOUNDARY CONDITIONS + gas_inlet_facets = dolfinx.mesh.locate_entities_boundary( + mesh, fdim, lambda x: np.isclose(x[0], 0.0) + ) + gas_outlet_facets = dolfinx.mesh.locate_entities_boundary( + mesh, fdim, lambda x: np.isclose(x[0], tank_height) + ) + bc1 = dolfinx.fem.dirichletbc( + dolfinx.fem.Constant(mesh, 0.0), + dolfinx.fem.locate_dofs_topological(V.sub(1), fdim, gas_inlet_facets), + V.sub(1), + ) + bc2 = dolfinx.fem.dirichletbc( + dolfinx.fem.Constant(mesh, 0.0), + dolfinx.fem.locate_dofs_topological(V.sub(0), fdim, gas_outlet_facets), + V.sub(0), + ) + + # Custom measure + all_facets = np.concatenate((gas_inlet_facets, gas_outlet_facets)) + all_tags = np.concatenate( + (np.full_like(gas_inlet_facets, 1), np.full_like(gas_outlet_facets, 2)) + ) + facet_markers = dolfinx.mesh.meshtags(mesh, fdim, all_facets, all_tags) + ds = ufl.Measure("ds", domain=mesh, subdomain_data=facet_markers) + + # set up problem + problem = NonlinearProblem( + F, + u, + bcs=[bc1, bc2], + petsc_options_prefix="librasparge", + # petsc_options={"snes_monitor": None}, + ) + + # initialise post processing + V0_ct, ct_dofs = u.function_space.sub(0).collapse() + coords = V0_ct.tabulate_dof_coordinates()[:, 0] + ct_sort_coords = np.argsort(coords) + x_ct = coords[ct_sort_coords] + + V0_y, y_dofs = u.function_space.sub(1).collapse() + coords = V0_y.tabulate_dof_coordinates()[:, 0] + y_sort_coords = np.argsort(coords) + x_y = coords[y_sort_coords] + + times = [] + c_T2_solutions = [] + y_T2_solutions = [] + sources_T2 = [] + fluxes_T2 = [] + inventories_T2_salt = [] + + # SOLVE + t = 0 + while t < t_final: + gen_T2_mag.value = source_T2 * self.signal_irr(t * ureg.s) + h_l_const.value = h_l * self.signal_sparging(t * ureg.s) + """ utiliser ufl.conditional TODO""" + + problem.solve() + + # update previous solution + u_n.x.array[:] = u.x.array[:] + + # post process + c_T2_post, y_T2_post = u.split() + + c_T2_vals = u.x.array[ct_dofs][ct_sort_coords] + y_T2_vals = u.x.array[y_dofs][y_sort_coords] + + # store time and solution + # TODO give units to the results + times.append(t) + c_T2_solutions.append(c_T2_vals.copy()) + y_T2_solutions.append(y_T2_vals.copy()) + sources_T2.append( + source_T2 * self.signal_irr(t * ureg.s) * tank_volume + ) # total T generation rate in the tank [mol/s] TODO useless: signal_irr is already given + + flux_T2 = dolfinx.fem.assemble_scalar( + dolfinx.fem.form( + tank_area * vel_x * P_0 / (const.R * T) * y_T2_post * ds(2) + ) + ) # total T flux at the outlet [mol/s] + + # flux_T_inlet = dolfinx.fem.assemble_scalar( + # dolfinx.fem.form( + # tank_area + # * E_g + # * P_0 + # / (const.R * T) + # * y_T2_post.dx(0) + # * T2_to_T + # * ds(1) + # ) + # ) # total T dispersive flux at the inlet [mol/s] + + n = ufl.FacetNormal(mesh) + flux_T2_inlet = dolfinx.fem.assemble_scalar( + dolfinx.fem.form(-E_g * ufl.inner(ufl.grad(P_0 * y_T2_post), n) * ds(1)) + ) # total T dispersive flux at the inlet [Pa T2 /s/m2] + flux_T2_inlet *= 1 / (const.R * T) * T2_to_T # convert to mol T/s/m2 + flux_T2_inlet *= tank_area # convert to mol T/s + + # fluxes_T2.append(flux_T2 + flux_T2_inlet) + fluxes_T2.append(flux_T2) + + inventory_T2_salt = dolfinx.fem.assemble_scalar( + dolfinx.fem.form(c_T2_post * ufl.dx) ) - ) # total T flux at the outlet [mol/s] - - # flux_T_inlet = dolfinx.fem.assemble_scalar( - # dolfinx.fem.form( - # tank_area - # * E_g - # * P_0 - # / (const.R * T) - # * y_T2_post.dx(0) - # * T2_to_T - # * ds(1) - # ) - # ) # total T dispersive flux at the inlet [mol/s] - - n = ufl.FacetNormal(mesh) - flux_T2_inlet = dolfinx.fem.assemble_scalar( - dolfinx.fem.form(-E_g * ufl.inner(ufl.grad(P_0 * y_T2_post), n) * ds(1)) - ) # total T dispersive flux at the inlet [Pa T2 /s/m2] - flux_T2_inlet *= 1 / (const.R * T) * T2_to_T # convert to mol T/s/m2 - flux_T2_inlet *= tank_area # convert to mol T/s - - # fluxes_T2.append(flux_T2 + flux_T2_inlet) - fluxes_T2.append(flux_T2) - - inventory_T2_salt = dolfinx.fem.assemble_scalar( - dolfinx.fem.form(c_T2_post * ufl.dx) + inventory_T2_salt *= tank_area # get total amount of T2 in [mol] + inventories_T2_salt.append(inventory_T2_salt) + + # advance time + t += dt + + inventories_T2_salt = np.array(inventories_T2_salt) + + # TODO reattach units using wrapping + # https://pint.readthedocs.io/en/stable/advanced/performance.html#a-safer-method-wrapping + results = SimulationResults( + times=times, + c_T2_solutions=c_T2_solutions, + y_T2_solutions=y_T2_solutions, + x_ct=x_ct, + x_y=x_y, + inventories_T2_salt=inventories_T2_salt, + source_T2=sources_T2, + fluxes_T2=fluxes_T2, + sim_input=self.sim_input, ) - inventory_T2_salt *= tank_area # get total amount of T2 in [mol] - inventories_T2_salt.append(inventory_T2_salt) - - # advance time - t += dt - - inventories_T2_salt = np.array(inventories_T2_salt) - - # TODO reattach units using wrapping - # https://pint.readthedocs.io/en/stable/advanced/performance.html#a-safer-method-wrapping - results = SimulationResults( - times=times, - c_T2_solutions=c_T2_solutions, - y_T2_solutions=y_T2_solutions, - x_ct=x_ct, - x_y=x_y, - inventories_T2_salt=inventories_T2_salt, - source_T2=sources_T2, - fluxes_T2=fluxes_T2, - sim_input=input, - ) - return results + return results diff --git a/test/standard_input.json b/test/standard_input.json new file mode 100644 index 0000000..17b74fa --- /dev/null +++ b/test/standard_input.json @@ -0,0 +1,14 @@ +{ + "height": "1.000e+00 m", + "area": "2.000e-01 m ** 2", + "u_g0": "2.182e-01 m / s", + "temperature": "6.000e+02 \u00b0C", + "a": "5.925e-01 / m", + "h_l": "2.750e-05 m / s", + "K_s": "6.366e-04 mol / m ** 3 / Pa", + "P_bottom": "1.208e+05 Pa", + "eps_g": "4.091e-04", + "E_g": "1.111e-02 m ** 2 / s", + "D_l": "2.857e-09 m ** 2 / s", + "source_T": "8.303e-16 molT / m ** 3 / s" +} \ No newline at end of file diff --git a/test/test_find_in_graph.reference.log b/test/test_find_in_graph.reference.log new file mode 100644 index 0000000..cf43ccb --- /dev/null +++ b/test/test_find_in_graph.reference.log @@ -0,0 +1,17 @@ +INFO:sparging.inputs:Found default correlation for required node 'drho': drho +INFO:sparging.inputs:Resolving argument 'rho_l' for correlation 'drho'... +INFO:sparging.inputs:Found default correlation for required node 'rho_l': rho_l +INFO:sparging.inputs:Resolving argument 'temperature' for correlation 'rho_l'... +INFO:sparging.inputs:Found Quantity for required node 'temperature' in graph: 6.000e+02 °C +INFO:sparging.inputs:Resolving argument 'rho_g' for correlation 'drho'... +INFO:sparging.inputs:Found default correlation for required node 'rho_g': rho_g +INFO:sparging.inputs:Resolving argument 'temperature' for correlation 'rho_g'... +INFO:sparging.inputs:Found required node 'temperature' in discovered nodes... +INFO:sparging.inputs:Resolving argument 'P_bottom' for correlation 'rho_g'... +INFO:sparging.inputs:Found default correlation for required node 'P_bottom': P_bottom +INFO:sparging.inputs:Resolving argument 'P_top' for correlation 'P_bottom'... +INFO:sparging.inputs:Found Quantity for required node 'P_top' in graph: 1.000e+00 atm +INFO:sparging.inputs:Resolving argument 'rho_l' for correlation 'P_bottom'... +INFO:sparging.inputs:Found required node 'rho_l' in discovered nodes... +INFO:sparging.inputs:Resolving argument 'height' for correlation 'P_bottom'... +INFO:sparging.inputs:Found Quantity for required node 'height' in graph: 1.000e+00 m diff --git a/test/test_input.yml b/test/test_input.yml deleted file mode 100644 index b2afa2e..0000000 --- a/test/test_input.yml +++ /dev/null @@ -1,16 +0,0 @@ -# LIBRA Pi input -tank_height: 1 m -tank_diameter: 0.5 m # m, tank diameter -nozzle_diameter: 0.001 m # m -nb_nozzle: 10 # number of nozzles at bottom of the tank -n_gen_rate: 1.0e+09 neutron/s # neutron generation rate -tbr: 0.1 triton / neutron # tritium breeding ratio -P_top: 1 bar # Pa, gas pressure at the top of the tank -T: 600 celsius # temperature [K] = 600°C temperature -flow_g: 400 sccm -# h_l: from h_l_malara - -dt: 0.2 hours -time_end : 6 days -signal_sparging : step('24 hours') -signal_irradiation: step('0 hours') - step('96 hours') \ No newline at end of file diff --git a/test/test_simulation_input.py b/test/test_simulation_input.py index bb9be7e..f669cc6 100644 --- a/test/test_simulation_input.py +++ b/test/test_simulation_input.py @@ -1,27 +1,295 @@ import sparging +from sparging.config import ureg +from sparging import all_correlations, CorrelationGroup, Correlation +from sparging.inputs import ( + ColumnGeometry, + BreederMaterial, + OperatingParameters, + SpargingParameters, + find_in_graph, + check_input, + SimulationInput, +) +import pytest +import dataclasses +import difflib +import logging +from pathlib import Path -def test_get(): +# define standard LIBRA input parameters to be used in multiple tests +geom = ColumnGeometry( + area=0.2 * ureg.m**2, + height=1.0 * ureg.m, + nozzle_diameter=0.001 * ureg.m, + nb_nozzle=10 * ureg.dimensionless, +) + +flibe = BreederMaterial( + name="FLiBe", +) + +operating_params = OperatingParameters( + temperature=600 * ureg.celsius, + P_top=1 * ureg.atm, + flow_g_mol=400 * ureg.sccm, + tbr=0.1 * ureg("triton / neutron"), + n_gen_rate=1e9 * ureg("neutron / s"), +) + +sparging_params = SpargingParameters( + h_l=all_correlations("h_l_briggs"), +) + + +def test_from_parameters_success(tmp_path): + """ + Test that SimulationInput.from_parameters successfully creates a SimulationInput object from minimal input objects + and that the generated SimulationInput is consistent with the standard input it should yield + """ + sim_input = SimulationInput.from_parameters( + geom, flibe, operating_params, sparging_params + ) + + assert isinstance(sim_input, SimulationInput), ( + "Expected from_parameters to return a SimulationInput instance" + ) + # Check that all fields are populated and have the correct types + for field in dataclasses.fields(sim_input): + value = getattr(sim_input, field.name) + assert isinstance(value, ureg.Quantity), ( + f"Expected field '{field.name}' to be a pint.Quantity, got {type(value)}" + ) + + reference_path = Path(__file__).with_name("standard_input.json") + generated_path = Path(tmp_path).joinpath("generated_input.json") + + sim_input.to_json(generated_path) + + generated_text = generated_path.read_text(encoding="utf-8") + reference_text = reference_path.read_text(encoding="utf-8") + + diff = "\n".join( + difflib.unified_diff( + reference_text.splitlines(), + generated_text.splitlines(), + fromfile=str(reference_path.name), + tofile=str(generated_path.name), + lineterm="", + ) + ) + assert generated_text == reference_text, f"Log output mismatch:\n{diff}" + + +def test_find_in_graph_logging(tmp_path): + """ + Test that the `find_in_graph` function logs the expected output when searching for a parameter in the graph. + """ + # BUILD + reference_log_path = Path(__file__).with_name("test_find_in_graph.reference.log") + generated_log_path = Path(tmp_path).joinpath("test_find_in_graph.generated.log") + + logging.basicConfig( + level=logging.INFO, + format="%(levelname)s:%(name)s:%(message)s", + handlers=[logging.FileHandler(generated_log_path, mode="w")], + force=True, # reset handlers so pytest/previous tests don't interfere + ) + + # RUN + find_in_graph("drho", {}, [geom, flibe, operating_params, sparging_params]) + + # TEST + logging.shutdown() + + assert reference_log_path.exists(), ( + f"Reference log not found at {reference_log_path}. " + f"Create/update it from {generated_log_path} once output is validated." + ) + + generated_text = generated_log_path.read_text(encoding="utf-8") + reference_text = reference_log_path.read_text(encoding="utf-8") + + diff = "\n".join( + difflib.unified_diff( + reference_text.splitlines(), + generated_text.splitlines(), + fromfile=str(reference_log_path.name), + tofile=str(generated_log_path.name), + lineterm="", + ) + ) + assert generated_text == reference_text, f"Log output mismatch:\n{diff}" + + +@pytest.mark.parametrize("in_discovered", (True, False)) +def test_find_in_graph_result(in_discovered: bool): + """ + Test finding a node in the graph. + This test checks that the `find_in_graph` function can successfully find the `d_b` parameter + using the provided `ColumnGeometry` and `OperatingParameters`. It also tests both cases where + `flow_g_vol` is provided in the discovered nodes and where it is not, ensuring that the + function can handle both scenarios correctly. + """ + # BUILD + discovered_nodes = {} + if in_discovered: + discovered_nodes["flow_g_vol"] = 0.01 * ureg.m**3 / ureg.s + + # RUN + find_in_graph( + "d_b", + discovered_nodes=discovered_nodes, + graph=[geom, operating_params], + ) + + # TEST + assert "d_b" in discovered_nodes, "Expected to find d_b in graph" + + correlation = sparging.all_correlations("d_b") + + flow_g_vol = discovered_nodes.get("flow_g_vol") + expected_value = correlation( + flow_g_vol=flow_g_vol, + nozzle_diameter=geom.nozzle_diameter, + nb_nozzle=geom.nb_nozzle, + ) + + assert discovered_nodes["d_b"] == expected_value, ( + f"Expected d_b to be {expected_value}, got {discovered_nodes['d_b']}" + ) + + +@pytest.mark.parametrize("missing_param", ("nb_nozzle", "flow_g_mol", "n_gen_rate")) +def test_find_in_graph_unresolvable(missing_param: str): + """ + Test that find_in_graph raises an error when a parameter cannot be resolved. + """ # BUILD - input_dict = {"h_l": "from h_l_malara"} + broken_geom = dataclasses.replace(geom) + broken_op_params = dataclasses.replace(operating_params) + to_find = str() + match missing_param: + case "nb_nozzle": + to_find = "d_b" + setattr(broken_geom, missing_param, None) + case "flow_g_mol": + to_find = "d_b" + setattr(broken_op_params, missing_param, None) + case "n_gen_rate": + to_find = "source_T" + setattr(broken_op_params, missing_param, None) # RUN - quantity = sparging.model.get_quantity_or_correlation(input_dict, "h_l") + with pytest.raises( + ValueError, + match=f"Could not find path to required node '{missing_param}' in the graph or in the default correlations", + ): + find_in_graph( + to_find, + discovered_nodes={}, # no discovered nodes provided + graph=[ + broken_geom, + broken_op_params, + ], # missing necessary parameters for d_b correlation + ) + +@pytest.mark.parametrize("required_node", ("flow_g_mol", "non_existent_param")) +def test_check_input_none(required_node: str): + """ + Test that check_input returns None when the required node is not found in the graph. + """ + # BUILD + broken_op_params = dataclasses.replace(operating_params, flow_g_mol=None) + # RUN + result = check_input(required_node, [geom, broken_op_params]) # TEST - assert callable(quantity), f"Expected a correlation function, got {quantity}" + assert result is None, f"Expected None for non-existent parameter, got {result}" -# def test_get_source_T(): -# # BUILD -# params = sparging.helpers.get_input("test/test_input.yml") -# # RUN -# quantity = sparging.model.get_quantity_or_correlation(params, "source_T") +def test_check_input_unexpected_type(): + """ + Test that check_input raises an error when it finds a parameter in the graph but it is not a pint.Quantity or Correlation. + """ + # BUILD + broken_op_params = dataclasses.replace(operating_params, tbr=13) + # RUN + result = -1 + with pytest.raises( + ValueError, + match="In check_input: found result for 'tbr': but expected a Correlation or a pint.Quantity, got 13 of type ", + ): + result = check_input("tbr", [geom, broken_op_params]) + assert False, f"Expected ValueError, got {result}" + + +def test_correlation_group(): + """ + Test that CorrelationGroup __contains__ and __call__ behave as expected. + """ + # BUILD + corr_str = "dummy_corr" + corr_str_typo = "dummy_coor" + corr_str_wrong_type = ureg.Quantity(5, ureg.s) + + my_corr_obj = Correlation( + identifier=corr_str, + function=lambda: None, + corr_type="dummy", + input_units=[], + ) + corr_group = CorrelationGroup([my_corr_obj]) -# # TEST -# assert callable(quantity), "Expected a correlation function" + # TEST __contains__ + if corr_str in corr_group: # should be found + pass + if my_corr_obj in corr_group: # should be found + pass + if corr_str_typo in corr_group: # should not be found + assert False, ( + f"Expected '{corr_str_typo}' not to be found in CorrelationGroup, but it was" + ) + with pytest.raises( + TypeError, + match="Type not valid for correlation group membership check, expected str or Correlation, got ", + ): + if corr_str_wrong_type in corr_group: # wrong type, should raise TypeError + pass + # TEST __call__ + correlation_found = corr_group(corr_str) # return a Correlation + assert isinstance(correlation_found, Correlation), ( + "Expected to find dummy_corr Correlation" + ) + + with pytest.raises( + ValueError, + match=f"Correlation with identifier {corr_str_typo} not found", + ): + corr_group(corr_str_typo) # non-existent correlation, should raise ValueError + + +def test_correlation(): + """ + Test Correlation raises an error when: + - providing wrong input (missing arguments/wrong dimensionality)""" -def test_get_from_file(): # BUILD - params = sparging.helpers.get_input("test/test_input.yml") - sparging.model.SimulationInput(params) + my_corr_obj = Correlation( + identifier="dummy_corr", + function=lambda a, b: a * ureg.s, + corr_type="dummy", + input_units=[ + "m" + ], # TODO should raise an error if input_units length doesn't match function arguments when constructing the correlation + ) + + # RUN + assert isinstance(my_corr_obj(a=5 * ureg.m, b=None), ureg.Quantity), ( + "Expected a pint.Quantity as output" + ) # should work + + with pytest.raises(ValueError, match="expected a pint.Quantity"): + my_corr_obj(a=5, b=None) # wrong input type, should be a pint.Quantity + with pytest.raises(ValueError, match="expected dimensions"): + my_corr_obj(a=5 * ureg.s, b=None) # wrong units, should raise ValueError diff --git a/test/test_solve.py b/test/test_solve.py index 6b66d3d..aceb223 100644 --- a/test/test_solve.py +++ b/test/test_solve.py @@ -1,34 +1,85 @@ -import sparging.model as model -from sparging.helpers import get_input +from sparging.model import Simulation +from sparging.inputs import SimulationInput +from sparging.config import ureg import pytest +import dataclasses +from pint import DimensionalityError -def test_model_solve(): +standard_input = SimulationInput( + height=1.0 * ureg.m, + area=0.2 * ureg.m**2, + u_g0=0.25 * ureg("m/s"), + temperature=600 * ureg.celsius, + a=0.5 * ureg("1/m"), + h_l=3e-5 * ureg("m/s"), + K_s=1e-4 * ureg("mol/m**3/Pa"), + P_bottom=1.2 * ureg.bar, + eps_g=0.001 * ureg.dimensionless, + E_g=1e-2 * ureg("m^2/s"), + D_l=3e-9 * ureg("m^2/s"), + source_T=8e-16 * ureg("molT/m^3/s"), +) + + +@pytest.fixture +def standard_simulation(): + return Simulation( + standard_input, + t_final=6 * ureg.hours, + signal_irr=lambda t: 1 if t > 1 * ureg.hour and t < 3 * ureg.hour else 0, + signal_sparging=lambda t: 1, + ) + + +def test_model_solve_successfull(tmp_path, standard_simulation): """ Tests that `model.solve` runs without errors for a simple test case. Does not check results. + Also tests successful exporting results to yaml, json and csv files. """ - params = get_input("test/test_input.yml") - sim_input = model.SimulationInput(params) - # TODO integrate to input file - t_sparging_hr = [24, 1e20] # time interval when sparger is ON - t_irr_hr = [0, 96] # time interval when irradiation is ON - t_final = 1 * model.days_to_seconds + output = standard_simulation.solve(dt=0.05 * ureg.hour, dx=0.01 * ureg.m) + from pathlib import Path - model.solve( - sim_input, - t_final=t_final, - t_irr=[t * model.hours_to_seconds for t in t_irr_hr], - t_sparging=[t * model.hours_to_seconds for t in t_sparging_hr], - ) + output.to_yaml(Path(tmp_path).joinpath("dummy.yaml")) + output.to_json(Path(tmp_path).joinpath("dummy.json")) + output.profiles_to_csv(Path(tmp_path)) -def test_model_solve_incomplete_input(): +def test_model_solve_missing_input(standard_simulation): """ - Tests SimulationInput raises error when required input is missing. + Tests SimulationInput raises error when a required input quantity is missing. """ - params = get_input("test/test_input.yml") - params.pop("tank_diameter") # remove bubble velocity to test default value + # BUILD + broken_input = dataclasses.replace(standard_input) + del broken_input.u_g0 # missing required parameter + standard_simulation.sim_input = broken_input + + # TEST + with pytest.raises( + AttributeError, match="'SimulationInput' object has no attribute 'u_g0'" + ): + standard_simulation.solve(dt=0.05 * ureg.hour, dx=0.01 * ureg.m) + - with pytest.raises(KeyError, match="Missing a required input"): - model.SimulationInput(params) +def test_model_solve_wrong_input(standard_simulation): + """ + Tests Simulation.solve() raises error when required input has wrong dimensionality + """ + # BUILD + broken_input = dataclasses.replace( + standard_input, u_g0=3 * ureg("m^2/s") + ) # wrong units + standard_simulation.sim_input = broken_input + + # TEST + with pytest.raises(DimensionalityError, match="Cannot convert from"): + standard_simulation.solve(dt=0.05 * ureg.hour, dx=0.01 * ureg.m) + + +def test_model_solve_wrong_argument(standard_simulation): + """ + Tests Simulation.solve() can't be given a timestep without specifying the units + """ + with pytest.raises(AttributeError, match="object has no attribute 'to'"): + standard_simulation.solve(dt=0.01)