This repository has been archived on 2026-04-03. You can view files and clone it, but cannot push or open issues or pull requests.
agent-runtime/agent/frame_engine.py
Nico f30da07636 Phase 3: Engine walks graph edges instead of hardcoded pipelines
Replace 4 hardcoded pipeline methods (_run_expert_pipeline,
_run_director_pipeline, _run_thinker_pipeline + dispatch logic) with
a single _walk_edges() method that follows declared data edges.

Key changes:
- _WalkCtx dataclass carries state through the edge walk
- _eval_condition() evaluates conditions against walk context
- _resolve_edge() picks active edge (conditional > unconditional)
- Dispatch adapters per node role (_dispatch_pa, _dispatch_expert, etc.)
- PA retry + progress wrapping stay as expert dispatch adapter logic
- process_message() and _handle_action() use _walk_edges()
- _run_reflex() kept as simple 2-frame shortcut

15/15 engine tests + 9/9 matrix tests green, identical traces.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-03 19:00:57 +02:00

789 lines
36 KiB
Python

"""Frame Engine: edge-walking deterministic pipeline execution.
Walks the graph's data edges to determine pipeline flow. Each step
dispatches a node and evaluates conditions on outgoing edges to pick
the next node. Frames advance on node completion (not on a timer).
Edge types:
data — typed objects flowing between nodes (walked by engine)
context — text injected into LLM prompts (aggregated via _build_context)
state — shared mutable state reads (consumed by sensor/runtime)
Works with any graph definition (v1-v4). Node implementations unchanged.
"""
import asyncio
import json
import logging
import time
from dataclasses import dataclass, field, asdict
from .types import Envelope, Command, InputAnalysis, ThoughtResult, DirectorPlan, PARouting
from .nodes.base import _current_hud
log = logging.getLogger("runtime")
# --- Frame Trace ---
@dataclass
class FrameRecord:
    """One frame's execution record.

    A frame is one node dispatch. Timestamps come from time.monotonic(),
    so durations are wall-clock deltas unaffected by system clock changes.
    """
    frame: int                # frame number within the current message
    node: str                 # which node ran ("input", "director", ...)
    started: float = 0.0      # time.monotonic() when the frame began
    ended: float = 0.0        # time.monotonic() when the frame completed
    duration_ms: float = 0.0  # (ended - started) * 1000
    input_summary: str = ""   # what the node received
    output_summary: str = ""  # what the node produced
    route: str = ""           # where output goes next ("director", "output+ui", ...)
    condition: str = ""       # if a condition was evaluated ("reflex=True", ...)
    error: str = ""           # if the node failed
@dataclass
class FrameTrace:
    """Complete trace of one message through the pipeline."""
    message: str = ""
    graph: str = ""
    total_frames: int = 0
    total_ms: float = 0.0
    started: float = 0.0
    path: str = ""  # "reflex", "director", "director+interpreter"
    frames: list = field(default_factory=list)  # list of FrameRecord

    def to_dict(self) -> dict:
        """Serialize the trace for API/HUD consumers, truncating long text."""
        def _frame_entry(rec) -> dict:
            # Per-frame view; summaries are capped at 200 chars.
            return {
                "frame": rec.frame,
                "node": rec.node,
                "duration_ms": round(rec.duration_ms, 1),
                "input": rec.input_summary[:200],
                "output": rec.output_summary[:200],
                "route": rec.route,
                "condition": rec.condition,
                "error": rec.error,
            }
        summary = {
            "message": self.message[:100],
            "graph": self.graph,
            "total_frames": self.total_frames,
            "total_ms": round(self.total_ms, 1),
            "path": self.path,
        }
        summary["frames"] = [_frame_entry(rec) for rec in self.frames]
        return summary
# --- Walk context: carries state through the edge walk ---
@dataclass
class _WalkCtx:
    """Mutable context passed through the edge walker.

    Accumulates the output of each dispatched node so later nodes and
    edge-condition evaluation can read them.
    """
    command: Command | None = None        # Input node output (analysis + source text)
    routing: PARouting | None = None      # PA node output, set by _dispatch_pa
    plan: DirectorPlan | None = None      # Director node output, set by _dispatch_director
    thought: ThoughtResult | None = None  # expert/thinker output
    mem_ctx: str = ""                     # memory/sensor context string for prompts
    dashboard: list | None = None         # browser dashboard controls, if provided
    path_nodes: list = field(default_factory=list)  # visited node names for path label
