- run_tests.py: ThreadPoolExecutor runs N tests concurrently within a suite - Each testcase has its own session_id so parallel is safe - Engine tests: fixed asyncio.new_event_loop() for thread safety - Usage: python tests/run_tests.py testcases --parallel=3 - Wall time reduction: ~3x for testcases (15min → 5min with parallel=3) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
621 lines
21 KiB
Python
621 lines
21 KiB
Python
"""Engine test suite — tests graph loading, node instantiation, frame engine
|
|
routing, conditions, and trace structure. No LLM calls — all nodes mocked.
|
|
|
|
Tests:
|
|
graph_load — load_graph returns correct structure for all graphs
|
|
node_instantiation — instantiate_nodes creates all roles from registry
|
|
edge_types_complete — all 3 edge types present, no orphan nodes
|
|
condition_reflex — reflex condition fires on social+trivial only
|
|
condition_tool_output — has_tool_output condition fires when tool data present
|
|
frame_trace_reflex — reflex path produces 2-frame trace
|
|
frame_trace_expert — expert path produces correct frame sequence
|
|
frame_trace_director — director path produces correct frame sequence
|
|
"""
|
|
|
|
import asyncio
|
|
import os
|
|
import sys
|
|
import time
|
|
|
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
|
|
|
|
from agent.engine import load_graph, instantiate_nodes, _graph_from_module
|
|
from agent.frame_engine import FrameEngine, FrameTrace, FrameRecord
|
|
from agent.types import (
|
|
Envelope, Command, InputAnalysis, ThoughtResult,
|
|
DirectorPlan, PARouting, InterpretedResult, Artifact,
|
|
)
|
|
|
|
|
|
# --- Helpers ---
|
|
|
|
class MockSink:
    """Captures streamed output from the engine under test.

    Accumulates text deltas in order and records the most recent
    controls/artifacts payloads plus how many times the stream was closed.
    """

    def __init__(self):
        self.deltas = []      # every chunk passed to send_delta, in order
        self.controls = []    # last controls payload (replaced, not appended)
        self.artifacts = []   # last artifacts payload (replaced, not appended)
        self.done_count = 0   # number of send_done calls

    async def send_delta(self, text):
        self.deltas.append(text)

    async def send_controls(self, controls):
        self.controls = controls

    async def send_artifacts(self, artifacts):
        self.artifacts = artifacts

    async def send_done(self):
        self.done_count += 1

    def reset(self):
        """Restore the sink to its freshly-constructed state.

        Previously only `deltas` was cleared, which let stale controls,
        artifacts, and done counts leak between assertions that reuse a sink.
        """
        self.deltas.clear()
        self.controls = []
        self.artifacts = []
        self.done_count = 0
class MockHud:
    """Records every HUD payload it is awaited with."""

    def __init__(self):
        self.events = []

    async def __call__(self, data):
        self.events.append(data)

    def find(self, event):
        """Return all recorded payloads whose "event" field equals *event*."""
        matches = []
        for entry in self.events:
            if entry.get("event") == event:
                matches.append(entry)
        return matches
class MockMemorizer:
    """Minimal memorizer stand-in for the frame engine."""

    def __init__(self):
        # Same default memory state the tests expect the engine to read.
        self.state = dict(
            user_name="test",
            user_mood="neutral",
            topic="testing",
            topic_history=[],
            language="en",
            style_hint="casual",
            facts=[],
            user_expectation="conversational",
        )

    def get_context_block(self, sensor_lines=None, ui_state=None):
        # Fixed context string; both arguments are accepted but ignored.
        return "Memory: test context"

    async def update(self, history):
        # The real memorizer folds history into state; the mock is a no-op.
        pass
class MockSensor:
    """Minimal sensor stand-in for the frame engine."""

    def __init__(self):
        self._flags = []

    def note_user_activity(self):
        pass  # activity tracking is irrelevant to these tests

    def update_browser_dashboard(self, dashboard):
        pass  # dashboard updates are ignored

    def get_context_lines(self):
        return ["Sensors: test"]

    def consume_flags(self):
        """Return pending flags and clear the queue (destructive read)."""
        pending = list(self._flags)
        del self._flags[:]
        return pending
