import concurrent.futures
class AsyncIOInteractiveConsole(code.InteractiveConsole):
def __init__(self, locals, loop):
self.compile.compiler.flags |= ast.PyCF_ALLOW_TOP_LEVEL_AWAIT
future = concurrent.futures.Future()
global repl_future_interrupted
repl_future_interrupted = False
func = types.FunctionType(code, self.locals)
except KeyboardInterrupt as ex:
repl_future_interrupted = True
except BaseException as ex:
if not inspect.iscoroutine(coro):
repl_future = self.loop.create_task(coro)
futures._chain_future(repl_future, future)
except BaseException as exc:
future.set_exception(exc)
loop.call_soon_threadsafe(callback)
if repl_future_interrupted:
self.write("\nKeyboardInterrupt\n")
class REPLThread(threading.Thread):
f'asyncio REPL {sys.version} on {sys.platform}\n'
f'Use "await" directly instead of "asyncio.run()".\n'
f'Type "help", "copyright", "credits" or "license" '
f'for more information.\n'
f'{getattr(sys, "ps1", ">>> ")}import asyncio'
exitmsg='exiting asyncio REPL...')
message=r'^coroutine .* was never awaited$',
loop.call_soon_threadsafe(loop.stop)
if __name__ == '__main__':
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
repl_locals = {'asyncio': asyncio}
for key in {'__name__', '__package__',
'__loader__', '__spec__',
'__builtins__', '__file__'}:
repl_locals[key] = locals()[key]
console = AsyncIOInteractiveConsole(repl_locals, loop)
repl_future_interrupted = False
repl_thread = REPLThread()
repl_thread.daemon = True
except KeyboardInterrupt:
if repl_future and not repl_future.done():
repl_future_interrupted = True