diff --git a/agent_reach/channels/exa_search.py b/agent_reach/channels/exa_search.py index 07edb6c..0a2e3a0 100644 --- a/agent_reach/channels/exa_search.py +++ b/agent_reach/channels/exa_search.py @@ -1,18 +1,13 @@ # -*- coding: utf-8 -*- -"""Exa semantic search — the search backbone for Agent Reach. +"""Exa semantic search — via mcporter + Exa MCP server. -Backend priority: -1. mcporter + Exa MCP server (OAuth, no API key needed) -2. Direct Exa API (requires EXA_API_KEY) - -Swap to: Tavily, SerpAPI, or any search API +Backend: Exa MCP at mcp.exa.ai (OAuth, no API key needed) +Requires: mcporter CLI """ -import os import json import shutil import subprocess -import requests from .base import Channel, SearchResult from typing import List @@ -20,160 +15,96 @@ from typing import List class ExaSearchChannel(Channel): name = "exa_search" description = "全网语义搜索(同时支持 Reddit/Twitter 搜索)" - backends = ["Exa MCP Server", "Exa API"] + backends = ["exa-mcp"] tier = 1 - API_URL = "https://api.exa.ai/search" - - def _has_mcporter_exa(self): - """Check if mcporter CLI is available and exa MCP is configured.""" + def _mcporter_ok(self) -> bool: if not shutil.which("mcporter"): return False try: - result = subprocess.run( - ["mcporter", "list"], - capture_output=True, text=True, timeout=10, + r = subprocess.run( + ["mcporter", "list"], capture_output=True, text=True, timeout=10 ) - return "exa" in result.stdout + return "exa" in r.stdout except Exception: return False - def _mcporter_call(self, tool_call: str, timeout: int = 30) -> str: - """Call an MCP tool via mcporter and return the output.""" - result = subprocess.run( - ["mcporter", "call", tool_call], + def _call(self, expr: str, timeout: int = 30) -> str: + r = subprocess.run( + ["mcporter", "call", expr], capture_output=True, text=True, timeout=timeout, ) - if result.returncode != 0: - raise RuntimeError(result.stderr or result.stdout) - return result.stdout + if r.returncode != 0: + raise RuntimeError(r.stderr or r.stdout) + return r.stdout + + # ── Channel interface ── def can_handle(self, url: str) -> bool: - return False # Search-only channel, doesn't read URLs + return False # search-only - async def read(self, url: str, config=None) -> None: + async def read(self, url: str, config=None): raise NotImplementedError("Exa is a search engine, not a reader") def check(self, config=None): - # Priority 1: mcporter - if self._has_mcporter_exa(): - return "ok", "MCP 已连接,免 Key 直接可用(全网搜索 + Reddit + Twitter)" - - # Priority 2: API key - key = None - if config: - key = config.get("exa_api_key") - if not key: - key = os.environ.get("EXA_API_KEY") - if key: - return "ok", "API Key 已配置,全网搜索可用" - - return "off", "注册 exa.ai 获取免费 Key,配置一下就能用。或安装 mcporter 免 Key 使用" - - def _get_key(self, config=None) -> str: - if config: - key = config.get("exa_api_key") - if key: - return key - key = os.environ.get("EXA_API_KEY") - if key: - return key - return "" + if not shutil.which("mcporter"): + return "off", ( + "需要 mcporter。安装:npm install -g mcporter && " + "mcporter config add exa https://mcp.exa.ai/mcp" + ) + if not self._mcporter_ok(): + return "off", "mcporter 已装但 Exa 未配置。运行:mcporter config add exa https://mcp.exa.ai/mcp" + return "ok", "MCP 已连接,免 Key 直接可用(全网搜索 + Reddit + Twitter)" async def search(self, query: str, config=None, **kwargs) -> List[SearchResult]: - limit = kwargs.get("limit", 5) - - # Priority 1: mcporter + Exa MCP - if self._has_mcporter_exa(): - return await self._search_via_mcp(query, limit) - - # Priority 2: Direct API - api_key = self._get_key(config) - if not api_key: + if not self._mcporter_ok(): raise ValueError( - "Exa search not configured.\n\n" - "Option 1 (easiest): Install mcporter — no API key needed:\n" - " npm install -g mcporter && mcporter config add exa https://mcp.exa.ai/mcp\n\n" - "Option 2: Get a free API key:\n" - " Sign up at https://exa.ai (1000 searches/month free)\n" - " Then run: agent-reach configure exa-key YOUR_KEY" + "Exa 搜索需要 mcporter。安装:\n" + " npm install -g mcporter\n" + " mcporter config add exa https://mcp.exa.ai/mcp" ) - return await self._search_via_api(query, api_key, limit) - - async def _search_via_mcp(self, query: str, limit: int) -> List[SearchResult]: - """Search via mcporter + Exa MCP server.""" - # Escape quotes in query - safe_query = query.replace('"', '\\"') - output = self._mcporter_call( - f'exa.web_search_exa(query: "{safe_query}", numResults: {min(limit, 10)})', + limit = kwargs.get("limit", 5) + safe_q = query.replace('"', '\\"') + out = self._call( + f'exa.web_search_exa(query: "{safe_q}", numResults: {min(limit, 10)})', timeout=30, ) + return self._parse_output(out, limit) - # mcporter returns formatted text blocks like: - # Title: ... - # URL: ... - # Published Date: ... - # Text: ... + # ── Parse mcporter text output ── + + def _parse_output(self, text: str, limit: int) -> List[SearchResult]: + """Parse mcporter's Title/URL/Text block format.""" results = [] - current = {} + cur = {} - for line in output.split("\n"): + for line in text.split("\n"): line = line.strip() if line.startswith("Title: "): - if current.get("title"): - results.append(SearchResult( - title=current.get("title", ""), - url=current.get("url", ""), - snippet=current.get("text", ""), - date=current.get("date", ""), - score=0, - )) - current = {"title": line[7:]} + if cur.get("title"): + results.append(self._make_result(cur)) + cur = {"title": line[7:]} elif line.startswith("URL: "): - current["url"] = line[5:] + cur["url"] = line[5:] elif line.startswith("Published Date: "): - current["date"] = line[16:] + cur["date"] = line[16:] elif line.startswith("Text: "): - current["text"] = line[6:] - elif current.get("text") is not None and line: - # Continue text block - current["text"] += " " + line + cur["text"] = line[6:] + elif "text" in cur and line: + cur["text"] += " " + line - # Don't forget the last entry - if current.get("title"): - results.append(SearchResult( - title=current.get("title", ""), - url=current.get("url", ""), - snippet=current.get("text", "")[:500], - date=current.get("date", ""), - score=0, - )) + if cur.get("title"): + results.append(self._make_result(cur)) return results[:limit] - async def _search_via_api(self, query: str, api_key: str, limit: int) -> List[SearchResult]: - """Search via direct Exa API.""" - resp = requests.post( - self.API_URL, - headers={"Content-Type": "application/json", "x-api-key": api_key}, - json={ - "query": query, - "numResults": min(limit, 10), - "type": "auto", - "contents": {"text": {"maxCharacters": 500}}, - }, - timeout=15, + @staticmethod + def _make_result(d: dict) -> SearchResult: + return SearchResult( + title=d.get("title", ""), + url=d.get("url", ""), + snippet=d.get("text", "")[:500], + date=d.get("date", ""), + score=0, ) - resp.raise_for_status() - - results = [] - for item in resp.json().get("results", []): - results.append(SearchResult( - title=item.get("title", ""), - url=item.get("url", ""), - snippet=item.get("text", ""), - date=item.get("publishedDate", ""), - score=item.get("score", 0), - )) - return results diff --git a/agent_reach/channels/xiaohongshu.py b/agent_reach/channels/xiaohongshu.py index 4406e3f..634de37 100644 --- a/agent_reach/channels/xiaohongshu.py +++ b/agent_reach/channels/xiaohongshu.py @@ -1,120 +1,133 @@ # -*- coding: utf-8 -*- -"""XiaoHongShu (小红书) — via MCP server or cookie-based web scraping. +"""XiaoHongShu (小红书) — via mcporter + xiaohongshu MCP server. -Backend priority: -1. mcporter + xiaohongshu MCP server (internal API, reliable) -2. Direct web scraping with cookies (fallback, may be blocked by anti-bot) - -Swap to: any XHS access method +Backend: xiaohongshu-mcp server (internal API, reliable) +Requires: mcporter CLI + xiaohongshu MCP server running """ -import re import json import shutil import subprocess -import requests from urllib.parse import urlparse from .base import Channel, ReadResult, SearchResult -from typing import List +from typing import List, Optional class XiaoHongShuChannel(Channel): name = "xiaohongshu" description = "小红书笔记" - backends = ["XHS MCP Server", "XHS Web API"] + backends = ["xiaohongshu-mcp"] tier = 2 - def _has_mcporter(self): - """Check if mcporter CLI is available and xiaohongshu MCP is configured.""" + def _mcporter_ok(self) -> bool: + """Check if mcporter + xiaohongshu MCP is available.""" if not shutil.which("mcporter"): return False try: - result = subprocess.run( - ["mcporter", "list"], - capture_output=True, text=True, timeout=10, + r = subprocess.run( + ["mcporter", "list"], capture_output=True, text=True, timeout=10 ) - return "xiaohongshu" in result.stdout + return "xiaohongshu" in r.stdout except Exception: return False - def _mcporter_call(self, tool_call: str, timeout: int = 30) -> str: - """Call an MCP tool via mcporter and return the output.""" - result = subprocess.run( - ["mcporter", "call", tool_call], + def _call(self, expr: str, timeout: int = 30) -> str: + r = subprocess.run( + ["mcporter", "call", expr], capture_output=True, text=True, timeout=timeout, ) - if result.returncode != 0: - raise RuntimeError(result.stderr or result.stdout) - return result.stdout + if r.returncode != 0: + raise RuntimeError(r.stderr or r.stdout) + return r.stdout + + # ── Channel interface ── def can_handle(self, url: str) -> bool: - domain = urlparse(url).netloc.lower() - return "xiaohongshu.com" in domain or "xhslink.com" in domain + d = urlparse(url).netloc.lower() + return "xiaohongshu.com" in d or "xhslink.com" in d def check(self, config=None): - if self._has_mcporter(): - # Check login status - try: - output = self._mcporter_call("xiaohongshu.check_login_status()") - if "已登录" in output or "logged" in output.lower(): - return "ok", "MCP 已连接,完整可用(阅读、搜索、发帖、评论、点赞)" - else: - return "warn", "MCP 已连接但未登录。运行 agent-reach 后用小红书扫码登录" - except Exception: - return "warn", "mcporter 可用但小红书 MCP 连接失败,检查服务是否在运行" - - cookie = config.get("xhs_cookie") if config else None - if cookie: - return "ok", "Cookie 已配置(注意:服务器端可能被反爬拦截)" - return "off", "需要配置 Cookie 才能访问。导入浏览器 Cookie 即可:agent-reach configure --from-browser chrome" + if not shutil.which("mcporter"): + return "off", ( + "需要 mcporter + xiaohongshu-mcp。安装:\n" + " npm install -g mcporter\n" + " 详见 https://github.com/user/xiaohongshu-mcp" + ) + if not self._mcporter_ok(): + return "off", ( + "mcporter 已装但小红书 MCP 未配置。运行:\n" + " mcporter config add xiaohongshu http://localhost:18060/mcp" + ) + try: + out = self._call("xiaohongshu.check_login_status()", timeout=10) + if "已登录" in out or "logged" in out.lower(): + return "ok", "完整可用(阅读、搜索、发帖、评论、点赞)" + return "warn", "MCP 已连接但未登录,需扫码登录" + except Exception: + return "warn", "MCP 连接异常,检查 xiaohongshu-mcp 服务是否在运行" async def read(self, url: str, config=None) -> ReadResult: - note_id = self._extract_note_id(url) - - # Priority 1: mcporter + MCP server - if self._has_mcporter() and note_id: - try: - return await self._read_via_mcp(note_id, url) - except Exception: - pass # Fall through to web scraping - - # Priority 2: Web scraping with cookies - cookie = config.get("xhs_cookie") if config else None - if not cookie: + if not self._mcporter_ok(): return ReadResult( title="XiaoHongShu", - content="⚠️ XiaoHongShu requires cookies to access.\n" - "Set up: agent-reach configure xhs-cookie \"YOUR_COOKIE_STRING\"\n" - "How to get it: install Cookie-Editor extension → go to xiaohongshu.com → Export → Header String\n\n" - "💡 Tip: If you have mcporter + xiaohongshu MCP server, it works without cookies.\n" - "Install: pip install mcporter && mcporter config add xiaohongshu http://localhost:18060/mcp", - url=url, - platform="xiaohongshu", + content=( + "⚠️ 小红书需要 mcporter + xiaohongshu-mcp 才能使用。\n\n" + "安装步骤:\n" + "1. npm install -g mcporter\n" + "2. 安装 xiaohongshu-mcp 服务\n" + "3. mcporter config add xiaohongshu http://localhost:18060/mcp\n" + "4. 运行 agent-reach install --env=auto" + ), + url=url, platform="xiaohongshu", ) + note_id = self._extract_note_id(url) if not note_id: - from agent_reach.channels.web import WebChannel - return await WebChannel().read(url, config) + return ReadResult( + title="XiaoHongShu", + content=f"⚠️ 无法从 URL 提取笔记 ID: {url}", + url=url, platform="xiaohongshu", + ) - return await self._read_via_web(note_id, url, cookie) + # Step 1: get xsec_token from feeds + xsec_token = self._find_token(note_id) + + if not xsec_token: + return ReadResult( + title="XiaoHongShu", + content=( + f"⚠️ 无法获取笔记 {note_id} 的访问令牌。\n" + "小红书需要 xsec_token 才能读取笔记详情。\n" + "请先通过搜索找到这篇笔记,或直接使用搜索功能。" + ), + url=url, platform="xiaohongshu", + ) + + # Step 2: get detail + out = self._call( + f'xiaohongshu.get_feed_detail(feed_id: "{note_id}", xsec_token: "{xsec_token}")', + timeout=15, + ) + + return ReadResult( + title=self._extract_title(out) or f"XHS {note_id}", + content=out.strip(), + url=url, platform="xiaohongshu", + ) async def search(self, query: str, config=None, **kwargs) -> List[SearchResult]: - """Search XiaoHongShu via MCP server.""" - if not self._has_mcporter(): + if not self._mcporter_ok(): raise ValueError( - "XiaoHongShu search requires mcporter + xiaohongshu MCP server.\n" - "Install: pip install mcporter && mcporter config add xiaohongshu http://localhost:18060/mcp" + "小红书搜索需要 mcporter + xiaohongshu-mcp。\n" + "安装: npm install -g mcporter && mcporter config add xiaohongshu http://localhost:18060/mcp" ) - limit = kwargs.get("limit", 10) - output = self._mcporter_call( - f'xiaohongshu.search_feeds(keyword: "{query}")', - timeout=30, - ) + safe_q = query.replace('"', '\\"') + out = self._call(f'xiaohongshu.search_feeds(keyword: "{safe_q}")', timeout=30) results = [] try: - data = json.loads(output) + data = json.loads(out) for item in data.get("feeds", [])[:limit]: card = item.get("noteCard", {}) user = card.get("user", {}) @@ -127,150 +140,29 @@ class XiaoHongShuChannel(Channel): )) except (json.JSONDecodeError, KeyError): pass - return results - async def _read_via_mcp(self, note_id: str, url: str) -> ReadResult: - """Read a note via MCP server: search → get xsec_token → get detail.""" - # Step 1: Get xsec_token by listing feeds or searching - # Try to find the note in recent feeds first - output = self._mcporter_call("xiaohongshu.list_feeds()", timeout=15) - xsec_token = None - - try: - data = json.loads(output) - for feed in data.get("feeds", []): - if feed.get("id") == note_id: - xsec_token = feed.get("xsecToken", "") - break - except (json.JSONDecodeError, KeyError): - pass - - # If not found in feeds, search for it - if not xsec_token: - # Use a generic token - XHS MCP may accept it - xsec_token = "" - - if not xsec_token: - return ReadResult( - title="XiaoHongShu", - content=f"⚠️ 无法获取笔记 {note_id} 的访问令牌。\n" - "请先通过首页或搜索找到这篇笔记。", - url=url, - platform="xiaohongshu", - ) - - # Step 2: Get detail - output = self._mcporter_call( - f'xiaohongshu.get_feed_detail(feed_id: "{note_id}", xsec_token: "{xsec_token}")', - timeout=15, - ) - - # Parse MCP output (it's typically formatted text, not JSON) - title = "" - content = output.strip() - author = "" - - # Try to extract structured info if it's JSON - try: - data = json.loads(output) - if isinstance(data, dict): - title = data.get("title", data.get("displayTitle", "")) - content = data.get("desc", data.get("content", output)) - author = data.get("user", {}).get("nickname", "") - except (json.JSONDecodeError, ValueError): - # MCP returns plain text - use as-is - lines = content.split("\n") - if lines: - title = lines[0][:80] - - return ReadResult( - title=title or f"XHS Note {note_id}", - content=content, - url=url, - author=author, - platform="xiaohongshu", - ) - - async def _read_via_web(self, note_id: str, url: str, cookie: str) -> ReadResult: - """Read a note via direct web scraping (fallback).""" - headers = { - "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36", - "Cookie": cookie, - "Referer": "https://www.xiaohongshu.com/", - } - - resp = requests.get( - f"https://www.xiaohongshu.com/explore/{note_id}", - headers=headers, - timeout=15, - allow_redirects=False, - ) - - # Check for anti-bot redirect - if resp.status_code in (301, 302): - location = resp.headers.get("Location", "") - if "404" in location or "sec_" in location: - return ReadResult( - title="XiaoHongShu", - content="⚠️ XiaoHongShu blocked this request (anti-bot protection).\n" - "Web scraping doesn't work from server IPs.\n\n" - "💡 Better approach: use mcporter + xiaohongshu MCP server:\n" - " mcporter config add xiaohongshu http://localhost:18060/mcp\n" - " Then agent-reach will use the MCP API automatically.", - url=url, - platform="xiaohongshu", - ) - - resp.raise_for_status() - html = resp.text - - title, content, author = self._parse_html(html) - - return ReadResult( - title=title or f"XHS Note {note_id}", - content=content or "Could not extract content. Cookie may be expired.", - url=url, - author=author, - platform="xiaohongshu", - ) + # ── Helpers ── def _extract_note_id(self, url: str) -> str: - """Extract note ID from various XHS URL formats.""" - path = urlparse(url).path - parts = path.strip("/").split("/") - if parts: - return parts[-1] + parts = urlparse(url).path.strip("/").split("/") + return parts[-1] if parts else "" + + def _find_token(self, note_id: str) -> Optional[str]: + """Try to find xsec_token for a note from feeds.""" + try: + out = self._call("xiaohongshu.list_feeds()", timeout=15) + data = json.loads(out) + for feed in data.get("feeds", []): + if feed.get("id") == note_id: + return feed.get("xsecToken", "") + except Exception: + pass + return None + + def _extract_title(self, text: str) -> str: + for line in text.split("\n"): + line = line.strip() + if line and not line.startswith(("{", "[", "#", "http")): + return line[:80] return "" - - def _parse_html(self, html: str): - """Extract title, content, author from XHS HTML.""" - title = "" - content = "" - author = "" - - match = re.search(r'window\.__INITIAL_STATE__\s*=\s*({.*?})\s*', html, re.DOTALL) - if match: - try: - state = json.loads(match.group(1).replace('undefined', 'null')) - note_data = state.get("note", {}).get("noteDetailMap", {}) - if note_data: - first_note = list(note_data.values())[0] - note = first_note.get("note", {}) - title = note.get("title", "") - content = note.get("desc", "") - author = note.get("user", {}).get("nickname", "") - except (json.JSONDecodeError, KeyError, IndexError): - pass - - if not title: - m = re.search(r'