class FrameEngine:
    """Edge-walking engine that steps through graph nodes frame by frame."""
    def __init__(self, graph: dict, nodes: dict, sink, history: list,
                 send_hud, sensor, memorizer, ui_node, identity: str = "unknown",
                 channel: str = "unknown", broadcast=None):
        """Wire the engine to a graph definition and its node instances.

        graph: parsed graph dict; reads "edges", "conditions" and "name".
        nodes: {role: node instance}; roles like "input", "pa", "expert_*".
        sink: streaming output sink (send_delta / send_controls / ...).
        history: shared chat history list, mutated in place.
        """
        self.graph = graph
        self.nodes = nodes
        self.sink = sink
        self.history = history
        self._send_hud = send_hud
        self.sensor = sensor
        self.memorizer = memorizer
        self.ui_node = ui_node
        self.identity = identity
        self.channel = channel
        self._broadcast = broadcast or (lambda e: None)
        self.frame = 0  # current frame number within the active message
        self.bus = {}
        self.conditions = graph.get("conditions", {})
        # Only "data" edges drive the walk; context/state edges are consumed elsewhere.
        self.data_edges = [e for e in graph.get("edges", []) if e.get("type") == "data"]
        # Capability flags derived from the node set (duck-typed on key methods).
        self.has_director = "director" in nodes and hasattr(nodes.get("director"), "decide")
        self.has_interpreter = "interpreter" in nodes
        self.has_pa = "pa" in nodes and hasattr(nodes.get("pa"), "route")
        # Discover available experts in this graph
        self._experts = {}
        for role, node in nodes.items():
            if role.startswith("expert_"):
                expert_name = role[7:]  # "expert_eras" → "eras"
                self._experts[expert_name] = node
        if self.has_pa and self._experts:
            nodes["pa"].set_available_experts(list(self._experts.keys()))
            log.info(f"[frame] PA with experts: {list(self._experts.keys())}")
        # Frame trace — last message's complete trace, queryable via API
        self.last_trace: FrameTrace = FrameTrace()
        # History of recent traces (last 20 messages)
        self.trace_history: list[dict] = []
        self.MAX_TRACE_HISTORY = 20
# --- Frame lifecycle helpers ---
def _begin_frame(self, frame_num: int, node: str, input_summary: str = "") -> FrameRecord:
    """Open a new frame record, register it on the active trace, and return it.

    The caller fills in the output side later via _end_frame.
    """
    self.frame = frame_num
    record = FrameRecord(
        frame=frame_num,
        node=node,
        started=time.monotonic(),
        input_summary=input_summary,
    )
    self.last_trace.frames.append(record)
    return record
def _end_frame(self, rec: FrameRecord, output_summary: str = "",
               route: str = "", condition: str = ""):
    """Stamp timing and output info onto a frame record and log completion."""
    now = time.monotonic()
    rec.ended = now
    rec.duration_ms = (now - rec.started) * 1000
    rec.output_summary = output_summary
    rec.route = route
    rec.condition = condition
    destination = route or 'done'
    log.info(f"[frame] F{rec.frame} {rec.node} {rec.duration_ms:.0f}ms -> {destination}")
def _begin_trace(self, text: str) -> FrameTrace:
    """Reset the frame counter and open a fresh trace for one message."""
    self.frame = 0
    self.last_trace = FrameTrace(
        message=text,
        graph=self.graph.get("name", "unknown"),
        started=time.monotonic(),
    )
    return self.last_trace
def _end_trace(self, path: str):
    """Finalize the current trace and append it to the bounded trace history.

    (HUD emission is separate — see _emit_trace_hud.)
    """
    trace = self.last_trace
    trace.total_frames = self.frame
    trace.total_ms = (time.monotonic() - trace.started) * 1000
    trace.path = path
    self.trace_history.append(trace.to_dict())
    # Keep only the most recent MAX_TRACE_HISTORY entries.
    del self.trace_history[:-self.MAX_TRACE_HISTORY]
    log.info(f"[frame] trace: {path} {trace.total_frames}F {trace.total_ms:.0f}ms")
async def _emit_trace_hud(self):
    """Push the completed frame trace to the HUD as one frame_trace event."""
    payload = {
        "node": "frame_engine",
        "event": "frame_trace",
        "trace": self.last_trace.to_dict(),
    }
    await self._send_hud(payload)
# --- Condition evaluation ---
def _eval_condition(self, name: str, ctx: _WalkCtx) -> bool:
    """Evaluate a named edge condition against the walk context.

    Unknown condition names evaluate to False, so a typo in a graph
    definition falls through to the unconditional edge.
    """
    if name == "reflex":
        # Trivial social chit-chat skips the full pipeline.
        # bool() guards the declared return type: without it this branch
        # could return None (ctx.command unset) or a non-bool truthy value.
        return bool(ctx.command and ctx.command.analysis.intent == "social"
                    and ctx.command.analysis.complexity == "trivial")
    if name == "has_tool_output":
        return bool(ctx.thought and ctx.thought.tool_used and ctx.thought.tool_output)
    # PA routing conditions
    if name == "expert_is_none":
        return ctx.routing is not None and ctx.routing.expert == "none"
    if name.startswith("expert_is_"):
        expert = name[len("expert_is_"):]
        return ctx.routing is not None and ctx.routing.expert == expert
    return False
def _check_condition(self, name: str, command: Command = None,
                     thought: ThoughtResult = None) -> bool:
    """Legacy wrapper for _eval_condition (used by tests)."""
    return self._eval_condition(name, _WalkCtx(command=command, thought=thought))
# --- Edge resolution ---
def _resolve_edge(self, outgoing: list, ctx: _WalkCtx) -> dict | None:
    """Pick the active edge from a node's outgoing data edges.

    Conditional edges take priority in declaration order; the first whose
    condition holds wins. Otherwise fall back to the first unconditional
    edge, or None when the node is effectively terminal.
    """
    fallback = None
    for edge in outgoing:
        cond = edge.get("condition")
        if cond:
            if self._eval_condition(cond, ctx):
                return edge
        elif fallback is None:
            fallback = edge
    return fallback
# --- Node dispatch adapters ---
async def _dispatch_pa(self, ctx: _WalkCtx) -> str:
    """Run the PA router node for one frame; stores routing on ctx.

    Returns the chosen expert name ("none" when the PA answers directly).
    """
    analysis = ctx.command.analysis
    rec = self._begin_frame(self.frame + 1, "pa",
                            input_summary=f"intent={analysis.intent} topic={analysis.topic}")
    routing = await self.nodes["pa"].route(
        ctx.command, self.history, memory_context=ctx.mem_ctx,
        identity=self.identity, channel=self.channel)
    ctx.routing = routing
    next_hop = "output" if routing.expert == "none" else f"expert_{routing.expert}"
    self._end_frame(rec,
                    output_summary=f"expert={routing.expert} job={routing.job[:60]}",
                    route=next_hop)
    # Surface the PA's "thinking" line to the user immediately.
    if routing.thinking_message:
        await self.sink.send_delta(routing.thinking_message + "\n\n")
    return routing.expert
async def _dispatch_expert(self, expert_role: str, ctx: _WalkCtx):
    """Dispatch an expert node with progress wrapping and PA retry.

    Runs the expert for ctx.routing.job. If the expert errored, or skipped
    tool calls on a job that looks like it needs data, asks the PA once to
    reformulate (route_retry) and runs the re-routed expert. The final
    ThoughtResult lands on ctx.thought.

    NOTE: on the normal (non-retry) path the expert's frame record is left
    open on purpose — _walk_edges closes it once the interpreter condition
    is known.
    """
    expert = self._experts.get(expert_role.replace("expert_", ""))
    if not expert:
        expert_name = expert_role.replace("expert_", "")
        log.error(f"[frame] expert '{expert_name}' not found")
        ctx.thought = ThoughtResult(response=f"Expert '{expert_name}' not available.")
        return
    expert_name = expert_role.replace("expert_", "")
    rec = self._begin_frame(self.frame + 1, expert_role,
                            input_summary=f"job: {ctx.routing.job[:80]}")
    # Wrap expert HUD for progress streaming
    original_hud = expert.send_hud
    expert.send_hud = self._make_progress_wrapper(original_hud, ctx.routing.language)
    try:
        thought = await expert.execute(ctx.routing.job, ctx.routing.language)
    finally:
        # Always restore the real HUD, even if the expert raised.
        expert.send_hud = original_hud
    ctx.thought = thought
    thought_summary = (f"response[{len(thought.response)}] tool={thought.tool_used or 'none'} "
                       f"actions={len(thought.actions)} errors={len(thought.errors)}")
    has_tool = bool(thought.tool_used and thought.tool_output)
    # --- PA retry: expert failed or skipped tools ---
    # Heuristic: de/en keyword match on the job text to detect data jobs.
    expectation = self.memorizer.state.get("user_expectation", "conversational")
    job_needs_data = any(k in (ctx.routing.job or "").lower()
                         for k in ["query", "select", "tabelle", "table", "daten", "data",
                                   "cost", "kosten", "count", "anzahl", "average", "schnitt",
                                   "find", "finde", "show", "zeig", "list", "beschreib"])
    expert_skipped_tools = not has_tool and not thought.errors and job_needs_data
    if (thought.errors or expert_skipped_tools) and not has_tool and expectation in ("delegated", "waiting_input", "conversational"):
        retry_reason = f"{len(thought.errors)} errors" if thought.errors else "no tool calls for data job"
        self._end_frame(rec, output_summary=thought_summary,
                        route="pa_retry", condition=f"expert_failed ({retry_reason}), expectation={expectation}")
        await self._send_hud({"node": "runtime", "event": "pa_retry",
                              "detail": f"expert failed: {retry_reason}, retrying via PA"})
        retry_msg = "Anderer Ansatz..." if ctx.routing.language == "de" else "Trying a different approach..."
        await self.sink.send_delta(retry_msg + "\n")
        # Synthesize an error entry when the expert simply skipped tools,
        # so the PA retry prompt always has something concrete to react to.
        retry_errors = thought.errors if thought.errors else [
            {"query": "(none)", "error": "Expert produced no database queries. The job requires data lookup but the expert answered without querying. Reformulate with explicit query instructions."}
        ]
        error_summary = "; ".join(e.get("error", "")[:80] for e in retry_errors[-2:])
        rec = self._begin_frame(self.frame + 1, "pa_retry",
                                input_summary=f"errors: {error_summary[:100]}")
        routing2 = await self.nodes["pa"].route_retry(
            ctx.command, self.history, memory_context=ctx.mem_ctx,
            identity=self.identity, channel=self.channel,
            original_job=ctx.routing.job, errors=retry_errors)
        self._end_frame(rec, output_summary=f"retry_job: {(routing2.job or '')[:60]}",
                        route=f"expert_{routing2.expert}" if routing2.expert != "none" else "output")
        if routing2.expert != "none":
            # Fall back to the original expert if the re-routed one is unknown.
            expert2 = self._experts.get(routing2.expert, expert)
            rec = self._begin_frame(self.frame + 1, f"expert_{routing2.expert}_retry",
                                    input_summary=f"retry job: {(routing2.job or '')[:80]}")
            original_hud2 = expert2.send_hud
            expert2.send_hud = self._make_progress_wrapper(original_hud2, routing2.language)
            try:
                thought = await expert2.execute(routing2.job, routing2.language)
            finally:
                expert2.send_hud = original_hud2
            ctx.thought = thought
            thought_summary = (f"response[{len(thought.response)}] tool={thought.tool_used or 'none'} "
                               f"errors={len(thought.errors)}")
            has_tool = bool(thought.tool_used and thought.tool_output)
            self._end_frame(rec, output_summary=thought_summary,
                            route="interpreter" if has_tool else "output+ui")
        ctx.routing = routing2
        return
    # Normal completion (no retry)
    # Don't end frame yet — caller checks interpreter condition and sets route
