Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 88 additions & 16 deletions rocketpy/rocket/parachute.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,21 @@ class Parachute:
- The string "apogee" which triggers the parachute at apogee, i.e.,
when the rocket reaches its highest point and starts descending.

- The string "launch + X" where X is a number in seconds. The parachute
will be ejected X seconds after launch (t=0). This is useful for
simulating delay charges that activate at a fixed time from launch.

- The string "burnout + X" where X is a number in seconds. The parachute
will be ejected X seconds after motor burnout. This is useful for
simulating delay charges in motors with delay elements.

Parachute.triggerfunc : function
Trigger function created from the trigger used to evaluate the trigger
condition for the parachute ejection system. It is a callable function
that takes three arguments: Freestream pressure in Pa, Height above
ground level in meters, and the state vector of the simulation. The
returns ``True`` if the parachute ejection system should be triggered
and ``False`` otherwise.
that takes six arguments: Freestream pressure in Pa, Height above
ground level in meters, the state vector of the simulation, sensors
list, current time t, and rocket object. It returns ``True`` if the
parachute ejection system should be triggered and ``False`` otherwise.

.. note:

Expand Down Expand Up @@ -153,7 +160,14 @@ def __init__(
height above ground level.
- The string "apogee" which triggers the parachute at apogee, i.e., \
when the rocket reaches its highest point and starts descending.

- The string "launch + X" where X is the delay in seconds from launch. \
For example, "launch + 5" triggers 5 seconds after launch. This is \
useful for simulating delay charges that activate at a fixed time \
from launch.
- The string "burnout + X" where X is the delay in seconds from motor \
burnout. For example, "burnout + 3.5" triggers 3.5 seconds after \
motor burnout. This is useful for simulating delay charges in motors \
with delay elements.
.. note::

The function will be called according to the sampling rate specified.
Expand Down Expand Up @@ -232,35 +246,93 @@ def __evaluate_trigger_function(self, trigger):
sig = signature(triggerfunc)
if len(sig.parameters) == 3:

def triggerfunc(p, h, y, sensors):
def triggerfunc(p, h, y, sensors, t=None, rocket=None):
return trigger(p, h, y)
elif len(sig.parameters) == 4:

def triggerfunc(p, h, y, sensors, t=None, rocket=None):
return trigger(p, h, y, sensors)

self.triggerfunc = triggerfunc

elif isinstance(trigger, (int, float)):
# The parachute is deployed at a given height
def triggerfunc(p, h, y, sensors): # pylint: disable=unused-argument
def triggerfunc(p, h, y, sensors, t=None, rocket=None): # pylint: disable=unused-argument
# p = pressure considering parachute noise signal
# h = height above ground level considering parachute noise signal
# y = [x, y, z, vx, vy, vz, e0, e1, e2, e3, w1, w2, w3]
return y[5] < 0 and h < trigger

self.triggerfunc = triggerfunc

elif trigger.lower() == "apogee":
# The parachute is deployed at apogee
def triggerfunc(p, h, y, sensors): # pylint: disable=unused-argument
# p = pressure considering parachute noise signal
# h = height above ground level considering parachute noise signal
# y = [x, y, z, vx, vy, vz, e0, e1, e2, e3, w1, w2, w3]
return y[5] < 0
elif isinstance(trigger, str):
trigger_lower = trigger.lower().strip()

if trigger_lower == "apogee":
# The parachute is deployed at apogee
def triggerfunc(p, h, y, sensors, t=None, rocket=None): # pylint: disable=unused-argument
# p = pressure considering parachute noise signal
# h = height above ground level considering parachute noise signal
# y = [x, y, z, vx, vy, vz, e0, e1, e2, e3, w1, w2, w3]
return y[5] < 0

self.triggerfunc = triggerfunc

elif "+" in trigger_lower:
# Time-based trigger: "launch + X" or "burnout + X"
parts = trigger_lower.split("+")
if len(parts) != 2:
raise ValueError(
f"Invalid time-based trigger format for parachute '{self.name}'. "
+ "Expected format: 'launch + delay' or 'burnout + delay' "
+ "where delay is a number in seconds."
)

event = parts[0].strip()
try:
delay = float(parts[1].strip())
except ValueError:
raise ValueError(
f"Invalid delay value in trigger '{trigger}' for parachute '{self.name}'. "
+ "Delay must be a number in seconds."
)
Comment on lines +291 to +298
Copy link

Copilot AI Jan 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The implementation doesn't validate that the delay value is non-negative. Users could potentially specify negative delays like "launch + -5" or "burnout + -3.5", which would create parachutes that trigger before the event occurs. While the trigger would technically work (it would immediately trigger if the current time is greater than the negative target time), this is likely a user error that should be caught during initialization.

Consider adding validation to ensure delay >= 0, or document that negative delays are intentionally supported if there's a valid use case.

Copilot uses AI. Check for mistakes.

if event == "launch":
# Deploy at launch time + delay
def triggerfunc(p, h, y, sensors, t=None, rocket=None): # pylint: disable=unused-argument
if t is None:
return False
return t >= delay

self.triggerfunc = triggerfunc

elif event == "burnout":
# Deploy at motor burnout time + delay
def triggerfunc(p, h, y, sensors, t=None, rocket=None): # pylint: disable=unused-argument
if t is None or rocket is None:
return False
burnout_time = rocket.motor.burn_out_time
return t >= burnout_time + delay

Comment on lines +314 to +316
Copy link

Copilot AI Jan 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The burnout trigger implementation doesn't handle the case where a rocket might not have a motor or when accessing motor.burn_out_time might fail. While the trigger returns False if rocket is None, it doesn't handle potential AttributeError if rocket.motor doesn't exist or if burn_out_time is not set.

Consider adding defensive checks like:

  • Verify hasattr(rocket, 'motor') before accessing rocket.motor
  • Verify hasattr(rocket.motor, 'burn_out_time') or handle the case where burn_out_time might be None
  • Add a try-except block to gracefully handle unexpected cases

This would make the implementation more robust against edge cases or incomplete rocket configurations.

Suggested change
burnout_time = rocket.motor.burn_out_time
return t >= burnout_time + delay
# Defensive checks: rocket may not have a motor or burn_out_time
motor = getattr(rocket, "motor", None)
if motor is None:
return False
burnout_time = getattr(motor, "burn_out_time", None)
if burnout_time is None:
return False
try:
return t >= burnout_time + delay
except (TypeError, ValueError, AttributeError):
# If burnout_time is not a valid number or attribute access fails,
# do not trigger the parachute instead of raising an exception.
return False

Copilot uses AI. Check for mistakes.
self.triggerfunc = triggerfunc

else:
raise ValueError(
f"Invalid time-based trigger event '{event}' for parachute '{self.name}'. "
+ "Supported events are 'launch' and 'burnout'."
)

self.triggerfunc = triggerfunc
else:
raise ValueError(
f"Unable to set the trigger function for parachute '{self.name}'. "
+ "Trigger string must be 'apogee', 'launch + <delay>', or 'burnout + <delay>'. "
+ "See the Parachute class documentation for more information."
)

else:
raise ValueError(
f"Unable to set the trigger function for parachute '{self.name}'. "
+ "Trigger must be a callable, a float value or the string 'apogee'. "
+ "Trigger must be a callable, a float value or a string. "
+ "See the Parachute class documentation for more information."
)

Expand Down
125 changes: 125 additions & 0 deletions rocketpy/simulation/flight.py
Original file line number Diff line number Diff line change
Expand Up @@ -731,6 +731,8 @@ def __simulate(self, verbose):
height_above_ground_level,
self.y_sol,
self.sensors,
node.t,
self.rocket,
):
# Remove parachute from flight parachutes
self.parachutes.remove(parachute)
Expand Down Expand Up @@ -800,6 +802,125 @@ def __simulate(self, verbose):
if self.__check_simulation_events(phase, phase_index, node_index):
break # Stop if simulation termination event occurred

