From cf42951b77bee52c209e22bab5f69a8679e33096 Mon Sep 17 00:00:00 2001 From: Nico Date: Fri, 3 Apr 2026 18:05:21 +0200 Subject: [PATCH] Implement config-driven models (Phase 1): graph MODELS dict, instantiate applies, per-request overrides MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Graph definitions (v3, v4) now declare MODELS mapping role → model string - engine.py extracts MODELS and applies to nodes during instantiation - frame_engine.process_message() accepts model_overrides for per-request swaps (restored via try/finally after processing) - 11/11 engine tests green Co-Authored-By: Claude Opus 4.6 (1M context) --- agent/engine.py | 9 +++- agent/frame_engine.py | 103 ++++++++++++++++++++++---------------- agent/graphs/v3_framed.py | 9 ++++ agent/graphs/v4_eras.py | 9 ++++ 4 files changed, 87 insertions(+), 43 deletions(-) diff --git a/agent/engine.py b/agent/engine.py index e7e014b..bbd5ed9 100644 --- a/agent/engine.py +++ b/agent/engine.py @@ -63,6 +63,7 @@ def _graph_from_module(mod) -> dict: "conditions": getattr(mod, "CONDITIONS", {}), "audit": getattr(mod, "AUDIT", {}), "engine": getattr(mod, "ENGINE", "imperative"), + "models": getattr(mod, "MODELS", {}), } @@ -79,7 +80,13 @@ def instantiate_nodes(graph: dict, send_hud, process_manager: ProcessManager = N nodes[role] = cls(send_hud=send_hud, process_manager=process_manager) else: nodes[role] = cls(send_hud=send_hud) - log.info(f"[engine] {role} = {impl_name} ({cls.__name__})") + # Apply model from graph config (overrides class default) + model = graph.get("models", {}).get(role) + if model and hasattr(nodes[role], "model"): + nodes[role].model = model + log.info(f"[engine] {role} = {impl_name} ({cls.__name__}) model={model}") + else: + log.info(f"[engine] {role} = {impl_name} ({cls.__name__})") return nodes diff --git a/agent/frame_engine.py b/agent/frame_engine.py index 8cf2c46..9d224a8 100644 --- a/agent/frame_engine.py +++ b/agent/frame_engine.py @@ -173,57 +173,76 @@ class FrameEngine: # --- Main entry point --- - async def process_message(self, text: str, dashboard: list = None) -> dict: + async def process_message(self, text: str, dashboard: list = None, + model_overrides: dict = None) -> dict: """Process a message through the frame pipeline. - Returns {response, controls, memorizer, frames, trace}.""" + Returns {response, controls, memorizer, frames, trace}. - self._begin_trace(text) + model_overrides: optional {role: model} to override node models for this request only. + """ + # Apply per-request model overrides (restored after processing) + saved_models = {} + if model_overrides: + for role, model in model_overrides.items(): + node = self.nodes.get(role) + if node and hasattr(node, "model"): + saved_models[role] = node.model + node.model = model - # Handle ACTION: prefix - if text.startswith("ACTION:"): - return await self._handle_action(text, dashboard) + try: + self._begin_trace(text) - # Setup - envelope = Envelope( - text=text, user_id=self.identity, - session_id="test", timestamp=time.strftime("%Y-%m-%d %H:%M:%S"), - ) - self.sensor.note_user_activity() - if dashboard is not None: - self.sensor.update_browser_dashboard(dashboard) - self.history.append({"role": "user", "content": text}) + # Handle ACTION: prefix + if text.startswith("ACTION:"): + return await self._handle_action(text, dashboard) - # --- Frame 1: Input --- - mem_ctx = self._build_context(dashboard) - rec = self._begin_frame(1, "input", input_summary=text[:100]) + # Setup + envelope = Envelope( + text=text, user_id=self.identity, + session_id="test", timestamp=time.strftime("%Y-%m-%d %H:%M:%S"), + ) + self.sensor.note_user_activity() + if dashboard is not None: + self.sensor.update_browser_dashboard(dashboard) + self.history.append({"role": "user", "content": text}) - command = await self.nodes["input"].process( - envelope, self.history, memory_context=mem_ctx, - identity=self.identity, channel=self.channel) + # --- Frame 1: Input --- + mem_ctx = self._build_context(dashboard) + rec = self._begin_frame(1, "input", input_summary=text[:100]) - a = command.analysis - cmd_summary = f"intent={a.intent} language={a.language} tone={a.tone} complexity={a.complexity}" + command = await self.nodes["input"].process( + envelope, self.history, memory_context=mem_ctx, + identity=self.identity, channel=self.channel) - # Check reflex condition - is_reflex = self._check_condition("reflex", command=command) - if is_reflex: - self._end_frame(rec, output_summary=cmd_summary, - route="output (reflex)", condition="reflex=True") - await self._send_hud({"node": "runtime", "event": "reflex_path", - "detail": f"{a.intent}/{a.complexity}"}) - return await self._run_reflex(command, mem_ctx) - else: - next_node = "pa" if self.has_pa else ("director" if self.has_director else "thinker") - self._end_frame(rec, output_summary=cmd_summary, - route=next_node, condition=f"reflex=False") + a = command.analysis + cmd_summary = f"intent={a.intent} language={a.language} tone={a.tone} complexity={a.complexity}" - # --- Frame 2+: Pipeline --- - if self.has_pa: - return await self._run_expert_pipeline(command, mem_ctx, dashboard) - elif self.has_director: - return await self._run_director_pipeline(command, mem_ctx, dashboard) - else: - return await self._run_thinker_pipeline(command, mem_ctx, dashboard) + # Check reflex condition + is_reflex = self._check_condition("reflex", command=command) + if is_reflex: + self._end_frame(rec, output_summary=cmd_summary, + route="output (reflex)", condition="reflex=True") + await self._send_hud({"node": "runtime", "event": "reflex_path", + "detail": f"{a.intent}/{a.complexity}"}) + return await self._run_reflex(command, mem_ctx) + else: + next_node = "pa" if self.has_pa else ("director" if self.has_director else "thinker") + self._end_frame(rec, output_summary=cmd_summary, + route=next_node, condition=f"reflex=False") + + # --- Frame 2+: Pipeline --- + if self.has_pa: + return await self._run_expert_pipeline(command, mem_ctx, dashboard) + elif self.has_director: + return await self._run_director_pipeline(command, mem_ctx, dashboard) + else: + return await self._run_thinker_pipeline(command, mem_ctx, dashboard) + finally: + # Restore original models after per-request overrides + for role, original_model in saved_models.items(): + node = self.nodes.get(role) + if node: + node.model = original_model # --- Pipeline variants --- diff --git a/agent/graphs/v3_framed.py b/agent/graphs/v3_framed.py index 7d921d5..7dd0f87 100644 --- a/agent/graphs/v3_framed.py +++ b/agent/graphs/v3_framed.py @@ -62,4 +62,13 @@ CONDITIONS = { "has_tool_output": "thinker.tool_used is not empty", } +MODELS = { + "input": "google/gemini-2.0-flash-001", + "director": "anthropic/claude-haiku-4.5", + "thinker": "google/gemini-2.0-flash-001", + "interpreter": "google/gemini-2.0-flash-001", + "output": "google/gemini-2.0-flash-001", + "memorizer": "google/gemini-2.0-flash-001", +} + AUDIT = {} diff --git a/agent/graphs/v4_eras.py b/agent/graphs/v4_eras.py index 3fe0013..ac005ec 100644 --- a/agent/graphs/v4_eras.py +++ b/agent/graphs/v4_eras.py @@ -68,4 +68,13 @@ CONDITIONS = { "has_tool_output": "expert.tool_used is not empty", } +MODELS = { + "input": "google/gemini-2.0-flash-001", + "pa": "anthropic/claude-haiku-4.5", + "expert_eras": "google/gemini-2.0-flash-001", + "interpreter": "google/gemini-2.0-flash-001", + "output": "google/gemini-2.0-flash-001", + "memorizer": "google/gemini-2.0-flash-001", +} + AUDIT = {}