async def _dispatch_director(self, ctx: _WalkCtx):
    """Run the Director planning node for one frame; stores the plan on ctx."""
    analysis = ctx.command.analysis
    rec = self._begin_frame(self.frame + 1, "director",
                            input_summary=f"intent={analysis.intent} topic={analysis.topic}")
    plan = await self.nodes["director"].decide(
        ctx.command, self.history, memory_context=ctx.mem_ctx)
    ctx.plan = plan
    summary = f"goal={plan.goal} tools={len(plan.tool_sequence)} hint={plan.response_hint[:50]}"
    self._end_frame(rec, output_summary=summary, route="thinker")
async def _dispatch_thinker(self, ctx: _WalkCtx):
    """Dispatch Thinker node (v1 or v2).

    v2 path: a DirectorPlan already sits on ctx and is passed to the thinker.
    v1 hybrid: an optional Director may pre-plan first (heuristic: complex
    input, or a data-ish request longer than 8 words); the memory context is
    rebuilt afterwards so the fresh plan is visible in the thinker's prompt.
    """
    a = ctx.command.analysis
    rec = self._begin_frame(self.frame + 1, "thinker",
                            input_summary=f"intent={a.intent} topic={a.topic}" if not ctx.plan
                            else f"goal={ctx.plan.goal} tools={len(ctx.plan.tool_sequence)}")
    # v1 hybrid: optional director pre-planning
    director = self.nodes.get("director")
    if director and hasattr(director, "plan") and not ctx.plan:
        is_complex = ctx.command.analysis.complexity == "complex"
        text = ctx.command.source_text
        # de/en keyword heuristic for "this probably needs the database".
        is_data_request = (ctx.command.analysis.intent in ("request", "action")
                           and any(k in text.lower()
                                   for k in ["daten", "data", "database", "db", "tabelle", "table",
                                             "query", "abfrage", "untersuche", "investigate",
                                             "analyse", "analyze", "customer", "kunde"]))
        if is_complex or (is_data_request and len(text.split()) > 8):
            await director.plan(self.history, self.memorizer.state, text)
            # Rebuild context so the new director plan reaches the thinker.
            ctx.mem_ctx = self._build_context(ctx.dashboard)
    if ctx.plan:
        thought = await self.nodes["thinker"].process(
            ctx.command, ctx.plan, self.history, memory_context=ctx.mem_ctx)
    else:
        thought = await self.nodes["thinker"].process(
            ctx.command, self.history, memory_context=ctx.mem_ctx)
    # Clear any transient plan the v1 director cached for this request.
    if director and hasattr(director, "current_plan"):
        director.current_plan = ""
    ctx.thought = thought
    thought_summary = (f"response[{len(thought.response)}] tool={thought.tool_used or 'none'} "
                       f"actions={len(thought.actions)}")
    self._end_frame(rec, output_summary=thought_summary, route="output+ui")
async def _dispatch_interpreter(self, ctx: _WalkCtx):
    """Run the Interpreter on the current tool output for one frame.

    Replaces ctx.thought.response with the interpreted summary.
    """
    thought = ctx.thought
    rec = self._begin_frame(self.frame + 1, "interpreter",
                            input_summary=f"tool={thought.tool_used} output[{len(thought.tool_output)}]")
    # Expert pipeline interprets against the routed job; director pipeline
    # falls back to the user's original text.
    job = ctx.command.source_text if ctx.routing is None else ctx.routing.job
    interpreted = await self.nodes["interpreter"].interpret(
        thought.tool_used, thought.tool_output, job)
    thought.response = interpreted.summary
    self._end_frame(rec, output_summary=f"summary[{len(interpreted.summary)}]", route="output+ui")
async def _finish_pipeline(self, ctx: _WalkCtx) -> dict:
    """Common tail: output+ui parallel, memorizer update, trace.

    Every walk ends here exactly once; returns the caller-facing result dict.
    """
    # If no thought yet (pa_direct path), create from routing
    if not ctx.thought and ctx.routing:
        ctx.thought = ThoughtResult(response=ctx.routing.response_hint, actions=[])
    rec = self._begin_frame(self.frame + 1, "output+ui",
                            input_summary=f"response: {(ctx.thought.response or '')[:80]}")
    self.sink.reset()
    response = await self._run_output_and_ui(ctx.thought, ctx.mem_ctx)
    self.history.append({"role": "assistant", "content": response})
    await self.memorizer.update(self.history)
    # v1 director post-processing (PA graphs manage the director themselves)
    director = self.nodes.get("director")
    if director and hasattr(director, "update") and not self.has_pa:
        await director.update(self.history, self.memorizer.state)
    self._trim_history()
    controls_count = len(self.ui_node.current_controls)
    self._end_frame(rec, output_summary=f"response[{len(response)}] controls={controls_count}")
    # Build path label from visited nodes
    path = self._build_path_label(ctx)
    self._end_trace(path)
    await self._emit_trace_hud()
    return self._make_result(response)
def _build_path_label(self, ctx: _WalkCtx) -> str:
    """Build the trace path label from the nodes visited during the walk.

    Precedence: expert > director > thinker > pa_direct; the
    "+interpreter" suffix is added for expert/director paths that used it.
    Returns "unknown" when nothing recognizable was visited.
    """
    nodes = ctx.path_nodes
    if not nodes:
        return "unknown"
    has_interpreter = "interpreter" in nodes
    if any(n.startswith("expert_") for n in nodes):
        return "expert+interpreter" if has_interpreter else "expert"
    if "director" in nodes:
        return "director+interpreter" if has_interpreter else "director"
    if "thinker" in nodes:
        return "thinker"
    if "pa" in nodes:
        # No expert was visited (the expert branch above would have
        # returned), so the PA answered directly. The original also
        # re-checked "no expert visited" here, which is always true.
        return "pa_direct"
    return "unknown"
# --- Edge walker ---
async def _walk_edges(self, ctx: _WalkCtx) -> dict:
    """Walk data edges from input node through the graph.
    Returns the pipeline result dict.

    Each iteration resolves the active outgoing edge of the current node
    and dispatches the target. Terminal targets (a parallel [output, ui]
    list, "output", "memorizer") — and any dead end — fall through to
    _finish_pipeline, which therefore always runs exactly once.
    """
    current = "input"  # just finished frame 1
    while True:
        # Find outgoing data edges from current node
        outgoing = [e for e in self.data_edges if e["from"] == current]
        if not outgoing:
            break
        # Resolve which edge to follow
        edge = self._resolve_edge(outgoing, ctx)
        if not edge:
            break
        target = edge["to"]
        # Parallel target [output, ui] or terminal output → finish
        if isinstance(target, list) or target == "output" or target == "memorizer":
            return await self._finish_pipeline(ctx)
        # Dispatch the target node
        ctx.path_nodes.append(target)
        if target == "pa":
            await self._dispatch_pa(ctx)
            current = "pa"
        elif target.startswith("expert_"):
            await self._dispatch_expert(target, ctx)
            # After expert, check interpreter condition.
            # _dispatch_expert leaves the expert frame open on the normal
            # path so the route can be decided here; the retry path has
            # already ended it (hence the `if not last_rec.route` guards).
            has_tool = bool(ctx.thought and ctx.thought.tool_used and ctx.thought.tool_output)
            if self.has_interpreter and has_tool:
                # End expert frame with interpreter route
                last_rec = self.last_trace.frames[-1]
                if not last_rec.route:  # not already ended by retry
                    self._end_frame(last_rec,
                                    output_summary=f"response[{len(ctx.thought.response)}] tool={ctx.thought.tool_used}",
                                    route="interpreter", condition="has_tool_output=True")
                ctx.path_nodes.append("interpreter")
                await self._dispatch_interpreter(ctx)
            else:
                # End expert frame with output route
                last_rec = self.last_trace.frames[-1]
                if not last_rec.route:
                    thought_summary = (f"response[{len(ctx.thought.response)}] tool={ctx.thought.tool_used or 'none'}")
                    self._end_frame(last_rec, output_summary=thought_summary,
                                    route="output+ui",
                                    condition="has_tool_output=False" if not has_tool else "")
            return await self._finish_pipeline(ctx)
        elif target == "director":
            await self._dispatch_director(ctx)
            current = "director"
        elif target == "thinker":
            await self._dispatch_thinker(ctx)
            # After thinker, check interpreter condition. Re-ending the
            # thinker frame here overwrites its default "output+ui" route.
            has_tool = bool(ctx.thought and ctx.thought.tool_used and ctx.thought.tool_output)
            if self.has_interpreter and has_tool:
                last_rec = self.last_trace.frames[-1]
                self._end_frame(last_rec,
                                output_summary=f"response[{len(ctx.thought.response)}] tool={ctx.thought.tool_used}",
                                route="interpreter", condition="has_tool_output=True")
                ctx.path_nodes.append("interpreter")
                await self._dispatch_interpreter(ctx)
            return await self._finish_pipeline(ctx)
        elif target == "interpreter":
            ctx.path_nodes.append("interpreter")
            await self._dispatch_interpreter(ctx)
            return await self._finish_pipeline(ctx)
        else:
            log.warning(f"[frame] unknown target node: {target}")
            break
    return await self._finish_pipeline(ctx)
