This repository has been archived on 2026-04-03. You can view files and clone it, but cannot push or open issues or pull requests.
agent-runtime/agent/runtime.py
Nico 376fdb2458 Wire NodePool into Runtime + API: shared stateless nodes across sessions
- Runtime accepts optional pool= param, uses shared nodes from pool
  for stateless roles, creates fresh sensor/memorizer/ui per-session
- FrameEngine sets _current_hud contextvar at start of process_message
- API creates global NodePool once, passes to all Runtime instances
- Graph switch resets pool for new graph
- Legacy _ensure_runtime also uses pool
- 15/15 engine + matrix tests green

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-03 18:41:15 +02:00

471 lines
21 KiB
Python

"""Runtime: wires all nodes together into a processing pipeline."""
import asyncio
import json
import logging
import time
from pathlib import Path
from typing import Callable
from uuid import uuid4
from .types import Envelope, Command, InputAnalysis, ThoughtResult, DirectorPlan
from .process import ProcessManager
from .engine import load_graph, instantiate_nodes, list_graphs, get_graph_for_cytoscape
from .frame_engine import FrameEngine
# Module logger; the name is the fixed string "runtime" rather than __name__.
log = logging.getLogger("runtime")
# JSONL trace file, located two directory levels above this module file.
TRACE_FILE = Path(__file__).parent.parent / "trace.jsonl"
# Default graph — can be switched at runtime
_active_graph_name = "v4-eras"
class OutputSink:
    """Collects output. Streams to attached WebSocket or SSE queue.

    Every outgoing event is a dict with a "type" key ("delta", "controls",
    "artifacts", "hud" or "done"). The sink also accumulates the streamed
    response text, the last controls list and a done flag so callers can
    read the final result after streaming.

    Delivery is best-effort: a full queue or a failing WebSocket never
    raises into the pipeline; the event is dropped and logged at debug level.
    """

    # Same logger object as the module-level "runtime" logger.
    _log = logging.getLogger("runtime")

    def __init__(self):
        self.ws = None
        self.queue: asyncio.Queue | None = None  # SSE streaming queue
        self.response: str = ""
        self.controls: list = []
        self.done: bool = False

    def attach(self, ws):
        """Attach a WebSocket-like object exposing an async ``send_text``."""
        self.ws = ws

    def detach(self):
        """Stop sending events to the WebSocket."""
        self.ws = None

    def attach_queue(self, queue: asyncio.Queue):
        """Attach an asyncio.Queue for SSE streaming (HTTP mode)."""
        self.queue = queue

    def detach_queue(self):
        """Stop pushing events to the SSE queue."""
        self.queue = None

    def reset(self):
        """Clear the accumulated response, controls and done flag."""
        self.response = ""
        self.controls = []
        self.done = False

    async def _emit(self, event: dict):
        """Send event to the SSE queue (as a dict) and/or the WS (as JSON text)."""
        # Explicit None checks: an attached object must not be skipped just
        # because it happens to evaluate as falsy.
        if self.queue is not None:
            try:
                self.queue.put_nowait(event)
            except asyncio.QueueFull:
                # Deliberate best-effort: never block the pipeline on a slow consumer.
                self._log.debug("SSE queue full, dropping %s event", event.get("type"))
        if self.ws is not None:
            # Serialize only when a WebSocket will actually receive the text.
            payload = json.dumps(event)
            try:
                await self.ws.send_text(payload)
            except Exception as exc:
                # Deliberate best-effort: a broken socket must not abort processing.
                self._log.debug("WS send failed, dropping %s event: %s",
                                event.get("type"), exc)

    async def send_delta(self, text: str):
        """Append *text* to the accumulated response and emit a "delta" event."""
        self.response += text
        await self._emit({"type": "delta", "content": text})

    async def send_controls(self, controls: list):
        """Remember *controls* and emit a "controls" event."""
        self.controls = controls
        await self._emit({"type": "controls", "controls": controls})

    async def send_artifacts(self, artifacts: list):
        """Emit an "artifacts" event."""
        await self._emit({"type": "artifacts", "artifacts": artifacts})

    async def send_hud(self, data: dict):
        """Emit a "hud" event whose remaining keys are taken from *data*."""
        await self._emit({"type": "hud", **data})

    async def send_done(self):
        """Mark the response as finished and emit a "done" event."""
        self.done = True
        await self._emit({"type": "done"})
