refactor: XHS + Exa 全面切换到 mcporter MCP 后端

- xiaohongshu.py: 删除网页爬取代码,100% 走 mcporter + xiaohongshu-mcp
- exa_search.py: 删除直接 API 调用,100% 走 mcporter + exa MCP(免 Key)
- cli.py install: 新增 _install_mcporter(),自动装 mcporter + 配置 exa/xhs
- cli.py configure: 移除 exa-key 和 xhs-cookie(不再需要)
- doctor.py: 更新 tier 1 描述

净减 166 行代码,架构更简单:
之前: 每个渠道两套后端代码(mcporter + fallback)
现在: 每个渠道一套后端(mcporter only)
This commit is contained in:
Panniantong 2026-02-24 13:20:23 +01:00
parent 4e0c008d42
commit bb9ce6f7f1
4 changed files with 241 additions and 407 deletions

View file

@ -1,18 +1,13 @@
# -*- coding: utf-8 -*-
"""Exa semantic search — the search backbone for Agent Reach.
"""Exa semantic search — via mcporter + Exa MCP server.
Backend priority:
1. mcporter + Exa MCP server (OAuth, no API key needed)
2. Direct Exa API (requires EXA_API_KEY)
Swap to: Tavily, SerpAPI, or any search API
Backend: Exa MCP at mcp.exa.ai (OAuth, no API key needed)
Requires: mcporter CLI
"""
import os
import json
import shutil
import subprocess
import requests
from .base import Channel, SearchResult
from typing import List
@ -20,160 +15,96 @@ from typing import List
class ExaSearchChannel(Channel):
name = "exa_search"
description = "全网语义搜索(同时支持 Reddit/Twitter 搜索)"
backends = ["Exa MCP Server", "Exa API"]
backends = ["exa-mcp"]
tier = 1
API_URL = "https://api.exa.ai/search"
def _has_mcporter_exa(self):
"""Check if mcporter CLI is available and exa MCP is configured."""
def _mcporter_ok(self) -> bool:
if not shutil.which("mcporter"):
return False
try:
result = subprocess.run(
["mcporter", "list"],
capture_output=True, text=True, timeout=10,
r = subprocess.run(
["mcporter", "list"], capture_output=True, text=True, timeout=10
)
return "exa" in result.stdout
return "exa" in r.stdout
except Exception:
return False
def _mcporter_call(self, tool_call: str, timeout: int = 30) -> str:
"""Call an MCP tool via mcporter and return the output."""
result = subprocess.run(
["mcporter", "call", tool_call],
def _call(self, expr: str, timeout: int = 30) -> str:
r = subprocess.run(
["mcporter", "call", expr],
capture_output=True, text=True, timeout=timeout,
)
if result.returncode != 0:
raise RuntimeError(result.stderr or result.stdout)
return result.stdout
if r.returncode != 0:
raise RuntimeError(r.stderr or r.stdout)
return r.stdout
# ── Channel interface ──
def can_handle(self, url: str) -> bool:
return False # Search-only channel, doesn't read URLs
return False # search-only
async def read(self, url: str, config=None) -> None:
async def read(self, url: str, config=None):
raise NotImplementedError("Exa is a search engine, not a reader")
def check(self, config=None):
# Priority 1: mcporter
if self._has_mcporter_exa():
return "ok", "MCP 已连接,免 Key 直接可用(全网搜索 + Reddit + Twitter"
# Priority 2: API key
key = None
if config:
key = config.get("exa_api_key")
if not key:
key = os.environ.get("EXA_API_KEY")
if key:
return "ok", "API Key 已配置,全网搜索可用"
return "off", "注册 exa.ai 获取免费 Key配置一下就能用。或安装 mcporter 免 Key 使用"
def _get_key(self, config=None) -> str:
if config:
key = config.get("exa_api_key")
if key:
return key
key = os.environ.get("EXA_API_KEY")
if key:
return key
return ""
if not shutil.which("mcporter"):
return "off", (
"需要 mcporter。安装npm install -g mcporter && "
"mcporter config add exa https://mcp.exa.ai/mcp"
)
if not self._mcporter_ok():
return "off", "mcporter 已装但 Exa 未配置。运行mcporter config add exa https://mcp.exa.ai/mcp"
return "ok", "MCP 已连接,免 Key 直接可用(全网搜索 + Reddit + Twitter"
async def search(self, query: str, config=None, **kwargs) -> List[SearchResult]:
limit = kwargs.get("limit", 5)
# Priority 1: mcporter + Exa MCP
if self._has_mcporter_exa():
return await self._search_via_mcp(query, limit)
# Priority 2: Direct API
api_key = self._get_key(config)
if not api_key:
if not self._mcporter_ok():
raise ValueError(
"Exa search not configured.\n\n"
"Option 1 (easiest): Install mcporter — no API key needed:\n"
" npm install -g mcporter && mcporter config add exa https://mcp.exa.ai/mcp\n\n"
"Option 2: Get a free API key:\n"
" Sign up at https://exa.ai (1000 searches/month free)\n"
" Then run: agent-reach configure exa-key YOUR_KEY"
"Exa 搜索需要 mcporter。安装\n"
" npm install -g mcporter\n"
" mcporter config add exa https://mcp.exa.ai/mcp"
)
return await self._search_via_api(query, api_key, limit)
async def _search_via_mcp(self, query: str, limit: int) -> List[SearchResult]:
"""Search via mcporter + Exa MCP server."""
# Escape quotes in query
safe_query = query.replace('"', '\\"')
output = self._mcporter_call(
f'exa.web_search_exa(query: "{safe_query}", numResults: {min(limit, 10)})',
limit = kwargs.get("limit", 5)
safe_q = query.replace('"', '\\"')
out = self._call(
f'exa.web_search_exa(query: "{safe_q}", numResults: {min(limit, 10)})',
timeout=30,
)
return self._parse_output(out, limit)
# mcporter returns formatted text blocks like:
# Title: ...
# URL: ...
# Published Date: ...
# Text: ...
# ── Parse mcporter text output ──
def _parse_output(self, text: str, limit: int) -> List[SearchResult]:
"""Parse mcporter's Title/URL/Text block format."""
results = []
current = {}
cur = {}
for line in output.split("\n"):
for line in text.split("\n"):
line = line.strip()
if line.startswith("Title: "):
if current.get("title"):
results.append(SearchResult(
title=current.get("title", ""),
url=current.get("url", ""),
snippet=current.get("text", ""),
date=current.get("date", ""),
score=0,
))
current = {"title": line[7:]}
if cur.get("title"):
results.append(self._make_result(cur))
cur = {"title": line[7:]}
elif line.startswith("URL: "):
current["url"] = line[5:]
cur["url"] = line[5:]
elif line.startswith("Published Date: "):
current["date"] = line[16:]
cur["date"] = line[16:]
elif line.startswith("Text: "):
current["text"] = line[6:]
elif current.get("text") is not None and line:
# Continue text block
current["text"] += " " + line
cur["text"] = line[6:]
elif "text" in cur and line:
cur["text"] += " " + line
# Don't forget the last entry
if current.get("title"):
results.append(SearchResult(
title=current.get("title", ""),
url=current.get("url", ""),
snippet=current.get("text", "")[:500],
date=current.get("date", ""),
score=0,
))
if cur.get("title"):
results.append(self._make_result(cur))
return results[:limit]
async def _search_via_api(self, query: str, api_key: str, limit: int) -> List[SearchResult]:
"""Search via direct Exa API."""
resp = requests.post(
self.API_URL,
headers={"Content-Type": "application/json", "x-api-key": api_key},
json={
"query": query,
"numResults": min(limit, 10),
"type": "auto",
"contents": {"text": {"maxCharacters": 500}},
},
timeout=15,
@staticmethod
def _make_result(d: dict) -> SearchResult:
return SearchResult(
title=d.get("title", ""),
url=d.get("url", ""),
snippet=d.get("text", "")[:500],
date=d.get("date", ""),
score=0,
)
resp.raise_for_status()
results = []
for item in resp.json().get("results", []):
results.append(SearchResult(
title=item.get("title", ""),
url=item.get("url", ""),
snippet=item.get("text", ""),
date=item.get("publishedDate", ""),
score=item.get("score", 0),
))
return results

View file

@ -1,120 +1,133 @@
# -*- coding: utf-8 -*-
"""XiaoHongShu (小红书) — via MCP server or cookie-based web scraping.
"""XiaoHongShu (小红书) — via mcporter + xiaohongshu MCP server.
Backend priority:
1. mcporter + xiaohongshu MCP server (internal API, reliable)
2. Direct web scraping with cookies (fallback, may be blocked by anti-bot)
Swap to: any XHS access method
Backend: xiaohongshu-mcp server (internal API, reliable)
Requires: mcporter CLI + xiaohongshu MCP server running
"""
import re
import json
import shutil
import subprocess
import requests
from urllib.parse import urlparse
from .base import Channel, ReadResult, SearchResult
from typing import List
from typing import List, Optional
class XiaoHongShuChannel(Channel):
name = "xiaohongshu"
description = "小红书笔记"
backends = ["XHS MCP Server", "XHS Web API"]
backends = ["xiaohongshu-mcp"]
tier = 2
def _has_mcporter(self):
"""Check if mcporter CLI is available and xiaohongshu MCP is configured."""
def _mcporter_ok(self) -> bool:
"""Check if mcporter + xiaohongshu MCP is available."""
if not shutil.which("mcporter"):
return False
try:
result = subprocess.run(
["mcporter", "list"],
capture_output=True, text=True, timeout=10,
r = subprocess.run(
["mcporter", "list"], capture_output=True, text=True, timeout=10
)
return "xiaohongshu" in result.stdout
return "xiaohongshu" in r.stdout
except Exception:
return False
def _mcporter_call(self, tool_call: str, timeout: int = 30) -> str:
"""Call an MCP tool via mcporter and return the output."""
result = subprocess.run(
["mcporter", "call", tool_call],
def _call(self, expr: str, timeout: int = 30) -> str:
r = subprocess.run(
["mcporter", "call", expr],
capture_output=True, text=True, timeout=timeout,
)
if result.returncode != 0:
raise RuntimeError(result.stderr or result.stdout)
return result.stdout
if r.returncode != 0:
raise RuntimeError(r.stderr or r.stdout)
return r.stdout
# ── Channel interface ──
def can_handle(self, url: str) -> bool:
domain = urlparse(url).netloc.lower()
return "xiaohongshu.com" in domain or "xhslink.com" in domain
d = urlparse(url).netloc.lower()
return "xiaohongshu.com" in d or "xhslink.com" in d
def check(self, config=None):
if self._has_mcporter():
# Check login status
try:
output = self._mcporter_call("xiaohongshu.check_login_status()")
if "已登录" in output or "logged" in output.lower():
return "ok", "MCP 已连接,完整可用(阅读、搜索、发帖、评论、点赞)"
else:
return "warn", "MCP 已连接但未登录。运行 agent-reach 后用小红书扫码登录"
except Exception:
return "warn", "mcporter 可用但小红书 MCP 连接失败,检查服务是否在运行"
cookie = config.get("xhs_cookie") if config else None
if cookie:
return "ok", "Cookie 已配置(注意:服务器端可能被反爬拦截)"
return "off", "需要配置 Cookie 才能访问。导入浏览器 Cookie 即可agent-reach configure --from-browser chrome"
if not shutil.which("mcporter"):
return "off", (
"需要 mcporter + xiaohongshu-mcp。安装\n"
" npm install -g mcporter\n"
" 详见 https://github.com/user/xiaohongshu-mcp"
)
if not self._mcporter_ok():
return "off", (
"mcporter 已装但小红书 MCP 未配置。运行:\n"
" mcporter config add xiaohongshu http://localhost:18060/mcp"
)
try:
out = self._call("xiaohongshu.check_login_status()", timeout=10)
if "已登录" in out or "logged" in out.lower():
return "ok", "完整可用(阅读、搜索、发帖、评论、点赞)"
return "warn", "MCP 已连接但未登录,需扫码登录"
except Exception:
return "warn", "MCP 连接异常,检查 xiaohongshu-mcp 服务是否在运行"
async def read(self, url: str, config=None) -> ReadResult:
note_id = self._extract_note_id(url)
# Priority 1: mcporter + MCP server
if self._has_mcporter() and note_id:
try:
return await self._read_via_mcp(note_id, url)
except Exception:
pass # Fall through to web scraping
# Priority 2: Web scraping with cookies
cookie = config.get("xhs_cookie") if config else None
if not cookie:
if not self._mcporter_ok():
return ReadResult(
title="XiaoHongShu",
content="⚠️ XiaoHongShu requires cookies to access.\n"
"Set up: agent-reach configure xhs-cookie \"YOUR_COOKIE_STRING\"\n"
"How to get it: install Cookie-Editor extension → go to xiaohongshu.com → Export → Header String\n\n"
"💡 Tip: If you have mcporter + xiaohongshu MCP server, it works without cookies.\n"
"Install: pip install mcporter && mcporter config add xiaohongshu http://localhost:18060/mcp",
url=url,
platform="xiaohongshu",
content=(
"⚠️ 小红书需要 mcporter + xiaohongshu-mcp 才能使用。\n\n"
"安装步骤:\n"
"1. npm install -g mcporter\n"
"2. 安装 xiaohongshu-mcp 服务\n"
"3. mcporter config add xiaohongshu http://localhost:18060/mcp\n"
"4. 运行 agent-reach install --env=auto"
),
url=url, platform="xiaohongshu",
)
note_id = self._extract_note_id(url)
if not note_id:
from agent_reach.channels.web import WebChannel
return await WebChannel().read(url, config)
return ReadResult(
title="XiaoHongShu",
content=f"⚠️ 无法从 URL 提取笔记 ID: {url}",
url=url, platform="xiaohongshu",
)
return await self._read_via_web(note_id, url, cookie)
# Step 1: get xsec_token from feeds
xsec_token = self._find_token(note_id)
if not xsec_token:
return ReadResult(
title="XiaoHongShu",
content=(
f"⚠️ 无法获取笔记 {note_id} 的访问令牌。\n"
"小红书需要 xsec_token 才能读取笔记详情。\n"
"请先通过搜索找到这篇笔记,或直接使用搜索功能。"
),
url=url, platform="xiaohongshu",
)
# Step 2: get detail
out = self._call(
f'xiaohongshu.get_feed_detail(feed_id: "{note_id}", xsec_token: "{xsec_token}")',
timeout=15,
)
return ReadResult(
title=self._extract_title(out) or f"XHS {note_id}",
content=out.strip(),
url=url, platform="xiaohongshu",
)
async def search(self, query: str, config=None, **kwargs) -> List[SearchResult]:
"""Search XiaoHongShu via MCP server."""
if not self._has_mcporter():
if not self._mcporter_ok():
raise ValueError(
"XiaoHongShu search requires mcporter + xiaohongshu MCP server.\n"
"Install: pip install mcporter && mcporter config add xiaohongshu http://localhost:18060/mcp"
"小红书搜索需要 mcporter + xiaohongshu-mcp。\n"
"安装: npm install -g mcporter && mcporter config add xiaohongshu http://localhost:18060/mcp"
)
limit = kwargs.get("limit", 10)
output = self._mcporter_call(
f'xiaohongshu.search_feeds(keyword: "{query}")',
timeout=30,
)
safe_q = query.replace('"', '\\"')
out = self._call(f'xiaohongshu.search_feeds(keyword: "{safe_q}")', timeout=30)
results = []
try:
data = json.loads(output)
data = json.loads(out)
for item in data.get("feeds", [])[:limit]:
card = item.get("noteCard", {})
user = card.get("user", {})
@ -127,150 +140,29 @@ class XiaoHongShuChannel(Channel):
))
except (json.JSONDecodeError, KeyError):
pass
return results
async def _read_via_mcp(self, note_id: str, url: str) -> ReadResult:
"""Read a note via MCP server: search → get xsec_token → get detail."""
# Step 1: Get xsec_token by listing feeds or searching
# Try to find the note in recent feeds first
output = self._mcporter_call("xiaohongshu.list_feeds()", timeout=15)
xsec_token = None
try:
data = json.loads(output)
for feed in data.get("feeds", []):
if feed.get("id") == note_id:
xsec_token = feed.get("xsecToken", "")
break
except (json.JSONDecodeError, KeyError):
pass
# If not found in feeds, search for it
if not xsec_token:
# Use a generic token - XHS MCP may accept it
xsec_token = ""
if not xsec_token:
return ReadResult(
title="XiaoHongShu",
content=f"⚠️ 无法获取笔记 {note_id} 的访问令牌。\n"
"请先通过首页或搜索找到这篇笔记。",
url=url,
platform="xiaohongshu",
)
# Step 2: Get detail
output = self._mcporter_call(
f'xiaohongshu.get_feed_detail(feed_id: "{note_id}", xsec_token: "{xsec_token}")',
timeout=15,
)
# Parse MCP output (it's typically formatted text, not JSON)
title = ""
content = output.strip()
author = ""
# Try to extract structured info if it's JSON
try:
data = json.loads(output)
if isinstance(data, dict):
title = data.get("title", data.get("displayTitle", ""))
content = data.get("desc", data.get("content", output))
author = data.get("user", {}).get("nickname", "")
except (json.JSONDecodeError, ValueError):
# MCP returns plain text - use as-is
lines = content.split("\n")
if lines:
title = lines[0][:80]
return ReadResult(
title=title or f"XHS Note {note_id}",
content=content,
url=url,
author=author,
platform="xiaohongshu",
)
async def _read_via_web(self, note_id: str, url: str, cookie: str) -> ReadResult:
"""Read a note via direct web scraping (fallback)."""
headers = {
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36",
"Cookie": cookie,
"Referer": "https://www.xiaohongshu.com/",
}
resp = requests.get(
f"https://www.xiaohongshu.com/explore/{note_id}",
headers=headers,
timeout=15,
allow_redirects=False,
)
# Check for anti-bot redirect
if resp.status_code in (301, 302):
location = resp.headers.get("Location", "")
if "404" in location or "sec_" in location:
return ReadResult(
title="XiaoHongShu",
content="⚠️ XiaoHongShu blocked this request (anti-bot protection).\n"
"Web scraping doesn't work from server IPs.\n\n"
"💡 Better approach: use mcporter + xiaohongshu MCP server:\n"
" mcporter config add xiaohongshu http://localhost:18060/mcp\n"
" Then agent-reach will use the MCP API automatically.",
url=url,
platform="xiaohongshu",
)
resp.raise_for_status()
html = resp.text
title, content, author = self._parse_html(html)
return ReadResult(
title=title or f"XHS Note {note_id}",
content=content or "Could not extract content. Cookie may be expired.",
url=url,
author=author,
platform="xiaohongshu",
)
# ── Helpers ──
def _extract_note_id(self, url: str) -> str:
"""Extract note ID from various XHS URL formats."""
path = urlparse(url).path
parts = path.strip("/").split("/")
if parts:
return parts[-1]
parts = urlparse(url).path.strip("/").split("/")
return parts[-1] if parts else ""
def _find_token(self, note_id: str) -> Optional[str]:
"""Try to find xsec_token for a note from feeds."""
try:
out = self._call("xiaohongshu.list_feeds()", timeout=15)
data = json.loads(out)
for feed in data.get("feeds", []):
if feed.get("id") == note_id:
return feed.get("xsecToken", "")
except Exception:
pass
return None
def _extract_title(self, text: str) -> str:
for line in text.split("\n"):
line = line.strip()
if line and not line.startswith(("{", "[", "#", "http")):
return line[:80]
return ""
def _parse_html(self, html: str):
"""Extract title, content, author from XHS HTML."""
title = ""
content = ""
author = ""
match = re.search(r'window\.__INITIAL_STATE__\s*=\s*({.*?})\s*</script>', html, re.DOTALL)
if match:
try:
state = json.loads(match.group(1).replace('undefined', 'null'))
note_data = state.get("note", {}).get("noteDetailMap", {})
if note_data:
first_note = list(note_data.values())[0]
note = first_note.get("note", {})
title = note.get("title", "")
content = note.get("desc", "")
author = note.get("user", {}).get("nickname", "")
except (json.JSONDecodeError, KeyError, IndexError):
pass
if not title:
m = re.search(r'<title>(.*?)</title>', html)
if m:
title = m.group(1)
if not content:
m = re.search(r'<meta name="description" content="(.*?)"', html)
if m:
content = m.group(1)
return title, content, author

View file

@ -73,18 +73,14 @@ def main():
p_install = sub.add_parser("install", help="One-shot installer with flags")
p_install.add_argument("--env", choices=["local", "server", "auto"], default="auto",
help="Environment: local, server, or auto-detect")
p_install.add_argument("--search", choices=["yes", "no"], default="yes",
help="Enable web search (needs free Exa API key)")
p_install.add_argument("--proxy", default="",
help="Residential proxy for Reddit/Bilibili (http://user:pass@ip:port)")
p_install.add_argument("--exa-key", default="",
help="Exa API key (get free at https://exa.ai)")
# ── configure ──
p_conf = sub.add_parser("configure", help="Set a config value or auto-extract from browser")
p_conf.add_argument("key", nargs="?", default=None,
choices=["exa-key", "proxy", "github-token", "groq-key",
"twitter-cookies", "xhs-cookie", "youtube-cookies"],
choices=["proxy", "github-token", "groq-key",
"twitter-cookies", "youtube-cookies"],
help="What to configure (omit if using --from-browser)")
p_conf.add_argument("value", nargs="*", help="The value(s) to set")
p_conf.add_argument("--from-browser", metavar="BROWSER",
@ -149,21 +145,14 @@ def _cmd_install(args):
print(f"💻 Environment: Local computer (auto-detected)")
# Apply explicit flags
if args.exa_key:
config.set("exa_api_key", args.exa_key)
print(f"✅ Exa search key configured")
if args.proxy:
config.set("reddit_proxy", args.proxy)
config.set("bilibili_proxy", args.proxy)
print(f"✅ Proxy configured for Reddit + Bilibili")
# Auto-detect Exa key from environment
if not config.get("exa_api_key") and not args.exa_key:
env_key = os.environ.get("EXA_API_KEY") or os.environ.get("exa_api_key")
if env_key:
config.set("exa_api_key", env_key)
print(f"✅ Exa key auto-detected from environment")
# ── mcporter (for Exa search + XiaoHongShu) ──
print()
_install_mcporter()
# Auto-import cookies on local computers
if env == "local":
@ -204,13 +193,6 @@ def _cmd_install(args):
ok = sum(1 for r in results.values() if r["status"] == "ok")
total = len(results)
# What's missing — only mention Exa if not configured
if not config.get("exa_api_key"):
print()
print("🔍 Recommended: unlock search with a free Exa API key")
print(" agent-reach configure exa-key YOUR_KEY")
print(" Get free key: https://exa.ai")
# Final status
print()
print(format_report(results))
@ -218,6 +200,74 @@ def _cmd_install(args):
print(f"✅ Installation complete! {ok}/{total} channels active.")
def _install_mcporter():
"""Install mcporter and configure Exa + XiaoHongShu MCP servers."""
import shutil
import subprocess
print("📦 Setting up mcporter (search + XiaoHongShu backend)...")
if shutil.which("mcporter"):
print(" ✅ mcporter already installed")
else:
# Check for npm/npx
if not shutil.which("npm") and not shutil.which("npx"):
print(" ⚠️ mcporter requires Node.js. Install Node.js first:")
print(" https://nodejs.org/ or: curl -fsSL https://fnm.vercel.app/install | bash")
return
try:
subprocess.run(
["npm", "install", "-g", "mcporter"],
capture_output=True, text=True, timeout=60,
)
if shutil.which("mcporter"):
print(" ✅ mcporter installed")
else:
print(" ❌ mcporter install failed. Try manually: npm install -g mcporter")
return
except Exception as e:
print(f" ❌ mcporter install failed: {e}")
return
# Configure Exa MCP (free, no key needed)
try:
r = subprocess.run(
["mcporter", "list"], capture_output=True, text=True, timeout=10
)
if "exa" not in r.stdout:
subprocess.run(
["mcporter", "config", "add", "exa", "https://mcp.exa.ai/mcp"],
capture_output=True, text=True, timeout=10,
)
print(" ✅ Exa search configured (free, no API key needed)")
else:
print(" ✅ Exa search already configured")
except Exception:
print(" ⚠️ Could not configure Exa. Run manually: mcporter config add exa https://mcp.exa.ai/mcp")
# Check XiaoHongShu MCP (only if server is running)
try:
r = subprocess.run(
["mcporter", "list"], capture_output=True, text=True, timeout=10
)
if "xiaohongshu" in r.stdout:
print(" ✅ XiaoHongShu MCP already configured")
else:
# Check if XHS MCP server is running on localhost:18060
import requests
try:
requests.get("http://localhost:18060/", timeout=3)
subprocess.run(
["mcporter", "config", "add", "xiaohongshu", "http://localhost:18060/mcp"],
capture_output=True, text=True, timeout=10,
)
print(" ✅ XiaoHongShu MCP auto-detected and configured")
except Exception:
print(" ⬜ XiaoHongShu MCP not detected (optional — install xiaohongshu-mcp for XHS support)")
except Exception:
pass
def _detect_environment():
"""Auto-detect if running on local computer or server."""
import os
@ -323,23 +373,6 @@ def _cmd_configure(args):
except Exception as e:
print(f"❌ Failed: {e}")
elif args.key == "exa-key":
config.set("exa_api_key", value)
print(f"✅ Exa key configured!")
print("Testing search...", end=" ")
try:
import asyncio
from agent_reach.core import AgentReach
eyes = AgentReach(config)
results = asyncio.run(eyes.search("test", num_results=1))
if results:
print("✅ Search works!")
else:
print("⚠️ No results, but API connected.")
except Exception as e:
print(f"❌ Failed: {e}")
elif args.key == "twitter-cookies":
# Accept two formats:
# 1. auth_token ct0 (two separate values)
@ -387,28 +420,6 @@ def _cmd_configure(args):
print(" 1. agent-reach configure twitter-cookies AUTH_TOKEN CT0")
print(' 2. agent-reach configure twitter-cookies "auth_token=xxx; ct0=yyy; ..."')
elif args.key == "xhs-cookie":
config.set("xhs_cookie", value)
print(f"✅ XiaoHongShu cookie configured!")
print("Testing XHS access...", end=" ")
try:
import requests
resp = requests.get(
"https://www.xiaohongshu.com/",
headers={
"User-Agent": "Mozilla/5.0",
"Cookie": value,
},
timeout=10,
)
if resp.status_code == 200 and "xiaohongshu" in resp.text.lower():
print("✅ XiaoHongShu works!")
else:
print(f"⚠️ Got status {resp.status_code}, cookie might be expired")
except Exception as e:
print(f"❌ Failed: {e}")
elif args.key == "youtube-cookies":
config.set("youtube_cookies_from", value)
print(f"✅ YouTube cookie source configured: {value}")

View file

@ -49,12 +49,12 @@ def format_report(results: Dict[str, dict]) -> str:
tier1 = {k: r for k, r in results.items() if r["tier"] == 1}
if tier1:
lines.append("")
lines.append("🔍 搜索(免费 Exa Key 即可解锁):")
lines.append("🔍 搜索(mcporter 即可解锁):")
for key, r in tier1.items():
if r["status"] == "ok":
lines.append(f"{r['name']}")
lines.append(f"{r['name']}{r['message']}")
else:
lines.append(f"{r['name']}注册 exa.ai 获取免费 Key配置一下就能用")
lines.append(f"{r['name']}{r['message']}")
# Tier 2 — optional setup
tier2 = {k: r for k, r in results.items() if r["tier"] == 2}