Skip to content

Commit

Permalink
Issue python#7316: the acquire() method of lock objects in the :mod:`…
Browse files Browse the repository at this point in the history
…threading`

module now takes an optional timeout argument in seconds.  Timeout support
relies on the system threading library, so as to avoid a semi-busy wait
loop.
  • Loading branch information
pitrou committed Apr 14, 2010
1 parent e53de3d commit 7c3e577
Show file tree
Hide file tree
Showing 11 changed files with 326 additions and 77 deletions.
31 changes: 23 additions & 8 deletions Doc/library/_thread.rst
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ implementation. For systems lacking the :mod:`_thread` module, the
:mod:`_dummy_thread` module is available. It duplicates this module's interface
and can be used as a drop-in replacement.

It defines the following constant and functions:
It defines the following constants and functions:


.. exception:: error
Expand Down Expand Up @@ -103,19 +103,34 @@ It defines the following constant and functions:
Availability: Windows, systems with POSIX threads.


.. data:: TIMEOUT_MAX

The maximum value allowed for the *timeout* parameter of
:meth:`Lock.acquire`. Specifiying a timeout greater than this value will
raise an :exc:`OverflowError`.


Lock objects have the following methods:


.. method:: lock.acquire([waitflag])
.. method:: lock.acquire(waitflag=1, timeout=-1)

Without the optional argument, this method acquires the lock unconditionally, if
Without any optional argument, this method acquires the lock unconditionally, if
necessary waiting until it is released by another thread (only one thread at a
time can acquire a lock --- that's their reason for existence). If the integer
*waitflag* argument is present, the action depends on its value: if it is zero,
the lock is only acquired if it can be acquired immediately without waiting,
while if it is nonzero, the lock is acquired unconditionally as before. The
return value is ``True`` if the lock is acquired successfully, ``False`` if not.
time can acquire a lock --- that's their reason for existence).

If the integer *waitflag* argument is present, the action depends on its
value: if it is zero, the lock is only acquired if it can be acquired
immediately without waiting, while if it is nonzero, the lock is acquired
unconditionally as above.

If the floating-point *timeout* argument is present and positive, it
specifies the maximum wait time in seconds before returning. A negative
*timeout* argument specifies an unbounded wait. You cannot specify
a *timeout* if *waitflag* is zero.

The return value is ``True`` if the lock is acquired successfully,
``False`` if not.

.. method:: lock.release()

Expand Down
28 changes: 26 additions & 2 deletions Doc/library/threading.rst
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,16 @@ This module defines the following functions and objects:
Availability: Windows, systems with POSIX threads.


This module also defines the following constant:

.. data:: TIMEOUT_MAX

The maximum value allowed for the *timeout* parameter of blocking functions
(:meth:`Lock.acquire`, :meth:`RLock.acquire`, :meth:`Condition.wait`, etc.).
Specifiying a timeout greater than this value will raise an
:exc:`OverflowError`.


Detailed interfaces for the objects are documented below.

The design of this module is loosely based on Java's threading model. However,
Expand Down Expand Up @@ -349,7 +359,7 @@ and may vary across implementations.
All methods are executed atomically.


.. method:: Lock.acquire(blocking=True)
.. method:: Lock.acquire(blocking=True, timeout=-1)

Acquire a lock, blocking or non-blocking.

Expand All @@ -363,6 +373,15 @@ All methods are executed atomically.
without an argument would block, return false immediately; otherwise, do the
same thing as when called without arguments, and return true.

When invoked with the floating-point *timeout* argument set to a positive
value, block for at most the number of seconds specified by *timeout*
and as long as the lock cannot be acquired. A negative *timeout* argument
specifies an unbounded wait. It is forbidden to specify a *timeout*
when *blocking* is false.

The return value is ``True`` if the lock is acquired successfully,
``False`` if not (for example if the *timeout* expired).


.. method:: Lock.release()

Expand Down Expand Up @@ -396,7 +415,7 @@ pair) resets the lock to unlocked and allows another thread blocked in
:meth:`acquire` to proceed.


.. method:: RLock.acquire(blocking=True)
.. method:: RLock.acquire(blocking=True, timeout=-1)

Acquire a lock, blocking or non-blocking.

Expand All @@ -415,6 +434,11 @@ pair) resets the lock to unlocked and allows another thread blocked in
without an argument would block, return false immediately; otherwise, do the
same thing as when called without arguments, and return true.

When invoked with the floating-point *timeout* argument set to a positive
value, block for at most the number of seconds specified by *timeout*
and as long as the lock cannot be acquired. Return true if the lock has
been acquired, false if the timeout has elapsed.


.. method:: RLock.release()

Expand Down
35 changes: 35 additions & 0 deletions Include/pythread.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,41 @@ PyAPI_FUNC(void) PyThread_free_lock(PyThread_type_lock);
PyAPI_FUNC(int) PyThread_acquire_lock(PyThread_type_lock, int);
#define WAIT_LOCK 1
#define NOWAIT_LOCK 0

/* PY_TIMEOUT_T is the integral type used to specify timeouts when waiting
on a lock (see PyThread_acquire_lock_timed() below).
PY_TIMEOUT_MAX is the highest usable value (in microseconds) of that
type, and depends on the system threading API.
NOTE: this isn't the same value as `_thread.TIMEOUT_MAX`. The _thread
module exposes a higher-level API, with timeouts expressed in seconds
and floating-point numbers allowed.
*/
#if defined(HAVE_LONG_LONG)
#define PY_TIMEOUT_T PY_LONG_LONG
#define PY_TIMEOUT_MAX PY_LLONG_MAX
#else
#define PY_TIMEOUT_T long
#define PY_TIMEOUT_MAX LONG_MAX
#endif

/* In the NT API, the timeout is a DWORD and is expressed in milliseconds */
#if defined (NT_THREADS)
#if (0xFFFFFFFFLL * 1000 < PY_TIMEOUT_MAX)
#undef PY_TIMEOUT_MAX
#define PY_TIMEOUT_MAX (0xFFFFFFFFLL * 1000)
#endif
#endif

/* If microseconds == 0, the call is non-blocking: it returns immediately
even when the lock can't be acquired.
If microseconds > 0, the call waits up to the specified duration.
If microseconds < 0, the call waits until success (or abnormal failure)
microseconds must be less than PY_TIMEOUT_MAX. Behaviour otherwise is
undefined. */
PyAPI_FUNC(int) PyThread_acquire_lock_timed(PyThread_type_lock,
PY_TIMEOUT_T microseconds);
PyAPI_FUNC(void) PyThread_release_lock(PyThread_type_lock);

PyAPI_FUNC(size_t) PyThread_get_stacksize(void);
Expand Down
8 changes: 7 additions & 1 deletion Lib/_dummy_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@
'interrupt_main', 'LockType']

import traceback as _traceback
import time

# A dummy value
TIMEOUT_MAX = 2**31

class error(Exception):
"""Dummy implementation of _thread.error."""
Expand Down Expand Up @@ -92,7 +96,7 @@ class LockType(object):
def __init__(self):
self.locked_status = False

def acquire(self, waitflag=None):
def acquire(self, waitflag=None, timeout=-1):
"""Dummy implementation of acquire().
For blocking calls, self.locked_status is automatically set to
Expand All @@ -111,6 +115,8 @@ def acquire(self, waitflag=None):
self.locked_status = True
return True
else:
if timeout > 0:
time.sleep(timeout)
return False

__enter__ = acquire
Expand Down
4 changes: 2 additions & 2 deletions Lib/multiprocessing/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -440,10 +440,10 @@ def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool,
p.terminate()

debug('joining task handler')
task_handler.join(1e100)
task_handler.join()

debug('joining result handler')
result_handler.join(1e100)
task_handler.join()

if pool and hasattr(pool[0], 'terminate'):
debug('joining pool workers')
Expand Down
44 changes: 39 additions & 5 deletions Lib/test/lock_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import sys
import time
from _thread import start_new_thread, get_ident
from _thread import start_new_thread, get_ident, TIMEOUT_MAX
import threading
import unittest

Expand Down Expand Up @@ -62,6 +62,14 @@ def tearDown(self):
support.threading_cleanup(*self._threads)
support.reap_children()

def assertTimeout(self, actual, expected):
# The waiting and/or time.time() can be imprecise, which
# is why comparing to the expected value would sometimes fail
# (especially under Windows).
self.assertGreaterEqual(actual, expected * 0.6)
# Test nothing insane happened
self.assertLess(actual, expected * 10.0)


class BaseLockTests(BaseTestCase):
"""
Expand Down Expand Up @@ -143,6 +151,32 @@ def f():
Bunch(f, 15).wait_for_finished()
self.assertEqual(n, len(threading.enumerate()))

def test_timeout(self):
lock = self.locktype()
# Can't set timeout if not blocking
self.assertRaises(ValueError, lock.acquire, 0, 1)
# Invalid timeout values
self.assertRaises(ValueError, lock.acquire, timeout=-100)
self.assertRaises(OverflowError, lock.acquire, timeout=1e100)
self.assertRaises(OverflowError, lock.acquire, timeout=TIMEOUT_MAX + 1)
# TIMEOUT_MAX is ok
lock.acquire(timeout=TIMEOUT_MAX)
lock.release()
t1 = time.time()
self.assertTrue(lock.acquire(timeout=5))
t2 = time.time()
# Just a sanity test that it didn't actually wait for the timeout.
self.assertLess(t2 - t1, 5)
results = []
def f():
t1 = time.time()
results.append(lock.acquire(timeout=0.5))
t2 = time.time()
results.append(t2 - t1)
Bunch(f, 1).wait_for_finished()
self.assertFalse(results[0])
self.assertTimeout(results[1], 0.5)


class LockTests(BaseLockTests):
"""
Expand Down Expand Up @@ -284,14 +318,14 @@ def test_timeout(self):
def f():
results1.append(evt.wait(0.0))
t1 = time.time()
r = evt.wait(0.2)
r = evt.wait(0.5)
t2 = time.time()
results2.append((r, t2 - t1))
Bunch(f, N).wait_for_finished()
self.assertEqual(results1, [False] * N)
for r, dt in results2:
self.assertFalse(r)
self.assertTrue(dt >= 0.2, dt)
self.assertTimeout(dt, 0.5)
# The event is set
results1 = []
results2 = []
Expand Down Expand Up @@ -397,14 +431,14 @@ def test_timeout(self):
def f():
cond.acquire()
t1 = time.time()
cond.wait(0.2)
cond.wait(0.5)
t2 = time.time()
cond.release()
results.append(t2 - t1)
Bunch(f, N).wait_for_finished()
self.assertEqual(len(results), 5)
for dt in results:
self.assertTrue(dt >= 0.2, dt)
self.assertTimeout(dt, 0.5)


class BaseSemaphoreTests(BaseTestCase):
Expand Down
25 changes: 7 additions & 18 deletions Lib/threading.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
_CRLock = _thread.RLock
except AttributeError:
_CRLock = None
TIMEOUT_MAX = _thread.TIMEOUT_MAX
del _thread


Expand Down Expand Up @@ -107,14 +108,14 @@ def __repr__(self):
return "<%s owner=%r count=%d>" % (
self.__class__.__name__, owner, self._count)

def acquire(self, blocking=True):
def acquire(self, blocking=True, timeout=-1):
me = _get_ident()
if self._owner == me:
self._count = self._count + 1
if __debug__:
self._note("%s.acquire(%s): recursive success", self, blocking)
return 1
rc = self._block.acquire(blocking)
rc = self._block.acquire(blocking, timeout)
if rc:
self._owner = me
self._count = 1
Expand Down Expand Up @@ -234,22 +235,10 @@ def wait(self, timeout=None):
if __debug__:
self._note("%s.wait(): got it", self)
else:
# Balancing act: We can't afford a pure busy loop, so we
# have to sleep; but if we sleep the whole timeout time,
# we'll be unresponsive. The scheme here sleeps very
# little at first, longer as time goes on, but never longer
# than 20 times per second (or the timeout time remaining).
endtime = _time() + timeout
delay = 0.0005 # 500 us -> initial delay of 1 ms
while True:
gotit = waiter.acquire(0)
if gotit:
break
remaining = endtime - _time()
if remaining <= 0:
break
delay = min(delay * 2, remaining, .05)
_sleep(delay)
if timeout > 0:
gotit = waiter.acquire(True, timeout)
else:
gotit = waiter.acquire(False)
if not gotit:
if __debug__:
self._note("%s.wait(%s): timed out", self, timeout)
Expand Down
5 changes: 5 additions & 0 deletions Misc/NEWS
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,11 @@ C-API
Library
-------

- Issue #7316: the acquire() method of lock objects in the :mod:`threading`
module now takes an optional timeout argument in seconds. Timeout support
relies on the system threading library, so as to avoid a semi-busy wait
loop.

- Issue #8383: pickle and pickletools use surrogatepass error handler when
encoding unicode as utf8 to support lone surrogates and stay compatible with
Python 2.x and 3.0
Expand Down
Loading

0 comments on commit 7c3e577

Please sign in to comment.