class Runtime:
    """Per-session pipeline that wires the graph's nodes together.

    A Runtime owns the session history, an OutputSink for streaming and the
    node instances (input, thinker, output, ui, memorizer, sensor and the
    optional director / interpreter). Messages are either delegated to a
    FrameEngine (graphs whose "engine" is "frames") or processed by the
    v1 / v2 flows implemented in handle_message and handle_action.
    """

    # History is cut back to this many entries after each handled turn.
    MAX_HISTORY = 40
    # trace.jsonl is cut back to its last TRACE_KEEP_LINES lines once it
    # grows past TRACE_MAX_BYTES.
    TRACE_MAX_BYTES = 500_000
    TRACE_KEEP_LINES = 500
    # Substrings that mark a request/action message as data-related (v1 flow).
    _DATA_KEYWORDS = (
        "daten", "data", "database", "db", "tabelle", "table",
        "query", "abfrage", "untersuche", "investigate", "explore",
        "analyse", "analyze", "umsatz", "revenue", "billing",
        "abrechnung", "customer", "kunde", "geraete", "device",
        "objekt", "object", "how many", "wieviele", "welche",
    )

    def __init__(self, user_claims: dict = None, origin: str = "",
                 broadcast: Callable = None, graph_name: str = None,
                 session_id: str = None, pool=None):
        """Build the node set for one session.

        Args:
            user_claims: Auth claims; "name", "preferred_username" and
                "username" are tried in that order to resolve the identity.
            origin: Channel label; an empty value means a local session.
            broadcast: Callable invoked with every trace entry (no-op if omitted).
            graph_name: Graph to load in legacy mode; defaults to the
                module-level active graph name.
            session_id: Existing session id; a fresh uuid4 string otherwise.
            pool: Optional node pool exposing ``graph`` and ``shared``; when
                given, its shared nodes are reused and only ui / memorizer /
                sensor are created for this session.
        """
        self.session_id = session_id or str(uuid4())
        self.sink = OutputSink()
        self.history: list[dict] = []
        self._broadcast = broadcast or (lambda e: None)
        gname = graph_name or _active_graph_name
        if pool:
            # Phase 2: use shared node pool for stateless nodes
            self.graph = pool.graph
            self.process_manager = ProcessManager(send_hud=self._send_hud)
            # Shared nodes from pool (stateless, serve all sessions)
            self.input_node = pool.shared.get("input")
            self.thinker = pool.shared.get("thinker")
            self.output_node = pool.shared.get("output")
            self.director = pool.shared.get("director")
            self.interpreter = pool.shared.get("interpreter")
            # Per-session stateful nodes (fresh each session)
            from .nodes import UINode, MemorizerNodeV1 as MemorizerNode, SensorNode
            self.ui_node = UINode(send_hud=self._send_hud)
            self.memorizer = MemorizerNode(send_hud=self._send_hud)
            self.sensor = SensorNode(send_hud=self._send_hud)
            # Build combined nodes dict for FrameEngine
            nodes = dict(pool.shared)
            nodes["ui"] = self.ui_node
            nodes["memorizer"] = self.memorizer
            nodes["sensor"] = self.sensor
            log.info(f"[runtime] using shared pool for graph '{gname}' "
                     f"({len(pool.shared)} shared, 3 per-session)")
        else:
            # Legacy: create all nodes per-session
            self.graph = load_graph(gname)
            self.process_manager = ProcessManager(send_hud=self._send_hud)
            nodes = instantiate_nodes(self.graph, send_hud=self._send_hud,
                                      process_manager=self.process_manager)
            self.input_node = nodes["input"]
            self.thinker = nodes.get("thinker")
            self.output_node = nodes["output"]
            self.ui_node = nodes["ui"]
            self.memorizer = nodes["memorizer"]
            self.director = nodes.get("director")
            self.sensor = nodes["sensor"]
            self.interpreter = nodes.get("interpreter")
        # Detect graph type
        self.is_v2 = self.director is not None and hasattr(self.director, "decide")
        self.use_frames = self.graph.get("engine") == "frames"
        self.sensor.start(
            get_memo_state=lambda: self.memorizer.state,
            get_server_controls=lambda: self.ui_node.current_controls,
        )
        claims = user_claims or {}
        log.info(f"[runtime] user_claims: {claims}")
        self.identity = self._identity_from_claims(claims, "unknown")
        log.info(f"[runtime] resolved identity: {self.identity}")
        self.channel = origin or "unknown"
        self.memorizer.state["user_name"] = self.identity
        self.memorizer.state["situation"] = f"authenticated on {self.channel}" if origin else "local session"
        # Frame engine (for v3+ graphs)
        if self.use_frames:
            self.frame_engine = FrameEngine(
                graph=self.graph, nodes=nodes, sink=self.sink,
                history=self.history, send_hud=self._send_hud,
                sensor=self.sensor, memorizer=self.memorizer,
                ui_node=self.ui_node, identity=self.identity,
                channel=self.channel, broadcast=self._broadcast)
            log.info(f"[runtime] using FrameEngine for graph '{gname}'")

    @staticmethod
    def _identity_from_claims(claims: dict, fallback: str) -> str:
        """Return the first non-empty of name / preferred_username / username, else *fallback*."""
        return (claims.get("name") or claims.get("preferred_username")
                or claims.get("username") or fallback)

    def _trim_history(self):
        """Drop the oldest entries so at most MAX_HISTORY remain.

        Trims in place: the list object handed to FrameEngine in __init__
        stays the one this runtime uses.
        """
        if len(self.history) > self.MAX_HISTORY:
            del self.history[:-self.MAX_HISTORY]

    def attach_ws(self, ws):
        """Attach a WebSocket for real-time streaming."""
        self.sink.attach(ws)
        log.info("[runtime] WS attached")

    def detach_ws(self):
        """Detach WebSocket. Runtime keeps running."""
        self.sink.detach()
        log.info("[runtime] WS detached")

    def update_identity(self, user_claims: dict, origin: str = ""):
        """Update identity from WS auth claims.

        The current identity is kept when the claims carry no usable name;
        the channel is only replaced when *origin* is non-empty.
        """
        claims = user_claims or {}
        self.identity = self._identity_from_claims(claims, self.identity)
        if origin:
            self.channel = origin
        self.memorizer.state["user_name"] = self.identity
        self.memorizer.state["situation"] = f"authenticated on {self.channel}"
        log.info(f"[runtime] identity updated: {self.identity}")

    async def _send_hud(self, data: dict):
        """Emit a HUD event, append it to the trace file and broadcast it."""
        await self.sink.send_hud(data)
        # One clock read for both parts of the timestamp, so the seconds and
        # the millisecond fraction always belong to the same instant.
        now = time.time()
        stamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(now))
        trace_entry = {"ts": f"{stamp}.{int(now % 1 * 1000):03d}", **data}
        try:
            with open(TRACE_FILE, "a", encoding="utf-8") as f:
                f.write(json.dumps(trace_entry, ensure_ascii=False) + "\n")
            # The file was just written, so stat() needs no exists() pre-check.
            if TRACE_FILE.stat().st_size > self.TRACE_MAX_BYTES:
                lines = TRACE_FILE.read_text(encoding="utf-8").strip().split("\n")
                TRACE_FILE.write_text(
                    "\n".join(lines[-self.TRACE_KEEP_LINES:]) + "\n", encoding="utf-8")
        except Exception as e:
            # Tracing is best-effort; a failed write must not break the turn.
            log.error(f"trace write error: {e}")
        self._broadcast(trace_entry)

    async def _stream_text(self, text: str):
        """Stream pre-formed text to the client as deltas."""
        self.sink.reset()
        chunk_size = 12
        for i in range(0, len(text), chunk_size):
            await self.sink.send_delta(text[i:i + chunk_size])
        await self.sink.send_done()

    async def _run_output_and_ui(self, thought, mem_ctx):
        """Run Output and UI nodes in parallel.

        Controls produced by the UI node are sent through the sink; only the
        Output node's response is returned.
        """
        self.sink.reset()
        output_task = asyncio.create_task(
            self.output_node.process(thought, self.history, self.sink, memory_context=mem_ctx))
        ui_task = asyncio.create_task(
            self.ui_node.process(thought, self.history, memory_context=mem_ctx))
        response, controls = await asyncio.gather(output_task, ui_task)
        if controls:
            await self.sink.send_controls(controls)
        return response

    async def _think_v2(self, command, mem_ctx, source_text):
        """v2 flow: Director decides, Thinker executes, Interpreter reads results."""
        plan = await self.director.decide(command, self.history, memory_context=mem_ctx)
        thought = await self.thinker.process(command, plan, self.history, memory_context=mem_ctx)
        # Interpreter: replace the thinker's response with a summary of the tool output.
        if self.interpreter and thought.tool_used and thought.tool_output:
            interpreted = await self.interpreter.interpret(
                thought.tool_used, thought.tool_output, source_text)
            thought.response = interpreted.summary
        return thought

    async def _finish_turn(self, response):
        """Record the assistant response, update memorizer/director, trim history."""
        self.history.append({"role": "assistant", "content": response})
        await self.memorizer.update(self.history)
        # The director is optional, so guard before the v1-only update.
        if not self.is_v2 and self.director:
            await self.director.update(self.history, self.memorizer.state)
        self._trim_history()

    def _record_click(self, action: str, reply: str):
        """Append a click and its reply to history, keeping the history bounded."""
        self.history.append({"role": "user", "content": f"[clicked {action}]"})
        self.history.append({"role": "assistant", "content": reply})
        self._trim_history()

    async def handle_action(self, action: str, data: dict = None):
        """Handle a structured UI action (button click etc.).

        Tries, in order: a UI machine transition, a local UI action, and
        finally the full Director/Thinker pipeline. History is trimmed to
        MAX_HISTORY on every path.
        """
        self.sensor.note_user_activity()
        # Try machine transition first (go: target — no LLM needed)
        handled, transition_result = self.ui_node.try_machine_transition(action)
        if handled:
            await self._send_hud({"node": "ui", "event": "machine_transition",
                                  "action": action, "detail": transition_result})
            # Re-render all controls (machines + state + buttons). Copy first so
            # the list returned by the UI node is not mutated.
            controls = list(self.ui_node.get_machine_controls())
            # Include non-machine buttons and labels
            controls.extend(ctrl for ctrl in self.ui_node.current_controls
                            if not ctrl.get("machine_id"))
            self.ui_node.current_controls = controls
            await self.sink.send_controls(controls)
            await self._send_hud({"node": "ui", "event": "controls", "controls": controls})
            await self._stream_text(transition_result)
            self._record_click(action, transition_result)
            return
        # Try local UI action next (inc, dec, toggle — no LLM needed)
        result, controls = await self.ui_node.process_local_action(action, data)
        if result is not None:
            # Local action handled — send controls update + short response
            if controls:
                await self.sink.send_controls(controls)
            await self._stream_text(result)
            self._record_click(action, result)
            return
        # Complex action — needs Thinker reasoning
        action_desc = f"ACTION: {action}"
        if data:
            action_desc += f" | data: {json.dumps(data)}"
        self.history.append({"role": "user", "content": action_desc})
        sensor_lines = self.sensor.get_context_lines()
        director_line = self.director.get_context_line() if self.director else ""
        mem_ctx = self.memorizer.get_context_block(sensor_lines=sensor_lines, ui_state=self.ui_node.state)
        if director_line:
            mem_ctx += f"\n\n{director_line}"
        command = Command(
            analysis=InputAnalysis(intent="action", topic=action, complexity="simple"),
            source_text=action_desc)
        if self.is_v2:
            thought = await self._think_v2(command, mem_ctx, action_desc)
        else:
            thought = await self.thinker.process(command, self.history, memory_context=mem_ctx)
        response = await self._run_output_and_ui(thought, mem_ctx)
        await self._finish_turn(response)

    def _format_dashboard(self, dashboard: list) -> str:
        """Format dashboard controls into a context string for Thinker.

        Compares browser-reported state against server-side controls to detect
        mismatches. Dashboard entries that are not dicts are ignored, matching
        the button scan.
        """
        server_controls = self.ui_node.current_controls
        server_buttons = [str(c.get("label", "")) for c in server_controls
                          if isinstance(c, dict) and c.get("type") == "button"]
        browser_buttons = [str(c.get("label", "")) for c in dashboard
                           if isinstance(c, dict) and c.get("type") == "button"] if dashboard else []
        lines = []
        # Mismatch detection (S3* audit)
        if server_buttons and not browser_buttons:
            lines.append(f"WARNING: Server sent {len(server_buttons)} controls but dashboard shows NONE.")
            lines.append(f" Expected buttons: {', '.join(server_buttons)}")
            lines.append(" Controls failed to render or were lost. You MUST re-emit them in ACTIONS.")
        elif server_buttons and sorted(server_buttons) != sorted(browser_buttons):
            lines.append("WARNING: Dashboard mismatch.")
            lines.append(f" Server sent: {', '.join(server_buttons)}")
            lines.append(f" Browser shows: {', '.join(browser_buttons) or 'nothing'}")
            lines.append(" Re-emit correct controls in ACTIONS if needed.")
        if not dashboard:
            lines.append("Dashboard: empty (user sees nothing)")
            return "\n".join(lines)
        lines.append("Dashboard (what user currently sees):")
        for ctrl in dashboard:
            if not isinstance(ctrl, dict):
                # The dashboard is reported by the browser; skip malformed entries.
                continue
            ctype = ctrl.get("type", "unknown")
            if ctype == "button":
                lines.append(f" - Button: {ctrl.get('label', '?')}")
            elif ctype == "label":
                lines.append(f" - Label: {ctrl.get('text', '?')} = {ctrl.get('value', '?')}")
            elif ctype == "table":
                # Tolerate missing/None fields and non-string column names.
                cols = ctrl.get("columns") or []
                rows = len(ctrl.get("data") or [])
                lines.append(f" - Table: {', '.join(str(c) for c in cols)} ({rows} rows)")
            else:
                lines.append(f" - {ctype}: {ctrl.get('label', ctrl.get('text', '?'))}")
        return "\n".join(lines)

    async def handle_message(self, text: str, dashboard: list = None,
                             model_overrides: dict = None):
        """Process one user message.

        Returns the FrameEngine result for frame graphs and the result of
        handle_action for "ACTION:" messages; the v1/v2 flows stream their
        output through the sink and return None.
        """
        # Frame engine: delegate entirely
        if self.use_frames:
            return await self.frame_engine.process_message(
                text, dashboard, model_overrides=model_overrides)
        # Detect ACTION: prefix from API/test runner
        if text.startswith("ACTION:"):
            head, sep, tail = text.partition("|")
            action = head.removeprefix("ACTION:").strip()
            data = None
            if sep:
                # Strip only the leading "data:" marker; the payload itself may
                # legitimately contain "data:" (e.g. data URIs).
                payload = tail.strip().removeprefix("data:").strip()
                try:
                    data = json.loads(payload)
                except (ValueError, RecursionError):
                    # Malformed payload: proceed with the bare action.
                    data = None
            return await self.handle_action(action, data)
        envelope = Envelope(
            text=text,
            user_id=self.identity,
            session_id=self.session_id,
            timestamp=time.strftime("%Y-%m-%d %H:%M:%S"),
        )
        self.sensor.note_user_activity()
        if dashboard is not None:
            self.sensor.update_browser_dashboard(dashboard)
        self.history.append({"role": "user", "content": text})
        # Check Sensor flags (idle return, workspace mismatch)
        sensor_flags = self.sensor.consume_flags()
        sensor_lines = self.sensor.get_context_lines()
        director_line = self.director.get_context_line() if self.director else ""
        mem_ctx = self.memorizer.get_context_block(sensor_lines=sensor_lines, ui_state=self.ui_node.state)
        if director_line:
            mem_ctx += f"\n\n{director_line}"
        machine_summary = self.ui_node.get_machine_summary()
        if machine_summary:
            mem_ctx += f"\n\n{machine_summary}"
        if dashboard is not None:
            mem_ctx += f"\n\n{self._format_dashboard(dashboard)}"
        # Inject sensor flags into context
        if sensor_flags:
            flag_lines = ["Sensor flags:"]
            for flag in sensor_flags:
                if flag["type"] == "idle_return":
                    flag_lines.append(f" - User returned after {flag['away_duration']} away. Welcome them back briefly, mention what's on their dashboard.")
                elif flag["type"] == "workspace_mismatch":
                    flag_lines.append(f" - Workspace mismatch detected: {flag['detail']}. Check if controls need re-emitting.")
            mem_ctx += "\n\n" + "\n".join(flag_lines)
        command = await self.input_node.process(
            envelope, self.history, memory_context=mem_ctx,
            identity=self.identity, channel=self.channel)
        # Reflex path: trivial social messages skip Thinker entirely
        if command.analysis.intent == "social" and command.analysis.complexity == "trivial":
            await self._send_hud({"node": "runtime", "event": "reflex_path",
                                  "detail": f"{command.analysis.intent}/{command.analysis.complexity}"})
            thought = ThoughtResult(response=command.source_text, actions=[])
            response = await self._run_output_and_ui(thought, mem_ctx)
            await self._finish_turn(response)
            return
        if self.is_v2:
            thought = await self._think_v2(command, mem_ctx, text)
        else:
            # v1 flow: optional Director pre-planning for complex requests
            is_complex = command.analysis.complexity == "complex"
            lowered = text.lower()
            is_data_request = (command.analysis.intent in ("request", "action")
                               and any(k in lowered for k in self._DATA_KEYWORDS))
            needs_planning = is_complex or (is_data_request and len(text.split()) > 8)
            # Planning needs a director; graphs without one skip straight to the Thinker.
            if needs_planning and self.director:
                plan = await self.director.plan(self.history, self.memorizer.state, text)
                if plan:
                    # Rebuild the context so it carries the fresh plan line.
                    director_line = self.director.get_context_line()
                    mem_ctx = self.memorizer.get_context_block(sensor_lines=sensor_lines, ui_state=self.ui_node.state)
                    mem_ctx += f"\n\n{director_line}"
                    if machine_summary:
                        mem_ctx += f"\n\n{machine_summary}"
                    if dashboard is not None:
                        mem_ctx += f"\n\n{self._format_dashboard(dashboard)}"
            thought = await self.thinker.process(command, self.history, memory_context=mem_ctx)
            if self.director:
                self.director.current_plan = ""
        # Output (voice) and UI (screen) run in parallel
        response = await self._run_output_and_ui(thought, mem_ctx)
        await self._finish_turn(response)

    def to_state(self) -> dict:
        """Serialize session state for DB storage."""
        return {
            "history": self.history,
            "memorizer_state": self.memorizer.state,
            "ui_state": {
                "state": self.ui_node.state,
                "bindings": self.ui_node.bindings,
                "machines": self.ui_node.machines,
            },
        }

    def restore_state(self, state: dict):
        """Restore session state from DB.

        History is replaced in place, so the list object passed to FrameEngine
        in __init__ is the one that receives the restored entries (presumably
        the engine keeps using that list — confirm against FrameEngine). A
        missing or None history restores as empty.
        """
        self.history[:] = state.get("history") or []
        memo = state.get("memorizer_state")
        if memo:
            self.memorizer.state = memo
        ui = state.get("ui_state", {})
        if ui:
            self.ui_node.state = ui.get("state", {})
            self.ui_node.bindings = ui.get("bindings", {})
            self.ui_node.machines = ui.get("machines", {})