This repository has been archived on 2026-04-03. You can view files and clone it, but cannot push or open issues or pull requests.
agent-runtime/tests/test_engine.py
Nico d8e832d2d4 Add --parallel=N for concurrent test execution
- run_tests.py: ThreadPoolExecutor runs N tests concurrently within a suite
- Each testcase has its own session_id so parallel is safe
- Engine tests: fixed asyncio.new_event_loop() for thread safety
- Usage: python tests/run_tests.py testcases --parallel=3
- Wall time reduction: ~3x for testcases (15min → 5min with parallel=3)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-03 20:01:06 +02:00

621 lines
21 KiB
Python

"""Engine test suite — tests graph loading, node instantiation, frame engine
routing, conditions, and trace structure. No LLM calls — all nodes mocked.
Tests:
graph_load — load_graph returns correct structure for all graphs
node_instantiation — instantiate_nodes creates all roles from registry
edge_types_complete — all 3 edge types present, no orphan nodes
condition_reflex — reflex condition fires on social+trivial only
condition_tool_output — has_tool_output condition fires when tool data present
frame_trace_reflex — reflex path produces 2-frame trace
frame_trace_expert — expert path produces correct frame sequence
frame_trace_director — director path produces correct frame sequence
"""
import asyncio
import os
import sys
import time
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
from agent.engine import load_graph, instantiate_nodes, _graph_from_module
from agent.frame_engine import FrameEngine, FrameTrace, FrameRecord
from agent.types import (
Envelope, Command, InputAnalysis, ThoughtResult,
DirectorPlan, PARouting, InterpretedResult, Artifact,
)
# --- Helpers ---
class MockSink:
    """Test double for the streaming sink; records everything it receives."""
    def __init__(self):
        self.deltas = []      # streamed text chunks, in arrival order
        self.controls = []    # last controls payload received
        self.artifacts = []   # last artifacts payload received
        self.done_count = 0   # number of send_done() calls
    async def send_delta(self, text):
        # Accumulate chunks so tests can reassemble the full response.
        self.deltas.append(text)
    async def send_controls(self, controls):
        # Latest payload replaces the previous one (no history kept).
        self.controls = controls
    async def send_artifacts(self, artifacts):
        self.artifacts = artifacts
    async def send_done(self):
        self.done_count += 1
    def reset(self):
        # Deliberately clears only the streamed deltas between turns;
        # controls/artifacts/done_count persist for later inspection.
        self.deltas.clear()
class MockHud:
    """Test double for the HUD callback; stores every event it is called with."""
    def __init__(self):
        self.events = []
    async def __call__(self, data):
        # The engine awaits the HUD like a coroutine function; just record.
        self.events.append(data)
    def find(self, event):
        """Return every captured payload whose 'event' field equals *event*."""
        return [payload for payload in self.events
                if payload.get("event") == event]
class MockMemorizer:
    """Minimal memorizer stand-in: fixed state, constant context block, no-op update."""
    def __init__(self):
        # Same keys/values the real memorizer exposes, with neutral defaults.
        self.state = dict(
            user_name="test",
            user_mood="neutral",
            topic="testing",
            topic_history=[],
            language="en",
            style_hint="casual",
            facts=[],
            user_expectation="conversational",
        )
    def get_context_block(self, sensor_lines=None, ui_state=None):
        # Sensor/UI inputs are ignored; callers only need a stable string.
        return "Memory: test context"
    async def update(self, history):
        # Memory never changes during engine tests.
        pass
class MockSensor:
    """Minimal sensor stand-in: fixed context line, drainable flag queue."""
    def __init__(self):
        self._flags = []
    def note_user_activity(self):
        # Activity tracking is irrelevant for engine tests.
        pass
    def update_browser_dashboard(self, dashboard):
        pass
    def get_context_lines(self):
        return ["Sensors: test"]
    def consume_flags(self):
        """Drain and return pending flags (clears the queue in place)."""
        drained = list(self._flags)
        del self._flags[:]
        return drained
