Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bpo-32309: Implement asyncio.to_thread() #20143

Merged
merged 9 commits into from
May 19, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Doc/library/asyncio-api-index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ await on multiple things with timeouts.
* - :class:`Task`
- Task object.

* - :func:`to_thread`
- Asychronously run a function in a separate OS thread.

* - :func:`run_coroutine_threadsafe`
- Schedule a coroutine from another OS thread.

Expand Down
56 changes: 56 additions & 0 deletions Doc/library/asyncio-task.rst
Original file line number Diff line number Diff line change
Expand Up @@ -602,6 +602,62 @@ Waiting Primitives
# ...


Running in Threads
==================

.. coroutinefunction:: to_thread(func, /, \*args, \*\*kwargs)
aeros marked this conversation as resolved.
Show resolved Hide resolved

Asynchronously run function *func* in a separate thread.
aeros marked this conversation as resolved.
Show resolved Hide resolved

Any \*args and \*\*kwargs supplied for this function are directly passed
to *func*.

Return an :class:`asyncio.Future` which represents the eventual result of
*func*.

This coroutine function is primarily intended to be used for executing
IO-bound functions/methods that would otherwise block the event loop if
they were ran in the main thread. For example::

def blocking_io():
print(f"start blocking_io at {time.strftime('%X')}")
# Note that time.sleep() can be replaced with any blocking
# IO-bound operation, such as file operations.
time.sleep(1)
print(f"blocking_io complete at {time.strftime('%X')}")

async def main():
print(f"started main at {time.strftime('%X')}")

await asyncio.gather(
asyncio.to_thread(blocking_io),
asyncio.sleep(1))

print(f"finished main at {time.strftime('%X')}")


asyncio.run(main())

# Expected output:
#
# started main at 19:50:53
# start blocking_io at 19:50:53
# blocking_io complete at 19:50:54
# finished main at 19:50:54

Directly calling `blocking_io()` in any coroutine would block the event loop
for its duration, resulting in an additional 1 second of run time. Instead,
by using `asyncio.to_thread()`, we can run it in a separate thread without
blocking the event loop.

.. note::

Due to the :term:`GIL`, `asyncio.to_thread()` can typically only be used
to make IO-bound functions non-blocking. However, for extension modules
that release the GIL or alternative Python implementations that don't
have one, `asyncio.to_thread()` can also be used for CPU-bound functions.
aeros marked this conversation as resolved.
Show resolved Hide resolved


Scheduling From Other Threads
=============================

Expand Down
6 changes: 6 additions & 0 deletions Doc/whatsnew/3.9.rst
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,12 @@ that schedules a shutdown for the default executor that waits on the
Added :class:`asyncio.PidfdChildWatcher`, a Linux-specific child watcher
implementation that polls process file descriptors. (:issue:`38692`)

Added a new :term:`coroutine` :func:`asyncio.to_thread`. It is mainly used for
running IO-bound functions in a separate thread to avoid blocking the event
loop, and essentially works as a high-level version of
:meth:`~asyncio.loop.run_in_executor` that can directly take keyword arguments.
(Contributed by Kyle Stanley and Yury Selivanov in :issue:`32309`.)

compileall
----------

Expand Down
2 changes: 2 additions & 0 deletions Lib/asyncio/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from .streams import *
from .subprocess import *
from .tasks import *
from .threads import *
from .transports import *

# Exposed for _asynciomodule.c to implement now deprecated
Expand All @@ -35,6 +36,7 @@
streams.__all__ +
subprocess.__all__ +
tasks.__all__ +
threads.__all__ +
transports.__all__)

if sys.platform == 'win32': # pragma: no cover
Expand Down
21 changes: 21 additions & 0 deletions Lib/asyncio/threads.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
"""High-level support for working with threads in asyncio"""

import functools

from . import events


__all__ = "to_thread",


async def to_thread(func, /, *args, **kwargs):
"""Asynchronously run function *func* in a separate thread.

Any *args and **kwargs supplied for this function are directly passed
to *func*.

Return an asyncio.Future which represents the eventual result of *func*.
"""
loop = events.get_running_loop()
func_call = functools.partial(func, *args, **kwargs)
aeros marked this conversation as resolved.
Show resolved Hide resolved
return await loop.run_in_executor(None, func_call)
79 changes: 79 additions & 0 deletions Lib/test/test_asyncio/test_threads.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
"""Tests for asyncio/threads.py"""

import asyncio
import unittest

from unittest import mock
from test.test_asyncio import utils as test_utils


def tearDownModule():
asyncio.set_event_loop_policy(None)


class ToThreadTests(test_utils.TestCase):
def setUp(self):
super().setUp()
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)

def tearDown(self):
self.loop.run_until_complete(
self.loop.shutdown_default_executor())
self.loop.close()
asyncio.set_event_loop(None)
self.loop = None
super().tearDown()

def test_to_thread(self):
async def main():
return await asyncio.to_thread(sum, [40, 2])

result = self.loop.run_until_complete(main())
self.assertEqual(result, 42)

def test_to_thread_exception(self):
def raise_runtime():
raise RuntimeError("test")

async def main():
await asyncio.to_thread(raise_runtime)

with self.assertRaisesRegex(RuntimeError, "test"):
self.loop.run_until_complete(main())

def test_to_thread_once(self):
func = mock.Mock()

async def main():
await asyncio.to_thread(func)

self.loop.run_until_complete(main())
func.assert_called_once()

def test_to_thread_concurrent(self):
func = mock.Mock()

async def main():
futs = []
for _ in range(10):
fut = asyncio.to_thread(func)
futs.append(fut)
await asyncio.gather(*futs)

self.loop.run_until_complete(main())
self.assertEqual(func.call_count, 10)

def test_to_thread_args_kwargs(self):
# Unlike run_in_executor(), to_thread() should directly accept kwargs.
func = mock.Mock()

async def main():
await asyncio.to_thread(func, 'test', something=True)

self.loop.run_until_complete(main())
func.assert_called_once_with('test', something=True)


if __name__ == "__main__":
unittest.main()
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Added a new :term:`coroutine` :func:`asyncio.to_thread`. It is mainly used for
running IO-bound functions in a separate thread to avoid blocking the event
loop, and essentially works as a high-level version of
:meth:`~asyncio.loop.run_in_executor` that can directly take keyword arguments.