This repository has been archived on 2026-04-03. You can view files and clone it, but cannot push or open issues or pull requests.
agent-runtime/tests/test_engine.py
Nico 097c7f31f3 Add engine test suite: 8 tests for graph loading, conditions, frame traces
New 'engine' suite in run_tests.py with tests that verify frame engine
mechanics without LLM calls. Covers graph loading, node instantiation,
edge type completeness, reflex/tool_output conditions, and frame trace
structure for reflex/expert/expert+interpreter pipelines.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-03 18:01:06 +02:00

492 lines
16 KiB
Python

"""Engine test suite — tests graph loading, node instantiation, frame engine
routing, conditions, and trace structure. No LLM calls — all nodes mocked.
Tests:
graph_load — load_graph returns correct structure for all graphs
node_instantiation — instantiate_nodes creates all roles from registry
edge_types_complete — all 3 edge types present, no orphan nodes
condition_reflex — reflex condition fires on social+trivial only
condition_tool_output — has_tool_output condition fires when tool data present
frame_trace_reflex — reflex path produces 2-frame trace
frame_trace_expert — expert path produces correct frame sequence
frame_trace_expert_with_interpreter — expert path with tool output includes interpreter frame
"""
import asyncio
import os
import sys
import time
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
from agent.engine import load_graph, instantiate_nodes, _graph_from_module
from agent.frame_engine import FrameEngine, FrameTrace, FrameRecord
from agent.types import (
Envelope, Command, InputAnalysis, ThoughtResult,
DirectorPlan, PARouting, InterpretedResult, Artifact,
)
# --- Helpers ---
class MockSink:
    """Test double for the streaming sink: records everything pushed into it."""

    def __init__(self):
        self.deltas = []      # text chunks received via send_delta, in order
        self.controls = []    # most recent controls payload
        self.artifacts = []   # most recent artifacts payload
        self.done_count = 0   # number of send_done calls observed

    async def send_delta(self, text):
        self.deltas.append(text)

    async def send_controls(self, controls):
        self.controls = controls

    async def send_artifacts(self, artifacts):
        self.artifacts = artifacts

    async def send_done(self):
        self.done_count = self.done_count + 1

    def reset(self):
        # Intentionally clears only the delta stream; the last controls,
        # artifacts, and done_count are kept for inspection.
        self.deltas.clear()
class MockHud:
    """Test double for the HUD callback: stores every event payload it receives."""

    def __init__(self):
        self.events = []

    async def __call__(self, data):
        self.events.append(data)

    def find(self, event):
        """Return all captured payloads whose "event" field equals *event*."""
        matches = []
        for payload in self.events:
            if payload.get("event") == event:
                matches.append(payload)
        return matches
class MockMemorizer:
    """Stand-in memorizer: fixed state, constant context block, no-op update."""

    def __init__(self):
        # Minimal state shape the frame engine expects to read.
        self.state = dict(
            user_name="test",
            user_mood="neutral",
            topic="testing",
            topic_history=[],
            language="en",
            style_hint="casual",
            facts=[],
            user_expectation="conversational",
        )

    def get_context_block(self, sensor_lines=None, ui_state=None):
        # The engine only needs *a* string here; content is irrelevant to tests.
        return "Memory: test context"

    async def update(self, history):
        # Memory persistence is out of scope for engine tests.
        pass
class MockSensor:
    """Stand-in sensor: static context line plus a drainable flag queue."""

    def __init__(self):
        self._flags = []

    def note_user_activity(self):
        pass  # activity tracking not needed in engine tests

    def update_browser_dashboard(self, dashboard):
        pass  # dashboard state not needed in engine tests

    def get_context_lines(self):
        return ["Sensors: test"]

    def consume_flags(self):
        """Drain and return all pending flags, leaving the queue empty."""
        pending, self._flags = self._flags, []
        return pending
class MockUINode:
    """Stand-in UI node: pass-through controls and an empty machine state."""

    def __init__(self):
        self.thinker_controls = []
        self.state = {}
        self._artifacts = []

    @property
    def current_controls(self):
        # Alias kept so engine code may read either attribute name.
        return self.thinker_controls

    @current_controls.setter
    def current_controls(self, value):
        self.thinker_controls = value

    async def process(self, thought, history, memory_context=""):
        # Engine tests never transform controls; echo the current set.
        return self.thinker_controls

    def get_machine_summary(self):
        return ""

    def get_machine_controls(self):
        return []

    def get_artifacts(self):
        return self._artifacts

    def try_machine_transition(self, action):
        return (False, "")

    async def process_local_action(self, action, data):
        return (None, [])
