# Names exported by `from <module> import *`.
__all__ = ['create_subprocess_exec', 'create_subprocess_shell']
from .coroutines import coroutine
# Module-level aliases for the standard-library subprocess constants of the
# same name.
# NOTE(review): no `import subprocess` is visible in this view -- confirm it
# is present in the full file.
STDOUT = subprocess.STDOUT
DEVNULL = subprocess.DEVNULL
class SubprocessStreamProtocol(streams.FlowControlMixin,
protocols.SubprocessProtocol):
"""Like StreamReaderProtocol, but for a subprocess."""
# NOTE(review): this view of the class has no indentation and appears to be
# missing lines (unclosed calls, bodies without a `def`, a `def` without a
# body). The comments below describe only what the visible lines show.
def __init__(self, limit, loop):
# Pass the loop on to the base-class initializer.
super().__init__(loop=loop)
# The three stream attributes start out as None; connection_made() assigns
# the ones whose pipe transport exists.
self.stdin = self.stdout = self.stderr = None
# Flipped to True by process_exited().
self._process_exited = False
# NOTE(review): `limit` is not stored in the visible lines although
# connection_made() reads self._limit, and self._pipe_fds is used further
# down without a visible initialization -- confirm against the full file.
# NOTE(review): the lines below build and return a '<ClassName stdin=...>'
# style string from the non-None streams; this looks like the body of a
# __repr__ whose `def` line is not visible -- confirm.
info = [self.__class__.__name__]
if self.stdin is not None:
info.append('stdin=%r' % self.stdin)
if self.stdout is not None:
info.append('stdout=%r' % self.stdout)
if self.stderr is not None:
info.append('stderr=%r' % self.stderr)
return '<%s>' % ' '.join(info)
def connection_made(self, transport):
# Keep the transport; the pipe transports for fds 1, 2 and 0 are looked up
# on it below.
self._transport = transport
# fd 1: when the pipe exists, create a StreamReader for self.stdout and
# attach the pipe transport to it.
stdout_transport = transport.get_pipe_transport(1)
if stdout_transport is not None:
# NOTE(review): the remaining arguments and closing parenthesis of this
# call are not visible in this view.
self.stdout = streams.StreamReader(limit=self._limit,
self.stdout.set_transport(stdout_transport)
# fd 2: same handling as fd 1, for self.stderr.
stderr_transport = transport.get_pipe_transport(2)
if stderr_transport is not None:
# NOTE(review): call not closed in this view.
self.stderr = streams.StreamReader(limit=self._limit,
self.stderr.set_transport(stderr_transport)
# fd 0: when the pipe exists, wrap it in a StreamWriter for self.stdin.
stdin_transport = transport.get_pipe_transport(0)
if stdin_transport is not None:
# NOTE(review): call not closed in this view.
self.stdin = streams.StreamWriter(stdin_transport,
# NOTE(review): no body is visible for pipe_data_received in this view.
def pipe_data_received(self, fd, data):
def pipe_connection_lost(self, fd, exc):
# NOTE(review): the visible lines reference `reader` without assigning it,
# and no branching on `fd` is shown; the conditions selecting between
# these statements appear to be missing -- confirm.
self.connection_lost(exc)
reader.set_exception(exc)
# Drop the fd from the tracked pipe list, then re-check whether the
# transport can be closed.
self._pipe_fds.remove(fd)
self._maybe_close_transport()
def process_exited(self):
# Record that the process has exited, then re-check whether the transport
# can be closed.
self._process_exited = True
self._maybe_close_transport()
def _maybe_close_transport(self):
# Condition: no tracked pipe fds remain and the process has exited.
# NOTE(review): the body of this `if` is not visible in this view.
if len(self._pipe_fds) == 0 and self._process_exited:
# NOTE(review): the enclosing `class` line for this initializer is not visible
# in this view (the create_subprocess_* functions construct a `Process` with
# these same three arguments -- presumably this is Process.__init__; confirm).
def __init__(self, transport, protocol, loop):
# Keep references to the subprocess transport and its protocol.
self._transport = transport
self._protocol = protocol
# NOTE(review): `loop` is not stored in the visible lines, yet other methods
# read self._loop -- confirm against the full file.
# Expose the protocol's stream objects directly on this object.
self.stdin = protocol.stdin
self.stdout = protocol.stdout
self.stderr = protocol.stderr
# Process id as reported by the transport.
self.pid = transport.get_pid()
# NOTE(review): the statements below have no visible `def` lines. They look
# like the bodies of three separate members: one formatting
# '<ClassName pid>', one returning the transport's return code, and a
# coroutine that waits on the transport -- confirm.
return '<%s %s>' % (self.__class__.__name__, self.pid)
return self._transport.get_returncode()
"""Wait until the process exit and return the process return code.
This method is a coroutine."""
return (yield from self._transport._wait())
def send_signal(self, signal):
# Forward the signal to the underlying transport.
self._transport.send_signal(signal)
# NOTE(review): the terminate() call below has no visible `def` line of its
# own; it presumably belongs to a separate terminate method -- confirm.
self._transport.terminate()
def _feed_stdin(self, input):
# Read the loop's debug flag once.
# NOTE(review): `debug` is not used in the visible lines; the `if debug:`
# guards around the logger calls appear to be missing -- confirm.
debug = self._loop.get_debug()
# NOTE(review): this logger.debug(...) call is not closed in this view, and
# no write of `input` to self.stdin is visible before the drain.
logger.debug('%r communicate: feed stdin (%s bytes)',
# Wait for the stdin writer to flush.
# NOTE(review): the matching `try:` for the `except` below is not visible.
yield from self.stdin.drain()
except (BrokenPipeError, ConnectionResetError) as exc:
# communicate() ignores BrokenPipeError and ConnectionResetError
logger.debug('%r communicate: stdin got %r', self, exc)
# NOTE(review): a "close stdin" message is logged, but the close call itself
# is not visible in this view.
logger.debug('%r communicate: close stdin', self)
def _read_stream(self, fd):
# Look up the pipe transport for this fd.
# NOTE(review): `transport` is not used again in the visible lines.
transport = self._transport.get_pipe_transport(fd)
# In debug mode, log which stream is about to be read; fd 1 is labelled
# 'stdout' and any other fd 'stderr'.
if self._loop.get_debug():
name = 'stdout' if fd == 1 else 'stderr'
logger.debug('%r communicate: read %s', self, name)
# Read from the stream (read() is called with no arguments).
# NOTE(review): `stream` is never assigned in the visible lines; the
# selection between self.stdout and self.stderr appears to be missing.
output = yield from stream.read()
# In debug mode, log that the stream is being closed.
# NOTE(review): neither the close call nor a `return output` is visible in
# this view -- confirm.
if self._loop.get_debug():
name = 'stdout' if fd == 1 else 'stderr'
logger.debug('%r communicate: close %s', self, name)
def communicate(self, input=None):
# Build one awaitable per stream and wait on all three together via
# tasks.gather().
# NOTE(review): as visible, _feed_stdin() is called even when `input` is
# None, and `stdout` / `stderr` are left unassigned when the corresponding
# stream is None; the guards and else-branches appear to be missing from
# this view -- confirm.
stdin = self._feed_stdin(input)
if self.stdout is not None:
stdout = self._read_stream(1)
if self.stderr is not None:
stderr = self._read_stream(2)
# NOTE(review): the gather(...) call is not closed and no return statement
# is visible in this view.
stdin, stdout, stderr = yield from tasks.gather(stdin, stdout, stderr,
def create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None,
loop=None, limit=streams._DEFAULT_LIMIT, **kwds):
# NOTE(review): as visible, the caller-supplied `loop` is overwritten
# unconditionally; an `if loop is None:` guard may be missing from this
# view -- confirm.
loop = events.get_event_loop()
# Factory that builds a SubprocessStreamProtocol using the given limit.
# NOTE(review): this call and the subprocess_shell(...) call below are not
# closed in this view; their remaining arguments are not visible.
protocol_factory = lambda: SubprocessStreamProtocol(limit=limit,
# Delegate process creation to loop.subprocess_shell().
transport, protocol = yield from loop.subprocess_shell(
cmd, stdin=stdin, stdout=stdout,
# Wrap the resulting transport/protocol pair in a Process.
return Process(transport, protocol, loop)
# NOTE(review): unlike create_subprocess_shell, no `stderr` or `loop`
# parameter is visible in this signature; a continuation line appears to be
# missing from this view -- confirm.
def create_subprocess_exec(program, *args, stdin=None, stdout=None,
limit=streams._DEFAULT_LIMIT, **kwds):
# Obtain the event loop from events.get_event_loop().
loop = events.get_event_loop()
# Factory that builds a SubprocessStreamProtocol using the given limit.
# NOTE(review): this call and the subprocess_exec(...) call below are not
# closed in this view; their remaining arguments are not visible.
protocol_factory = lambda: SubprocessStreamProtocol(limit=limit,
# Delegate process creation to loop.subprocess_exec().
transport, protocol = yield from loop.subprocess_exec(
stdin=stdin, stdout=stdout,
# Wrap the resulting transport/protocol pair in a Process.
return Process(transport, protocol, loop)