Handle nyx WS message types (message, new, stop, ping)

- Accept {type: 'message', content} format from nyx
- Handle 'new' (create fresh session), 'stop' (cancel pipeline)
- Ignore 'ping', 'auth', 'connect' (auth via query params)
- Send 'ready' after session_info on WS connect

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Nico 2026-03-31 21:16:15 +02:00
parent e205e99da0
commit 38d63b0daa

View File

@ -232,11 +232,14 @@ def register_routes(app):
runtime.update_identity(user_claims, origin)
runtime.attach_ws(ws)
# Tell client which session they're on
# Tell client which session they're on + ready signal
try:
await ws.send_text(json.dumps({
"type": "session_info", "session_id": runtime.session_id,
"graph": runtime.graph.get("name", "unknown")}))
"graph": runtime.graph.get("name", "unknown"),
"history_len": len(runtime.history)}))
await ws.send_text(json.dumps({
"type": "ready", "session_id": runtime.session_id}))
except Exception:
pass
@ -244,9 +247,19 @@ def register_routes(app):
while True:
data = await ws.receive_text()
msg = json.loads(data)
msg_type = msg.get("type", "")
rt = _sessions.get(runtime.session_id, runtime)
# Ping — keep-alive, no processing
if msg_type == "ping":
continue
# Auth message from nyx — already authed via query param, ignore
if msg_type in ("auth", "connect"):
continue
try:
if msg.get("type") == "action":
if msg_type == "action":
action = msg.get("action", "unknown")
data_payload = msg.get("data")
if hasattr(rt, 'use_frames') and rt.use_frames:
@ -256,12 +269,46 @@ def register_routes(app):
await rt.handle_message(action_text)
else:
await rt.handle_action(action, data_payload)
elif msg.get("type") == "cancel_process":
elif msg_type == "cancel_process":
rt.process_manager.cancel(msg.get("pid", 0))
elif msg_type == "new":
# New session requested
rt.detach_ws()
new_rt = await _get_or_create_session(
user_claims=user_claims, origin=origin)
new_rt.attach_ws(ws)
runtime = new_rt
rt = new_rt
await ws.send_text(json.dumps({
"type": "session_info", "session_id": rt.session_id,
"graph": rt.graph.get("name", "unknown"),
"history_len": 0}))
await ws.send_text(json.dumps({"type": "cleared"}))
continue
elif msg_type == "stop":
# Cancel running pipeline
if _pipeline_task and not _pipeline_task.done():
_pipeline_task.cancel()
continue
elif msg_type == "message":
# nyx format: {type: 'message', content: '...'}
text = msg.get("content", "").strip()
if text:
await rt.handle_message(text, dashboard=msg.get("dashboard"))
else:
await rt.handle_message(msg.get("text", ""), dashboard=msg.get("dashboard"))
# Auto-save after each message
# Legacy/assay format: {text: '...', dashboard: [...]}
text = msg.get("text", "").strip()
if text:
await rt.handle_message(text, dashboard=msg.get("dashboard"))
# Auto-save after each processed message
asyncio.create_task(_save_session(rt))
except Exception as e:
import traceback
log.error(f"[ws] handler error: {e}\n{traceback.format_exc()}")