Files
go-socket/machine-client/clientTest.py
T

411 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import asyncio
import json
import threading
import requests
import websockets
from datetime import datetime
from prompt_toolkit import PromptSession
from prompt_toolkit.key_binding import KeyBindings
from prompt_toolkit.keys import Keys
BASE_URL = "http://localhost:8080"
WS_URI = "ws://localhost:8080/ws"
RETRY_DELAY = 2
token: str | None = None
user_id: int | None = None
bindings = KeyBindings()
@bindings.add(Keys.Enter)
def submit(event):
event.current_buffer.validate_and_handle()
@bindings.add(Keys.Escape, Keys.Enter)
def newline(event):
event.current_buffer.insert_text("\n")
send_queue: asyncio.Queue = None
loop: asyncio.AbstractEventLoop = None
shutdown_event: asyncio.Event = None
# ── helpers ──────────────────────────────────────────────────────────────────
def post(path, **data):
return requests.post(f"{BASE_URL}{path}", data=data)
def post_auth(path, **data):
return requests.post(f"{BASE_URL}{path}", data={"token": token, **data})
def post_auth_header(path, files=None, **data):
return requests.post(f"{BASE_URL}{path}", headers={"token": token}, data=data, files=files)
def fmt_msg(m: dict) -> str:
sender = m.get("sender", "?")
content = m.get("content", "")
ts = m.get("createdAt", "")
try:
dt = datetime.fromisoformat(ts.replace("Z", "+00:00"))
ts = dt.strftime("%H:%M:%S")
except Exception:
pass
return f"[{ts}] <{sender}> {content}"
# ── commands ─────────────────────────────────────────────────────────────────
def cmd_new_user(args):
if len(args) < 2:
print("usage: /new/user <username> <password>")
return
r = post("/new/user", username=args[0], password=args[1])
print("registered" if r.ok else f"error: {r.text}")
def cmd_new_token(args):
global token, user_id
if len(args) < 2:
print("usage: /new/token <username> <password>")
return
r = post("/new/token", username=args[0], password=args[1])
if r.ok:
data = r.json()
token = data["token"]
user_id = data["userId"]
print(f"logged in user_id={user_id}")
asyncio.run_coroutine_threadsafe(
send_queue.put(json.dumps({"token": token})), loop
)
else:
print(f"login failed: {r.text}")
def cmd_msg_user(args):
if not token:
print("not logged in")
return
if len(args) < 2:
print("usage: /msg/user <connectionid> <message…>")
return
conn_id = args[0]
content = " ".join(args[1:])
r = post_auth("/msg/user", connectionid=conn_id, msgContent=content)
print("sent" if r.ok else f"error: {r.text}")
def cmd_get_connection_messages(args):
if not token:
print("not logged in")
return
if not args:
print("usage: /get/connection/messages <connectionid> [count] [before]")
return
data = {"connectionid": args[0]}
if len(args) > 1:
data["messages"] = args[1]
if len(args) > 2:
data["before"] = args[2]
r = post_auth("/get/connection/messages", **data)
if r.ok:
msgs = r.json() or []
if not msgs:
print("no messages")
for m in msgs:
print(fmt_msg(m))
else:
print(f"error: {r.text}")
def cmd_get_connections(args):
if not token:
print("not logged in")
return
r = post_auth("/get/connections")
if r.ok:
for c in (r.json() or []):
print(f" {c['id']} requestor={c['requestorId']} recipient={c['recipientId']} state={c['state']}")
else:
print(f"error: {r.text}")
def cmd_new_connection(args):
if not token:
print("not logged in")
return
if not args:
print("usage: /new/connection <userid>")
return
r = post_auth("/new/connection", recipient=args[0])
print("connection requested" if r.ok else f"error: {r.text}")
def cmd_mod_connection_elevate(args):
if not token:
print("not logged in")
return
if not args:
print("usage: /mod/connection/elevate <connectionid>")
return
r = post_auth("/mod/connection/elevate", connectionid=args[0])
print(f"ok: {r.text}" if r.ok else f"error: {r.text}")
def cmd_del_connection(args):
if not token:
print("not logged in")
return
if not args:
print("usage: /del/connection <connectionid>")
return
r = post_auth("/del/connection", connectionid=args[0])
print("deleted" if r.ok else f"error: {r.text}")
def cmd_del_user(args):
if not token:
print("not logged in")
return
r = post_auth("/del/user")
print("user deleted" if r.ok else f"error: {r.text}")
def cmd_get_user(args):
if not token:
print("not logged in")
return
if not args:
print("usage: /get/user <userid>")
return
r = post_auth("/get/user", targetid=args[0])
if r.ok:
print(json.dumps(r.json(), indent=2))
else:
print(f"error: {r.text}")
def cmd_mod_user_profile(args):
if not token:
print("not logged in")
return
data = {}
for arg in args:
if "=" in arg:
k, v = arg.split("=", 1)
data[k] = v
if not data:
print("usage: /mod/user/profile [pronouns=...] [description=...] [color=r,g,b,a]")
return
r = post_auth("/mod/user/profile", **data)
print("updated" if r.ok else f"error: {r.text}")
def cmd_get_file(args):
if not token:
print("not logged in")
return
if len(args) < 2:
print("usage: /get/file <connectionid> <key>")
return
r = post_auth_header("/get/file", connectionid=args[0], key=args[1])
if r.ok:
print(json.dumps(r.json(), indent=2))
else:
print(f"error: {r.text}")
def cmd_new_file(args):
if not token:
print("not logged in")
return
if len(args) < 2:
print("usage: /new/file <connectionid> <filepath>")
return
conn_id, filepath = args[0], args[1]
try:
with open(filepath, "rb") as f:
r = post_auth_header("/new/file", files={"file": f}, connectionid=conn_id)
print(f"uploaded: {r.text}" if r.ok else f"error: {r.text}")
except FileNotFoundError:
print(f"file not found: {filepath}")
def cmd_mod_user_avatar(args):
if not token:
print("not logged in")
return
if not args:
print("usage: /mod/user/avatar <filepath>")
return
filepath = args[0]
try:
with open(filepath, "rb") as f:
r = post_auth_header("/mod/user/avatar", files={"file": f})
print("avatar updated" if r.ok else f"error: {r.text}")
except FileNotFoundError:
print(f"file not found: {filepath}")
def cmd_mod_user_profilebg(args):
if not token:
print("not logged in")
return
if not args:
print("usage: /mod/user/profilebg <filepath>")
return
filepath = args[0]
try:
with open(filepath, "rb") as f:
r = post_auth_header("/mod/user/profilebg", files={"file": f})
print("profile background updated" if r.ok else f"error: {r.text}")
except FileNotFoundError:
print(f"file not found: {filepath}")
def cmd_get_user_avatar(args):
if not token:
print("not logged in")
return
if not args:
print("usage: /get/user/avatar <userid>")
return
r = post_auth_header("/get/user/avatar", userid=args[0])
print(r.text if r.ok else f"error: {r.text}")
def cmd_get_user_profilebg(args):
if not token:
print("not logged in")
return
if not args:
print("usage: /get/user/profilebg <userid>")
return
r = post_auth_header("/get/user/profilebg", userid=args[0])
print(r.text if r.ok else f"error: {r.text}")
COMMANDS = {
"/new/user": cmd_new_user,
"/new/token": cmd_new_token,
"/new/connection": cmd_new_connection,
"/new/file": cmd_new_file,
"/mod/user/profile": cmd_mod_user_profile,
"/mod/user/avatar": cmd_mod_user_avatar,
"/mod/user/profilebg": cmd_mod_user_profilebg,
"/mod/connection/elevate": cmd_mod_connection_elevate,
"/get/user": cmd_get_user,
"/get/connections": cmd_get_connections,
"/get/connection/messages": cmd_get_connection_messages,
"/get/file": cmd_get_file,
"/get/user/avatar": cmd_get_user_avatar,
"/get/user/profilebg": cmd_get_user_profilebg,
"/del/user": cmd_del_user,
"/del/connection": cmd_del_connection,
"/msg/user": cmd_msg_user,
}
HELP = """
/new/user <user> <pass> create account
/new/token <user> <pass> authenticate
/new/connection <userid> send connection request
/new/file <connectionid> <filepath> upload attachment
/mod/user/profile [pronouns=...] [description=...] [color=r,g,b,a] update profile
/mod/user/avatar <filepath> set avatar image
/mod/user/profilebg <filepath> set profile background
/mod/connection/elevate <connectionid> elevate connection
/get/user <userid> get user info
/get/connections list your connections
/get/connection/messages <connectionid> [count] [before] fetch message history
/get/file <connectionid> <key> download attachment URL
/get/user/avatar <userid> get avatar download URL
/get/user/profilebg <userid> get profile background URL
/del/user delete your account
/del/connection <connectionid> delete a connection
/msg/user <connectionid> <message…> send a DM
"""
# ── websocket ─────────────────────────────────────────────────────────────────
async def receiver(ws):
async for raw in ws:
try:
data = json.loads(raw)
if "content" in data and "sender" in data:
print(f"\n{fmt_msg(data)}", flush=True)
else:
print(f"\n[SERVER] {json.dumps(data, indent=2)}", flush=True)
except json.JSONDecodeError:
print(f"\n[SERVER] {raw}", flush=True)
async def sender(ws):
while True:
msg = await send_queue.get()
if msg is None:
break
await ws.send(msg)
async def run():
global send_queue
send_queue = asyncio.Queue()
input_thread = threading.Thread(target=input_loop, daemon=True)
input_thread_started = False
while not shutdown_event.is_set():
try:
async with websockets.connect(WS_URI) as ws:
print(f"connected to {WS_URI}")
if not input_thread_started:
print("Alt+Enter = newline | Enter = send | /help | Ctrl+C = quit\n")
input_thread.start()
input_thread_started = True
if token:
await ws.send(json.dumps({"token": token}))
recv_task = asyncio.create_task(receiver(ws))
send_task = asyncio.create_task(sender(ws))
shutdown_task = asyncio.create_task(shutdown_event.wait())
done, pending = await asyncio.wait(
[recv_task, send_task, shutdown_task],
return_when=asyncio.FIRST_COMPLETED,
)
for t in pending:
t.cancel()
if shutdown_event.is_set():
return
send_queue = asyncio.Queue()
print(f"\n[disconnected] reconnecting in {RETRY_DELAY}s…")
except (OSError, websockets.exceptions.WebSocketException):
if shutdown_event.is_set():
return
print(f"[waiting for server] retrying in {RETRY_DELAY}s…", flush=True)
try:
await asyncio.wait_for(shutdown_event.wait(), timeout=RETRY_DELAY)
except asyncio.TimeoutError:
pass
# ── input loop ────────────────────────────────────────────────────────────────
def input_loop():
session = PromptSession(key_bindings=bindings, multiline=True)
while True:
try:
text = session.prompt(">>> ").strip()
if not text:
continue
if text == "/help":
print(HELP)
continue
parts = text.split()
cmd = parts[0]
if cmd in COMMANDS:
COMMANDS[cmd](parts[1:])
else:
asyncio.run_coroutine_threadsafe(send_queue.put(text), loop)
except (EOFError, KeyboardInterrupt):
asyncio.run_coroutine_threadsafe(shutdown_event.set(), loop)
break
# ── main ──────────────────────────────────────────────────────────────────────
def main():
global loop, shutdown_event
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
shutdown_event = asyncio.Event()
try:
loop.run_until_complete(run())
except KeyboardInterrupt:
pass
finally:
loop.close()
if __name__ == "__main__":
main()