class MockUINode:
    """Minimal UI node stand-in: inert state machine, echoing controls."""
    def __init__(self):
        self.thinker_controls = []  # backing store for current_controls
        self.state = {}
        self._artifacts = []
    @property
    def current_controls(self):
        # Alias kept because the engine reads/writes both names.
        return self.thinker_controls
    @current_controls.setter
    def current_controls(self, value):
        self.thinker_controls = value
    async def process(self, thought, history, memory_context=""):
        # UI processing never changes anything; just echo current controls.
        return self.thinker_controls
    def get_machine_summary(self):
        return ""
    def get_machine_controls(self):
        return []
    def get_artifacts(self):
        return self._artifacts
    def try_machine_transition(self, action):
        # No state machine: every transition is rejected with no message.
        return False, ""
    async def process_local_action(self, action, data):
        return None, []
class MockInputNode:
    """Input-node stand-in: yields a Command built from fixed analysis values."""
    def __init__(self, intent="request", complexity="simple", topic="test", language="en"):
        # Canned analysis fields returned on every process() call.
        self._intent = intent
        self._complexity = complexity
        self._topic = topic
        self._language = language
    async def process(self, envelope, history, memory_context="", identity="", channel=""):
        analysis = InputAnalysis(
            intent=self._intent,
            topic=self._topic,
            complexity=self._complexity,
            language=self._language,
            tone="casual",
        )
        # Only the envelope text flows through; history/context are ignored.
        return Command(analysis=analysis, source_text=envelope.text)
class MockOutputNode:
    """Output-node stand-in: streams the thought's response in 12-char chunks."""
    async def process(self, thought, history, sink, memory_context=""):
        # Empty/None responses degrade to a fixed "ok" so something streams.
        text = thought.response or "ok"
        offset = 0
        while offset < len(text):
            await sink.send_delta(text[offset:offset + 12])
            offset += 12
        await sink.send_done()
        return text
class MockPANode:
    """PA stand-in: routes every job to one preconfigured expert."""
    def __init__(self, expert="eras", job="test query", thinking_msg="Working..."):
        self._target_expert = expert
        self._canned_job = job
        self._status_msg = thinking_msg
    def set_available_experts(self, experts):
        # The expert registry is irrelevant to the canned routing.
        pass
    async def route(self, command, history, memory_context="", identity="", channel=""):
        return PARouting(
            expert=self._target_expert,
            job=self._canned_job,
            thinking_message=self._status_msg,
            language="en",
        )
    async def route_retry(self, command, history, memory_context="", identity="",
                          channel="", original_job="", errors=None):
        # Retries prefix the job text so traces can tell them apart.
        return PARouting(expert=self._target_expert, job=f"retry: {self._canned_job}", language="en")
class MockExpertNode:
    """Expert stand-in: every execute() yields the same ThoughtResult."""
    def __init__(self, response="expert result", tool_used="", tool_output="", errors=None):
        self._reply = response
        self._tool_name = tool_used
        self._tool_data = tool_output
        self._error_list = errors or []
        # Engine pushes HUD events through this attribute; capture them.
        self.send_hud = MockHud()
    async def execute(self, job, language):
        # Job text and language are ignored; output is fully preconfigured.
        return ThoughtResult(
            response=self._reply,
            tool_used=self._tool_name,
            tool_output=self._tool_data,
            errors=self._error_list,
        )
class MockDirectorNode:
    """Director stand-in: decide() always returns the same DirectorPlan."""
    def __init__(self, goal="test", tools=None, hint=""):
        self._plan_goal = goal
        self._plan_tools = tools or []
        self._plan_hint = hint
    async def decide(self, command, history, memory_context=""):
        # Inputs are ignored; the plan is fixed at construction time.
        return DirectorPlan(
            goal=self._plan_goal,
            tool_sequence=self._plan_tools,
            response_hint=self._plan_hint,
        )
    def get_context_line(self):
        return ""
