"""Thread module emulating a subset of Java's threading model."""
del _sys.modules[__name__]
from collections import deque as _deque
from itertools import count as _count
from time import time as _time, sleep as _sleep
from traceback import format_exc as _format_exc
# Note regarding PEP 8 compliant aliases
# This threading model was originally inspired by Java, and inherited
# the convention of camelCase function and method names from that
# language. While those names are not in any imminent danger of being
# deprecated, starting with Python 2.6, the module now provides a
# PEP 8 compliant alias for any such method name.
# Using the new PEP 8 compliant names also facilitates substitution
# with the multiprocessing module, which doesn't provide the old
# Java-inspired camelCase names.
# Rename some stuff so "from threading import *" is safe
__all__ = ['activeCount', 'active_count', 'Condition', 'currentThread',
'current_thread', 'enumerate', 'Event',
'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Thread',
'Timer', 'setprofile', 'settrace', 'local', 'stack_size']
_start_new_thread = thread.start_new_thread
_allocate_lock = thread.allocate_lock
_get_ident = thread.get_ident
ThreadError = thread.error
# sys.exc_clear is used to work around the fact that except blocks
# don't fully clear the exception until 3.0.
warnings.filterwarnings('ignore', category=DeprecationWarning,
module='threading', message='sys.exc_clear')
# Debug support (adapted from ihooks.py).
# All the major classes here derive from _Verbose. We force that to
# be a new-style class so that all the major classes here are new-style.
# This helps debugging (type(instance) is more revealing for instances
# of new-style classes).
def __init__(self, verbose=None):
def _note(self, format, *args):
# Issue #4188: calling current_thread() can incur an infinite
# recursion if it has to create a DummyThread on the fly.
name = _active[ident].name
name = "<OS thread %d>" % ident
format = "%s: %s\n" % (name, format)
_sys.stderr.write(format)
# Disable this when using "python -O"
def __init__(self, verbose=None):
# Support for profile and trace hooks
"""Set a profile function for all threads started from the threading module.
The func will be passed to sys.setprofile() for each thread, before its
"""Set a trace function for all threads started from the threading module.
The func will be passed to sys.settrace() for each thread, before its run()
# Synchronization classes
def RLock(*args, **kwargs):
    """Factory function that returns a new reentrant lock.

    A reentrant lock must be released by the thread that acquired it. Once a
    thread has acquired a reentrant lock, the same thread may acquire it again
    without blocking; the thread must release it once for each time it has
    acquired it.

    All positional and keyword arguments are forwarded unchanged to the
    ``_RLock`` constructor, and the newly created instance is returned.

    """
    # Thin factory: the real implementation lives in the private _RLock class.
    return _RLock(*args, **kwargs)
