From a37e9aa190f8d9e1bba665ba9e1dc9aff6c2a885 Mon Sep 17 00:00:00 2001
From: Panniantong
Date: Thu, 26 Feb 2026 08:15:56 +0100
Subject: [PATCH] refactor: strip to installer + doctor + docs, remove read/search wrapper layer
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

BREAKING CHANGE: Remove all `agent-reach read` and `agent-reach search-*` commands.

Agent Reach is now an installer, configuration tool, and doctor — not a wrapper layer. After installation, agents call upstream tools directly (bird CLI, yt-dlp, mcporter, gh CLI, Jina Reader, etc.).

What's kept:
- agent-reach install: one-shot installer
- agent-reach doctor: channel status overview
- agent-reach configure: cookies, proxy, credentials
- agent-reach setup: interactive wizard
- SKILL.md: complete guide for agents to use upstream tools directly

What's removed:
- agent-reach read URL (and all channel read() methods)
- agent-reach search-* commands (and all channel search() methods)
- ReadResult / SearchResult data classes
- URL routing system (get_channel_for_url)
- All parsing/conversion logic (VTT, Reddit JSON, bird text parser, etc.)
- MCP server read/search tools (kept only get_status)

Net change: -1790 lines. Less code = fewer bugs.
---
 README.md                              |  28 +--
 agent_reach/channels/__init__.py       |  33 +--
 agent_reach/channels/base.py           | 126 +----------
 agent_reach/channels/bilibili.py       | 199 +----------------
 agent_reach/channels/bosszhipin.py     | 184 ++--------------
 agent_reach/channels/exa_search.py     | 112 ++--------
 agent_reach/channels/github.py         | 126 +----------
 agent_reach/channels/linkedin.py       | 273 ++---------------------
 agent_reach/channels/reddit.py         | 178 ++-------------
 agent_reach/channels/rss.py            |  54 +----
 agent_reach/channels/twitter.py        | 292 ++-----------------------
 agent_reach/channels/web.py            |  46 +---
 agent_reach/channels/xiaohongshu.py    | 234 ++------------------
 agent_reach/channels/youtube.py        | 119 +---------
 agent_reach/cli.py                     | 160 +-------------
 agent_reach/core.py                    | 126 ++---------
 agent_reach/integrations/mcp_server.py |  44 +---
 agent_reach/skill/SKILL.md             | 256 +++++++++++++++-------
 18 files changed, 400 insertions(+), 2190 deletions(-)

diff --git a/README.md b/README.md
index 0f428b6..cf322a0 100644
--- a/README.md
+++ b/README.md
@@ -115,14 +115,14 @@ AI Agent 已经能帮你写代码、改文档、管项目——但你让它去
 
 不需要任何配置,告诉 Agent 就行:
 
-- "帮我看看这个链接" → 任意网页
-- "这个 GitHub 仓库是做什么的" → GitHub 仓库、Issue、代码
-- "这个视频讲了什么" → YouTube / B站字幕提取
-- "帮我看看这条推文" → Twitter 推文
-- "订阅这个 RSS" → RSS / Atom 源
-- "搜一下 GitHub 上有什么 LLM 框架" → GitHub 搜索
+- "帮我看看这个链接" → `curl https://r.jina.ai/URL` 读任意网页
+- "这个 GitHub 仓库是做什么的" → `gh repo view owner/repo`
+- "这个视频讲了什么" → `yt-dlp --dump-json URL` 提取字幕
+- "帮我看看这条推文" → `bird read URL --json`
+- "订阅这个 RSS" → `feedparser` 解析
+- "搜一下 GitHub 上有什么 LLM 框架" → `gh search repos "LLM framework"`
 
-**不需要记命令。** Agent 自己知道该调什么。
+**不需要记命令。** Agent 读了 SKILL.md 之后自己知道该调什么。
 
 ---
 
@@ -134,9 +134,11 @@ AI Agent 已经能帮你写代码、改文档、管项目——但你让它去
 
 Agent Reach 做的事情很简单:**帮你把这些选型和配置的活儿做完了。**
 
+安装完成后,Agent 直接调用上游工具(bird CLI、yt-dlp、mcporter、gh CLI 等),不需要经过 Agent Reach 的包装层。
+
 ### 🔌 每个渠道都是可插拔的
 
-每个平台对应一个独立的 Python 文件,实现统一接口。**后端工具随时可以换**——哪天出了更好的工具,改一个文件就行,其他不用动。
+每个平台背后是一个独立的上游工具。**不满意?换掉就行。**
 
 ```
 channels/
@@ -229,13 +231,13 @@ Star 
一下,下次需要的时候能找到。⭐
AI Agent 怎么搜索 Twitter / X?不想付 API 费用 -Agent Reach 使用 [bird CLI](https://www.npmjs.com/package/@steipete/bird) 通过 Cookie 认证访问 Twitter,完全免费。安装 Agent Reach 后,用 Cookie-Editor 导出你的 Twitter Cookie,运行 `agent-reach configure twitter-cookies "your_cookies"` 即可。之后 Agent 就可以用 `agent-reach search-twitter "关键词"` 搜索推文了。 +Agent Reach 使用 [bird CLI](https://www.npmjs.com/package/@steipete/bird) 通过 Cookie 认证访问 Twitter,完全免费。安装 Agent Reach 后,用 Cookie-Editor 导出你的 Twitter Cookie,运行 `agent-reach configure twitter-cookies "your_cookies"` 即可。之后 Agent 就可以用 `bird search "关键词" --json` 搜索推文了。
How to search Twitter/X with AI agent for free (no API)? -Agent Reach uses the bird CLI with cookie auth — zero API fees. After installing, export your Twitter cookies with the Cookie-Editor extension, run `agent-reach configure twitter-cookies "your_cookies"`, then your agent can search with `agent-reach search-twitter "query"`. +Agent Reach uses the bird CLI with cookie auth — zero API fees. After installing, export your Twitter cookies with the Cookie-Editor extension, run `agent-reach configure twitter-cookies "your_cookies"`, then your agent can search with `bird search "query" --json`.
@@ -247,19 +249,19 @@ Reddit 封锁数据中心 IP。配置一个住宅代理即可解决:`agent-rea
How to get YouTube video transcripts for AI? -`agent-reach read https://youtube.com/watch?v=xxx` automatically extracts the transcript. Uses yt-dlp under the hood, supports multiple languages. No API key needed. +`yt-dlp --dump-json "https://youtube.com/watch?v=xxx"` extracts video metadata; `yt-dlp --write-sub --skip-download "URL"` extracts subtitles. Uses yt-dlp under the hood, supports multiple languages. No API key needed.
怎么让 AI Agent 读小红书? -小红书需要通过 Docker 运行一个 MCP 服务。安装 Docker 后,运行 `agent-reach install` 会自动配置。之后 Agent 就能用 `agent-reach read <小红书链接>` 或 `agent-reach search-xhs "关键词"` 了。 +小红书需要通过 Docker 运行一个 MCP 服务。安装 Docker 后,运行 `agent-reach install` 会自动配置。之后 Agent 就能用 `mcporter call 'xiaohongshu.get_feed_detail(...)'` 读取笔记或 `mcporter call 'xiaohongshu.search_feeds(keyword: "关键词")'` 搜索了。
Compatible with Claude Code / Cursor / OpenClaw / Windsurf? -Yes! Agent Reach is a standard CLI tool — any AI coding agent that can run shell commands can use it. Works with Claude Code, Cursor, OpenClaw, Windsurf, Codex, and more. Just `pip install agent-reach` and the agent can start using it immediately. +Yes! Agent Reach is an installer + configuration tool — any AI coding agent that can run shell commands can use it. Works with Claude Code, Cursor, OpenClaw, Windsurf, Codex, and more. Just `pip install agent-reach`, run `agent-reach install`, and the agent can start using the upstream tools immediately.
diff --git a/agent_reach/channels/__init__.py b/agent_reach/channels/__init__.py index 2971a13..a2f6669 100644 --- a/agent_reach/channels/__init__.py +++ b/agent_reach/channels/__init__.py @@ -1,14 +1,10 @@ # -*- coding: utf-8 -*- """ -Channel registry — routes URLs to the right channel. - -This is the core of Agent Reach' pluggable architecture. -Add a new channel: just create a file and register it here. -Swap a backend: just change the implementation inside the channel file. +Channel registry — lists all supported platforms for doctor checks. """ -from typing import Dict, List, Optional -from .base import Channel, ReadResult, SearchResult +from typing import List, Optional +from .base import Channel # Import all channels from .web import WebChannel @@ -24,7 +20,7 @@ from .linkedin import LinkedInChannel from .bosszhipin import BossZhipinChannel -# Channel registry — order matters (first match wins, web is last as fallback) +# Channel registry ALL_CHANNELS: List[Channel] = [ GitHubChannel(), TwitterChannel(), @@ -36,22 +32,9 @@ ALL_CHANNELS: List[Channel] = [ BossZhipinChannel(), RSSChannel(), ExaSearchChannel(), - WebChannel(), # Fallback — handles any URL + WebChannel(), ] -# Search-capable channels -SEARCH_CHANNELS: Dict[str, Channel] = { - ch.name: ch for ch in ALL_CHANNELS if ch.can_search() -} - - -def get_channel_for_url(url: str) -> Channel: - """Find the right channel for a URL.""" - for channel in ALL_CHANNELS: - if channel.can_handle(url): - return channel - return WebChannel() # Should never reach here, but just in case - def get_channel(name: str) -> Optional[Channel]: """Get a channel by name.""" @@ -67,7 +50,7 @@ def get_all_channels() -> List[Channel]: __all__ = [ - "Channel", "ReadResult", "SearchResult", - "ALL_CHANNELS", "SEARCH_CHANNELS", - "get_channel_for_url", "get_channel", "get_all_channels", + "Channel", + "ALL_CHANNELS", + "get_channel", "get_all_channels", ] diff --git a/agent_reach/channels/base.py b/agent_reach/channels/base.py index 
c311750..21551cf 100644 --- a/agent_reach/channels/base.py +++ b/agent_reach/channels/base.py @@ -1,110 +1,28 @@ # -*- coding: utf-8 -*- """ -Channel base class — the universal interface for all platforms. +Channel base class — platform availability checking. -Every channel (YouTube, Twitter, GitHub, etc.) implements this interface. -The backend tool can be swapped anytime without changing anything else. +Each channel represents a platform (YouTube, Twitter, GitHub, etc.) +and provides: + - can_handle(url) → does this URL belong to this platform? + - check(config) → is the upstream tool installed and configured? -Example: - class YouTubeChannel(Channel): - name = "youtube" - backends = ["yt-dlp"] # current backend, can be swapped - - async def read(self, url, config): - # Just call yt-dlp, return standardized dict - ... +After installation, agents call upstream tools directly. """ import shutil from abc import ABC, abstractmethod -from dataclasses import dataclass -from typing import Any, Dict, List, Optional, Tuple - - -@dataclass -class ReadResult: - """Standardized read result. 
Every channel returns this.""" - title: str - content: str - url: str - author: str = "" - date: str = "" - platform: str = "" - extra: dict = None - - def __post_init__(self): - self.extra = self.extra or {} - - def to_dict(self) -> dict: - d = { - "title": self.title, - "content": self.content, - "url": self.url, - "platform": self.platform, - } - if self.author: - d["author"] = self.author - if self.date: - d["date"] = self.date - if self.extra: - d["extra"] = self.extra - return d - - -@dataclass -class SearchResult: - """Standardized search result.""" - title: str - url: str - snippet: str = "" - author: str = "" - date: str = "" - score: float = 0 - extra: dict = None - - def __post_init__(self): - self.extra = self.extra or {} - - def to_dict(self) -> dict: - d = { - "title": self.title, - "url": self.url, - "snippet": self.snippet, - } - if self.author: - d["author"] = self.author - if self.date: - d["date"] = self.date - if self.extra: - d["extra"] = self.extra - return d +from typing import List, Tuple class Channel(ABC): - """ - Base class for all channels. - - Subclasses just need to implement: - - read(url, config) → ReadResult - - can_handle(url) → bool - - check(config) → (status, message) - - Optionally: - - search(query, config, **kwargs) → list[SearchResult] - """ + """Base class for all channels.""" name: str = "" # e.g. "youtube" - description: str = "" # e.g. "YouTube video transcripts" - backends: List[str] = [] # e.g. ["yt-dlp"] — what external tool is used - requires_config: List[str] = [] # e.g. ["reddit_proxy"] - requires_tools: List[str] = [] # e.g. ["yt-dlp"] + description: str = "" # e.g. "YouTube 视频和字幕" + backends: List[str] = [] # e.g. ["yt-dlp"] — what upstream tool is used tier: int = 0 # 0=zero-config, 1=needs free key, 2=needs setup - @abstractmethod - async def read(self, url: str, config=None) -> ReadResult: - """Read content from a URL. Must return ReadResult.""" - ... 
- @abstractmethod def can_handle(self, url: str) -> bool: """Check if this channel can handle this URL.""" @@ -112,29 +30,7 @@ class Channel(ABC): def check(self, config=None) -> Tuple[str, str]: """ - Check if this channel is available. + Check if this channel's upstream tool is available. Returns (status, message) where status is 'ok'/'warn'/'off'/'error'. """ - # Check required tools - for tool in self.requires_tools: - if not shutil.which(tool): - return "off", f"需要安装:pip install {tool}" - - # Check required config - for key in self.requires_config: - if config and not config.get(key): - return "off", f"需要配置 {key},运行 agent-reach setup" - return "ok", f"{'、'.join(self.backends) if self.backends else '内置'}" - - async def search(self, query: str, config=None, **kwargs) -> List[SearchResult]: - """Search this platform. Override if supported.""" - raise NotImplementedError(f"{self.name} does not support search") - - def can_search(self) -> bool: - """Whether this channel supports search.""" - try: - # Check if search is overridden - return type(self).search is not Channel.search - except: - return False diff --git a/agent_reach/channels/bilibili.py b/agent_reach/channels/bilibili.py index 6ca2cf5..07843cd 100644 --- a/agent_reach/channels/bilibili.py +++ b/agent_reach/channels/bilibili.py @@ -1,207 +1,26 @@ # -*- coding: utf-8 -*- -"""Bilibili — via yt-dlp (same backend as YouTube). +"""Bilibili — check if yt-dlp is available.""" -Backend: yt-dlp (https://github.com/yt-dlp/yt-dlp) -yt-dlp natively supports Bilibili — video info, subtitles, and search. 
-""" - -import json +import os import shutil -import subprocess -from urllib.parse import urlparse -from .base import Channel, ReadResult, SearchResult -from typing import List +from .base import Channel class BilibiliChannel(Channel): name = "bilibili" - description = "B站视频信息和字幕" + description = "B站视频和字幕" backends = ["yt-dlp"] - requires_tools = ["yt-dlp"] - tier = 0 + tier = 1 def can_handle(self, url: str) -> bool: + from urllib.parse import urlparse d = urlparse(url).netloc.lower() return "bilibili.com" in d or "b23.tv" in d def check(self, config=None): if not shutil.which("yt-dlp"): return "off", "yt-dlp 未安装。安装:pip install yt-dlp" - proxy = config.get("bilibili_proxy") if config else None + proxy = (config.get("bilibili_proxy") if config else None) or os.environ.get("BILIBILI_PROXY") if proxy: - return "ok", "已配置代理,完整可用" - import os - is_server = bool(os.environ.get("SSH_CONNECTION") or os.path.exists("/etc/cloud")) - if is_server: - return "warn", "服务器 IP 可能被封,配置代理即可解决:agent-reach configure proxy URL" - return "ok", "本地直连可用" - - async def read(self, url: str, config=None) -> ReadResult: - if not shutil.which("yt-dlp"): - raise RuntimeError("yt-dlp not installed. 
Install: pip install yt-dlp") - - proxy = config.get("bilibili_proxy") if config else None - - # Get video info via yt-dlp - info = self._get_info(url, proxy) - if not info: - return ReadResult( - title="Bilibili", - content=f"⚠️ 无法获取视频信息: {url}\n服务器 IP 可能被封,配个代理:agent-reach configure proxy URL", - url=url, platform="bilibili", - ) - - title = info.get("title", url) - author = info.get("uploader", "") - desc = info.get("description", "") - - # Try subtitles - subtitle = self._get_subtitles(url, proxy) - content = desc - if subtitle: - content += f"\n\n## 字幕\n{subtitle}" - - return ReadResult( - title=title, content=content, url=url, - author=author, platform="bilibili", - extra={ - "view_count": info.get("view_count"), - "like_count": info.get("like_count"), - "duration": info.get("duration_string"), - }, - ) - - async def search(self, query: str, config=None, **kwargs) -> List[SearchResult]: - """Search Bilibili. - - Strategy: - 1. Try yt-dlp bilisearch (works on local machines) - 2. Fallback to Exa site:bilibili.com (works on servers) - """ - if not shutil.which("yt-dlp"): - raise RuntimeError("yt-dlp not installed. 
Install: pip install yt-dlp") - - limit = kwargs.get("limit", 5) - proxy = config.get("bilibili_proxy") if config else None - - # Strategy 1: yt-dlp bilisearch - results = self._search_ytdlp(query, limit, proxy) - if results: - return results - - # Strategy 2: Exa fallback (server-friendly) - results = self._search_exa(query, limit) - if results: - return results - - return [] - - def _search_ytdlp(self, query: str, limit: int, proxy: str = None) -> List[SearchResult]: - """Search via yt-dlp bilisearch (needs local/Chinese IP).""" - cmd = [ - "yt-dlp", "--dump-json", "--no-download", - f"bilisearch{limit}:{query}", - ] - if proxy: - cmd += ["--proxy", proxy] - - try: - r = subprocess.run(cmd, capture_output=True, text=True, timeout=60) - if r.returncode != 0: - return [] - results = [] - for line in r.stdout.strip().split("\n"): - if not line.strip(): - continue - try: - d = json.loads(line) - vid = d.get("id", "") - url = d.get("webpage_url", f"https://www.bilibili.com/video/av{vid}") - results.append(SearchResult( - title=d.get("title", f"av{vid}"), - url=url, - snippet=f"👤 {d.get('uploader', '?')} · 👁 {d.get('view_count', '?')}", - extra={ - "view_count": d.get("view_count"), - "uploader": d.get("uploader"), - "duration": d.get("duration_string"), - }, - )) - except json.JSONDecodeError: - continue - return results - except subprocess.TimeoutExpired: - return [] - - def _search_exa(self, query: str, limit: int) -> List[SearchResult]: - """Fallback: search via Exa (site:bilibili.com). 
Works on any IP.""" - try: - r = subprocess.run( - ["mcporter", "call", - f'exa.web_search_exa(query: "site:bilibili.com {query}", numResults: {limit})'], - capture_output=True, text=True, timeout=30, - ) - if r.returncode != 0: - return [] - - results = [] - # Parse mcporter output: Title: / Author: / URL: / Text: blocks - title, author, url = "", "", "" - for line in r.stdout.split("\n"): - if line.startswith("Title: "): - title = line[7:].strip() - elif line.startswith("Author: "): - author = line[8:].strip() - elif line.startswith("URL: "): - url = line[5:].strip() - if url and "bilibili.com" in url: - results.append(SearchResult( - title=title or url, - url=url, - snippet=f"👤 {author}" if author else "(via Exa search)", - )) - title, author, url = "", "", "" - return results - except Exception: - return [] - - def _get_info(self, url: str, proxy: str = None) -> dict: - cmd = ["yt-dlp", "--dump-json", "--no-download", url] - if proxy: - cmd += ["--proxy", proxy] - try: - r = subprocess.run(cmd, capture_output=True, text=True, timeout=30) - if r.returncode == 0: - return json.loads(r.stdout) - except (subprocess.TimeoutExpired, json.JSONDecodeError): - pass - return {} - - def _get_subtitles(self, url: str, proxy: str = None) -> str: - import tempfile - from pathlib import Path - - with tempfile.TemporaryDirectory() as tmpdir: - cmd = [ - "yt-dlp", "--write-sub", "--write-auto-sub", - "--sub-lang", "zh-Hans,zh,en", - "--skip-download", "--sub-format", "vtt", - "-o", f"{tmpdir}/%(id)s.%(ext)s", url, - ] - if proxy: - cmd += ["--proxy", proxy] - try: - subprocess.run(cmd, capture_output=True, text=True, timeout=30) - for f in Path(tmpdir).glob("*.vtt"): - text = f.read_text(errors="replace") - lines = [] - for line in text.split("\n"): - line = line.strip() - if not line or line.startswith("WEBVTT") or "-->" in line or line.isdigit(): - continue - if line not in lines[-1:]: - lines.append(line) - return "\n".join(lines) - except subprocess.TimeoutExpired: - pass - 
return "" + return "ok", "可提取视频信息和字幕(代理已配置)" + return "ok", "可提取视频信息和字幕(本地环境)。服务器可能需要代理" diff --git a/agent_reach/channels/bosszhipin.py b/agent_reach/channels/bosszhipin.py index 8e9e95f..feea5ee 100644 --- a/agent_reach/channels/bosszhipin.py +++ b/agent_reach/channels/bosszhipin.py @@ -1,62 +1,9 @@ # -*- coding: utf-8 -*- -"""Boss直聘 (BOSS Zhipin) — via mcp-bosszp (MCP) or Jina Reader fallback. +"""Boss直聘 — check if mcp-bosszp is available.""" -Backend: mcp-bosszp (161 stars, FastMCP + Playwright) -Swap to: any Boss直聘 access tool -""" - -import json import shutil import subprocess -from urllib.parse import urlparse -from .base import Channel, ReadResult, SearchResult -from typing import List -import requests - - -def _mcporter_has_bosszhipin() -> bool: - """Check if mcporter has Boss直聘 MCP configured.""" - if not shutil.which("mcporter"): - return False - try: - r = subprocess.run( - ["mcporter", "list"], capture_output=True, text=True, timeout=10 - ) - # Check for various possible config names - out = r.stdout.lower() - return "boss" in out or "zhipin" in out or "bosszhipin" in out - except Exception: - return False - - -def _mcporter_call(expr: str, timeout: int = 30) -> str: - """Call a Boss直聘 MCP tool via mcporter.""" - r = subprocess.run( - ["mcporter", "call", expr], - capture_output=True, text=True, timeout=timeout, - ) - if r.returncode != 0: - raise RuntimeError(r.stderr or r.stdout) - return r.stdout - - -def _get_mcp_name() -> str: - """Get the actual MCP server name configured in mcporter.""" - try: - r = subprocess.run( - ["mcporter", "list"], capture_output=True, text=True, timeout=10 - ) - for line in r.stdout.split("\n"): - line_lower = line.strip().lower() - for name in ["bosszhipin", "boss-zp", "bosszp", "boss"]: - if name in line_lower: - # Extract the actual server name - parts = line.strip().split() - if parts: - return parts[0] - return "bosszhipin" - except Exception: - return "bosszhipin" +from .base import Channel class 
BossZhipinChannel(Channel): @@ -66,118 +13,29 @@ class BossZhipinChannel(Channel): tier = 2 def can_handle(self, url: str) -> bool: + from urllib.parse import urlparse domain = urlparse(url).netloc.lower() return "zhipin.com" in domain or "boss.com" in domain def check(self, config=None): - if _mcporter_has_bosszhipin(): - return "ok", "可搜索职位、向 HR 打招呼" - + if not shutil.which("mcporter"): + return "off", ( + "可通过 Jina Reader 读取职位页面。完整功能需要:\n" + " 1. git clone https://github.com/mucsbr/mcp-bosszp.git\n" + " 2. cd mcp-bosszp && pip install -r requirements.txt && playwright install chromium\n" + " 3. python boss_zhipin_fastmcp_v2.py(启动后扫码登录)\n" + " 4. mcporter config add bosszhipin http://localhost:8000/mcp" + ) + try: + r = subprocess.run( + ["mcporter", "list"], capture_output=True, text=True, timeout=10 + ) + out = r.stdout.lower() + if "boss" in out or "zhipin" in out: + return "ok", "可搜索职位、向 HR 打招呼" + except Exception: + pass return "off", ( - "可通过 Jina Reader 读取职位页面。完整功能需要:\n" - " 1. git clone https://github.com/mucsbr/mcp-bosszp.git\n" - " 2. cd mcp-bosszp && pip install -r requirements.txt && playwright install chromium\n" - " 3. python boss_zhipin_fastmcp_v2.py(启动后扫码登录)\n" - " 4. 
mcporter config add bosszhipin http://localhost:8000/mcp\n" - " 或用 Docker:docker-compose up -d\n" + "mcporter 已装但 Boss直聘 MCP 未配置。\n" " 详见 https://github.com/mucsbr/mcp-bosszp" ) - - async def read(self, url: str, config=None) -> ReadResult: - # Boss直聘 pages mostly work with Jina Reader - return await self._read_jina(url) - - async def _read_jina(self, url: str) -> ReadResult: - """Read Boss直聘 page via Jina Reader.""" - try: - resp = requests.get( - f"https://r.jina.ai/{url}", - headers={"Accept": "text/markdown"}, - timeout=15, - ) - resp.raise_for_status() - text = resp.text - - if len(text.strip()) < 50: - return ReadResult( - title="Boss直聘", - content=( - f"⚠️ 无法读取此页面内容: {url}\n\n" - "提示:\n" - "- 安装 mcp-bosszp 可解锁职位搜索和自动打招呼\n" - "- 详见 https://github.com/mucsbr/mcp-bosszp" - ), - url=url, - platform="bosszhipin", - ) - - return ReadResult( - title=text[:100] if text else url, - content=text, - url=url, - platform="bosszhipin", - ) - except Exception: - return ReadResult( - title="Boss直聘", - content=( - f"⚠️ 无法读取此 Boss直聘页面: {url}\n\n" - "提示:\n" - "- Boss直聘部分页面需要登录\n" - "- 安装 mcp-bosszp 可解锁完整功能\n" - "- 详见 https://github.com/mucsbr/mcp-bosszp" - ), - url=url, - platform="bosszhipin", - ) - - async def search(self, query: str, config=None, **kwargs) -> List[SearchResult]: - limit = kwargs.get("limit", 10) - - # Try MCP search first - if _mcporter_has_bosszhipin(): - try: - return await self._search_mcp(query, limit, config) - except Exception: - pass - - # Fallback to Exa - from agent_reach.channels.exa_search import ExaSearchChannel - exa = ExaSearchChannel() - return await exa.search(f"site:zhipin.com {query}", config=config, limit=limit) - - async def _search_mcp(self, query: str, limit: int, config=None) -> List[SearchResult]: - """Search Boss直聘 via MCP.""" - server = _get_mcp_name() - try: - out = _mcporter_call( - f'{server}.get_recommend_jobs_tool(page: 1)', - timeout=30, - ) - return self._parse_jobs(out, limit) - except Exception: - return [] - - def 
_parse_jobs(self, text: str, limit: int) -> List[SearchResult]: - """Parse MCP job search output into SearchResults.""" - results = [] - try: - data = json.loads(text) - jobs = data if isinstance(data, list) else data.get("jobs", data.get("results", [])) - for job in jobs[:limit]: - if isinstance(job, dict): - title = job.get("title") or job.get("jobName", "") - company = job.get("company") or job.get("brandName", "") - salary = job.get("salary") or job.get("salaryDesc", "") - url = job.get("url", "") - snippet = f"🏢 {company}" if company else "" - if salary: - snippet += f" · 💰 {salary}" - results.append(SearchResult( - title=title, - url=url, - snippet=snippet, - )) - except (json.JSONDecodeError, KeyError): - pass - return results diff --git a/agent_reach/channels/exa_search.py b/agent_reach/channels/exa_search.py index 0a2e3a0..e264399 100644 --- a/agent_reach/channels/exa_search.py +++ b/agent_reach/channels/exa_search.py @@ -1,110 +1,36 @@ # -*- coding: utf-8 -*- -"""Exa semantic search — via mcporter + Exa MCP server. 
+"""Exa Search — check if mcporter + Exa MCP is available.""" -Backend: Exa MCP at mcp.exa.ai (OAuth, no API key needed) -Requires: mcporter CLI -""" - -import json import shutil import subprocess -from .base import Channel, SearchResult -from typing import List +from .base import Channel class ExaSearchChannel(Channel): name = "exa_search" - description = "全网语义搜索(同时支持 Reddit/Twitter 搜索)" - backends = ["exa-mcp"] - tier = 1 - - def _mcporter_ok(self) -> bool: - if not shutil.which("mcporter"): - return False - try: - r = subprocess.run( - ["mcporter", "list"], capture_output=True, text=True, timeout=10 - ) - return "exa" in r.stdout - except Exception: - return False - - def _call(self, expr: str, timeout: int = 30) -> str: - r = subprocess.run( - ["mcporter", "call", expr], - capture_output=True, text=True, timeout=timeout, - ) - if r.returncode != 0: - raise RuntimeError(r.stderr or r.stdout) - return r.stdout - - # ── Channel interface ── + description = "全网语义搜索" + backends = ["Exa via mcporter"] + tier = 0 def can_handle(self, url: str) -> bool: - return False # search-only - - async def read(self, url: str, config=None): - raise NotImplementedError("Exa is a search engine, not a reader") + return False # Search-only channel def check(self, config=None): if not shutil.which("mcporter"): return "off", ( - "需要 mcporter。安装:npm install -g mcporter && " - "mcporter config add exa https://mcp.exa.ai/mcp" - ) - if not self._mcporter_ok(): - return "off", "mcporter 已装但 Exa 未配置。运行:mcporter config add exa https://mcp.exa.ai/mcp" - return "ok", "MCP 已连接,免 Key 直接可用(全网搜索 + Reddit + Twitter)" - - async def search(self, query: str, config=None, **kwargs) -> List[SearchResult]: - if not self._mcporter_ok(): - raise ValueError( - "Exa 搜索需要 mcporter。安装:\n" + "需要 mcporter + Exa MCP。安装:\n" " npm install -g mcporter\n" " mcporter config add exa https://mcp.exa.ai/mcp" ) - - limit = kwargs.get("limit", 5) - safe_q = query.replace('"', '\\"') - out = self._call( - 
f'exa.web_search_exa(query: "{safe_q}", numResults: {min(limit, 10)})', - timeout=30, - ) - return self._parse_output(out, limit) - - # ── Parse mcporter text output ── - - def _parse_output(self, text: str, limit: int) -> List[SearchResult]: - """Parse mcporter's Title/URL/Text block format.""" - results = [] - cur = {} - - for line in text.split("\n"): - line = line.strip() - if line.startswith("Title: "): - if cur.get("title"): - results.append(self._make_result(cur)) - cur = {"title": line[7:]} - elif line.startswith("URL: "): - cur["url"] = line[5:] - elif line.startswith("Published Date: "): - cur["date"] = line[16:] - elif line.startswith("Text: "): - cur["text"] = line[6:] - elif "text" in cur and line: - cur["text"] += " " + line - - if cur.get("title"): - results.append(self._make_result(cur)) - - return results[:limit] - - @staticmethod - def _make_result(d: dict) -> SearchResult: - return SearchResult( - title=d.get("title", ""), - url=d.get("url", ""), - snippet=d.get("text", "")[:500], - date=d.get("date", ""), - score=0, - ) + try: + r = subprocess.run( + ["mcporter", "list"], capture_output=True, text=True, timeout=10 + ) + if "exa" in r.stdout.lower(): + return "ok", "全网语义搜索可用(免费,无需 API Key)" + return "off", ( + "mcporter 已装但 Exa 未配置。运行:\n" + " mcporter config add exa https://mcp.exa.ai/mcp" + ) + except Exception: + return "off", "mcporter 连接异常" diff --git a/agent_reach/channels/github.py b/agent_reach/channels/github.py index bb4a937..790ba2b 100644 --- a/agent_reach/channels/github.py +++ b/agent_reach/channels/github.py @@ -1,16 +1,9 @@ # -*- coding: utf-8 -*- -"""GitHub — via gh CLI. 
+"""GitHub — check if gh CLI is available.""" -Backend: gh CLI (https://cli.github.com) -Swap to: GitHub REST API -""" - -import json import shutil import subprocess -from urllib.parse import urlparse -from .base import Channel, ReadResult, SearchResult -from typing import List +from .base import Channel class GitHubChannel(Channel): @@ -19,121 +12,18 @@ class GitHubChannel(Channel): backends = ["gh CLI"] tier = 0 - def _gh(self, args: list, timeout: int = 15) -> str: - r = subprocess.run( - ["gh"] + args, - capture_output=True, text=True, timeout=timeout, - ) - if r.returncode != 0: - raise RuntimeError(r.stderr or r.stdout) - return r.stdout - - def _gh_json(self, args: list, timeout: int = 15) -> dict: - return json.loads(self._gh(args + ["--json"], timeout)) - def can_handle(self, url: str) -> bool: + from urllib.parse import urlparse return "github.com" in urlparse(url).netloc.lower() def check(self, config=None): if not shutil.which("gh"): - return "warn", "gh CLI 未安装。安装:https://cli.github.com 。公开仓库仍可通过 Jina Reader 读取" + return "warn", "gh CLI 未安装。安装:https://cli.github.com" try: - self._gh(["auth", "status"], timeout=5) + subprocess.run( + ["gh", "auth", "status"], + capture_output=True, text=True, timeout=5 + ) return "ok", "完整可用(读取、搜索、Fork、Issue、PR 等)" except Exception: return "ok", "gh CLI 已装但未认证。运行 gh auth login 可解锁完整功能" - - async def read(self, url: str, config=None) -> ReadResult: - if not shutil.which("gh"): - # Fallback to Jina Reader for public repos - from agent_reach.channels.web import WebChannel - return await WebChannel().read(url, config) - - path = urlparse(url).path.strip("/").split("/") - if len(path) < 2: - from agent_reach.channels.web import WebChannel - return await WebChannel().read(url, config) - - owner, repo = path[0], path[1] - - # Issues / PRs - if len(path) >= 4 and path[2] in ("issues", "pull"): - return await self._read_issue(owner, repo, path[3], url) - - # Repo - return await self._read_repo(owner, repo, url) - - async def 
_read_repo(self, owner: str, repo: str, url: str) -> ReadResult: - slug = f"{owner}/{repo}" - try: - # Get repo info - info = self._gh(["repo", "view", slug]) - # Get README - try: - readme = self._gh( - ["api", f"repos/{slug}/readme", "--jq", ".content"], - timeout=10, - ) - import base64 - readme_text = base64.b64decode(readme).decode("utf-8", errors="replace") - except Exception: - readme_text = "" - - content = readme_text or info - return ReadResult( - title=slug, content=content, url=url, - author=owner, platform="github", - ) - except Exception: - from agent_reach.channels.web import WebChannel - return await WebChannel().read(url) - - async def _read_issue(self, owner: str, repo: str, num: str, url: str) -> ReadResult: - slug = f"{owner}/{repo}" - try: - out = self._gh(["issue", "view", num, "-R", slug]) - return ReadResult( - title=f"{slug}#{num}", content=out, url=url, - platform="github", - ) - except Exception: - # Might be a PR - try: - out = self._gh(["pr", "view", num, "-R", slug]) - return ReadResult( - title=f"{slug}#{num}", content=out, url=url, - platform="github", - ) - except Exception: - from agent_reach.channels.web import WebChannel - return await WebChannel().read(url) - - async def search(self, query: str, config=None, **kwargs) -> List[SearchResult]: - if not shutil.which("gh"): - raise ValueError("GitHub search requires gh CLI. 
Install: https://cli.github.com") - - language = kwargs.get("language") - limit = kwargs.get("limit", 5) - - args = ["search", "repos", query, "--sort", "stars", f"--limit={limit}"] - if language: - args += [f"--language={language}"] - - out = self._gh(args, timeout=15) - results = [] - for line in out.strip().split("\n"): - if not line.strip(): - continue - parts = line.split("\t") - if len(parts) >= 1: - slug = parts[0].strip() - desc = parts[1].strip() if len(parts) > 1 else "" - stars = parts[3].strip() if len(parts) > 3 else "" - lang = parts[5].strip() if len(parts) > 5 else "" - results.append(SearchResult( - title=slug, - url=f"https://github.com/{slug}", - snippet=desc, - extra={"stars": stars, "language": lang}, - )) - return results diff --git a/agent_reach/channels/linkedin.py b/agent_reach/channels/linkedin.py index c9b509f..cb60ffa 100644 --- a/agent_reach/channels/linkedin.py +++ b/agent_reach/channels/linkedin.py @@ -1,268 +1,39 @@ # -*- coding: utf-8 -*- -"""LinkedIn — via linkedin-scraper-mcp (MCP) or Jina Reader fallback. 
- -Backend: linkedin-scraper-mcp (916 stars, Patchright browser automation) -Swap to: any LinkedIn access tool -""" +"""LinkedIn — check if linkedin-scraper-mcp is available.""" import shutil import subprocess -from urllib.parse import urlparse -from .base import Channel, ReadResult, SearchResult -from typing import List -import requests - - -def _mcporter_has_linkedin() -> bool: - """Check if mcporter has linkedin MCP configured.""" - if not shutil.which("mcporter"): - return False - try: - r = subprocess.run( - ["mcporter", "list"], capture_output=True, text=True, timeout=10 - ) - return "linkedin" in r.stdout.lower() - except Exception: - return False - - -def _mcporter_call(expr: str, timeout: int = 30) -> str: - """Call a LinkedIn MCP tool via mcporter.""" - r = subprocess.run( - ["mcporter", "call", expr], - capture_output=True, text=True, timeout=timeout, - ) - if r.returncode != 0: - raise RuntimeError(r.stderr or r.stdout) - return r.stdout +from .base import Channel class LinkedInChannel(Channel): name = "linkedin" - description = "LinkedIn 个人/公司 Profile 和职位" + description = "LinkedIn 职业社交" backends = ["linkedin-scraper-mcp", "Jina Reader"] tier = 2 def can_handle(self, url: str) -> bool: - domain = urlparse(url).netloc.lower() - return "linkedin.com" in domain + from urllib.parse import urlparse + return "linkedin.com" in urlparse(url).netloc.lower() def check(self, config=None): - if _mcporter_has_linkedin(): - return "ok", "完整可用(Profile、公司、职位搜索)" - - # Check if linkedin-scraper-mcp is installed as CLI - if shutil.which("linkedin-scraper-mcp"): - return "warn", ( - "linkedin-scraper-mcp 已安装但未接入 mcporter。运行:\n" - " 1. linkedin-scraper-mcp --login(在有浏览器的机器上登录)\n" - " 2. linkedin-scraper-mcp --transport streamable-http --port 8001\n" - " 3. 
mcporter config add linkedin http://localhost:8001/mcp" + if not shutil.which("mcporter"): + return "off", ( + "基本内容可通过 Jina Reader 读取。完整功能需要:\n" + " pip install linkedin-scraper-mcp\n" + " mcporter config add linkedin http://localhost:3000/mcp\n" + " 详见 https://github.com/stickerdaniel/linkedin-mcp-server" ) - + try: + r = subprocess.run( + ["mcporter", "list"], capture_output=True, text=True, timeout=10 + ) + if "linkedin" in r.stdout.lower(): + return "ok", "完整可用(Profile、公司、职位搜索)" + except Exception: + pass return "off", ( - "可通过 Jina Reader 读取部分内容。完整功能需要:\n" - " 1. pip install linkedin-scraper-mcp\n" - " 2. linkedin-scraper-mcp --login(在有浏览器的机器上登录)\n" - " 3. linkedin-scraper-mcp --transport streamable-http --port 8001\n" - " 4. mcporter config add linkedin http://localhost:8001/mcp\n" - " 详见 https://github.com/stickerdaniel/linkedin-mcp-server" + "mcporter 已装但 LinkedIn MCP 未配置。运行:\n" + " pip install linkedin-scraper-mcp\n" + " mcporter config add linkedin http://localhost:3000/mcp" ) - - async def read(self, url: str, config=None) -> ReadResult: - path = urlparse(url).path.strip("/") - - # Try MCP first - if _mcporter_has_linkedin(): - try: - if "/in/" in url: - return await self._read_profile_mcp(url) - elif "/company/" in url: - return await self._read_company_mcp(url) - elif "/jobs/view/" in url: - return await self._read_job_mcp(url) - except Exception: - pass # Fall through to Jina - - # Fallback: Jina Reader - return await self._read_jina(url) - - async def _read_profile_mcp(self, url: str) -> ReadResult: - """Read a LinkedIn profile via MCP.""" - import re - # Extract username from URL: /in/username/ - match = re.search(r"/in/([^/]+)", url) - if not match: - return await self._read_jina(url) - username = match.group(1) - safe_username = username.replace('"', '\\"') - out = _mcporter_call( - f'linkedin.get_person_profile(linkedin_username: "{safe_username}")', - timeout=60, - ) - return ReadResult( - title=self._extract_title(out) or f"LinkedIn Profile - 
{username}", - content=out.strip(), - url=url, - platform="linkedin", - ) - - async def _read_company_mcp(self, url: str) -> ReadResult: - """Read a LinkedIn company page via MCP.""" - import re - # Extract company name from URL: /company/name/ - match = re.search(r"/company/([^/]+)", url) - if not match: - return await self._read_jina(url) - company = match.group(1) - safe_company = company.replace('"', '\\"') - out = _mcporter_call( - f'linkedin.get_company_profile(company_name: "{safe_company}")', - timeout=60, - ) - return ReadResult( - title=self._extract_title(out) or "LinkedIn Company", - content=out.strip(), - url=url, - platform="linkedin", - ) - - async def _read_job_mcp(self, url: str) -> ReadResult: - """Read a LinkedIn job posting via MCP.""" - import re - match = re.search(r"/jobs/view/(\d+)", url) - if not match: - return await self._read_jina(url) - - job_id = match.group(1) - out = _mcporter_call( - f'linkedin.get_job_details(job_id: "{job_id}")', - timeout=30, - ) - return ReadResult( - title=self._extract_title(out) or f"LinkedIn Job {job_id}", - content=out.strip(), - url=url, - platform="linkedin", - ) - - async def _read_jina(self, url: str) -> ReadResult: - """Fallback: use Jina Reader.""" - try: - resp = requests.get( - f"https://r.jina.ai/{url}", - headers={"Accept": "text/markdown"}, - timeout=15, - ) - resp.raise_for_status() - text = resp.text - - # Check if content is usable - if len(text.strip()) < 100 or "Sign in" in text[:200]: - return ReadResult( - title="LinkedIn", - content=( - f"⚠️ LinkedIn 页面需要登录才能完整查看。\n\n" - f"URL: {url}\n\n" - "完整功能需安装 linkedin-scraper-mcp:\n" - " pip install linkedin-scraper-mcp\n" - " uvx linkedin-scraper-mcp --login\n" - " 详见 https://github.com/stickerdaniel/linkedin-mcp-server" - ), - url=url, - platform="linkedin", - ) - - return ReadResult( - title=text[:100] if text else url, - content=text, - url=url, - platform="linkedin", - ) - except Exception: - return ReadResult( - title="LinkedIn", - content=( 
- f"⚠️ 无法读取此 LinkedIn 页面: {url}\n\n" - "提示:\n" - "- LinkedIn 需要登录才能查看大部分内容\n" - "- 安装 linkedin-scraper-mcp 解锁完整功能\n" - "- 详见 https://github.com/stickerdaniel/linkedin-mcp-server" - ), - url=url, - platform="linkedin", - ) - - async def search(self, query: str, config=None, **kwargs) -> List[SearchResult]: - limit = kwargs.get("limit", 10) - - # Try MCP search first - if _mcporter_has_linkedin(): - try: - return await self._search_mcp(query, limit) - except Exception: - pass - - # Fallback to Exa - from agent_reach.channels.exa_search import ExaSearchChannel - exa = ExaSearchChannel() - return await exa.search(f"site:linkedin.com {query}", config=config, limit=limit) - - async def _search_mcp(self, query: str, limit: int) -> List[SearchResult]: - """Search LinkedIn via MCP.""" - safe_q = query.replace('"', '\\"') - # Try job search first (most common use case) - try: - out = _mcporter_call( - f'linkedin.search_jobs(keywords: "{safe_q}")', - timeout=60, - ) - results = self._parse_search_results(out, "job") - if results: - return results[:limit] - except Exception: - pass - - # Try people search - try: - out = _mcporter_call( - f'linkedin.search_people(keywords: "{safe_q}")', - timeout=60, - ) - results = self._parse_search_results(out, "people") - if results: - return results - except Exception: - pass - - return [] - - def _parse_search_results(self, text: str, result_type: str) -> List[SearchResult]: - """Parse MCP search output into SearchResults.""" - import json - results = [] - try: - data = json.loads(text) - items = data if isinstance(data, list) else data.get("results", data.get("jobs", [])) - for item in items: - if isinstance(item, dict): - title = item.get("title") or item.get("name") or item.get("headline", "") - url = item.get("url") or item.get("link", "") - snippet = item.get("description") or item.get("company", "") - results.append(SearchResult( - title=title, - url=url, - snippet=snippet[:200] if snippet else "", - )) - except 
(json.JSONDecodeError, KeyError): - # Try line-by-line parsing - pass - return results - - def _extract_title(self, text: str) -> str: - """Extract a title from MCP output.""" - for line in text.split("\n"): - line = line.strip() - if line and not line.startswith(("{", "[", "#", "http")): - return line[:80] - return "" diff --git a/agent_reach/channels/reddit.py b/agent_reach/channels/reddit.py index 30bbfbf..b8e9c42 100644 --- a/agent_reach/channels/reddit.py +++ b/agent_reach/channels/reddit.py @@ -1,178 +1,26 @@ # -*- coding: utf-8 -*- -"""Reddit — via Reddit JSON API + optional proxy. - -Backend: Reddit public JSON API (append .json to any URL) -Swap to: any Reddit access method -""" +"""Reddit — check if proxy and credentials are configured.""" import os -import requests -from urllib.parse import urlparse -from .base import Channel, ReadResult +from .base import Channel class RedditChannel(Channel): name = "reddit" description = "Reddit 帖子和评论" - backends = ["Reddit JSON API"] - tier = 2 - - USER_AGENT = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36" + backends = ["JSON API", "Exa"] + tier = 1 def can_handle(self, url: str) -> bool: - domain = urlparse(url).netloc.lower() - return "reddit.com" in domain or "redd.it" in domain + from urllib.parse import urlparse + d = urlparse(url).netloc.lower() + return "reddit.com" in d or "redd.it" in d def check(self, config=None): - proxy = config.get("reddit_proxy") if config else None - has_bot = bool(os.environ.get("REDDIT_CLIENT_ID")) - if proxy and has_bot: - return "ok", "完整可用(代理 + OAuth Bot)" - elif proxy: - return "ok", "代理已配置,可读取帖子。配置 REDDIT_CLIENT_ID/SECRET 可解锁高级搜索和发帖" - elif has_bot: - return "warn", "OAuth Bot 已配置,但服务器直连可能被封。配个代理更稳定:agent-reach configure proxy URL" - else: - return "off", "搜索用 Exa 免费可用。读帖子需配个代理:agent-reach configure proxy URL" - - async def read(self, url: str, config=None) -> ReadResult: - proxy = config.get("reddit_proxy") if config else None - proxies = {"http": proxy, 
"https": proxy} if proxy else None - - # Clean URL: remove query params, trailing slash, then add .json - parsed = urlparse(url) - clean_path = parsed.path.rstrip("/") - # Remove trailing .json if already present (avoid double .json) - if clean_path.endswith(".json"): - clean_path = clean_path[:-5] - json_url = f"https://www.reddit.com{clean_path}.json" - - try: - resp = requests.get( - json_url, - headers={"User-Agent": self.USER_AGENT}, - proxies=proxies, - params={"limit": 50}, - timeout=15, - ) - resp.raise_for_status() - except requests.exceptions.HTTPError as e: - status = e.response.status_code if e.response is not None else 0 - if status in (403, 429): - return ReadResult( - title="Reddit", - content="⚠️ Reddit blocked this request (403 Forbidden). " - "Reddit blocks most server IPs.\n" - "Fix: agent-reach configure proxy http://user:pass@ip:port\n" - "Cheap option: https://www.webshare.io ($1/month)\n\n" - "Alternatively, search Reddit via Exa (free, no proxy needed): " - "agent-reach search-reddit \"your query\"", - url=url, - platform="reddit", - ) - raise - - data = resp.json() - - # Subreddit listing page: /r/sub/, /r/sub/hot, /r/sub/new, /r/sub/top - if isinstance(data, dict) and data.get("kind") == "Listing": - return self._parse_listing(data, url) - - if isinstance(data, list) and len(data) >= 1: - # Post page: [post_listing, comments_listing] - post = data[0]["data"]["children"][0]["data"] - title = post.get("title", "") - author = post.get("author", "") - selftext = post.get("selftext", "") - score = post.get("score", 0) - subreddit = post.get("subreddit", "") - - # Extract comments - comments_text = "" - if len(data) >= 2: - comments_text = self._extract_comments(data[1]) - - content = selftext - if comments_text: - content += f"\n\n---\n## Comments\n{comments_text}" - - return ReadResult( - title=title, - content=content, - url=url, - author=f"u/{author}", - platform="reddit", - extra={"subreddit": subreddit, "score": score}, - ) - - raise 
ValueError(f"Could not parse Reddit response for: {url}") - - def _parse_listing(self, data: dict, url: str) -> ReadResult: - """Parse a subreddit listing (hot/new/top/rising).""" - children = data.get("data", {}).get("children", []) - - # Extract subreddit name and sort from URL - parsed = urlparse(url) - path_parts = [p for p in parsed.path.strip("/").split("/") if p] - subreddit = path_parts[1] if len(path_parts) >= 2 else "reddit" - sort_type = path_parts[2] if len(path_parts) >= 3 else "hot" - - lines = [] - for i, child in enumerate(children, 1): - if child.get("kind") != "t3": - continue - post = child.get("data", {}) - title = post.get("title", "") - author = post.get("author", "") - score = post.get("score", 0) - num_comments = post.get("num_comments", 0) - permalink = post.get("permalink", "") - post_url = post.get("url", "") - is_self = post.get("is_self", False) - - lines.append(f"### {i}. {title}") - lines.append(f"👤 u/{author} · ⬆ {score} · 💬 {num_comments}") - if not is_self and post_url: - lines.append(f"🔗 {post_url}") - lines.append(f"📎 https://www.reddit.com{permalink}") - # Add selftext preview (first 200 chars) - selftext = post.get("selftext", "") - if selftext: - preview = selftext[:200].replace("\n", " ") - if len(selftext) > 200: - preview += "..." - lines.append(f"> {preview}") - lines.append("") - - content = "\n".join(lines) if lines else "No posts found." 
- return ReadResult( - title=f"r/{subreddit} — {sort_type}", - content=content, - url=url, - platform="reddit", - extra={"subreddit": subreddit, "sort": sort_type, "count": len(children)}, + proxy = (config.get("reddit_proxy") if config else None) or os.environ.get("REDDIT_PROXY") + if proxy: + return "ok", "代理已配置,可读取帖子。搜索走 Exa" + return "warn", ( + "无代理。服务器 IP 可能被 Reddit 封锁。配置代理:\n" + " agent-reach configure proxy http://user:pass@ip:port" ) - - def _extract_comments(self, comments_data: dict, depth: int = 0, max_depth: int = 3) -> str: - """Recursively extract comments.""" - lines = [] - children = comments_data.get("data", {}).get("children", []) - - for child in children: - if child.get("kind") != "t1": - continue - data = child.get("data", {}) - author = data.get("author", "[deleted]") - body = data.get("body", "") - score = data.get("score", 0) - indent = " " * depth - - lines.append(f"{indent}**u/{author}** ({score} points):") - lines.append(f"{indent}{body}") - lines.append("") - - # Recurse into replies - if depth < max_depth and data.get("replies") and isinstance(data["replies"], dict): - lines.append(self._extract_comments(data["replies"], depth + 1, max_depth)) - - return "\n".join(lines) diff --git a/agent_reach/channels/rss.py b/agent_reach/channels/rss.py index 6b8646e..a61d9d9 100644 --- a/agent_reach/channels/rss.py +++ b/agent_reach/channels/rss.py @@ -1,13 +1,7 @@ # -*- coding: utf-8 -*- -"""RSS feeds — via feedparser (free, pip dependency). 
+"""RSS — check if feedparser is available.""" -Backend: feedparser (https://github.com/kurtmckee/feedparser) -Swap to: any RSS parser -""" - -import feedparser -from urllib.parse import urlparse -from .base import Channel, ReadResult +from .base import Channel class RSSChannel(Channel): @@ -17,41 +11,11 @@ class RSSChannel(Channel): tier = 0 def can_handle(self, url: str) -> bool: - lower = url.lower() - domain = urlparse(url).netloc.lower() - return (lower.endswith(".xml") or "/rss" in lower or "/feed" in lower - or "/atom" in lower or "rss" in domain) + return any(x in url.lower() for x in ["/feed", "/rss", ".xml", "atom"]) - async def read(self, url: str, config=None) -> ReadResult: - feed = feedparser.parse(url) - - if feed.bozo and not feed.entries: - raise ValueError(f"Failed to parse RSS feed: {url}") - - if not feed.entries: - raise ValueError(f"No entries in RSS feed: {url}") - - # Return latest entry - entry = feed.entries[0] - content = entry.get("summary", "") or entry.get("description", "") - - # If multiple entries, summarize all - if len(feed.entries) > 1: - lines = [f"# {feed.feed.get('title', 'RSS Feed')}\n"] - for i, e in enumerate(feed.entries[:20], 1): - title = e.get("title", "Untitled") - link = e.get("link", "") - summary = e.get("summary", "")[:200] - lines.append(f"## {i}. 
{title}") - lines.append(f"🔗 {link}") - if summary: - lines.append(summary) - lines.append("") - content = "\n".join(lines) - - return ReadResult( - title=feed.feed.get("title", entry.get("title", url)), - content=content, - url=url, - platform="rss", - ) + def check(self, config=None): + try: + import feedparser + return "ok", "可读取 RSS/Atom 源" + except ImportError: + return "off", "feedparser 未安装。安装:pip install feedparser" diff --git a/agent_reach/channels/twitter.py b/agent_reach/channels/twitter.py index 77edc51..cfe605b 100644 --- a/agent_reach/channels/twitter.py +++ b/agent_reach/channels/twitter.py @@ -1,286 +1,38 @@ # -*- coding: utf-8 -*- -"""Twitter/X — via bird CLI (free) or Jina Reader fallback. - -Backend: bird (@steipete/bird npm package) for search/timeline - Jina Reader for single tweets -Swap to: any Twitter access tool -""" +"""Twitter/X — check if bird CLI is available.""" import shutil import subprocess -from urllib.parse import urlparse -from .base import Channel, ReadResult, SearchResult -from typing import List -import requests - - -def _bird_cmd(): - """Find bird CLI binary.""" - return shutil.which("bird") or shutil.which("birdx") - - -def _bird_env(config=None): - """Build env dict with Twitter cookies and proxy support for bird CLI. - - Node.js native fetch() doesn't respect HTTP_PROXY/HTTPS_PROXY. - We inject undici's EnvHttpProxyAgent via NODE_OPTIONS so bird - automatically routes through the user's proxy. 
- """ - import os - import tempfile - env = os.environ.copy() - if config: - auth_token = config.get("twitter_auth_token") - ct0 = config.get("twitter_ct0") - if auth_token: - env["AUTH_TOKEN"] = auth_token - if ct0: - env["CT0"] = ct0 - - # Auto-inject undici proxy support if HTTP_PROXY/HTTPS_PROXY is set - has_proxy = env.get("HTTPS_PROXY") or env.get("HTTP_PROXY") or env.get("https_proxy") or env.get("http_proxy") - if has_proxy: - bootstrap = _get_proxy_bootstrap_path() - if bootstrap: - npm_root = subprocess.run( - ["npm", "root", "-g"], - capture_output=True, text=True, timeout=5, - ).stdout.strip() - existing_opts = env.get("NODE_OPTIONS", "") - env["NODE_OPTIONS"] = f"--require {bootstrap} {existing_opts}".strip() - env["NODE_PATH"] = npm_root - - return env - - -def _get_proxy_bootstrap_path(): - """Create/return a bootstrap JS file that sets up undici proxy for fetch.""" - import os - import tempfile - bootstrap_path = os.path.join(tempfile.gettempdir(), "agent-reach-undici-proxy.js") - if not os.path.exists(bootstrap_path): - # Check if undici is available - npm_root = subprocess.run( - ["npm", "root", "-g"], - capture_output=True, text=True, timeout=5, - ).stdout.strip() - undici_path = os.path.join(npm_root, "undici", "index.js") - if not os.path.exists(undici_path): - return None - with open(bootstrap_path, "w") as f: - f.write( - "try {\n" - " const { EnvHttpProxyAgent, setGlobalDispatcher } = require('undici');\n" - " if (process.env.HTTPS_PROXY || process.env.HTTP_PROXY) {\n" - " setGlobalDispatcher(new EnvHttpProxyAgent());\n" - " }\n" - "} catch(e) {}\n" - ) - return bootstrap_path +from .base import Channel class TwitterChannel(Channel): name = "twitter" description = "Twitter/X 推文" - backends = ["bird", "Jina Reader"] - tier = 0 # Single tweet reading is zero-config + backends = ["bird CLI"] + tier = 1 def can_handle(self, url: str) -> bool: - domain = urlparse(url).netloc.lower() - return "x.com" in domain or "twitter.com" in domain + from 
urllib.parse import urlparse + d = urlparse(url).netloc.lower() + return "x.com" in d or "twitter.com" in d def check(self, config=None): - # Basic reading always works (Jina fallback) - bird = _bird_cmd() - if bird: - # Actually test bird connectivity - try: - result = subprocess.run( - [bird, "whoami"], - capture_output=True, timeout=15, - encoding='utf-8', errors='replace', - env=_bird_env(config), - ) - if result.returncode == 0 and "fetch failed" not in result.stdout.lower() and "fetch failed" not in result.stderr.lower(): - return "ok", "搜索、时间线、发推全部可用" - else: - error_hint = (result.stderr or result.stdout).strip()[:100] - if "fetch failed" in (error_hint + result.stdout).lower(): - return "warn", ( - f"bird 已安装但连接失败(fetch failed)。可能原因:\n" - " 1. Cookie 无效或过期 → 重新导出 Cookie\n" - " 2. 需要代理但 Node.js fetch 不走系统代理 → 使用全局/透明代理(如 Clash TUN 模式、Proxifier)\n" - " 3. 网络无法直连 x.com\n" - " 搜索功能暂不可用,将使用 Exa 搜索作为替代" - ) - return "warn", f"bird 连接异常:{error_hint}。搜索将使用 Exa 替代" - except (subprocess.TimeoutExpired, FileNotFoundError): - return "warn", "bird 已安装但连接超时。搜索将使用 Exa 替代" - return "ok", "可读取推文。安装 bird + 配置 Cookie 可解锁搜索和发推" - - async def read(self, url: str, config=None) -> ReadResult: - # Try bird first - bird = _bird_cmd() - if bird: - return await self._read_bird(url, bird, config) - # Fallback: Jina Reader - return await self._read_jina(url) - - async def _read_bird(self, url: str, bird: str, config=None) -> ReadResult: - result = subprocess.run( - [bird, "read", url], - capture_output=True, timeout=30, - encoding='utf-8', errors='replace', - env=_bird_env(config), - ) - if result.returncode != 0: - return await self._read_jina(url) - - text = result.stdout.strip() - # Extract author from first line - author = "" - lines = text.split("\n") - if lines and lines[0].startswith("@"): - author = lines[0].split()[0] - - return ReadResult( - title=text[:100], - content=text, - url=url, - author=author, - platform="twitter", - ) - - async def _read_jina(self, url: str) -> 
ReadResult: - try: - resp = requests.get( - f"https://r.jina.ai/{url}", - headers={"Accept": "text/markdown"}, - timeout=15, + bird = shutil.which("bird") or shutil.which("birdx") + if not bird: + return "warn", ( + "bird CLI 未安装。搜索可通过 Exa 替代。安装:\n" + " npm install -g @steipete/bird" ) - resp.raise_for_status() - text = resp.text - - # Detect unusable Jina responses for X/Twitter (JS-required pages) - unusable_indicators = [ - "page doesn", # "this page doesn't exist" (handles both ' and ') - "miss what", # "Don't miss what's happening" - "Something went wrong. Try reloading", - "Log in](", # Markdown link: [Log in](...) - ] - if any(indicator in text for indicator in unusable_indicators): - return ReadResult( - title="Twitter/X", - content="⚠️ Could not read this tweet.\n" - "The tweet may have been deleted, or the account is private.\n\n" - "Tips:\n" - "- Make sure the URL is correct\n" - "- Try: bird read (if bird CLI is installed)\n" - "- For protected tweets, configure Twitter cookies: " - "agent-reach configure twitter-cookies AUTH_TOKEN CT0", - url=url, - platform="twitter", - ) - - title = text[:100] if text else url - return ReadResult( - title=title, - content=text, - url=url, - platform="twitter", + try: + r = subprocess.run( + [bird, "whoami"], capture_output=True, text=True, timeout=10 + ) + if r.returncode == 0: + return "ok", "完整可用(读取、搜索推文)" + return "warn", ( + "bird CLI 已安装但未配置 Cookie。运行:\n" + " agent-reach configure twitter-cookies \"auth_token=xxx; ct0=yyy\"" ) except Exception: - return ReadResult( - title="Twitter/X", - content="⚠️ Could not read this tweet.\n" - "The tweet may have been deleted, or the account is private.\n\n" - "Tips:\n" - "- Make sure the URL is correct\n" - "- Try: bird read (if bird CLI is installed)\n" - "- For protected tweets, configure Twitter cookies: " - "agent-reach configure twitter-cookies AUTH_TOKEN CT0", - url=url, - platform="twitter", - ) - - async def search(self, query: str, config=None, **kwargs) -> 
List[SearchResult]: - limit = kwargs.get("limit", 10) - - bird = _bird_cmd() - if bird: - return await self._search_bird(query, limit, bird, config) - - # Fallback to Exa - return await self._search_exa(query, limit, config) - - async def _search_bird(self, query: str, limit: int, bird: str, config=None) -> List[SearchResult]: - try: - result = subprocess.run( - [bird, "search", query, "-n", str(limit)], - capture_output=True, timeout=30, - encoding='utf-8', errors='replace', - env=_bird_env(config), - ) - if result.returncode != 0: - stderr = (result.stderr or "").strip() - if "fetch failed" in stderr.lower() or "fetch failed" in (result.stdout or "").lower(): - # bird can't connect — fall back to Exa silently - return await self._search_exa(query, limit, config) - return await self._search_exa(query, limit, config) - - parsed = self._parse_bird_output(result.stdout) - if not parsed: - # bird returned nothing — try Exa - return await self._search_exa(query, limit, config) - return parsed - except (subprocess.TimeoutExpired, FileNotFoundError): - return await self._search_exa(query, limit, config) - - def _parse_bird_output(self, text: str) -> List[SearchResult]: - """Parse bird text output into SearchResults.""" - results = [] - current = {} - text_lines = [] - - for line in text.strip().split("\n"): - line = line.strip() - if line.startswith("─"): - if current: - current["text"] = "\n".join(text_lines).strip() - results.append(SearchResult( - title=current.get("text", "")[:80], - url=current.get("url", ""), - snippet=current.get("text", ""), - author=current.get("author", ""), - date=current.get("date", ""), - )) - current = {} - text_lines = [] - continue - if line.startswith("@") and line.endswith(":") and "(" in line: - current["author"] = line.split()[0] - continue - if line.startswith("date:"): - current["date"] = line[5:].strip() - continue - if line.startswith("url:"): - current["url"] = line[4:].strip() - continue - if current is not None: - 
text_lines.append(line) - - if current and text_lines: - current["text"] = "\n".join(text_lines).strip() - results.append(SearchResult( - title=current.get("text", "")[:80], - url=current.get("url", ""), - snippet=current.get("text", ""), - author=current.get("author", ""), - date=current.get("date", ""), - )) - return results - - async def _search_exa(self, query: str, limit: int, config=None) -> List[SearchResult]: - from agent_reach.channels.exa_search import ExaSearchChannel - exa = ExaSearchChannel() - return await exa.search(f"site:x.com {query}", config=config, limit=limit) + return "warn", "bird CLI 已安装但连接失败" diff --git a/agent_reach/channels/web.py b/agent_reach/channels/web.py index b2d7ae9..bbe33af 100644 --- a/agent_reach/channels/web.py +++ b/agent_reach/channels/web.py @@ -1,49 +1,17 @@ # -*- coding: utf-8 -*- -"""Web pages — via Jina Reader API (free, no config needed). +"""Web — any URL via Jina Reader. Always available.""" -Backend: Jina Reader (https://r.jina.ai) -Swap to: Firecrawl, Trafilatura, or any other reader API -""" - -import requests -from .base import Channel, ReadResult +from .base import Channel class WebChannel(Channel): name = "web" - description = "网页(任意 URL)" - backends = ["Jina Reader API"] + description = "任意网页" + backends = ["Jina Reader"] tier = 0 - JINA_URL = "https://r.jina.ai/" - def can_handle(self, url: str) -> bool: - # Fallback — handles any URL not matched by other channels - return True + return True # Fallback — handles any URL - async def read(self, url: str, config=None) -> ReadResult: - resp = requests.get( - f"{self.JINA_URL}{url}", - headers={"Accept": "text/markdown"}, - timeout=15, - ) - resp.raise_for_status() - text = resp.text - - # Extract title from first markdown heading - title = url - for line in text.split("\n"): - line = line.strip() - if line.startswith("# "): - title = line[2:].strip() - break - if line.startswith("Title:"): - title = line[6:].strip() - break - - return ReadResult( - title=title, - 
content=text, - url=url, - platform="web", - ) + def check(self, config=None): + return "ok", "通过 Jina Reader 读取任意网页(curl https://r.jina.ai/URL)" diff --git a/agent_reach/channels/xiaohongshu.py b/agent_reach/channels/xiaohongshu.py index 84ddfa1..e3dae04 100644 --- a/agent_reach/channels/xiaohongshu.py +++ b/agent_reach/channels/xiaohongshu.py @@ -1,16 +1,9 @@ # -*- coding: utf-8 -*- -"""XiaoHongShu (小红书) — via mcporter + xiaohongshu MCP server. +"""XiaoHongShu — check if mcporter + xiaohongshu MCP is available.""" -Backend: xiaohongshu-mcp server (internal API, reliable) -Requires: mcporter CLI + xiaohongshu MCP server running -""" - -import json import shutil import subprocess -from urllib.parse import urlparse, parse_qs, urlencode -from .base import Channel, ReadResult, SearchResult -from typing import List, Optional +from .base import Channel class XiaoHongShuChannel(Channel): @@ -19,30 +12,8 @@ class XiaoHongShuChannel(Channel): backends = ["xiaohongshu-mcp"] tier = 2 - def _mcporter_ok(self) -> bool: - """Check if mcporter + xiaohongshu MCP is available.""" - if not shutil.which("mcporter"): - return False - try: - r = subprocess.run( - ["mcporter", "list"], capture_output=True, text=True, timeout=10 - ) - return "xiaohongshu" in r.stdout - except Exception: - return False - - def _call(self, expr: str, timeout: int = 30) -> str: - r = subprocess.run( - ["mcporter", "call", expr], - capture_output=True, text=True, timeout=timeout, - ) - if r.returncode != 0: - raise RuntimeError(r.stderr or r.stdout) - return r.stdout - - # ── Channel interface ── - def can_handle(self, url: str) -> bool: + from urllib.parse import urlparse d = urlparse(url).netloc.lower() return "xiaohongshu.com" in d or "xhslink.com" in d @@ -55,190 +26,25 @@ class XiaoHongShuChannel(Channel): " 3. 
mcporter config add xiaohongshu http://localhost:18060/mcp\n" " 详见 https://github.com/xpzouying/xiaohongshu-mcp" ) - if not self._mcporter_ok(): - return "off", ( - "mcporter 已装但小红书 MCP 未配置。运行:\n" - " docker run -d --name xiaohongshu-mcp -p 18060:18060 xpzouying/xiaohongshu-mcp\n" - " mcporter config add xiaohongshu http://localhost:18060/mcp" - ) try: - out = self._call("xiaohongshu.check_login_status()", timeout=10) - if "已登录" in out or "logged" in out.lower(): + r = subprocess.run( + ["mcporter", "list"], capture_output=True, text=True, timeout=10 + ) + if "xiaohongshu" not in r.stdout: + return "off", ( + "mcporter 已装但小红书 MCP 未配置。运行:\n" + " docker run -d --name xiaohongshu-mcp -p 18060:18060 xpzouying/xiaohongshu-mcp\n" + " mcporter config add xiaohongshu http://localhost:18060/mcp" + ) + except Exception: + return "off", "mcporter 连接异常" + try: + r = subprocess.run( + ["mcporter", "call", "xiaohongshu.check_login_status()"], + capture_output=True, text=True, timeout=10 + ) + if "已登录" in r.stdout or "logged" in r.stdout.lower(): return "ok", "完整可用(阅读、搜索、发帖、评论、点赞)" return "warn", "MCP 已连接但未登录,需扫码登录" except Exception: return "warn", "MCP 连接异常,检查 xiaohongshu-mcp 服务是否在运行" - - async def read(self, url: str, config=None) -> ReadResult: - if not self._mcporter_ok(): - return ReadResult( - title="XiaoHongShu", - content=( - "⚠️ 小红书需要 mcporter + xiaohongshu-mcp 才能使用。\n\n" - "安装步骤:\n" - "1. npm install -g mcporter\n" - "2. docker run -d --name xiaohongshu-mcp -p 18060:18060 xpzouying/xiaohongshu-mcp\n" - "3. mcporter config add xiaohongshu http://localhost:18060/mcp\n" - "4. 运行 agent-reach doctor 检查状态\n\n" - "详见 https://github.com/xpzouying/xiaohongshu-mcp" - ), - url=url, platform="xiaohongshu", - ) - - note_id = self._extract_note_id(url) - if not note_id: - return ReadResult( - title="XiaoHongShu", - content=f"⚠️ 无法从 URL 提取笔记 ID: {url}", - url=url, platform="xiaohongshu", - ) - - # Step 1: try xsec_token from URL query param (e.g. 
from search results) - xsec_token = self._extract_token_from_url(url) - - # Step 2: try homepage feeds - if not xsec_token: - xsec_token = self._find_token_in_feeds(note_id) - - # Step 3: search for the note to get a fresh token - if not xsec_token: - xsec_token = self._find_token_by_search(note_id) - - # If no token found, fallback to Jina Reader - if not xsec_token: - return await self._read_jina(url) - - # Get detail via MCP - out = self._call( - f'xiaohongshu.get_feed_detail(feed_id: "{note_id}", xsec_token: "{xsec_token}")', - timeout=15, - ) - - return ReadResult( - title=self._extract_title(out) or f"XHS {note_id}", - content=out.strip(), - url=url, platform="xiaohongshu", - ) - - async def search(self, query: str, config=None, **kwargs) -> List[SearchResult]: - if not self._mcporter_ok(): - raise ValueError( - "小红书搜索需要 mcporter + xiaohongshu-mcp。\n" - "安装: npm install -g mcporter && mcporter config add xiaohongshu http://localhost:18060/mcp" - ) - limit = kwargs.get("limit", 10) - safe_q = query.replace('"', '\\"') - out = self._call(f'xiaohongshu.search_feeds(keyword: "{safe_q}")', timeout=30) - - results = [] - try: - data = json.loads(out) - for item in data.get("feeds", [])[:limit]: - card = item.get("noteCard", {}) - user = card.get("user", {}) - interact = card.get("interactInfo", {}) - note_id = item.get("id", "") - xsec_token = item.get("xsecToken", "") - note_url = f"https://www.xiaohongshu.com/explore/{note_id}" - if xsec_token: - note_url += f"?xsec_token={xsec_token}" - results.append(SearchResult( - title=card.get("displayTitle", ""), - url=note_url, - snippet=f"👤 {user.get('nickname', '')} · ❤ {interact.get('likedCount', '0')}", - score=0, - )) - except (json.JSONDecodeError, KeyError): - pass - return results - - # ── Helpers ── - - def _extract_note_id(self, url: str) -> str: - """Extract note ID from URL path, ignoring query params.""" - path = urlparse(url).path.strip("/").split("/") - return path[-1] if path else "" - - def 
_extract_token_from_url(self, url: str) -> Optional[str]: - """Extract xsec_token from URL query parameter if present.""" - qs = parse_qs(urlparse(url).query) - tokens = qs.get("xsec_token", []) - return tokens[0] if tokens else None - - def _find_token_in_feeds(self, note_id: str) -> Optional[str]: - """Try to find xsec_token for a note from homepage feeds.""" - try: - out = self._call("xiaohongshu.list_feeds()", timeout=15) - data = json.loads(out) - for feed in data.get("feeds", []): - if feed.get("id") == note_id: - return feed.get("xsecToken") or None - except Exception: - pass - return None - - def _find_token_by_search(self, note_id: str) -> Optional[str]: - """Search for the note ID to get a fresh xsec_token.""" - try: - out = self._call( - f'xiaohongshu.search_feeds(keyword: "{note_id}")', timeout=20 - ) - data = json.loads(out) - for feed in data.get("feeds", []): - if feed.get("id") == note_id: - return feed.get("xsecToken") or None - # If exact match not found but results exist, try the first one - # (search by note_id sometimes returns the note with a different key) - except Exception: - pass - return None - - def _extract_title(self, text: str) -> str: - for line in text.split("\n"): - line = line.strip() - if line and not line.startswith(("{", "[", "#", "http")): - return line[:80] - return "" - - async def _read_jina(self, url: str) -> ReadResult: - """Fallback: read XHS note via Jina Reader when xsec_token unavailable.""" - import requests - try: - resp = requests.get( - f"https://r.jina.ai/{url}", - headers={"Accept": "text/markdown"}, - timeout=15, - ) - resp.raise_for_status() - text = resp.text - if len(text.strip()) < 50 or "登录" in text[:200]: - return ReadResult( - title="XiaoHongShu", - content=( - f"⚠️ 无法获取笔记详情: {url}\n\n" - "小红书需要 xsec_token 才能通过 MCP 读取笔记。\n" - "请尝试先搜索相关关键词,再从结果中读取。" - ), - url=url, platform="xiaohongshu", - ) - title = "" - for line in text.split("\n"): - line = line.strip() - if line and not line.startswith(("#", "http", 
"![", "[")): - title = line[:80] - break - return ReadResult( - title=title or "XiaoHongShu", - content=text.strip(), - url=url, platform="xiaohongshu", - ) - except Exception: - return ReadResult( - title="XiaoHongShu", - content=( - f"⚠️ 无法获取笔记详情: {url}\n\n" - "小红书需要 xsec_token 才能通过 MCP 读取笔记。\n" - "请尝试先搜索相关关键词,再从结果中读取。" - ), - url=url, platform="xiaohongshu", - ) diff --git a/agent_reach/channels/youtube.py b/agent_reach/channels/youtube.py index a544f8c..aaae963 100644 --- a/agent_reach/channels/youtube.py +++ b/agent_reach/channels/youtube.py @@ -1,125 +1,22 @@ # -*- coding: utf-8 -*- -"""YouTube — via yt-dlp (video info, subtitles, and search). +"""YouTube — check if yt-dlp is available.""" -Backend: yt-dlp (https://github.com/yt-dlp/yt-dlp) -Supports: read (info + subtitles), search (ytsearch) -""" - -import json import shutil -import subprocess -import tempfile -from pathlib import Path -from urllib.parse import urlparse -from .base import Channel, ReadResult, SearchResult -from typing import List +from .base import Channel class YouTubeChannel(Channel): name = "youtube" - description = "YouTube 视频字幕" + description = "YouTube 视频和字幕" backends = ["yt-dlp"] - requires_tools = ["yt-dlp"] tier = 0 def can_handle(self, url: str) -> bool: + from urllib.parse import urlparse d = urlparse(url).netloc.lower() return "youtube.com" in d or "youtu.be" in d - async def read(self, url: str, config=None) -> ReadResult: - if not shutil.which("yt-dlp"): - raise RuntimeError("yt-dlp not installed. 
Install: pip install yt-dlp") - - with tempfile.TemporaryDirectory() as tmpdir: - info = self._get_info(url) - title = info.get("title", url) - author = info.get("uploader", "") - - transcript = self._get_subtitles(url, tmpdir) - if not transcript: - transcript = f"[Video: {title}]\n[No subtitles available.]" - - return ReadResult( - title=title, content=transcript, url=url, - author=author, platform="youtube", - extra={ - "duration": info.get("duration_string"), - "view_count": info.get("view_count"), - "upload_date": info.get("upload_date"), - }, - ) - - async def search(self, query: str, config=None, **kwargs) -> List[SearchResult]: - """Search YouTube via yt-dlp's ytsearch.""" - if not shutil.which("yt-dlp"): - raise RuntimeError("yt-dlp not installed. Install: pip install yt-dlp") - - limit = kwargs.get("limit", 10) - - try: - r = subprocess.run( - ["yt-dlp", "--dump-json", "--flat-playlist", - f"ytsearch{limit}:{query}"], - capture_output=True, text=True, timeout=30, - ) - results = [] - for line in r.stdout.strip().split("\n"): - if not line.strip(): - continue - try: - d = json.loads(line) - vid = d.get("id", "") - results.append(SearchResult( - title=d.get("title", ""), - url=f"https://youtube.com/watch?v={vid}" if vid else "", - snippet=( - f"👤 {d.get('channel', '?')} · " - f"⏱ {d.get('duration_string', '?')} · " - f"👁 {d.get('view_count', '?')}" - ), - extra={ - "channel": d.get("channel"), - "duration": d.get("duration_string"), - "view_count": d.get("view_count"), - }, - )) - except json.JSONDecodeError: - continue - return results - except subprocess.TimeoutExpired: - return [] - - def _get_info(self, url: str) -> dict: - try: - r = subprocess.run( - ["yt-dlp", "--dump-json", "--no-download", url], - capture_output=True, text=True, timeout=30, - ) - if r.returncode == 0: - return json.loads(r.stdout) - except (subprocess.TimeoutExpired, json.JSONDecodeError): - pass - return {} - - def _get_subtitles(self, url: str, tmpdir: str) -> str: - try: - 
subprocess.run( - ["yt-dlp", "--write-auto-sub", "--write-sub", - "--sub-lang", "en,zh-Hans,zh", - "--skip-download", "--sub-format", "vtt", - "-o", f"{tmpdir}/%(id)s.%(ext)s", url], - capture_output=True, text=True, timeout=30, - ) - for f in Path(tmpdir).glob("*.vtt"): - text = f.read_text(errors="replace") - lines = [] - for line in text.split("\n"): - line = line.strip() - if not line or line.startswith("WEBVTT") or "-->" in line or line.isdigit(): - continue - if line not in lines[-1:]: - lines.append(line) - return "\n".join(lines) - except subprocess.TimeoutExpired: - pass - return "" + def check(self, config=None): + if shutil.which("yt-dlp"): + return "ok", "可提取视频信息和字幕" + return "off", "yt-dlp 未安装。安装:pip install yt-dlp" diff --git a/agent_reach/cli.py b/agent_reach/cli.py index 8e1f36e..f5f3e7e 100644 --- a/agent_reach/cli.py +++ b/agent_reach/cli.py @@ -1,20 +1,15 @@ # -*- coding: utf-8 -*- """ -Agent Reach CLI — command-line interface. +Agent Reach CLI — installer, doctor, and configuration tool. 
Usage: - agent-reach read - agent-reach search - agent-reach search-reddit [--sub ] - agent-reach search-github [--lang ] - agent-reach search-twitter - agent-reach setup + agent-reach install --env=auto agent-reach doctor - agent-reach version + agent-reach configure twitter-cookies "auth_token=xxx; ct0=yyy" + agent-reach setup """ import sys -import asyncio import argparse import json import os @@ -48,57 +43,6 @@ def main(): sub = parser.add_subparsers(dest="command", help="Available commands") # ── read ── - p_read = sub.add_parser("read", help="Read content from a URL") - p_read.add_argument("url", help="URL to read") - p_read.add_argument("--json", dest="as_json", action="store_true", help="Output as JSON") - - # ── search ── - p_search = sub.add_parser("search", help="Search the web (Exa)") - p_search.add_argument("query", nargs="+", help="Search query") - p_search.add_argument("-n", "--num", type=int, default=5, help="Number of results") - - # ── search-reddit ── - p_sr = sub.add_parser("search-reddit", help="Search Reddit") - p_sr.add_argument("query", nargs="+", help="Search query") - p_sr.add_argument("--sub", help="Subreddit filter") - p_sr.add_argument("-n", "--num", type=int, default=10, help="Number of results") - - # ── search-github ── - p_sg = sub.add_parser("search-github", help="Search GitHub") - p_sg.add_argument("query", nargs="+", help="Search query") - p_sg.add_argument("--lang", help="Language filter") - p_sg.add_argument("-n", "--num", type=int, default=5, help="Number of results") - - # ── search-twitter ── - p_st = sub.add_parser("search-twitter", help="Search Twitter") - p_st.add_argument("query", nargs="+", help="Search query") - p_st.add_argument("-n", "--num", type=int, default=10, help="Number of results") - - # ── search-youtube ── - p_sy = sub.add_parser("search-youtube", help="Search YouTube") - p_sy.add_argument("query", nargs="+", help="Search query") - p_sy.add_argument("-n", "--num", type=int, default=5, help="Number of 
results") - - # ── search-bilibili ── - p_sb = sub.add_parser("search-bilibili", help="Search Bilibili") - p_sb.add_argument("query", nargs="+", help="Search query") - p_sb.add_argument("-n", "--num", type=int, default=5, help="Number of results") - - # ── search-xhs ── - p_sx = sub.add_parser("search-xhs", help="Search XiaoHongShu") - p_sx.add_argument("query", nargs="+", help="Search query") - p_sx.add_argument("-n", "--num", type=int, default=10, help="Number of results") - - # ── search-linkedin ── - p_sl = sub.add_parser("search-linkedin", help="Search LinkedIn") - p_sl.add_argument("query", nargs="+", help="Search query") - p_sl.add_argument("-n", "--num", type=int, default=10, help="Number of results") - - # ── search-bosszhipin ── - p_sbz = sub.add_parser("search-bosszhipin", help="Search Boss直聘") - p_sbz.add_argument("query", nargs="+", help="Search query") - p_sbz.add_argument("-n", "--num", type=int, default=10, help="Number of results") - # ── setup ── sub.add_parser("setup", help="Interactive configuration wizard") @@ -161,10 +105,6 @@ def main(): _cmd_install(args) elif args.command == "configure": _cmd_configure(args) - elif args.command == "read": - asyncio.run(_cmd_read(args)) - elif args.command.startswith("search"): - asyncio.run(_cmd_search(args)) # ── Command handlers ──────────────────────────────── @@ -849,98 +789,6 @@ def _cmd_setup(): print() -async def _cmd_read(args): - from agent_reach.core import AgentReach - eyes = AgentReach() - try: - result = await eyes.read(args.url) - if args.as_json: - print(json.dumps(result, ensure_ascii=False, indent=2)) - else: - print(f"\n📖 {result.get('title', 'Untitled')}") - print(f"🔗 {result.get('url', '')}") - if result.get("author"): - print(f"👤 {result['author']}") - print(f"\n{result.get('content', '')}") - except Exception as e: - error_str = str(e) - if "400" in error_str and "Bad Request" in error_str: - print(f"❌ Invalid URL: {args.url}", file=sys.stderr) - print(" Please provide a valid URL 
(e.g., https://example.com)", file=sys.stderr) - elif "ConnectionError" in type(e).__name__ or "Timeout" in type(e).__name__: - print(f"❌ Could not connect to: {args.url}", file=sys.stderr) - print(" Check your internet connection or the URL.", file=sys.stderr) - else: - print(f"❌ Error: {e}", file=sys.stderr) - sys.exit(1) - - -async def _cmd_search(args): - from agent_reach.core import AgentReach - eyes = AgentReach() - query = " ".join(args.query).strip() - num = args.num - - if not query: - print("Please provide a search query.", file=sys.stderr) - sys.exit(1) - - try: - if args.command == "search": - results = await eyes.search(query, num_results=num) - elif args.command == "search-reddit": - results = await eyes.search_reddit(query, subreddit=getattr(args, "sub", None), limit=num) - elif args.command == "search-github": - results = await eyes.search_github(query, language=getattr(args, "lang", None), limit=num) - elif args.command == "search-twitter": - results = await eyes.search_twitter(query, limit=num) - elif args.command == "search-youtube": - results = await eyes.search_youtube(query, limit=num) - elif args.command == "search-bilibili": - results = await eyes.search_bilibili(query, limit=num) - elif args.command == "search-xhs": - results = await eyes.search_xhs(query, limit=num) - elif args.command == "search-linkedin": - results = await eyes.search_linkedin(query, limit=num) - elif args.command == "search-bosszhipin": - results = await eyes.search_bosszhipin(query, limit=num) - else: - print(f"Unknown command: {args.command}", file=sys.stderr) - sys.exit(1) - except Exception as e: - error_str = str(e) - if "401" in error_str or "Unauthorized" in error_str: - print("⚠️ Exa API key not configured or invalid.") - print("Get a free key at https://exa.ai (1000 searches/month free)") - print("Then run: agent-reach configure exa-key YOUR_KEY") - sys.exit(1) - elif "exa" in error_str.lower() or "api_key" in error_str.lower(): - print("⚠️ Exa API key not 
configured.") - print("Get a free key at https://exa.ai") - print("Then run: agent-reach configure exa-key YOUR_KEY") - sys.exit(1) - else: - print(f"❌ Error: {e}", file=sys.stderr) - sys.exit(1) - - if not results: - print("No results found.") - return - - for i, r in enumerate(results, 1): - title = r.get("title") or r.get("name") or r.get("text", "")[:60] - url = r.get("url", "") - snippet = r.get("snippet") or r.get("description") or r.get("text", "") - print(f"\n{i}. {title}") - print(f" 🔗 {url}") - if snippet: - print(f" {snippet[:200]}") - # Extra info for GitHub - extra = r.get("extra", {}) - if extra.get("stars"): - print(f" ⭐ {extra['stars']} 🍴 {extra.get('forks', 0)} 📝 {extra.get('language', '')}") - - def _cmd_check_update(): """Check for newer versions on GitHub.""" import requests diff --git a/agent_reach/core.py b/agent_reach/core.py index c5e8a3c..fd8caae 100644 --- a/agent_reach/core.py +++ b/agent_reach/core.py @@ -1,120 +1,36 @@ # -*- coding: utf-8 -*- """ -AgentReach — the unified entry point. +AgentReach — installer, doctor, and configuration tool. -Pure glue: routes URLs to the right channel, routes searches to the right engine. -Every channel is a thin wrapper around an external tool. Swap any backend anytime. +Agent Reach helps AI agents install and configure upstream platform tools +(bird CLI, yt-dlp, mcporter, gh CLI, etc.). After installation, agents +call the upstream tools directly — no wrapper layer needed. 
Usage: - from agent_reach import AgentReach + from agent_reach.doctor import check_all, format_report + from agent_reach.config import Config - eyes = AgentReach() - content = await eyes.read("https://github.com/openai/gpt-4") - results = await eyes.search("AI agent framework") + config = Config() + results = check_all(config) + print(format_report(results)) """ -import asyncio -from typing import Any, Dict, List, Optional +from typing import Dict, Optional from agent_reach.config import Config -from agent_reach.channels import get_channel_for_url, get_channel, get_all_channels class AgentReach: - """Give your AI Agent eyes to see the entire internet.""" + """Give your AI Agent eyes to see the entire internet. + + This class provides health-check functionality. + For reading/searching, use the upstream tools directly + (see SKILL.md for commands). + """ def __init__(self, config: Optional[Config] = None): self.config = config or Config() - # ── Reading ───────────────────────────────────────── - - async def read(self, url: str) -> Dict[str, Any]: - """ - Read content from any URL. Auto-detects platform. - - Supported: Web, GitHub, Reddit, Twitter, YouTube, - Bilibili, RSS, and more. - - Returns: - Dict with title, content, url, author, platform, etc. 
- """ - if not url.startswith(("http://", "https://")): - url = f"https://{url}" - - channel = get_channel_for_url(url) - result = await channel.read(url, config=self.config) - return result.to_dict() - - async def read_batch(self, urls: List[str]) -> List[Dict[str, Any]]: - """Read multiple URLs concurrently.""" - tasks = [self.read(url) for url in urls] - results = await asyncio.gather(*tasks, return_exceptions=True) - return [r for r in results if not isinstance(r, Exception)] - - def detect_platform(self, url: str) -> str: - """Detect what platform a URL belongs to.""" - channel = get_channel_for_url(url) - return channel.name - - # ── Searching ─────────────────────────────────────── - - async def search(self, query: str, num_results: int = 5) -> List[Dict[str, Any]]: - """Semantic web search via Exa.""" - ch = get_channel("exa_search") - results = await ch.search(query, config=self.config, limit=num_results) - return [r.to_dict() for r in results] - - async def search_reddit(self, query: str, subreddit: Optional[str] = None, limit: int = 10) -> List[Dict[str, Any]]: - """Search Reddit via Exa (bypasses IP blocks).""" - ch = get_channel("exa_search") - q = f"site:reddit.com/r/{subreddit} {query}" if subreddit else f"site:reddit.com {query}" - results = await ch.search(q, config=self.config, limit=limit) - return [r.to_dict() for r in results] - - async def search_github(self, query: str, language: Optional[str] = None, limit: int = 5) -> List[Dict[str, Any]]: - """Search GitHub repositories.""" - ch = get_channel("github") - results = await ch.search(query, config=self.config, language=language, limit=limit) - return [r.to_dict() for r in results] - - async def search_twitter(self, query: str, limit: int = 10) -> List[Dict[str, Any]]: - """Search Twitter. 
Uses bird CLI if available, else Exa.""" - ch = get_channel("twitter") - results = await ch.search(query, config=self.config, limit=limit) - return [r.to_dict() for r in results] - - async def search_youtube(self, query: str, limit: int = 5) -> List[Dict[str, Any]]: - """Search YouTube via yt-dlp.""" - ch = get_channel("youtube") - results = await ch.search(query, config=self.config, limit=limit) - return [r.to_dict() for r in results] - - async def search_bilibili(self, query: str, limit: int = 5) -> List[Dict[str, Any]]: - """Search Bilibili. Tries yt-dlp first, falls back to Exa.""" - ch = get_channel("bilibili") - results = await ch.search(query, config=self.config, limit=limit) - return [r.to_dict() for r in results] - - async def search_xhs(self, query: str, limit: int = 10) -> List[Dict[str, Any]]: - """Search XiaoHongShu via mcporter.""" - ch = get_channel("xiaohongshu") - results = await ch.search(query, config=self.config, limit=limit) - return [r.to_dict() for r in results] - - async def search_linkedin(self, query: str, limit: int = 10) -> List[Dict[str, Any]]: - """Search LinkedIn via MCP or Exa.""" - ch = get_channel("linkedin") - results = await ch.search(query, config=self.config, limit=limit) - return [r.to_dict() for r in results] - - async def search_bosszhipin(self, query: str, limit: int = 10) -> List[Dict[str, Any]]: - """Search Boss直聘 via MCP or Exa.""" - ch = get_channel("bosszhipin") - results = await ch.search(query, config=self.config, limit=limit) - return [r.to_dict() for r in results] - - # ── Health ────────────────────────────────────────── - def doctor(self) -> Dict[str, dict]: """Check all channel availability.""" from agent_reach.doctor import check_all @@ -124,13 +40,3 @@ class AgentReach: """Get formatted health report.""" from agent_reach.doctor import check_all, format_report return format_report(check_all(self.config)) - - # ── Sync wrappers ─────────────────────────────────── - - def read_sync(self, url: str) -> Dict[str, 
Any]: - """Synchronous version of read().""" - return asyncio.run(self.read(url)) - - def search_sync(self, query: str, num_results: int = 5) -> List[Dict[str, Any]]: - """Synchronous version of search().""" - return asyncio.run(self.search(query, num_results)) diff --git a/agent_reach/integrations/mcp_server.py b/agent_reach/integrations/mcp_server.py index 13a91bc..636e4ee 100644 --- a/agent_reach/integrations/mcp_server.py +++ b/agent_reach/integrations/mcp_server.py @@ -1,10 +1,11 @@ # -*- coding: utf-8 -*- """ -Agent Reach MCP Server — expose all capabilities as MCP tools. +Agent Reach MCP Server — expose doctor/status as MCP tool. Run: python -m agent_reach.integrations.mcp_server -8 tools for any MCP-compatible AI Agent. +Agent Reach is an installer + doctor tool. For actual reading/searching, +agents should call upstream tools directly (bird, yt-dlp, mcporter, etc.). """ import asyncio @@ -35,50 +36,15 @@ def create_server(): @server.list_tools() async def list_tools(): return [ - Tool(name="read_url", - description="Read content from any URL. 
Supports: web, GitHub, Reddit, Twitter, YouTube, Bilibili, RSS.", - inputSchema={"type": "object", "properties": {"url": {"type": "string"}}, "required": ["url"]}), - Tool(name="read_batch", - description="Read multiple URLs concurrently.", - inputSchema={"type": "object", "properties": {"urls": {"type": "array", "items": {"type": "string"}}}, "required": ["urls"]}), - Tool(name="detect_platform", - description="Detect what platform a URL belongs to.", - inputSchema={"type": "object", "properties": {"url": {"type": "string"}}, "required": ["url"]}), - Tool(name="search", - description="Semantic web search via Exa.", - inputSchema={"type": "object", "properties": {"query": {"type": "string"}, "num_results": {"type": "integer", "default": 5}}, "required": ["query"]}), - Tool(name="search_reddit", - description="Search Reddit posts.", - inputSchema={"type": "object", "properties": {"query": {"type": "string"}, "subreddit": {"type": "string"}, "limit": {"type": "integer", "default": 10}}, "required": ["query"]}), - Tool(name="search_github", - description="Search GitHub repositories.", - inputSchema={"type": "object", "properties": {"query": {"type": "string"}, "language": {"type": "string"}, "limit": {"type": "integer", "default": 5}}, "required": ["query"]}), - Tool(name="search_twitter", - description="Search Twitter/X posts.", - inputSchema={"type": "object", "properties": {"query": {"type": "string"}, "limit": {"type": "integer", "default": 10}}, "required": ["query"]}), Tool(name="get_status", - description="Get Agent Reach status: which channels are active.", + description="Get Agent Reach status: which channels are installed and active.", inputSchema={"type": "object", "properties": {}}), ] @server.call_tool() async def call_tool(name: str, arguments: dict): try: - if name == "read_url": - result = await eyes.read(arguments["url"]) - elif name == "read_batch": - result = await eyes.read_batch(arguments["urls"]) - elif name == "detect_platform": - result = 
eyes.detect_platform(arguments["url"]) - elif name == "search": - result = await eyes.search(arguments["query"], arguments.get("num_results", 5)) - elif name == "search_reddit": - result = await eyes.search_reddit(arguments["query"], arguments.get("subreddit"), arguments.get("limit", 10)) - elif name == "search_github": - result = await eyes.search_github(arguments["query"], arguments.get("language"), arguments.get("limit", 5)) - elif name == "search_twitter": - result = await eyes.search_twitter(arguments["query"], arguments.get("limit", 10)) - elif name == "get_status": + if name == "get_status": result = eyes.doctor_report() else: result = f"Unknown tool: {name}" diff --git a/agent_reach/skill/SKILL.md b/agent_reach/skill/SKILL.md index b7af5af..596cf42 100644 --- a/agent_reach/skill/SKILL.md +++ b/agent_reach/skill/SKILL.md @@ -1,72 +1,39 @@ --- name: agent-reach description: > - Give your AI agent eyes to see the entire internet. Read and search across - Twitter/X, Reddit, YouTube, GitHub, Bilibili, XiaoHongShu, LinkedIn, - Boss直聘, RSS, and any web page — all from a single CLI. - Use when: (1) reading content from URLs (tweets, Reddit posts, articles, videos), - (2) searching across platforms (web, Twitter, Reddit, GitHub, YouTube, Bilibili, - XiaoHongShu, LinkedIn, Boss直聘), - (3) user asks to configure/enable a platform channel, - (4) checking channel health or updating Agent Reach. - Triggers: "search Twitter/Reddit/YouTube", "read this URL", "find posts about", - "搜索", "读取", "查一下", "看看这个链接", - "帮我配", "帮我添加", "帮我安装". + Give your AI agent eyes to see the entire internet. Install and configure + upstream tools for Twitter/X, Reddit, YouTube, GitHub, Bilibili, XiaoHongShu, + LinkedIn, Boss直聘, RSS, and any web page — then call them directly. + Use when: (1) setting up platform access tools for the first time, + (2) checking which platforms are available, + (3) user asks to configure/enable a platform channel. 
+ Triggers: "帮我配", "帮我添加", "帮我安装", "agent reach", "install channels", + "configure twitter", "enable reddit". --- # Agent Reach -Read and search the internet across 12+ platforms via unified CLI. +Install and configure upstream tools for 12+ platforms. After setup, call them directly — no wrapper layer. ## Setup -First check if agent-reach is installed: -```bash -agent-reach doctor -``` - -If command not found, install it: ```bash pip install https://github.com/Panniantong/agent-reach/archive/main.zip agent-reach install --env=auto +agent-reach doctor ``` -`install` auto-detects your environment and installs core dependencies (Node.js, mcporter, bird CLI, gh CLI). Read the output and run `agent-reach doctor` to see what's active. +`install` auto-detects your environment and installs core dependencies (Node.js, mcporter, bird CLI, gh CLI, yt-dlp, feedparser). Run `doctor` to see what's active. -## Commands - -### Read any URL -```bash -agent-reach read -agent-reach read --json # structured output -``` -Handles: tweets, Reddit posts, articles, YouTube/Bilibili (transcripts), GitHub repos, LinkedIn profiles, Boss直聘 jobs, XiaoHongShu notes, RSS feeds, and any web page. - -### Search - -```bash -agent-reach search "query" # web search (Exa) -agent-reach search-twitter "query" # Twitter/X -agent-reach search-reddit "query" # Reddit (--sub ) -agent-reach search-github "query" # GitHub (--lang ) -agent-reach search-youtube "query" # YouTube -agent-reach search-bilibili "query" # Bilibili (B站) -agent-reach search-xhs "query" # XiaoHongShu (小红书) -agent-reach search-linkedin "query" # LinkedIn -agent-reach search-bosszhipin "query" # Boss直聘 -``` - -All search commands support `-n ` for number of results. 
- -### Management +## Management ```bash agent-reach doctor # channel status overview -agent-reach watch # quick health + update check (for scheduled tasks) +agent-reach watch # quick health + update check agent-reach check-update # check for new versions ``` -### Configure channels +## Configure channels ```bash agent-reach configure twitter-cookies "auth_token=xxx; ct0=yyy" @@ -76,45 +43,188 @@ agent-reach configure --from-browser chrome # auto-extract cookies from local ## Configuring a channel ("帮我配 XXX") -All channels follow the same flow. When a user asks to configure/enable any channel: +When a user asks to configure/enable any channel: 1. Run `agent-reach doctor` -2. Find the channel in the output — it shows the current status (✅/⚠️/⬜) and **what to do next** -3. Execute whatever you can automatically (install packages, start services, register MCP) -4. For steps that require human action (scan QR code, paste cookies, login), tell the user exactly what to do -5. After the user completes their part, run `agent-reach doctor` again to verify +2. Find the channel — it shows status (✅/⚠️/⬜) and **what to do next** +3. Execute what you can automatically (install packages, start services) +4. For human-required steps (scan QR, paste cookies), tell the user what to do +5. Run `agent-reach doctor` again to verify -**Do NOT memorize per-channel steps.** Always rely on `doctor` output — it stays up-to-date even when backends change. +**Do NOT memorize per-channel steps.** Always rely on `doctor` output. ### Common human actions -These are things only the user can do. 
When `doctor` indicates one is needed, explain it clearly: +- **Paste cookies:** User installs [Cookie-Editor](https://chromewebstore.google.com/detail/cookie-editor/hlkenndednhfkekhgcdicdfddnkalmdm) → goes to the website → exports Header String → sends it to you → you run `agent-reach configure -cookies "..."` +- **Scan QR code:** User opens the URL on their phone/browser and scans with the platform's app +- **Proxy:** Reddit/Bilibili/XiaoHongShu may block server IPs — suggest a residential proxy if on a server -- **Paste cookies:** User installs [Cookie-Editor](https://chromewebstore.google.com/detail/cookie-editor/hlkenndednhfkekhgcdicdfddnkalmdm) Chrome extension → goes to the website → exports Header String → sends it to you → you run `agent-reach configure -cookies "..."` -- **Scan QR code:** User opens the URL shown in `doctor` output on their phone/browser and scans with the platform's app -- **Browser login:** Some MCP services need a one-time browser login; on servers without a display, user may need VNC -- **Proxy:** Reddit/Bilibili/XiaoHongShu block server IPs — suggest a residential proxy (~$1/month) if on a server +--- -## Tips +## Using Upstream Tools Directly -- Always try `agent-reach read ` first for any URL — it auto-detects the platform -- If a channel is ⬜ but the user hasn't asked for it, don't push — let them opt in -- If a channel breaks, run `agent-reach doctor` to diagnose -- LinkedIn and Boss直聘 have Jina Reader fallback even without full setup -- Twitter search 在 bird 失败时会自动 fallback 到 Exa 搜索 +After `agent-reach install`, call the upstream tools directly. No need for `agent-reach read` or `agent-reach search`. 
+ +### Twitter/X (bird CLI) + +```bash +# Search tweets +bird search "query" --json -n 10 + +# Read a specific tweet +bird read https://x.com/user/status/123 --json + +# Read a user's timeline +bird timeline @username --json -n 20 +``` + +### YouTube (yt-dlp) + +```bash +# Get video metadata +yt-dlp --dump-json "https://www.youtube.com/watch?v=xxx" + +# Download subtitles only +yt-dlp --write-sub --write-auto-sub --sub-lang "zh-Hans,zh,en" --skip-download -o "/tmp/%(id)s" "URL" +# Then read the .vtt file + +# Search (yt-dlp ytsearch) +yt-dlp --dump-json "ytsearch5:query" +``` + +### Bilibili (yt-dlp) + +```bash +# Get video metadata +yt-dlp --dump-json "https://www.bilibili.com/video/BVxxx" + +# Download subtitles +yt-dlp --write-sub --write-auto-sub --sub-lang "zh-Hans,zh,en" --convert-subs vtt --skip-download -o "/tmp/%(id)s" "URL" +``` + +### Reddit (JSON API) + +```bash +# Read a subreddit +curl -s "https://www.reddit.com/r/python/hot.json?limit=10" -H "User-Agent: agent-reach/1.0" + +# Read a post with comments +curl -s "https://www.reddit.com/r/python/comments/POST_ID.json" -H "User-Agent: agent-reach/1.0" + +# Search +curl -s "https://www.reddit.com/search.json?q=query&limit=10" -H "User-Agent: agent-reach/1.0" +``` + +Note: On servers, Reddit may block your IP. Use proxy or search via Exa instead. 
+ +### 小红书 / XiaoHongShu (mcporter + xiaohongshu-mcp) + +```bash +# Search notes +mcporter call 'xiaohongshu.search_feeds(keyword: "query")' + +# Read a note +mcporter call 'xiaohongshu.get_feed_detail(feed_id: "xxx", xsec_token: "yyy")' + +# Get comments +mcporter call 'xiaohongshu.get_feed_comments(feed_id: "xxx", xsec_token: "yyy")' + +# Post a note +mcporter call 'xiaohongshu.create_image_feed(title: "标题", desc: "内容", image_paths: ["/path/to/img.jpg"])' +``` + +### GitHub (gh CLI) + +```bash +# Search repos +gh search repos "query" --sort stars --limit 10 + +# View a repo +gh repo view owner/repo + +# Search code +gh search code "query" --language python + +# List issues +gh issue list -R owner/repo --state open + +# View a specific issue/PR +gh issue view 123 -R owner/repo +``` + +### Web — Any URL (Jina Reader) + +```bash +# Read any webpage as markdown +curl -s "https://r.jina.ai/URL" -H "Accept: text/markdown" + +# Search the web +curl -s "https://s.jina.ai/query" -H "Accept: text/markdown" +``` + +### Exa Search (mcporter + exa MCP) + +```bash +# Web search +mcporter call 'exa.web_search_exa(query: "query", numResults: 5)' + +# Code search (GitHub, StackOverflow, docs) +mcporter call 'exa.get_code_context_exa(query: "how to parse JSON in Python", tokensNum: 3000)' + +# Company research +mcporter call 'exa.company_research_exa(companyName: "OpenAI")' +``` + +### LinkedIn (mcporter + linkedin-scraper-mcp) + +```bash +# View a profile +mcporter call 'linkedin.get_person_profile(linkedin_url: "https://linkedin.com/in/username")' + +# Search people +mcporter call 'linkedin.search_people(keyword: "AI engineer", limit: 10)' + +# View company +mcporter call 'linkedin.get_company_profile(linkedin_url: "https://linkedin.com/company/xxx")' +``` + +Fallback: `curl -s "https://r.jina.ai/https://linkedin.com/in/username"` + +### Boss直聘 (mcporter + mcp-bosszp) + +```bash +# Browse recommended jobs +mcporter call 'bosszhipin.get_recommend_jobs_tool(page: 1)' + +# Search 
jobs +mcporter call 'bosszhipin.search_jobs_tool(keyword: "Python", city: "北京", page: 1)' + +# View job details +mcporter call 'bosszhipin.get_job_detail_tool(job_url: "https://www.zhipin.com/job_detail/xxx")' +``` + +Fallback: `curl -s "https://r.jina.ai/https://www.zhipin.com/job_detail/xxx"` + +### RSS (feedparser) + +```python +python3 -c " +import feedparser +d = feedparser.parse('https://example.com/feed') +for e in d.entries[:5]: + print(f'{e.title} — {e.link}') +" +``` ## Troubleshooting ### Twitter "fetch failed" -bird CLI 用 Node.js 原生 `fetch()`,不走 `HTTP_PROXY`/`HTTPS_PROXY` 环境变量。 -Agent Reach 已内置解决方案:通过 `undici` 的 `EnvHttpProxyAgent` 自动注入代理。 +bird CLI uses Node.js native `fetch()`, which doesn't respect `HTTP_PROXY`. Solutions: +1. Ensure `undici` is installed: `npm install -g undici` +2. Configure proxy: `agent-reach configure proxy http://user:pass@ip:port` +3. If still failing, use transparent proxy (Clash TUN, Proxifier) -**你(Agent)需要确认:** -1. `undici` 已全局安装:`npm list -g undici`,没有就装:`npm install -g undici` -2. 用户已配置代理:`agent-reach configure proxy http://user:pass@ip:port` -3. Agent Reach 会自动处理注入,不需要用户额外操作 +### Channel broken? -如果以上都没问题仍然 "fetch failed",建议用户使用透明代理(如 Clash TUN 模式、Proxifier)。 - -详见 https://github.com/Panniantong/Agent-Reach/blob/main/docs/troubleshooting.md +Run `agent-reach doctor` — it shows what's wrong and how to fix it.
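
To illustrate the check-only pattern that this patch reduces the channels to (compare the new `YouTubeChannel.check` and the `mcporter list` probe in the XiaoHongShu channel), here is a minimal standalone sketch. The helper names `check_tool` and `check_mcporter_server` and the English messages are assumptions made for illustration; they are not part of Agent Reach. Only `shutil.which`, `subprocess.run`, and the `mcporter list` invocation are taken from the patch itself.

```python
import shutil
import subprocess
from typing import Callable, Optional, Tuple

# (state, message); the patch uses the states "ok", "warn", and "off".
Status = Tuple[str, str]


def check_tool(
    tool: str,
    ok_message: str,
    install_hint: str,
    which: Callable[[str], Optional[str]] = shutil.which,
) -> Status:
    """Hypothetical helper: report whether an upstream CLI is on PATH.

    `which` is injectable so the function can be exercised without the
    real tool being installed.
    """
    if which(tool):
        return "ok", ok_message
    return "off", f"{tool} is not installed. Install: {install_hint}"


def check_mcporter_server(server: str, timeout: int = 10) -> Status:
    """Hypothetical helper: check that an MCP server is registered.

    Mirrors the probe in the patch: run `mcporter list` and look for the
    server name in its standard output.
    """
    if not shutil.which("mcporter"):
        return "off", "mcporter is not installed. Install: npm install -g mcporter"
    try:
        result = subprocess.run(
            ["mcporter", "list"], capture_output=True, text=True, timeout=timeout
        )
    except Exception:
        # Covers timeouts and OS-level failures to launch the process.
        return "off", "mcporter could not be reached"
    if server not in result.stdout:
        return "off", f"mcporter is installed but {server} is not configured"
    return "ok", f"{server} is registered with mcporter"


if __name__ == "__main__":
    print(check_tool("yt-dlp", "video info and subtitles available", "pip install yt-dlp"))
    print(check_mcporter_server("xiaohongshu"))
```

Because each check only reports a state and a next step, an agent can run the checks, act on the message, and then call the upstream tool directly, which is the division of labor the commit message describes.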