From 38d63b0daa88240b295ef76910f7205c2a86845a Mon Sep 17 00:00:00 2001 From: Nico Date: Tue, 31 Mar 2026 21:16:15 +0200 Subject: [PATCH] Handle nyx WS message types (message, new, stop, ping) - Accept {type: 'message', content} format from nyx - Handle 'new' (create fresh session), 'stop' (cancel pipeline) - Ignore 'ping', 'auth', 'connect' (auth via query params) - Send 'ready' after session_info on WS connect Co-Authored-By: Claude Opus 4.6 (1M context) --- agent/api.py | 59 ++++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 53 insertions(+), 6 deletions(-) diff --git a/agent/api.py b/agent/api.py index 6db925d..e318edb 100644 --- a/agent/api.py +++ b/agent/api.py @@ -232,11 +232,14 @@ def register_routes(app): runtime.update_identity(user_claims, origin) runtime.attach_ws(ws) - # Tell client which session they're on + # Tell client which session they're on + ready signal try: await ws.send_text(json.dumps({ "type": "session_info", "session_id": runtime.session_id, - "graph": runtime.graph.get("name", "unknown")})) + "graph": runtime.graph.get("name", "unknown"), + "history_len": len(runtime.history)})) + await ws.send_text(json.dumps({ + "type": "ready", "session_id": runtime.session_id})) except Exception: pass @@ -244,9 +247,19 @@ def register_routes(app): while True: data = await ws.receive_text() msg = json.loads(data) + msg_type = msg.get("type", "") rt = _sessions.get(runtime.session_id, runtime) + + # Ping — keep-alive, no processing + if msg_type == "ping": + continue + + # Auth message from nyx — already authed via query param, ignore + if msg_type in ("auth", "connect"): + continue + try: - if msg.get("type") == "action": + if msg_type == "action": action = msg.get("action", "unknown") data_payload = msg.get("data") if hasattr(rt, 'use_frames') and rt.use_frames: @@ -256,12 +269,46 @@ def register_routes(app): await rt.handle_message(action_text) else: await rt.handle_action(action, data_payload) - elif msg.get("type") == "cancel_process": + + elif msg_type == "cancel_process": rt.process_manager.cancel(msg.get("pid", 0)) + + elif msg_type == "new": + # New session requested + rt.detach_ws() + new_rt = await _get_or_create_session( + user_claims=user_claims, origin=origin) + new_rt.attach_ws(ws) + runtime = new_rt + rt = new_rt + await ws.send_text(json.dumps({ + "type": "session_info", "session_id": rt.session_id, + "graph": rt.graph.get("name", "unknown"), + "history_len": 0})) + await ws.send_text(json.dumps({"type": "cleared"})) + continue + + elif msg_type == "stop": + # Cancel running pipeline + if _pipeline_task and not _pipeline_task.done(): + _pipeline_task.cancel() + continue + + elif msg_type == "message": + # nyx format: {type: 'message', content: '...'} + text = msg.get("content", "").strip() + if text: + await rt.handle_message(text, dashboard=msg.get("dashboard")) + else: - await rt.handle_message(msg.get("text", ""), dashboard=msg.get("dashboard")) - # Auto-save after each message + # Legacy/assay format: {text: '...', dashboard: [...]} + text = msg.get("text", "").strip() + if text: + await rt.handle_message(text, dashboard=msg.get("dashboard")) + + # Auto-save after each processed message asyncio.create_task(_save_session(rt)) + except Exception as e: import traceback log.error(f"[ws] handler error: {e}\n{traceback.format_exc()}")