Phase 3: Engine walks graph edges instead of hardcoded pipelines

Replace 4 hardcoded pipeline methods (_run_expert_pipeline,
_run_director_pipeline, _run_thinker_pipeline + dispatch logic) with
a single _walk_edges() method that follows declared data edges.

Key changes:
- _WalkCtx dataclass carries state through the edge walk
- _eval_condition() evaluates conditions against walk context
- _resolve_edge() picks active edge (conditional > unconditional)
- Dispatch adapters per node role (_dispatch_pa, _dispatch_expert, etc.)
- PA retry + progress wrapping stay as expert dispatch adapter logic
- process_message() and _handle_action() use _walk_edges()
- _run_reflex() kept as simple 2-frame shortcut

15/15 engine tests + 9/9 matrix tests green, identical traces.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Nico 2026-04-03 19:00:57 +02:00
parent 376fdb2458
commit f30da07636

View File

@ -1,12 +1,15 @@
"""Frame Engine: tick-based deterministic pipeline execution.
"""Frame Engine: edge-walking deterministic pipeline execution.
Replaces the imperative handle_message() with a frame-stepping model:
- Each frame dispatches all nodes that have pending input
- Frames advance on completion (not on a timer)
- 0ms when idle, engine awaits external input
- Deterministic ordering: reflex=2 frames, thinker=3-4, interpreter=5
Walks the graph's data edges to determine pipeline flow. Each step
dispatches a node and evaluates conditions on outgoing edges to pick
the next node. Frames advance on node completion (not on a timer).
Works with any graph definition (v1, v2, v3). Node implementations unchanged.
Edge types:
data typed objects flowing between nodes (walked by engine)
context text injected into LLM prompts (aggregated via _build_context)
state shared mutable state reads (consumed by sensor/runtime)
Works with any graph definition (v1-v4). Node implementations unchanged.
"""
import asyncio
@ -72,8 +75,22 @@ class FrameTrace:
}
# --- Walk context: carries state through the edge walk ---
@dataclass
class _WalkCtx:
"""Mutable context passed through the edge walker."""
command: Command = None
routing: PARouting = None
plan: DirectorPlan = None
thought: ThoughtResult = None
mem_ctx: str = ""
dashboard: list = None
path_nodes: list = field(default_factory=list) # visited node names for path label
class FrameEngine:
"""Tick-based engine that steps through graph nodes frame by frame."""
"""Edge-walking engine that steps through graph nodes frame by frame."""
def __init__(self, graph: dict, nodes: dict, sink, history: list,
send_hud, sensor, memorizer, ui_node, identity: str = "unknown",
@ -93,7 +110,7 @@ class FrameEngine:
self.frame = 0
self.bus = {}
self.conditions = graph.get("conditions", {})
self.edges = [e for e in graph.get("edges", []) if e.get("type") == "data"]
self.data_edges = [e for e in graph.get("edges", []) if e.get("type") == "data"]
self.has_director = "director" in nodes and hasattr(nodes.get("director"), "decide")
self.has_interpreter = "interpreter" in nodes
@ -172,6 +189,328 @@ class FrameEngine:
"trace": t.to_dict(),
})
# --- Condition evaluation ---
def _eval_condition(self, name: str, ctx: _WalkCtx) -> bool:
"""Evaluate a named condition against walk context."""
if name == "reflex":
return (ctx.command and ctx.command.analysis.intent == "social"
and ctx.command.analysis.complexity == "trivial")
if name == "has_tool_output":
return bool(ctx.thought and ctx.thought.tool_used and ctx.thought.tool_output)
# PA routing conditions
if name == "expert_is_none":
return ctx.routing is not None and ctx.routing.expert == "none"
if name.startswith("expert_is_"):
expert = name[len("expert_is_"):]
return ctx.routing is not None and ctx.routing.expert == expert
return False
def _check_condition(self, name: str, command: Command = None,
                     thought: ThoughtResult = None) -> bool:
    """Legacy wrapper for _eval_condition (used by tests).

    Builds a throwaway _WalkCtx from the given command/thought and
    delegates, so pre-edge-walker callers keep working unchanged.
    """
    ctx = _WalkCtx(command=command, thought=thought)
    return self._eval_condition(name, ctx)
# --- Edge resolution ---
def _resolve_edge(self, outgoing: list, ctx: _WalkCtx) -> dict | None:
"""Pick the active edge from a node's outgoing data edges.
Conditional edges take priority when they match."""
conditional = [e for e in outgoing if e.get("condition")]
unconditional = [e for e in outgoing if not e.get("condition")]
for edge in conditional:
if self._eval_condition(edge["condition"], ctx):
return edge
return unconditional[0] if unconditional else None
# --- Node dispatch adapters ---
async def _dispatch_pa(self, ctx: _WalkCtx) -> str:
    """Dispatch the PA routing node.

    Stores the routing decision on ctx.routing and returns the chosen
    expert name ("none" when the PA answers directly without an expert).
    Also streams the PA's optional thinking message to the user.
    """
    a = ctx.command.analysis
    rec = self._begin_frame(self.frame + 1, "pa",
                            input_summary=f"intent={a.intent} topic={a.topic}")
    routing = await self.nodes["pa"].route(
        ctx.command, self.history, memory_context=ctx.mem_ctx,
        identity=self.identity, channel=self.channel)
    ctx.routing = routing
    route_summary = f"expert={routing.expert} job={routing.job[:60]}"
    self._end_frame(rec, output_summary=route_summary,
                    route=f"expert_{routing.expert}" if routing.expert != "none" else "output")
    # Stream thinking message
    if routing.thinking_message:
        await self.sink.send_delta(routing.thinking_message + "\n\n")
    return routing.expert
async def _dispatch_expert(self, expert_role: str, ctx: _WalkCtx):
    """Dispatch an expert node with progress wrapping and PA retry.

    Leaves the result in ctx.thought. On the normal (non-retry) path the
    expert's frame record is intentionally left open — the caller checks
    the interpreter condition and ends the frame with the right route.
    On the retry path all frames opened here are also ended here.
    """
    expert = self._experts.get(expert_role.replace("expert_", ""))
    if not expert:
        # Unknown expert: synthesize an apology thought; no frame is opened.
        expert_name = expert_role.replace("expert_", "")
        log.error(f"[frame] expert '{expert_name}' not found")
        ctx.thought = ThoughtResult(response=f"Expert '{expert_name}' not available.")
        return
    expert_name = expert_role.replace("expert_", "")
    rec = self._begin_frame(self.frame + 1, expert_role,
                            input_summary=f"job: {ctx.routing.job[:80]}")
    # Wrap expert HUD for progress streaming
    original_hud = expert.send_hud
    expert.send_hud = self._make_progress_wrapper(original_hud, ctx.routing.language)
    try:
        thought = await expert.execute(ctx.routing.job, ctx.routing.language)
    finally:
        expert.send_hud = original_hud  # always restore the real HUD
    ctx.thought = thought
    thought_summary = (f"response[{len(thought.response)}] tool={thought.tool_used or 'none'} "
                       f"actions={len(thought.actions)} errors={len(thought.errors)}")
    has_tool = bool(thought.tool_used and thought.tool_output)
    # --- PA retry: expert failed or skipped tools ---
    expectation = self.memorizer.state.get("user_expectation", "conversational")
    # Keyword heuristic (German + English) to spot jobs that require a data
    # lookup, so a tool-less answer can be treated as a likely hallucination.
    job_needs_data = any(k in (ctx.routing.job or "").lower()
                         for k in ["query", "select", "tabelle", "table", "daten", "data",
                                   "cost", "kosten", "count", "anzahl", "average", "schnitt",
                                   "find", "finde", "show", "zeig", "list", "beschreib"])
    expert_skipped_tools = not has_tool and not thought.errors and job_needs_data
    if (thought.errors or expert_skipped_tools) and not has_tool and expectation in ("delegated", "waiting_input", "conversational"):
        retry_reason = f"{len(thought.errors)} errors" if thought.errors else "no tool calls for data job"
        self._end_frame(rec, output_summary=thought_summary,
                        route="pa_retry", condition=f"expert_failed ({retry_reason}), expectation={expectation}")
        await self._send_hud({"node": "runtime", "event": "pa_retry",
                              "detail": f"expert failed: {retry_reason}, retrying via PA"})
        retry_msg = "Anderer Ansatz..." if ctx.routing.language == "de" else "Trying a different approach..."
        await self.sink.send_delta(retry_msg + "\n")
        # Feed real errors to the PA; fabricate one when the expert simply
        # skipped tooling so route_retry still gets actionable context.
        retry_errors = thought.errors if thought.errors else [
            {"query": "(none)", "error": "Expert produced no database queries. The job requires data lookup but the expert answered without querying. Reformulate with explicit query instructions."}
        ]
        error_summary = "; ".join(e.get("error", "")[:80] for e in retry_errors[-2:])
        rec = self._begin_frame(self.frame + 1, "pa_retry",
                                input_summary=f"errors: {error_summary[:100]}")
        routing2 = await self.nodes["pa"].route_retry(
            ctx.command, self.history, memory_context=ctx.mem_ctx,
            identity=self.identity, channel=self.channel,
            original_job=ctx.routing.job, errors=retry_errors)
        self._end_frame(rec, output_summary=f"retry_job: {(routing2.job or '')[:60]}",
                        route=f"expert_{routing2.expert}" if routing2.expert != "none" else "output")
        if routing2.expert != "none":
            # Fall back to the original expert if the retry expert is unknown.
            expert2 = self._experts.get(routing2.expert, expert)
            rec = self._begin_frame(self.frame + 1, f"expert_{routing2.expert}_retry",
                                    input_summary=f"retry job: {(routing2.job or '')[:80]}")
            original_hud2 = expert2.send_hud
            expert2.send_hud = self._make_progress_wrapper(original_hud2, routing2.language)
            try:
                thought = await expert2.execute(routing2.job, routing2.language)
            finally:
                expert2.send_hud = original_hud2
            ctx.thought = thought
            thought_summary = (f"response[{len(thought.response)}] tool={thought.tool_used or 'none'} "
                               f"errors={len(thought.errors)}")
            has_tool = bool(thought.tool_used and thought.tool_output)
            self._end_frame(rec, output_summary=thought_summary,
                            route="interpreter" if has_tool else "output+ui")
        ctx.routing = routing2
        return
    # Normal completion (no retry)
    # Don't end frame yet — caller checks interpreter condition and sets route
async def _dispatch_director(self, ctx: _WalkCtx):
"""Dispatch Director node."""
a = ctx.command.analysis
rec = self._begin_frame(self.frame + 1, "director",
input_summary=f"intent={a.intent} topic={a.topic}")
plan = await self.nodes["director"].decide(ctx.command, self.history, memory_context=ctx.mem_ctx)
ctx.plan = plan
plan_summary = f"goal={plan.goal} tools={len(plan.tool_sequence)} hint={plan.response_hint[:50]}"
self._end_frame(rec, output_summary=plan_summary, route="thinker")
async def _dispatch_thinker(self, ctx: _WalkCtx):
    """Dispatch Thinker node (v1 or v2).

    v2: ctx.plan was already produced by the director and is passed through
    to the thinker. v1 hybrid: a director exposing plan() may pre-plan for
    complex or data-heavy requests, after which the memory context is
    rebuilt so the fresh plan is included in the thinker prompt.
    """
    a = ctx.command.analysis
    rec = self._begin_frame(self.frame + 1, "thinker",
                            input_summary=f"intent={a.intent} topic={a.topic}" if not ctx.plan
                            else f"goal={ctx.plan.goal} tools={len(ctx.plan.tool_sequence)}")
    # v1 hybrid: optional director pre-planning
    director = self.nodes.get("director")
    if director and hasattr(director, "plan") and not ctx.plan:
        is_complex = ctx.command.analysis.complexity == "complex"
        text = ctx.command.source_text
        # Keyword heuristic (German + English) for data-flavoured requests.
        is_data_request = (ctx.command.analysis.intent in ("request", "action")
                          and any(k in text.lower()
                                  for k in ["daten", "data", "database", "db", "tabelle", "table",
                                            "query", "abfrage", "untersuche", "investigate",
                                            "analyse", "analyze", "customer", "kunde"]))
        if is_complex or (is_data_request and len(text.split()) > 8):
            await director.plan(self.history, self.memorizer.state, text)
            # Rebuild the context so the new plan lands in the prompt.
            ctx.mem_ctx = self._build_context(ctx.dashboard)
    if ctx.plan:
        thought = await self.nodes["thinker"].process(
            ctx.command, ctx.plan, self.history, memory_context=ctx.mem_ctx)
    else:
        thought = await self.nodes["thinker"].process(
            ctx.command, self.history, memory_context=ctx.mem_ctx)
    if director and hasattr(director, "current_plan"):
        director.current_plan = ""  # clear the one-shot plan after use
    ctx.thought = thought
    thought_summary = (f"response[{len(thought.response)}] tool={thought.tool_used or 'none'} "
                       f"actions={len(thought.actions)}")
    self._end_frame(rec, output_summary=thought_summary, route="output+ui")
async def _dispatch_interpreter(self, ctx: _WalkCtx):
"""Dispatch Interpreter node."""
rec = self._begin_frame(self.frame + 1, "interpreter",
input_summary=f"tool={ctx.thought.tool_used} output[{len(ctx.thought.tool_output)}]")
# Use routing.job for expert pipeline, source_text for director pipeline
job = ctx.routing.job if ctx.routing else ctx.command.source_text
interpreted = await self.nodes["interpreter"].interpret(
ctx.thought.tool_used, ctx.thought.tool_output, job)
ctx.thought.response = interpreted.summary
self._end_frame(rec, output_summary=f"summary[{len(interpreted.summary)}]", route="output+ui")
async def _finish_pipeline(self, ctx: _WalkCtx) -> dict:
    """Common tail: output+ui parallel, memorizer update, trace.

    Every walk ends here: renders the final response, appends it to
    history, updates the memorizer, closes the trace with a path label
    derived from the visited nodes, and returns the result dict.
    """
    # If no thought yet (pa_direct path), create from routing
    if not ctx.thought and ctx.routing:
        ctx.thought = ThoughtResult(response=ctx.routing.response_hint, actions=[])
    rec = self._begin_frame(self.frame + 1, "output+ui",
                            input_summary=f"response: {(ctx.thought.response or '')[:80]}")
    # Clear any streamed progress text before rendering the final response.
    self.sink.reset()
    response = await self._run_output_and_ui(ctx.thought, ctx.mem_ctx)
    self.history.append({"role": "assistant", "content": response})
    await self.memorizer.update(self.history)
    # v1 director post-processing
    director = self.nodes.get("director")
    if director and hasattr(director, "update") and not self.has_pa:
        await director.update(self.history, self.memorizer.state)
    self._trim_history()
    controls_count = len(self.ui_node.current_controls)
    self._end_frame(rec, output_summary=f"response[{len(response)}] controls={controls_count}")
    # Build path label from visited nodes
    path = self._build_path_label(ctx)
    self._end_trace(path)
    await self._emit_trace_hud()
    return self._make_result(response)
def _build_path_label(self, ctx: _WalkCtx) -> str:
"""Build trace path label from visited nodes."""
nodes = ctx.path_nodes
if not nodes:
return "unknown"
# Map visited nodes to path labels
has_interpreter = "interpreter" in nodes
if any(n.startswith("expert_") for n in nodes):
return "expert+interpreter" if has_interpreter else "expert"
if "director" in nodes:
return "director+interpreter" if has_interpreter else "director"
if "thinker" in nodes:
return "thinker"
if "pa" in nodes and not any(n.startswith("expert_") for n in nodes):
return "pa_direct"
return "unknown"
# --- Edge walker ---
async def _walk_edges(self, ctx: _WalkCtx) -> dict:
    """Walk data edges from the input node through the graph.

    Starting after frame 1 (input), repeatedly resolves the active
    outgoing data edge of the current node, dispatches the target node,
    and finishes with the common output+ui tail when the walk reaches
    output/memorizer or runs out of edges. Returns the pipeline result dict.
    """
    current = "input"  # just finished frame 1
    while True:
        # Find outgoing data edges from current node
        outgoing = [e for e in self.data_edges if e["from"] == current]
        if not outgoing:
            break
        # Resolve which edge to follow
        edge = self._resolve_edge(outgoing, ctx)
        if not edge:
            break
        target = edge["to"]
        # Parallel target [output, ui] or terminal output/memorizer → finish
        if isinstance(target, list) or target == "output" or target == "memorizer":
            return await self._finish_pipeline(ctx)
        # Dispatch the target node
        ctx.path_nodes.append(target)
        if target == "pa":
            await self._dispatch_pa(ctx)
            current = "pa"
        elif target.startswith("expert_"):
            await self._dispatch_expert(target, ctx)
            # After expert, check interpreter condition
            has_tool = bool(ctx.thought and ctx.thought.tool_used and ctx.thought.tool_output)
            if self.has_interpreter and has_tool:
                # End expert frame with interpreter route (unless the retry
                # path inside _dispatch_expert already ended it).
                last_rec = self.last_trace.frames[-1]
                if not last_rec.route:
                    self._end_frame(last_rec,
                                    output_summary=f"response[{len(ctx.thought.response)}] tool={ctx.thought.tool_used}",
                                    route="interpreter", condition="has_tool_output=True")
                ctx.path_nodes.append("interpreter")
                await self._dispatch_interpreter(ctx)
                # FIX: previously this path fell through without updating
                # `current`, so the loop re-resolved the pre-expert node's
                # edge and dispatched the same expert again. The interpreter
                # always routes to output+ui, so finish the pipeline here
                # (mirrors the thinker branch below).
                return await self._finish_pipeline(ctx)
            else:
                # End expert frame with output route
                last_rec = self.last_trace.frames[-1]
                if not last_rec.route:
                    thought_summary = (f"response[{len(ctx.thought.response)}] tool={ctx.thought.tool_used or 'none'}")
                    self._end_frame(last_rec, output_summary=thought_summary,
                                    route="output+ui",
                                    condition="has_tool_output=False" if not has_tool else "")
                return await self._finish_pipeline(ctx)
        elif target == "director":
            await self._dispatch_director(ctx)
            current = "director"
        elif target == "thinker":
            await self._dispatch_thinker(ctx)
            # After thinker, check interpreter condition
            has_tool = bool(ctx.thought and ctx.thought.tool_used and ctx.thought.tool_output)
            if self.has_interpreter and has_tool:
                last_rec = self.last_trace.frames[-1]
                self._end_frame(last_rec,
                                output_summary=f"response[{len(ctx.thought.response)}] tool={ctx.thought.tool_used}",
                                route="interpreter", condition="has_tool_output=True")
                ctx.path_nodes.append("interpreter")
                await self._dispatch_interpreter(ctx)
            return await self._finish_pipeline(ctx)
        elif target == "interpreter":
            ctx.path_nodes.append("interpreter")
            await self._dispatch_interpreter(ctx)
            return await self._finish_pipeline(ctx)
        else:
            log.warning(f"[frame] unknown target node: {target}")
            break
    return await self._finish_pipeline(ctx)
# --- Main entry point ---
async def process_message(self, text: str, dashboard: list = None,
@ -221,26 +560,26 @@ class FrameEngine:
a = command.analysis
cmd_summary = f"intent={a.intent} language={a.language} tone={a.tone} complexity={a.complexity}"
# Build walk context
ctx = _WalkCtx(command=command, mem_ctx=mem_ctx, dashboard=dashboard)
# Check reflex condition
is_reflex = self._check_condition("reflex", command=command)
is_reflex = self._eval_condition("reflex", ctx)
if is_reflex:
self._end_frame(rec, output_summary=cmd_summary,
route="output (reflex)", condition="reflex=True")
await self._send_hud({"node": "runtime", "event": "reflex_path",
"detail": f"{a.intent}/{a.complexity}"})
return await self._run_reflex(command, mem_ctx)
else:
next_node = "pa" if self.has_pa else ("director" if self.has_director else "thinker")
self._end_frame(rec, output_summary=cmd_summary,
route=next_node, condition=f"reflex=False")
# --- Frame 2+: Pipeline ---
if self.has_pa:
return await self._run_expert_pipeline(command, mem_ctx, dashboard)
elif self.has_director:
return await self._run_director_pipeline(command, mem_ctx, dashboard)
else:
return await self._run_thinker_pipeline(command, mem_ctx, dashboard)
# Find next node from edges
outgoing = [e for e in self.data_edges if e["from"] == "input" and not e.get("condition")]
next_node = outgoing[0]["to"] if outgoing else "unknown"
self._end_frame(rec, output_summary=cmd_summary,
route=next_node, condition="reflex=False")
# Walk remaining edges
return await self._walk_edges(ctx)
finally:
# Restore original models after per-request overrides
for role, original_model in saved_models.items():
@ -248,7 +587,7 @@ class FrameEngine:
if node:
node.model = original_model
# --- Pipeline variants ---
# --- Reflex (kept simple — 2 frames, no edge walking needed) ---
async def _run_reflex(self, command: Command, mem_ctx: str) -> dict:
"""Reflex: Input(F1) → Output(F2)."""
@ -266,279 +605,7 @@ class FrameEngine:
await self._emit_trace_hud()
return self._make_result(response)
async def _run_expert_pipeline(self, command: Command, mem_ctx: str,
                               dashboard: list = None) -> dict:
    """Expert pipeline: Input(F1) → PA(F2) → Expert(F3) → [Interpreter(F4)] → Output."""
    a = command.analysis
    # Frame 2: PA routes
    rec = self._begin_frame(2, "pa",
                            input_summary=f"intent={a.intent} topic={a.topic}")
    routing = await self.nodes["pa"].route(
        command, self.history, memory_context=mem_ctx,
        identity=self.identity, channel=self.channel)
    route_summary = f"expert={routing.expert} job={routing.job[:60]}"
    self._end_frame(rec, output_summary=route_summary,
                    route=f"expert_{routing.expert}" if routing.expert != "none" else "output")
    # Stream thinking message to user
    if routing.thinking_message:
        await self.sink.send_delta(routing.thinking_message + "\n\n")
    # Direct PA response (no expert needed)
    if routing.expert == "none":
        rec = self._begin_frame(3, "output+ui",
                                input_summary=f"pa_direct: {routing.response_hint[:80]}")
        thought = ThoughtResult(response=routing.response_hint, actions=[])
        response = await self._run_output_and_ui(thought, mem_ctx)
        self.history.append({"role": "assistant", "content": response})
        await self.memorizer.update(self.history)
        self._trim_history()
        self._end_frame(rec, output_summary=f"response[{len(response)}]")
        self._end_trace("pa_direct")
        await self._emit_trace_hud()
        return self._make_result(response)
    # Frame 3: Expert executes
    expert = self._experts.get(routing.expert)
    if not expert:
        # Unknown expert: short-circuit to an apology response.
        log.error(f"[frame] expert '{routing.expert}' not found")
        thought = ThoughtResult(response=f"Expert '{routing.expert}' not available.")
        rec = self._begin_frame(3, "output+ui", input_summary="expert_not_found")
        response = await self._run_output_and_ui(thought, mem_ctx)
        self.history.append({"role": "assistant", "content": response})
        self._end_frame(rec, output_summary="error", error=f"expert '{routing.expert}' not found")
        self._end_trace("expert_error")
        await self._emit_trace_hud()
        return self._make_result(response)
    rec = self._begin_frame(3, f"expert_{routing.expert}",
                            input_summary=f"job: {routing.job[:80]}")
    # Wrap expert's HUD to stream progress to user
    original_hud = expert.send_hud
    expert.send_hud = self._make_progress_wrapper(original_hud, routing.language)
    try:
        thought = await expert.execute(routing.job, routing.language)
    finally:
        expert.send_hud = original_hud  # always restore the real HUD
    thought_summary = (f"response[{len(thought.response)}] tool={thought.tool_used or 'none'} "
                       f"actions={len(thought.actions)} errors={len(thought.errors)}")
    has_tool = bool(thought.tool_used and thought.tool_output)
    # PA retry: if expert failed OR skipped tools when data was needed
    expectation = self.memorizer.state.get("user_expectation", "conversational")
    # Detect hallucination: expert returned no tool output for a data job
    # (keyword heuristic covering German and English phrasing).
    job_needs_data = any(k in (routing.job or "").lower()
                         for k in ["query", "select", "tabelle", "table", "daten", "data",
                                   "cost", "kosten", "count", "anzahl", "average", "schnitt",
                                   "find", "finde", "show", "zeig", "list", "beschreib"])
    expert_skipped_tools = not has_tool and not thought.errors and job_needs_data
    if (thought.errors or expert_skipped_tools) and not has_tool and expectation in ("delegated", "waiting_input", "conversational"):
        retry_reason = f"{len(thought.errors)} errors" if thought.errors else "no tool calls for data job"
        self._end_frame(rec, output_summary=thought_summary,
                        route="pa_retry", condition=f"expert_failed ({retry_reason}), expectation={expectation}")
        await self._send_hud({"node": "runtime", "event": "pa_retry",
                              "detail": f"expert failed: {retry_reason}, retrying via PA"})
        # Stream retry notice to user
        retry_msg = "Anderer Ansatz..." if routing.language == "de" else "Trying a different approach..."
        await self.sink.send_delta(retry_msg + "\n")
        # PA reformulates with error context
        retry_errors = thought.errors if thought.errors else [
            {"query": "(none)", "error": "Expert produced no database queries. The job requires data lookup but the expert answered without querying. Reformulate with explicit query instructions."}
        ]
        error_summary = "; ".join(e.get("error", "")[:80] for e in retry_errors[-2:])
        rec = self._begin_frame(self.frame + 1, "pa_retry",
                                input_summary=f"errors: {error_summary[:100]}")
        routing2 = await self.nodes["pa"].route_retry(
            command, self.history, memory_context=mem_ctx,
            identity=self.identity, channel=self.channel,
            original_job=routing.job, errors=retry_errors)
        self._end_frame(rec, output_summary=f"retry_job: {(routing2.job or '')[:60]}",
                        route=f"expert_{routing2.expert}" if routing2.expert != "none" else "output")
        if routing2.expert != "none":
            expert2 = self._experts.get(routing2.expert, expert)
            rec = self._begin_frame(self.frame + 1, f"expert_{routing2.expert}_retry",
                                    input_summary=f"retry job: {(routing2.job or '')[:80]}")
            original_hud2 = expert2.send_hud
            expert2.send_hud = self._make_progress_wrapper(original_hud2, routing2.language)
            try:
                thought = await expert2.execute(routing2.job, routing2.language)
            finally:
                expert2.send_hud = original_hud2
            thought_summary = (f"response[{len(thought.response)}] tool={thought.tool_used or 'none'} "
                               f"errors={len(thought.errors)}")
            has_tool = bool(thought.tool_used and thought.tool_output)
            self._end_frame(rec, output_summary=thought_summary,
                            route="interpreter" if has_tool else "output+ui")
        routing = routing2  # use retry routing for rest of pipeline
    # Interpreter (conditional)
    # NOTE(review): after the retry branch, `rec` has already been ended
    # above, so this _end_frame appears to end the same record twice —
    # verify _end_frame tolerates that (the edge-walker replacement guards
    # this with `if not last_rec.route`).
    if self.has_interpreter and has_tool:
        self._end_frame(rec, output_summary=thought_summary,
                        route="interpreter", condition="has_tool_output=True")
        rec = self._begin_frame(4, "interpreter",
                                input_summary=f"tool={thought.tool_used} output[{len(thought.tool_output)}]")
        interpreted = await self.nodes["interpreter"].interpret(
            thought.tool_used, thought.tool_output, routing.job)
        thought.response = interpreted.summary
        self._end_frame(rec, output_summary=f"summary[{len(interpreted.summary)}]", route="output+ui")
        rec = self._begin_frame(5, "output+ui",
                                input_summary=f"interpreted: {interpreted.summary[:80]}")
        path = "expert+interpreter"
    else:
        self._end_frame(rec, output_summary=thought_summary,
                        route="output+ui",
                        condition="has_tool_output=False" if not has_tool else "")
        rec = self._begin_frame(4, "output+ui",
                                input_summary=f"response: {thought.response[:80]}")
        path = "expert"
    # Clear progress text, render final response
    self.sink.reset()
    response = await self._run_output_and_ui(thought, mem_ctx)
    self.history.append({"role": "assistant", "content": response})
    await self.memorizer.update(self.history)
    self._trim_history()
    controls_count = len(self.ui_node.current_controls)
    self._end_frame(rec, output_summary=f"response[{len(response)}] controls={controls_count}")
    self._end_trace(path)
    await self._emit_trace_hud()
    return self._make_result(response)
def _make_progress_wrapper(self, original_hud, language: str):
    """Wrap an expert's send_hud to also stream progress deltas to the user.

    Returns an async callable that first forwards the HUD event unchanged,
    then — for known event types — streams a short localized progress
    message to the sink. Unknown events and empty messages stay silent.
    """
    sink = self.sink
    # event → {tool → message}; "_default" is the per-event fallback.
    progress_map = {
        "tool_call": {"query_db": "Daten werden abgerufen..." if language == "de" else "Fetching data...",
                      "emit_actions": "UI wird erstellt..." if language == "de" else "Building UI...",
                      "create_machine": "Maschine wird erstellt..." if language == "de" else "Creating machine...",
                      "_default": "Verarbeite..." if language == "de" else "Processing..."},
        "tool_result": {"_default": ""},  # silent on result
        "planned": {"_default": "Plan erstellt..." if language == "de" else "Plan ready..."},
    }
    async def wrapper(data: dict):
        await original_hud(data)
        event = data.get("event", "")
        if event in progress_map:
            tool = data.get("tool", "_default")
            msg = progress_map[event].get(tool, progress_map[event].get("_default", ""))
            if msg:
                await sink.send_delta(msg + "\n")
    return wrapper
async def _run_director_pipeline(self, command: Command, mem_ctx: str,
                                 dashboard: list = None) -> dict:
    """Director: Input(F1) → Director(F2) → Thinker(F3) → [Interpreter(F4)] → Output."""
    a = command.analysis
    # Frame 2: Director
    rec = self._begin_frame(2, "director",
                            input_summary=f"intent={a.intent} topic={a.topic}")
    plan = await self.nodes["director"].decide(command, self.history, memory_context=mem_ctx)
    plan_summary = f"goal={plan.goal} tools={len(plan.tool_sequence)} hint={plan.response_hint[:50]}"
    self._end_frame(rec, output_summary=plan_summary, route="thinker")
    # Frame 3: Thinker
    rec = self._begin_frame(3, "thinker",
                            input_summary=plan_summary[:100])
    thought = await self.nodes["thinker"].process(
        command, plan, self.history, memory_context=mem_ctx)
    thought_summary = (f"response[{len(thought.response)}] tool={thought.tool_used or 'none'} "
                       f"actions={len(thought.actions)} machines={len(thought.machine_ops)}")
    has_tool = bool(thought.tool_used and thought.tool_output)
    # Check interpreter condition
    if self.has_interpreter and has_tool:
        self._end_frame(rec, output_summary=thought_summary,
                        route="interpreter", condition="has_tool_output=True")
        # Frame 4: Interpreter
        rec = self._begin_frame(4, "interpreter",
                                input_summary=f"tool={thought.tool_used} output[{len(thought.tool_output)}]")
        interpreted = await self.nodes["interpreter"].interpret(
            thought.tool_used, thought.tool_output, command.source_text)
        thought.response = interpreted.summary
        interp_summary = f"summary[{len(interpreted.summary)}] facts={interpreted.key_facts}"
        self._end_frame(rec, output_summary=interp_summary, route="output+ui")
        # Frame 5: Output
        rec = self._begin_frame(5, "output+ui",
                                input_summary=f"interpreted: {interpreted.summary[:80]}")
        path = "director+interpreter"
    else:
        self._end_frame(rec, output_summary=thought_summary,
                        route="output+ui",
                        condition="has_tool_output=False" if not has_tool else "")
        # Frame 4: Output
        rec = self._begin_frame(4, "output+ui",
                                input_summary=f"response: {thought.response[:80]}")
        path = "director"
    response = await self._run_output_and_ui(thought, mem_ctx)
    self.history.append({"role": "assistant", "content": response})
    await self.memorizer.update(self.history)
    self._trim_history()
    controls_count = len(self.ui_node.current_controls)
    self._end_frame(rec, output_summary=f"response[{len(response)}] controls={controls_count}")
    self._end_trace(path)
    await self._emit_trace_hud()
    return self._make_result(response)
async def _run_thinker_pipeline(self, command: Command, mem_ctx: str,
                                dashboard: list = None) -> dict:
    """v1: Input(F1) → Thinker(F2) → Output(F3)."""
    a = command.analysis
    # Frame 2: Thinker
    rec = self._begin_frame(2, "thinker",
                            input_summary=f"intent={a.intent} topic={a.topic}")
    # v1 hybrid: a director exposing plan() may pre-plan for complex or
    # data-heavy requests before the thinker runs.
    director = self.nodes.get("director")
    if director and hasattr(director, "plan"):
        is_complex = command.analysis.complexity == "complex"
        text = command.source_text
        # Keyword heuristic (German + English) for data-flavoured requests.
        is_data_request = (command.analysis.intent in ("request", "action")
                          and any(k in text.lower()
                                  for k in ["daten", "data", "database", "db", "tabelle", "table",
                                            "query", "abfrage", "untersuche", "investigate",
                                            "analyse", "analyze", "customer", "kunde"]))
        if is_complex or (is_data_request and len(text.split()) > 8):
            await director.plan(self.history, self.memorizer.state, text)
            # Rebuild the context so the new plan lands in the prompt.
            mem_ctx = self._build_context(dashboard)
    thought = await self.nodes["thinker"].process(command, self.history, memory_context=mem_ctx)
    if director and hasattr(director, "current_plan"):
        director.current_plan = ""  # clear the one-shot plan after use
    thought_summary = f"response[{len(thought.response)}] tool={thought.tool_used or 'none'}"
    self._end_frame(rec, output_summary=thought_summary, route="output+ui")
    # Frame 3: Output
    rec = self._begin_frame(3, "output+ui",
                            input_summary=f"response: {thought.response[:80]}")
    response = await self._run_output_and_ui(thought, mem_ctx)
    self.history.append({"role": "assistant", "content": response})
    await self.memorizer.update(self.history)
    if director and hasattr(director, "update"):
        await director.update(self.history, self.memorizer.state)
    self._trim_history()
    self._end_frame(rec, output_summary=f"response[{len(response)}]")
    self._end_trace("thinker")
    await self._emit_trace_hud()
    return self._make_result(response)
# --- Action handling ---
async def _handle_action(self, text: str, dashboard: list = None) -> dict:
"""Handle ACTION: messages (button clicks)."""
@ -595,8 +662,8 @@ class FrameEngine:
await self._emit_trace_hud()
return self._make_result(result)
# Complex action — needs full pipeline
self._end_frame(rec, output_summary="no local handler", route="pa/director/thinker")
# Complex action — needs full pipeline via edge walking
self._end_frame(rec, output_summary="no local handler", route="edge_walk")
action_desc = f"ACTION: {action}"
if data:
@ -608,12 +675,8 @@ class FrameEngine:
analysis=InputAnalysis(intent="action", topic=action, complexity="simple"),
source_text=action_desc)
if self.has_pa:
return await self._run_expert_pipeline(command, mem_ctx, dashboard)
elif self.has_director:
return await self._run_director_pipeline(command, mem_ctx, dashboard)
else:
return await self._run_thinker_pipeline(command, mem_ctx, dashboard)
ctx = _WalkCtx(command=command, mem_ctx=mem_ctx, dashboard=dashboard)
return await self._walk_edges(ctx)
# --- Helpers ---
@ -670,6 +733,29 @@ class FrameEngine:
lines.append(f" - {ctype}: {ctrl.get('label', ctrl.get('text', '?'))}")
return "\n".join(lines)
def _make_progress_wrapper(self, original_hud, language: str):
"""Wrap an expert's send_hud to also stream progress deltas to the user."""
sink = self.sink
progress_map = {
"tool_call": {"query_db": "Daten werden abgerufen..." if language == "de" else "Fetching data...",
"emit_actions": "UI wird erstellt..." if language == "de" else "Building UI...",
"create_machine": "Maschine wird erstellt..." if language == "de" else "Creating machine...",
"_default": "Verarbeite..." if language == "de" else "Processing..."},
"tool_result": {"_default": ""}, # silent on result
"planned": {"_default": "Plan erstellt..." if language == "de" else "Plan ready..."},
}
async def wrapper(data: dict):
await original_hud(data)
event = data.get("event", "")
if event in progress_map:
tool = data.get("tool", "_default")
msg = progress_map[event].get(tool, progress_map[event].get("_default", ""))
if msg:
await sink.send_delta(msg + "\n")
return wrapper
async def _run_output_and_ui(self, thought: ThoughtResult, mem_ctx: str) -> str:
"""Run Output and UI nodes in parallel. Returns response text."""
self.sink.reset()
@ -686,16 +772,6 @@ class FrameEngine:
await self.sink.send_artifacts(artifacts)
return response
def _check_condition(self, name: str, command: Command = None,
thought: ThoughtResult = None) -> bool:
"""Evaluate a named condition."""
if name == "reflex" and command:
return (command.analysis.intent == "social"
and command.analysis.complexity == "trivial")
if name == "has_tool_output" and thought:
return bool(thought.tool_used and thought.tool_output)
return False
def _make_result(self, response: str) -> dict:
"""Build the result dict returned to callers."""
return {