From e94f1c437cd5201cdfe87af77249853b26ff2ae0 Mon Sep 17 00:00:00 2001 From: Nico Date: Wed, 1 Apr 2026 00:23:11 +0200 Subject: [PATCH] Add Streamable HTTP transport for MCP (replaces SSE as primary) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - New /mcp endpoint with StreamableHTTPServerTransport (stateless per-request) - SSE kept as /mcp/sse fallback - Survives pod restarts — no persistent connection needed Co-Authored-By: Claude Opus 4.6 (1M context) --- agent/mcp_app.py | 27 +++++++++++++++++++++++++-- 1 file changed, 25 insertions(+), 2 deletions(-) diff --git a/agent/mcp_app.py b/agent/mcp_app.py index b3ae7b0..abc2b6a 100644 --- a/agent/mcp_app.py +++ b/agent/mcp_app.py @@ -1,4 +1,4 @@ -"""Standalone MCP SSE app — proxies tool calls to assay-runtime.""" +"""Standalone MCP app — proxies tool calls to assay-runtime. Supports Streamable HTTP + SSE.""" import json import logging @@ -14,6 +14,7 @@ from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer from mcp.server import Server from mcp.server.sse import SseServerTransport +from mcp.server.streamable_http import StreamableHTTPServerTransport from mcp.types import TextContent, Tool logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(name)s] %(message)s", datefmt="%H:%M:%S") @@ -214,7 +215,29 @@ async def call_tool(name: str, arguments: dict): return [TextContent(type="text", text=f"Unknown tool: {name}")] -# Mount MCP SSE endpoints +# Mount MCP Streamable HTTP endpoint (primary — stateless, survives pod restarts) +_http_transports: dict[str, StreamableHTTPServerTransport] = {} +_http_tasks: dict[str, any] = {} + +@app.api_route("/mcp", methods=["GET", "POST", "DELETE"]) +async def mcp_http(request: Request, user=Depends(require_auth)): + import asyncio + # Get or create session-scoped transport + session_id = request.headers.get("mcp-session-id", "default") + if session_id not in _http_transports: + transport = StreamableHTTPServerTransport(mcp_session_id=session_id) + _http_transports[session_id] = transport + + async def _run(): + async with transport.connect() as streams: + await mcp_server.run(streams[0], streams[1], mcp_server.create_initialization_options()) + _http_tasks[session_id] = asyncio.create_task(_run()) + + transport = _http_transports[session_id] + await transport.handle_request(request.scope, request.receive, request._send) + + +# Mount MCP SSE endpoints (legacy fallback) @app.get("/mcp/sse") async def mcp_sse(request: Request, user=Depends(require_auth)): async with _mcp_transport.connect_sse(request.scope, request.receive, request._send) as streams: