diff --git a/agent/api.py b/agent/api.py index 6db925d..e318edb 100644 --- a/agent/api.py +++ b/agent/api.py @@ -232,11 +232,14 @@ def register_routes(app): runtime.update_identity(user_claims, origin) runtime.attach_ws(ws) - # Tell client which session they're on + # Tell client which session they're on + ready signal try: await ws.send_text(json.dumps({ "type": "session_info", "session_id": runtime.session_id, - "graph": runtime.graph.get("name", "unknown")})) + "graph": runtime.graph.get("name", "unknown"), + "history_len": len(runtime.history)})) + await ws.send_text(json.dumps({ + "type": "ready", "session_id": runtime.session_id})) except Exception: pass @@ -244,9 +247,19 @@ def register_routes(app): while True: data = await ws.receive_text() msg = json.loads(data) + msg_type = msg.get("type", "") rt = _sessions.get(runtime.session_id, runtime) + + # Ping — keep-alive, no processing + if msg_type == "ping": + continue + + # Auth message from nyx — already authed via query param, ignore + if msg_type in ("auth", "connect"): + continue + try: - if msg.get("type") == "action": + if msg_type == "action": action = msg.get("action", "unknown") data_payload = msg.get("data") if hasattr(rt, 'use_frames') and rt.use_frames: @@ -256,12 +269,46 @@ def register_routes(app): await rt.handle_message(action_text) else: await rt.handle_action(action, data_payload) - elif msg.get("type") == "cancel_process": + + elif msg_type == "cancel_process": rt.process_manager.cancel(msg.get("pid", 0)) + + elif msg_type == "new": + # New session requested + rt.detach_ws() + new_rt = await _get_or_create_session( + user_claims=user_claims, origin=origin) + new_rt.attach_ws(ws) + runtime = new_rt + rt = new_rt + await ws.send_text(json.dumps({ + "type": "session_info", "session_id": rt.session_id, + "graph": rt.graph.get("name", "unknown"), + "history_len": 0})) + await ws.send_text(json.dumps({"type": "cleared"})) + continue + + elif msg_type == "stop": + # Cancel running pipeline + if _pipeline_task and not _pipeline_task.done(): + _pipeline_task.cancel() + continue + + elif msg_type == "message": + # nyx format: {type: 'message', content: '...'} + text = msg.get("content", "").strip() + if text: + await rt.handle_message(text, dashboard=msg.get("dashboard")) + else: - await rt.handle_message(msg.get("text", ""), dashboard=msg.get("dashboard")) - # Auto-save after each message + # Legacy/assay format: {text: '...', dashboard: [...]} + text = msg.get("text", "").strip() + if text: + await rt.handle_message(text, dashboard=msg.get("dashboard")) + + # Auto-save after each processed message asyncio.create_task(_save_session(rt)) + except Exception as e: import traceback log.error(f"[ws] handler error: {e}\n{traceback.format_exc()}")