# Only create this mapping if we haven't already.
if not self._communication_started:
self._fileobj2output = {}
self._fileobj2output[self.stdout] = []
self._fileobj2output[self.stderr] = []
stdout = self._fileobj2output[self.stdout]
stderr = self._fileobj2output[self.stderr]
input_view = memoryview(self._input)
with _PopenSelector() as selector:
selector.register(self.stdin, selectors.EVENT_WRITE)
selector.register(self.stdout, selectors.EVENT_READ)
selector.register(self.stderr, selectors.EVENT_READ)
while selector.get_map():
timeout = self._remaining_time(endtime)
if timeout is not None and timeout < 0:
raise TimeoutExpired(self.args, orig_timeout)
ready = selector.select(timeout)
self._check_timeout(endtime, orig_timeout)
# XXX Rewrite these to use non-blocking I/O on the file
# objects; they are no longer using C stdio!
for key, events in ready:
if key.fileobj is self.stdin:
chunk = input_view[self._input_offset :
self._input_offset + _PIPE_BUF]
self._input_offset += os.write(key.fd, chunk)
selector.unregister(key.fileobj)
if self._input_offset >= len(self._input):
selector.unregister(key.fileobj)
elif key.fileobj in (self.stdout, self.stderr):
data = os.read(key.fd, 32768)
selector.unregister(key.fileobj)
self._fileobj2output[key.fileobj].append(data)
self.wait(timeout=self._remaining_time(endtime))
# All data exchanged. Translate lists into strings.
stdout = b''.join(stdout)
stderr = b''.join(stderr)
# Translate newlines, if requested.
# This also turns bytes into strings.
if self.encoding or self.errors or self.universal_newlines:
stdout = self._translate_newlines(stdout,
stderr = self._translate_newlines(stderr,
def _save_input(self, input):
    """Remember the data to send to the child's stdin.

    The data is stored on the instance (``self._input``) together with a
    write offset (``self._input_offset``) the first time this is called
    with a stdin pipe present.  Later calls leave an already-saved buffer
    and its offset untouched.

    Args:
        input: Data for the child's stdin, or None.  It is encoded to
            bytes with the stdin stream's encoding and error handler when
            any of ``self.encoding``, ``self.errors`` or
            ``self.universal_newlines`` is set.
    """
    # This method is called from the _communicate_with_*() methods
    # so that if we time out while communicating, we can continue
    # sending input if we retry.
    if self.stdin and self._input is None:
        # Nothing has been written yet, so start from the beginning.
        self._input_offset = 0
        self._input = input
        if input is not None and (
                self.encoding or self.errors or self.universal_newlines):
            # Encode the caller's text, not self._input, which was None
            # when the guard above was evaluated.
            self._input = input.encode(self.stdin.encoding,
                                       self.stdin.errors)
def send_signal(self, sig):
    """Send a signal to the process.

    Args:
        sig: Signal number to deliver, passed unchanged to ``os.kill``.

    Nothing is sent once ``self.returncode`` has been set.
    """
    # Skip signalling a process that we know has already died.
    if self.returncode is None:
        os.kill(self.pid, sig)
def terminate(self):
    """Terminate the process with SIGTERM."""
    self.send_signal(signal.SIGTERM)
def kill(self):
    """Kill the process with SIGKILL."""
    self.send_signal(signal.SIGKILL)