# List and feed overshootable time nodes
if self.time_overshoot:
# Initialize phase overshootable time nodes
overshootable_nodes = self.TimeNodes()
# Add overshootable parachute time nodes
overshootable_nodes.add_parachutes(
self.parachutes, self.solution[-2][0], self.t
)
# Add last time node (always skipped)
overshootable_nodes.add_node(self.t, [], [], [])
if len(overshootable_nodes) > 1:
# Sort and merge equal overshootable time nodes
overshootable_nodes.sort()
overshootable_nodes.merge()
# Clear if necessary
if overshootable_nodes[0].t == phase.t and phase.clear:
overshootable_nodes[0].parachutes = []
overshootable_nodes[0].callbacks = []
# Feed overshootable time nodes trigger
interpolator = phase.solver.dense_output()
for (
overshootable_index,
overshootable_node,
) in self.time_iterator(overshootable_nodes):
# Calculate state at node time
overshootable_node.y_sol = interpolator(
overshootable_node.t
)
for parachute in overshootable_node.parachutes:
# Calculate and save pressure signal
(
noisy_pressure,
height_above_ground_level,
) = self.__calculate_and_save_pressure_signals(
parachute,
overshootable_node.t,
overshootable_node.y_sol[2],
)

# Check for parachute trigger
if parachute.triggerfunc(
noisy_pressure,
height_above_ground_level,
overshootable_node.y_sol,
self.sensors,
overshootable_node.t,
self.rocket,
):
# Remove parachute from flight parachutes
self.parachutes.remove(parachute)
# Create phase for time after detection and
# before inflation
# Must only be created if parachute has any lag
i = 1
if parachute.lag != 0:
self.flight_phases.add_phase(
overshootable_node.t,
phase.derivative,
clear=True,
index=phase_index + i,
)
i += 1
# Create flight phase for time after inflation
callbacks = [
lambda self,
parachute_cd_s=parachute.cd_s: setattr(
self, "parachute_cd_s", parachute_cd_s
),
lambda self,
parachute_radius=parachute.radius: setattr(
self,
"parachute_radius",
parachute_radius,
),
lambda self,
parachute_height=parachute.height: setattr(
self,
"parachute_height",
parachute_height,
),
lambda self,
parachute_porosity=parachute.porosity: setattr(
self,
"parachute_porosity",
parachute_porosity,
),
lambda self,
added_mass_coefficient=parachute.added_mass_coefficient: setattr(
self,
"parachute_added_mass_coefficient",
added_mass_coefficient,
),
]
self.flight_phases.add_phase(
overshootable_node.t + parachute.lag,
self.u_dot_parachute,
callbacks,
clear=False,
index=phase_index + i,
)
# Rollback history
self.t = overshootable_node.t
self.y_sol = overshootable_node.y_sol
self.solution[-1] = [
overshootable_node.t,
*overshootable_node.y_sol,
]
# Prepare to leave loops and start new flight phase
overshootable_nodes.flush_after(
overshootable_index
)
phase.time_nodes.flush_after(node_index)
phase.time_nodes.add_node(self.t, [], [], [])
phase.solver.status = "finished"
# Save parachute event
self.parachute_events.append(
[self.t, parachute]
)