"""A reentrant lock must be released by the thread that acquired it. Once a
thread has acquired a reentrant lock, the same thread may acquire it
again without blocking; the thread must release it once for each time it
def __init__(self, verbose=None):
_Verbose.__init__(self, verbose)
self.__block = _allocate_lock()
owner = _active[owner].name
return "<%s owner=%r count=%d>" % (
self.__class__.__name__, owner, self.__count)
def acquire(self, blocking=1):
"""Acquire a lock, blocking or non-blocking.
When invoked without arguments: if this thread already owns the lock,
increment the recursion level by one, and return immediately. Otherwise,
if another thread owns the lock, block until the lock is unlocked. Once
the lock is unlocked (not owned by any thread), then grab ownership, set
the recursion level to one, and return. If more than one thread is
blocked waiting until the lock is unlocked, only one at a time will be
able to grab ownership of the lock. There is no return value in this
When invoked with the blocking argument set to true, do the same thing
as when called without arguments, and return true.
When invoked with the blocking argument set to false, do not block. If a
call without an argument would block, return false immediately;
otherwise, do the same thing as when called without arguments, and
self.__count = self.__count + 1
self._note("%s.acquire(%s): recursive success", self, blocking)
rc = self.__block.acquire(blocking)
self._note("%s.acquire(%s): initial success", self, blocking)
self._note("%s.acquire(%s): failure", self, blocking)
"""Release a lock, decrementing the recursion level.
If after the decrement it is zero, reset the lock to unlocked (not owned
by any thread), and if any other threads are blocked waiting for the
lock to become unlocked, allow exactly one of them to proceed. If after
the decrement the recursion level is still nonzero, the lock remains
locked and owned by the calling thread.
Only call this method when the calling thread owns the lock. A
RuntimeError is raised if this method is called when the lock is
There is no return value.
if self.__owner != _get_ident():
raise RuntimeError("cannot release un-acquired lock")
self.__count = count = self.__count - 1
self._note("%s.release(): final release", self)
self._note("%s.release(): non-final release", self)
def __exit__(self, t, v, tb):
# Internal methods used by condition variables
def _acquire_restore(self, count_owner):
count, owner = count_owner
self._note("%s._acquire_restore()", self)
self._note("%s._release_save()", self)
return self.__owner == _get_ident()
def Condition(*args, **kwargs):
    """Factory function that returns a new condition variable object.

    A condition variable allows one or more threads to wait until they are
    notified by another thread.

    If the lock argument is given and not None, it must be a Lock or RLock
    object, and it is used as the underlying lock. Otherwise, a new RLock
    object is created and used as the underlying lock.

    All positional and keyword arguments are forwarded unchanged to the
    ``_Condition`` constructor, and the newly created instance is returned.

    """
    # Thin factory: the real implementation lives in the _Condition class.
    return _Condition(*args, **kwargs)
class _Condition(_Verbose):
"""Condition variables allow one or more threads to wait until they are
notified by another thread.
def __init__(self, lock=None, verbose=None):
_Verbose.__init__(self, verbose)
# Export the lock's acquire() and release() methods
self.acquire = lock.acquire
self.release = lock.release
# If the lock defines _release_save() and/or _acquire_restore(),
# these override the default implementations (which just call
# release() and acquire() on the lock). Ditto for _is_owned().
self._release_save = lock._release_save
self._acquire_restore = lock._acquire_restore
self._is_owned = lock._is_owned
return self.__lock.__enter__()
def __exit__(self, *args):
return self.__lock.__exit__(*args)
return "<Condition(%s, %d)>" % (self.__lock, len(self.__waiters))
self.__lock.release() # No state to save
def _acquire_restore(self, x):
self.__lock.acquire() # Ignore saved state
# Return True if lock is owned by current_thread.
# This method is called only if __lock doesn't have _is_owned().
if self.__lock.acquire(0):
def wait(self, timeout=None):
"""Wait until notified or until a timeout occurs.
If the calling thread has not acquired the lock when this method is
called, a RuntimeError is raised.
This method releases the underlying lock, and then blocks until it is
awakened by a notify() or notifyAll() call for the same condition
variable in another thread, or until the optional timeout occurs. Once
awakened or timed out, it re-acquires the lock and returns.
When the timeout argument is present and not None, it should be a
floating point number specifying a timeout for the operation in seconds
When the underlying lock is an RLock, it is not released using its
release() method, since this may not actually unlock the lock when it
was acquired multiple times recursively. Instead, an internal interface
of the RLock class is used, which really unlocks it even when it has
been recursively acquired several times. Another internal interface is
then used to restore the recursion level when the lock is reacquired.
raise RuntimeError("cannot wait on un-acquired lock")
waiter = _allocate_lock()
self.__waiters.append(waiter)
saved_state = self._release_save()
try: # restore state no matter what (e.g., KeyboardInterrupt)
self._note("%s.wait(): got it", self)
# Balancing act: We can't afford a pure busy loop, so we
# have to sleep; but if we sleep the whole timeout time,
# we'll be unresponsive. The scheme here sleeps very
# little at first, longer as time goes on, but never more than
# 0.05 s at a time, i.e. it polls at least 20 times per second
# (and never longer than the timeout time remaining).
endtime = _time() + timeout
delay = 0.0005 # 500 us -> initial delay of 1 ms
gotit = waiter.acquire(0)
remaining = endtime - _time()
delay = min(delay * 2, remaining, .05)
self._note("%s.wait(%s): timed out", self, timeout)
self.__waiters.remove(waiter)
self._note("%s.wait(%s): got it", self, timeout)
self._acquire_restore(saved_state)
"""Wake up one or more threads waiting on this condition, if any.
If the calling thread has not acquired the lock when this method is
called, a RuntimeError is raised.
This method wakes up at most n of the threads waiting for the condition
variable; it is a no-op if no threads are waiting.
raise RuntimeError("cannot notify on un-acquired lock")
__waiters = self.__waiters
self._note("%s.notify(): no waiters", self)
self._note("%s.notify(): notifying %d waiter%s", self, n,
"""Wake up all threads waiting on this condition.
If the calling thread has not acquired the lock when this method
is called, a RuntimeError is raised.
self.notify(len(self.__waiters))
def Semaphore(*args, **kwargs):
    """A factory function that returns a new semaphore.

    Semaphores manage a counter representing the number of release() calls
    minus the number of acquire() calls, plus an initial value. The acquire()
    method blocks if necessary until it can return without making the counter
    negative. If not given, value defaults to 1.

    All positional and keyword arguments are forwarded unchanged to the
    ``_Semaphore`` constructor, and the newly created instance is returned.

    """
    # Thin factory: the real implementation lives in the _Semaphore class.
    return _Semaphore(*args, **kwargs)
class _Semaphore(_Verbose):
"""Semaphores manage a counter representing the number of release() calls
minus the number of acquire() calls, plus an initial value. The acquire()
method blocks if necessary until it can return without making the counter
negative. If not given, value defaults to 1.
# After Tim Peters' semaphore class, but not quite the same (no maximum)
def __init__(self, value=1, verbose=None):
raise ValueError("semaphore initial value must be >= 0")
_Verbose.__init__(self, verbose)
self.__cond = Condition(Lock())
def acquire(self, blocking=1):
"""Acquire a semaphore, decrementing the internal counter by one.
When invoked without arguments: if the internal counter is larger than
zero on entry, decrement it by one and return immediately. If it is zero
on entry, block, waiting until some other thread has called release() to
make it larger than zero. This is done with proper interlocking so that
if multiple acquire() calls are blocked, release() will wake exactly one
of them up. The implementation may pick one at random, so the order in
which blocked threads are awakened should not be relied on. There is no
return value in this case.
When invoked with blocking set to true, do the same thing as when called
without arguments, and return true.
When invoked with blocking set to false, do not block. If a call without
an argument would block, return false immediately; otherwise, do the
same thing as when called without arguments, and return true.
self._note("%s.acquire(%s): blocked waiting, value=%s",
self, blocking, self.__value)
self.__value = self.__value - 1
self._note("%s.acquire: success, value=%s",
"""Release a semaphore, incrementing the internal counter by one.
When the counter is zero on entry and another thread is waiting for it
to become larger than zero again, wake up that thread.
self.__value = self.__value + 1
self._note("%s.release: success, value=%s",
def __exit__(self, t, v, tb):
def BoundedSemaphore(*args, **kwargs):
"""A factory function that returns a new bounded semaphore.
A bounded semaphore checks to make sure its current value doesn't exceed its