Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ jobs:
python-version: ${{ matrix.python-version }}
- name: Install circulation
run: |
python3 -m pip install git+https://github.com/hgrecco/pint.git
python -m pip install -e ".[test]"
- name: Test with pytest
run: |
Expand Down
21 changes: 0 additions & 21 deletions examples/regazzoni.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,27 +9,6 @@
setup_logging()
circulation = Regazzoni2020()

from scipy.integrate import solve_ivp
import numpy as np


# res = solve_ivp(
# circulation.rhs,
# [0, 5],
# circulation.state_arr,
# t_eval=np.linspace(0, 5, 1000),
# method="RK45",
# max_step=1e-3,
# )
# plt.plot(res.y[0, :])
# plt.plot(res.y[1, :])
# plt.show()
# breakpoint()



# circulation.print_info()
# history = circulation.results

history = circulation.solve(num_beats=10)
circulation.print_info()
Expand Down
70 changes: 70 additions & 0 deletions examples/windkessel_demo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
# %% [markdown]
# # The 3-Element Windkessel Circulation Model
#
# This tutorial demonstrates the 0D `ThreeElementWindkessel` model.
# It runs fully standalone using a simple Time-Varying Elastance (TVE)
# model for the Left Ventricle, requiring no external mechanics libraries.
#
# We will simulate two different preload states to observe the basic
# Frank-Starling mechanism (higher filling pressure -> larger stroke volume).

import logging
import matplotlib.pyplot as plt
import numpy as np
from circulation.windkessel import ThreeElementWindkessel
from circulation.units import ureg

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")

def main():
mmHg = ureg("mmHg")

# Setup two models with different preloads, explicitly using units
params_low = ThreeElementWindkessel.default_parameters()
params_low["P_venous"] = 6.0 * mmHg
model_low = ThreeElementWindkessel(parameters=params_low)

params_high = ThreeElementWindkessel.default_parameters()
params_high["P_venous"] = 12.0 * mmHg
model_high = ThreeElementWindkessel(parameters=params_high)

# Solve for 10 beats (add_units=False by default, so solve() strips units internally)
hist_low = model_low.solve(num_beats=10, dt=1e-3)
hist_high = model_high.solve(num_beats=10, dt=1e-3)

# Extract the final stable beat
# model_low.HR accesses the stripped float value (1.25)
samples = int(1.0 / model_low.HR / 1e-3)
slc = slice(-samples, None)
t_plot = np.arange(samples) * 1e-3

# Plotting
fig, axs = plt.subplots(1, 2, figsize=(14, 6))

# Panel 1: PV Loops
axs[0].plot(hist_low["V_LV"][slc], hist_low["p_LV"][slc], 'b-', lw=2.5, label="Low Preload (6 mmHg)")
axs[0].plot(hist_high["V_LV"][slc], hist_high["p_LV"][slc], 'r-', lw=2.5, label="High Preload (12 mmHg)")
axs[0].set_xlabel("Left Ventricular Volume (mL)", weight="bold")
axs[0].set_ylabel("Pressure (mmHg)", weight="bold")
axs[0].set_title("Pressure-Volume Loops", weight="bold")
axs[0].grid(True, alpha=0.3)
axs[0].legend()

# Panel 2: Wiggers Diagram (High Preload)
axs[1].plot(t_plot, hist_high["p_LV"][slc], 'r-', lw=2.5, label="LV Pressure")
axs[1].plot(t_plot, hist_high["p_ao"][slc], 'k--', lw=2.0, label="Aortic Pressure")
axs[1].plot(t_plot, hist_high["p_c"][slc], 'g:', lw=2.0, label="Capacitor Pressure (p_c)")
axs[1].axhline(12.0, color='gray', linestyle=':', label="Atrial Pressure")
axs[1].set_xlabel("Time (s)", weight="bold")
axs[1].set_ylabel("Pressure (mmHg)", weight="bold")
axs[1].set_title("Wiggers Diagram (High Preload)", weight="bold")
axs[1].grid(True, alpha=0.3)
axs[1].legend()

plt.tight_layout()
plt.savefig("windkessel_0D_demo.png", dpi=300)
logging.info("Saved plot to windkessel_0D_demo.png")
plt.show()

if __name__ == "__main__":
main()
12 changes: 10 additions & 2 deletions src/circulation/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
from . import base, log, regazzoni2020, zenker, units, time_varying_elastance
from . import base, log, regazzoni2020, zenker, units, time_varying_elastance, windkessel

__all__ = ["base", "log", "regazzoni2020", "zenker", "units", "time_varying_elastance"]
__all__ = [
"base",
"log",
"regazzoni2020",
"zenker",
"units",
"time_varying_elastance",
"windkessel",
]
120 changes: 120 additions & 0 deletions src/circulation/windkessel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
from . import base
from . import units