class MockThinkerNode:
    """Thinker stand-in: process() always returns the same ThoughtResult."""
    def __init__(self, response="thought result", tool_used="", tool_output=""):
        self._reply = response
        self._tool_name = tool_used
        self._tool_data = tool_output
    async def process(self, command, plan=None, history=None, memory_context=""):
        # Command/plan/history do not influence the canned result.
        return ThoughtResult(
            response=self._reply,
            tool_used=self._tool_name,
            tool_output=self._tool_data,
        )
class MockInterpreterNode:
    """Interpreter stand-in: summarizes any tool output the same fixed way."""
    async def interpret(self, tool_used, tool_output, job):
        # Only tool_used is reflected in the summary; data/job are ignored.
        summary = f"Interpreted: {tool_used} returned data"
        return InterpretedResult(summary=summary, row_count=5, key_facts=["5 rows"])
def make_frame_engine(nodes, graph_name="v4-eras"):
    """Build a FrameEngine wired to fresh mock collaborators.

    Returns (engine, sink, hud) so a test can inspect streamed output
    and HUD events after driving the engine.
    """
    sink = MockSink()
    hud = MockHud()
    engine = FrameEngine(
        graph=load_graph(graph_name),
        nodes=nodes,
        sink=sink,
        history=[],
        send_hud=hud,
        sensor=MockSensor(),
        memorizer=MockMemorizer(),
        ui_node=MockUINode(),
        identity="test_user",
        channel="test",
    )
    return engine, sink, hud
# --- Tests ---
def test_graph_load():
    """load_graph returns correct structure for all frame-based graphs."""
    for graph_name in ("v3-framed", "v4-eras"):
        graph = load_graph(graph_name)
        assert graph["name"] == graph_name, \
            f"graph name mismatch: {graph['name']} != {graph_name}"
        assert graph["engine"] == "frames", f"{graph_name} should use frames engine"
        assert "nodes" in graph and len(graph["nodes"]) > 0, f"{graph_name} has no nodes"
        assert "edges" in graph and len(graph["edges"]) > 0, f"{graph_name} has no edges"
        assert "conditions" in graph, f"{graph_name} has no conditions"
    # The legacy v1 graph still runs on the imperative engine.
    legacy = load_graph("v1-current")
    assert legacy["engine"] == "imperative", "v1 should be imperative"
def test_node_instantiation():
    """instantiate_nodes creates all roles from registry."""
    hud = MockHud()
    for graph_name in ("v3-framed", "v4-eras"):
        graph = load_graph(graph_name)
        nodes = instantiate_nodes(graph, hud)
        # Every declared role must be instantiated.
        for role in graph["nodes"]:
            assert role in nodes, f"missing node role '{role}' in {graph_name}"
        # Core roles every graph must provide.
        for required in ("input", "output", "memorizer", "sensor"):
            assert required in nodes
def test_edge_types_complete():
    """All 3 edge types present in graph definitions, no orphan nodes."""
    for name in ("v3-framed", "v4-eras"):
        graph = load_graph(name)
        edges = graph["edges"]
        present_types = {edge.get("type") for edge in edges}
        for required in ("data", "context", "state"):
            assert required in present_types, f"{name} missing {required} edges"
        # Every node must appear in at least one edge, on either end.
        referenced = set()
        for edge in edges:
            referenced.add(edge["from"])
            dest = edge["to"]
            # 'to' may be a fan-out list or a single role.
            referenced.update(dest if isinstance(dest, list) else [dest])
        # runtime is a virtual target, not a real node
        referenced.discard("runtime")
        missing = set(graph["nodes"].keys()) - referenced
        assert not missing, f"{name} has orphan nodes: {missing}"