Comment on lines +829 to +923
Copy link

Copilot AI Jan 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code block appears to duplicate the logic already implemented in the __process_overshootable_nodes method (called on line 925). The entire section from lines 805-923 should be removed as it redundantly handles overshootable time nodes for parachute triggers. The existing __process_overshootable_nodes method already:

  1. Initializes overshootable_nodes
  2. Adds parachutes, controllers, and sensors
  3. Checks for parachute triggers via __check_overshootable_parachute_triggers
  4. Handles the rollback and phase management

This duplication creates maintainability issues and could lead to inconsistent behavior if one implementation is updated without the other.

Suggested change
# Calculate state at node time
overshootable_node.y_sol = interpolator(
overshootable_node.t
)
for parachute in overshootable_node.parachutes:
# Calculate and save pressure signal
(
noisy_pressure,
height_above_ground_level,
) = self.__calculate_and_save_pressure_signals(
parachute,
overshootable_node.t,
overshootable_node.y_sol[2],
)
# Check for parachute trigger
if parachute.triggerfunc(
noisy_pressure,
height_above_ground_level,
overshootable_node.y_sol,
self.sensors,
overshootable_node.t,
self.rocket,
):
# Remove parachute from flight parachutes
self.parachutes.remove(parachute)
# Create phase for time after detection and
# before inflation
# Must only be created if parachute has any lag
i = 1
if parachute.lag != 0:
self.flight_phases.add_phase(
overshootable_node.t,
phase.derivative,
clear=True,
index=phase_index + i,
)
i += 1
# Create flight phase for time after inflation
callbacks = [
lambda self,
parachute_cd_s=parachute.cd_s: setattr(
self, "parachute_cd_s", parachute_cd_s
),
lambda self,
parachute_radius=parachute.radius: setattr(
self,
"parachute_radius",
parachute_radius,
),
lambda self,
parachute_height=parachute.height: setattr(
self,
"parachute_height",
parachute_height,
),
lambda self,
parachute_porosity=parachute.porosity: setattr(
self,
"parachute_porosity",
parachute_porosity,
),
lambda self,
added_mass_coefficient=parachute.added_mass_coefficient: setattr(
self,
"parachute_added_mass_coefficient",
added_mass_coefficient,
),
]
self.flight_phases.add_phase(
overshootable_node.t + parachute.lag,
self.u_dot_parachute,
callbacks,
clear=False,
index=phase_index + i,
)
# Rollback history
self.t = overshootable_node.t
self.y_sol = overshootable_node.y_sol
self.solution[-1] = [
overshootable_node.t,
*overshootable_node.y_sol,
]
# Prepare to leave loops and start new flight phase
overshootable_nodes.flush_after(
overshootable_index
)
phase.time_nodes.flush_after(node_index)
phase.time_nodes.add_node(self.t, [], [], [])
phase.solver.status = "finished"
# Save parachute event
self.parachute_events.append(
[self.t, parachute]
)
# Legacy inlined overshootable-node processing has
# been consolidated into `__process_overshootable_nodes`
# to avoid duplicated logic and inconsistent behavior.
# This loop is intentionally left as a no-op.
pass

Copilot uses AI. Check for mistakes.
# Process overshootable time nodes if enabled
if self.time_overshoot and self.__process_overshootable_nodes(
phase, phase_index, node_index
Expand Down Expand Up @@ -946,6 +1067,8 @@ def __check_and_handle_parachute_triggers(
height_above_ground_level,
self.y_sol,
self.sensors,
node.t,
self.rocket,
):
continue # Check next parachute

Expand Down Expand Up @@ -1355,6 +1478,8 @@ def __check_overshootable_parachute_triggers(
height_above_ground_level,
overshootable_node.y_sol,
self.sensors,
overshootable_node.t,
self.rocket,
):
continue # Check next parachute

Expand Down
Loading
Loading