"""Engine test suite — tests graph loading, node instantiation, frame engine
routing, conditions, and trace structure.

No LLM calls — all nodes mocked.

Tests:
    graph_load — load_graph returns correct structure for all graphs
    node_instantiation — instantiate_nodes creates all roles from registry
    edge_types_complete — all 3 edge types present, no orphan nodes
    condition_reflex — reflex condition fires on social+trivial only
    condition_tool_output — has_tool_output condition fires when tool data present
    frame_trace_reflex — reflex path produces 2-frame trace
    frame_trace_expert — expert path produces correct frame sequence
    frame_trace_director — director path produces correct frame sequence
"""
import asyncio
import os
import sys
import time

sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))

from agent.engine import load_graph, instantiate_nodes, _graph_from_module
from agent.frame_engine import FrameEngine, FrameTrace, FrameRecord
from agent.types import (
    Envelope, Command, InputAnalysis, ThoughtResult, DirectorPlan,
    PARouting, InterpretedResult, Artifact,
)


# --- Helpers ---

class MockSink:
    """Captures streamed output."""

    def __init__(self):
        self.deltas = []
        self.controls = []
        self.artifacts = []
        self.done_count = 0

    async def send_delta(self, text):
        self.deltas.append(text)

    async def send_controls(self, controls):
        self.controls = controls

    async def send_artifacts(self, artifacts):
        self.artifacts = artifacts

    async def send_done(self):
        self.done_count += 1

    def reset(self):
        # NOTE(review): only the delta stream is reset; controls/artifacts/
        # done_count intentionally persist across resets — confirm if a test
        # ever needs a full wipe.
        self.deltas.clear()


class MockHud:
    """Captures HUD events."""

    def __init__(self):
        self.events = []

    async def __call__(self, data):
        self.events.append(data)

    def find(self, event):
        """Return all captured events whose 'event' field matches."""
        return [e for e in self.events if e.get("event") == event]


class MockMemorizer:
    """Minimal memorizer for frame engine."""

    def __init__(self):
        self.state = {
            "user_name": "test",
            "user_mood": "neutral",
            "topic": "testing",
            "topic_history": [],
            "language": "en",
            "style_hint": "casual",
            "facts": [],
            "user_expectation": "conversational",
        }

    def get_context_block(self, sensor_lines=None, ui_state=None):
        return "Memory: test context"

    async def update(self, history):
        pass


class MockSensor:
    """Minimal sensor for frame engine."""

    def __init__(self):
        self._flags = []

    def note_user_activity(self):
        pass

    def update_browser_dashboard(self, dashboard):
        pass

    def get_context_lines(self):
        return ["Sensors: test"]

    def consume_flags(self):
        # Return a snapshot and clear the pending flags (consume semantics).
        flags = self._flags[:]
        self._flags.clear()
        return flags


class MockUINode:
    """Minimal UI node for frame engine."""

    def __init__(self):
        self.thinker_controls = []
        self.state = {}
        self._artifacts = []

    @property
    def current_controls(self):
        return self.thinker_controls

    @current_controls.setter
    def current_controls(self, value):
        self.thinker_controls = value

    async def process(self, thought, history, memory_context=""):
        return self.thinker_controls

    def get_machine_summary(self):
        return ""

    def get_machine_controls(self):
        return []

    def get_artifacts(self):
        return self._artifacts

    def try_machine_transition(self, action):
        return False, ""

    async def process_local_action(self, action, data):
        return None, []


class MockInputNode:
    """Returns a preconfigured Command."""

    def __init__(self, intent="request", complexity="simple", topic="test",
                 language="en"):
        self._intent = intent
        self._complexity = complexity
        self._topic = topic
        self._language = language

    async def process(self, envelope, history, memory_context="", identity="",
                      channel=""):
        return Command(
            analysis=InputAnalysis(
                intent=self._intent,
                topic=self._topic,
                complexity=self._complexity,
                language=self._language,
                tone="casual",
            ),
            source_text=envelope.text,
        )


class MockOutputNode:
    """Streams response text via sink."""

    async def process(self, thought, history, sink, memory_context=""):
        text = thought.response or "ok"
        # Stream in small chunks to exercise the delta path.
        for i in range(0, len(text), 12):
            await sink.send_delta(text[i:i+12])
        await sink.send_done()
        return text