def test_condition_reflex():
    """_check_condition('reflex') fires on social+trivial only."""
    engine, _, _ = make_frame_engine({
        "input": MockInputNode(),
        "output": MockOutputNode(),
        "memorizer": MockMemorizer(),
        "sensor": MockSensor(),
        "ui": MockUINode(),
    }, "v4-eras")

    def command_for(intent, complexity, text):
        # Local helper: build a Command with just the fields reflex reads.
        return Command(
            analysis=InputAnalysis(intent=intent, complexity=complexity),
            source_text=text,
        )

    # social + trivial is the one combination that should fire.
    fires = command_for("social", "trivial", "hi")
    assert engine._check_condition("reflex", command=fires), \
        "reflex should fire for social+trivial"
    # Wrong intent: must not fire.
    wrong_intent = command_for("request", "simple", "show data")
    assert not engine._check_condition("reflex", command=wrong_intent), \
        "reflex should not fire for request+simple"
    # Right intent but wrong complexity: must not fire either.
    wrong_complexity = command_for("social", "complex", "tell me a long story")
    assert not engine._check_condition("reflex", command=wrong_complexity), \
        "reflex should not fire for social+complex"
def test_condition_tool_output():
    """_check_condition('has_tool_output') fires when tool data present."""
    engine, _, _ = make_frame_engine({
        "input": MockInputNode(),
        "output": MockOutputNode(),
        "memorizer": MockMemorizer(),
        "sensor": MockSensor(),
        "ui": MockUINode(),
    }, "v4-eras")
    # Both tool_used and tool_output set → fires.
    complete = ThoughtResult(response="data", tool_used="query_db", tool_output="rows here")
    assert engine._check_condition("has_tool_output", thought=complete), \
        "should fire when tool_used and tool_output both set"
    # Neither set → must not fire.
    textual = ThoughtResult(response="just text")
    assert not engine._check_condition("has_tool_output", thought=textual), \
        "should not fire when no tool output"
    # tool_used alone (empty output) → must not fire.
    half = ThoughtResult(response="x", tool_used="query_db", tool_output="")
    assert not engine._check_condition("has_tool_output", thought=half), \
        "should not fire when tool_output is empty string"
def test_frame_trace_reflex():
    """Reflex path: 2 frames (input → output), path='reflex'."""
    nodes = {
        "input": MockInputNode(intent="social", complexity="trivial"),
        "output": MockOutputNode(),
        "pa": MockPANode(),
        "expert_eras": MockExpertNode(),
        "interpreter": MockInterpreterNode(),
        "memorizer": MockMemorizer(),
        "sensor": MockSensor(),
        "ui": MockUINode(),
    }
    engine, sink, hud = make_frame_engine(nodes, "v4-eras")
    # Each test owns a private event loop so parallel test threads don't
    # share one; it must also be closed, otherwise the loop's selector
    # leaks and Python emits ResourceWarning under --parallel runs.
    loop = asyncio.new_event_loop()
    try:
        result = loop.run_until_complete(engine.process_message("hello"))
    finally:
        loop.close()
    trace = result["trace"]
    assert trace["path"] == "reflex", f"expected reflex path, got {trace['path']}"
    assert trace["total_frames"] == 2, f"expected 2 frames, got {trace['total_frames']}"
    assert len(trace["frames"]) == 2
    assert trace["frames"][0]["node"] == "input"
    assert trace["frames"][1]["node"] == "output"
    assert "reflex=True" in trace["frames"][0]["condition"]
def test_frame_trace_expert():
    """Expert path without tool output: F1(input)→F2(pa)→F3(expert)→F4(output+ui)."""
    nodes = {
        "input": MockInputNode(intent="request", complexity="simple"),
        "output": MockOutputNode(),
        "pa": MockPANode(expert="eras", job="get top customers"),
        "expert_eras": MockExpertNode(response="Here are the customers"),
        "interpreter": MockInterpreterNode(),
        "memorizer": MockMemorizer(),
        "sensor": MockSensor(),
        "ui": MockUINode(),
    }
    engine, sink, hud = make_frame_engine(nodes, "v4-eras")
    # Private per-test loop (thread-safe under --parallel); close it in
    # finally to avoid leaking the loop's selector/fds.
    loop = asyncio.new_event_loop()
    try:
        result = loop.run_until_complete(engine.process_message("show top customers"))
    finally:
        loop.close()
    trace = result["trace"]
    assert trace["path"] == "expert", f"expected expert path, got {trace['path']}"
    assert trace["total_frames"] >= 4, f"expected >=4 frames, got {trace['total_frames']}"
    nodes_in_trace = [f["node"] for f in trace["frames"]]
    assert nodes_in_trace[0] == "input"
    assert nodes_in_trace[1] == "pa"
    assert "expert_eras" in nodes_in_trace[2]