class MockUINode:
    """Minimal UI node stand-in for the frame engine."""

    def __init__(self):
        self.thinker_controls = []
        self.state = {}
        self._artifacts = []

    @property
    def current_controls(self):
        # Alias so engine code can use either attribute name.
        return self.thinker_controls

    @current_controls.setter
    def current_controls(self, value):
        self.thinker_controls = value

    async def process(self, thought, history, memory_context=""):
        # Echo back whatever controls were configured; no LLM involved.
        return self.thinker_controls

    def get_machine_summary(self):
        return ""

    def get_machine_controls(self):
        return []

    def get_artifacts(self):
        return self._artifacts

    def try_machine_transition(self, action):
        # Never transitions: (handled?, message).
        return False, ""

    async def process_local_action(self, action, data):
        # No local actions supported: (result, controls).
        return None, []
class MockInputNode:
    """Input node that always yields the same preconfigured Command."""

    def __init__(self, intent="request", complexity="simple", topic="test", language="en"):
        self._intent = intent
        self._complexity = complexity
        self._topic = topic
        self._language = language

    async def process(self, envelope, history, memory_context="", identity="", channel=""):
        # Analysis fields come from the constructor; tone is fixed to "casual".
        analysis = InputAnalysis(
            intent=self._intent,
            topic=self._topic,
            complexity=self._complexity,
            language=self._language,
            tone="casual",
        )
        return Command(analysis=analysis, source_text=envelope.text)
class MockOutputNode:
    """Output node that streams the thought's response through the sink."""

    # Width of each streamed chunk, mimicking delta-style streaming.
    _CHUNK = 12

    async def process(self, thought, history, sink, memory_context=""):
        text = thought.response or "ok"
        start = 0
        while start < len(text):
            await sink.send_delta(text[start:start + self._CHUNK])
            start += self._CHUNK
        await sink.send_done()
        return text
class MockPANode:
    """PA node that routes every request to one fixed expert/job."""

    def __init__(self, expert="eras", job="test query", thinking_msg="Working..."):
        self._expert = expert
        self._job = job
        self._thinking_msg = thinking_msg

    def set_available_experts(self, experts):
        pass  # canned routing ignores the expert roster

    async def route(self, command, history, memory_context="", identity="", channel=""):
        # Same routing regardless of the command contents.
        return PARouting(
            language="en",
            expert=self._expert,
            job=self._job,
            thinking_message=self._thinking_msg,
        )

    async def route_retry(self, command, history, memory_context="", identity="",
                          channel="", original_job="", errors=None):
        # Retry keeps the expert but prefixes the job so tests can tell it apart.
        return PARouting(expert=self._expert, job="retry: " + self._job, language="en")
class MockExpertNode:
    """Expert node whose execute() returns a canned ThoughtResult."""

    def __init__(self, response="expert result", tool_used="", tool_output="", errors=None):
        self._response = response
        self._tool_used = tool_used
        self._tool_output = tool_output
        self._errors = errors or []
        # Engine may emit HUD events through the expert; capture them here.
        self.send_hud = MockHud()

    async def execute(self, job, language):
        # job/language are ignored — the result is fixed at construction time.
        return ThoughtResult(
            response=self._response,
            tool_used=self._tool_used,
            tool_output=self._tool_output,
            errors=self._errors,
        )
class MockDirectorNode:
    """Director node whose decide() returns a canned DirectorPlan."""

    def __init__(self, goal="test", tools=None, hint=""):
        self._goal = goal
        self._tools = tools or []
        self._hint = hint

    async def decide(self, command, history, memory_context=""):
        # Inputs are ignored; the plan is fixed at construction time.
        return DirectorPlan(
            goal=self._goal,
            tool_sequence=self._tools,
            response_hint=self._hint,
        )

    def get_context_line(self):
        return ""
class MockThinkerNode:
    """Thinker node whose process() returns a canned ThoughtResult."""

    def __init__(self, response="thought result", tool_used="", tool_output=""):
        self._response = response
        self._tool_used = tool_used
        self._tool_output = tool_output

    async def process(self, command, plan=None, history=None, memory_context=""):
        # All arguments are ignored; the thought is fixed at construction time.
        return ThoughtResult(
            response=self._response,
            tool_used=self._tool_used,
            tool_output=self._tool_output,
        )
class MockInterpreterNode:
    """Interpreter node that summarizes any tool output the same way."""

    async def interpret(self, tool_used, tool_output, job):
        # Only tool_used influences the canned summary text.
        return InterpretedResult(
            summary=f"Interpreted: {tool_used} returned data",
            row_count=5,
            key_facts=["5 rows"],
        )
def make_frame_engine(nodes, graph_name="v4-eras"):
    """Build a FrameEngine wired to fresh mock collaborators.

    Returns (engine, sink, hud) so tests can inspect streamed output and
    HUD events after driving the engine.
    """
    sink = MockSink()
    hud = MockHud()
    engine = FrameEngine(
        graph=load_graph(graph_name),
        nodes=nodes,
        sink=sink,
        history=[],
        send_hud=hud,
        sensor=MockSensor(),
        memorizer=MockMemorizer(),
        ui_node=MockUINode(),
        identity="test_user",
        channel="test",
    )
    return engine, sink, hud
# --- Tests ---


def test_graph_load():
    """load_graph returns the expected structure for all frame-based graphs."""
    for name in ("v3-framed", "v4-eras"):
        g = load_graph(name)
        assert g["name"] == name, f"graph name mismatch: {g['name']} != {name}"
        assert g["engine"] == "frames", f"{name} should use frames engine"
        assert "nodes" in g and len(g["nodes"]) > 0, f"{name} has no nodes"
        assert "edges" in g and len(g["edges"]) > 0, f"{name} has no edges"
        assert "conditions" in g, f"{name} has no conditions"
    # The legacy v1 graph stays on the imperative engine.
    assert load_graph("v1-current")["engine"] == "imperative", "v1 should be imperative"
def test_node_instantiation():
    """instantiate_nodes creates every role declared in the graph registry."""
    hud = MockHud()
    for name in ("v3-framed", "v4-eras"):
        g = load_graph(name)
        nodes = instantiate_nodes(g, hud)
        missing = [role for role in g["nodes"] if role not in nodes]
        assert not missing, f"missing node roles {missing} in {name}"
        # Core roles must always be present.
        for role in ("input", "output", "memorizer", "sensor"):
            assert role in nodes
def test_edge_types_complete():
    """Graphs declare all 3 edge types and every node appears in some edge."""
    for name in ("v3-framed", "v4-eras"):
        g = load_graph(name)
        edges = g["edges"]
        present = {e.get("type") for e in edges}
        for required in ("data", "context", "state"):
            assert required in present, f"{name} missing {required} edges"

        # Collect every node referenced by any edge endpoint ("to" may be a list).
        referenced = set()
        for e in edges:
            referenced.add(e["from"])
            dest = e["to"]
            referenced.update(dest if isinstance(dest, list) else [dest])
        # "runtime" is a virtual target, not a real node.
        referenced.discard("runtime")
        orphans = set(g["nodes"].keys()) - referenced
        assert not orphans, f"{name} has orphan nodes: {orphans}"
def test_condition_reflex():
    """_check_condition('reflex') fires only for social intent + trivial complexity."""
    engine, _, _ = make_frame_engine({
        "input": MockInputNode(),
        "output": MockOutputNode(),
        "memorizer": MockMemorizer(),
        "sensor": MockSensor(),
        "ui": MockUINode(),
    }, "v4-eras")

    def cmd(intent, complexity, text):
        # Small factory to keep the three probes below compact.
        return Command(
            analysis=InputAnalysis(intent=intent, complexity=complexity),
            source_text=text,
        )

    assert engine._check_condition("reflex", command=cmd("social", "trivial", "hi")), \
        "reflex should fire for social+trivial"
    assert not engine._check_condition(
        "reflex", command=cmd("request", "simple", "show data")), \
        "reflex should not fire for request+simple"
    assert not engine._check_condition(
        "reflex", command=cmd("social", "complex", "tell me a long story")), \
        "reflex should not fire for social+complex"
def test_condition_tool_output():
    """_check_condition('has_tool_output') needs both tool_used and tool_output."""
    engine, _, _ = make_frame_engine({
        "input": MockInputNode(),
        "output": MockOutputNode(),
        "memorizer": MockMemorizer(),
        "sensor": MockSensor(),
        "ui": MockUINode(),
    }, "v4-eras")

    full = ThoughtResult(response="data", tool_used="query_db", tool_output="rows here")
    assert engine._check_condition("has_tool_output", thought=full), \
        "should fire when tool_used and tool_output both set"

    bare = ThoughtResult(response="just text")
    assert not engine._check_condition("has_tool_output", thought=bare), \
        "should not fire when no tool output"

    # A tool name without any output must not count.
    half = ThoughtResult(response="x", tool_used="query_db", tool_output="")
    assert not engine._check_condition("has_tool_output", thought=half), \
        "should not fire when tool_output is empty string"
