This repository has been archived on 2026-04-03. You can view files and clone it, but cannot push or open issues or pull requests.
agent-runtime/tests/run_tests.py
Nico 44f6116855 feat(tests): ui suite — toolbar groups, scroll preservation, DOM keep-alive
6 Playwright tests against nyx-test (localhost:30802, auth disabled):
- toolbar group counts per route (nyx=4, tests=2, home=1)
- toolbar survives full nav roundtrip without losing groups
- scroll position preserved across navigation (keep-alive working)
- all visited views stay in DOM (not removed on nav)

Run: NYX_URL=http://localhost:30802 python tests/run_tests.py ui

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-03 23:24:51 +02:00

368 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Test orchestrator — runs test suites and posts results to dev assay.
Usage:
python tests/run_tests.py # all suites
python tests/run_tests.py api # one suite
python tests/run_tests.py matrix/eras_query[haiku] # single test
python tests/run_tests.py matrix --repeat=3 # each test 3x, report avg/p50/p95
python tests/run_tests.py testcases --parallel=3 # 3 testcases concurrently
python tests/run_tests.py api/health roundtrip/full_chat # multiple tests
Test names: suite/name (without the suite prefix in the test registry).
engine tests: graph_load, node_instantiation, edge_types_complete,
condition_reflex, condition_tool_output,
frame_trace_reflex, frame_trace_expert, frame_trace_expert_with_interpreter
api tests: health, eras_umsatz_api, eras_umsatz_artifact
matrix tests: eras_query[variant], eras_artifact[variant], social_reflex[variant]
variants: gemini-flash, haiku, gpt-4o-mini
testcases: fast, reflex_path, expert_eras, domain_context, ... (from testcases/*.md)
roundtrip tests: nyx_loads, inject_artifact, inject_message, full_chat, full_eras
"""
import json
import os
import sys
import time
import urllib.request
import uuid
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timezone
from dataclasses import dataclass, field, asdict
# Optional HTTP endpoint for posting results; empty string disables posting.
RESULTS_ENDPOINT = os.environ.get('RESULTS_ENDPOINT', '')
# Identifier shared by all results of this run; from env or a random 8-char uuid prefix.
RUN_ID = os.environ.get('RUN_ID', str(uuid.uuid4())[:8])
def _now_iso() -> str:
return datetime.now(timezone.utc).isoformat()
@dataclass
class TestResult:
    """One test outcome; serialized with asdict() and posted as a JSON line."""
    run_id: str    # identifier shared by every result in this run
    test: str      # test name within its suite
    suite: str     # suite key ('engine', 'api', 'matrix', ...)
    status: str # 'pass', 'fail', 'running', 'error'
    duration_ms: float = 0   # wall-clock duration (avg over runs when repeated)
    error: str = ''          # failure detail or aggregate stats summary
    ts: str = ''             # ISO-8601 timestamp of the last status change
    stats: dict = field(default_factory=dict) # {runs, min_ms, avg_ms, p50_ms, p95_ms, max_ms, pass_rate}
def post_result(result: TestResult):
    """Post a single test result to the dev assay endpoint.

    Always echoes the result as a JSON line on stdout. Additionally POSTs it
    to RESULTS_ENDPOINT when that is configured; posting failures are logged
    to stderr but never raised (best-effort reporting).
    """
    print(json.dumps(asdict(result)), flush=True)
    if not RESULTS_ENDPOINT:
        return
    try:
        payload = json.dumps(asdict(result)).encode()
        req = urllib.request.Request(
            RESULTS_ENDPOINT,
            data=payload,
            headers={'Content-Type': 'application/json'},
        )
        # Close the response explicitly: a bare urlopen() leaks the connection.
        with urllib.request.urlopen(req, timeout=5):
            pass
    except Exception as e:
        print(f' [warn] failed to post result: {e}', file=sys.stderr)
def run_test(name: str, suite: str, fn) -> TestResult:
    """Execute one test callable and report its outcome.

    Posts a 'running' status first, then calls fn(): a clean return is a
    pass, an AssertionError is a fail, any other exception is an error.
    """
    res = TestResult(run_id=RUN_ID, test=name, suite=suite, status='running', ts=_now_iso())
    post_result(res)
    started = time.time()
    try:
        fn()
    except AssertionError as exc:
        res.status = 'fail'
        res.error = str(exc)
    except Exception as exc:
        res.status = 'error'
        res.error = f'{type(exc).__name__}: {exc}'
    else:
        res.status = 'pass'
    res.duration_ms = round((time.time() - started) * 1000)
    res.ts = _now_iso()
    post_result(res)
    return res
def get_api_tests() -> dict:
    """Load API tests from e2e_harness.py.

    Points the harness at the ASSAY_API base URL and filters out tests that
    need a live browser.
    """
    sys.path.insert(0, os.path.dirname(__file__))
    import e2e_harness
    # Bug fix: rstrip('/api') strips any trailing run of the characters
    # '/', 'a', 'p', 'i' — it can mangle hosts ending in those letters.
    # removesuffix drops exactly one trailing '/api'.
    base = os.environ.get('ASSAY_API', 'http://assay-runtime-test:8000').rstrip('/')
    e2e_harness.ASSAY_BASE = base.removesuffix('/api')
    # Skip browser-dependent tests
    return {k: v for k, v in e2e_harness.TESTS.items() if 'takeover' not in k and 'panes' not in k}
def get_roundtrip_tests() -> dict:
    """Load the Playwright roundtrip test registry."""
    sys.path.insert(0, os.path.dirname(__file__))
    import test_roundtrip
    return test_roundtrip.TESTS
def get_engine_tests() -> dict:
    """Load engine-level tests (no LLM, no network)."""
    sys.path.insert(0, os.path.dirname(__file__))
    import test_engine
    return test_engine.TESTS
def get_matrix_tests() -> dict:
    """Load model matrix tests (real LLM calls, test×variant combos)."""
    sys.path.insert(0, os.path.dirname(__file__))
    # Module-attribute access avoids shadowing this function's own name
    # with the imported one of the same name.
    import test_matrix
    return test_matrix.get_matrix_tests()
def get_testcase_tests() -> dict:
    """Load markdown testcases from testcases/ (integration tests, real LLM)."""
    sys.path.insert(0, os.path.dirname(__file__))
    import test_testcases
    return test_testcases.get_testcase_tests()
def get_node_tests() -> dict:
    """Load node-level tests (direct node instantiation, real LLM + DB, no HTTP)."""
    sys.path.insert(0, os.path.dirname(__file__))
    import test_node_eras
    return test_node_eras.TESTS
def get_ui_tests() -> dict:
    """Load UI tests — toolbar, navigation, scroll (Playwright, no backend needed)."""
    sys.path.insert(0, os.path.dirname(__file__))
    import test_ui
    return test_ui.TESTS
# Registry of test suites: suite name -> zero-arg loader returning
# {test_name: callable}. Loaders are only called for selected suites
# (see main()), so unselected suites never import their dependencies.
SUITES = {
    'engine': get_engine_tests,
    'api': get_api_tests,
    'node': get_node_tests,
    'matrix': get_matrix_tests,
    'testcases': get_testcase_tests,
    'roundtrip': get_roundtrip_tests,
    'ui': get_ui_tests,
}
def _compute_stats(durations: list[float], passed: int, total: int) -> dict:
"""Compute timing stats from a list of durations."""
if not durations:
return {}
durations.sort()
n = len(durations)
return {
'runs': total,
'passed': passed,
'pass_rate': round(100 * passed / total) if total else 0,
'min_ms': round(durations[0]),
'avg_ms': round(sum(durations) / n),
'p50_ms': round(durations[n // 2]),
'p95_ms': round(durations[min(int(n * 0.95), n - 1)]),
'max_ms': round(durations[-1]),
}
def run_test_repeated(name: str, suite: str, fn, repeat: int) -> TestResult:
    """Run a test N times, aggregate timing stats into one result.

    Posts a 'running' status first, times every run (including failing ones),
    then posts a single result whose status is 'pass' if all runs passed,
    'fail' if some did, 'error' if none did.
    """
    # Post running status
    result = TestResult(run_id=RUN_ID, test=name, suite=suite, status='running', ts=_now_iso())
    post_result(result)
    durations = []
    passed_count = 0
    last_error = ''
    for _ in range(repeat):
        start = time.time()
        try:
            fn()
            passed_count += 1
        # Bug fix: the original `except (AssertionError, Exception)` was
        # redundant — AssertionError is already a subclass of Exception.
        except Exception as e:
            last_error = str(e)[:200]
        finally:
            # Failed runs still count toward the timing distribution.
            durations.append(round((time.time() - start) * 1000))
    stats = _compute_stats(durations, passed_count, repeat)
    result.stats = stats
    result.duration_ms = stats.get('avg_ms', 0)
    result.status = 'pass' if passed_count == repeat else ('fail' if passed_count > 0 else 'error')
    # Guard: stats is {} when repeat < 1, so avoid a KeyError there.
    if stats:
        result.error = f'{stats["pass_rate"]}% pass, avg={stats["avg_ms"]}ms p50={stats["p50_ms"]}ms p95={stats["p95_ms"]}ms'
    if last_error and passed_count < repeat:
        result.error += f' | last err: {last_error}'
    result.ts = _now_iso()
    post_result(result)
    return result
def parse_args(args: list[str]) -> tuple[set[str] | None, set[str], int, int]:
"""Parse CLI args into (suite_filter, test_filter, repeat, parallel).
Supports: --repeat=N, --parallel=N
Returns:
suite_filter: set of suite names, or None for all suites
test_filter: set of 'suite/test' names (empty = run all in suite)
repeat: number of times to run each test (default 1)
parallel: max concurrent tests (default 1 = sequential)
"""
repeat = 1
parallel = 1
filtered_args = []
skip_next = False
for i, arg in enumerate(args):
if skip_next:
skip_next = False
continue
if arg.startswith('--repeat='):
repeat = int(arg.split('=', 1)[1])
elif arg == '--repeat' and i + 1 < len(args):
repeat = int(args[i + 1])
skip_next = True
elif arg.startswith('--parallel='):
parallel = int(arg.split('=', 1)[1])
elif arg == '--parallel' and i + 1 < len(args):
parallel = int(args[i + 1])
skip_next = True
else:
filtered_args.append(arg)
if not filtered_args:
return None, set(), repeat, parallel
suites = set()
tests = set()
for arg in filtered_args:
if '/' in arg:
tests.add(arg)
suites.add(arg.split('/')[0])
else:
suites.add(arg)
return suites, tests, repeat, parallel
def _run_one(name: str, suite_name: str, fn, repeat: int) -> TestResult:
    """Dispatch to single or repeated execution. Thread-safe."""
    if repeat <= 1:
        return run_test(name, suite_name, fn)
    return run_test_repeated(name, suite_name, fn, repeat)
def _print_result(suite_name: str, name: str, r: TestResult, repeat: int):
    """Print one result line: stats when repeated, error detail when single."""
    tag = 'PASS' if r.status == 'pass' else 'FAIL'
    if repeat > 1:
        s = r.stats
        line = (f' [{tag}] {suite_name}/{name} ×{repeat} '
                f'(avg={s.get("avg_ms", 0)}ms p50={s.get("p50_ms", 0)}ms '
                f'p95={s.get("p95_ms", 0)}ms pass={s.get("pass_rate", 0)}%)')
        print(line, flush=True)
    else:
        print(f' [{tag}] {suite_name}/{name} ({r.duration_ms:.0f}ms)', flush=True)
        if r.error:
            print(f' {r.error[:200]}', flush=True)
def run_suite(suite_name: str, tests: dict, test_filter: set[str],
              repeat: int = 1, parallel: int = 1) -> list[TestResult]:
    """Run tests from a suite, optionally filtered, repeated, and parallelized.

    Args:
        suite_name: suite key; also the prefix in 'suite/test' filter names.
        tests: mapping of test name -> zero-arg test callable.
        test_filter: set of 'suite/test' names; empty set runs everything.
        repeat: per-test run count (stats aggregated when > 1).
        parallel: max concurrent tests; <= 1 runs sequentially.

    Returns:
        One TestResult per executed test.
    """
    # Build filtered test list; a test matches by its full name or by its
    # short name with the leading 'suite_' dropped.
    filtered = []
    for name, fn in tests.items():
        full_name = f'{suite_name}/{name}'
        # Bug fix: str.replace removed EVERY occurrence of 'suite_', which
        # could mangle names containing the suite name mid-string;
        # removeprefix only strips a leading occurrence.
        short_name = name.removeprefix(f'{suite_name}_')
        if test_filter and full_name not in test_filter and f'{suite_name}/{short_name}' not in test_filter:
            continue
        filtered.append((name, fn))
    if not filtered:
        return []
    # Sequential execution
    if parallel <= 1 or len(filtered) <= 1:
        results = []
        for name, fn in filtered:
            r = _run_one(name, suite_name, fn, repeat)
            _print_result(suite_name, name, r, repeat)
            results.append(r)
        return results
    # Parallel execution
    results = []
    with ThreadPoolExecutor(max_workers=parallel) as pool:
        futures = {}
        for name, fn in filtered:
            f = pool.submit(_run_one, name, suite_name, fn, repeat)
            futures[f] = name
        for future in as_completed(futures):
            name = futures[future]
            try:
                r = future.result()
            except Exception as e:
                # A crash in the runner itself (not in the test body) still
                # yields a result row instead of killing the suite.
                r = TestResult(run_id=RUN_ID, test=name, suite=suite_name,
                               status='error', error=f'ThreadError: {e}', ts=_now_iso())
            _print_result(suite_name, name, r, repeat)
            results.append(r)
    return results
def main():
    """CLI entry point: parse args, run the selected suites, print and post a summary.

    Exits 1 if any test failed or errored, 0 otherwise.
    """
    suite_filter, test_filter, repeat, parallel = parse_args(sys.argv[1:])
    print(f'=== Test Run {RUN_ID} ===', flush=True)
    if suite_filter:
        print(f'Filter: suites={suite_filter}, tests={test_filter or "all"}', flush=True)
    if repeat > 1:
        print(f'Repeat: {repeat}x per test', flush=True)
    if parallel > 1:
        print(f'Parallel: {parallel} concurrent tests', flush=True)
    print(f'ASSAY_API: {os.environ.get("ASSAY_API", "not set")}', flush=True)
    print(f'NYX_URL: {os.environ.get("NYX_URL", "not set")}', flush=True)
    print(flush=True)
    all_results = []
    for suite_name, loader in SUITES.items():
        if suite_filter and suite_name not in suite_filter:
            continue
        label = suite_name
        if repeat > 1:
            label += f' ×{repeat}'
        if parallel > 1:
            # Bug fix: the original bare f'{parallel}' glued a digit straight
            # onto the label (e.g. 'matrix ×33'); name the concurrency level.
            label += f' parallel={parallel}'
        print(f'--- {label} ---', flush=True)
        tests = loader()
        all_results.extend(run_suite(suite_name, tests, test_filter, repeat, parallel))
        print(flush=True)
    # Summary
    passed = sum(1 for r in all_results if r.status == 'pass')
    failed = sum(1 for r in all_results if r.status in ('fail', 'error'))
    total_ms = sum(r.duration_ms for r in all_results)
    print(f'=== {passed} passed, {failed} failed, {len(all_results)} total ({total_ms:.0f}ms) ===', flush=True)
    if RESULTS_ENDPOINT:
        summary = TestResult(
            run_id=RUN_ID, test='__summary__', suite='summary',
            status='pass' if failed == 0 else 'fail',
            duration_ms=total_ms,
            error=f'{passed} passed, {failed} failed',
        )
        post_result(summary)
    sys.exit(1 if failed else 0)
# Script entry point.
if __name__ == '__main__':
    main()