class MockPANode:
    """Returns a preconfigured PARouting."""

    def __init__(self, expert="eras", job="test query",
                 thinking_msg="Working..."):
        self._expert = expert
        self._job = job
        self._thinking_msg = thinking_msg

    def set_available_experts(self, experts):
        pass

    async def route(self, command, history, memory_context="", identity="",
                    channel=""):
        return PARouting(
            expert=self._expert,
            job=self._job,
            thinking_message=self._thinking_msg,
            language="en",
        )

    async def route_retry(self, command, history, memory_context="",
                          identity="", channel="", original_job="",
                          errors=None):
        return PARouting(expert=self._expert, job=f"retry: {self._job}",
                         language="en")


class MockExpertNode:
    """Returns a preconfigured ThoughtResult."""

    def __init__(self, response="expert result", tool_used="", tool_output="",
                 errors=None):
        self._response = response
        self._tool_used = tool_used
        self._tool_output = tool_output
        self._errors = errors or []
        self.send_hud = MockHud()

    async def execute(self, job, language):
        return ThoughtResult(
            response=self._response,
            tool_used=self._tool_used,
            tool_output=self._tool_output,
            errors=self._errors,
        )


class MockDirectorNode:
    """Returns a preconfigured DirectorPlan."""

    def __init__(self, goal="test", tools=None, hint=""):
        self._goal = goal
        self._tools = tools or []
        self._hint = hint

    async def decide(self, command, history, memory_context=""):
        return DirectorPlan(
            goal=self._goal,
            tool_sequence=self._tools,
            response_hint=self._hint,
        )

    def get_context_line(self):
        return ""


class MockThinkerNode:
    """Returns a preconfigured ThoughtResult."""

    def __init__(self, response="thought result", tool_used="",
                 tool_output=""):
        self._response = response
        self._tool_used = tool_used
        self._tool_output = tool_output

    async def process(self, command, plan=None, history=None,
                      memory_context=""):
        return ThoughtResult(
            response=self._response,
            tool_used=self._tool_used,
            tool_output=self._tool_output,
        )


class MockInterpreterNode:
    """Returns a preconfigured InterpretedResult."""

    async def interpret(self, tool_used, tool_output, job):
        return InterpretedResult(
            summary=f"Interpreted: {tool_used} returned data",
            row_count=5,
            key_facts=["5 rows"],
        )


def make_frame_engine(nodes, graph_name="v4-eras"):
    """Create a FrameEngine with mocked dependencies.

    Returns (engine, sink, hud) so tests can inspect streamed output and
    HUD events after driving the engine.
    """
    graph = load_graph(graph_name)
    sink = MockSink()
    hud = MockHud()
    memorizer = MockMemorizer()
    sensor = MockSensor()
    ui = MockUINode()
    engine = FrameEngine(
        graph=graph,
        nodes=nodes,
        sink=sink,
        history=[],
        send_hud=hud,
        sensor=sensor,
        memorizer=memorizer,
        ui_node=ui,
        identity="test_user",
        channel="test",
    )
    return engine, sink, hud


# --- Tests ---

def test_graph_load():
    """load_graph returns correct structure for all frame-based graphs."""
    for name in ["v3-framed", "v4-eras"]:
        g = load_graph(name)
        assert g["name"] == name, f"graph name mismatch: {g['name']} != {name}"
        assert g["engine"] == "frames", f"{name} should use frames engine"
        assert "nodes" in g and len(g["nodes"]) > 0, f"{name} has no nodes"
        assert "edges" in g and len(g["edges"]) > 0, f"{name} has no edges"
        assert "conditions" in g, f"{name} has no conditions"
    # v1 should be imperative
    g1 = load_graph("v1-current")
    assert g1["engine"] == "imperative", "v1 should be imperative"


def test_node_instantiation():
    """instantiate_nodes creates all roles from registry."""
    hud = MockHud()
    for name in ["v3-framed", "v4-eras"]:
        g = load_graph(name)
        nodes = instantiate_nodes(g, hud)
        for role in g["nodes"]:
            assert role in nodes, f"missing node role '{role}' in {name}"
        # Check specific node types exist
        assert "input" in nodes
        assert "output" in nodes
        assert "memorizer" in nodes
        assert "sensor" in nodes


