import asyncio import threading import websockets from prompt_toolkit import PromptSession from prompt_toolkit.key_binding import KeyBindings from prompt_toolkit.keys import Keys URI = "ws://localhost:8080/ws" RETRY_DELAY = 2 bindings = KeyBindings() @bindings.add(Keys.Enter) def submit(event): event.current_buffer.validate_and_handle() @bindings.add(Keys.Escape, Keys.Enter) def newline(event): event.current_buffer.insert_text("\n") send_queue: asyncio.Queue = None loop: asyncio.AbstractEventLoop = None shutdown_event: asyncio.Event = None async def receiver(ws): async for message in ws: print(f"\n[SERVER] {message}", flush=True) async def sender(ws): while True: msg = await send_queue.get() if msg is None: break await ws.send(msg) async def run(): global send_queue send_queue = asyncio.Queue() input_thread = threading.Thread(target=input_loop, daemon=True) input_thread_started = False while not shutdown_event.is_set(): try: async with websockets.connect(URI) as ws: print(f"Connected to {URI}") if not input_thread_started: print("Alt+Enter = newline | Enter = send | Ctrl+C = quit\n") input_thread.start() input_thread_started = True recv_task = asyncio.create_task(receiver(ws)) send_task = asyncio.create_task(sender(ws)) shutdown_task = asyncio.create_task(shutdown_event.wait()) done, pending = await asyncio.wait( [recv_task, send_task, shutdown_task], return_when=asyncio.FIRST_COMPLETED, ) for t in pending: t.cancel() if shutdown_event.is_set(): return # reconnect send_queue = asyncio.Queue() print(f"\n[Disconnected] Reconnecting in {RETRY_DELAY}s...") except (OSError, websockets.exceptions.WebSocketException): if shutdown_event.is_set(): return print(f"[Waiting for server] Retrying in {RETRY_DELAY}s...", flush=True) try: await asyncio.wait_for(shutdown_event.wait(), timeout=RETRY_DELAY) except asyncio.TimeoutError: pass def input_loop(): session = PromptSession( key_bindings=bindings, multiline=True, ) while True: try: text = session.prompt(">>> ") if text is not None: asyncio.run_coroutine_threadsafe(send_queue.put(text), loop) except (EOFError, KeyboardInterrupt): asyncio.run_coroutine_threadsafe(shutdown_event.set(), loop) break def main(): global loop, shutdown_event loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) shutdown_event = asyncio.Event() try: loop.run_until_complete(run()) except KeyboardInterrupt: pass finally: loop.close() if __name__ == "__main__": main()