Skip to content

Commit

Permalink
bpo-39812: Remove daemon threads in concurrent.futures (pythonGH-19149)
Browse files Browse the repository at this point in the history
Remove daemon threads from :mod:`concurrent.futures` by adding
an internal `threading._register_atexit()`, which calls registered functions
prior to joining all non-daemon threads. This allows for compatibility
with subinterpreters, which don't support daemon threads.
  • Loading branch information
aeros authored Mar 27, 2020
1 parent 5f9c131 commit b61b818
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 32 deletions.
5 changes: 5 additions & 0 deletions Doc/whatsnew/3.9.rst
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,11 @@ which have not started running, instead of waiting for them to complete before
shutting down the executor.
(Contributed by Kyle Stanley in :issue:`39349`.)

Removed daemon threads from :class:`~concurrent.futures.ThreadPoolExecutor`
and :class:`~concurrent.futures.ProcessPoolExecutor`. This improves
compatibility with subinterpreters and predictability in their shutdown
processes. (Contributed by Kyle Stanley in :issue:`39812`.)

curses
------

Expand Down
23 changes: 6 additions & 17 deletions Lib/concurrent/futures/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,19 +59,6 @@
import sys
import traceback

# Workers are created as daemon threads and processes. This is done to allow the
# interpreter to exit when there are still idle processes in a
# ProcessPoolExecutor's process pool (i.e. shutdown() was not called). However,
# allowing workers to die with the interpreter has two undesirable properties:
# - The workers would still be running during interpreter shutdown,
# meaning that they would fail in unpredictable ways.
# - The workers could be killed while evaluating a work item, which could
# be bad if the callable being evaluated has external side-effects e.g.
# writing to a file.
#
# To work around this problem, an exit handler is installed which tells the
# workers to exit when their work queues are empty and then waits until the
# threads/processes finish.

_threads_wakeups = weakref.WeakKeyDictionary()
_global_shutdown = False
Expand Down Expand Up @@ -107,6 +94,12 @@ def _python_exit():
for t, _ in items:
t.join()

# Register for `_python_exit()` to be called just before joining all
# non-daemon threads. This is used instead of `atexit.register()` for
# compatibility with subinterpreters, which no longer support daemon threads.
# See bpo-39812 for context.
threading._register_atexit(_python_exit)

# Controls how many more calls than processes will be queued in the call queue.
# A smaller number will mean that processes spend more time idle waiting for
# work while a larger number will make Future.cancel() succeed less frequently
Expand Down Expand Up @@ -306,9 +299,7 @@ def weakref_cb(_, thread_wakeup=self.thread_wakeup):
# {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
self.pending_work_items = executor._pending_work_items

# Set this thread to be daemonized
super().__init__()
self.daemon = True

def run(self):
# Main loop for the executor manager thread.
Expand Down Expand Up @@ -732,5 +723,3 @@ def shutdown(self, wait=True, *, cancel_futures=False):
self._executor_manager_thread_wakeup = None

shutdown.__doc__ = _base.Executor.shutdown.__doc__

atexit.register(_python_exit)
20 changes: 5 additions & 15 deletions Lib/concurrent/futures/thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,6 @@
import weakref
import os

# Workers are created as daemon threads. This is done to allow the interpreter
# to exit when there are still idle threads in a ThreadPoolExecutor's thread
# pool (i.e. shutdown() was not called). However, allowing workers to die with
# the interpreter has two undesirable properties:
# - The workers would still be running during interpreter shutdown,
# meaning that they would fail in unpredictable ways.
# - The workers could be killed while evaluating a work item, which could
# be bad if the callable being evaluated has external side-effects e.g.
# writing to a file.
#
# To work around this problem, an exit handler is installed which tells the
# workers to exit when their work queues are empty and then waits until the
# threads finish.

_threads_queues = weakref.WeakKeyDictionary()
_shutdown = False
Expand All @@ -43,7 +30,11 @@ def _python_exit():
for t, q in items:
t.join()