mL = units.ureg("mL")
mmHg = units.ureg("mmHg")
s = units.ureg("s")


class ThreeElementWindkessel(base.CirculationModel):
"""
A 0D model of the Left Ventricle coupled to a 3-Element Windkessel system.

The 3-Element Windkessel (WK3) models the systemic arterial tree using:
- R_c: Characteristic resistance (proximal aortic impedance)
- C: Arterial compliance (vessel elasticity)
- R_p: Peripheral resistance (microcirculation)
"""

def __init__(self, parameters=None, p_LV_func=None, **kwargs):
super().__init__(parameters, **kwargs)

if p_LV_func is not None:
self.p_LV_func = p_LV_func
self._E_LV = lambda t: 1.0 # Dummy function, not used
else:
# Use default time-varying elastance model if no custom mechanics are provided
self._E_LV = self.time_varying_elastance(**self.parameters["chambers"]["LV"])
self.p_LV_func = lambda V, t: (
self._E_LV(t) * (V - self.parameters["chambers"]["LV"]["V0"])
)

self._initialize()

@property
def HR(self) -> float:
return self.parameters["HR"]

@staticmethod
def default_parameters():
return {
"HR": 1.25 * units.ureg("Hz"), # Heart rate in Hz (75 BPM)
"R_c": 0.05 * mmHg * s / mL, # Characteristic resistance (mmHg*s/mL)
"R_p": 1.05 * mmHg * s / mL, # Peripheral resistance (mmHg*s/mL)
"C": 1.3 * mL / mmHg, # Arterial compliance (mL/mmHg)
"P_venous": 8.0 * mmHg, # Preload / Left Atrial Pressure (mmHg)
"R_mitral": 0.01 * mmHg * s / mL, # Mitral valve resistance
"R_aortic": 0.01 * mmHg * s / mL, # Aortic valve resistance
"chambers": {
"LV": {
"EA": 2.5 * mmHg / mL, # Max elastance (mmHg/mL)
"EB": 0.08 * mmHg / mL, # Min elastance (mmHg/mL)
"TC": 0.25 * s, # Contraction duration (s)
"TR": 0.15 * s, # Relaxation duration (s)
"tC": 0.0 * s, # Time of contraction onset (s)
"V0": 15.0 * mL, # Unstressed volume (mL)
}
},
}

@staticmethod
def default_initial_conditions() -> dict:
return {
"V_LV": 140.0 * mL, # Initial left ventricular volume
"p_c": 80.0 * mmHg, # Pressure in the arterial compliance capacitor
}

@staticmethod
def state_names() -> list:
return ["V_LV", "p_c"]

@staticmethod
def var_names() -> list:
return ["p_LV", "p_ao", "Q_in", "Q_out", "Q_p"]

def update_static_variables(self, t, y):
V_LV = y[0]
p_c = y[1]

p_LV = self.p_LV_func(V_LV, t)

P_venous = self.parameters["P_venous"]
R_mitral = self.parameters["R_mitral"]
R_aortic = self.parameters["R_aortic"]
R_c = self.parameters["R_c"]
R_p = self.parameters["R_p"]

# 1. Mitral Flow (Diastolic Filling)
Q_in = max(0.0, (P_venous - p_LV) / R_mitral)

# 2. Aortic Flow (Systolic Ejection)
# In a WK3 model, aortic pressure P_ao = p_c + Q_out * R_c
# If the valve is open (p_LV > p_ao), we substitute P_ao and solve for Q_out
if p_LV > p_c:
Q_out = (p_LV - p_c) / (R_aortic + R_c)
else:
Q_out = 0.0

# 3. Pressures and Peripheral Flow
p_ao = p_c + Q_out * R_c
Q_p = p_c / R_p # Discharge to venous pool (assumed 0 mmHg pressure drop reference)

var = self._get_var(t)
var[0] = p_LV
var[1] = p_ao
var[2] = Q_in
var[3] = Q_out
var[4] = Q_p
return var

def rhs(self, t, y):
var = self.update_static_variables(t, y)
Q_in = var[2]
Q_out = var[3]
Q_p = var[4]
C = self.parameters["C"]

self.dy[0] = Q_in - Q_out
self.dy[1] = (Q_out - Q_p) / C

return self.dy
12 changes: 12 additions & 0 deletions tests/test_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,15 @@ def test_Regazzoni2020():
for k, v in model.default_initial_conditions().items():
assert k in results
assert results[k][0] == v.magnitude


def test_ThreeElementWindkessel():
model = circulation.windkessel.ThreeElementWindkessel()
results = model.solve(T=1.0, dt=1e-3, dt_eval=0.1)

for k, v in results.items():
assert len(v) == len(results["time"]), k

for k, v in model.default_initial_conditions().items():
assert k in results
assert results[k][0] == v.magnitude
Loading