# A higher level module for using sockets (or Windows named pipes)
# multiprocessing/connection.py
# Copyright (c) 2006-2008, R Oudkerk
# Licensed to PSF under a Contributor Agreement.
__all__ = [ 'Client', 'Listener', 'Pipe', 'wait' ]
from . import AuthenticationError, BufferTooShort
from .context import reduction
_ForkingPickler = reduction.ForkingPickler
from _winapi import WAIT_OBJECT_0, WAIT_ABANDONED_0, WAIT_TIMEOUT, INFINITE
if sys.platform == 'win32':
# A very generous timeout when it comes to local connections...
# The hmac module implicitly defaults to using MD5.
# Support using a stronger algorithm for the challenge/response code:
HMAC_DIGEST_NAME='sha256'
# Module-wide counter: arbitrary_address() formats its next value, together
# with the process id, into the prefix of each 'AF_PIPE' name it generates.
_mmap_counter = itertools.count()
# Module-level default address family.  'AF_INET' is the baseline; it is
# replaced by 'AF_UNIX' when the socket module exposes that attribute and by
# 'AF_PIPE' on Windows.
# NOTE(review): indentation is lost in this excerpt; the two tests are
# presumably siblings, so the Windows assignment wins when both apply.
default_family = 'AF_INET'
if hasattr(socket, 'AF_UNIX'):
default_family = 'AF_UNIX'
if sys.platform == 'win32':
default_family = 'AF_PIPE'
def _init_timeout(timeout=CONNECTION_TIMEOUT):
    """Return the monotonic-clock deadline lying `timeout` seconds ahead.

    The result is the current `time.monotonic()` reading plus `timeout`, so
    it is only meaningful when compared with later `time.monotonic()`
    readings.
    """
    return time.monotonic() + timeout
return time.monotonic() > t
# Produce a fresh address for `family`.  A ValueError('unrecognized family')
# is raised when no branch matches.
# NOTE(review): the branch that precedes the first `elif` (and the `else:`
# before the final raise) is not visible in this excerpt.
# tempfile.mktemp() only returns a name -- nothing is created on disk here.
def arbitrary_address(family):
Return an arbitrary free address for the given family
# 'AF_UNIX': a 'listener-'-prefixed name inside util.get_temp_dir().
elif family == 'AF_UNIX':
return tempfile.mktemp(prefix='listener-', dir=util.get_temp_dir())
# 'AF_PIPE': dir="" keeps the prefix at the very start of the result, so the
# name begins with \\.\pipe\pyc-<pid>-<counter>-.
elif family == 'AF_PIPE':
return tempfile.mktemp(prefix=r'\\.\pipe\pyc-%d-%d-' %
(os.getpid(), next(_mmap_counter)), dir="")
raise ValueError('unrecognized family')
def _validate_family(family):
    """Check that `family` can be used on the current platform.

    Only the two platform-specific names are examined; any other value is
    accepted without inspection and the function returns None.

    Raises:
        ValueError: if `family` is 'AF_PIPE' on a non-Windows platform, or
            is 'AF_UNIX' on Windows while the socket module does not
            provide an attribute of that name.
    """
    on_windows = sys.platform == 'win32'

    # 'AF_PIPE' is accepted on Windows only.
    if family == 'AF_PIPE' and not on_windows:
        raise ValueError('Family %s is not recognized.' % family)

    # On Windows, 'AF_UNIX' is accepted only when the socket module has it.
    if family == 'AF_UNIX' and on_windows and not hasattr(socket, family):
        raise ValueError('Family %s is not recognized.' % family)
