diff --git a/agent/mcp_app.py b/agent/mcp_app.py index b3ae7b0..abc2b6a 100644 --- a/agent/mcp_app.py +++ b/agent/mcp_app.py @@ -1,4 +1,4 @@ -"""Standalone MCP SSE app — proxies tool calls to assay-runtime.""" +"""Standalone MCP app — proxies tool calls to assay-runtime. Supports Streamable HTTP + SSE.""" import json import logging @@ -14,6 +14,7 @@ from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer from mcp.server import Server from mcp.server.sse import SseServerTransport +from mcp.server.streamable_http import StreamableHTTPServerTransport from mcp.types import TextContent, Tool logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(name)s] %(message)s", datefmt="%H:%M:%S") @@ -214,7 +215,29 @@ async def call_tool(name: str, arguments: dict): return [TextContent(type="text", text=f"Unknown tool: {name}")] -# Mount MCP SSE endpoints +# Mount MCP Streamable HTTP endpoint (primary — stateless, survives pod restarts) +_http_transports: dict[str, StreamableHTTPServerTransport] = {} +_http_tasks: dict[str, any] = {} + +@app.api_route("/mcp", methods=["GET", "POST", "DELETE"]) +async def mcp_http(request: Request, user=Depends(require_auth)): + import asyncio + # Get or create session-scoped transport + session_id = request.headers.get("mcp-session-id", "default") + if session_id not in _http_transports: + transport = StreamableHTTPServerTransport(mcp_session_id=session_id) + _http_transports[session_id] = transport + + async def _run(): + async with transport.connect() as streams: + await mcp_server.run(streams[0], streams[1], mcp_server.create_initialization_options()) + _http_tasks[session_id] = asyncio.create_task(_run()) + + transport = _http_transports[session_id] + await transport.handle_request(request.scope, request.receive, request._send) + + +# Mount MCP SSE endpoints (legacy fallback) @app.get("/mcp/sse") async def mcp_sse(request: Request, user=Depends(require_auth)): async with _mcp_transport.connect_sse(request.scope, request.receive, request._send) as streams: