Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 42 additions & 12 deletions fspin/rate_control.py
Original file line number Diff line number Diff line change
Expand Up @@ -456,23 +456,53 @@ def start_spinning(self, func, condition_fn, *args, **kwargs):
raise TypeError("Expected a regular function for sync mode.")
return self.start_spinning_sync(func, condition_fn, *args, **kwargs)

def stop_spinning(self):
def _close_own_loop(self):
if self._own_loop is not None:
self._own_loop.close()
self._own_loop = None

def _stop_thread(self):
if self._thread:
# Avoid deadlock if stop_spinning is called from within the worker thread
current = threading.current_thread()
if self._thread.is_alive() and current is not self._thread:
self._thread.join()

async def _cancel_task(self):
if self._task:
self._task.cancel()
try:
await self._task
except asyncio.CancelledError:
pass

async def stop_spinning_async(self):
"""
Signals the spinning loop to stop.
Signals the spinning loop to stop and awaits async cleanup.
"""
self._stop_event.set()
if self.is_coroutine:
if self._task:
self._task.cancel()
await self._cancel_task()
else:
if self._thread:
# Avoid deadlock if stop_spinning is called from within the worker thread
current = threading.current_thread()
if self._thread.is_alive() and current is not self._thread:
self._thread.join()
if self._own_loop is not None:
self._own_loop.close()
self._own_loop = None
self._stop_thread()
self._close_own_loop()

def stop_spinning(self):
"""
Signals the spinning loop to stop.
"""
if self.is_coroutine:
try:
loop = asyncio.get_running_loop()
except RuntimeError:
asyncio.run(self.stop_spinning_async())
else:
Comment on lines +496 to +499

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Stop async spins on wrong loop

stop_spinning now runs stop_spinning_async() via asyncio.run when no loop is running (lines 496-499), but stop_spinning_async awaits self._task, which remains bound to the loop that created it. When the spin is running on an owned/background loop, calling stop_spinning() from sync code now raises RuntimeError: ... attached to a different loop and leaves the task running. The cleanup coroutine needs to be scheduled on the task’s loop (e.g., via self._own_loop/run_coroutine_threadsafe) instead of a fresh loop.

Useful? React with 👍 / 👎.

loop.create_task(self.stop_spinning_async())
return

self._stop_event.set()
self._stop_thread()
self._close_own_loop()

def get_report(self, output=True):
"""
Expand Down
2 changes: 1 addition & 1 deletion fspin/spin_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,4 +90,4 @@ async def __aenter__(self):
return self.rc

async def __aexit__(self, exc_type, exc_val, exc_tb):
self.rc.stop_spinning()
await self.rc.stop_spinning_async()
22 changes: 20 additions & 2 deletions tests/test_ratecontrol.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from fspin.reporting import ReportLogger
from fspin.rate_control import RateControl
from fspin.decorators import spin
from fspin.spin_context import spin as spin_context
from fspin.loop_context import loop

def test_create_histogram():
Expand Down Expand Up @@ -170,8 +171,7 @@ async def awork():
await asyncio.sleep(0.05)

# Stop the task and wait for it to be cancelled
rc.stop_spinning()
await asyncio.sleep(0.05)
await rc.stop_spinning_async()

# Check if the task is done (it might be cancelled or completed)
assert rc._task.done(), "Task is not done after stop_spinning"
Expand All @@ -188,6 +188,24 @@ async def awork():
pass


@pytest.mark.asyncio
async def test_async_context_manager_waits_for_cleanup():
cleanup_called = asyncio.Event()

async def awork():
try:
while True:
await asyncio.sleep(0.01)
finally:
cleanup_called.set()

async with spin_context(awork, freq=100) as rc:
await asyncio.sleep(0.03)

await asyncio.wait_for(cleanup_called.wait(), timeout=0.5)
assert rc._task.done()


@pytest.mark.asyncio
async def test_spin_async_exception_handling(caplog):
async def awork():
Expand Down
Loading