diff --git a/agent_reach/channels/exa_search.py b/agent_reach/channels/exa_search.py index c0feee8..07edb6c 100644 --- a/agent_reach/channels/exa_search.py +++ b/agent_reach/channels/exa_search.py @@ -1,11 +1,17 @@ # -*- coding: utf-8 -*- """Exa semantic search — the search backbone for Agent Reach. -Backend: Exa API (https://exa.ai) — free 1000 searches/month +Backend priority: +1. mcporter + Exa MCP server (OAuth, no API key needed) +2. Direct Exa API (requires EXA_API_KEY) + Swap to: Tavily, SerpAPI, or any search API """ import os +import json +import shutil +import subprocess import requests from .base import Channel, SearchResult from typing import List @@ -14,18 +20,56 @@ from typing import List class ExaSearchChannel(Channel): name = "exa_search" description = "全网语义搜索(同时支持 Reddit/Twitter 搜索)" - backends = ["Exa API"] - requires_config = ["exa_api_key"] + backends = ["Exa MCP Server", "Exa API"] tier = 1 API_URL = "https://api.exa.ai/search" + def _has_mcporter_exa(self): + """Check if mcporter CLI is available and exa MCP is configured.""" + if not shutil.which("mcporter"): + return False + try: + result = subprocess.run( + ["mcporter", "list"], + capture_output=True, text=True, timeout=10, + ) + return "exa" in result.stdout + except Exception: + return False + + def _mcporter_call(self, tool_call: str, timeout: int = 30) -> str: + """Call an MCP tool via mcporter and return the output.""" + result = subprocess.run( + ["mcporter", "call", tool_call], + capture_output=True, text=True, timeout=timeout, + ) + if result.returncode != 0: + raise RuntimeError(result.stderr or result.stdout) + return result.stdout + def can_handle(self, url: str) -> bool: return False # Search-only channel, doesn't read URLs async def read(self, url: str, config=None) -> None: raise NotImplementedError("Exa is a search engine, not a reader") + def check(self, config=None): + # Priority 1: mcporter + if self._has_mcporter_exa(): + return "ok", "MCP 已连接,免 Key 直接可用(全网搜索 + Reddit + Twitter)" + + # Priority 2: API key + key = None + if config: + key = config.get("exa_api_key") + if not key: + key = os.environ.get("EXA_API_KEY") + if key: + return "ok", "API Key 已配置,全网搜索可用" + + return "off", "注册 exa.ai 获取免费 Key,配置一下就能用。或安装 mcporter 免 Key 使用" + def _get_key(self, config=None) -> str: if config: key = config.get("exa_api_key") @@ -34,16 +78,82 @@ class ExaSearchChannel(Channel): key = os.environ.get("EXA_API_KEY") if key: return key - raise ValueError( - "Exa API key not configured.\n" - "Get a free key at https://exa.ai (1000 searches/month free)\n" - "Then run: agent-reach setup" - ) + return "" async def search(self, query: str, config=None, **kwargs) -> List[SearchResult]: - api_key = self._get_key(config) limit = kwargs.get("limit", 5) + # Priority 1: mcporter + Exa MCP + if self._has_mcporter_exa(): + return await self._search_via_mcp(query, limit) + + # Priority 2: Direct API + api_key = self._get_key(config) + if not api_key: + raise ValueError( + "Exa search not configured.\n\n" + "Option 1 (easiest): Install mcporter — no API key needed:\n" + " npm install -g mcporter && mcporter config add exa https://mcp.exa.ai/mcp\n\n" + "Option 2: Get a free API key:\n" + " Sign up at https://exa.ai (1000 searches/month free)\n" + " Then run: agent-reach configure exa-key YOUR_KEY" + ) + + return await self._search_via_api(query, api_key, limit) + + async def _search_via_mcp(self, query: str, limit: int) -> List[SearchResult]: + """Search via mcporter + Exa MCP server.""" + # Escape quotes in query + safe_query = query.replace('"', '\\"') + output = self._mcporter_call( + f'exa.web_search_exa(query: "{safe_query}", numResults: {min(limit, 10)})', + timeout=30, + ) + + # mcporter returns formatted text blocks like: + # Title: ... + # URL: ... + # Published Date: ... + # Text: ... + results = [] + current = {} + + for line in output.split("\n"): + line = line.strip() + if line.startswith("Title: "): + if current.get("title"): + results.append(SearchResult( + title=current.get("title", ""), + url=current.get("url", ""), + snippet=current.get("text", ""), + date=current.get("date", ""), + score=0, + )) + current = {"title": line[7:]} + elif line.startswith("URL: "): + current["url"] = line[5:] + elif line.startswith("Published Date: "): + current["date"] = line[16:] + elif line.startswith("Text: "): + current["text"] = line[6:] + elif current.get("text") is not None and line: + # Continue text block + current["text"] += " " + line + + # Don't forget the last entry + if current.get("title"): + results.append(SearchResult( + title=current.get("title", ""), + url=current.get("url", ""), + snippet=current.get("text", "")[:500], + date=current.get("date", ""), + score=0, + )) + + return results[:limit] + + async def _search_via_api(self, query: str, api_key: str, limit: int) -> List[SearchResult]: + """Search via direct Exa API.""" resp = requests.post( self.API_URL, headers={"Content-Type": "application/json", "x-api-key": api_key}, diff --git a/agent_reach/channels/xiaohongshu.py b/agent_reach/channels/xiaohongshu.py index cf4e80c..4406e3f 100644 --- a/agent_reach/channels/xiaohongshu.py +++ b/agent_reach/channels/xiaohongshu.py @@ -1,68 +1,230 @@ # -*- coding: utf-8 -*- -"""XiaoHongShu (小红书) — via cookie-based API access. +"""XiaoHongShu (小红书) — via MCP server or cookie-based web scraping. + +Backend priority: +1. mcporter + xiaohongshu MCP server (internal API, reliable) +2. Direct web scraping with cookies (fallback, may be blocked by anti-bot) -Backend: XHS web API + cookies Swap to: any XHS access method """ import re import json +import shutil +import subprocess import requests from urllib.parse import urlparse -from .base import Channel, ReadResult +from .base import Channel, ReadResult, SearchResult +from typing import List class XiaoHongShuChannel(Channel): name = "xiaohongshu" description = "小红书笔记" - backends = ["XHS Web API"] + backends = ["XHS MCP Server", "XHS Web API"] tier = 2 + def _has_mcporter(self): + """Check if mcporter CLI is available and xiaohongshu MCP is configured.""" + if not shutil.which("mcporter"): + return False + try: + result = subprocess.run( + ["mcporter", "list"], + capture_output=True, text=True, timeout=10, + ) + return "xiaohongshu" in result.stdout + except Exception: + return False + + def _mcporter_call(self, tool_call: str, timeout: int = 30) -> str: + """Call an MCP tool via mcporter and return the output.""" + result = subprocess.run( + ["mcporter", "call", tool_call], + capture_output=True, text=True, timeout=timeout, + ) + if result.returncode != 0: + raise RuntimeError(result.stderr or result.stdout) + return result.stdout + def can_handle(self, url: str) -> bool: domain = urlparse(url).netloc.lower() return "xiaohongshu.com" in domain or "xhslink.com" in domain def check(self, config=None): + if self._has_mcporter(): + # Check login status + try: + output = self._mcporter_call("xiaohongshu.check_login_status()") + if "已登录" in output or "logged" in output.lower(): + return "ok", "MCP 已连接,完整可用(阅读、搜索、发帖、评论、点赞)" + else: + return "warn", "MCP 已连接但未登录。运行 agent-reach 后用小红书扫码登录" + except Exception: + return "warn", "mcporter 可用但小红书 MCP 连接失败,检查服务是否在运行" + cookie = config.get("xhs_cookie") if config else None if cookie: - return "ok", "Cookie 已配置,完整可用" + return "ok", "Cookie 已配置(注意:服务器端可能被反爬拦截)" return "off", "需要配置 Cookie 才能访问。导入浏览器 Cookie 即可:agent-reach configure --from-browser chrome" async def read(self, url: str, config=None) -> ReadResult: - cookie = config.get("xhs_cookie") if config else None + note_id = self._extract_note_id(url) + # Priority 1: mcporter + MCP server + if self._has_mcporter() and note_id: + try: + return await self._read_via_mcp(note_id, url) + except Exception: + pass # Fall through to web scraping + + # Priority 2: Web scraping with cookies + cookie = config.get("xhs_cookie") if config else None if not cookie: return ReadResult( title="XiaoHongShu", content="⚠️ XiaoHongShu requires cookies to access.\n" "Set up: agent-reach configure xhs-cookie \"YOUR_COOKIE_STRING\"\n" - "How to get it: install Cookie-Editor extension → go to xiaohongshu.com → Export → Header String", + "How to get it: install Cookie-Editor extension → go to xiaohongshu.com → Export → Header String\n\n" + "💡 Tip: If you have mcporter + xiaohongshu MCP server, it works without cookies.\n" + "Install: pip install mcporter && mcporter config add xiaohongshu http://localhost:18060/mcp", url=url, platform="xiaohongshu", ) - # Extract note ID from URL - note_id = self._extract_note_id(url) if not note_id: from agent_reach.channels.web import WebChannel return await WebChannel().read(url, config) + return await self._read_via_web(note_id, url, cookie) + + async def search(self, query: str, config=None, **kwargs) -> List[SearchResult]: + """Search XiaoHongShu via MCP server.""" + if not self._has_mcporter(): + raise ValueError( + "XiaoHongShu search requires mcporter + xiaohongshu MCP server.\n" + "Install: pip install mcporter && mcporter config add xiaohongshu http://localhost:18060/mcp" + ) + + limit = kwargs.get("limit", 10) + output = self._mcporter_call( + f'xiaohongshu.search_feeds(keyword: "{query}")', + timeout=30, + ) + + results = [] + try: + data = json.loads(output) + for item in data.get("feeds", [])[:limit]: + card = item.get("noteCard", {}) + user = card.get("user", {}) + interact = card.get("interactInfo", {}) + results.append(SearchResult( + title=card.get("displayTitle", ""), + url=f"https://www.xiaohongshu.com/explore/{item.get('id', '')}", + snippet=f"👤 {user.get('nickname', '')} · ❤ {interact.get('likedCount', '0')}", + score=0, + )) + except (json.JSONDecodeError, KeyError): + pass + + return results + + async def _read_via_mcp(self, note_id: str, url: str) -> ReadResult: + """Read a note via MCP server: search → get xsec_token → get detail.""" + # Step 1: Get xsec_token by listing feeds or searching + # Try to find the note in recent feeds first + output = self._mcporter_call("xiaohongshu.list_feeds()", timeout=15) + xsec_token = None + + try: + data = json.loads(output) + for feed in data.get("feeds", []): + if feed.get("id") == note_id: + xsec_token = feed.get("xsecToken", "") + break + except (json.JSONDecodeError, KeyError): + pass + + # If not found in feeds, search for it + if not xsec_token: + # Use a generic token - XHS MCP may accept it + xsec_token = "" + + if not xsec_token: + return ReadResult( + title="XiaoHongShu", + content=f"⚠️ 无法获取笔记 {note_id} 的访问令牌。\n" + "请先通过首页或搜索找到这篇笔记。", + url=url, + platform="xiaohongshu", + ) + + # Step 2: Get detail + output = self._mcporter_call( + f'xiaohongshu.get_feed_detail(feed_id: "{note_id}", xsec_token: "{xsec_token}")', + timeout=15, + ) + + # Parse MCP output (it's typically formatted text, not JSON) + title = "" + content = output.strip() + author = "" + + # Try to extract structured info if it's JSON + try: + data = json.loads(output) + if isinstance(data, dict): + title = data.get("title", data.get("displayTitle", "")) + content = data.get("desc", data.get("content", output)) + author = data.get("user", {}).get("nickname", "") + except (json.JSONDecodeError, ValueError): + # MCP returns plain text - use as-is + lines = content.split("\n") + if lines: + title = lines[0][:80] + + return ReadResult( + title=title or f"XHS Note {note_id}", + content=content, + url=url, + author=author, + platform="xiaohongshu", + ) + + async def _read_via_web(self, note_id: str, url: str, cookie: str) -> ReadResult: + """Read a note via direct web scraping (fallback).""" headers = { "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36", "Cookie": cookie, "Referer": "https://www.xiaohongshu.com/", } - # Fetch note page resp = requests.get( f"https://www.xiaohongshu.com/explore/{note_id}", headers=headers, timeout=15, + allow_redirects=False, ) + + # Check for anti-bot redirect + if resp.status_code in (301, 302): + location = resp.headers.get("Location", "") + if "404" in location or "sec_" in location: + return ReadResult( + title="XiaoHongShu", + content="⚠️ XiaoHongShu blocked this request (anti-bot protection).\n" + "Web scraping doesn't work from server IPs.\n\n" + "💡 Better approach: use mcporter + xiaohongshu MCP server:\n" + " mcporter config add xiaohongshu http://localhost:18060/mcp\n" + " Then agent-reach will use the MCP API automatically.", + url=url, + platform="xiaohongshu", + ) + resp.raise_for_status() html = resp.text - # Extract note data from HTML title, content, author = self._parse_html(html) return ReadResult( @@ -75,9 +237,6 @@ class XiaoHongShuChannel(Channel): def _extract_note_id(self, url: str) -> str: """Extract note ID from various XHS URL formats.""" - # https://www.xiaohongshu.com/explore/xxxxx - # https://www.xiaohongshu.com/discovery/item/xxxxx - # https://xhslink.com/xxxxx path = urlparse(url).path parts = path.strip("/").split("/") if parts: @@ -90,11 +249,9 @@ class XiaoHongShuChannel(Channel): content = "" author = "" - # Try to find JSON data in page match = re.search(r'window\.__INITIAL_STATE__\s*=\s*({.*?})\s*', html, re.DOTALL) if match: try: - # XHS embeds note data in initial state state = json.loads(match.group(1).replace('undefined', 'null')) note_data = state.get("note", {}).get("noteDetailMap", {}) if note_data: @@ -106,7 +263,6 @@ class XiaoHongShuChannel(Channel): except (json.JSONDecodeError, KeyError, IndexError): pass - # Fallback: extract from meta tags if not title: m = re.search(r'(.*?)', html) if m: