"""Thread module emulating a subset of Java's threading model."""
import _thread

from time import monotonic as _time, sleep as _sleep
from traceback import format_exc as _format_exc
from _weakrefset import WeakSet
from itertools import islice as _islice, count as _count
try:
    # Prefer the C-accelerated deque; fall back to the public module.
    from _collections import deque as _deque
except ImportError:
    from collections import deque as _deque
# Note regarding PEP 8 compliant names
# This threading model was originally inspired by Java, and inherited
# the convention of camelCase function and method names from that
# language. Those original names are not in any imminent danger of
# being deprecated (even for Py3k), so this module provides them as
# aliases for the PEP 8 compliant names.
# Note that using the new PEP 8 compliant names facilitates substitution
# with the multiprocessing module, which doesn't provide the old
# Java-inspired names.
# Names exported by "from threading import *".
__all__ = [
    'get_ident', 'active_count', 'Condition', 'current_thread',
    'enumerate', 'main_thread', 'TIMEOUT_MAX',
    'Event', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Thread',
    'Barrier', 'BrokenBarrierError', 'Timer', 'ThreadError',
    'setprofile', 'settrace', 'local', 'stack_size',
]
# Rename some stuff so "from threading import *" is safe
# Each name below is a module-level alias for an attribute of the low-level
# _thread module.  get_ident, ThreadError and TIMEOUT_MAX are also listed in
# __all__; the underscore-prefixed aliases are not.
_start_new_thread = _thread.start_new_thread
_allocate_lock = _thread.allocate_lock
_set_sentinel = _thread._set_sentinel
get_ident = _thread.get_ident
ThreadError = _thread.error
TIMEOUT_MAX = _thread.TIMEOUT_MAX
# Support for profile and trace hooks

# NOTE(review): the names of the two hook variables are not visible in this
# extract; they are assumed to be the globals consulted when a thread starts.
# Confirm against the thread bootstrap code.
_profile_hook = None
_trace_hook = None

def setprofile(func):
    """Set a profile function for all threads started from the threading module.

    The func will be passed to sys.setprofile() for each thread, before its
    run() method is called.

    """
    global _profile_hook
    _profile_hook = func

def settrace(func):
    """Set a trace function for all threads started from the threading module.

    The func will be passed to sys.settrace() for each thread, before its run()
    method is called.

    """
    global _trace_hook
    _trace_hook = func
# Synchronization classes
def RLock(*args, **kwargs):
    """Factory function that returns a new reentrant lock.

    A reentrant lock must be released by the thread that acquired it. Once a
    thread has acquired a reentrant lock, the same thread may acquire it again
    without blocking; the thread must release it once for each time it has
    acquired it.

    All positional and keyword arguments are forwarded unchanged to the
    selected implementation.

    """
    # _CRLock / _PyRLock are module-level names defined outside this block.
    # Fall back to the pure-Python implementation only when no C
    # implementation is available; otherwise the second return would be
    # unreachable.
    if _CRLock is None:
        return _PyRLock(*args, **kwargs)
    return _CRLock(*args, **kwargs)
"""This class implements reentrant lock objects.
A reentrant lock must be released by the thread that acquired it. Once a
thread has acquired a reentrant lock, the same thread may acquire it
again without blocking; the thread must release it once for each time it
self._block = _allocate_lock()
owner = _active[owner].name
return "<%s %s.%s object owner=%r count=%d at %s>" % (
"locked" if self._block.locked() else "unlocked",
self.__class__.__module__,
self.__class__.__qualname__,
def acquire(self, blocking=True, timeout=-1):
"""Acquire a lock, blocking or non-blocking.
When invoked without arguments: if this thread already owns the lock,
increment the recursion level by one, and return immediately. Otherwise,
if another thread owns the lock, block until the lock is unlocked. Once
the lock is unlocked (not owned by any thread), then grab ownership, set
the recursion level to one, and return. If more than one thread is
blocked waiting until the lock is unlocked, only one at a time will be
able to grab ownership of the lock. There is no return value in this
When invoked with the blocking argument set to true, do the same thing
as when called without arguments, and return true.
When invoked with the blocking argument set to false, do not block. If a
call without an argument would block, return false immediately;
otherwise, do the same thing as when called without arguments, and
When invoked with the floating-point timeout argument set to a positive
value, block for at most the number of seconds specified by timeout
and as long as the lock cannot be acquired. Return true if the lock has
been acquired, false if the timeout has elapsed.
rc = self._block.acquire(blocking, timeout)
"""Release a lock, decrementing the recursion level.
If after the decrement it is zero, reset the lock to unlocked (not owned
by any thread), and if any other threads are blocked waiting for the
lock to become unlocked, allow exactly one of them to proceed. If after
the decrement the recursion level is still nonzero, the lock remains
locked and owned by the calling thread.
Only call this method when the calling thread owns the lock. A
RuntimeError is raised if this method is called when the lock is
There is no return value.
if self._owner != get_ident():
raise RuntimeError("cannot release un-acquired lock")
self._count = count = self._count - 1
def __exit__(self, t, v, tb):
# Internal methods used by condition variables
def _acquire_restore(self, state):
self._count, self._owner = state
raise RuntimeError("cannot release un-acquired lock")
return self._owner == get_ident()
"""Class that implements a condition variable.
A condition variable allows one or more threads to wait until they are
notified by another thread.
If the lock argument is given and not None, it must be a Lock or RLock
object, and it is used as the underlying lock. Otherwise, a new RLock object
is created and used as the underlying lock.
def __init__(self, lock=None):
# Export the lock's acquire() and release() methods
self.acquire = lock.acquire
self.release = lock.release
# If the lock defines _release_save() and/or _acquire_restore(),
# these override the default implementations (which just call
# release() and acquire() on the lock). Ditto for _is_owned().
self._release_save = lock._release_save
self._acquire_restore = lock._acquire_restore
self._is_owned = lock._is_owned
return self._lock.__enter__()
def __exit__(self, *args):
return self._lock.__exit__(*args)
return "<Condition(%s, %d)>" % (self._lock, len(self._waiters))
self._lock.release() # No state to save
def _acquire_restore(self, x):
self._lock.acquire() # Ignore saved state
# Return True if lock is owned by current_thread.
# This method is called only if _lock doesn't have _is_owned().
if self._lock.acquire(0):
def wait(self, timeout=None):
"""Wait until notified or until a timeout occurs.
If the calling thread has not acquired the lock when this method is
called, a RuntimeError is raised.
This method releases the underlying lock, and then blocks until it is
awakened by a notify() or notify_all() call for the same condition
variable in another thread, or until the optional timeout occurs. Once
awakened or timed out, it re-acquires the lock and returns.
When the timeout argument is present and not None, it should be a
floating point number specifying a timeout for the operation in seconds
When the underlying lock is an RLock, it is not released using its
release() method, since this may not actually unlock the lock when it
was acquired multiple times recursively. Instead, an internal interface
of the RLock class is used, which really unlocks it even when it has
been recursively acquired several times. Another internal interface is
then used to restore the recursion level when the lock is reacquired.
raise RuntimeError("cannot wait on un-acquired lock")
waiter = _allocate_lock()
self._waiters.append(waiter)
saved_state = self._release_save()
try: # restore state no matter what (e.g., KeyboardInterrupt)
# rhbz#2003758: Avoid waiter.acquire(True, timeout) since
# it uses the system clock internally.
# Balancing act: We can't afford a pure busy loop, so we
# have to sleep; but if we sleep the whole timeout time,
# we'll be unresponsive. The scheme here sleeps very
# little at first, longer as time goes on, but never longer
# than 20 times per second (or the timeout time remaining).
endtime = _time() + timeout
delay = 0.0005 # 500 us -> initial delay of 1 ms
gotit = waiter.acquire(0)
remaining = min(endtime - _time(), timeout)
delay = min(delay * 2, remaining, .05)
gotit = waiter.acquire(False)
self._acquire_restore(saved_state)
self._waiters.remove(waiter)
def wait_for(self, predicate, timeout=None):
"""Wait until a condition evaluates to True.
predicate should be a callable which result will be interpreted as a
boolean value. A timeout may be provided giving the maximum time to
endtime = _time() + waittime
waittime = endtime - _time()
"""Wake up one or more threads waiting on this condition, if any.
If the calling thread has not acquired the lock when this method is
called, a RuntimeError is raised.
This method wakes up at most n of the threads waiting for the condition
variable; it is a no-op if no threads are waiting.
raise RuntimeError("cannot notify on un-acquired lock")
all_waiters = self._waiters
waiters_to_notify = _deque(_islice(all_waiters, n))
if not waiters_to_notify:
for waiter in waiters_to_notify:
all_waiters.remove(waiter)
"""Wake up all threads waiting on this condition.
If the calling thread has not acquired the lock when this method
is called, a RuntimeError is raised.
self.notify(len(self._waiters))
"""This class implements semaphore objects.
Semaphores manage a counter representing the number of release() calls minus
the number of acquire() calls, plus an initial value. The acquire() method
blocks if necessary until it can return without making the counter
negative. If not given, value defaults to 1.
# After Tim Peters' semaphore class, but not quite the same (no maximum)
def __init__(self, value=1):
raise ValueError("semaphore initial value must be >= 0")
self._cond = Condition(Lock())
def acquire(self, blocking=True, timeout=None):
"""Acquire a semaphore, decrementing the internal counter by one.
When invoked without arguments: if the internal counter is larger than
zero on entry, decrement it by one and return immediately. If it is zero
on entry, block, waiting until some other thread has called release() to
make it larger than zero. This is done with proper interlocking so that
if multiple acquire() calls are blocked, release() will wake exactly one
of them up. The implementation may pick one at random, so the order in
which blocked threads are awakened should not be relied on. There is no
return value in this case.
When invoked with blocking set to true, do the same thing as when called
without arguments, and return true.
When invoked with blocking set to false, do not block. If a call without
an argument would block, return false immediately; otherwise, do the
same thing as when called without arguments, and return true.
When invoked with a timeout other than None, it will block for at
most timeout seconds. If acquire does not complete successfully in
that interval, return false. Return true otherwise.
if not blocking and timeout is not None:
raise ValueError("can't specify timeout for non-blocking acquire")
endtime = _time() + timeout
timeout = endtime - _time()
"""Release a semaphore, incrementing the internal counter by one.
When the counter is zero on entry and another thread is waiting for it
to become larger than zero again, wake up that thread.
def __exit__(self, t, v, tb):
class BoundedSemaphore(Semaphore):
"""Implements a bounded semaphore.
A bounded semaphore checks to make sure its current value doesn't exceed its
initial value. If it does, ValueError is raised. In most situations
semaphores are used to guard resources with limited capacity.
If the semaphore is released too many times it's a sign of a bug. If not
given, value defaults to 1.
Like regular semaphores, bounded semaphores manage a counter representing
the number of release() calls minus the number of acquire() calls, plus an
initial value. The acquire() method blocks if necessary until it can return
without making the counter negative. If not given, value defaults to 1.
def __init__(self, value=1):
Semaphore.__init__(self, value)
self._initial_value = value
"""Release a semaphore, incrementing the internal counter by one.
When the counter is zero on entry and another thread is waiting for it
to become larger than zero again, wake up that thread.
If the number of releases exceeds the number of acquires,
if self._value >= self._initial_value:
raise ValueError("Semaphore released too many times")