class MockInputNode:
    """Stand-in input node that yields a fixed Command for any envelope."""

    def __init__(self, intent="request", complexity="simple", topic="test", language="en"):
        self._intent = intent
        self._complexity = complexity
        self._topic = topic
        self._language = language

    async def process(self, envelope, history, memory_context="", identity="", channel=""):
        analysis = InputAnalysis(
            intent=self._intent,
            topic=self._topic,
            complexity=self._complexity,
            language=self._language,
            tone="casual",
        )
        return Command(analysis=analysis, source_text=envelope.text)
class MockOutputNode:
    """Stand-in output node: streams thought.response via the sink in 12-char chunks."""

    async def process(self, thought, history, sink, memory_context=""):
        text = thought.response or "ok"  # never stream an empty response
        chunk = 12
        start = 0
        while start < len(text):
            await sink.send_delta(text[start:start + chunk])
            start += chunk
        await sink.send_done()
        return text
class MockPANode:
    """Stand-in PA router that always routes to the same expert with the same job."""

    def __init__(self, expert="eras", job="test query", thinking_msg="Working..."):
        self._expert = expert
        self._job = job
        self._thinking_msg = thinking_msg

    def set_available_experts(self, experts):
        pass  # routing is fixed; the available-experts list is ignored

    async def route(self, command, history, memory_context="", identity="", channel=""):
        return PARouting(
            expert=self._expert,
            job=self._job,
            thinking_message=self._thinking_msg,
            language="en",
        )

    async def route_retry(self, command, history, memory_context="", identity="",
                          channel="", original_job="", errors=None):
        # Retry path prefixes the job so traces can distinguish it from route().
        retried_job = "retry: " + self._job
        return PARouting(expert=self._expert, job=retried_job, language="en")
class MockExpertNode:
    """Stand-in expert that answers every job with a canned ThoughtResult."""

    def __init__(self, response="expert result", tool_used="", tool_output="", errors=None):
        self._response = response
        self._tool_used = tool_used
        self._tool_output = tool_output
        self._errors = errors or []
        # The engine may push HUD events through the expert; capture them too.
        self.send_hud = MockHud()

    async def execute(self, job, language):
        return ThoughtResult(
            response=self._response,
            tool_used=self._tool_used,
            tool_output=self._tool_output,
            errors=self._errors,
        )
class MockDirectorNode:
    """Stand-in director that emits a fixed DirectorPlan for every command."""

    def __init__(self, goal="test", tools=None, hint=""):
        self._goal = goal
        self._tools = tools or []
        self._hint = hint

    async def decide(self, command, history, memory_context=""):
        plan = DirectorPlan(
            goal=self._goal,
            tool_sequence=self._tools,
            response_hint=self._hint,
        )
        return plan

    def get_context_line(self):
        return ""
class MockThinkerNode:
    """Stand-in thinker that emits a fixed ThoughtResult regardless of input."""

    def __init__(self, response="thought result", tool_used="", tool_output=""):
        self._response = response
        self._tool_used = tool_used
        self._tool_output = tool_output

    async def process(self, command, plan=None, history=None, memory_context=""):
        # plan/history/memory_context are accepted for interface parity but unused.
        return ThoughtResult(
            response=self._response,
            tool_used=self._tool_used,
            tool_output=self._tool_output,
        )
class MockInterpreterNode:
    """Stand-in interpreter that summarizes any tool output the same way."""

    async def interpret(self, tool_used, tool_output, job):
        summary = f"Interpreted: {tool_used} returned data"
        return InterpretedResult(
            summary=summary,
            row_count=5,
            key_facts=["5 rows"],
        )
def make_frame_engine(nodes, graph_name="v4-eras"):
    """Build a FrameEngine wired entirely to fresh mocks.

    Returns (engine, sink, hud) so callers can inspect streamed output
    and HUD events after driving the engine.
    """
    sink = MockSink()
    hud = MockHud()
    engine = FrameEngine(
        graph=load_graph(graph_name),
        nodes=nodes,
        sink=sink,
        history=[],
        send_hud=hud,
        sensor=MockSensor(),
        memorizer=MockMemorizer(),
        ui_node=MockUINode(),
        identity="test_user",
        channel="test",
    )
    return engine, sink, hud
