From f30da0763684f9eeb4613730f866d57014922e81 Mon Sep 17 00:00:00 2001 From: Nico Date: Fri, 3 Apr 2026 19:00:57 +0200 Subject: [PATCH] Phase 3: Engine walks graph edges instead of hardcoded pipelines Replace 4 hardcoded pipeline methods (_run_expert_pipeline, _run_director_pipeline, _run_thinker_pipeline + dispatch logic) with a single _walk_edges() method that follows declared data edges. Key changes: - _WalkCtx dataclass carries state through the edge walk - _eval_condition() evaluates conditions against walk context - _resolve_edge() picks active edge (conditional > unconditional) - Dispatch adapters per node role (_dispatch_pa, _dispatch_expert, etc.) - PA retry + progress wrapping stay as expert dispatch adapter logic - process_message() and _handle_action() use _walk_edges() - _run_reflex() kept as simple 2-frame shortcut 15/15 engine tests + 9/9 matrix tests green, identical traces. Co-Authored-By: Claude Opus 4.6 (1M context) --- agent/frame_engine.py | 702 +++++++++++++++++++++++------------------- 1 file changed, 389 insertions(+), 313 deletions(-) diff --git a/agent/frame_engine.py b/agent/frame_engine.py index 7aed5fb..d33c8ea 100644 --- a/agent/frame_engine.py +++ b/agent/frame_engine.py @@ -1,12 +1,15 @@ -"""Frame Engine: tick-based deterministic pipeline execution. +"""Frame Engine: edge-walking deterministic pipeline execution. -Replaces the imperative handle_message() with a frame-stepping model: -- Each frame dispatches all nodes that have pending input -- Frames advance on completion (not on a timer) -- 0ms when idle, engine awaits external input -- Deterministic ordering: reflex=2 frames, thinker=3-4, interpreter=5 +Walks the graph's data edges to determine pipeline flow. Each step +dispatches a node and evaluates conditions on outgoing edges to pick +the next node. Frames advance on node completion (not on a timer). -Works with any graph definition (v1, v2, v3). Node implementations unchanged. +Edge types: + data — typed objects flowing between nodes (walked by engine) + context — text injected into LLM prompts (aggregated via _build_context) + state — shared mutable state reads (consumed by sensor/runtime) + +Works with any graph definition (v1-v4). Node implementations unchanged. """ import asyncio @@ -72,8 +75,22 @@ class FrameTrace: } +# --- Walk context: carries state through the edge walk --- + +@dataclass +class _WalkCtx: + """Mutable context passed through the edge walker.""" + command: Command = None + routing: PARouting = None + plan: DirectorPlan = None + thought: ThoughtResult = None + mem_ctx: str = "" + dashboard: list = None + path_nodes: list = field(default_factory=list) # visited node names for path label + + class FrameEngine: - """Tick-based engine that steps through graph nodes frame by frame.""" + """Edge-walking engine that steps through graph nodes frame by frame.""" def __init__(self, graph: dict, nodes: dict, sink, history: list, send_hud, sensor, memorizer, ui_node, identity: str = "unknown", @@ -93,7 +110,7 @@ class FrameEngine: self.frame = 0 self.bus = {} self.conditions = graph.get("conditions", {}) - self.edges = [e for e in graph.get("edges", []) if e.get("type") == "data"] + self.data_edges = [e for e in graph.get("edges", []) if e.get("type") == "data"] self.has_director = "director" in nodes and hasattr(nodes.get("director"), "decide") self.has_interpreter = "interpreter" in nodes @@ -172,6 +189,328 @@ class FrameEngine: "trace": t.to_dict(), }) + # --- Condition evaluation --- + + def _eval_condition(self, name: str, ctx: _WalkCtx) -> bool: + """Evaluate a named condition against walk context.""" + if name == "reflex": + return (ctx.command and ctx.command.analysis.intent == "social" + and ctx.command.analysis.complexity == "trivial") + if name == "has_tool_output": + return bool(ctx.thought and ctx.thought.tool_used and ctx.thought.tool_output) + # PA routing conditions + if name == "expert_is_none": + return ctx.routing is not None and ctx.routing.expert == "none" + if name.startswith("expert_is_"): + expert = name[len("expert_is_"):] + return ctx.routing is not None and ctx.routing.expert == expert + return False + + def _check_condition(self, name: str, command: Command = None, + thought: ThoughtResult = None) -> bool: + """Legacy wrapper for _eval_condition (used by tests).""" + ctx = _WalkCtx(command=command, thought=thought) + return self._eval_condition(name, ctx) + + # --- Edge resolution --- + + def _resolve_edge(self, outgoing: list, ctx: _WalkCtx) -> dict | None: + """Pick the active edge from a node's outgoing data edges. + Conditional edges take priority when they match.""" + conditional = [e for e in outgoing if e.get("condition")] + unconditional = [e for e in outgoing if not e.get("condition")] + + for edge in conditional: + if self._eval_condition(edge["condition"], ctx): + return edge + + return unconditional[0] if unconditional else None + + # --- Node dispatch adapters --- + + async def _dispatch_pa(self, ctx: _WalkCtx) -> str: + """Dispatch PA node. Returns route summary.""" + a = ctx.command.analysis + rec = self._begin_frame(self.frame + 1, "pa", + input_summary=f"intent={a.intent} topic={a.topic}") + routing = await self.nodes["pa"].route( + ctx.command, self.history, memory_context=ctx.mem_ctx, + identity=self.identity, channel=self.channel) + ctx.routing = routing + route_summary = f"expert={routing.expert} job={routing.job[:60]}" + self._end_frame(rec, output_summary=route_summary, + route=f"expert_{routing.expert}" if routing.expert != "none" else "output") + + # Stream thinking message + if routing.thinking_message: + await self.sink.send_delta(routing.thinking_message + "\n\n") + + return routing.expert + + async def _dispatch_expert(self, expert_role: str, ctx: _WalkCtx): + """Dispatch an expert node with progress wrapping and PA retry.""" + expert = self._experts.get(expert_role.replace("expert_", "")) + if not expert: + expert_name = expert_role.replace("expert_", "") + log.error(f"[frame] expert '{expert_name}' not found") + ctx.thought = ThoughtResult(response=f"Expert '{expert_name}' not available.") + return + + expert_name = expert_role.replace("expert_", "") + rec = self._begin_frame(self.frame + 1, expert_role, + input_summary=f"job: {ctx.routing.job[:80]}") + + # Wrap expert HUD for progress streaming + original_hud = expert.send_hud + expert.send_hud = self._make_progress_wrapper(original_hud, ctx.routing.language) + try: + thought = await expert.execute(ctx.routing.job, ctx.routing.language) + finally: + expert.send_hud = original_hud + + ctx.thought = thought + thought_summary = (f"response[{len(thought.response)}] tool={thought.tool_used or 'none'} " + f"actions={len(thought.actions)} errors={len(thought.errors)}") + has_tool = bool(thought.tool_used and thought.tool_output) + + # --- PA retry: expert failed or skipped tools --- + expectation = self.memorizer.state.get("user_expectation", "conversational") + job_needs_data = any(k in (ctx.routing.job or "").lower() + for k in ["query", "select", "tabelle", "table", "daten", "data", + "cost", "kosten", "count", "anzahl", "average", "schnitt", + "find", "finde", "show", "zeig", "list", "beschreib"]) + expert_skipped_tools = not has_tool and not thought.errors and job_needs_data + if (thought.errors or expert_skipped_tools) and not has_tool and expectation in ("delegated", "waiting_input", "conversational"): + retry_reason = f"{len(thought.errors)} errors" if thought.errors else "no tool calls for data job" + self._end_frame(rec, output_summary=thought_summary, + route="pa_retry", condition=f"expert_failed ({retry_reason}), expectation={expectation}") + await self._send_hud({"node": "runtime", "event": "pa_retry", + "detail": f"expert failed: {retry_reason}, retrying via PA"}) + + retry_msg = "Anderer Ansatz..." if ctx.routing.language == "de" else "Trying a different approach..." + await self.sink.send_delta(retry_msg + "\n") + + retry_errors = thought.errors if thought.errors else [ + {"query": "(none)", "error": "Expert produced no database queries. The job requires data lookup but the expert answered without querying. Reformulate with explicit query instructions."} + ] + error_summary = "; ".join(e.get("error", "")[:80] for e in retry_errors[-2:]) + rec = self._begin_frame(self.frame + 1, "pa_retry", + input_summary=f"errors: {error_summary[:100]}") + routing2 = await self.nodes["pa"].route_retry( + ctx.command, self.history, memory_context=ctx.mem_ctx, + identity=self.identity, channel=self.channel, + original_job=ctx.routing.job, errors=retry_errors) + self._end_frame(rec, output_summary=f"retry_job: {(routing2.job or '')[:60]}", + route=f"expert_{routing2.expert}" if routing2.expert != "none" else "output") + + if routing2.expert != "none": + expert2 = self._experts.get(routing2.expert, expert) + rec = self._begin_frame(self.frame + 1, f"expert_{routing2.expert}_retry", + input_summary=f"retry job: {(routing2.job or '')[:80]}") + original_hud2 = expert2.send_hud + expert2.send_hud = self._make_progress_wrapper(original_hud2, routing2.language) + try: + thought = await expert2.execute(routing2.job, routing2.language) + finally: + expert2.send_hud = original_hud2 + ctx.thought = thought + thought_summary = (f"response[{len(thought.response)}] tool={thought.tool_used or 'none'} " + f"errors={len(thought.errors)}") + has_tool = bool(thought.tool_used and thought.tool_output) + self._end_frame(rec, output_summary=thought_summary, + route="interpreter" if has_tool else "output+ui") + ctx.routing = routing2 + return + + # Normal completion (no retry) + # Don't end frame yet — caller checks interpreter condition and sets route + + async def _dispatch_director(self, ctx: _WalkCtx): + """Dispatch Director node.""" + a = ctx.command.analysis + rec = self._begin_frame(self.frame + 1, "director", + input_summary=f"intent={a.intent} topic={a.topic}") + plan = await self.nodes["director"].decide(ctx.command, self.history, memory_context=ctx.mem_ctx) + ctx.plan = plan + plan_summary = f"goal={plan.goal} tools={len(plan.tool_sequence)} hint={plan.response_hint[:50]}" + self._end_frame(rec, output_summary=plan_summary, route="thinker") + + async def _dispatch_thinker(self, ctx: _WalkCtx): + """Dispatch Thinker node (v1 or v2).""" + a = ctx.command.analysis + rec = self._begin_frame(self.frame + 1, "thinker", + input_summary=f"intent={a.intent} topic={a.topic}" if not ctx.plan + else f"goal={ctx.plan.goal} tools={len(ctx.plan.tool_sequence)}") + + # v1 hybrid: optional director pre-planning + director = self.nodes.get("director") + if director and hasattr(director, "plan") and not ctx.plan: + is_complex = ctx.command.analysis.complexity == "complex" + text = ctx.command.source_text + is_data_request = (ctx.command.analysis.intent in ("request", "action") + and any(k in text.lower() + for k in ["daten", "data", "database", "db", "tabelle", "table", + "query", "abfrage", "untersuche", "investigate", + "analyse", "analyze", "customer", "kunde"])) + if is_complex or (is_data_request and len(text.split()) > 8): + await director.plan(self.history, self.memorizer.state, text) + ctx.mem_ctx = self._build_context(ctx.dashboard) + + if ctx.plan: + thought = await self.nodes["thinker"].process( + ctx.command, ctx.plan, self.history, memory_context=ctx.mem_ctx) + else: + thought = await self.nodes["thinker"].process( + ctx.command, self.history, memory_context=ctx.mem_ctx) + + if director and hasattr(director, "current_plan"): + director.current_plan = "" + + ctx.thought = thought + thought_summary = (f"response[{len(thought.response)}] tool={thought.tool_used or 'none'} " + f"actions={len(thought.actions)}") + self._end_frame(rec, output_summary=thought_summary, route="output+ui") + + async def _dispatch_interpreter(self, ctx: _WalkCtx): + """Dispatch Interpreter node.""" + rec = self._begin_frame(self.frame + 1, "interpreter", + input_summary=f"tool={ctx.thought.tool_used} output[{len(ctx.thought.tool_output)}]") + # Use routing.job for expert pipeline, source_text for director pipeline + job = ctx.routing.job if ctx.routing else ctx.command.source_text + interpreted = await self.nodes["interpreter"].interpret( + ctx.thought.tool_used, ctx.thought.tool_output, job) + ctx.thought.response = interpreted.summary + self._end_frame(rec, output_summary=f"summary[{len(interpreted.summary)}]", route="output+ui") + + async def _finish_pipeline(self, ctx: _WalkCtx) -> dict: + """Common tail: output+ui parallel, memorizer update, trace.""" + # If no thought yet (pa_direct path), create from routing + if not ctx.thought and ctx.routing: + ctx.thought = ThoughtResult(response=ctx.routing.response_hint, actions=[]) + + rec = self._begin_frame(self.frame + 1, "output+ui", + input_summary=f"response: {(ctx.thought.response or '')[:80]}") + + self.sink.reset() + response = await self._run_output_and_ui(ctx.thought, ctx.mem_ctx) + self.history.append({"role": "assistant", "content": response}) + await self.memorizer.update(self.history) + + # v1 director post-processing + director = self.nodes.get("director") + if director and hasattr(director, "update") and not self.has_pa: + await director.update(self.history, self.memorizer.state) + + self._trim_history() + + controls_count = len(self.ui_node.current_controls) + self._end_frame(rec, output_summary=f"response[{len(response)}] controls={controls_count}") + + # Build path label from visited nodes + path = self._build_path_label(ctx) + self._end_trace(path) + await self._emit_trace_hud() + return self._make_result(response) + + def _build_path_label(self, ctx: _WalkCtx) -> str: + """Build trace path label from visited nodes.""" + nodes = ctx.path_nodes + if not nodes: + return "unknown" + # Map visited nodes to path labels + has_interpreter = "interpreter" in nodes + if any(n.startswith("expert_") for n in nodes): + return "expert+interpreter" if has_interpreter else "expert" + if "director" in nodes: + return "director+interpreter" if has_interpreter else "director" + if "thinker" in nodes: + return "thinker" + if "pa" in nodes and not any(n.startswith("expert_") for n in nodes): + return "pa_direct" + return "unknown" + + # --- Edge walker --- + + async def _walk_edges(self, ctx: _WalkCtx) -> dict: + """Walk data edges from input node through the graph. + Returns the pipeline result dict.""" + current = "input" # just finished frame 1 + + while True: + # Find outgoing data edges from current node + outgoing = [e for e in self.data_edges if e["from"] == current] + if not outgoing: + break + + # Resolve which edge to follow + edge = self._resolve_edge(outgoing, ctx) + if not edge: + break + + target = edge["to"] + + # Parallel target [output, ui] or terminal output → finish + if isinstance(target, list) or target == "output" or target == "memorizer": + return await self._finish_pipeline(ctx) + + # Dispatch the target node + ctx.path_nodes.append(target) + + if target == "pa": + await self._dispatch_pa(ctx) + current = "pa" + + elif target.startswith("expert_"): + await self._dispatch_expert(target, ctx) + # After expert, check interpreter condition + has_tool = bool(ctx.thought and ctx.thought.tool_used and ctx.thought.tool_output) + if self.has_interpreter and has_tool: + # End expert frame with interpreter route + last_rec = self.last_trace.frames[-1] + if not last_rec.route: # not already ended by retry + self._end_frame(last_rec, + output_summary=f"response[{len(ctx.thought.response)}] tool={ctx.thought.tool_used}", + route="interpreter", condition="has_tool_output=True") + ctx.path_nodes.append("interpreter") + await self._dispatch_interpreter(ctx) + else: + # End expert frame with output route + last_rec = self.last_trace.frames[-1] + if not last_rec.route: + thought_summary = (f"response[{len(ctx.thought.response)}] tool={ctx.thought.tool_used or 'none'}") + self._end_frame(last_rec, output_summary=thought_summary, + route="output+ui", + condition="has_tool_output=False" if not has_tool else "") + return await self._finish_pipeline(ctx) + + elif target == "director": + await self._dispatch_director(ctx) + current = "director" + + elif target == "thinker": + await self._dispatch_thinker(ctx) + # After thinker, check interpreter condition + has_tool = bool(ctx.thought and ctx.thought.tool_used and ctx.thought.tool_output) + if self.has_interpreter and has_tool: + last_rec = self.last_trace.frames[-1] + self._end_frame(last_rec, + output_summary=f"response[{len(ctx.thought.response)}] tool={ctx.thought.tool_used}", + route="interpreter", condition="has_tool_output=True") + ctx.path_nodes.append("interpreter") + await self._dispatch_interpreter(ctx) + return await self._finish_pipeline(ctx) + + elif target == "interpreter": + ctx.path_nodes.append("interpreter") + await self._dispatch_interpreter(ctx) + return await self._finish_pipeline(ctx) + + else: + log.warning(f"[frame] unknown target node: {target}") + break + + return await self._finish_pipeline(ctx) + # --- Main entry point --- async def process_message(self, text: str, dashboard: list = None, @@ -221,26 +560,26 @@ class FrameEngine: a = command.analysis cmd_summary = f"intent={a.intent} language={a.language} tone={a.tone} complexity={a.complexity}" + # Build walk context + ctx = _WalkCtx(command=command, mem_ctx=mem_ctx, dashboard=dashboard) + # Check reflex condition - is_reflex = self._check_condition("reflex", command=command) + is_reflex = self._eval_condition("reflex", ctx) if is_reflex: self._end_frame(rec, output_summary=cmd_summary, route="output (reflex)", condition="reflex=True") await self._send_hud({"node": "runtime", "event": "reflex_path", "detail": f"{a.intent}/{a.complexity}"}) return await self._run_reflex(command, mem_ctx) - else: - next_node = "pa" if self.has_pa else ("director" if self.has_director else "thinker") - self._end_frame(rec, output_summary=cmd_summary, - route=next_node, condition=f"reflex=False") - # --- Frame 2+: Pipeline --- - if self.has_pa: - return await self._run_expert_pipeline(command, mem_ctx, dashboard) - elif self.has_director: - return await self._run_director_pipeline(command, mem_ctx, dashboard) - else: - return await self._run_thinker_pipeline(command, mem_ctx, dashboard) + # Find next node from edges + outgoing = [e for e in self.data_edges if e["from"] == "input" and not e.get("condition")] + next_node = outgoing[0]["to"] if outgoing else "unknown" + self._end_frame(rec, output_summary=cmd_summary, + route=next_node, condition="reflex=False") + + # Walk remaining edges + return await self._walk_edges(ctx) finally: # Restore original models after per-request overrides for role, original_model in saved_models.items(): @@ -248,7 +587,7 @@ class FrameEngine: if node: node.model = original_model - # --- Pipeline variants --- + # --- Reflex (kept simple — 2 frames, no edge walking needed) --- async def _run_reflex(self, command: Command, mem_ctx: str) -> dict: """Reflex: Input(F1) → Output(F2).""" @@ -266,279 +605,7 @@ class FrameEngine: await self._emit_trace_hud() return self._make_result(response) - async def _run_expert_pipeline(self, command: Command, mem_ctx: str, - dashboard: list = None) -> dict: - """Expert pipeline: Input(F1) → PA(F2) → Expert(F3) → [Interpreter(F4)] → Output.""" - a = command.analysis - - # Frame 2: PA routes - rec = self._begin_frame(2, "pa", - input_summary=f"intent={a.intent} topic={a.topic}") - routing = await self.nodes["pa"].route( - command, self.history, memory_context=mem_ctx, - identity=self.identity, channel=self.channel) - route_summary = f"expert={routing.expert} job={routing.job[:60]}" - self._end_frame(rec, output_summary=route_summary, - route=f"expert_{routing.expert}" if routing.expert != "none" else "output") - - # Stream thinking message to user - if routing.thinking_message: - await self.sink.send_delta(routing.thinking_message + "\n\n") - - # Direct PA response (no expert needed) - if routing.expert == "none": - rec = self._begin_frame(3, "output+ui", - input_summary=f"pa_direct: {routing.response_hint[:80]}") - thought = ThoughtResult(response=routing.response_hint, actions=[]) - response = await self._run_output_and_ui(thought, mem_ctx) - self.history.append({"role": "assistant", "content": response}) - await self.memorizer.update(self.history) - self._trim_history() - self._end_frame(rec, output_summary=f"response[{len(response)}]") - self._end_trace("pa_direct") - await self._emit_trace_hud() - return self._make_result(response) - - # Frame 3: Expert executes - expert = self._experts.get(routing.expert) - if not expert: - log.error(f"[frame] expert '{routing.expert}' not found") - thought = ThoughtResult(response=f"Expert '{routing.expert}' not available.") - rec = self._begin_frame(3, "output+ui", input_summary="expert_not_found") - response = await self._run_output_and_ui(thought, mem_ctx) - self.history.append({"role": "assistant", "content": response}) - self._end_frame(rec, output_summary="error", error=f"expert '{routing.expert}' not found") - self._end_trace("expert_error") - await self._emit_trace_hud() - return self._make_result(response) - - rec = self._begin_frame(3, f"expert_{routing.expert}", - input_summary=f"job: {routing.job[:80]}") - - # Wrap expert's HUD to stream progress to user - original_hud = expert.send_hud - expert.send_hud = self._make_progress_wrapper(original_hud, routing.language) - - try: - thought = await expert.execute(routing.job, routing.language) - finally: - expert.send_hud = original_hud - - thought_summary = (f"response[{len(thought.response)}] tool={thought.tool_used or 'none'} " - f"actions={len(thought.actions)} errors={len(thought.errors)}") - has_tool = bool(thought.tool_used and thought.tool_output) - - # PA retry: if expert failed OR skipped tools when data was needed - expectation = self.memorizer.state.get("user_expectation", "conversational") - # Detect hallucination: expert returned no tool output for a data job - job_needs_data = any(k in (routing.job or "").lower() - for k in ["query", "select", "tabelle", "table", "daten", "data", - "cost", "kosten", "count", "anzahl", "average", "schnitt", - "find", "finde", "show", "zeig", "list", "beschreib"]) - expert_skipped_tools = not has_tool and not thought.errors and job_needs_data - if (thought.errors or expert_skipped_tools) and not has_tool and expectation in ("delegated", "waiting_input", "conversational"): - retry_reason = f"{len(thought.errors)} errors" if thought.errors else "no tool calls for data job" - self._end_frame(rec, output_summary=thought_summary, - route="pa_retry", condition=f"expert_failed ({retry_reason}), expectation={expectation}") - await self._send_hud({"node": "runtime", "event": "pa_retry", - "detail": f"expert failed: {retry_reason}, retrying via PA"}) - - # Stream retry notice to user - retry_msg = "Anderer Ansatz..." if routing.language == "de" else "Trying a different approach..." - await self.sink.send_delta(retry_msg + "\n") - - # PA reformulates with error context - retry_errors = thought.errors if thought.errors else [ - {"query": "(none)", "error": "Expert produced no database queries. The job requires data lookup but the expert answered without querying. Reformulate with explicit query instructions."} - ] - error_summary = "; ".join(e.get("error", "")[:80] for e in retry_errors[-2:]) - rec = self._begin_frame(self.frame + 1, "pa_retry", - input_summary=f"errors: {error_summary[:100]}") - routing2 = await self.nodes["pa"].route_retry( - command, self.history, memory_context=mem_ctx, - identity=self.identity, channel=self.channel, - original_job=routing.job, errors=retry_errors) - self._end_frame(rec, output_summary=f"retry_job: {(routing2.job or '')[:60]}", - route=f"expert_{routing2.expert}" if routing2.expert != "none" else "output") - - if routing2.expert != "none": - expert2 = self._experts.get(routing2.expert, expert) - rec = self._begin_frame(self.frame + 1, f"expert_{routing2.expert}_retry", - input_summary=f"retry job: {(routing2.job or '')[:80]}") - original_hud2 = expert2.send_hud - expert2.send_hud = self._make_progress_wrapper(original_hud2, routing2.language) - try: - thought = await expert2.execute(routing2.job, routing2.language) - finally: - expert2.send_hud = original_hud2 - thought_summary = (f"response[{len(thought.response)}] tool={thought.tool_used or 'none'} " - f"errors={len(thought.errors)}") - has_tool = bool(thought.tool_used and thought.tool_output) - self._end_frame(rec, output_summary=thought_summary, - route="interpreter" if has_tool else "output+ui") - routing = routing2 # use retry routing for rest of pipeline - - # Interpreter (conditional) - if self.has_interpreter and has_tool: - self._end_frame(rec, output_summary=thought_summary, - route="interpreter", condition="has_tool_output=True") - rec = self._begin_frame(4, "interpreter", - input_summary=f"tool={thought.tool_used} output[{len(thought.tool_output)}]") - interpreted = await self.nodes["interpreter"].interpret( - thought.tool_used, thought.tool_output, routing.job) - thought.response = interpreted.summary - self._end_frame(rec, output_summary=f"summary[{len(interpreted.summary)}]", route="output+ui") - - rec = self._begin_frame(5, "output+ui", - input_summary=f"interpreted: {interpreted.summary[:80]}") - path = "expert+interpreter" - else: - self._end_frame(rec, output_summary=thought_summary, - route="output+ui", - condition="has_tool_output=False" if not has_tool else "") - rec = self._begin_frame(4, "output+ui", - input_summary=f"response: {thought.response[:80]}") - path = "expert" - - # Clear progress text, render final response - self.sink.reset() - response = await self._run_output_and_ui(thought, mem_ctx) - self.history.append({"role": "assistant", "content": response}) - await self.memorizer.update(self.history) - self._trim_history() - - controls_count = len(self.ui_node.current_controls) - self._end_frame(rec, output_summary=f"response[{len(response)}] controls={controls_count}") - self._end_trace(path) - await self._emit_trace_hud() - return self._make_result(response) - - def _make_progress_wrapper(self, original_hud, language: str): - """Wrap an expert's send_hud to also stream progress deltas to the user.""" - sink = self.sink - progress_map = { - "tool_call": {"query_db": "Daten werden abgerufen..." if language == "de" else "Fetching data...", - "emit_actions": "UI wird erstellt..." if language == "de" else "Building UI...", - "create_machine": "Maschine wird erstellt..." if language == "de" else "Creating machine...", - "_default": "Verarbeite..." if language == "de" else "Processing..."}, - "tool_result": {"_default": ""}, # silent on result - "planned": {"_default": "Plan erstellt..." if language == "de" else "Plan ready..."}, - } - - async def wrapper(data: dict): - await original_hud(data) - event = data.get("event", "") - if event in progress_map: - tool = data.get("tool", "_default") - msg = progress_map[event].get(tool, progress_map[event].get("_default", "")) - if msg: - await sink.send_delta(msg + "\n") - - return wrapper - - async def _run_director_pipeline(self, command: Command, mem_ctx: str, - dashboard: list = None) -> dict: - """Director: Input(F1) → Director(F2) → Thinker(F3) → [Interpreter(F4)] → Output.""" - a = command.analysis - - # Frame 2: Director - rec = self._begin_frame(2, "director", - input_summary=f"intent={a.intent} topic={a.topic}") - plan = await self.nodes["director"].decide(command, self.history, memory_context=mem_ctx) - plan_summary = f"goal={plan.goal} tools={len(plan.tool_sequence)} hint={plan.response_hint[:50]}" - self._end_frame(rec, output_summary=plan_summary, route="thinker") - - # Frame 3: Thinker - rec = self._begin_frame(3, "thinker", - input_summary=plan_summary[:100]) - thought = await self.nodes["thinker"].process( - command, plan, self.history, memory_context=mem_ctx) - thought_summary = (f"response[{len(thought.response)}] tool={thought.tool_used or 'none'} " - f"actions={len(thought.actions)} machines={len(thought.machine_ops)}") - has_tool = bool(thought.tool_used and thought.tool_output) - - # Check interpreter condition - if self.has_interpreter and has_tool: - self._end_frame(rec, output_summary=thought_summary, - route="interpreter", condition="has_tool_output=True") - - # Frame 4: Interpreter - rec = self._begin_frame(4, "interpreter", - input_summary=f"tool={thought.tool_used} output[{len(thought.tool_output)}]") - interpreted = await self.nodes["interpreter"].interpret( - thought.tool_used, thought.tool_output, command.source_text) - thought.response = interpreted.summary - interp_summary = f"summary[{len(interpreted.summary)}] facts={interpreted.key_facts}" - self._end_frame(rec, output_summary=interp_summary, route="output+ui") - - # Frame 5: Output - rec = self._begin_frame(5, "output+ui", - input_summary=f"interpreted: {interpreted.summary[:80]}") - path = "director+interpreter" - else: - self._end_frame(rec, output_summary=thought_summary, - route="output+ui", - condition="has_tool_output=False" if not has_tool else "") - - # Frame 4: Output - rec = self._begin_frame(4, "output+ui", - input_summary=f"response: {thought.response[:80]}") - path = "director" - - response = await self._run_output_and_ui(thought, mem_ctx) - self.history.append({"role": "assistant", "content": response}) - await self.memorizer.update(self.history) - self._trim_history() - - controls_count = len(self.ui_node.current_controls) - self._end_frame(rec, output_summary=f"response[{len(response)}] controls={controls_count}") - self._end_trace(path) - await self._emit_trace_hud() - return self._make_result(response) - - async def _run_thinker_pipeline(self, command: Command, mem_ctx: str, - dashboard: list = None) -> dict: - """v1: Input(F1) → Thinker(F2) → Output(F3).""" - a = command.analysis - - # Frame 2: Thinker - rec = self._begin_frame(2, "thinker", - input_summary=f"intent={a.intent} topic={a.topic}") - - director = self.nodes.get("director") - if director and hasattr(director, "plan"): - is_complex = command.analysis.complexity == "complex" - text = command.source_text - is_data_request = (command.analysis.intent in ("request", "action") - and any(k in text.lower() - for k in ["daten", "data", "database", "db", "tabelle", "table", - "query", "abfrage", "untersuche", "investigate", - "analyse", "analyze", "customer", "kunde"])) - if is_complex or (is_data_request and len(text.split()) > 8): - await director.plan(self.history, self.memorizer.state, text) - mem_ctx = self._build_context(dashboard) - - thought = await self.nodes["thinker"].process(command, self.history, memory_context=mem_ctx) - if director and hasattr(director, "current_plan"): - director.current_plan = "" - - thought_summary = f"response[{len(thought.response)}] tool={thought.tool_used or 'none'}" - self._end_frame(rec, output_summary=thought_summary, route="output+ui") - - # Frame 3: Output - rec = self._begin_frame(3, "output+ui", - input_summary=f"response: {thought.response[:80]}") - response = await self._run_output_and_ui(thought, mem_ctx) - self.history.append({"role": "assistant", "content": response}) - await self.memorizer.update(self.history) - if director and hasattr(director, "update"): - await director.update(self.history, self.memorizer.state) - self._trim_history() - - self._end_frame(rec, output_summary=f"response[{len(response)}]") - self._end_trace("thinker") - await self._emit_trace_hud() - return self._make_result(response) + # --- Action handling --- async def _handle_action(self, text: str, dashboard: list = None) -> dict: """Handle ACTION: messages (button clicks).""" @@ -595,8 +662,8 @@ class FrameEngine: await self._emit_trace_hud() return self._make_result(result) - # Complex action — needs full pipeline - self._end_frame(rec, output_summary="no local handler", route="pa/director/thinker") + # Complex action — needs full pipeline via edge walking + self._end_frame(rec, output_summary="no local handler", route="edge_walk") action_desc = f"ACTION: {action}" if data: @@ -608,12 +675,8 @@ class FrameEngine: analysis=InputAnalysis(intent="action", topic=action, complexity="simple"), source_text=action_desc) - if self.has_pa: - return await self._run_expert_pipeline(command, mem_ctx, dashboard) - elif self.has_director: - return await self._run_director_pipeline(command, mem_ctx, dashboard) - else: - return await self._run_thinker_pipeline(command, mem_ctx, dashboard) + ctx = _WalkCtx(command=command, mem_ctx=mem_ctx, dashboard=dashboard) + return await self._walk_edges(ctx) # --- Helpers --- @@ -670,6 +733,29 @@ class FrameEngine: lines.append(f" - {ctype}: {ctrl.get('label', ctrl.get('text', '?'))}") return "\n".join(lines) + def _make_progress_wrapper(self, original_hud, language: str): + """Wrap an expert's send_hud to also stream progress deltas to the user.""" + sink = self.sink + progress_map = { + "tool_call": {"query_db": "Daten werden abgerufen..." if language == "de" else "Fetching data...", + "emit_actions": "UI wird erstellt..." if language == "de" else "Building UI...", + "create_machine": "Maschine wird erstellt..." if language == "de" else "Creating machine...", + "_default": "Verarbeite..." if language == "de" else "Processing..."}, + "tool_result": {"_default": ""}, # silent on result + "planned": {"_default": "Plan erstellt..." if language == "de" else "Plan ready..."}, + } + + async def wrapper(data: dict): + await original_hud(data) + event = data.get("event", "") + if event in progress_map: + tool = data.get("tool", "_default") + msg = progress_map[event].get(tool, progress_map[event].get("_default", "")) + if msg: + await sink.send_delta(msg + "\n") + + return wrapper + async def _run_output_and_ui(self, thought: ThoughtResult, mem_ctx: str) -> str: """Run Output and UI nodes in parallel. Returns response text.""" self.sink.reset() @@ -686,16 +772,6 @@ class FrameEngine: await self.sink.send_artifacts(artifacts) return response - def _check_condition(self, name: str, command: Command = None, - thought: ThoughtResult = None) -> bool: - """Evaluate a named condition.""" - if name == "reflex" and command: - return (command.analysis.intent == "social" - and command.analysis.complexity == "trivial") - if name == "has_tool_output" and thought: - return bool(thought.tool_used and thought.tool_output) - return False - def _make_result(self, response: str) -> dict: """Build the result dict returned to callers.""" return {