From 74d7a1cb92ec2f1fa2d7c92c246fa123e5bc2e3a Mon Sep 17 00:00:00 2001 From: Henrik Finsberg Date: Tue, 5 May 2026 09:22:36 +0200 Subject: [PATCH 1/2] Add 3-element windkessel model and a corresponding demo --- examples/regazzoni.py | 21 ------ examples/windkessel_demo.py | 70 ++++++++++++++++++++ src/circulation/__init__.py | 12 +++- src/circulation/windkessel.py | 120 ++++++++++++++++++++++++++++++++++ tests/test_models.py | 12 ++++ 5 files changed, 212 insertions(+), 23 deletions(-) create mode 100644 examples/windkessel_demo.py create mode 100644 src/circulation/windkessel.py diff --git a/examples/regazzoni.py b/examples/regazzoni.py index 1337660..9416acb 100644 --- a/examples/regazzoni.py +++ b/examples/regazzoni.py @@ -9,27 +9,6 @@ setup_logging() circulation = Regazzoni2020() -from scipy.integrate import solve_ivp -import numpy as np - - -# res = solve_ivp( -# circulation.rhs, -# [0, 5], -# circulation.state_arr, -# t_eval=np.linspace(0, 5, 1000), -# method="RK45", -# max_step=1e-3, -# ) -# plt.plot(res.y[0, :]) -# plt.plot(res.y[1, :]) -# plt.show() -# breakpoint() - - - -# circulation.print_info() -# history = circulation.results history = circulation.solve(num_beats=10) circulation.print_info() diff --git a/examples/windkessel_demo.py b/examples/windkessel_demo.py new file mode 100644 index 0000000..fecbd84 --- /dev/null +++ b/examples/windkessel_demo.py @@ -0,0 +1,70 @@ +# %% [markdown] +# # The 3-Element Windkessel Circulation Model +# +# This tutorial demonstrates the 0D `ThreeElementWindkessel` model. +# It runs fully standalone using a simple Time-Varying Elastance (TVE) +# model for the Left Ventricle, requiring no external mechanics libraries. +# +# We will simulate two different preload states to observe the basic +# Frank-Starling mechanism (higher filling pressure -> larger stroke volume). + +import logging +import matplotlib.pyplot as plt +import numpy as np +from circulation.windkessel import ThreeElementWindkessel +from circulation.units import ureg + +logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s") + +def main(): + mmHg = ureg("mmHg") + + # Setup two models with different preloads, explicitly using units + params_low = ThreeElementWindkessel.default_parameters() + params_low["P_venous"] = 6.0 * mmHg + model_low = ThreeElementWindkessel(parameters=params_low) + + params_high = ThreeElementWindkessel.default_parameters() + params_high["P_venous"] = 12.0 * mmHg + model_high = ThreeElementWindkessel(parameters=params_high) + + # Solve for 10 beats (add_units=False by default, so solve() strips units internally) + hist_low = model_low.solve(num_beats=10, dt=1e-3) + hist_high = model_high.solve(num_beats=10, dt=1e-3) + + # Extract the final stable beat + # model_low.HR accesses the stripped float value (1.25) + samples = int(1.0 / model_low.HR / 1e-3) + slc = slice(-samples, None) + t_plot = np.arange(samples) * 1e-3 + + # Plotting + fig, axs = plt.subplots(1, 2, figsize=(14, 6)) + + # Panel 1: PV Loops + axs[0].plot(hist_low["V_LV"][slc], hist_low["p_LV"][slc], 'b-', lw=2.5, label="Low Preload (6 mmHg)") + axs[0].plot(hist_high["V_LV"][slc], hist_high["p_LV"][slc], 'r-', lw=2.5, label="High Preload (12 mmHg)") + axs[0].set_xlabel("Left Ventricular Volume (mL)", weight="bold") + axs[0].set_ylabel("Pressure (mmHg)", weight="bold") + axs[0].set_title("Pressure-Volume Loops", weight="bold") + axs[0].grid(True, alpha=0.3) + axs[0].legend() + + # Panel 2: Wiggers Diagram (High Preload) + axs[1].plot(t_plot, hist_high["p_LV"][slc], 'r-', lw=2.5, label="LV Pressure") + axs[1].plot(t_plot, hist_high["p_ao"][slc], 'k--', lw=2.0, label="Aortic Pressure") + axs[1].plot(t_plot, hist_high["p_c"][slc], 'g:', lw=2.0, label="Capacitor Pressure (p_c)") + axs[1].axhline(12.0, color='gray', linestyle=':', label="Atrial Pressure") + axs[1].set_xlabel("Time (s)", weight="bold") + axs[1].set_ylabel("Pressure (mmHg)", weight="bold") + axs[1].set_title("Wiggers Diagram (High Preload)", weight="bold") + axs[1].grid(True, alpha=0.3) + axs[1].legend() + + plt.tight_layout() + plt.savefig("windkessel_0D_demo.png", dpi=300) + logging.info("Saved plot to windkessel_0D_demo.png") + plt.show() + +if __name__ == "__main__": + main() diff --git a/src/circulation/__init__.py b/src/circulation/__init__.py index e515ebb..eb58c4d 100644 --- a/src/circulation/__init__.py +++ b/src/circulation/__init__.py @@ -1,3 +1,11 @@ -from . import base, log, regazzoni2020, zenker, units, time_varying_elastance +from . import base, log, regazzoni2020, zenker, units, time_varying_elastance, windkessel -__all__ = ["base", "log", "regazzoni2020", "zenker", "units", "time_varying_elastance"] +__all__ = [ + "base", + "log", + "regazzoni2020", + "zenker", + "units", + "time_varying_elastance", + "windkessel", +] diff --git a/src/circulation/windkessel.py b/src/circulation/windkessel.py new file mode 100644 index 0000000..41fec90 --- /dev/null +++ b/src/circulation/windkessel.py @@ -0,0 +1,120 @@ +from . import base +from . import units + +mL = units.ureg("mL") +mmHg = units.ureg("mmHg") +s = units.ureg("s") + + +class ThreeElementWindkessel(base.CirculationModel): + """ + A 0D model of the Left Ventricle coupled to a 3-Element Windkessel system. + + The 3-Element Windkessel (WK3) models the systemic arterial tree using: + - R_c: Characteristic resistance (proximal aortic impedance) + - C: Arterial compliance (vessel elasticity) + - R_p: Peripheral resistance (microcirculation) + """ + + def __init__(self, parameters=None, p_LV_func=None, **kwargs): + super().__init__(parameters, **kwargs) + + if p_LV_func is not None: + self.p_LV_func = p_LV_func + self._E_LV = lambda t: 1.0 # Dummy function, not used + else: + # Use default time-varying elastance model if no custom mechanics are provided + self._E_LV = self.time_varying_elastance(**self.parameters["chambers"]["LV"]) + self.p_LV_func = lambda V, t: ( + self._E_LV(t) * (V - self.parameters["chambers"]["LV"]["V0"]) + ) + + self._initialize() + + @property + def HR(self) -> float: + return self.parameters["HR"] + + @staticmethod + def default_parameters(): + return { + "HR": 1.25 * units.ureg("Hz"), # Heart rate in Hz (75 BPM) + "R_c": 0.05 * mmHg * s / mL, # Characteristic resistance (mmHg*s/mL) + "R_p": 1.05 * mmHg * s / mL, # Peripheral resistance (mmHg*s/mL) + "C": 1.3 * mL / mmHg, # Arterial compliance (mL/mmHg) + "P_venous": 8.0 * mmHg, # Preload / Left Atrial Pressure (mmHg) + "R_mitral": 0.01 * mmHg * s / mL, # Mitral valve resistance + "R_aortic": 0.01 * mmHg * s / mL, # Aortic valve resistance + "chambers": { + "LV": { + "EA": 2.5 * mmHg / mL, # Max elastance (mmHg/mL) + "EB": 0.08 * mmHg / mL, # Min elastance (mmHg/mL) + "TC": 0.25 * s, # Contraction duration (s) + "TR": 0.15 * s, # Relaxation duration (s) + "tC": 0.0 * s, # Time of contraction onset (s) + "V0": 15.0 * mL, # Unstressed volume (mL) + } + }, + } + + @staticmethod + def default_initial_conditions() -> dict: + return { + "V_LV": 140.0 * mL, # Initial left ventricular volume + "p_c": 80.0 * mmHg, # Pressure in the arterial compliance capacitor + } + + @staticmethod + def state_names() -> list: + return ["V_LV", "p_c"] + + @staticmethod + def var_names() -> list: + return ["p_LV", "p_ao", "Q_in", "Q_out", "Q_p"] + + def update_static_variables(self, t, y): + V_LV = y[0] + p_c = y[1] + + p_LV = self.p_LV_func(V_LV, t) + + P_venous = self.parameters["P_venous"] + R_mitral = self.parameters["R_mitral"] + R_aortic = self.parameters["R_aortic"] + R_c = self.parameters["R_c"] + R_p = self.parameters["R_p"] + + # 1. Mitral Flow (Diastolic Filling) + Q_in = max(0.0, (P_venous - p_LV) / R_mitral) + + # 2. Aortic Flow (Systolic Ejection) + # In a WK3 model, aortic pressure P_ao = p_c + Q_out * R_c + # If the valve is open (p_LV > p_ao), we substitute P_ao and solve for Q_out + if p_LV > p_c: + Q_out = (p_LV - p_c) / (R_aortic + R_c) + else: + Q_out = 0.0 + + # 3. Pressures and Peripheral Flow + p_ao = p_c + Q_out * R_c + Q_p = p_c / R_p # Discharge to venous pool (assumed 0 mmHg pressure drop reference) + + var = self._get_var(t) + var[0] = p_LV + var[1] = p_ao + var[2] = Q_in + var[3] = Q_out + var[4] = Q_p + return var + + def rhs(self, t, y): + var = self.update_static_variables(t, y) + Q_in = var[2] + Q_out = var[3] + Q_p = var[4] + C = self.parameters["C"] + + self.dy[0] = Q_in - Q_out + self.dy[1] = (Q_out - Q_p) / C + + return self.dy diff --git a/tests/test_models.py b/tests/test_models.py index 4d4cc1a..4540c42 100644 --- a/tests/test_models.py +++ b/tests/test_models.py @@ -24,3 +24,15 @@ def test_Regazzoni2020(): for k, v in model.default_initial_conditions().items(): assert k in results assert results[k][0] == v.magnitude + + +def test_ThreeElementWindkessel(): + model = circulation.windkessel.ThreeElementWindkessel() + results = model.solve(T=1.0, dt=1e-3, dt_eval=0.1) + + for k, v in results.items(): + assert len(v) == len(results["time"]), k + + for k, v in model.default_initial_conditions().items(): + assert k in results + assert results[k][0] == v.magnitude From 3967d5813f157f75dc3c4a201da35fe0f1c0f838 Mon Sep 17 00:00:00 2001 From: Henrik Finsberg Date: Tue, 5 May 2026 09:39:13 +0200 Subject: [PATCH 2/2] Install pint from pypi in workflow --- .github/workflows/main.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index dc8391d..1bd42f2 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -25,7 +25,6 @@ jobs: python-version: ${{ matrix.python-version }} - name: Install circulation run: | - python3 -m pip install git+https://github.com/hgrecco/pint.git python -m pip install -e ".[test]" - name: Test with pytest run: |