# Classify `address` by its shape.  Three tests are tried in order -- a tuple,
# a str starting with two backslashes ('\\\\' is two literal characters), and
# any other str or an abstract-socket name per
# util.is_abstract_socket_namespace() -- and ValueError is raised when none
# of them matches.
# NOTE(review): the statement under each test is not visible in this excerpt;
# the order of the names in the description below suggests 'AF_INET',
# 'AF_PIPE' and 'AF_UNIX' respectively -- confirm against the full file.
def address_type(address):
Return the types of the address
This can be 'AF_INET', 'AF_UNIX', or 'AF_PIPE'
if type(address) == tuple:
elif type(address) is str and address.startswith('\\\\'):
elif type(address) is str or util.is_abstract_socket_namespace(address):
raise ValueError('address type of %r unrecognized' % address)
# Initialise a connection around `handle`.
#   handle:   converted through its __index__() method before use.
#   readable: stored as self._readable.
#   writable: stored as self._writable.
# A ValueError("invalid handle") can be raised for the handle, and a second
# error is raised when both `readable` and `writable` are false.
# NOTE(review): the test guarding the "invalid handle" error, the opening of
# the second raise, and any assignment of self._handle are not visible in
# this excerpt.
def __init__(self, handle, readable=True, writable=True):
handle = handle.__index__()
raise ValueError("invalid handle")
if not readable and not writable:
"at least one of `readable` and `writable` must be True")
self._readable = readable
self._writable = writable
# XXX should we use util.Finalize instead of a __del__?
if self._handle is not None:
raise OSError("handle is closed")
# Report, via OSError, that this connection cannot be read from.
# NOTE(review): the guarding test is not visible in this excerpt; presumably
# it consults self._readable -- confirm.
def _check_readable(self):
raise OSError("connection is write-only")
# Report, via OSError, that this connection cannot be written to.
# NOTE(review): the guarding test is not visible in this excerpt; presumably
# it consults self._writable -- confirm.
def _check_writable(self):
raise OSError("connection is read-only")
# Signal a message whose length is unacceptable by raising
# OSError("bad message length").  Both receive paths call this when a
# message exceeds the caller's size limit.
# NOTE(review): any statements that run before the raise are not visible in
# this excerpt.
def _bad_message_length(self):
raise OSError("bad message length")
"""True if the connection is closed"""
return self._handle is None
"""True if the connection is readable"""
"""True if the connection is writable"""
"""File descriptor or handle of the connection"""
"""Close the connection"""
if self._handle is not None:
# Send part of a bytes-like object.
#   buf:    the source buffer.
#   offset: start position within the buffer (default 0).
#   size:   number of bytes to send (default None).
# Four separate ValueErrors cover a negative offset, an offset beyond the
# buffer, a negative size and a size that runs past the end of the buffer.
# The selected slice m[offset:offset + size] is passed to self._send_bytes().
# NOTE(review): the tests guarding each raise, the creation of `m`, and the
# handling of size=None are not visible in this excerpt.
def send_bytes(self, buf, offset=0, size=None):
"""Send the bytes data from a bytes-like object"""
# HACK for byte-indexing of non-bytewise buffers (e.g. array.array)
raise ValueError("offset is negative")
raise ValueError("buffer length < offset")
raise ValueError("size is negative")
raise ValueError("buffer length < offset + size")
self._send_bytes(m[offset:offset + size])
"""Send a (picklable) object"""
self._send_bytes(_ForkingPickler.dumps(obj))
# Receive one message.
#   maxlength: optional size limit forwarded to self._recv_bytes(); a
#              negative value is rejected with ValueError.
# NOTE(review): the condition under which self._bad_message_length() is
# called, and the return statement, are not visible in this excerpt.
def recv_bytes(self, maxlength=None):
Receive bytes data as a bytes object.
if maxlength is not None and maxlength < 0:
raise ValueError("negative maxlength")
buf = self._recv_bytes(maxlength)
self._bad_message_length()
# Receive one message directly into `buf`.
#   buf:    destination, accessed through a memoryview.
#   offset: byte position in the destination at which to start writing; it
#           is divided by the item size when the view is sliced.
# Raises ValueError for a negative or too-large offset, and BufferTooShort --
# carrying the complete message from result.getvalue() -- when the message
# does not fit in the space after `offset`.
# NOTE(review): the assignments of `itemsize` and `size`, the tests guarding
# the two ValueErrors, and the return statement are not visible here.
def recv_bytes_into(self, buf, offset=0):
Receive bytes data into a writeable bytes-like object.
Return the number of bytes read.
with memoryview(buf) as m:
# Get bytesize of arbitrary buffer
bytesize = itemsize * len(m)
raise ValueError("negative offset")
raise ValueError("offset too large")
result = self._recv_bytes()
if bytesize < offset + size:
raise BufferTooShort(result.getvalue())
# Message can fit in dest
result.readinto(m[offset // itemsize :
(offset + size) // itemsize])
"""Receive a (picklable) object"""
return _ForkingPickler.loads(buf.getbuffer())
# Public polling entry point: returns whatever self._poll(timeout) returns.
#   timeout: forwarded unchanged; the default is 0.0.
# NOTE(review): only the delegation is visible in this excerpt; confirm
# whether state checks run before it in the full file.
def poll(self, timeout=0.0):
"""Whether there is any input available to be read"""
return self._poll(timeout)
def __exit__(self, exc_type, exc_value, exc_tb):
# Connection implementation for Windows named pipes, built on the _winapi
# overlapped-I/O calls used below (WriteFile, ReadFile, PeekNamedPipe).
# NOTE(review): several methods are missing lines in this excerpt (return
# statements, the handler that binds `e`, the creation of `buf`), so the
# comments describe only the statements that are visible.
class PipeConnection(_ConnectionBase):
Connection class based on a Windows named pipe.
Overlapped I/O is used, so the handles must have been created
with FILE_FLAG_OVERLAPPED.
# Class-level default for a per-instance flag: _recv_bytes() resets it to
# False and _poll() tests it before peeking at the pipe.
_got_empty_message = False
# Close the pipe handle.  _CloseHandle is captured as a default argument, so
# the call does not need to look up the _winapi module at call time.
def _close(self, _CloseHandle=_winapi.CloseHandle):
_CloseHandle(self._handle)
# Write `buf` with an overlapped WriteFile.  If the operation is still
# pending, wait with an INFINITE timeout for its event, then fetch the result
# and check that the whole buffer was written.
# NOTE(review): both checks are `assert` statements, which Python removes
# when run with -O.
def _send_bytes(self, buf):
ov, err = _winapi.WriteFile(self._handle, buf, overlapped=True)
if err == _winapi.ERROR_IO_PENDING:
waitres = _winapi.WaitForMultipleObjects(
[ov.event], False, INFINITE)
assert waitres == WAIT_OBJECT_0
nwritten, err = ov.GetOverlappedResult(True)
assert nwritten == len(buf)
# Read one message.  The first overlapped ReadFile requests at most 128 bytes
# (fewer when `maxsize` is smaller).  ERROR_MORE_DATA hands the rest of the
# read to _get_more_data().
# NOTE(review): whatever follows the flag reset, the rest of the ReadFile
# call, the handler that binds `e`, and the normal return are not visible.
def _recv_bytes(self, maxsize=None):
if self._got_empty_message:
self._got_empty_message = False
bsize = 128 if maxsize is None else min(maxsize, 128)
ov, err = _winapi.ReadFile(self._handle, bsize,
if err == _winapi.ERROR_IO_PENDING:
waitres = _winapi.WaitForMultipleObjects(
[ov.event], False, INFINITE)
assert waitres == WAIT_OBJECT_0
nread, err = ov.GetOverlappedResult(True)
elif err == _winapi.ERROR_MORE_DATA:
return self._get_more_data(ov, maxsize)
if e.winerror == _winapi.ERROR_BROKEN_PIPE:
raise RuntimeError("shouldn't get here; expected KeyboardInterrupt")
# Check for readable data within `timeout`; the visible fallback is
# bool(wait([self], timeout)).
# NOTE(review): the statement under the `if` (flag set, or the first field
# from PeekNamedPipe non-zero) is not visible -- presumably an immediate
# True; confirm.
def _poll(self, timeout):
if (self._got_empty_message or
_winapi.PeekNamedPipe(self._handle)[0] != 0):
return bool(wait([self], timeout))
# Continue a read that reported ERROR_MORE_DATA.  `left` is the second field
# returned by PeekNamedPipe and is the size requested from the follow-up
# ReadFile.  When `maxsize` is set and the bytes already held plus `left`
# exceed it, self._bad_message_length() is called.
# NOTE(review): the creation of `buf` and the return are not visible here.
def _get_more_data(self, ov, maxsize):
left = _winapi.PeekNamedPipe(self._handle)[1]
if maxsize is not None and len(buf) + left > maxsize:
self._bad_message_length()
ov, err = _winapi.ReadFile(self._handle, left, overlapped=True)
rbytes, err = ov.GetOverlappedResult(True)
# Connection implementation over a file descriptor or socket handle.
# NOTE(review): this excerpt shows two alternative `_close` definitions and
# the `_write` / `_read` bindings without the test that selects between
# them, and most method bodies are missing lines (loops, assignments,
# returns).  The comments describe only the statements that are visible.
class Connection(_ConnectionBase):
Connection class based on an arbitrary file descriptor (Unix only), or
a socket handle (Windows).
# Variant that closes through _multiprocessing.closesocket, with the I/O
# primitives bound to _multiprocessing.send / _multiprocessing.recv.
def _close(self, _close=_multiprocessing.closesocket):
_write = _multiprocessing.send
_read = _multiprocessing.recv
# Variant that closes through os.close.
def _close(self, _close=os.close):
# Write `buf` to the handle using the `write` primitive captured as a default
# argument; `n` receives that call's return value.
def _send(self, buf, write=_write):
n = write(self._handle, buf)
# Read from the handle using the `read` primitive captured as a default
# argument.  OSError("got end of file during message") reports a stream that
# ended in the middle of a message.
# NOTE(review): the definitions of `handle` and `remaining`, and the test
# guarding the raise, are not visible here.
def _recv(self, size, read=_read):
chunk = read(handle, remaining)
raise OSError("got end of file during message")
# Frame and send one message.  All headers use network byte order ('!'):
# a signed 32-bit length ("!i"), or a signed 32-bit -1 marker (`pre_header`)
# alongside an unsigned 64-bit length ("!Q").
# NOTE(review): the size test that picks a header form, the definition of
# `n`, and the send calls themselves are not visible in this excerpt.
def _send_bytes(self, buf):
pre_header = struct.pack("!i", -1)
header = struct.pack("!Q", n)
# For wire compatibility with 3.7 and lower
header = struct.pack("!i", n)
# The payload is large so Nagle's algorithm won't be triggered
# and we'd better avoid the cost of concatenation.
# Issue #20540: concatenate before sending, to avoid delays due
# to Nagle's algorithm on a TCP socket.
# Also note we want to avoid sending a 0-length buffer separately,
# to avoid "broken pipe" errors if the other end closed the pipe.
# Receive one framed message: the length is unpacked as "!i" and, in a second
# visible statement, as "!Q"; it is then compared against `maxsize` when one
# is given.
# NOTE(review): the reads that fill `buf`, the test that selects the "!Q"
# form, the statement under the `maxsize` test, and the return are not
# visible in this excerpt.
def _recv_bytes(self, maxsize=None):
size, = struct.unpack("!i", buf.getvalue())
size, = struct.unpack("!Q", buf.getvalue())
if maxsize is not None and size > maxsize:
# Wait on this connection alone for up to `timeout` via wait([self], timeout).
# NOTE(review): the return statement is not visible in this excerpt.
def _poll(self, timeout):
r = wait([self], timeout)
Returns a listener object.
This is a wrapper for a bound socket which is 'listening' for
connections, or for a Windows named pipe.
# Build the underlying listener object for `address`.
#   address: where to listen; when falsy, arbitrary_address(family) supplies
#            one.
#   family:  address family; when falsy it is derived from `address` through
#            address_type() (the rest of that expression is not visible).
#   backlog: forwarded unchanged to PipeListener / SocketListener.
#   authkey: must be None or a bytes object, otherwise TypeError is raised.
# NOTE(review): the continuation of the `family = ...` line and the test that
# chooses between PipeListener and SocketListener are missing from this
# excerpt.
def __init__(self, address=None, family=None, backlog=1, authkey=None):
family = family or (address and address_type(address)) \
address = address or arbitrary_address(family)
self._listener = PipeListener(address, backlog)
self._listener = SocketListener(address, family, backlog)
if authkey is not None and not isinstance(authkey, bytes):
raise TypeError('authkey should be a byte string')
Accept a connection on the bound socket or named pipe of `self`.
Returns a `Connection` object.
if self._listener is None:
raise OSError('listener is closed')
c = self._listener.accept()
deliver_challenge(c, self._authkey)
answer_challenge(c, self._authkey)
Close the bound socket or named pipe of `self`.
listener = self._listener
return self._listener._address
return self._listener._last_accepted
def __exit__(self, exc_type, exc_value, exc_tb):
def Client(address, family=None, authkey=None):
Returns a connection to the address of a `Listener`