From f4e5a2d03bea931c582d7895972d85df53ee828e Mon Sep 17 00:00:00 2001 From: Vardhan Agnihotri Date: Fri, 30 Jan 2026 11:34:06 -0800 Subject: [PATCH] reset history --- .gitignore | 10 ++ README.md | 119 +++++++++++++ env.example | 37 ++++ requirements.txt | 6 + server.py | 442 +++++++++++++++++++++++++++++++++++++++++++++++ test_grok_mcp.py | 68 ++++++++ 6 files changed, 682 insertions(+) create mode 100644 .gitignore create mode 100644 README.md create mode 100644 env.example create mode 100644 requirements.txt create mode 100644 server.py create mode 100644 test_grok_mcp.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..990748e --- /dev/null +++ b/.gitignore @@ -0,0 +1,10 @@ +.DS_Store +__pycache__/ +*.py[cod] +.venv/ +venv/ +env/ +.env +.env.* +!.env.example +scripts/ \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..bf4b1d8 --- /dev/null +++ b/README.md @@ -0,0 +1,119 @@ +# X API FastMCP Server + +Run a local MCP server that exposes the X API OpenAPI spec as tools using +FastMCP. Streaming and webhook endpoints are excluded. + +## Prerequisites + +- Python 3.9+ +- An X Developer Platform app (to get tokens) +- Optional: an xAI API key if you want to run the Grok test client + +## Quick start (local server) + +1. Create a virtual environment and install dependencies: + - `python -m venv .venv` + - `source .venv/bin/activate` + - `pip install -r requirements.txt` +2. Create your local `.env`: + - `cp env.example .env` + - Fill in the OAuth1 section (consumer key/secret and callback settings). +3. Run the server: + - `python server.py` + +The server starts at `http://127.0.0.1:8000` by default. +The MCP endpoint is `http://127.0.0.1:8000/mcp`. + +## Environment variables + +Required (OAuth1 user context): +- `TWITTER_CONSUMER_KEY` +- `TWITTER_CONSUMER_SECRET` +- `X_OAUTH_CALLBACK_HOST` (default `127.0.0.1`) +- `X_OAUTH_CALLBACK_PORT` (default `8976`) +- `X_OAUTH_CALLBACK_PATH` (default `/oauth/callback`) +- `X_OAUTH_CALLBACK_TIMEOUT` (default `300`) + +Optional auth fallback: +- `X_BEARER_TOKEN` (OAuth2 bearer token) + +Optional server config: +- `MCP_HOST` (default `127.0.0.1`) +- `MCP_PORT` (default `8000`) +- `X_API_BASE_URL` (default `https://api.x.com`) +- `X_API_TIMEOUT` (default `30`) +- `X_API_DEBUG` (default `1`) +- `FASTMCP_EXPERIMENTAL_ENABLE_NEW_OPENAPI_PARSER` + +Tool filtering (comma-separated): +- `X_API_TOOL_TAGS` +- `X_API_TOOL_ALLOWLIST` +- `X_API_TOOL_DENYLIST` + +Optional Grok test client: +- `XAI_API_KEY` +- `XAI_MODEL` (default `grok-4-1-fast`) +- `MCP_SERVER_URL` (default `http://127.0.0.1:8000/mcp`) + +## Auth flow (OAuth1 on startup) + +The server runs an OAuth1 browser flow on startup and uses the resulting +access token to sign every request. You must register a callback URL in your +X Developer App that matches: + +``` +http://: +``` + +Example: + +``` +http://127.0.0.1:8976/oauth/callback +``` + +When you start the server, it will open a browser tab for consent and wait +for the callback. Tokens are kept in memory only for the lifetime of the +server process. + +## Tool whitelisting + +If you want to limit the tool list (smaller context window, fewer tools), +use `X_API_TOOL_ALLOWLIST` or `X_API_TOOL_TAGS` in `.env`. + +Example allowlist: + +``` +X_API_TOOL_ALLOWLIST=getUsersByUsername,createDirectMessagesByParticipantId +``` + +Example tags: + +``` +X_API_TOOL_TAGS=users,dm +``` + +Allowlist and tags are applied at startup when the OpenAPI spec is loaded. + +## Generate an OAuth2 user token (optional) + +If you want a user-context OAuth2 token: +1. Add `CLIENT_ID` and `CLIENT_SECRET` to your `.env`. +2. Update `redirect_uri` in `generate_authtoken.py` to match your app settings. +3. Run `python generate_authtoken.py` and follow the prompts. +4. Copy the printed access token into `.env` as `X_OAUTH_ACCESS_TOKEN`. + +## Run the Grok MCP test client (optional) + +1. Set `XAI_API_KEY` in `.env`. +2. Make sure your MCP server is running locally (or set `MCP_SERVER_URL`). +3. If Grok cannot reach `http://127.0.0.1:8000/mcp`, use ngrok to tunnel your + local server and point `MCP_SERVER_URL` to the public ngrok URL. +4. Run `python test_grok_mcp.py`. + +## Notes + +- Endpoints with `/stream` or `/webhooks` in the path are excluded. +- Operations tagged `Stream` or `Webhooks`, or marked with + `x-twitter-streaming: true`, are excluded. +- The OpenAPI spec is fetched from `https://api.twitter.com/2/openapi.json` at + startup. diff --git a/env.example b/env.example new file mode 100644 index 0000000..76d61a7 --- /dev/null +++ b/env.example @@ -0,0 +1,37 @@ +# X API auth (fallback, optional) +X_OAUTH_ACCESS_TOKEN= +X_BEARER_TOKEN= + +# OAuth1 startup (required) +TWITTER_CONSUMER_KEY= +TWITTER_CONSUMER_SECRET= +X_OAUTH_CALLBACK_HOST=127.0.0.1 +X_OAUTH_CALLBACK_PORT=8976 +X_OAUTH_CALLBACK_PATH=/oauth/callback +X_OAUTH_CALLBACK_TIMEOUT=300 + +# X API settings +X_API_BASE_URL=https://api.x.com +X_API_TIMEOUT=30 +X_API_DEBUG=1 + +# MCP server settings +MCP_HOST=127.0.0.1 +MCP_PORT=8000 + +# FastMCP parser flag (optional) +FASTMCP_EXPERIMENTAL_ENABLE_NEW_OPENAPI_PARSER= + +# Tool filtering (optional, comma-separated) +X_API_TOOL_TAGS= +X_API_TOOL_ALLOWLIST= +X_API_TOOL_DENYLIST= + +# Local Grok client (optional) +XAI_API_KEY= +XAI_MODEL=grok-4-1-fast +MCP_SERVER_URL=[your ngrok url]/mcp + +# OAuth2 token generation (optional) +CLIENT_ID= +CLIENT_SECRET= diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..344e027 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,6 @@ +fastmcp +httpx +python-dotenv +requests-oauthlib +xai-sdk +xdk \ No newline at end of file diff --git a/server.py b/server.py new file mode 100644 index 0000000..70f4506 --- /dev/null +++ b/server.py @@ -0,0 +1,442 @@ +import copy +import http.server +import json +import logging +import os +import socketserver +import threading +import time +import urllib.parse +import webbrowser +from pathlib import Path + +import httpx +import requests +from oauthlib.oauth1 import Client as OAuth1Client +from requests_oauthlib import OAuth1Session +from fastmcp import FastMCP + +HTTP_METHODS = { + "get", + "post", + "put", + "patch", + "delete", + "options", + "head", + "trace", +} + +LOGGER = logging.getLogger("xmcp.x_api") +OAUTH_LOGGER = logging.getLogger("xmcp.oauth1") + +REQUEST_TOKEN_URL = "https://api.x.com/oauth/request_token" +AUTHORIZE_URL = "https://api.x.com/oauth/authorize" +ACCESS_TOKEN_URL = "https://api.x.com/oauth/access_token" + + +def is_truthy(value: str | None) -> bool: + if value is None: + return False + return value.strip().lower() in {"1", "true", "yes", "on"} + + +def parse_csv_env(key: str) -> set[str]: + raw = os.getenv(key, "") + if not raw.strip(): + return set() + return {item.strip() for item in raw.split(",") if item.strip()} + + +def should_join_query_param(param: dict) -> bool: + if param.get("in") != "query": + return False + schema = param.get("schema", {}) + if schema.get("type") != "array": + return False + return param.get("explode") is False + + +def collect_comma_params(spec: dict) -> set[str]: + comma_params: set[str] = set() + components = spec.get("components", {}).get("parameters", {}) + for param in components.values(): + if isinstance(param, dict) and should_join_query_param(param): + name = param.get("name") + if isinstance(name, str): + comma_params.add(name) + + for item in spec.get("paths", {}).values(): + if not isinstance(item, dict): + continue + for method, operation in item.items(): + if method.lower() not in HTTP_METHODS or not isinstance(operation, dict): + continue + for param in operation.get("parameters", []): + if not isinstance(param, dict) or "$ref" in param: + continue + if should_join_query_param(param): + name = param.get("name") + if isinstance(name, str): + comma_params.add(name) + + return comma_params + + +def load_openapi_spec() -> dict: + url = "https://api.twitter.com/2/openapi.json" + LOGGER.info("Fetching OpenAPI spec from %s", url) + response = requests.get(url, timeout=30) + response.raise_for_status() + return response.json() + + +def _get_env_int(key: str, default: int) -> int: + raw = os.getenv(key, "").strip() + if not raw: + return default + try: + return int(raw) + except ValueError: + raise RuntimeError(f"{key} must be an integer value.") + + +def _callback_url(host: str, port: int, path: str) -> str: + return f"http://{host}:{port}{path}" + + +def _wait_for_callback( + host: str, port: int, path: str, timeout_seconds: int +) -> tuple[str, str]: + params: dict[str, str | None] = {"oauth_token": None, "oauth_verifier": None} + event = threading.Event() + + class _Handler(http.server.BaseHTTPRequestHandler): + def do_GET(self) -> None: # noqa: N802 - required by BaseHTTPRequestHandler + parsed = urllib.parse.urlparse(self.path) + if parsed.path != path: + self.send_response(404) + self.end_headers() + self.wfile.write(b"Not found.") + return + query = urllib.parse.parse_qs(parsed.query) + params["oauth_token"] = (query.get("oauth_token") or [None])[0] + params["oauth_verifier"] = (query.get("oauth_verifier") or [None])[0] + event.set() + self.send_response(200) + self.end_headers() + self.wfile.write(b"OAuth complete. You may close this tab.") + + def log_message(self, format: str, *args: object) -> None: # noqa: A003 + OAUTH_LOGGER.debug("OAuth1 callback: " + format, *args) + + class _Server(socketserver.TCPServer): + allow_reuse_address = True + + server = _Server((host, port), _Handler) + server.timeout = 1 + + deadline = time.time() + timeout_seconds + try: + while time.time() < deadline: + server.handle_request() + if event.is_set(): + break + finally: + server.server_close() + + oauth_token = params.get("oauth_token") + oauth_verifier = params.get("oauth_verifier") + if not oauth_token or not oauth_verifier: + raise TimeoutError("OAuth callback not received before timeout.") + return oauth_token, oauth_verifier + + +def run_oauth1_flow() -> tuple[str, str]: + consumer_key = os.getenv("TWITTER_CONSUMER_KEY") + consumer_secret = os.getenv("TWITTER_CONSUMER_SECRET") + if not consumer_key or not consumer_secret: + raise RuntimeError( + "Missing TWITTER_CONSUMER_KEY or TWITTER_CONSUMER_SECRET for OAuth1 flow." + ) + + callback_host = os.getenv("X_OAUTH_CALLBACK_HOST", "127.0.0.1") + callback_port = _get_env_int("X_OAUTH_CALLBACK_PORT", 8976) + callback_path = os.getenv("X_OAUTH_CALLBACK_PATH", "/oauth/callback") + callback_timeout = _get_env_int("X_OAUTH_CALLBACK_TIMEOUT", 300) + + callback_url = _callback_url(callback_host, callback_port, callback_path) + + oauth = OAuth1Session( + client_key=consumer_key, + client_secret=consumer_secret, + callback_uri=callback_url, + ) + request_token = oauth.fetch_request_token(REQUEST_TOKEN_URL) + resource_owner_key = request_token.get("oauth_token") + resource_owner_secret = request_token.get("oauth_token_secret") + if not resource_owner_key or not resource_owner_secret: + raise RuntimeError("Failed to obtain OAuth request token.") + + authorization_url = oauth.authorization_url(AUTHORIZE_URL) + OAUTH_LOGGER.info("Opening browser for OAuth1 consent.") + webbrowser.open(authorization_url) + + oauth_token, oauth_verifier = _wait_for_callback( + callback_host, callback_port, callback_path, callback_timeout + ) + if oauth_token != resource_owner_key: + raise RuntimeError("OAuth callback token does not match request token.") + + oauth = OAuth1Session( + client_key=consumer_key, + client_secret=consumer_secret, + resource_owner_key=resource_owner_key, + resource_owner_secret=resource_owner_secret, + verifier=oauth_verifier, + ) + access_token = oauth.fetch_access_token(ACCESS_TOKEN_URL) + access_key = access_token.get("oauth_token") + access_secret = access_token.get("oauth_token_secret") + if not access_key or not access_secret: + raise RuntimeError("Failed to obtain OAuth access token.") + return access_key, access_secret + + +def load_env() -> None: + env_path = Path(__file__).resolve().parent / ".env" + if not env_path.exists(): + return + try: + from dotenv import load_dotenv + except ImportError: + return + load_dotenv(env_path, override=True) + + +def setup_logging() -> bool: + debug_enabled = is_truthy(os.getenv("X_API_DEBUG", "1")) + if debug_enabled: + logging.basicConfig(level=logging.INFO) + LOGGER.setLevel(logging.INFO) + return debug_enabled + + +def should_exclude_operation(path: str, operation: dict) -> bool: + if "/webhooks" in path or "/stream" in path: + return True + + tags = [tag.lower() for tag in operation.get("tags", []) if isinstance(tag, str)] + if "stream" in tags or "webhooks" in tags: + return True + + if operation.get("x-twitter-streaming") is True: + return True + + return False + + +def filter_openapi_spec(spec: dict) -> dict: + filtered = copy.deepcopy(spec) + paths = filtered.get("paths", {}) + new_paths = {} + allow_tags = {tag.lower() for tag in parse_csv_env("X_API_TOOL_TAGS")} + allow_ops = parse_csv_env("X_API_TOOL_ALLOWLIST") + deny_ops = parse_csv_env("X_API_TOOL_DENYLIST") + + for path, item in paths.items(): + if not isinstance(item, dict): + continue + + new_item = {} + for key, value in item.items(): + if key.lower() in HTTP_METHODS: + if should_exclude_operation(path, value): + continue + operation_id = value.get("operationId") + operation_tags = [ + tag.lower() + for tag in value.get("tags", []) + if isinstance(tag, str) + ] + if allow_tags and not (set(operation_tags) & allow_tags): + continue + if allow_ops and operation_id not in allow_ops: + continue + if deny_ops and operation_id in deny_ops: + continue + new_item[key] = value + else: + new_item[key] = value + + if any(method.lower() in HTTP_METHODS for method in new_item.keys()): + new_paths[path] = new_item + + filtered["paths"] = new_paths + return filtered + + +def print_tool_list(spec: dict) -> None: + tools: list[str] = [] + for path, item in spec.get("paths", {}).items(): + if not isinstance(item, dict): + continue + for method, operation in item.items(): + if method.lower() not in HTTP_METHODS or not isinstance(operation, dict): + continue + op_id = operation.get("operationId") + if op_id: + tools.append(op_id) + else: + tools.append(f"{method.upper()} {path}") + + tools.sort() + print(f"Loaded {len(tools)} tools from OpenAPI:") + for tool in tools: + print(f"- {tool}") + + +def get_auth_headers(oauth_token: str | None = None) -> dict: + env_oauth_token = os.getenv("X_OAUTH_ACCESS_TOKEN", "").strip() + bearer_token = os.getenv("X_BEARER_TOKEN", "").strip() + token = oauth_token or env_oauth_token or bearer_token + if not token: + raise RuntimeError( + "Set X_BEARER_TOKEN or provide OAuth1 access token on startup." + ) + return {"Authorization": f"Bearer {token}"} + + +def build_oauth1_client() -> OAuth1Client: + consumer_key = os.getenv("TWITTER_CONSUMER_KEY") + consumer_secret = os.getenv("TWITTER_CONSUMER_SECRET") + if not consumer_key or not consumer_secret: + raise RuntimeError( + "Missing TWITTER_CONSUMER_KEY or TWITTER_CONSUMER_SECRET for OAuth1 signing." + ) + access_token, access_secret = run_oauth1_flow() + LOGGER.info("OAuth1 access token: %s", access_token) + return OAuth1Client( + client_key=consumer_key, + client_secret=consumer_secret, + resource_owner_key=access_token, + resource_owner_secret=access_secret, + signature_type="AUTH_HEADER", + ) + + +def create_mcp() -> FastMCP: + load_env() + debug_enabled = setup_logging() + parser_flag = os.getenv("FASTMCP_EXPERIMENTAL_ENABLE_NEW_OPENAPI_PARSER") + if parser_flag is not None: + os.environ["FASTMCP_EXPERIMENTAL_ENABLE_NEW_OPENAPI_PARSER"] = parser_flag + + base_url = os.getenv("X_API_BASE_URL", "https://api.x.com") + timeout = float(os.getenv("X_API_TIMEOUT", "30")) + + oauth1_client = build_oauth1_client() + + spec = load_openapi_spec() + filtered_spec = filter_openapi_spec(spec) + comma_params = collect_comma_params(filtered_spec) + print_tool_list(filtered_spec) + async def normalize_query_params(request: httpx.Request) -> None: + if not comma_params: + return + params = list(request.url.params.multi_items()) + grouped: dict[str, list[str]] = {} + ordered: list[str] = [] + normalized: list[tuple[str, str]] = [] + + for key, value in params: + if key in comma_params: + if key not in grouped: + ordered.append(key) + grouped.setdefault(key, []).append(value) + else: + normalized.append((key, value)) + + if not grouped: + return + + for key in ordered: + values: list[str] = [] + for raw in grouped[key]: + for part in raw.split(","): + part = part.strip() + if part and part not in values: + values.append(part) + if values: + normalized.append((key, ",".join(values))) + + request.url = request.url.copy_with(params=normalized) + + b3_flags = os.getenv("X_B3_FLAGS", "1") + + async def sign_oauth1_request(request: httpx.Request) -> None: + request.headers["X-B3-Flags"] = b3_flags + headers = dict(request.headers) + content_type = headers.get("Content-Type", "") + body: str | None = None + if content_type.startswith("application/x-www-form-urlencoded"): + body_bytes = request.content or b"" + body = body_bytes.decode("utf-8") + signed_url, signed_headers, _ = oauth1_client.sign( + str(request.url), + http_method=request.method, + body=body, + headers=headers, + ) + request.url = httpx.URL(signed_url) + request.headers.update(signed_headers) + + async def log_request(request: httpx.Request) -> None: + if not debug_enabled: + return + LOGGER.info("X API request %s %s", request.method, request.url) + + async def log_response(response: httpx.Response) -> None: + if not debug_enabled: + return + LOGGER.info( + "X API response %s %s -> %s", + response.request.method, + response.request.url, + response.status_code, + ) + if response.status_code >= 400: + transaction_id = response.headers.get("x-transaction-id") + if transaction_id: + LOGGER.warning("X API x-transaction-id: %s", transaction_id) + body = await response.aread() + text = body.decode("utf-8", errors="replace") + if len(text) > 1000: + text = text[:1000] + "..." + LOGGER.warning("X API error body: %s", text) + + client = httpx.AsyncClient( + base_url=base_url, + headers={}, + timeout=timeout, + event_hooks={ + "request": [normalize_query_params, sign_oauth1_request, log_request], + "response": [log_response], + }, + ) + return FastMCP.from_openapi( + openapi_spec=filtered_spec, + client=client, + name="X API MCP", + ) + + +def main() -> None: + host = os.getenv("MCP_HOST", "127.0.0.1") + port = int(os.getenv("MCP_PORT", "8000")) + mcp = create_mcp() + mcp.run(transport="http", host=host, port=port) + + +if __name__ == "__main__": + main() diff --git a/test_grok_mcp.py b/test_grok_mcp.py new file mode 100644 index 0000000..71d9734 --- /dev/null +++ b/test_grok_mcp.py @@ -0,0 +1,68 @@ +import os +from pathlib import Path + +from xai_sdk import Client +from xai_sdk.chat import user +from xai_sdk.tools import mcp + + +def load_env() -> None: + env_path = Path(__file__).resolve().parent / ".env" + if not env_path.exists(): + return + try: + from dotenv import load_dotenv + except ImportError: + return + load_dotenv(env_path) + + +def get_xai_config() -> tuple[str, str]: + api_key = os.getenv("XAI_API_KEY", "").strip() + if not api_key: + raise RuntimeError("Set XAI_API_KEY to call Grok.") + model = os.getenv("XAI_MODEL", "grok-4-1-fast-reasoning") + return api_key, model + + +def main() -> None: + load_env() + server_url = os.getenv("MCP_SERVER_URL", "http://127.0.0.1:8000/mcp") + api_key, model = get_xai_config() + + client = Client(api_key=api_key) + chat = client.chat.create( + model=model, + tools=[ + mcp( + server_url=server_url, + ) + ], + include=["verbose_streaming"], + ) + + chat.append( + user('dm @taycaldwell the message "bot.grokcommand sent this message with xMCP". If we receive an error please explicity inform us what the error is. This incldues trace ids.') + ) + + print("Starting chat stream...\n") + + is_thinking = True + for response, chunk in chat.stream(): + if response.usage.reasoning_tokens and is_thinking: + print("\rGrok is thinking...", end="", flush=True) + is_thinking = False + + for tool_call in chunk.tool_calls: + print(f"\nGrok calling tool: {tool_call.function.name}") + print(f"With args: {tool_call.function.arguments}") + + if chunk.content: + print(chunk.content, end="", flush=True) + + print("\n\nFinal usage:", response.usage) + print("Server-side tool usage:", response.server_side_tool_usage) + + +if __name__ == "__main__": + main()