Add Streamable HTTP transport for MCP (replaces SSE as primary)
- New /mcp endpoint with StreamableHTTPServerTransport (stateless per-request) - SSE kept as /mcp/sse fallback - Survives pod restarts — no persistent connection needed Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
38d63b0daa
commit
e94f1c437c
@ -1,4 +1,4 @@
|
|||||||
"""Standalone MCP SSE app — proxies tool calls to assay-runtime."""
|
"""Standalone MCP app — proxies tool calls to assay-runtime. Supports Streamable HTTP + SSE."""
|
||||||
|
|
||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
@ -14,6 +14,7 @@ from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
|
|||||||
|
|
||||||
from mcp.server import Server
|
from mcp.server import Server
|
||||||
from mcp.server.sse import SseServerTransport
|
from mcp.server.sse import SseServerTransport
|
||||||
|
from mcp.server.streamable_http import StreamableHTTPServerTransport
|
||||||
from mcp.types import TextContent, Tool
|
from mcp.types import TextContent, Tool
|
||||||
|
|
||||||
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(name)s] %(message)s", datefmt="%H:%M:%S")
|
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(name)s] %(message)s", datefmt="%H:%M:%S")
|
||||||
@ -214,7 +215,29 @@ async def call_tool(name: str, arguments: dict):
|
|||||||
return [TextContent(type="text", text=f"Unknown tool: {name}")]
|
return [TextContent(type="text", text=f"Unknown tool: {name}")]
|
||||||
|
|
||||||
|
|
||||||
# Mount MCP SSE endpoints
|
# Mount MCP Streamable HTTP endpoint (primary — stateless, survives pod restarts)
|
||||||
|
_http_transports: dict[str, StreamableHTTPServerTransport] = {}
|
||||||
|
_http_tasks: dict[str, any] = {}
|
||||||
|
|
||||||
|
@app.api_route("/mcp", methods=["GET", "POST", "DELETE"])
|
||||||
|
async def mcp_http(request: Request, user=Depends(require_auth)):
|
||||||
|
import asyncio
|
||||||
|
# Get or create session-scoped transport
|
||||||
|
session_id = request.headers.get("mcp-session-id", "default")
|
||||||
|
if session_id not in _http_transports:
|
||||||
|
transport = StreamableHTTPServerTransport(mcp_session_id=session_id)
|
||||||
|
_http_transports[session_id] = transport
|
||||||
|
|
||||||
|
async def _run():
|
||||||
|
async with transport.connect() as streams:
|
||||||
|
await mcp_server.run(streams[0], streams[1], mcp_server.create_initialization_options())
|
||||||
|
_http_tasks[session_id] = asyncio.create_task(_run())
|
||||||
|
|
||||||
|
transport = _http_transports[session_id]
|
||||||
|
await transport.handle_request(request.scope, request.receive, request._send)
|
||||||
|
|
||||||
|
|
||||||
|
# Mount MCP SSE endpoints (legacy fallback)
|
||||||
@app.get("/mcp/sse")
|
@app.get("/mcp/sse")
|
||||||
async def mcp_sse(request: Request, user=Depends(require_auth)):
|
async def mcp_sse(request: Request, user=Depends(require_auth)):
|
||||||
async with _mcp_transport.connect_sse(request.scope, request.receive, request._send) as streams:
|
async with _mcp_transport.connect_sse(request.scope, request.receive, request._send) as streams:
|
||||||
|
|||||||
Reference in New Issue
Block a user