Skip to content

Commit

Permalink
Fix “an interrupted (Cancelled) coroutine doesn't cleanup” (Bogdanp#544)
Browse files Browse the repository at this point in the history
* Test showing issue

* Fix it

* Reraise Interrupts

* Revert "Reraise Interrupts"

This reverts commit 94a63de.

* Revert "Fix it"

This reverts commit ab716c5.

* Use a threading.Event

* Skip test for gevent or non CPython
  • Loading branch information
caspervdw authored Aug 27, 2023
1 parent 0f50659 commit 9b914c8
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 18 deletions.
36 changes: 24 additions & 12 deletions dramatiq/asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,16 +129,28 @@ def run_coroutine(self, coro: Awaitable[R]) -> R:
if not self.loop.is_running():
raise RuntimeError("Event loop is not running.")

future = asyncio.run_coroutine_threadsafe(coro, self.loop)
while True:
done = threading.Event()

async def wrapped_coro() -> R:
try:
# Use a timeout to be able to catch asynchronously
# raised dramatiq exceptions (Interrupt).
return future.result(timeout=self.interrupt_check_ival)
except Interrupt:
# Asynchronously raised from another thread: cancel
# the future and reiterate to wait for possible
# cleanup actions.
self.loop.call_soon_threadsafe(future.cancel)
except concurrent.futures.TimeoutError:
continue
return await coro
finally:
done.set()

future = asyncio.run_coroutine_threadsafe(wrapped_coro(), self.loop)
try:
while True:
try:
# Use a timeout to be able to catch asynchronously
# raised dramatiq exceptions (Interrupt).
return future.result(timeout=self.interrupt_check_ival)
except concurrent.futures.TimeoutError:
continue
except Interrupt:
# Asynchronously raised from another thread: cancel the future.
self.loop.call_soon_threadsafe(future.cancel)
# Wait for the 'done' event instead of the future; the future will
# raise CancelledError immediately while we should wait for the coro
# to actually finish its cleanup actions.
done.wait()
raise
49 changes: 43 additions & 6 deletions tests/middleware/test_asyncio.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
import asyncio
import threading
from threading import get_ident
from unittest import mock

from dramatiq import threading
import pytest

from dramatiq.asyncio import EventLoopThread, async_to_sync, get_event_loop_thread, set_event_loop_thread
from dramatiq.asyncio import (
EventLoopThread,
async_to_sync,
get_event_loop_thread,
set_event_loop_thread,
)
from dramatiq.logging import get_logger
from dramatiq.middleware.asyncio import AsyncIO

Expand Down Expand Up @@ -43,7 +48,7 @@ def test_event_loop_thread_run_coroutine(started_thread: EventLoopThread):
result = {}

async def get_thread_id():
return threading.get_ident()
return get_ident()

result = started_thread.run_coroutine(get_thread_id())

Expand All @@ -52,14 +57,46 @@ async def get_thread_id():


def test_event_loop_thread_run_coroutine_exception(started_thread: EventLoopThread):
async def raise_error():
async def raise_actual_error():
raise TypeError("bla")

async def raise_error():
await raise_actual_error()

coro = raise_error()

with pytest.raises(TypeError, match="bla"):
with pytest.raises(TypeError, match="bla") as e:
started_thread.run_coroutine(coro)

# the error has the correct traceback
assert e.traceback[-2].name == "raise_error"
assert e.traceback[-1].name == "raise_actual_error"


@pytest.mark.skipif(
threading.current_platform not in threading.supported_platforms,
reason="Threading not supported on this platform.",
)
@pytest.mark.skipif(
threading.is_gevent_active(), reason="Thread exceptions not supported with gevent."
)
def test_event_loop_thread_run_coroutine_interrupted(started_thread: EventLoopThread):
side_effect_target = {"cleanup": False}

async def sleep_interrupt(worker_thread_id: int):
threading.raise_thread_exception(worker_thread_id, threading.Interrupt)
try:
for _ in range(100):
await asyncio.sleep(0.01)
finally:
await asyncio.sleep(0.01)
side_effect_target["cleanup"] = True

with pytest.raises(threading.Interrupt):
started_thread.run_coroutine(sleep_interrupt(get_ident()))

assert side_effect_target["cleanup"]


@mock.patch("dramatiq.middleware.asyncio.EventLoopThread")
def test_async_middleware_before_worker_boot(EventLoopThreadMock):
Expand Down

0 comments on commit 9b914c8

Please sign in to comment.