def test_frame_trace_expert_with_interpreter():
    """Expert path with tool output: includes interpreter frame, path='expert+interpreter'."""
    nodes = {
        "input": MockInputNode(intent="request", complexity="simple"),
        "output": MockOutputNode(),
        "pa": MockPANode(expert="eras", job="query customers"),
        # Tool output present → the has_tool_output condition should route
        # the thought through the interpreter frame.
        "expert_eras": MockExpertNode(
            response="raw data",
            tool_used="query_db",
            tool_output="customer_name,revenue\nAcme,1000",
        ),
        "interpreter": MockInterpreterNode(),
        "memorizer": MockMemorizer(),
        "sensor": MockSensor(),
        "ui": MockUINode(),
    }
    engine, sink, hud = make_frame_engine(nodes, "v4-eras")
    # Private per-test loop (thread-safe under --parallel); close it in
    # finally to avoid leaking the loop's selector/fds.
    loop = asyncio.new_event_loop()
    try:
        result = loop.run_until_complete(engine.process_message("show customer revenue"))
    finally:
        loop.close()
    trace = result["trace"]
    assert trace["path"] == "expert+interpreter", \
        f"expected expert+interpreter path, got {trace['path']}"
    nodes_in_trace = [f["node"] for f in trace["frames"]]
    assert "interpreter" in nodes_in_trace, "interpreter frame missing"
    assert trace["total_frames"] >= 5, f"expected >=5 frames, got {trace['total_frames']}"
# --- Phase 1: Config-driven models (RED — will fail until implemented) ---
def test_graph_has_models():
    """All graph definitions include a MODELS dict mapping role → model."""
    for name in ("v1-current", "v2-director-drives", "v3-framed", "v4-eras"):
        graph = load_graph(name)
        assert "models" in graph, f"{name}: graph should have a 'models' key"
        models = graph["models"]
        assert isinstance(models, dict), f"{name}: models should be a dict"
        assert len(models) > 0, f"{name}: models should not be empty"
        # Each entry must look like "provider/model".
        for role, model in models.items():
            well_formed = isinstance(model, str) and "/" in model
            assert well_formed, \
                f"{name}: model for '{role}' should be provider/model, got {model}"
def test_instantiate_applies_graph_models():
    """instantiate_nodes applies model from graph config, overriding class default."""
    hud = MockHud()
    graph = load_graph("v4-eras")
    # Inject a fake model for the input role (creating 'models' if absent).
    graph.setdefault("models", {})["input"] = "test/override-model"
    nodes = instantiate_nodes(graph, hud)
    assert nodes["input"].model == "test/override-model", \
        f"input node model should be 'test/override-model', got {nodes['input'].model}"
def test_model_override_per_request():
    """Engine accepts model overrides that are applied to nodes for one request."""
    nodes = {
        "input": MockInputNode(intent="social", complexity="trivial"),
        "output": MockOutputNode(),
        "pa": MockPANode(),
        "expert_eras": MockExpertNode(),
        "interpreter": MockInterpreterNode(),
        "memorizer": MockMemorizer(),
        "sensor": MockSensor(),
        "ui": MockUINode(),
    }
    engine, sink, hud = make_frame_engine(nodes, "v4-eras")
    # process_message should accept model_overrides param.
    # Private per-test loop (thread-safe under --parallel); close it in
    # finally to avoid leaking the loop's selector/fds.
    loop = asyncio.new_event_loop()
    try:
        result = loop.run_until_complete(
            engine.process_message("hello", model_overrides={"input": "test/fast-model"})
        )
    finally:
        loop.close()
    # Should complete without error (overrides applied internally)
    assert result["trace"]["path"] == "reflex"