# --- Main entry point ---
async def process_message(self, text: str, dashboard: list = None,
                          model_overrides: dict = None) -> dict:
    """Process a message through the frame pipeline.
    Returns {response, controls, artifacts, memorizer, frames, trace}.
    model_overrides: optional {role: model} to override node models for this request only.

    Flow: apply overrides → frame 1 (input node) → either the reflex
    shortcut or the edge walk. Overrides are restored in the finally
    block even if a node raises.
    """
    # Apply per-request model overrides (restored after processing)
    saved_models = {}
    if model_overrides:
        for role, model in model_overrides.items():
            node = self.nodes.get(role)
            if node and hasattr(node, "model"):
                saved_models[role] = node.model
                node.model = model
    # Set session-scoped HUD for shared nodes (contextvar, per-task)
    _current_hud.set(self._send_hud)
    try:
        self._begin_trace(text)
        # Handle ACTION: prefix
        if text.startswith("ACTION:"):
            return await self._handle_action(text, dashboard)
        # Setup
        envelope = Envelope(
            text=text, user_id=self.identity,
            session_id="test", timestamp=time.strftime("%Y-%m-%d %H:%M:%S"),
        )
        self.sensor.note_user_activity()
        if dashboard is not None:
            self.sensor.update_browser_dashboard(dashboard)
        self.history.append({"role": "user", "content": text})
        # --- Frame 1: Input ---
        mem_ctx = self._build_context(dashboard)
        rec = self._begin_frame(1, "input", input_summary=text[:100])
        command = await self.nodes["input"].process(
            envelope, self.history, memory_context=mem_ctx,
            identity=self.identity, channel=self.channel)
        a = command.analysis
        cmd_summary = f"intent={a.intent} language={a.language} tone={a.tone} complexity={a.complexity}"
        # Build walk context
        ctx = _WalkCtx(command=command, mem_ctx=mem_ctx, dashboard=dashboard)
        # Check reflex condition
        is_reflex = self._eval_condition("reflex", ctx)
        if is_reflex:
            self._end_frame(rec, output_summary=cmd_summary,
                            route="output (reflex)", condition="reflex=True")
            await self._send_hud({"node": "runtime", "event": "reflex_path",
                                  "detail": f"{a.intent}/{a.complexity}"})
            return await self._run_reflex(command, mem_ctx)
        # Find next node from edges (first unconditional edge out of input)
        outgoing = [e for e in self.data_edges if e["from"] == "input" and not e.get("condition")]
        next_node = outgoing[0]["to"] if outgoing else "unknown"
        self._end_frame(rec, output_summary=cmd_summary,
                        route=next_node, condition="reflex=False")
        # Walk remaining edges
        return await self._walk_edges(ctx)
    finally:
        # Restore original models after per-request overrides
        for role, original_model in saved_models.items():
            node = self.nodes.get(role)
            if node:
                node.model = original_model