def test_frame_trace_reflex():
    """Reflex path: exactly 2 frames (input → output), path='reflex'."""
    nodes = {
        "input": MockInputNode(intent="social", complexity="trivial"),
        "output": MockOutputNode(),
        "pa": MockPANode(),
        "expert_eras": MockExpertNode(),
        "interpreter": MockInterpreterNode(),
        "memorizer": MockMemorizer(),
        "sensor": MockSensor(),
        "ui": MockUINode(),
    }
    engine, sink, hud = make_frame_engine(nodes, "v4-eras")

    # A fresh loop keeps this test safe under the threaded parallel runner;
    # closing it fixes the previous leak of one unclosed loop per test.
    loop = asyncio.new_event_loop()
    try:
        result = loop.run_until_complete(engine.process_message("hello"))
    finally:
        loop.close()

    trace = result["trace"]
    assert trace["path"] == "reflex", f"expected reflex path, got {trace['path']}"
    assert trace["total_frames"] == 2, f"expected 2 frames, got {trace['total_frames']}"
    assert len(trace["frames"]) == 2
    assert trace["frames"][0]["node"] == "input"
    assert trace["frames"][1]["node"] == "output"
    assert "reflex=True" in trace["frames"][0]["condition"]
def test_frame_trace_expert():
    """Expert path without tool output: F1(input)→F2(pa)→F3(expert)→F4(output+ui)."""
    nodes = {
        "input": MockInputNode(intent="request", complexity="simple"),
        "output": MockOutputNode(),
        "pa": MockPANode(expert="eras", job="get top customers"),
        "expert_eras": MockExpertNode(response="Here are the customers"),
        "interpreter": MockInterpreterNode(),
        "memorizer": MockMemorizer(),
        "sensor": MockSensor(),
        "ui": MockUINode(),
    }
    engine, sink, hud = make_frame_engine(nodes, "v4-eras")

    # A fresh loop keeps this test safe under the threaded parallel runner;
    # closing it fixes the previous leak of one unclosed loop per test.
    loop = asyncio.new_event_loop()
    try:
        result = loop.run_until_complete(engine.process_message("show top customers"))
    finally:
        loop.close()

    trace = result["trace"]
    assert trace["path"] == "expert", f"expected expert path, got {trace['path']}"
    assert trace["total_frames"] >= 4, f"expected >=4 frames, got {trace['total_frames']}"
    nodes_in_trace = [f["node"] for f in trace["frames"]]
    assert nodes_in_trace[0] == "input"
    assert nodes_in_trace[1] == "pa"
    assert "expert_eras" in nodes_in_trace[2]
def test_frame_trace_expert_with_interpreter():
    """Expert path with tool output: includes interpreter frame, path='expert+interpreter'."""
    nodes = {
        "input": MockInputNode(intent="request", complexity="simple"),
        "output": MockOutputNode(),
        "pa": MockPANode(expert="eras", job="query customers"),
        "expert_eras": MockExpertNode(
            response="raw data",
            tool_used="query_db",
            tool_output="customer_name,revenue\nAcme,1000",
        ),
        "interpreter": MockInterpreterNode(),
        "memorizer": MockMemorizer(),
        "sensor": MockSensor(),
        "ui": MockUINode(),
    }
    engine, sink, hud = make_frame_engine(nodes, "v4-eras")

    # A fresh loop keeps this test safe under the threaded parallel runner;
    # closing it fixes the previous leak of one unclosed loop per test.
    loop = asyncio.new_event_loop()
    try:
        result = loop.run_until_complete(engine.process_message("show customer revenue"))
    finally:
        loop.close()

    trace = result["trace"]
    assert trace["path"] == "expert+interpreter", \
        f"expected expert+interpreter path, got {trace['path']}"
    nodes_in_trace = [f["node"] for f in trace["frames"]]
    assert "interpreter" in nodes_in_trace, "interpreter frame missing"
    assert trace["total_frames"] >= 5, f"expected >=5 frames, got {trace['total_frames']}"
# --- Phase 1: Config-driven models (RED — will fail until implemented) ---


def test_graph_has_models():
    """Every graph definition carries a MODELS dict mapping role → model."""
    for name in ("v1-current", "v2-director-drives", "v3-framed", "v4-eras"):
        g = load_graph(name)
        assert "models" in g, f"{name}: graph should have a 'models' key"
        models = g["models"]
        assert isinstance(models, dict), f"{name}: models should be a dict"
        assert len(models) > 0, f"{name}: models should not be empty"
        for role, model in models.items():
            well_formed = isinstance(model, str) and "/" in model
            assert well_formed, \
                f"{name}: model for '{role}' should be provider/model, got {model}"