# --- Phase 2: Shared Node Pool (RED — will fail until implemented) ---
def test_pool_creates_shared_nodes():
    """NodePool creates shared instances for stateless nodes."""
    from agent.node_pool import NodePool
    pool = NodePool("v4-eras")
    # All stateless roles must be available as shared instances.
    for role in ("input", "output", "pa", "expert_eras", "interpreter"):
        assert role in pool.shared, f"{role} should be shared"
def test_pool_excludes_stateful():
    """NodePool excludes stateful nodes (sensor, memorizer, ui)."""
    from agent.node_pool import NodePool
    pool = NodePool("v4-eras")
    # Per-session state must never be shared across runtimes.
    for role in ("sensor", "memorizer", "ui"):
        assert role not in pool.shared, f"{role} should NOT be shared"
def test_pool_reuses_instances():
    """Two Runtimes using the same pool share node objects."""
    from agent.node_pool import NodePool
    pool = NodePool("v4-eras")
    # Repeated lookups must yield the identical object, not a copy.
    first_lookup = pool.shared["input"]
    second_lookup = pool.shared["input"]
    assert first_lookup is second_lookup, "pool should return same instance"
def test_contextvar_hud_isolation():
    """Contextvars isolate HUD events between concurrent tasks.

    asyncio.gather runs each coroutine in its own copied Context, so a
    ContextVar.set() in one task must not be visible in the other.
    """
    from agent.nodes.base import _current_hud
    results_a = []
    results_b = []
    async def hud_a(data):
        results_a.append(data)
    async def hud_b(data):
        results_b.append(data)
    async def task_a():
        _current_hud.set(hud_a)
        # Yield to the loop so the two tasks interleave before reading back.
        await asyncio.sleep(0)
        hud_fn = _current_hud.get()
        await hud_fn({"from": "a"})
    async def task_b():
        _current_hud.set(hud_b)
        await asyncio.sleep(0)
        hud_fn = _current_hud.get()
        await hud_fn({"from": "b"})
    async def run_both():
        await asyncio.gather(task_a(), task_b())
    # Private per-test loop (thread-safe under --parallel); close it in
    # finally to avoid leaking the loop's selector/fds.
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(run_both())
    finally:
        loop.close()
    assert len(results_a) == 1 and results_a[0]["from"] == "a", \
        f"task_a HUD leaked: {results_a}"
    assert len(results_b) == 1 and results_b[0]["from"] == "b", \
        f"task_b HUD leaked: {results_b}"
# --- Test registry (for run_tests.py) ---
# Maps test name → callable; run_tests.py iterates this registry to
# discover and run tests (optionally in parallel). Keys are the names
# accepted on the run_tests.py command line.
TESTS = {
    # Green — engine mechanics
    'graph_load': test_graph_load,
    'node_instantiation': test_node_instantiation,
    'edge_types_complete': test_edge_types_complete,
    'condition_reflex': test_condition_reflex,
    'condition_tool_output': test_condition_tool_output,
    'frame_trace_reflex': test_frame_trace_reflex,
    'frame_trace_expert': test_frame_trace_expert,
    'frame_trace_expert_with_interpreter': test_frame_trace_expert_with_interpreter,
    # Phase 1: config-driven models
    'graph_has_models': test_graph_has_models,
    'instantiate_applies_graph_models': test_instantiate_applies_graph_models,
    'model_override_per_request': test_model_override_per_request,
    # Phase 2: shared node pool
    'pool_creates_shared_nodes': test_pool_creates_shared_nodes,
    'pool_excludes_stateful': test_pool_excludes_stateful,
    'pool_reuses_instances': test_pool_reuses_instances,
    'contextvar_hud_isolation': test_contextvar_hud_isolation,
}