- tests/test_testcases.py: new ChatClient using /api/chat SSE (replaces /api/send polling), each testcase gets own session_id - Registered as 'testcases' suite in run_tests.py (25 markdown testcases) - Results post to /api/test-results for real-time /tests dashboard - Reuses parser + assertion engine from runtime_test.py - Usage: python tests/run_tests.py testcases/fast Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
254 lines
9.3 KiB
Python
254 lines
9.3 KiB
Python
"""Testcases suite — runs markdown testcases from testcases/ via /api/chat SSE.
|
|
|
|
Each testcase gets its own session (session_id), enabling future parallel runs.
|
|
Results are posted to /api/test-results for real-time dashboard visibility.
|
|
|
|
Usage via run_tests.py:
|
|
python tests/run_tests.py testcases # all testcases
|
|
python tests/run_tests.py testcases/fast # single testcase
|
|
python tests/run_tests.py testcases/reflex_path # by name
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import sys
|
|
import time
|
|
import urllib.request
|
|
import uuid
|
|
from pathlib import Path
|
|
|
|
# Add parent to path for runtime_test imports
|
|
sys.path.insert(0, str(Path(__file__).parent.parent))
|
|
|
|
from runtime_test import (
|
|
parse_testcase, check_response, check_actions, check_state, check_trace,
|
|
)
|
|
|
|
_api_url = os.environ.get('ASSAY_API', 'http://assay-runtime-test:8000/api')
|
|
ASSAY_BASE = _api_url.removesuffix('/api') if _api_url.endswith('/api') else _api_url
|
|
API = f'{ASSAY_BASE}/api'
|
|
SERVICE_TOKEN = '7Oorb9S3OpwFyWgm4zi_Tq7GeamefbjjTgooPVPWAwPDOf6B4TvgvQlLbhmT4DjsqBS_D1g'
|
|
HEADERS = {'Authorization': f'Bearer {SERVICE_TOKEN}', 'Content-Type': 'application/json'}
|
|
TESTCASES_DIR = Path(__file__).parent.parent / 'testcases'
|
|
|
|
|
|
# --- SSE client using /api/chat ---
|
|
|
|
class ChatClient:
|
|
"""Sends messages via /api/chat SSE. Each instance has its own session."""
|
|
|
|
def __init__(self):
|
|
self.session_id = str(uuid.uuid4())[:12]
|
|
self.last_response = ""
|
|
self.last_memo = {}
|
|
self.last_actions = []
|
|
self.last_buttons = []
|
|
self.last_trace = [] # HUD events from this request
|
|
|
|
def send(self, text: str, dashboard: list = None) -> dict:
|
|
"""Send message via /api/chat, parse SSE stream."""
|
|
body = {'content': text, 'session_id': self.session_id}
|
|
if dashboard is not None:
|
|
body['dashboard'] = dashboard
|
|
|
|
payload = json.dumps(body).encode()
|
|
req = urllib.request.Request(
|
|
f'{API}/chat', data=payload, method='POST',
|
|
headers=HEADERS,
|
|
)
|
|
resp = urllib.request.urlopen(req, timeout=120)
|
|
output = resp.read().decode('utf-8')
|
|
|
|
# Parse SSE events
|
|
deltas = []
|
|
hud_events = []
|
|
controls = []
|
|
artifacts = []
|
|
|
|
for block in output.split('\n\n'):
|
|
event_type, data_str = '', ''
|
|
for line in block.strip().split('\n'):
|
|
if line.startswith('event: '):
|
|
event_type = line[7:]
|
|
elif line.startswith('data: '):
|
|
data_str = line[6:]
|
|
if not event_type or not data_str:
|
|
continue
|
|
try:
|
|
data = json.loads(data_str)
|
|
except json.JSONDecodeError:
|
|
continue
|
|
|
|
if event_type == 'delta':
|
|
deltas.append(data.get('content', ''))
|
|
elif event_type == 'hud':
|
|
hud_events.append(data)
|
|
elif event_type == 'controls':
|
|
controls = data.get('controls', [])
|
|
elif event_type == 'artifacts':
|
|
artifacts = data.get('artifacts', [])
|
|
|
|
self.last_response = ''.join(deltas)
|
|
self.last_trace = hud_events
|
|
|
|
# Extract controls from HUD if not sent as separate event
|
|
if not controls:
|
|
for h in reversed(hud_events):
|
|
if h.get('event') == 'controls':
|
|
controls = h.get('controls', [])
|
|
break
|
|
|
|
self.last_actions = controls
|
|
self.last_buttons = [c for c in controls if isinstance(c, dict) and c.get('type') == 'button']
|
|
|
|
return {'response': self.last_response, 'controls': controls, 'artifacts': artifacts}
|
|
|
|
def send_action(self, action: str) -> dict:
|
|
"""Send an action via /api/chat as ACTION: format."""
|
|
body = {
|
|
'content': f'ACTION:{action}',
|
|
'session_id': self.session_id,
|
|
}
|
|
payload = json.dumps(body).encode()
|
|
req = urllib.request.Request(
|
|
f'{API}/chat', data=payload, method='POST',
|
|
headers=HEADERS,
|
|
)
|
|
resp = urllib.request.urlopen(req, timeout=120)
|
|
output = resp.read().decode('utf-8')
|
|
|
|
deltas = []
|
|
hud_events = []
|
|
controls = []
|
|
|
|
for block in output.split('\n\n'):
|
|
event_type, data_str = '', ''
|
|
for line in block.strip().split('\n'):
|
|
if line.startswith('event: '):
|
|
event_type = line[7:]
|
|
elif line.startswith('data: '):
|
|
data_str = line[6:]
|
|
if not event_type or not data_str:
|
|
continue
|
|
try:
|
|
data = json.loads(data_str)
|
|
except json.JSONDecodeError:
|
|
continue
|
|
|
|
if event_type == 'delta':
|
|
deltas.append(data.get('content', ''))
|
|
elif event_type == 'hud':
|
|
hud_events.append(data)
|
|
elif event_type == 'controls':
|
|
controls = data.get('controls', [])
|
|
|
|
self.last_response = ''.join(deltas)
|
|
self.last_trace = hud_events
|
|
|
|
if not controls:
|
|
for h in reversed(hud_events):
|
|
if h.get('event') == 'controls':
|
|
controls = h.get('controls', [])
|
|
break
|
|
|
|
self.last_actions = controls
|
|
self.last_buttons = [c for c in controls if isinstance(c, dict) and c.get('type') == 'button']
|
|
|
|
return {'response': self.last_response}
|
|
|
|
def get_state(self) -> dict:
|
|
"""Fetch memorizer state from /api/session for this session."""
|
|
req = urllib.request.Request(
|
|
f'{API}/session?session={self.session_id}', headers=HEADERS)
|
|
resp = urllib.request.urlopen(req, timeout=10)
|
|
data = json.loads(resp.read().decode('utf-8'))
|
|
self.last_memo = data.get('memorizer', {})
|
|
return self.last_memo
|
|
|
|
|
|
# --- Testcase runner that returns (name, callable) pairs for run_tests.py ---
|
|
|
|
def _run_testcase(tc: dict):
|
|
"""Execute a parsed testcase. Raises AssertionError on first failure."""
|
|
client = ChatClient()
|
|
errors = []
|
|
|
|
for step in tc['steps']:
|
|
step_name = step['name']
|
|
for cmd in step['commands']:
|
|
if cmd['type'] == 'clear':
|
|
# No-op — each testcase has its own session, no need to clear
|
|
continue
|
|
|
|
elif cmd['type'] == 'send':
|
|
try:
|
|
client.send(cmd['text'], dashboard=cmd.get('dashboard'))
|
|
except Exception as e:
|
|
errors.append(f"[{step_name}] send failed: {e}")
|
|
continue
|
|
|
|
elif cmd['type'] == 'action':
|
|
try:
|
|
client.send_action(cmd['action'])
|
|
except Exception as e:
|
|
errors.append(f"[{step_name}] action failed: {e}")
|
|
continue
|
|
|
|
elif cmd['type'] == 'action_match':
|
|
patterns = cmd['patterns']
|
|
matched = None
|
|
for pattern in patterns:
|
|
pat = pattern.lower()
|
|
for a in client.last_buttons:
|
|
action_str = (a.get('action') or '').lower()
|
|
label_str = (a.get('label') or '').lower()
|
|
if pat in action_str or pat in label_str:
|
|
matched = a.get('action') or a.get('label', '')
|
|
break
|
|
if matched:
|
|
break
|
|
if matched:
|
|
try:
|
|
client.send_action(matched)
|
|
except Exception as e:
|
|
errors.append(f"[{step_name}] action_match failed: {e}")
|
|
else:
|
|
errors.append(f"[{step_name}] no button matching {patterns}")
|
|
|
|
elif cmd['type'] == 'expect_response':
|
|
passed, detail = check_response(client.last_response, cmd['check'])
|
|
if not passed:
|
|
errors.append(f"[{step_name}] response: {cmd['check']} — {detail}")
|
|
|
|
elif cmd['type'] == 'expect_actions':
|
|
passed, detail = check_actions(client.last_actions, cmd['check'])
|
|
if not passed:
|
|
errors.append(f"[{step_name}] actions: {cmd['check']} — {detail}")
|
|
|
|
elif cmd['type'] == 'expect_state':
|
|
client.get_state()
|
|
passed, detail = check_state(client.last_memo, cmd['check'])
|
|
if not passed:
|
|
errors.append(f"[{step_name}] state: {cmd['check']} — {detail}")
|
|
|
|
elif cmd['type'] == 'expect_trace':
|
|
passed, detail = check_trace(client.last_trace, cmd['check'])
|
|
if not passed:
|
|
errors.append(f"[{step_name}] trace: {cmd['check']} — {detail}")
|
|
|
|
if errors:
|
|
raise AssertionError(f"{len(errors)} check(s) failed:\n" + "\n".join(errors[:5]))
|
|
|
|
|
|
def get_testcase_tests() -> dict:
|
|
"""Load all testcases as {name: callable} for run_tests.py."""
|
|
tests = {}
|
|
for md_file in sorted(TESTCASES_DIR.glob('*.md')):
|
|
tc = parse_testcase(md_file)
|
|
if not tc['name'] or not tc['steps']:
|
|
continue
|
|
# Use filename stem as test name (e.g., "fast", "reflex_path")
|
|
name = md_file.stem
|
|
tests[name] = (lambda t: lambda: _run_testcase(t))(tc)
|
|
return tests
|