diff --git a/tests/run_tests.py b/tests/run_tests.py
new file mode 100644
index 0000000..c638145
--- /dev/null
+++ b/tests/run_tests.py
@@ -0,0 +1,198 @@
+#!/usr/bin/env python3
+"""
+Test orchestrator — runs test suites and posts results to dev assay.
+
+Usage:
+    python tests/run_tests.py                             # all suites
+    python tests/run_tests.py api                         # one suite
+    python tests/run_tests.py roundtrip                   # one suite
+    python tests/run_tests.py api/health                  # single test
+    python tests/run_tests.py roundtrip/full_eras         # single test
+    python tests/run_tests.py api/health roundtrip/full_chat   # multiple tests
+
+Test names: suite/name (without the suite prefix in the test registry).
+    engine tests: graph_load, node_instantiation, edge_types_complete,
+                  condition_reflex, condition_tool_output,
+                  frame_trace_reflex, frame_trace_expert, frame_trace_expert_with_interpreter
+    api tests: health, eras_umsatz_api, eras_umsatz_artifact
+    roundtrip tests: nyx_loads, inject_artifact, inject_message, full_chat, full_eras
+"""
+
+import json
+import os
+import sys
+import time
+import urllib.request
+import uuid
+from datetime import datetime, timezone
+from dataclasses import dataclass, field, asdict
+
+RESULTS_ENDPOINT = os.environ.get('RESULTS_ENDPOINT', '')
+RUN_ID = os.environ.get('RUN_ID', str(uuid.uuid4())[:8])
+
+
+def _now_iso() -> str:
+    return datetime.now(timezone.utc).isoformat()  # timezone-aware UTC timestamp
+
+
+@dataclass
+class TestResult:
+    run_id: str
+    test: str
+    suite: str
+    status: str  # 'pass', 'fail', 'running', 'error'
+    duration_ms: float = 0
+    error: str = ''
+    ts: str = ''
+
+
+def post_result(result: TestResult):
+    """Post a single test result to the dev assay endpoint."""
+    print(json.dumps(asdict(result)), flush=True)
+    if not RESULTS_ENDPOINT:
+        return
+    try:
+        payload = json.dumps(asdict(result)).encode()
+        req = urllib.request.Request(
+            RESULTS_ENDPOINT,
+            data=payload,
+            headers={'Content-Type': 'application/json'},
+        )
+        urllib.request.urlopen(req, timeout=5)
+    except Exception as e:  # best-effort: a dead endpoint must not fail the test run
+        print(f'  [warn] failed to post result: {e}', file=sys.stderr)
+
+
+def run_test(name: str, suite: str, fn) -> TestResult:
+    """Run a single test function and return the result."""
+    result = TestResult(run_id=RUN_ID, test=name, suite=suite, status='running', ts=_now_iso())
+    post_result(result)
+
+    start = time.time()
+    try:
+        fn()
+        result.status = 'pass'
+    except AssertionError as e:  # assertion failure => test failure
+        result.status = 'fail'
+        result.error = str(e)
+    except Exception as e:  # anything else => harness/runtime error
+        result.status = 'error'
+        result.error = f'{type(e).__name__}: {e}'
+    result.duration_ms = round((time.time() - start) * 1000)
+    result.ts = _now_iso()
+
+    post_result(result)
+    return result
+
+
+def get_api_tests() -> dict:
+    """Load API tests from e2e_harness.py."""
+    sys.path.insert(0, os.path.dirname(__file__))
+    import e2e_harness
+    e2e_harness.ASSAY_BASE = os.environ.get('ASSAY_API', 'http://assay-runtime-test:8000').removesuffix('/api')  # rstrip('/api') would strip chars, not the suffix
+    # Skip browser-dependent tests
+    return {k: v for k, v in e2e_harness.TESTS.items() if 'takeover' not in k and 'panes' not in k}
+
+
+def get_roundtrip_tests() -> dict:
+    """Load Playwright roundtrip tests."""
+    sys.path.insert(0, os.path.dirname(__file__))
+    from test_roundtrip import TESTS
+    return TESTS
+
+
+def get_engine_tests() -> dict:
+    """Load engine-level tests (no LLM, no network)."""
+    sys.path.insert(0, os.path.dirname(__file__))
+    from test_engine import TESTS
+    return TESTS
+
+
+SUITES = {
+    'engine': get_engine_tests,
+    'api': get_api_tests,
+    'roundtrip': get_roundtrip_tests,
+}
+
+
+def parse_filters(args: list[str]) -> tuple[set[str] | None, set[str]]:
+    """Parse CLI args into (suite_filter, test_filter).
+
+    Returns:
+        suite_filter: set of suite names, or None for all suites
+        test_filter: set of 'suite/test' names (empty = run all in suite)
+    """
+    if not args:
+        return None, set()
+
+    suites = set()
+    tests = set()
+    for arg in args:
+        if '/' in arg:
+            tests.add(arg)
+            suites.add(arg.split('/')[0])
+        else:
+            suites.add(arg)
+    return suites, tests
+
+
+def run_suite(suite_name: str, tests: dict, test_filter: set[str]) -> list[TestResult]:
+    """Run tests from a suite, optionally filtered."""
+    results = []
+    for name, fn in tests.items():
+        # Apply test filter if specified
+        full_name = f'{suite_name}/{name}'
+        # Strip suite prefix for matching (roundtrip/full_eras matches roundtrip_full_eras)
+        short_name = name.removeprefix(f'{suite_name}_')  # removeprefix: only a leading prefix, not any occurrence
+        if test_filter and full_name not in test_filter and f'{suite_name}/{short_name}' not in test_filter:
+            continue
+
+        r = run_test(name, suite_name, fn)
+        results.append(r)
+        status = r.status.upper()  # PASS / FAIL / ERROR — don't collapse 'error' into FAIL
+        print(f'  [{status}] {suite_name}/{name} ({r.duration_ms:.0f}ms)', flush=True)
+        if r.error:
+            print(f'      {r.error[:200]}', flush=True)
+    return results
+
+
+def main():
+    suite_filter, test_filter = parse_filters(sys.argv[1:])
+
+    print(f'=== Test Run {RUN_ID} ===', flush=True)
+    if suite_filter:
+        print(f'Filter: suites={suite_filter}, tests={test_filter or "all"}', flush=True)
+    print(f'ASSAY_API: {os.environ.get("ASSAY_API", "not set")}', flush=True)
+    print(f'NYX_URL: {os.environ.get("NYX_URL", "not set")}', flush=True)
+    print(flush=True)
+
+    all_results = []
+
+    for suite_name, loader in SUITES.items():
+        if suite_filter and suite_name not in suite_filter:
+            continue
+        print(f'--- {suite_name} ---', flush=True)
+        tests = loader()
+        all_results.extend(run_suite(suite_name, tests, test_filter))
+        print(flush=True)
+
+    # Summary
+    passed = sum(1 for r in all_results if r.status == 'pass')
+    failed = sum(1 for r in all_results if r.status in ('fail', 'error'))
+    total_ms = sum(r.duration_ms for r in all_results)
+    print(f'=== {passed} passed, {failed} failed, {len(all_results)} total ({total_ms:.0f}ms) ===', flush=True)
+
+    if RESULTS_ENDPOINT:
+        summary = TestResult(
+            run_id=RUN_ID, test='__summary__', suite='summary',
+            status='pass' if failed == 0 else 'fail',
+            duration_ms=total_ms,
+            error=f'{passed} passed, {failed} failed',
+        )
+        post_result(summary)
+
+    sys.exit(1 if failed else 0)
+
+
+if __name__ == '__main__':
+    main()
diff --git a/tests/test_engine.py b/tests/test_engine.py
new file mode 100644
index 0000000..d1cfc70
--- /dev/null
+++ b/tests/test_engine.py
@@ -0,0 +1,491 @@
+"""Engine test suite — tests graph loading, node instantiation, frame engine
+routing, conditions, and trace structure. No LLM calls — all nodes mocked.
+
+Tests:
+    graph_load — load_graph returns correct structure for all graphs
+    node_instantiation — instantiate_nodes creates all roles from registry
+    edge_types_complete — all 3 edge types present, no orphan nodes
+    condition_reflex — reflex condition fires on social+trivial only
+    condition_tool_output — has_tool_output condition fires when tool data present
+    frame_trace_reflex — reflex path produces 2-frame trace
+    frame_trace_expert — expert path produces correct frame sequence
+    frame_trace_expert_with_interpreter — expert path with interpreter frame
+"""
+
+import asyncio
+import os
+import sys
+import time
+
+sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
+
+from agent.engine import load_graph, instantiate_nodes, _graph_from_module
+from agent.frame_engine import FrameEngine, FrameTrace, FrameRecord
+from agent.types import (
+    Envelope, Command, InputAnalysis, ThoughtResult,
+    DirectorPlan, PARouting, InterpretedResult, Artifact,
+)
+
+
+# --- Helpers ---
+
+class MockSink:
+    """Captures streamed output."""
+    def __init__(self):
+        self.deltas = []
+        self.controls = []
+        self.artifacts = []
+        self.done_count = 0
+
+    async def send_delta(self, text):
+        self.deltas.append(text)
+
+    async def send_controls(self, controls):
+        self.controls = controls
+
+    async def send_artifacts(self, artifacts):
+        self.artifacts = artifacts
+
+    async def send_done(self):
+        self.done_count += 1
+
+    def reset(self):
+        self.deltas.clear()  # NOTE(review): controls/artifacts are kept on reset — confirm intended
+
+
+class MockHud:
+    """Captures HUD events."""
+    def __init__(self):
+        self.events = []
+
+    async def __call__(self, data):
+        self.events.append(data)
+
+    def find(self, event):
+        return [e for e in self.events if e.get("event") == event]
+
+
+class MockMemorizer:
+    """Minimal memorizer for frame engine."""
+    def __init__(self):
+        self.state = {
+            "user_name": "test",
+            "user_mood": "neutral",
+            "topic": "testing",
+            "topic_history": [],
+            "language": "en",
+            "style_hint": "casual",
+            "facts": [],
+            "user_expectation": "conversational",
+        }
+
+    def get_context_block(self, sensor_lines=None, ui_state=None):
+        return "Memory: test context"
+
+    async def update(self, history):
+        pass
+
+
+class MockSensor:
+    """Minimal sensor for frame engine."""
+    def __init__(self):
+        self._flags = []
+
+    def note_user_activity(self):
+        pass
+
+    def update_browser_dashboard(self, dashboard):
+        pass
+
+    def get_context_lines(self):
+        return ["Sensors: test"]
+
+    def consume_flags(self):
+        flags = self._flags[:]
+        self._flags.clear()
+        return flags
+
+
+class MockUINode:
+    """Minimal UI node for frame engine."""
+    def __init__(self):
+        self.thinker_controls = []
+        self.state = {}
+        self._artifacts = []
+
+    @property
+    def current_controls(self):
+        return self.thinker_controls
+
+    @current_controls.setter
+    def current_controls(self, value):
+        self.thinker_controls = value
+
+    async def process(self, thought, history, memory_context=""):
+        return self.thinker_controls
+
+    def get_machine_summary(self):
+        return ""
+
+    def get_machine_controls(self):
+        return []
+
+    def get_artifacts(self):
+        return self._artifacts
+
+    def try_machine_transition(self, action):
+        return False, ""
+
+    async def process_local_action(self, action, data):
+        return None, []
+
+class MockInputNode:
+    """Returns a preconfigured Command."""
+    def __init__(self, intent="request", complexity="simple", topic="test", language="en"):
+        self._intent = intent
+        self._complexity = complexity
+        self._topic = topic
+        self._language = language
+
+    async def process(self, envelope, history, memory_context="", identity="", channel=""):
+        return Command(
+            analysis=InputAnalysis(
+                intent=self._intent, topic=self._topic,
+                complexity=self._complexity, language=self._language,
+                tone="casual",
+            ),
+            source_text=envelope.text,
+        )
+
+
+class MockOutputNode:
+    """Streams response text via sink."""
+    async def process(self, thought, history, sink, memory_context=""):
+        text = thought.response or "ok"
+        for i in range(0, len(text), 12):
+            await sink.send_delta(text[i:i+12])
+        await sink.send_done()
+        return text
+
+
+class MockPANode:
+    """Returns a preconfigured PARouting."""
+    def __init__(self, expert="eras", job="test query", thinking_msg="Working..."):
+        self._expert = expert
+        self._job = job
+        self._thinking_msg = thinking_msg
+
+    def set_available_experts(self, experts):
+        pass
+
+    async def route(self, command, history, memory_context="", identity="", channel=""):
+        return PARouting(
+            expert=self._expert,
+            job=self._job,
+            thinking_message=self._thinking_msg,
+            language="en",
+        )
+
+    async def route_retry(self, command, history, memory_context="", identity="",
+                          channel="", original_job="", errors=None):
+        return PARouting(expert=self._expert, job=f"retry: {self._job}", language="en")
+
+
+class MockExpertNode:
+    """Returns a preconfigured ThoughtResult."""
+    def __init__(self, response="expert result", tool_used="", tool_output="", errors=None):
+        self._response = response
+        self._tool_used = tool_used
+        self._tool_output = tool_output
+        self._errors = errors or []
+        self.send_hud = MockHud()
+
+    async def execute(self, job, language):
+        return ThoughtResult(
+            response=self._response,
+            tool_used=self._tool_used,
+            tool_output=self._tool_output,
+            errors=self._errors,
+        )
+
+
+class MockDirectorNode:
+    """Returns a preconfigured DirectorPlan."""
+    def __init__(self, goal="test", tools=None, hint=""):
+        self._goal = goal
+        self._tools = tools or []
+        self._hint = hint
+
+    async def decide(self, command, history, memory_context=""):
+        return DirectorPlan(
+            goal=self._goal,
+            tool_sequence=self._tools,
+            response_hint=self._hint,
+        )
+
+    def get_context_line(self):
+        return ""
+
+
+class MockThinkerNode:
+    """Returns a preconfigured ThoughtResult."""
+    def __init__(self, response="thought result", tool_used="", tool_output=""):
+        self._response = response
+        self._tool_used = tool_used
+        self._tool_output = tool_output
+
+    async def process(self, command, plan=None, history=None, memory_context=""):
+        return ThoughtResult(
+            response=self._response,
+            tool_used=self._tool_used,
+            tool_output=self._tool_output,
+        )
+
+
+class MockInterpreterNode:
+    """Returns a preconfigured InterpretedResult."""
+    async def interpret(self, tool_used, tool_output, job):
+        return InterpretedResult(
+            summary=f"Interpreted: {tool_used} returned data",
+            row_count=5,
+            key_facts=["5 rows"],
+        )
+
+
+def make_frame_engine(nodes, graph_name="v4-eras"):
+    """Create a FrameEngine with mocked dependencies."""
+    graph = load_graph(graph_name)
+    sink = MockSink()
+    hud = MockHud()
+    memorizer = MockMemorizer()
+    sensor = MockSensor()
+    ui = MockUINode()
+
+    engine = FrameEngine(
+        graph=graph,
+        nodes=nodes,
+        sink=sink,
+        history=[],
+        send_hud=hud,
+        sensor=sensor,
+        memorizer=memorizer,
+        ui_node=ui,
+        identity="test_user",
+        channel="test",
+    )
+    return engine, sink, hud
+
+
+# --- Tests ---
+
+def test_graph_load():
+    """load_graph returns correct structure for all frame-based graphs."""
+    for name in ["v3-framed", "v4-eras"]:
+        g = load_graph(name)
+        assert g["name"] == name, f"graph name mismatch: {g['name']} != {name}"
+        assert g["engine"] == "frames", f"{name} should use frames engine"
+        assert "nodes" in g and len(g["nodes"]) > 0, f"{name} has no nodes"
+        assert "edges" in g and len(g["edges"]) > 0, f"{name} has no edges"
+        assert "conditions" in g, f"{name} has no conditions"
+    # v1 should be imperative
+    g1 = load_graph("v1-current")
+    assert g1["engine"] == "imperative", "v1 should be imperative"
+
+
+def test_node_instantiation():
+    """instantiate_nodes creates all roles from registry."""
+    hud = MockHud()
+    for name in ["v3-framed", "v4-eras"]:
+        g = load_graph(name)
+        nodes = instantiate_nodes(g, hud)
+        for role in g["nodes"]:
+            assert role in nodes, f"missing node role '{role}' in {name}"
+        # Check specific node types exist
+        assert "input" in nodes
+        assert "output" in nodes
+        assert "memorizer" in nodes
+        assert "sensor" in nodes
+
+
+def test_edge_types_complete():
+    """All 3 edge types present in graph definitions, no orphan nodes."""
+    for name in ["v3-framed", "v4-eras"]:
+        g = load_graph(name)
+        edges = g["edges"]
+        edge_types = {e.get("type") for e in edges}
+        assert "data" in edge_types, f"{name} missing data edges"
+        assert "context" in edge_types, f"{name} missing context edges"
+        assert "state" in edge_types, f"{name} missing state edges"
+
+        # Every node should appear in at least one edge (from or to)
+        node_roles = set(g["nodes"].keys())
+        edge_nodes = set()
+        for e in edges:
+            edge_nodes.add(e["from"])
+            to = e["to"]
+            if isinstance(to, list):
+                edge_nodes.update(to)
+            else:
+                edge_nodes.add(to)
+        # runtime is a virtual target, not a real node
+        edge_nodes.discard("runtime")
+        missing = node_roles - edge_nodes
+        assert not missing, f"{name} has orphan nodes: {missing}"
+
+
+def test_condition_reflex():
+    """_check_condition('reflex') fires on social+trivial only."""
+    engine, _, _ = make_frame_engine({
+        "input": MockInputNode(),
+        "output": MockOutputNode(),
+        "memorizer": MockMemorizer(),
+        "sensor": MockSensor(),
+        "ui": MockUINode(),
+    }, "v4-eras")
+
+    # Should fire
+    cmd_social = Command(
+        analysis=InputAnalysis(intent="social", complexity="trivial"),
+        source_text="hi",
+    )
+    assert engine._check_condition("reflex", command=cmd_social), \
+        "reflex should fire for social+trivial"
+
+    # Should NOT fire
+    cmd_request = Command(
+        analysis=InputAnalysis(intent="request", complexity="simple"),
+        source_text="show data",
+    )
+    assert not engine._check_condition("reflex", command=cmd_request), \
+        "reflex should not fire for request+simple"
+
+    cmd_social_complex = Command(
+        analysis=InputAnalysis(intent="social", complexity="complex"),
+        source_text="tell me a long story",
+    )
+    assert not engine._check_condition("reflex", command=cmd_social_complex), \
+        "reflex should not fire for social+complex"
+
+
+def test_condition_tool_output():
+    """_check_condition('has_tool_output') fires when tool data present."""
+    engine, _, _ = make_frame_engine({
+        "input": MockInputNode(),
+        "output": MockOutputNode(),
+        "memorizer": MockMemorizer(),
+        "sensor": MockSensor(),
+        "ui": MockUINode(),
+    }, "v4-eras")
+
+    thought_with = ThoughtResult(
+        response="data", tool_used="query_db", tool_output="rows here",
+    )
+    assert engine._check_condition("has_tool_output", thought=thought_with), \
+        "should fire when tool_used and tool_output both set"
+
+    thought_without = ThoughtResult(response="just text")
+    assert not engine._check_condition("has_tool_output", thought=thought_without), \
+        "should not fire when no tool output"
+
+    thought_partial = ThoughtResult(response="x", tool_used="query_db", tool_output="")
+    assert not engine._check_condition("has_tool_output", thought=thought_partial), \
+        "should not fire when tool_output is empty string"
+
+
+def test_frame_trace_reflex():
+    """Reflex path: 2 frames (input → output), path='reflex'."""
+    nodes = {
+        "input": MockInputNode(intent="social", complexity="trivial"),
+        "output": MockOutputNode(),
+        "pa": MockPANode(),
+        "expert_eras": MockExpertNode(),
+        "interpreter": MockInterpreterNode(),
+        "memorizer": MockMemorizer(),
+        "sensor": MockSensor(),
+        "ui": MockUINode(),
+    }
+    engine, sink, hud = make_frame_engine(nodes, "v4-eras")
+
+    result = asyncio.run(  # get_event_loop().run_until_complete is deprecated; run() manages a fresh loop
+        engine.process_message("hello")
+    )
+
+    trace = result["trace"]
+    assert trace["path"] == "reflex", f"expected reflex path, got {trace['path']}"
+    assert trace["total_frames"] == 2, f"expected 2 frames, got {trace['total_frames']}"
+    assert len(trace["frames"]) == 2
+    assert trace["frames"][0]["node"] == "input"
+    assert trace["frames"][1]["node"] == "output"
+    assert "reflex=True" in trace["frames"][0]["condition"]
+
+
+def test_frame_trace_expert():
+    """Expert path without tool output: F1(input)→F2(pa)→F3(expert)→F4(output+ui)."""
+    nodes = {
+        "input": MockInputNode(intent="request", complexity="simple"),
+        "output": MockOutputNode(),
+        "pa": MockPANode(expert="eras", job="get top customers"),
+        "expert_eras": MockExpertNode(response="Here are the customers"),
+        "interpreter": MockInterpreterNode(),
+        "memorizer": MockMemorizer(),
+        "sensor": MockSensor(),
+        "ui": MockUINode(),
+    }
+    engine, sink, hud = make_frame_engine(nodes, "v4-eras")
+
+    result = asyncio.run(
+        engine.process_message("show top customers")
+    )
+
+    trace = result["trace"]
+    assert trace["path"] == "expert", f"expected expert path, got {trace['path']}"
+    assert trace["total_frames"] >= 4, f"expected >=4 frames, got {trace['total_frames']}"
+    nodes_in_trace = [f["node"] for f in trace["frames"]]
+    assert nodes_in_trace[0] == "input"
+    assert nodes_in_trace[1] == "pa"
+    assert "expert_eras" in nodes_in_trace[2]
+
+
+def test_frame_trace_expert_with_interpreter():
+    """Expert path with tool output: includes interpreter frame, path='expert+interpreter'."""
+    nodes = {
+        "input": MockInputNode(intent="request", complexity="simple"),
+        "output": MockOutputNode(),
+        "pa": MockPANode(expert="eras", job="query customers"),
+        "expert_eras": MockExpertNode(
+            response="raw data",
+            tool_used="query_db",
+            tool_output="customer_name,revenue\nAcme,1000",
+        ),
+        "interpreter": MockInterpreterNode(),
+        "memorizer": MockMemorizer(),
+        "sensor": MockSensor(),
+        "ui": MockUINode(),
+    }
+    engine, sink, hud = make_frame_engine(nodes, "v4-eras")
+
+    result = asyncio.run(
+        engine.process_message("show customer revenue")
+    )
+
+    trace = result["trace"]
+    assert trace["path"] == "expert+interpreter", \
+        f"expected expert+interpreter path, got {trace['path']}"
+    nodes_in_trace = [f["node"] for f in trace["frames"]]
+    assert "interpreter" in nodes_in_trace, "interpreter frame missing"
+    assert trace["total_frames"] >= 5, f"expected >=5 frames, got {trace['total_frames']}"
+
+
+# --- Test registry (for run_tests.py) ---
+
+TESTS = {
+    'graph_load': test_graph_load,
+    'node_instantiation': test_node_instantiation,
+    'edge_types_complete': test_edge_types_complete,
+    'condition_reflex': test_condition_reflex,
+    'condition_tool_output': test_condition_tool_output,
+    'frame_trace_reflex': test_frame_trace_reflex,
+    'frame_trace_expert': test_frame_trace_expert,
+    'frame_trace_expert_with_interpreter': test_frame_trace_expert_with_interpreter,
+}