# --- Reflex (kept simple — 2 frames, no edge walking needed) ---
async def _run_reflex(self, command: Command, mem_ctx: str) -> dict:
    """Reflex: Input(F1) → Output(F2).

    Skips the edge walk — the input node's text goes straight to the
    output/UI stage as a passthrough thought.
    """
    rec = self._begin_frame(2, "output", input_summary="reflex passthrough")
    passthrough = ThoughtResult(response=command.source_text, actions=[])
    response = await self._run_output_and_ui(passthrough, mem_ctx)
    self.history.append({"role": "assistant", "content": response})
    await self.memorizer.update(self.history)
    self._trim_history()
    self._end_frame(rec, output_summary=f"response[{len(response)}]")
    self._end_trace("reflex")
    await self._emit_trace_hud()
    return self._make_result(response)
# --- Action handling ---
async def _handle_action(self, text: str, dashboard: list = None) -> dict:
    """Handle ACTION: messages (button clicks).

    Format: "ACTION:<name>" optionally followed by "|data:<json>".
    Resolution order, cheapest first:
      1. Machine transition via the UI node's state machine (no LLM).
      2. Local UI action handler (no LLM).
      3. Full pipeline via edge walking, with the click recorded in history.
    Returns the same result dict as process_message.
    """
    parts = text.split("|", 1)
    action = parts[0].replace("ACTION:", "").strip()
    data = None
    if len(parts) > 1:
        try:
            data = json.loads(parts[1].replace("data:", "").strip())
        except Exception:
            # Fix: the original caught (json.JSONDecodeError, Exception) —
            # a redundant tuple, since JSONDecodeError is an Exception.
            # Payload parsing stays best-effort: proceed without data.
            pass
    self.sensor.note_user_activity()
    # Frame 1: Try machine transition (no LLM)
    rec = self._begin_frame(1, "ui", input_summary=f"action={action}")
    handled, transition_result = self.ui_node.try_machine_transition(action)
    if handled:
        await self._send_hud({"node": "ui", "event": "machine_transition",
                              "action": action, "detail": transition_result})
        # Merge machine controls with non-machine controls already shown.
        controls = self.ui_node.get_machine_controls()
        for ctrl in self.ui_node.current_controls:
            if not ctrl.get("machine_id"):
                controls.append(ctrl)
        self.ui_node.current_controls = controls
        await self.sink.send_controls(controls)
        await self._send_hud({"node": "ui", "event": "controls", "controls": controls})
        self.sink.reset()
        # Stream the canned text in 12-char chunks for a typing effect.
        for i in range(0, len(transition_result), 12):
            await self.sink.send_delta(transition_result[i:i + 12])
        await self.sink.send_done()
        self.history.append({"role": "user", "content": f"[clicked {action}]"})
        self.history.append({"role": "assistant", "content": transition_result})
        self._end_frame(rec, output_summary=f"machine_transition: {transition_result[:80]}")
        self._end_trace("action_machine")
        await self._emit_trace_hud()
        return self._make_result(transition_result)
    # Try local UI action
    result, controls = await self.ui_node.process_local_action(action, data)
    if result is not None:
        if controls:
            await self.sink.send_controls(controls)
        self.sink.reset()
        for i in range(0, len(result), 12):
            await self.sink.send_delta(result[i:i + 12])
        await self.sink.send_done()
        self.history.append({"role": "user", "content": f"[clicked {action}]"})
        self.history.append({"role": "assistant", "content": result})
        self._end_frame(rec, output_summary=f"local_action: {result[:80]}")
        self._end_trace("action_local")
        await self._emit_trace_hud()
        return self._make_result(result)
    # Complex action — needs full pipeline via edge walking
    self._end_frame(rec, output_summary="no local handler", route="edge_walk")
    action_desc = f"ACTION: {action}"
    if data:
        action_desc += f" | data: {json.dumps(data)}"
    self.history.append({"role": "user", "content": action_desc})
    mem_ctx = self._build_context(dashboard)
    # Synthesize a Command so the walker can treat the click like a message.
    command = Command(
        analysis=InputAnalysis(intent="action", topic=action, complexity="simple"),
        source_text=action_desc)
    ctx = _WalkCtx(command=command, mem_ctx=mem_ctx, dashboard=dashboard)
    return await self._walk_edges(ctx)