def test_edge_types_complete():
    """All 3 edge types present in graph definitions, no orphan nodes."""
    for name in ["v3-framed", "v4-eras"]:
        g = load_graph(name)
        edges = g["edges"]
        edge_types = {e.get("type") for e in edges}
        assert "data" in edge_types, f"{name} missing data edges"
        assert "context" in edge_types, f"{name} missing context edges"
        assert "state" in edge_types, f"{name} missing state edges"
        # Every node should appear in at least one edge (from or to)
        node_roles = set(g["nodes"].keys())
        edge_nodes = set()
        for e in edges:
            edge_nodes.add(e["from"])
            to = e["to"]
            if isinstance(to, list):
                edge_nodes.update(to)
            else:
                edge_nodes.add(to)
        # runtime is a virtual target, not a real node
        edge_nodes.discard("runtime")
        missing = node_roles - edge_nodes
        assert not missing, f"{name} has orphan nodes: {missing}"


def test_condition_reflex():
    """_check_condition('reflex') fires on social+trivial only."""
    engine, _, _ = make_frame_engine({
        "input": MockInputNode(),
        "output": MockOutputNode(),
        "memorizer": MockMemorizer(),
        "sensor": MockSensor(),
        "ui": MockUINode(),
    }, "v4-eras")
    # Should fire
    cmd_social = Command(
        analysis=InputAnalysis(intent="social", complexity="trivial"),
        source_text="hi",
    )
    assert engine._check_condition("reflex", command=cmd_social), \
        "reflex should fire for social+trivial"
    # Should NOT fire
    cmd_request = Command(
        analysis=InputAnalysis(intent="request", complexity="simple"),
        source_text="show data",
    )
    assert not engine._check_condition("reflex", command=cmd_request), \
        "reflex should not fire for request+simple"
    cmd_social_complex = Command(
        analysis=InputAnalysis(intent="social", complexity="complex"),
        source_text="tell me a long story",
    )
    assert not engine._check_condition("reflex", command=cmd_social_complex), \
        "reflex should not fire for social+complex"


def test_condition_tool_output():
    """_check_condition('has_tool_output') fires when tool data present."""
    engine, _, _ = make_frame_engine({
        "input": MockInputNode(),
        "output": MockOutputNode(),
        "memorizer": MockMemorizer(),
        "sensor": MockSensor(),
        "ui": MockUINode(),
    }, "v4-eras")
    thought_with = ThoughtResult(
        response="data",
        tool_used="query_db",
        tool_output="rows here",
    )
    assert engine._check_condition("has_tool_output", thought=thought_with), \
        "should fire when tool_used and tool_output both set"
    thought_without = ThoughtResult(response="just text")
    assert not engine._check_condition("has_tool_output",
                                       thought=thought_without), \
        "should not fire when no tool output"
    thought_partial = ThoughtResult(response="x", tool_used="query_db",
                                    tool_output="")
    assert not engine._check_condition("has_tool_output",
                                       thought=thought_partial), \
        "should not fire when tool_output is empty string"


def test_frame_trace_reflex():
    """Reflex path: 2 frames (input → output), path='reflex'."""
    nodes = {
        "input": MockInputNode(intent="social", complexity="trivial"),
        "output": MockOutputNode(),
        "pa": MockPANode(),
        "expert_eras": MockExpertNode(),
        "interpreter": MockInterpreterNode(),
        "memorizer": MockMemorizer(),
        "sensor": MockSensor(),
        "ui": MockUINode(),
    }
    engine, sink, hud = make_frame_engine(nodes, "v4-eras")
    # asyncio.run replaces the deprecated get_event_loop().run_until_complete
    # pattern, which raises on Python 3.12+ when no loop exists yet.
    result = asyncio.run(engine.process_message("hello"))
    trace = result["trace"]
    assert trace["path"] == "reflex", f"expected reflex path, got {trace['path']}"
    assert trace["total_frames"] == 2, f"expected 2 frames, got {trace['total_frames']}"
    assert len(trace["frames"]) == 2
    assert trace["frames"][0]["node"] == "input"
    assert trace["frames"][1]["node"] == "output"
    assert "reflex=True" in trace["frames"][0]["condition"]