atexit.register(_python_exit)
# Register for `_python_exit()` to be called just before joining all
# non-daemon threads. This is used instead of `atexit.register()` for
# compatibility with subinterpreters, which no longer support daemon threads.
# See bpo-39812 for context.
threading._register_atexit(_python_exit)


class _WorkItem(object):
Expand Down Expand Up @@ -197,7 +188,6 @@ def weakref_cb(_, q=self._work_queue):
self._work_queue,
self._initializer,
self._initargs))
t.daemon = True
t.start()
self._threads.add(t)
_threads_queues[t] = self._work_queue
Expand Down
50 changes: 50 additions & 0 deletions Lib/test/test_threading.py
Original file line number Diff line number Diff line change
Expand Up @@ -1397,5 +1397,55 @@ def test_interrupt_main_noerror(self):
signal.signal(signal.SIGINT, handler)


class AtexitTests(unittest.TestCase):

def test_atexit_output(self):
rc, out, err = assert_python_ok("-c", """if True:
import threading
def run_last():
print('parrot')
threading._register_atexit(run_last)
""")

self.assertFalse(err)
self.assertEqual(out.strip(), b'parrot')

def test_atexit_called_once(self):
rc, out, err = assert_python_ok("-c", """if True:
import threading
from unittest.mock import Mock
mock = Mock()
threading._register_atexit(mock)
mock.assert_not_called()
# force early shutdown to ensure it was called once
threading._shutdown()
mock.assert_called_once()
""")

self.assertFalse(err)

def test_atexit_after_shutdown(self):
# The only way to do this is by registering an atexit within
# an atexit, which is intended to raise an exception.
rc, out, err = assert_python_ok("-c", """if True:
import threading
def func():
pass
def run_last():
threading._register_atexit(func)
threading._register_atexit(run_last)
""")

self.assertTrue(err)
self.assertIn("RuntimeError: can't register atexit after shutdown",
err.decode())


if __name__ == "__main__":
unittest.main()
29 changes: 29 additions & 0 deletions Lib/threading.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import os as _os
import sys as _sys
import _thread
import functools

from time import monotonic as _time
from _weakrefset import WeakSet
Expand Down Expand Up @@ -1346,6 +1347,27 @@ def enumerate():
with _active_limbo_lock:
return list(_active.values()) + list(_limbo.values())


_threading_atexits = []
_SHUTTING_DOWN = False

def _register_atexit(func, *arg, **kwargs):
"""CPython internal: register *func* to be called before joining threads.
The registered *func* is called with its arguments just before all
non-daemon threads are joined in `_shutdown()`. It provides a similar
purpose to `atexit.register()`, but its functions are called prior to
threading shutdown instead of interpreter shutdown.
For similarity to atexit, the registered functions are called in reverse.
"""
if _SHUTTING_DOWN:
raise RuntimeError("can't register atexit after shutdown")

call = functools.partial(func, *arg, **kwargs)
_threading_atexits.append(call)


from _thread import stack_size

# Create the main thread object,
Expand All @@ -1367,6 +1389,8 @@ def _shutdown():
# _shutdown() was already called
return

global _SHUTTING_DOWN
_SHUTTING_DOWN = True
# Main thread
tlock = _main_thread._tstate_lock
# The main thread isn't finished yet, so its thread state lock can't have
Expand All @@ -1376,6 +1400,11 @@ def _shutdown():
tlock.release()
_main_thread._stop()

# Call registered threading atexit functions before threads are joined.
# Order is reversed, similar to atexit.
for atexit_call in reversed(_threading_atexits):
atexit_call()

# Join all non-deamon threads
while True:
with _shutdown_locks_lock:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Removed daemon threads from :mod:`concurrent.futures` by adding
an internal `threading._register_atexit()`, which calls registered functions
prior to joining all non-daemon threads. This allows for compatibility
with subinterpreters, which don't support daemon threads.

0 comments on commit b61b818

Please sign in to comment.