def test_instantiate_applies_graph_models():
    """instantiate_nodes applies the graph-config model, overriding class defaults."""
    g = load_graph("v4-eras")
    # Force an override into the graph config before instantiation.
    g.setdefault("models", {})["input"] = "test/override-model"
    nodes = instantiate_nodes(g, MockHud())
    assert nodes["input"].model == "test/override-model", \
        f"input node model should be 'test/override-model', got {nodes['input'].model}"
def test_model_override_per_request():
    """Engine accepts model overrides that are applied to nodes for one request."""
    nodes = {
        "input": MockInputNode(intent="social", complexity="trivial"),
        "output": MockOutputNode(),
        "pa": MockPANode(),
        "expert_eras": MockExpertNode(),
        "interpreter": MockInterpreterNode(),
        "memorizer": MockMemorizer(),
        "sensor": MockSensor(),
        "ui": MockUINode(),
    }
    engine, sink, hud = make_frame_engine(nodes, "v4-eras")

    # A fresh loop keeps this test safe under the threaded parallel runner;
    # closing it fixes the previous leak of one unclosed loop per test.
    loop = asyncio.new_event_loop()
    try:
        # process_message should accept a model_overrides param.
        result = loop.run_until_complete(
            engine.process_message("hello", model_overrides={"input": "test/fast-model"})
        )
    finally:
        loop.close()

    # Should complete without error (overrides applied internally).
    assert result["trace"]["path"] == "reflex"
# --- Phase 2: Shared Node Pool (RED — will fail until implemented) ---


def test_pool_creates_shared_nodes():
    """NodePool creates shared instances for every stateless role."""
    from agent.node_pool import NodePool
    pool = NodePool("v4-eras")
    for role in ("input", "output", "pa", "expert_eras", "interpreter"):
        assert role in pool.shared, f"{role} should be shared"
def test_pool_excludes_stateful():
    """NodePool leaves stateful roles (sensor, memorizer, ui) out of the pool."""
    from agent.node_pool import NodePool
    pool = NodePool("v4-eras")
    for role in ("sensor", "memorizer", "ui"):
        assert role not in pool.shared, f"{role} should NOT be shared"
def test_pool_reuses_instances():
    """Repeated lookups in one pool hand back the very same node object."""
    from agent.node_pool import NodePool
    pool = NodePool("v4-eras")
    first = pool.shared["input"]
    second = pool.shared["input"]
    assert first is second, "pool should return same instance"
def test_contextvar_hud_isolation():
    """Contextvars keep HUD callbacks isolated between concurrent tasks."""
    from agent.nodes.base import _current_hud

    results_a = []
    results_b = []

    async def hud_a(data):
        results_a.append(data)

    async def hud_b(data):
        results_b.append(data)

    async def task(hud_fn, payload):
        # Each task binds its own HUD, yields so the other task can run,
        # then reads the binding back — a leak would mix the callbacks.
        _current_hud.set(hud_fn)
        await asyncio.sleep(0)
        await _current_hud.get()(payload)

    async def run_both():
        await asyncio.gather(task(hud_a, {"from": "a"}), task(hud_b, {"from": "b"}))

    # A fresh loop keeps this test safe under the threaded parallel runner;
    # closing it fixes the previous leak of one unclosed loop per test.
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(run_both())
    finally:
        loop.close()

    assert len(results_a) == 1 and results_a[0]["from"] == "a", \
        f"task_a HUD leaked: {results_a}"
    assert len(results_b) == 1 and results_b[0]["from"] == "b", \
        f"task_b HUD leaked: {results_b}"
# --- Test registry (for run_tests.py) ---

# Order matters only for display; keys are the names run_tests.py accepts.
TESTS = {
    # Green — engine mechanics
    "graph_load": test_graph_load,
    "node_instantiation": test_node_instantiation,
    "edge_types_complete": test_edge_types_complete,
    "condition_reflex": test_condition_reflex,
    "condition_tool_output": test_condition_tool_output,
    "frame_trace_reflex": test_frame_trace_reflex,
    "frame_trace_expert": test_frame_trace_expert,
    "frame_trace_expert_with_interpreter": test_frame_trace_expert_with_interpreter,
    # Phase 1: config-driven models
    "graph_has_models": test_graph_has_models,
    "instantiate_applies_graph_models": test_instantiate_applies_graph_models,
    "model_override_per_request": test_model_override_per_request,
    # Phase 2: shared node pool
    "pool_creates_shared_nodes": test_pool_creates_shared_nodes,
    "pool_excludes_stateful": test_pool_excludes_stateful,
    "pool_reuses_instances": test_pool_reuses_instances,
    "contextvar_hud_isolation": test_contextvar_hud_isolation,
}