# --- Tests ---
def test_graph_load():
    """load_graph returns correct structure for all frame-based graphs."""
    for name in ("v3-framed", "v4-eras"):
        graph = load_graph(name)
        assert graph["name"] == name, f"graph name mismatch: {graph['name']} != {name}"
        assert graph["engine"] == "frames", f"{name} should use frames engine"
        assert "nodes" in graph and len(graph["nodes"]) > 0, f"{name} has no nodes"
        assert "edges" in graph and len(graph["edges"]) > 0, f"{name} has no edges"
        assert "conditions" in graph, f"{name} has no conditions"
    # The legacy v1 graph still runs on the imperative engine.
    legacy = load_graph("v1-current")
    assert legacy["engine"] == "imperative", "v1 should be imperative"
def test_node_instantiation():
    """instantiate_nodes creates all roles from registry."""
    hud = MockHud()
    nodes = {}
    for name in ("v3-framed", "v4-eras"):
        graph = load_graph(name)
        nodes = instantiate_nodes(graph, hud)
        for role in graph["nodes"]:
            assert role in nodes, f"missing node role '{role}' in {name}"
    # A few well-known roles must always come out of instantiation.
    for expected in ("input", "output", "memorizer", "sensor"):
        assert expected in nodes
def test_edge_types_complete():
    """All 3 edge types present in graph definitions, no orphan nodes."""
    for name in ("v3-framed", "v4-eras"):
        graph = load_graph(name)
        edges = graph["edges"]
        kinds = {edge.get("type") for edge in edges}
        assert "data" in kinds, f"{name} missing data edges"
        assert "context" in kinds, f"{name} missing context edges"
        assert "state" in kinds, f"{name} missing state edges"
        # Every node should appear in at least one edge (from or to).
        referenced = set()
        for edge in edges:
            referenced.add(edge["from"])
            target = edge["to"]
            if isinstance(target, list):
                referenced.update(target)
            else:
                referenced.add(target)
        # "runtime" is a virtual target, not a real node.
        referenced.discard("runtime")
        missing = set(graph["nodes"].keys()) - referenced
        assert not missing, f"{name} has orphan nodes: {missing}"
def test_condition_reflex():
    """_check_condition('reflex') fires on social+trivial only."""
    engine, _, _ = make_frame_engine(
        {
            "input": MockInputNode(),
            "output": MockOutputNode(),
            "memorizer": MockMemorizer(),
            "sensor": MockSensor(),
            "ui": MockUINode(),
        },
        "v4-eras",
    )

    def command_for(intent, complexity, text):
        # Local factory keeps the three cases below compact.
        return Command(
            analysis=InputAnalysis(intent=intent, complexity=complexity),
            source_text=text,
        )

    fires = command_for("social", "trivial", "hi")
    assert engine._check_condition("reflex", command=fires), \
        "reflex should fire for social+trivial"
    plain_request = command_for("request", "simple", "show data")
    assert not engine._check_condition("reflex", command=plain_request), \
        "reflex should not fire for request+simple"
    complex_social = command_for("social", "complex", "tell me a long story")
    assert not engine._check_condition("reflex", command=complex_social), \
        "reflex should not fire for social+complex"
def test_condition_tool_output():
    """_check_condition('has_tool_output') fires when tool data present."""
    engine, _, _ = make_frame_engine(
        {
            "input": MockInputNode(),
            "output": MockOutputNode(),
            "memorizer": MockMemorizer(),
            "sensor": MockSensor(),
            "ui": MockUINode(),
        },
        "v4-eras",
    )
    # (thought, should_fire, reason) — condition needs tool_used AND tool_output.
    cases = [
        (ThoughtResult(response="data", tool_used="query_db", tool_output="rows here"),
         True, "should fire when tool_used and tool_output both set"),
        (ThoughtResult(response="just text"),
         False, "should not fire when no tool output"),
        (ThoughtResult(response="x", tool_used="query_db", tool_output=""),
         False, "should not fire when tool_output is empty string"),
    ]
    for thought, should_fire, reason in cases:
        assert bool(engine._check_condition("has_tool_output", thought=thought)) is should_fire, reason