# --- Helpers ---
def _build_context(self, dashboard: list = None) -> str:
    """Build the full context string for nodes.

    Layers, in order: memorizer block (with sensor lines + UI state),
    director context line, UI machine summary, dashboard view, pending
    sensor flags. NOTE(review): consume_flags() presumably clears flags
    on read — each flag would then reach exactly one context build;
    verify against the sensor implementation before relying on it.
    """
    sensor_lines = self.sensor.get_context_lines()
    director = self.nodes.get("director")
    # Assumes any director node exposes get_context_line() — TODO confirm.
    director_line = director.get_context_line() if director else ""
    mem_ctx = self.memorizer.get_context_block(
        sensor_lines=sensor_lines, ui_state=self.ui_node.state)
    if director_line:
        mem_ctx += f"\n\n{director_line}"
    machine_summary = self.ui_node.get_machine_summary()
    if machine_summary:
        mem_ctx += f"\n\n{machine_summary}"
    if dashboard is not None:
        mem_ctx += f"\n\n{self._format_dashboard(dashboard)}"
    sensor_flags = self.sensor.consume_flags()
    if sensor_flags:
        flag_lines = ["Sensor flags:"]
        for f in sensor_flags:
            if f["type"] == "idle_return":
                flag_lines.append(f" - User returned after {f['away_duration']} away.")
            elif f["type"] == "workspace_mismatch":
                flag_lines.append(f" - Workspace mismatch: {f['detail']}")
        mem_ctx += "\n\n" + "\n".join(flag_lines)
    return mem_ctx
def _format_dashboard(self, dashboard: list) -> str:
    """Format dashboard controls into context string.

    Also emits warnings when the browser view disagrees with the
    controls the server last sent.
    """
    def _button_labels(controls) -> list:
        return [str(c.get("label", "")) for c in controls
                if isinstance(c, dict) and c.get("type") == "button"]

    server_buttons = _button_labels(self.ui_node.current_controls)
    browser_buttons = _button_labels(dashboard) if dashboard else []
    lines = []
    if server_buttons and not browser_buttons:
        lines.append(f"WARNING: Server sent {len(server_buttons)} controls but dashboard shows NONE.")
        lines.append(f" Expected: {', '.join(server_buttons)}")
        lines.append(" Controls failed to render. You MUST re-emit them in ACTIONS.")
    elif server_buttons and sorted(server_buttons) != sorted(browser_buttons):
        lines.append("WARNING: Dashboard mismatch.")
        lines.append(f" Server: {', '.join(server_buttons)}")
        lines.append(f" Browser: {', '.join(browser_buttons) or 'nothing'}")
    if not dashboard:
        lines.append("Dashboard: empty")
    else:
        lines.append("Dashboard (user sees):")
        for ctrl in dashboard:
            ctype = ctrl.get("type", "unknown")
            if ctype == "button":
                lines.append(f" - Button: {ctrl.get('label', '?')}")
            elif ctype == "table":
                lines.append(f" - Table: {len(ctrl.get('data', []))} rows")
            else:
                lines.append(f" - {ctype}: {ctrl.get('label', ctrl.get('text', '?'))}")
    return "\n".join(lines)
def _make_progress_wrapper(self, original_hud, language: str):
    """Wrap an expert's send_hud so selected HUD events also stream a
    short localized progress line to the user via the sink."""
    sink = self.sink
    de = language == "de"
    progress_map = {
        "tool_call": {
            "query_db": "Daten werden abgerufen..." if de else "Fetching data...",
            "emit_actions": "UI wird erstellt..." if de else "Building UI...",
            "create_machine": "Maschine wird erstellt..." if de else "Creating machine...",
            "_default": "Verarbeite..." if de else "Processing...",
        },
        "tool_result": {"_default": ""},  # silent on result
        "planned": {"_default": "Plan erstellt..." if de else "Plan ready..."},
    }
    async def wrapper(data: dict):
        # Always forward to the real HUD first.
        await original_hud(data)
        table = progress_map.get(data.get("event", ""))
        if table is None:
            return
        msg = table.get(data.get("tool", "_default"), table.get("_default", ""))
        if msg:
            await sink.send_delta(msg + "\n")
    return wrapper
async def _run_output_and_ui(self, thought: ThoughtResult, mem_ctx: str) -> str:
    """Run Output and UI nodes concurrently; returns the response text.

    Controls and artifacts produced by the UI node are pushed to the
    sink after both nodes have finished.
    """
    self.sink.reset()
    produce_text = asyncio.create_task(
        self.nodes["output"].process(thought, self.history, self.sink, memory_context=mem_ctx))
    produce_ui = asyncio.create_task(
        self.ui_node.process(thought, self.history, memory_context=mem_ctx))
    response, controls = await asyncio.gather(produce_text, produce_ui)
    if controls:
        await self.sink.send_controls(controls)
    # Send artifacts (new system) alongside controls
    artifacts = self.ui_node.get_artifacts()
    if artifacts:
        await self.sink.send_artifacts(artifacts)
    return response
def _make_result(self, response: str) -> dict:
    """Assemble the result dict handed back to process_message callers."""
    result = {"response": response}
    result["controls"] = self.ui_node.current_controls
    result["artifacts"] = self.ui_node.get_artifacts()
    result["memorizer"] = self.memorizer.state
    result["frames"] = self.frame
    result["trace"] = self.last_trace.to_dict()
    return result
def _trim_history(self):
    """Cap the shared history at the 40 most recent messages, in place."""
    overflow = len(self.history) - 40
    if overflow > 0:
        del self.history[:overflow]