def test_frame_trace_expert():
    """Expert path without tool output: F1(input)→F2(pa)→F3(expert)→F4(output+ui)."""
    nodes = {
        "input": MockInputNode(intent="request", complexity="simple"),
        "output": MockOutputNode(),
        "pa": MockPANode(expert="eras", job="get top customers"),
        "expert_eras": MockExpertNode(response="Here are the customers"),
        "interpreter": MockInterpreterNode(),
        "memorizer": MockMemorizer(),
        "sensor": MockSensor(),
        "ui": MockUINode(),
    }
    engine, sink, hud = make_frame_engine(nodes, "v4-eras")
    result = asyncio.run(engine.process_message("show top customers"))
    trace = result["trace"]
    assert trace["path"] == "expert", f"expected expert path, got {trace['path']}"
    assert trace["total_frames"] >= 4, f"expected >=4 frames, got {trace['total_frames']}"
    nodes_in_trace = [f["node"] for f in trace["frames"]]
    assert nodes_in_trace[0] == "input"
    assert nodes_in_trace[1] == "pa"
    assert "expert_eras" in nodes_in_trace[2]


def test_frame_trace_expert_with_interpreter():
    """Expert path with tool output: includes interpreter frame, path='expert+interpreter'."""
    nodes = {
        "input": MockInputNode(intent="request", complexity="simple"),
        "output": MockOutputNode(),
        "pa": MockPANode(expert="eras", job="query customers"),
        "expert_eras": MockExpertNode(
            response="raw data",
            tool_used="query_db",
            tool_output="customer_name,revenue\nAcme,1000",
        ),
        "interpreter": MockInterpreterNode(),
        "memorizer": MockMemorizer(),
        "sensor": MockSensor(),
        "ui": MockUINode(),
    }
    engine, sink, hud = make_frame_engine(nodes, "v4-eras")
    result = asyncio.run(engine.process_message("show customer revenue"))
    trace = result["trace"]
    assert trace["path"] == "expert+interpreter", \
        f"expected expert+interpreter path, got {trace['path']}"
    nodes_in_trace = [f["node"] for f in trace["frames"]]
    assert "interpreter" in nodes_in_trace, "interpreter frame missing"
    assert trace["total_frames"] >= 5, f"expected >=5 frames, got {trace['total_frames']}"


# --- Phase 1: Config-driven models (RED — will fail until implemented) ---

def test_graph_has_models():
    """All graph definitions include a MODELS dict mapping role → model."""
    for name in ["v1-current", "v2-director-drives", "v3-framed", "v4-eras"]:
        g = load_graph(name)
        assert "models" in g, f"{name}: graph should have a 'models' key"
        models = g["models"]
        assert isinstance(models, dict), f"{name}: models should be a dict"
        assert len(models) > 0, f"{name}: models should not be empty"
        for role, model in models.items():
            assert isinstance(model, str) and "/" in model, \
                f"{name}: model for '{role}' should be provider/model, got {model}"


def test_instantiate_applies_graph_models():
    """instantiate_nodes applies model from graph config, overriding class default."""
    hud = MockHud()
    g = load_graph("v4-eras")
    # Override a model in graph config
    g["models"] = g.get("models", {})
    g["models"]["input"] = "test/override-model"
    nodes = instantiate_nodes(g, hud)
    assert nodes["input"].model == "test/override-model", \
        f"input node model should be 'test/override-model', got {nodes['input'].model}"


def test_model_override_per_request():
    """Engine accepts model overrides that are applied to nodes for one request."""
    nodes = {
        "input": MockInputNode(intent="social", complexity="trivial"),
        "output": MockOutputNode(),
        "pa": MockPANode(),
        "expert_eras": MockExpertNode(),
        "interpreter": MockInterpreterNode(),
        "memorizer": MockMemorizer(),
        "sensor": MockSensor(),
        "ui": MockUINode(),
    }
    engine, sink, hud = make_frame_engine(nodes, "v4-eras")
    # process_message should accept model_overrides param
    result = asyncio.run(
        engine.process_message("hello",
                               model_overrides={"input": "test/fast-model"})
    )
    # Should complete without error (overrides applied internally)
    assert result["trace"]["path"] == "reflex"


# --- Test registry (for run_tests.py) ---

TESTS = {
    # Green — engine mechanics
    'graph_load': test_graph_load,
    'node_instantiation': test_node_instantiation,
    'edge_types_complete': test_edge_types_complete,
    'condition_reflex': test_condition_reflex,
    'condition_tool_output': test_condition_tool_output,
    'frame_trace_reflex': test_frame_trace_reflex,
    'frame_trace_expert': test_frame_trace_expert,
    'frame_trace_expert_with_interpreter': test_frame_trace_expert_with_interpreter,
    # Red — Phase 1: config-driven models
    'graph_has_models': test_graph_has_models,
    'instantiate_applies_graph_models': test_instantiate_applies_graph_models,
    'model_override_per_request': test_model_override_per_request,
}