def test_frame_trace_reflex():
    """Reflex path: 2 frames (input → output), path='reflex'."""
    nodes = {
        "input": MockInputNode(intent="social", complexity="trivial"),
        "output": MockOutputNode(),
        "pa": MockPANode(),
        "expert_eras": MockExpertNode(),
        "interpreter": MockInterpreterNode(),
        "memorizer": MockMemorizer(),
        "sensor": MockSensor(),
        "ui": MockUINode(),
    }
    engine, sink, hud = make_frame_engine(nodes, "v4-eras")
    # asyncio.run creates (and closes) a fresh event loop per call;
    # asyncio.get_event_loop().run_until_complete is deprecated outside a
    # running loop since Python 3.10 and unreliable on 3.12+.
    result = asyncio.run(engine.process_message("hello"))
    trace = result["trace"]
    assert trace["path"] == "reflex", f"expected reflex path, got {trace['path']}"
    assert trace["total_frames"] == 2, f"expected 2 frames, got {trace['total_frames']}"
    assert len(trace["frames"]) == 2
    assert trace["frames"][0]["node"] == "input"
    assert trace["frames"][1]["node"] == "output"
    assert "reflex=True" in trace["frames"][0]["condition"]
def test_frame_trace_expert():
    """Expert path without tool output: F1(input)→F2(pa)→F3(expert)→F4(output+ui)."""
    nodes = {
        "input": MockInputNode(intent="request", complexity="simple"),
        "output": MockOutputNode(),
        "pa": MockPANode(expert="eras", job="get top customers"),
        "expert_eras": MockExpertNode(response="Here are the customers"),
        "interpreter": MockInterpreterNode(),
        "memorizer": MockMemorizer(),
        "sensor": MockSensor(),
        "ui": MockUINode(),
    }
    engine, sink, hud = make_frame_engine(nodes, "v4-eras")
    # asyncio.run replaces the deprecated get_event_loop().run_until_complete
    # pattern (deprecated since 3.10, unreliable on 3.12+).
    result = asyncio.run(engine.process_message("show top customers"))
    trace = result["trace"]
    assert trace["path"] == "expert", f"expected expert path, got {trace['path']}"
    assert trace["total_frames"] >= 4, f"expected >=4 frames, got {trace['total_frames']}"
    nodes_in_trace = [f["node"] for f in trace["frames"]]
    assert nodes_in_trace[0] == "input"
    assert nodes_in_trace[1] == "pa"
    assert "expert_eras" in nodes_in_trace[2]
def test_frame_trace_expert_with_interpreter():
    """Expert path with tool output: includes interpreter frame, path='expert+interpreter'."""
    nodes = {
        "input": MockInputNode(intent="request", complexity="simple"),
        "output": MockOutputNode(),
        "pa": MockPANode(expert="eras", job="query customers"),
        "expert_eras": MockExpertNode(
            response="raw data",
            tool_used="query_db",
            tool_output="customer_name,revenue\nAcme,1000",
        ),
        "interpreter": MockInterpreterNode(),
        "memorizer": MockMemorizer(),
        "sensor": MockSensor(),
        "ui": MockUINode(),
    }
    engine, sink, hud = make_frame_engine(nodes, "v4-eras")
    # asyncio.run replaces the deprecated get_event_loop().run_until_complete
    # pattern (deprecated since 3.10, unreliable on 3.12+).
    result = asyncio.run(engine.process_message("show customer revenue"))
    trace = result["trace"]
    assert trace["path"] == "expert+interpreter", \
        f"expected expert+interpreter path, got {trace['path']}"
    nodes_in_trace = [f["node"] for f in trace["frames"]]
    assert "interpreter" in nodes_in_trace, "interpreter frame missing"
    assert trace["total_frames"] >= 5, f"expected >=5 frames, got {trace['total_frames']}"
# --- Test registry (for run_tests.py) ---
# Registry keys are derived from the function names ("test_" stripped),
# so a renamed test can never drift out of sync with its registry key.
TESTS = {
    fn.__name__.removeprefix("test_"): fn
    for fn in (
        test_graph_load,
        test_node_instantiation,
        test_edge_types_complete,
        test_condition_reflex,
        test_condition_tool_output,
        test_frame_trace_reflex,
        test_frame_trace_expert,
        test_frame_trace_expert_with_interpreter,
    )
}