diff --git a/README.md b/README.md
index 0f428b6..cf322a0 100644
--- a/README.md
+++ b/README.md
@@ -115,14 +115,14 @@ AI Agent 已经能帮你写代码、改文档、管项目——但你让它去
不需要任何配置,告诉 Agent 就行:
-- "帮我看看这个链接" → 任意网页
-- "这个 GitHub 仓库是做什么的" → GitHub 仓库、Issue、代码
-- "这个视频讲了什么" → YouTube / B站字幕提取
-- "帮我看看这条推文" → Twitter 推文
-- "订阅这个 RSS" → RSS / Atom 源
-- "搜一下 GitHub 上有什么 LLM 框架" → GitHub 搜索
+- "帮我看看这个链接" → `curl https://r.jina.ai/URL` 读任意网页
+- "这个 GitHub 仓库是做什么的" → `gh repo view owner/repo`
+- "这个视频讲了什么" → `yt-dlp --write-sub --write-auto-sub --skip-download URL` 提取字幕
+- "帮我看看这条推文" → `bird read URL --json`
+- "订阅这个 RSS" → `feedparser` 解析
+- "搜一下 GitHub 上有什么 LLM 框架" → `gh search repos "LLM framework"`
-**不需要记命令。** Agent 自己知道该调什么。
+**不需要记命令。** Agent 读了 SKILL.md 之后自己知道该调什么。
---
@@ -134,9 +134,11 @@ AI Agent 已经能帮你写代码、改文档、管项目——但你让它去
Agent Reach 做的事情很简单:**帮你把这些选型和配置的活儿做完了。**
+安装完成后,Agent 直接调用上游工具(bird CLI、yt-dlp、mcporter、gh CLI 等),不需要经过 Agent Reach 的包装层。
+
### 🔌 每个渠道都是可插拔的
-每个平台对应一个独立的 Python 文件,实现统一接口。**后端工具随时可以换**——哪天出了更好的工具,改一个文件就行,其他不用动。
+每个平台背后是一个独立的上游工具。**不满意?换掉就行。**
```
channels/
@@ -229,13 +231,13 @@ Star 一下,下次需要的时候能找到。⭐
AI Agent 怎么搜索 Twitter / X?不想付 API 费用
-Agent Reach 使用 [bird CLI](https://www.npmjs.com/package/@steipete/bird) 通过 Cookie 认证访问 Twitter,完全免费。安装 Agent Reach 后,用 Cookie-Editor 导出你的 Twitter Cookie,运行 `agent-reach configure twitter-cookies "your_cookies"` 即可。之后 Agent 就可以用 `agent-reach search-twitter "关键词"` 搜索推文了。
+Agent Reach 使用 [bird CLI](https://www.npmjs.com/package/@steipete/bird) 通过 Cookie 认证访问 Twitter,完全免费。安装 Agent Reach 后,用 Cookie-Editor 导出你的 Twitter Cookie,运行 `agent-reach configure twitter-cookies "your_cookies"` 即可。之后 Agent 就可以用 `bird search "关键词" --json` 搜索推文了。
How to search Twitter/X with AI agent for free (no API)?
-Agent Reach uses the bird CLI with cookie auth — zero API fees. After installing, export your Twitter cookies with the Cookie-Editor extension, run `agent-reach configure twitter-cookies "your_cookies"`, then your agent can search with `agent-reach search-twitter "query"`.
+Agent Reach uses the bird CLI with cookie auth — zero API fees. After installing, export your Twitter cookies with the Cookie-Editor extension, run `agent-reach configure twitter-cookies "your_cookies"`, then your agent can search with `bird search "query" --json`.
@@ -247,19 +249,19 @@ Reddit 封锁数据中心 IP。配置一个住宅代理即可解决:`agent-rea
How to get YouTube video transcripts for AI?
-`agent-reach read https://youtube.com/watch?v=xxx` automatically extracts the transcript. Uses yt-dlp under the hood, supports multiple languages. No API key needed.
+`yt-dlp --dump-json "https://youtube.com/watch?v=xxx"` extracts video metadata. `yt-dlp --write-sub --write-auto-sub --skip-download "URL"` downloads subtitles, including auto-generated captions, and `--sub-lang` selects the languages. No API key needed.
怎么让 AI Agent 读小红书?
-小红书需要通过 Docker 运行一个 MCP 服务。安装 Docker 后,运行 `agent-reach install` 会自动配置。之后 Agent 就能用 `agent-reach read <小红书链接>` 或 `agent-reach search-xhs "关键词"` 了。
+小红书需要通过 Docker 运行一个 MCP 服务。安装 Docker 后,运行 `agent-reach install` 会自动配置。之后 Agent 就能用 `mcporter call 'xiaohongshu.get_feed_detail(...)'` 读取笔记或 `mcporter call 'xiaohongshu.search_feeds(keyword: "关键词")'` 搜索了。
Compatible with Claude Code / Cursor / OpenClaw / Windsurf?
-Yes! Agent Reach is a standard CLI tool — any AI coding agent that can run shell commands can use it. Works with Claude Code, Cursor, OpenClaw, Windsurf, Codex, and more. Just `pip install agent-reach` and the agent can start using it immediately.
+Yes! Agent Reach is an installer + configuration tool — any AI coding agent that can run shell commands can use it. Works with Claude Code, Cursor, OpenClaw, Windsurf, Codex, and more. Just `pip install agent-reach`, run `agent-reach install`, and the agent can start using the upstream tools immediately.
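With this change the README says the agent calls upstream tools directly instead of going through `agent-reach read` / `agent-reach search-*`. Below is a minimal sketch of the intent-to-command mapping from the list at the top of the README, written as argv lists. `upstream_command` and the intent labels are hypothetical and not part of Agent Reach. Only the commands themselves come from the README. Passing a list to `subprocess.run` sidesteps shell quoting for multi-word queries such as `"LLM framework"`.

```python
import shlex
from typing import List


def upstream_command(intent: str, target: str) -> List[str]:
    """Return the argv an agent might run for one README intent.

    Hypothetical helper: intent names are illustrative labels; the commands
    mirror the README examples (Jina Reader, gh, yt-dlp, bird).
    """
    commands = {
        "web": ["curl", f"https://r.jina.ai/{target}"],
        "github_repo": ["gh", "repo", "view", target],
        "video_meta": ["yt-dlp", "--dump-json", target],
        "tweet": ["bird", "read", target, "--json"],
        "github_search": ["gh", "search", "repos", target],
    }
    if intent not in commands:
        raise ValueError(f"unknown intent: {intent}")
    return commands[intent]


if __name__ == "__main__":
    # Print a shell-safe rendering; an agent would pass the list to subprocess.run.
    print(shlex.join(upstream_command("github_search", "LLM framework")))
```

`shlex.join` (Python 3.8+) quotes the multi-word query, so the printed line is `gh search repos 'LLM framework'`.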
diff --git a/agent_reach/channels/__init__.py b/agent_reach/channels/__init__.py
index 2971a13..a2f6669 100644
--- a/agent_reach/channels/__init__.py
+++ b/agent_reach/channels/__init__.py
@@ -1,14 +1,10 @@
# -*- coding: utf-8 -*-
"""
-Channel registry — routes URLs to the right channel.
-
-This is the core of Agent Reach' pluggable architecture.
-Add a new channel: just create a file and register it here.
-Swap a backend: just change the implementation inside the channel file.
+Channel registry — lists all supported platforms for doctor checks.
"""
-from typing import Dict, List, Optional
-from .base import Channel, ReadResult, SearchResult
+from typing import List, Optional
+from .base import Channel
# Import all channels
from .web import WebChannel
@@ -24,7 +20,7 @@ from .linkedin import LinkedInChannel
from .bosszhipin import BossZhipinChannel
-# Channel registry — order matters (first match wins, web is last as fallback)
+# Channel registry
ALL_CHANNELS: List[Channel] = [
GitHubChannel(),
TwitterChannel(),
@@ -36,22 +32,9 @@ ALL_CHANNELS: List[Channel] = [
BossZhipinChannel(),
RSSChannel(),
ExaSearchChannel(),
- WebChannel(), # Fallback — handles any URL
+ WebChannel(),
]
-# Search-capable channels
-SEARCH_CHANNELS: Dict[str, Channel] = {
- ch.name: ch for ch in ALL_CHANNELS if ch.can_search()
-}
-
-
-def get_channel_for_url(url: str) -> Channel:
- """Find the right channel for a URL."""
- for channel in ALL_CHANNELS:
- if channel.can_handle(url):
- return channel
- return WebChannel() # Should never reach here, but just in case
-
def get_channel(name: str) -> Optional[Channel]:
"""Get a channel by name."""
@@ -67,7 +50,7 @@ def get_all_channels() -> List[Channel]:
__all__ = [
- "Channel", "ReadResult", "SearchResult",
- "ALL_CHANNELS", "SEARCH_CHANNELS",
- "get_channel_for_url", "get_channel", "get_all_channels",
+ "Channel",
+ "ALL_CHANNELS",
+ "get_channel", "get_all_channels",
]
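After `get_channel_for_url` and `SEARCH_CHANNELS` are removed, the registry exists mainly to drive availability checks. Below is a minimal sketch of how a doctor-style report could consume a list like `ALL_CHANNELS`. It relies only on the `check(config) -> (status, message)` contract from `base.py`. `StubChannel` stands in for the real channel classes, and `summarize` is a hypothetical name.

```python
from typing import Dict, List, Tuple


class StubChannel:
    """Stand-in for a Channel subclass (hypothetical, for illustration only)."""

    def __init__(self, name: str, status: str, message: str):
        self.name = name
        self._result = (status, message)

    def check(self, config=None) -> Tuple[str, str]:
        return self._result


def summarize(channels: List[StubChannel], config=None) -> Dict[str, List[str]]:
    """Group channel names by the status string each check() returns."""
    groups: Dict[str, List[str]] = {}
    for ch in channels:
        status, _message = ch.check(config)
        groups.setdefault(status, []).append(ch.name)
    return groups


if __name__ == "__main__":
    demo = [
        StubChannel("github", "ok", "gh CLI"),
        StubChannel("linkedin", "off", "needs MCP"),
        StubChannel("web", "ok", "Jina Reader"),
    ]
    print(summarize(demo))
```

Because the order of `ALL_CHANNELS` no longer matters for URL routing, grouping by status is enough for a report. Insertion order within each group still follows the registry.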
diff --git a/agent_reach/channels/base.py b/agent_reach/channels/base.py
index c311750..21551cf 100644
--- a/agent_reach/channels/base.py
+++ b/agent_reach/channels/base.py
@@ -1,110 +1,28 @@
# -*- coding: utf-8 -*-
"""
-Channel base class — the universal interface for all platforms.
+Channel base class — platform availability checking.
-Every channel (YouTube, Twitter, GitHub, etc.) implements this interface.
-The backend tool can be swapped anytime without changing anything else.
+Each channel represents a platform (YouTube, Twitter, GitHub, etc.)
+and provides:
+ - can_handle(url) → does this URL belong to this platform?
+ - check(config) → is the upstream tool installed and configured?
-Example:
- class YouTubeChannel(Channel):
- name = "youtube"
- backends = ["yt-dlp"] # current backend, can be swapped
-
- async def read(self, url, config):
- # Just call yt-dlp, return standardized dict
- ...
+After installation, agents call upstream tools directly.
"""
import shutil
from abc import ABC, abstractmethod
-from dataclasses import dataclass
-from typing import Any, Dict, List, Optional, Tuple
-
-
-@dataclass
-class ReadResult:
- """Standardized read result. Every channel returns this."""
- title: str
- content: str
- url: str
- author: str = ""
- date: str = ""
- platform: str = ""
- extra: dict = None
-
- def __post_init__(self):
- self.extra = self.extra or {}
-
- def to_dict(self) -> dict:
- d = {
- "title": self.title,
- "content": self.content,
- "url": self.url,
- "platform": self.platform,
- }
- if self.author:
- d["author"] = self.author
- if self.date:
- d["date"] = self.date
- if self.extra:
- d["extra"] = self.extra
- return d
-
-
-@dataclass
-class SearchResult:
- """Standardized search result."""
- title: str
- url: str
- snippet: str = ""
- author: str = ""
- date: str = ""
- score: float = 0
- extra: dict = None
-
- def __post_init__(self):
- self.extra = self.extra or {}
-
- def to_dict(self) -> dict:
- d = {
- "title": self.title,
- "url": self.url,
- "snippet": self.snippet,
- }
- if self.author:
- d["author"] = self.author
- if self.date:
- d["date"] = self.date
- if self.extra:
- d["extra"] = self.extra
- return d
+from typing import List, Tuple
class Channel(ABC):
- """
- Base class for all channels.
-
- Subclasses just need to implement:
- - read(url, config) → ReadResult
- - can_handle(url) → bool
- - check(config) → (status, message)
-
- Optionally:
- - search(query, config, **kwargs) → list[SearchResult]
- """
+ """Base class for all channels."""
name: str = "" # e.g. "youtube"
- description: str = "" # e.g. "YouTube video transcripts"
- backends: List[str] = [] # e.g. ["yt-dlp"] — what external tool is used
- requires_config: List[str] = [] # e.g. ["reddit_proxy"]
- requires_tools: List[str] = [] # e.g. ["yt-dlp"]
+ description: str = "" # e.g. "YouTube 视频和字幕"
+ backends: List[str] = [] # e.g. ["yt-dlp"] — what upstream tool is used
tier: int = 0 # 0=zero-config, 1=needs free key, 2=needs setup
- @abstractmethod
- async def read(self, url: str, config=None) -> ReadResult:
- """Read content from a URL. Must return ReadResult."""
- ...
-
@abstractmethod
def can_handle(self, url: str) -> bool:
"""Check if this channel can handle this URL."""
@@ -112,29 +30,7 @@ class Channel(ABC):
def check(self, config=None) -> Tuple[str, str]:
"""
- Check if this channel is available.
+ Check if this channel's upstream tool is available.
Returns (status, message) where status is 'ok'/'warn'/'off'/'error'.
"""
- # Check required tools
- for tool in self.requires_tools:
- if not shutil.which(tool):
- return "off", f"需要安装:pip install {tool}"
-
- # Check required config
- for key in self.requires_config:
- if config and not config.get(key):
- return "off", f"需要配置 {key},运行 agent-reach setup"
-
return "ok", f"{'、'.join(self.backends) if self.backends else '内置'}"
-
- async def search(self, query: str, config=None, **kwargs) -> List[SearchResult]:
- """Search this platform. Override if supported."""
- raise NotImplementedError(f"{self.name} does not support search")
-
- def can_search(self) -> bool:
- """Whether this channel supports search."""
- try:
- # Check if search is overridden
- return type(self).search is not Channel.search
- except:
- return False
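The slimmed-down base class leaves each subclass with two duties: `can_handle(url)` and an optional `check()` override that probes the upstream tool. Below is a minimal sketch of that pattern. It reproduces a trimmed copy of the new `Channel` so it runs on its own. `ExampleChannel`, the `example-cli` tool and the `example.com` domain are hypothetical.

```python
import shutil
from abc import ABC, abstractmethod
from typing import List, Tuple
from urllib.parse import urlparse


class Channel(ABC):
    """Trimmed copy of the new base class, reproduced so this sketch is standalone."""

    name: str = ""
    backends: List[str] = []

    @abstractmethod
    def can_handle(self, url: str) -> bool:
        ...

    def check(self, config=None) -> Tuple[str, str]:
        # Default: report the upstream backends, or "内置" (built-in) if none.
        return "ok", ("、".join(self.backends) if self.backends else "内置")


class ExampleChannel(Channel):
    """Hypothetical channel whose upstream tool is a CLI named `example-cli`."""

    name = "example"
    backends = ["example-cli"]
    tool = "example-cli"

    def can_handle(self, url: str) -> bool:
        host = urlparse(url).netloc.lower()
        return host == "example.com" or host.endswith(".example.com")

    def check(self, config=None) -> Tuple[str, str]:
        # Only report availability; reading and searching are left to the CLI itself.
        if not shutil.which(self.tool):
            return "off", f"{self.tool} not installed"
        return super().check(config)
```

Matching on the exact host or a `.example.com` suffix avoids false positives such as `notexample.com`, which a plain substring test would accept.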
diff --git a/agent_reach/channels/bilibili.py b/agent_reach/channels/bilibili.py
index 6ca2cf5..07843cd 100644
--- a/agent_reach/channels/bilibili.py
+++ b/agent_reach/channels/bilibili.py
@@ -1,207 +1,26 @@
# -*- coding: utf-8 -*-
-"""Bilibili — via yt-dlp (same backend as YouTube).
+"""Bilibili — check if yt-dlp is available."""
-Backend: yt-dlp (https://github.com/yt-dlp/yt-dlp)
-yt-dlp natively supports Bilibili — video info, subtitles, and search.
-"""
-
-import json
+import os
import shutil
-import subprocess
-from urllib.parse import urlparse
-from .base import Channel, ReadResult, SearchResult
-from typing import List
+from .base import Channel
class BilibiliChannel(Channel):
name = "bilibili"
- description = "B站视频信息和字幕"
+ description = "B站视频和字幕"
backends = ["yt-dlp"]
- requires_tools = ["yt-dlp"]
- tier = 0
+ tier = 1
def can_handle(self, url: str) -> bool:
+ from urllib.parse import urlparse
d = urlparse(url).netloc.lower()
return "bilibili.com" in d or "b23.tv" in d
def check(self, config=None):
if not shutil.which("yt-dlp"):
return "off", "yt-dlp 未安装。安装:pip install yt-dlp"
- proxy = config.get("bilibili_proxy") if config else None
+ proxy = (config.get("bilibili_proxy") if config else None) or os.environ.get("BILIBILI_PROXY")
if proxy:
- return "ok", "已配置代理,完整可用"
- import os
- is_server = bool(os.environ.get("SSH_CONNECTION") or os.path.exists("/etc/cloud"))
- if is_server:
- return "warn", "服务器 IP 可能被封,配置代理即可解决:agent-reach configure proxy URL"
- return "ok", "本地直连可用"
-
- async def read(self, url: str, config=None) -> ReadResult:
- if not shutil.which("yt-dlp"):
- raise RuntimeError("yt-dlp not installed. Install: pip install yt-dlp")
-
- proxy = config.get("bilibili_proxy") if config else None
-
- # Get video info via yt-dlp
- info = self._get_info(url, proxy)
- if not info:
- return ReadResult(
- title="Bilibili",
- content=f"⚠️ 无法获取视频信息: {url}\n服务器 IP 可能被封,配个代理:agent-reach configure proxy URL",
- url=url, platform="bilibili",
- )
-
- title = info.get("title", url)
- author = info.get("uploader", "")
- desc = info.get("description", "")
-
- # Try subtitles
- subtitle = self._get_subtitles(url, proxy)
- content = desc
- if subtitle:
- content += f"\n\n## 字幕\n{subtitle}"
-
- return ReadResult(
- title=title, content=content, url=url,
- author=author, platform="bilibili",
- extra={
- "view_count": info.get("view_count"),
- "like_count": info.get("like_count"),
- "duration": info.get("duration_string"),
- },
- )
-
- async def search(self, query: str, config=None, **kwargs) -> List[SearchResult]:
- """Search Bilibili.
-
- Strategy:
- 1. Try yt-dlp bilisearch (works on local machines)
- 2. Fallback to Exa site:bilibili.com (works on servers)
- """
- if not shutil.which("yt-dlp"):
- raise RuntimeError("yt-dlp not installed. Install: pip install yt-dlp")
-
- limit = kwargs.get("limit", 5)
- proxy = config.get("bilibili_proxy") if config else None
-
- # Strategy 1: yt-dlp bilisearch
- results = self._search_ytdlp(query, limit, proxy)
- if results:
- return results
-
- # Strategy 2: Exa fallback (server-friendly)
- results = self._search_exa(query, limit)
- if results:
- return results
-
- return []
-
- def _search_ytdlp(self, query: str, limit: int, proxy: str = None) -> List[SearchResult]:
- """Search via yt-dlp bilisearch (needs local/Chinese IP)."""
- cmd = [
- "yt-dlp", "--dump-json", "--no-download",
- f"bilisearch{limit}:{query}",
- ]
- if proxy:
- cmd += ["--proxy", proxy]
-
- try:
- r = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
- if r.returncode != 0:
- return []
- results = []
- for line in r.stdout.strip().split("\n"):
- if not line.strip():
- continue
- try:
- d = json.loads(line)
- vid = d.get("id", "")
- url = d.get("webpage_url", f"https://www.bilibili.com/video/av{vid}")
- results.append(SearchResult(
- title=d.get("title", f"av{vid}"),
- url=url,
- snippet=f"👤 {d.get('uploader', '?')} · 👁 {d.get('view_count', '?')}",
- extra={
- "view_count": d.get("view_count"),
- "uploader": d.get("uploader"),
- "duration": d.get("duration_string"),
- },
- ))
- except json.JSONDecodeError:
- continue
- return results
- except subprocess.TimeoutExpired:
- return []
-
- def _search_exa(self, query: str, limit: int) -> List[SearchResult]:
- """Fallback: search via Exa (site:bilibili.com). Works on any IP."""
- try:
- r = subprocess.run(
- ["mcporter", "call",
- f'exa.web_search_exa(query: "site:bilibili.com {query}", numResults: {limit})'],
- capture_output=True, text=True, timeout=30,
- )
- if r.returncode != 0:
- return []
-
- results = []
- # Parse mcporter output: Title: / Author: / URL: / Text: blocks
- title, author, url = "", "", ""
- for line in r.stdout.split("\n"):
- if line.startswith("Title: "):
- title = line[7:].strip()
- elif line.startswith("Author: "):
- author = line[8:].strip()
- elif line.startswith("URL: "):
- url = line[5:].strip()
- if url and "bilibili.com" in url:
- results.append(SearchResult(
- title=title or url,
- url=url,
- snippet=f"👤 {author}" if author else "(via Exa search)",
- ))
- title, author, url = "", "", ""
- return results
- except Exception:
- return []
-
- def _get_info(self, url: str, proxy: str = None) -> dict:
- cmd = ["yt-dlp", "--dump-json", "--no-download", url]
- if proxy:
- cmd += ["--proxy", proxy]
- try:
- r = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
- if r.returncode == 0:
- return json.loads(r.stdout)
- except (subprocess.TimeoutExpired, json.JSONDecodeError):
- pass
- return {}
-
- def _get_subtitles(self, url: str, proxy: str = None) -> str:
- import tempfile
- from pathlib import Path
-
- with tempfile.TemporaryDirectory() as tmpdir:
- cmd = [
- "yt-dlp", "--write-sub", "--write-auto-sub",
- "--sub-lang", "zh-Hans,zh,en",
- "--skip-download", "--sub-format", "vtt",
- "-o", f"{tmpdir}/%(id)s.%(ext)s", url,
- ]
- if proxy:
- cmd += ["--proxy", proxy]
- try:
- subprocess.run(cmd, capture_output=True, text=True, timeout=30)
- for f in Path(tmpdir).glob("*.vtt"):
- text = f.read_text(errors="replace")
- lines = []
- for line in text.split("\n"):
- line = line.strip()
- if not line or line.startswith("WEBVTT") or "-->" in line or line.isdigit():
- continue
- if line not in lines[-1:]:
- lines.append(line)
- return "\n".join(lines)
- except subprocess.TimeoutExpired:
- pass
- return ""
+ return "ok", "可提取视频信息和字幕(代理已配置)"
+ return "ok", "可提取视频信息和字幕(本地环境)。服务器可能需要代理"
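`check()` now reads a proxy from the config key `bilibili_proxy` or, as a fallback, the `BILIBILI_PROXY` environment variable. Nothing in this diff shows how that proxy reaches yt-dlp once the agent calls it directly. Below is a minimal sketch that keeps the same precedence and forwards the proxy with yt-dlp's `--proxy` flag, as the removed `_get_subtitles` did. `resolve_proxy` and `ytdlp_subtitle_cmd` are hypothetical helpers.

```python
import os
from typing import List, Mapping, Optional


def resolve_proxy(
    config: Optional[Mapping[str, str]],
    env: Mapping[str, str] = os.environ,
) -> Optional[str]:
    """Same precedence as the new check(): config key first, then BILIBILI_PROXY."""
    return (config.get("bilibili_proxy") if config else None) or env.get("BILIBILI_PROXY")


def ytdlp_subtitle_cmd(url: str, proxy: Optional[str]) -> List[str]:
    """Build a subtitle-only yt-dlp command like the one the removed code ran."""
    cmd = [
        "yt-dlp", "--write-sub", "--write-auto-sub",
        "--sub-lang", "zh-Hans,zh,en",
        "--skip-download", url,
    ]
    if proxy:
        cmd += ["--proxy", proxy]
    return cmd
```

An empty config dict is falsy, so it falls through to the environment variable. That matches the expression in the diff.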
diff --git a/agent_reach/channels/bosszhipin.py b/agent_reach/channels/bosszhipin.py
index 8e9e95f..feea5ee 100644
--- a/agent_reach/channels/bosszhipin.py
+++ b/agent_reach/channels/bosszhipin.py
@@ -1,62 +1,9 @@
# -*- coding: utf-8 -*-
-"""Boss直聘 (BOSS Zhipin) — via mcp-bosszp (MCP) or Jina Reader fallback.
+"""Boss直聘 — check if mcp-bosszp is available."""
-Backend: mcp-bosszp (161 stars, FastMCP + Playwright)
-Swap to: any Boss直聘 access tool
-"""
-
-import json
import shutil
import subprocess
-from urllib.parse import urlparse
-from .base import Channel, ReadResult, SearchResult
-from typing import List
-import requests
-
-
-def _mcporter_has_bosszhipin() -> bool:
- """Check if mcporter has Boss直聘 MCP configured."""
- if not shutil.which("mcporter"):
- return False
- try:
- r = subprocess.run(
- ["mcporter", "list"], capture_output=True, text=True, timeout=10
- )
- # Check for various possible config names
- out = r.stdout.lower()
- return "boss" in out or "zhipin" in out or "bosszhipin" in out
- except Exception:
- return False
-
-
-def _mcporter_call(expr: str, timeout: int = 30) -> str:
- """Call a Boss直聘 MCP tool via mcporter."""
- r = subprocess.run(
- ["mcporter", "call", expr],
- capture_output=True, text=True, timeout=timeout,
- )
- if r.returncode != 0:
- raise RuntimeError(r.stderr or r.stdout)
- return r.stdout
-
-
-def _get_mcp_name() -> str:
- """Get the actual MCP server name configured in mcporter."""
- try:
- r = subprocess.run(
- ["mcporter", "list"], capture_output=True, text=True, timeout=10
- )
- for line in r.stdout.split("\n"):
- line_lower = line.strip().lower()
- for name in ["bosszhipin", "boss-zp", "bosszp", "boss"]:
- if name in line_lower:
- # Extract the actual server name
- parts = line.strip().split()
- if parts:
- return parts[0]
- return "bosszhipin"
- except Exception:
- return "bosszhipin"
+from .base import Channel
class BossZhipinChannel(Channel):
@@ -66,118 +13,29 @@ class BossZhipinChannel(Channel):
tier = 2
def can_handle(self, url: str) -> bool:
+ from urllib.parse import urlparse
domain = urlparse(url).netloc.lower()
return "zhipin.com" in domain or "boss.com" in domain
def check(self, config=None):
- if _mcporter_has_bosszhipin():
- return "ok", "可搜索职位、向 HR 打招呼"
-
+ if not shutil.which("mcporter"):
+ return "off", (
+ "可通过 Jina Reader 读取职位页面。完整功能需要:\n"
+ " 1. git clone https://github.com/mucsbr/mcp-bosszp.git\n"
+ " 2. cd mcp-bosszp && pip install -r requirements.txt && playwright install chromium\n"
+ " 3. python boss_zhipin_fastmcp_v2.py(启动后扫码登录)\n"
+ " 4. mcporter config add bosszhipin http://localhost:8000/mcp"
+ )
+ try:
+ r = subprocess.run(
+ ["mcporter", "list"], capture_output=True, text=True, timeout=10
+ )
+ out = r.stdout.lower()
+ if "boss" in out or "zhipin" in out:
+ return "ok", "可搜索职位、向 HR 打招呼"
+ except Exception:
+ pass
return "off", (
- "可通过 Jina Reader 读取职位页面。完整功能需要:\n"
- " 1. git clone https://github.com/mucsbr/mcp-bosszp.git\n"
- " 2. cd mcp-bosszp && pip install -r requirements.txt && playwright install chromium\n"
- " 3. python boss_zhipin_fastmcp_v2.py(启动后扫码登录)\n"
- " 4. mcporter config add bosszhipin http://localhost:8000/mcp\n"
- " 或用 Docker:docker-compose up -d\n"
+ "mcporter 已装但 Boss直聘 MCP 未配置。\n"
" 详见 https://github.com/mucsbr/mcp-bosszp"
)
-
- async def read(self, url: str, config=None) -> ReadResult:
- # Boss直聘 pages mostly work with Jina Reader
- return await self._read_jina(url)
-
- async def _read_jina(self, url: str) -> ReadResult:
- """Read Boss直聘 page via Jina Reader."""
- try:
- resp = requests.get(
- f"https://r.jina.ai/{url}",
- headers={"Accept": "text/markdown"},
- timeout=15,
- )
- resp.raise_for_status()
- text = resp.text
-
- if len(text.strip()) < 50:
- return ReadResult(
- title="Boss直聘",
- content=(
- f"⚠️ 无法读取此页面内容: {url}\n\n"
- "提示:\n"
- "- 安装 mcp-bosszp 可解锁职位搜索和自动打招呼\n"
- "- 详见 https://github.com/mucsbr/mcp-bosszp"
- ),
- url=url,
- platform="bosszhipin",
- )
-
- return ReadResult(
- title=text[:100] if text else url,
- content=text,
- url=url,
- platform="bosszhipin",
- )
- except Exception:
- return ReadResult(
- title="Boss直聘",
- content=(
- f"⚠️ 无法读取此 Boss直聘页面: {url}\n\n"
- "提示:\n"
- "- Boss直聘部分页面需要登录\n"
- "- 安装 mcp-bosszp 可解锁完整功能\n"
- "- 详见 https://github.com/mucsbr/mcp-bosszp"
- ),
- url=url,
- platform="bosszhipin",
- )
-
- async def search(self, query: str, config=None, **kwargs) -> List[SearchResult]:
- limit = kwargs.get("limit", 10)
-
- # Try MCP search first
- if _mcporter_has_bosszhipin():
- try:
- return await self._search_mcp(query, limit, config)
- except Exception:
- pass
-
- # Fallback to Exa
- from agent_reach.channels.exa_search import ExaSearchChannel
- exa = ExaSearchChannel()
- return await exa.search(f"site:zhipin.com {query}", config=config, limit=limit)
-
- async def _search_mcp(self, query: str, limit: int, config=None) -> List[SearchResult]:
- """Search Boss直聘 via MCP."""
- server = _get_mcp_name()
- try:
- out = _mcporter_call(
- f'{server}.get_recommend_jobs_tool(page: 1)',
- timeout=30,
- )
- return self._parse_jobs(out, limit)
- except Exception:
- return []
-
- def _parse_jobs(self, text: str, limit: int) -> List[SearchResult]:
- """Parse MCP job search output into SearchResults."""
- results = []
- try:
- data = json.loads(text)
- jobs = data if isinstance(data, list) else data.get("jobs", data.get("results", []))
- for job in jobs[:limit]:
- if isinstance(job, dict):
- title = job.get("title") or job.get("jobName", "")
- company = job.get("company") or job.get("brandName", "")
- salary = job.get("salary") or job.get("salaryDesc", "")
- url = job.get("url", "")
- snippet = f"🏢 {company}" if company else ""
- if salary:
- snippet += f" · 💰 {salary}"
- results.append(SearchResult(
- title=title,
- url=url,
- snippet=snippet,
- ))
- except (json.JSONDecodeError, KeyError):
- pass
- return results
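The new `check()` in `bosszhipin.py`, `exa_search.py` and `linkedin.py` repeats the same step each time: run `mcporter list` and look for a keyword in its output. Below is a minimal sketch of a shared helper, assuming `mcporter list` prints configured server names as plain text, which is the same assumption the diff makes. `mcp_configured` and `mcporter_has` are hypothetical names.

```python
import subprocess
from typing import Iterable


def mcp_configured(list_output: str, keywords: Iterable[str]) -> bool:
    """Case-insensitive substring match, the same heuristic the new check() uses."""
    out = list_output.lower()
    return any(k.lower() in out for k in keywords)


def mcporter_has(keywords: Iterable[str], timeout: int = 10) -> bool:
    """Run `mcporter list` and test its output; False if it is missing or hangs."""
    try:
        r = subprocess.run(
            ["mcporter", "list"], capture_output=True, text=True, timeout=timeout
        )
    except (OSError, subprocess.TimeoutExpired):  # FileNotFoundError is an OSError
        return False
    return mcp_configured(r.stdout, keywords)
```

Substring matching is loose: `"boss"` would also match any unrelated server whose name contains it. The pure `mcp_configured` function keeps that heuristic easy to test without mcporter installed.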
diff --git a/agent_reach/channels/exa_search.py b/agent_reach/channels/exa_search.py
index 0a2e3a0..e264399 100644
--- a/agent_reach/channels/exa_search.py
+++ b/agent_reach/channels/exa_search.py
@@ -1,110 +1,36 @@
# -*- coding: utf-8 -*-
-"""Exa semantic search — via mcporter + Exa MCP server.
+"""Exa Search — check if mcporter + Exa MCP is available."""
-Backend: Exa MCP at mcp.exa.ai (OAuth, no API key needed)
-Requires: mcporter CLI
-"""
-
-import json
import shutil
import subprocess
-from .base import Channel, SearchResult
-from typing import List
+from .base import Channel
class ExaSearchChannel(Channel):
name = "exa_search"
- description = "全网语义搜索(同时支持 Reddit/Twitter 搜索)"
- backends = ["exa-mcp"]
- tier = 1
-
- def _mcporter_ok(self) -> bool:
- if not shutil.which("mcporter"):
- return False
- try:
- r = subprocess.run(
- ["mcporter", "list"], capture_output=True, text=True, timeout=10
- )
- return "exa" in r.stdout
- except Exception:
- return False
-
- def _call(self, expr: str, timeout: int = 30) -> str:
- r = subprocess.run(
- ["mcporter", "call", expr],
- capture_output=True, text=True, timeout=timeout,
- )
- if r.returncode != 0:
- raise RuntimeError(r.stderr or r.stdout)
- return r.stdout
-
- # ── Channel interface ──
+ description = "全网语义搜索"
+ backends = ["Exa via mcporter"]
+ tier = 0
def can_handle(self, url: str) -> bool:
- return False # search-only
-
- async def read(self, url: str, config=None):
- raise NotImplementedError("Exa is a search engine, not a reader")
+ return False # Search-only channel
def check(self, config=None):
if not shutil.which("mcporter"):
return "off", (
- "需要 mcporter。安装:npm install -g mcporter && "
- "mcporter config add exa https://mcp.exa.ai/mcp"
- )
- if not self._mcporter_ok():
- return "off", "mcporter 已装但 Exa 未配置。运行:mcporter config add exa https://mcp.exa.ai/mcp"
- return "ok", "MCP 已连接,免 Key 直接可用(全网搜索 + Reddit + Twitter)"
-
- async def search(self, query: str, config=None, **kwargs) -> List[SearchResult]:
- if not self._mcporter_ok():
- raise ValueError(
- "Exa 搜索需要 mcporter。安装:\n"
+ "需要 mcporter + Exa MCP。安装:\n"
" npm install -g mcporter\n"
" mcporter config add exa https://mcp.exa.ai/mcp"
)
-
- limit = kwargs.get("limit", 5)
- safe_q = query.replace('"', '\\"')
- out = self._call(
- f'exa.web_search_exa(query: "{safe_q}", numResults: {min(limit, 10)})',
- timeout=30,
- )
- return self._parse_output(out, limit)
-
- # ── Parse mcporter text output ──
-
- def _parse_output(self, text: str, limit: int) -> List[SearchResult]:
- """Parse mcporter's Title/URL/Text block format."""
- results = []
- cur = {}
-
- for line in text.split("\n"):
- line = line.strip()
- if line.startswith("Title: "):
- if cur.get("title"):
- results.append(self._make_result(cur))
- cur = {"title": line[7:]}
- elif line.startswith("URL: "):
- cur["url"] = line[5:]
- elif line.startswith("Published Date: "):
- cur["date"] = line[16:]
- elif line.startswith("Text: "):
- cur["text"] = line[6:]
- elif "text" in cur and line:
- cur["text"] += " " + line
-
- if cur.get("title"):
- results.append(self._make_result(cur))
-
- return results[:limit]
-
- @staticmethod
- def _make_result(d: dict) -> SearchResult:
- return SearchResult(
- title=d.get("title", ""),
- url=d.get("url", ""),
- snippet=d.get("text", "")[:500],
- date=d.get("date", ""),
- score=0,
- )
+ try:
+ r = subprocess.run(
+ ["mcporter", "list"], capture_output=True, text=True, timeout=10
+ )
+ if "exa" in r.stdout.lower():
+ return "ok", "全网语义搜索可用(免费,无需 API Key)"
+ return "off", (
+ "mcporter 已装但 Exa 未配置。运行:\n"
+ " mcporter config add exa https://mcp.exa.ai/mcp"
+ )
+ except Exception:
+ return "off", "mcporter 连接异常"
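With `ExaSearchChannel.search` removed, the agent builds the `mcporter call` expression itself. Below is a minimal sketch of the expression the removed code produced, with its 10-result cap. `exa_search_expr` is a hypothetical helper. Unlike the removed code, it also escapes backslashes, on the assumption that mcporter treats `\` as the escape character inside double-quoted strings.

```python
def exa_search_expr(query: str, limit: int = 5) -> str:
    """Build an `exa.web_search_exa(...)` call expression for `mcporter call`."""
    # Escape backslashes first, then quotes, so a query can't close the string early.
    safe_q = query.replace("\\", "\\\\").replace('"', '\\"')
    return f'exa.web_search_exa(query: "{safe_q}", numResults: {min(limit, 10)})'
```

An agent would pass the result as a single argv element, for example `subprocess.run(["mcporter", "call", exa_search_expr("site:bilibili.com 教程")], ...)`. The `site:` prefix is how the removed Bilibili fallback scoped Exa to one domain.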
diff --git a/agent_reach/channels/github.py b/agent_reach/channels/github.py
index bb4a937..790ba2b 100644
--- a/agent_reach/channels/github.py
+++ b/agent_reach/channels/github.py
@@ -1,16 +1,9 @@
# -*- coding: utf-8 -*-
-"""GitHub — via gh CLI.
+"""GitHub — check if gh CLI is available."""
-Backend: gh CLI (https://cli.github.com)
-Swap to: GitHub REST API
-"""
-
-import json
import shutil
import subprocess
-from urllib.parse import urlparse
-from .base import Channel, ReadResult, SearchResult
-from typing import List
+from .base import Channel
class GitHubChannel(Channel):
@@ -19,121 +12,18 @@ class GitHubChannel(Channel):
backends = ["gh CLI"]
tier = 0
- def _gh(self, args: list, timeout: int = 15) -> str:
- r = subprocess.run(
- ["gh"] + args,
- capture_output=True, text=True, timeout=timeout,
- )
- if r.returncode != 0:
- raise RuntimeError(r.stderr or r.stdout)
- return r.stdout
-
- def _gh_json(self, args: list, timeout: int = 15) -> dict:
- return json.loads(self._gh(args + ["--json"], timeout))
-
def can_handle(self, url: str) -> bool:
+ from urllib.parse import urlparse
return "github.com" in urlparse(url).netloc.lower()
def check(self, config=None):
if not shutil.which("gh"):
- return "warn", "gh CLI 未安装。安装:https://cli.github.com 。公开仓库仍可通过 Jina Reader 读取"
+ return "warn", "gh CLI 未安装。安装:https://cli.github.com"
try:
- self._gh(["auth", "status"], timeout=5)
+ subprocess.run(
+ ["gh", "auth", "status"],
+ capture_output=True, text=True, timeout=5, check=True,
+ )
return "ok", "完整可用(读取、搜索、Fork、Issue、PR 等)"
except Exception:
return "ok", "gh CLI 已装但未认证。运行 gh auth login 可解锁完整功能"
-
- async def read(self, url: str, config=None) -> ReadResult:
- if not shutil.which("gh"):
- # Fallback to Jina Reader for public repos
- from agent_reach.channels.web import WebChannel
- return await WebChannel().read(url, config)
-
- path = urlparse(url).path.strip("/").split("/")
- if len(path) < 2:
- from agent_reach.channels.web import WebChannel
- return await WebChannel().read(url, config)
-
- owner, repo = path[0], path[1]
-
- # Issues / PRs
- if len(path) >= 4 and path[2] in ("issues", "pull"):
- return await self._read_issue(owner, repo, path[3], url)
-
- # Repo
- return await self._read_repo(owner, repo, url)
-
- async def _read_repo(self, owner: str, repo: str, url: str) -> ReadResult:
- slug = f"{owner}/{repo}"
- try:
- # Get repo info
- info = self._gh(["repo", "view", slug])
- # Get README
- try:
- readme = self._gh(
- ["api", f"repos/{slug}/readme", "--jq", ".content"],
- timeout=10,
- )
- import base64
- readme_text = base64.b64decode(readme).decode("utf-8", errors="replace")
- except Exception:
- readme_text = ""
-
- content = readme_text or info
- return ReadResult(
- title=slug, content=content, url=url,
- author=owner, platform="github",
- )
- except Exception:
- from agent_reach.channels.web import WebChannel
- return await WebChannel().read(url)
-
- async def _read_issue(self, owner: str, repo: str, num: str, url: str) -> ReadResult:
- slug = f"{owner}/{repo}"
- try:
- out = self._gh(["issue", "view", num, "-R", slug])
- return ReadResult(
- title=f"{slug}#{num}", content=out, url=url,
- platform="github",
- )
- except Exception:
- # Might be a PR
- try:
- out = self._gh(["pr", "view", num, "-R", slug])
- return ReadResult(
- title=f"{slug}#{num}", content=out, url=url,
- platform="github",
- )
- except Exception:
- from agent_reach.channels.web import WebChannel
- return await WebChannel().read(url)
-
- async def search(self, query: str, config=None, **kwargs) -> List[SearchResult]:
- if not shutil.which("gh"):
- raise ValueError("GitHub search requires gh CLI. Install: https://cli.github.com")
-
- language = kwargs.get("language")
- limit = kwargs.get("limit", 5)
-
- args = ["search", "repos", query, "--sort", "stars", f"--limit={limit}"]
- if language:
- args += [f"--language={language}"]
-
- out = self._gh(args, timeout=15)
- results = []
- for line in out.strip().split("\n"):
- if not line.strip():
- continue
- parts = line.split("\t")
- if len(parts) >= 1:
- slug = parts[0].strip()
- desc = parts[1].strip() if len(parts) > 1 else ""
- stars = parts[3].strip() if len(parts) > 3 else ""
- lang = parts[5].strip() if len(parts) > 5 else ""
- results.append(SearchResult(
- title=slug,
- url=f"https://github.com/{slug}",
- snippet=desc,
- extra={"stars": stars, "language": lang},
- ))
- return results
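The removed `GitHubChannel.search` split `gh search repos` text output on tabs and picked columns by index (`parts[3]` for stars), which breaks if the column layout changes. Now that agents call gh directly, its `--json` flag returns named fields. Below is a minimal sketch, assuming the field names `fullName`, `description`, `stargazersCount`, `language` and `url` (check them against `gh search repos --help` for your gh version). `gh_search_cmd` and `parse_repos` are hypothetical helpers.

```python
import json
from typing import Dict, List, Optional

FIELDS = "fullName,description,stargazersCount,language,url"


def gh_search_cmd(query: str, limit: int = 5, language: Optional[str] = None) -> List[str]:
    """argv matching the removed search (sort by stars, limit), but requesting JSON."""
    cmd = ["gh", "search", "repos", query, "--sort", "stars",
           f"--limit={limit}", "--json", FIELDS]
    if language:
        cmd.append(f"--language={language}")
    return cmd


def parse_repos(stdout: str) -> List[Dict[str, object]]:
    """Turn gh's JSON array into compact dicts; a null description becomes ""."""
    return [
        {
            "title": r["fullName"],
            "url": r["url"],
            "snippet": r.get("description") or "",
            "stars": r.get("stargazersCount", 0),
        }
        for r in json.loads(stdout)
    ]
```

Feeding `subprocess.run(gh_search_cmd("LLM framework"), capture_output=True, text=True, check=True).stdout` into `parse_repos` replaces the positional parsing with lookups by field name.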
diff --git a/agent_reach/channels/linkedin.py b/agent_reach/channels/linkedin.py
index c9b509f..cb60ffa 100644
--- a/agent_reach/channels/linkedin.py
+++ b/agent_reach/channels/linkedin.py
@@ -1,268 +1,39 @@
# -*- coding: utf-8 -*-
-"""LinkedIn — via linkedin-scraper-mcp (MCP) or Jina Reader fallback.
-
-Backend: linkedin-scraper-mcp (916 stars, Patchright browser automation)
-Swap to: any LinkedIn access tool
-"""
+"""LinkedIn — check if linkedin-scraper-mcp is available."""
import shutil
import subprocess
-from urllib.parse import urlparse
-from .base import Channel, ReadResult, SearchResult
-from typing import List
-import requests
-
-
-def _mcporter_has_linkedin() -> bool:
- """Check if mcporter has linkedin MCP configured."""
- if not shutil.which("mcporter"):
- return False
- try:
- r = subprocess.run(
- ["mcporter", "list"], capture_output=True, text=True, timeout=10
- )
- return "linkedin" in r.stdout.lower()
- except Exception:
- return False
-
-
-def _mcporter_call(expr: str, timeout: int = 30) -> str:
- """Call a LinkedIn MCP tool via mcporter."""
- r = subprocess.run(
- ["mcporter", "call", expr],
- capture_output=True, text=True, timeout=timeout,
- )
- if r.returncode != 0:
- raise RuntimeError(r.stderr or r.stdout)
- return r.stdout
+from .base import Channel
class LinkedInChannel(Channel):
name = "linkedin"
- description = "LinkedIn 个人/公司 Profile 和职位"
+ description = "LinkedIn 职业社交"
backends = ["linkedin-scraper-mcp", "Jina Reader"]
tier = 2
def can_handle(self, url: str) -> bool:
- domain = urlparse(url).netloc.lower()
- return "linkedin.com" in domain
+ from urllib.parse import urlparse
+ return "linkedin.com" in urlparse(url).netloc.lower()
def check(self, config=None):
- if _mcporter_has_linkedin():
- return "ok", "完整可用(Profile、公司、职位搜索)"
-
- # Check if linkedin-scraper-mcp is installed as CLI
- if shutil.which("linkedin-scraper-mcp"):
- return "warn", (
- "linkedin-scraper-mcp 已安装但未接入 mcporter。运行:\n"
- " 1. linkedin-scraper-mcp --login(在有浏览器的机器上登录)\n"
- " 2. linkedin-scraper-mcp --transport streamable-http --port 8001\n"
- " 3. mcporter config add linkedin http://localhost:8001/mcp"
+ if not shutil.which("mcporter"):
+ return "off", (
+ "基本内容可通过 Jina Reader 读取。完整功能需要:\n"
+ " pip install linkedin-scraper-mcp\n"
+ " mcporter config add linkedin http://localhost:3000/mcp\n"
+ " 详见 https://github.com/stickerdaniel/linkedin-mcp-server"
)
-
+ try:
+ r = subprocess.run(
+ ["mcporter", "list"], capture_output=True, text=True, timeout=10
+ )
+ if "linkedin" in r.stdout.lower():
+ return "ok", "完整可用(Profile、公司、职位搜索)"
+ except Exception:
+ pass
return "off", (
- "可通过 Jina Reader 读取部分内容。完整功能需要:\n"
- " 1. pip install linkedin-scraper-mcp\n"
- " 2. linkedin-scraper-mcp --login(在有浏览器的机器上登录)\n"
- " 3. linkedin-scraper-mcp --transport streamable-http --port 8001\n"
- " 4. mcporter config add linkedin http://localhost:8001/mcp\n"
- " 详见 https://github.com/stickerdaniel/linkedin-mcp-server"
+ "mcporter 已装但 LinkedIn MCP 未配置。运行:\n"
+ " pip install linkedin-scraper-mcp\n"
+ " mcporter config add linkedin http://localhost:3000/mcp"
)
-
- async def read(self, url: str, config=None) -> ReadResult:
- path = urlparse(url).path.strip("/")
-
- # Try MCP first
- if _mcporter_has_linkedin():
- try:
- if "/in/" in url:
- return await self._read_profile_mcp(url)
- elif "/company/" in url:
- return await self._read_company_mcp(url)
- elif "/jobs/view/" in url:
- return await self._read_job_mcp(url)
- except Exception:
- pass # Fall through to Jina
-
- # Fallback: Jina Reader
- return await self._read_jina(url)
-
- async def _read_profile_mcp(self, url: str) -> ReadResult:
- """Read a LinkedIn profile via MCP."""
- import re
- # Extract username from URL: /in/username/
- match = re.search(r"/in/([^/]+)", url)
- if not match:
- return await self._read_jina(url)
- username = match.group(1)
- safe_username = username.replace('"', '\\"')
- out = _mcporter_call(
- f'linkedin.get_person_profile(linkedin_username: "{safe_username}")',
- timeout=60,
- )
- return ReadResult(
- title=self._extract_title(out) or f"LinkedIn Profile - {username}",
- content=out.strip(),
- url=url,
- platform="linkedin",
- )
-
- async def _read_company_mcp(self, url: str) -> ReadResult:
- """Read a LinkedIn company page via MCP."""
- import re
- # Extract company name from URL: /company/name/
- match = re.search(r"/company/([^/]+)", url)
- if not match:
- return await self._read_jina(url)
- company = match.group(1)
- safe_company = company.replace('"', '\\"')
- out = _mcporter_call(
- f'linkedin.get_company_profile(company_name: "{safe_company}")',
- timeout=60,
- )
- return ReadResult(
- title=self._extract_title(out) or "LinkedIn Company",
- content=out.strip(),
- url=url,
- platform="linkedin",
- )
-
- async def _read_job_mcp(self, url: str) -> ReadResult:
- """Read a LinkedIn job posting via MCP."""
- import re
- match = re.search(r"/jobs/view/(\d+)", url)
- if not match:
- return await self._read_jina(url)
-
- job_id = match.group(1)
- out = _mcporter_call(
- f'linkedin.get_job_details(job_id: "{job_id}")',
- timeout=30,
- )
- return ReadResult(
- title=self._extract_title(out) or f"LinkedIn Job {job_id}",
- content=out.strip(),
- url=url,
- platform="linkedin",
- )
-
- async def _read_jina(self, url: str) -> ReadResult:
- """Fallback: use Jina Reader."""
- try:
- resp = requests.get(
- f"https://r.jina.ai/{url}",
- headers={"Accept": "text/markdown"},
- timeout=15,
- )
- resp.raise_for_status()
- text = resp.text
-
- # Check if content is usable
- if len(text.strip()) < 100 or "Sign in" in text[:200]:
- return ReadResult(
- title="LinkedIn",
- content=(
- f"⚠️ LinkedIn 页面需要登录才能完整查看。\n\n"
- f"URL: {url}\n\n"
- "完整功能需安装 linkedin-scraper-mcp:\n"
- " pip install linkedin-scraper-mcp\n"
- " uvx linkedin-scraper-mcp --login\n"
- " 详见 https://github.com/stickerdaniel/linkedin-mcp-server"
- ),
- url=url,
- platform="linkedin",
- )
-
- return ReadResult(
- title=text[:100] if text else url,
- content=text,
- url=url,
- platform="linkedin",
- )
- except Exception:
- return ReadResult(
- title="LinkedIn",
- content=(
- f"⚠️ 无法读取此 LinkedIn 页面: {url}\n\n"
- "提示:\n"
- "- LinkedIn 需要登录才能查看大部分内容\n"
- "- 安装 linkedin-scraper-mcp 解锁完整功能\n"
- "- 详见 https://github.com/stickerdaniel/linkedin-mcp-server"
- ),
- url=url,
- platform="linkedin",
- )
-
- async def search(self, query: str, config=None, **kwargs) -> List[SearchResult]:
- limit = kwargs.get("limit", 10)
-
- # Try MCP search first
- if _mcporter_has_linkedin():
- try:
- return await self._search_mcp(query, limit)
- except Exception:
- pass
-
- # Fallback to Exa
- from agent_reach.channels.exa_search import ExaSearchChannel
- exa = ExaSearchChannel()
- return await exa.search(f"site:linkedin.com {query}", config=config, limit=limit)
-
- async def _search_mcp(self, query: str, limit: int) -> List[SearchResult]:
- """Search LinkedIn via MCP."""
- safe_q = query.replace('"', '\\"')
- # Try job search first (most common use case)
- try:
- out = _mcporter_call(
- f'linkedin.search_jobs(keywords: "{safe_q}")',
- timeout=60,
- )
- results = self._parse_search_results(out, "job")
- if results:
- return results[:limit]
- except Exception:
- pass
-
- # Try people search
- try:
- out = _mcporter_call(
- f'linkedin.search_people(keywords: "{safe_q}")',
- timeout=60,
- )
- results = self._parse_search_results(out, "people")
- if results:
- return results
- except Exception:
- pass
-
- return []
-
- def _parse_search_results(self, text: str, result_type: str) -> List[SearchResult]:
- """Parse MCP search output into SearchResults."""
- import json
- results = []
- try:
- data = json.loads(text)
- items = data if isinstance(data, list) else data.get("results", data.get("jobs", []))
- for item in items:
- if isinstance(item, dict):
- title = item.get("title") or item.get("name") or item.get("headline", "")
- url = item.get("url") or item.get("link", "")
- snippet = item.get("description") or item.get("company", "")
- results.append(SearchResult(
- title=title,
- url=url,
- snippet=snippet[:200] if snippet else "",
- ))
- except (json.JSONDecodeError, KeyError):
- # Try line-by-line parsing
- pass
- return results
-
- def _extract_title(self, text: str) -> str:
- """Extract a title from MCP output."""
- for line in text.split("\n"):
- line = line.strip()
- if line and not line.startswith(("{", "[", "#", "http")):
- return line[:80]
- return ""
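The removed `read()` routed LinkedIn URLs to three MCP tools by path:
- `/in/<username>` went to `get_person_profile`.
- `/company/<name>` went to `get_company_profile`.
- `/jobs/view/<id>` went to `get_job_details`.

An agent calling mcporter directly can reuse the same mapping. The sketch below is hypothetical:
- `linkedin_mcp_expr` and `call_linkedin` are illustrative names.
- The tool and argument names are copied from the removed code and assume the LinkedIn MCP server still exposes them.
- Unlike the old `[^/]+` regexes, it stops the captured segment at `?` or `#`, so tracking parameters are not sent as part of the username.

```python
import re
import subprocess
from typing import Optional


def _quote(value: str) -> str:
    # Escape double quotes the same way the removed helpers did.
    return value.replace('"', '\\"')


def linkedin_mcp_expr(url: str) -> Optional[str]:
    """Return an mcporter call expression for a LinkedIn URL, or None if unsupported."""
    m = re.search(r"/in/([^/?#]+)", url)
    if m:
        return f'linkedin.get_person_profile(linkedin_username: "{_quote(m.group(1))}")'
    m = re.search(r"/company/([^/?#]+)", url)
    if m:
        return f'linkedin.get_company_profile(company_name: "{_quote(m.group(1))}")'
    m = re.search(r"/jobs/view/(\d+)", url)
    if m:
        return f'linkedin.get_job_details(job_id: "{m.group(1)}")'
    return None  # caller can fall back to https://r.jina.ai/<url>


def call_linkedin(url: str, timeout: int = 60) -> str:
    """Run the matching MCP tool through mcporter and return its raw stdout."""
    expr = linkedin_mcp_expr(url)
    if expr is None:
        raise ValueError(f"unsupported LinkedIn URL: {url}")
    r = subprocess.run(["mcporter", "call", expr], capture_output=True, text=True, timeout=timeout)
    if r.returncode != 0:
        raise RuntimeError(r.stderr or r.stdout)
    return r.stdout
```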
diff --git a/agent_reach/channels/reddit.py b/agent_reach/channels/reddit.py
index 30bbfbf..b8e9c42 100644
--- a/agent_reach/channels/reddit.py
+++ b/agent_reach/channels/reddit.py
@@ -1,178 +1,26 @@
# -*- coding: utf-8 -*-
-"""Reddit — via Reddit JSON API + optional proxy.
-
-Backend: Reddit public JSON API (append .json to any URL)
-Swap to: any Reddit access method
-"""
+"""Reddit — check if a proxy is configured (config or REDDIT_PROXY)."""
import os
-import requests
-from urllib.parse import urlparse
-from .base import Channel, ReadResult
+from .base import Channel
class RedditChannel(Channel):
name = "reddit"
description = "Reddit 帖子和评论"
- backends = ["Reddit JSON API"]
- tier = 2
-
- USER_AGENT = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36"
+ backends = ["JSON API", "Exa"]
+ tier = 1
def can_handle(self, url: str) -> bool:
- domain = urlparse(url).netloc.lower()
- return "reddit.com" in domain or "redd.it" in domain
+ from urllib.parse import urlparse
+ d = urlparse(url).netloc.lower()
+ return "reddit.com" in d or "redd.it" in d
def check(self, config=None):
- proxy = config.get("reddit_proxy") if config else None
- has_bot = bool(os.environ.get("REDDIT_CLIENT_ID"))
- if proxy and has_bot:
- return "ok", "完整可用(代理 + OAuth Bot)"
- elif proxy:
- return "ok", "代理已配置,可读取帖子。配置 REDDIT_CLIENT_ID/SECRET 可解锁高级搜索和发帖"
- elif has_bot:
- return "warn", "OAuth Bot 已配置,但服务器直连可能被封。配个代理更稳定:agent-reach configure proxy URL"
- else:
- return "off", "搜索用 Exa 免费可用。读帖子需配个代理:agent-reach configure proxy URL"
-
- async def read(self, url: str, config=None) -> ReadResult:
- proxy = config.get("reddit_proxy") if config else None
- proxies = {"http": proxy, "https": proxy} if proxy else None
-
- # Clean URL: remove query params, trailing slash, then add .json
- parsed = urlparse(url)
- clean_path = parsed.path.rstrip("/")
- # Remove trailing .json if already present (avoid double .json)
- if clean_path.endswith(".json"):
- clean_path = clean_path[:-5]
- json_url = f"https://www.reddit.com{clean_path}.json"
-
- try:
- resp = requests.get(
- json_url,
- headers={"User-Agent": self.USER_AGENT},
- proxies=proxies,
- params={"limit": 50},
- timeout=15,
- )
- resp.raise_for_status()
- except requests.exceptions.HTTPError as e:
- status = e.response.status_code if e.response is not None else 0
- if status in (403, 429):
- return ReadResult(
- title="Reddit",
- content="⚠️ Reddit blocked this request (403 Forbidden). "
- "Reddit blocks most server IPs.\n"
- "Fix: agent-reach configure proxy http://user:pass@ip:port\n"
- "Cheap option: https://www.webshare.io ($1/month)\n\n"
- "Alternatively, search Reddit via Exa (free, no proxy needed): "
- "agent-reach search-reddit \"your query\"",
- url=url,
- platform="reddit",
- )
- raise
-
- data = resp.json()
-
- # Subreddit listing page: /r/sub/, /r/sub/hot, /r/sub/new, /r/sub/top
- if isinstance(data, dict) and data.get("kind") == "Listing":
- return self._parse_listing(data, url)
-
- if isinstance(data, list) and len(data) >= 1:
- # Post page: [post_listing, comments_listing]
- post = data[0]["data"]["children"][0]["data"]
- title = post.get("title", "")
- author = post.get("author", "")
- selftext = post.get("selftext", "")
- score = post.get("score", 0)
- subreddit = post.get("subreddit", "")
-
- # Extract comments
- comments_text = ""
- if len(data) >= 2:
- comments_text = self._extract_comments(data[1])
-
- content = selftext
- if comments_text:
- content += f"\n\n---\n## Comments\n{comments_text}"
-
- return ReadResult(
- title=title,
- content=content,
- url=url,
- author=f"u/{author}",
- platform="reddit",
- extra={"subreddit": subreddit, "score": score},
- )
-
- raise ValueError(f"Could not parse Reddit response for: {url}")
-
- def _parse_listing(self, data: dict, url: str) -> ReadResult:
- """Parse a subreddit listing (hot/new/top/rising)."""
- children = data.get("data", {}).get("children", [])
-
- # Extract subreddit name and sort from URL
- parsed = urlparse(url)
- path_parts = [p for p in parsed.path.strip("/").split("/") if p]
- subreddit = path_parts[1] if len(path_parts) >= 2 else "reddit"
- sort_type = path_parts[2] if len(path_parts) >= 3 else "hot"
-
- lines = []
- for i, child in enumerate(children, 1):
- if child.get("kind") != "t3":
- continue
- post = child.get("data", {})
- title = post.get("title", "")
- author = post.get("author", "")
- score = post.get("score", 0)
- num_comments = post.get("num_comments", 0)
- permalink = post.get("permalink", "")
- post_url = post.get("url", "")
- is_self = post.get("is_self", False)
-
- lines.append(f"### {i}. {title}")
- lines.append(f"👤 u/{author} · ⬆ {score} · 💬 {num_comments}")
- if not is_self and post_url:
- lines.append(f"🔗 {post_url}")
- lines.append(f"📎 https://www.reddit.com{permalink}")
- # Add selftext preview (first 200 chars)
- selftext = post.get("selftext", "")
- if selftext:
- preview = selftext[:200].replace("\n", " ")
- if len(selftext) > 200:
- preview += "..."
- lines.append(f"> {preview}")
- lines.append("")
-
- content = "\n".join(lines) if lines else "No posts found."
- return ReadResult(
- title=f"r/{subreddit} — {sort_type}",
- content=content,
- url=url,
- platform="reddit",
- extra={"subreddit": subreddit, "sort": sort_type, "count": len(children)},
+ proxy = (config.get("reddit_proxy") if config else None) or os.environ.get("REDDIT_PROXY")
+ if proxy:
+ return "ok", "代理已配置,可读取帖子。搜索走 Exa"
+ return "warn", (
+ "无代理。服务器 IP 可能被 Reddit 封锁。配置代理:\n"
+ " agent-reach configure proxy http://user:pass@ip:port"
)
-
- def _extract_comments(self, comments_data: dict, depth: int = 0, max_depth: int = 3) -> str:
- """Recursively extract comments."""
- lines = []
- children = comments_data.get("data", {}).get("children", [])
-
- for child in children:
- if child.get("kind") != "t1":
- continue
- data = child.get("data", {})
- author = data.get("author", "[deleted]")
- body = data.get("body", "")
- score = data.get("score", 0)
- indent = " " * depth
-
- lines.append(f"{indent}**u/{author}** ({score} points):")
- lines.append(f"{indent}{body}")
- lines.append("")
-
- # Recurse into replies
- if depth < max_depth and data.get("replies") and isinstance(data["replies"], dict):
- lines.append(self._extract_comments(data["replies"], depth + 1, max_depth))
-
- return "\n".join(lines)
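Reading Reddit now means calling the public JSON API directly. The removed `read()` did this in four steps:
1. Strip the query string and trailing slash.
2. Drop any existing `.json` suffix.
3. Append `.json`.
4. Send a browser-like `User-Agent`, optionally through the proxy.

Below is a stdlib-only sketch of the same steps. `reddit_json_url` and `fetch_reddit` are hypothetical names. Like the removed code, it does not expand `redd.it` short links.

```python
import json
import urllib.request
from typing import Any, Optional
from urllib.parse import urlparse

# Browser-like User-Agent copied from the removed channel.
USER_AGENT = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36"


def reddit_json_url(url: str) -> str:
    """Turn a Reddit permalink into its public JSON endpoint."""
    path = urlparse(url).path.rstrip("/")  # query string is dropped by using .path
    if path.endswith(".json"):  # avoid a double .json suffix
        path = path[: -len(".json")]
    return f"https://www.reddit.com{path}.json"


def fetch_reddit(url: str, proxy: Optional[str] = None, limit: int = 50) -> Any:
    """GET the JSON endpoint, optionally through an HTTP(S) proxy."""
    handlers = []
    if proxy:
        handlers.append(urllib.request.ProxyHandler({"http": proxy, "https": proxy}))
    opener = urllib.request.build_opener(*handlers)
    req = urllib.request.Request(
        f"{reddit_json_url(url)}?limit={limit}", headers={"User-Agent": USER_AGENT}
    )
    with opener.open(req, timeout=15) as resp:
        return json.load(resp)
```

Calling `fetch_reddit(url, proxy=os.environ.get("REDDIT_PROXY"))` covers the environment-variable case. The new `check()` also looks for a proxy under `reddit_proxy` in the config.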
diff --git a/agent_reach/channels/rss.py b/agent_reach/channels/rss.py
index 6b8646e..a61d9d9 100644
--- a/agent_reach/channels/rss.py
+++ b/agent_reach/channels/rss.py
@@ -1,13 +1,7 @@
# -*- coding: utf-8 -*-
-"""RSS feeds — via feedparser (free, pip dependency).
+"""RSS — check if feedparser is available."""
-Backend: feedparser (https://github.com/kurtmckee/feedparser)
-Swap to: any RSS parser
-"""
-
-import feedparser
-from urllib.parse import urlparse
-from .base import Channel, ReadResult
+from .base import Channel
class RSSChannel(Channel):
@@ -17,41 +11,11 @@ class RSSChannel(Channel):
tier = 0
def can_handle(self, url: str) -> bool:
- lower = url.lower()
- domain = urlparse(url).netloc.lower()
- return (lower.endswith(".xml") or "/rss" in lower or "/feed" in lower
- or "/atom" in lower or "rss" in domain)
+ return any(x in url.lower() for x in ["/feed", "/rss", ".xml", "/atom"])
- async def read(self, url: str, config=None) -> ReadResult:
- feed = feedparser.parse(url)
-
- if feed.bozo and not feed.entries:
- raise ValueError(f"Failed to parse RSS feed: {url}")
-
- if not feed.entries:
- raise ValueError(f"No entries in RSS feed: {url}")
-
- # Return latest entry
- entry = feed.entries[0]
- content = entry.get("summary", "") or entry.get("description", "")
-
- # If multiple entries, summarize all
- if len(feed.entries) > 1:
- lines = [f"# {feed.feed.get('title', 'RSS Feed')}\n"]
- for i, e in enumerate(feed.entries[:20], 1):
- title = e.get("title", "Untitled")
- link = e.get("link", "")
- summary = e.get("summary", "")[:200]
- lines.append(f"## {i}. {title}")
- lines.append(f"🔗 {link}")
- if summary:
- lines.append(summary)
- lines.append("")
- content = "\n".join(lines)
-
- return ReadResult(
- title=feed.feed.get("title", entry.get("title", url)),
- content=content,
- url=url,
- platform="rss",
- )
+ def check(self, config=None):
+ try:
+ import feedparser
+ return "ok", "可读取 RSS/Atom 源"
+ except ImportError:
+ return "off", "feedparser 未安装。安装:pip install feedparser"
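`check()` probes for feedparser by importing it. If importing is undesirable (because of import cost or side effects), `importlib.util.find_spec` answers the same question without executing the module. In the sketch below, the function names are hypothetical, and the English messages stand in for the channel's own strings.

```python
import importlib.util
from typing import Tuple


def module_available(name: str) -> bool:
    """True if a top-level module `name` can be found, without importing it."""
    return importlib.util.find_spec(name) is not None


def rss_check() -> Tuple[str, str]:
    # Same ("ok" | "off", message) shape as RSSChannel.check().
    if module_available("feedparser"):
        return "ok", "Can read RSS/Atom feeds"
    return "off", "feedparser not installed: pip install feedparser"
```

`find_spec` only confirms that the package is on the path, so a broken install still reports `ok`. The import-based check in the diff catches that case. `find_spec` on a dotted name also imports its parent package, so this sketch is meant for top-level names like `feedparser`.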
diff --git a/agent_reach/channels/twitter.py b/agent_reach/channels/twitter.py
index 77edc51..cfe605b 100644
--- a/agent_reach/channels/twitter.py
+++ b/agent_reach/channels/twitter.py
@@ -1,286 +1,38 @@
# -*- coding: utf-8 -*-
-"""Twitter/X — via bird CLI (free) or Jina Reader fallback.
-
-Backend: bird (@steipete/bird npm package) for search/timeline
- Jina Reader for single tweets
-Swap to: any Twitter access tool
-"""
+"""Twitter/X — check if bird CLI is available."""
import shutil
import subprocess
-from urllib.parse import urlparse
-from .base import Channel, ReadResult, SearchResult
-from typing import List
-import requests
-
-
-def _bird_cmd():
- """Find bird CLI binary."""
- return shutil.which("bird") or shutil.which("birdx")
-
-
-def _bird_env(config=None):
- """Build env dict with Twitter cookies and proxy support for bird CLI.
-
- Node.js native fetch() doesn't respect HTTP_PROXY/HTTPS_PROXY.
- We inject undici's EnvHttpProxyAgent via NODE_OPTIONS so bird
- automatically routes through the user's proxy.
- """
- import os
- import tempfile
- env = os.environ.copy()
- if config:
- auth_token = config.get("twitter_auth_token")
- ct0 = config.get("twitter_ct0")
- if auth_token:
- env["AUTH_TOKEN"] = auth_token
- if ct0:
- env["CT0"] = ct0
-
- # Auto-inject undici proxy support if HTTP_PROXY/HTTPS_PROXY is set
- has_proxy = env.get("HTTPS_PROXY") or env.get("HTTP_PROXY") or env.get("https_proxy") or env.get("http_proxy")
- if has_proxy:
- bootstrap = _get_proxy_bootstrap_path()
- if bootstrap:
- npm_root = subprocess.run(
- ["npm", "root", "-g"],
- capture_output=True, text=True, timeout=5,
- ).stdout.strip()
- existing_opts = env.get("NODE_OPTIONS", "")
- env["NODE_OPTIONS"] = f"--require {bootstrap} {existing_opts}".strip()
- env["NODE_PATH"] = npm_root
-
- return env
-
-
-def _get_proxy_bootstrap_path():
- """Create/return a bootstrap JS file that sets up undici proxy for fetch."""
- import os
- import tempfile
- bootstrap_path = os.path.join(tempfile.gettempdir(), "agent-reach-undici-proxy.js")
- if not os.path.exists(bootstrap_path):
- # Check if undici is available
- npm_root = subprocess.run(
- ["npm", "root", "-g"],
- capture_output=True, text=True, timeout=5,
- ).stdout.strip()
- undici_path = os.path.join(npm_root, "undici", "index.js")
- if not os.path.exists(undici_path):
- return None
- with open(bootstrap_path, "w") as f:
- f.write(
- "try {\n"
- " const { EnvHttpProxyAgent, setGlobalDispatcher } = require('undici');\n"
- " if (process.env.HTTPS_PROXY || process.env.HTTP_PROXY) {\n"
- " setGlobalDispatcher(new EnvHttpProxyAgent());\n"
- " }\n"
- "} catch(e) {}\n"
- )
- return bootstrap_path
+from .base import Channel
class TwitterChannel(Channel):
name = "twitter"
description = "Twitter/X 推文"
- backends = ["bird", "Jina Reader"]
- tier = 0 # Single tweet reading is zero-config
+ backends = ["bird CLI"]
+ tier = 1
def can_handle(self, url: str) -> bool:
- domain = urlparse(url).netloc.lower()
- return "x.com" in domain or "twitter.com" in domain
+ from urllib.parse import urlparse
+ d = urlparse(url).netloc.lower()
+ return "x.com" in d or "twitter.com" in d
def check(self, config=None):
- # Basic reading always works (Jina fallback)
- bird = _bird_cmd()
- if bird:
- # Actually test bird connectivity
- try:
- result = subprocess.run(
- [bird, "whoami"],
- capture_output=True, timeout=15,
- encoding='utf-8', errors='replace',
- env=_bird_env(config),
- )
- if result.returncode == 0 and "fetch failed" not in result.stdout.lower() and "fetch failed" not in result.stderr.lower():
- return "ok", "搜索、时间线、发推全部可用"
- else:
- error_hint = (result.stderr or result.stdout).strip()[:100]
- if "fetch failed" in (error_hint + result.stdout).lower():
- return "warn", (
- f"bird 已安装但连接失败(fetch failed)。可能原因:\n"
- " 1. Cookie 无效或过期 → 重新导出 Cookie\n"
- " 2. 需要代理但 Node.js fetch 不走系统代理 → 使用全局/透明代理(如 Clash TUN 模式、Proxifier)\n"
- " 3. 网络无法直连 x.com\n"
- " 搜索功能暂不可用,将使用 Exa 搜索作为替代"
- )
- return "warn", f"bird 连接异常:{error_hint}。搜索将使用 Exa 替代"
- except (subprocess.TimeoutExpired, FileNotFoundError):
- return "warn", "bird 已安装但连接超时。搜索将使用 Exa 替代"
- return "ok", "可读取推文。安装 bird + 配置 Cookie 可解锁搜索和发推"
-
- async def read(self, url: str, config=None) -> ReadResult:
- # Try bird first
- bird = _bird_cmd()
- if bird:
- return await self._read_bird(url, bird, config)
- # Fallback: Jina Reader
- return await self._read_jina(url)
-
- async def _read_bird(self, url: str, bird: str, config=None) -> ReadResult:
- result = subprocess.run(
- [bird, "read", url],
- capture_output=True, timeout=30,
- encoding='utf-8', errors='replace',
- env=_bird_env(config),
- )
- if result.returncode != 0:
- return await self._read_jina(url)
-
- text = result.stdout.strip()
- # Extract author from first line
- author = ""
- lines = text.split("\n")
- if lines and lines[0].startswith("@"):
- author = lines[0].split()[0]
-
- return ReadResult(
- title=text[:100],
- content=text,
- url=url,
- author=author,
- platform="twitter",
- )
-
- async def _read_jina(self, url: str) -> ReadResult:
- try:
- resp = requests.get(
- f"https://r.jina.ai/{url}",
- headers={"Accept": "text/markdown"},
- timeout=15,
+ bird = shutil.which("bird") or shutil.which("birdx")
+ if not bird:
+ return "warn", (
+ "bird CLI 未安装。搜索可通过 Exa 替代。安装:\n"
+ " npm install -g @steipete/bird"
)
- resp.raise_for_status()
- text = resp.text
-
- # Detect unusable Jina responses for X/Twitter (JS-required pages)
- unusable_indicators = [
- "page doesn", # "this page doesn't exist" (handles both ' and ')
- "miss what", # "Don't miss what's happening"
- "Something went wrong. Try reloading",
- "Log in](", # Markdown link: [Log in](...)
- ]
- if any(indicator in text for indicator in unusable_indicators):
- return ReadResult(
- title="Twitter/X",
- content="⚠️ Could not read this tweet.\n"
- "The tweet may have been deleted, or the account is private.\n\n"
- "Tips:\n"
- "- Make sure the URL is correct\n"
- "- Try: bird read (if bird CLI is installed)\n"
- "- For protected tweets, configure Twitter cookies: "
- "agent-reach configure twitter-cookies AUTH_TOKEN CT0",
- url=url,
- platform="twitter",
- )
-
- title = text[:100] if text else url
- return ReadResult(
- title=title,
- content=text,
- url=url,
- platform="twitter",
+ try:
+ r = subprocess.run(
+ [bird, "whoami"], capture_output=True, text=True, timeout=10
+ )
+ if r.returncode == 0:
+ return "ok", "完整可用(读取、搜索推文)"
+ return "warn", (
+ "bird CLI 已安装但未配置 Cookie。运行:\n"
+ " agent-reach configure twitter-cookies \"auth_token=xxx; ct0=yyy\""
)
except Exception:
- return ReadResult(
- title="Twitter/X",
- content="⚠️ Could not read this tweet.\n"
- "The tweet may have been deleted, or the account is private.\n\n"
- "Tips:\n"
- "- Make sure the URL is correct\n"
- "- Try: bird read (if bird CLI is installed)\n"
- "- For protected tweets, configure Twitter cookies: "
- "agent-reach configure twitter-cookies AUTH_TOKEN CT0",
- url=url,
- platform="twitter",
- )
-
- async def search(self, query: str, config=None, **kwargs) -> List[SearchResult]:
- limit = kwargs.get("limit", 10)
-
- bird = _bird_cmd()
- if bird:
- return await self._search_bird(query, limit, bird, config)
-
- # Fallback to Exa
- return await self._search_exa(query, limit, config)
-
- async def _search_bird(self, query: str, limit: int, bird: str, config=None) -> List[SearchResult]:
- try:
- result = subprocess.run(
- [bird, "search", query, "-n", str(limit)],
- capture_output=True, timeout=30,
- encoding='utf-8', errors='replace',
- env=_bird_env(config),
- )
- if result.returncode != 0:
- stderr = (result.stderr or "").strip()
- if "fetch failed" in stderr.lower() or "fetch failed" in (result.stdout or "").lower():
- # bird can't connect — fall back to Exa silently
- return await self._search_exa(query, limit, config)
- return await self._search_exa(query, limit, config)
-
- parsed = self._parse_bird_output(result.stdout)
- if not parsed:
- # bird returned nothing — try Exa
- return await self._search_exa(query, limit, config)
- return parsed
- except (subprocess.TimeoutExpired, FileNotFoundError):
- return await self._search_exa(query, limit, config)
-
- def _parse_bird_output(self, text: str) -> List[SearchResult]:
- """Parse bird text output into SearchResults."""
- results = []
- current = {}
- text_lines = []
-
- for line in text.strip().split("\n"):
- line = line.strip()
- if line.startswith("─"):
- if current:
- current["text"] = "\n".join(text_lines).strip()
- results.append(SearchResult(
- title=current.get("text", "")[:80],
- url=current.get("url", ""),
- snippet=current.get("text", ""),
- author=current.get("author", ""),
- date=current.get("date", ""),
- ))
- current = {}
- text_lines = []
- continue
- if line.startswith("@") and line.endswith(":") and "(" in line:
- current["author"] = line.split()[0]
- continue
- if line.startswith("date:"):
- current["date"] = line[5:].strip()
- continue
- if line.startswith("url:"):
- current["url"] = line[4:].strip()
- continue
- if current is not None:
- text_lines.append(line)
-
- if current and text_lines:
- current["text"] = "\n".join(text_lines).strip()
- results.append(SearchResult(
- title=current.get("text", "")[:80],
- url=current.get("url", ""),
- snippet=current.get("text", ""),
- author=current.get("author", ""),
- date=current.get("date", ""),
- ))
- return results
-
- async def _search_exa(self, query: str, limit: int, config=None) -> List[SearchResult]:
- from agent_reach.channels.exa_search import ExaSearchChannel
- exa = ExaSearchChannel()
- return await exa.search(f"site:x.com {query}", config=config, limit=limit)
+ return "warn", "bird CLI 已安装但连接失败"
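The removed `_bird_env()` passed the stored Twitter cookies to bird as the `AUTH_TOKEN` and `CT0` environment variables. The new `check()` runs `bird whoami` with the inherited environment only. If bird does not otherwise find the cookies saved by `agent-reach configure twitter-cookies`, the check may report a missing cookie even after configuration.

Below is a sketch of restoring that hand-off. It assumes the `auth_token=xxx; ct0=yyy` format shown in the check's hint, and all helper names are hypothetical.

```python
import os
import shutil
import subprocess
from typing import Dict, Mapping, Optional


def parse_cookie_string(raw: str) -> Dict[str, str]:
    """Split 'auth_token=xxx; ct0=yyy' into {'auth_token': 'xxx', 'ct0': 'yyy'}."""
    cookies = {}
    for part in raw.split(";"):
        key, sep, value = part.strip().partition("=")
        if sep and key:
            cookies[key] = value
    return cookies


def bird_env(cookies: Mapping[str, str], base: Optional[Mapping[str, str]] = None) -> Dict[str, str]:
    """Copy the environment (or `base`) and add AUTH_TOKEN / CT0 for bird."""
    env = dict(os.environ if base is None else base)
    if cookies.get("auth_token"):
        env["AUTH_TOKEN"] = cookies["auth_token"]
    if cookies.get("ct0"):
        env["CT0"] = cookies["ct0"]
    return env


def bird_whoami_ok(cookies: Mapping[str, str]) -> bool:
    """Run `bird whoami` with the cookies injected; True on a zero exit code."""
    bird = shutil.which("bird") or shutil.which("birdx")
    if not bird:
        return False
    r = subprocess.run(
        [bird, "whoami"], capture_output=True, text=True, timeout=10, env=bird_env(cookies)
    )
    return r.returncode == 0
```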
diff --git a/agent_reach/channels/web.py b/agent_reach/channels/web.py
index b2d7ae9..bbe33af 100644
--- a/agent_reach/channels/web.py
+++ b/agent_reach/channels/web.py
@@ -1,49 +1,17 @@
# -*- coding: utf-8 -*-
-"""Web pages — via Jina Reader API (free, no config needed).
+"""Web — any URL via Jina Reader. Always available."""
-Backend: Jina Reader (https://r.jina.ai)
-Swap to: Firecrawl, Trafilatura, or any other reader API
-"""
-
-import requests
-from .base import Channel, ReadResult
+from .base import Channel
class WebChannel(Channel):
name = "web"
- description = "网页(任意 URL)"
- backends = ["Jina Reader API"]
+ description = "任意网页"
+ backends = ["Jina Reader"]
tier = 0
- JINA_URL = "https://r.jina.ai/"
-
def can_handle(self, url: str) -> bool:
- # Fallback — handles any URL not matched by other channels
- return True
+ return True # Fallback — handles any URL
- async def read(self, url: str, config=None) -> ReadResult:
- resp = requests.get(
- f"{self.JINA_URL}{url}",
- headers={"Accept": "text/markdown"},
- timeout=15,
- )
- resp.raise_for_status()
- text = resp.text
-
- # Extract title from first markdown heading
- title = url
- for line in text.split("\n"):
- line = line.strip()
- if line.startswith("# "):
- title = line[2:].strip()
- break
- if line.startswith("Title:"):
- title = line[6:].strip()
- break
-
- return ReadResult(
- title=title,
- content=text,
- url=url,
- platform="web",
- )
+ def check(self, config=None):
+ return "ok", "通过 Jina Reader 读取任意网页(curl https://r.jina.ai/URL)"
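`WebChannel` now only reports status, and the agent fetches pages itself with `curl https://r.jina.ai/URL`. For a Python caller, the sketch below makes the same request. It also applies the title heuristic from the removed `read()`: the first `# ` heading, else a `Title:` line, else the URL. `read_web` and `extract_title` are hypothetical names.

```python
import urllib.request
from typing import Tuple

JINA_URL = "https://r.jina.ai/"


def extract_title(markdown: str, fallback: str) -> str:
    """First '# ' heading or 'Title:' line, whichever comes first; else `fallback`."""
    for line in markdown.split("\n"):
        line = line.strip()
        if line.startswith("# "):
            return line[2:].strip()
        if line.startswith("Title:"):
            return line[6:].strip()
    return fallback


def read_web(url: str, timeout: int = 15) -> Tuple[str, str]:
    """Fetch `url` as Markdown via Jina Reader; returns (title, text)."""
    req = urllib.request.Request(JINA_URL + url, headers={"Accept": "text/markdown"})
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        text = resp.read().decode("utf-8", errors="replace")
    return extract_title(text, url), text
```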
diff --git a/agent_reach/channels/xiaohongshu.py b/agent_reach/channels/xiaohongshu.py
index 84ddfa1..e3dae04 100644
--- a/agent_reach/channels/xiaohongshu.py
+++ b/agent_reach/channels/xiaohongshu.py
@@ -1,16 +1,9 @@
# -*- coding: utf-8 -*-
-"""XiaoHongShu (小红书) — via mcporter + xiaohongshu MCP server.
+"""XiaoHongShu — check if mcporter + xiaohongshu MCP is available."""
-Backend: xiaohongshu-mcp server (internal API, reliable)
-Requires: mcporter CLI + xiaohongshu MCP server running
-"""
-
-import json
import shutil
import subprocess
-from urllib.parse import urlparse, parse_qs, urlencode
-from .base import Channel, ReadResult, SearchResult
-from typing import List, Optional
+from .base import Channel
class XiaoHongShuChannel(Channel):
@@ -19,30 +12,8 @@ class XiaoHongShuChannel(Channel):
backends = ["xiaohongshu-mcp"]
tier = 2
- def _mcporter_ok(self) -> bool:
- """Check if mcporter + xiaohongshu MCP is available."""
- if not shutil.which("mcporter"):
- return False
- try:
- r = subprocess.run(
- ["mcporter", "list"], capture_output=True, text=True, timeout=10
- )
- return "xiaohongshu" in r.stdout
- except Exception:
- return False
-
- def _call(self, expr: str, timeout: int = 30) -> str:
- r = subprocess.run(
- ["mcporter", "call", expr],
- capture_output=True, text=True, timeout=timeout,
- )
- if r.returncode != 0:
- raise RuntimeError(r.stderr or r.stdout)
- return r.stdout
-
- # ── Channel interface ──
-
def can_handle(self, url: str) -> bool:
+ from urllib.parse import urlparse
d = urlparse(url).netloc.lower()
return "xiaohongshu.com" in d or "xhslink.com" in d
@@ -55,190 +26,25 @@ class XiaoHongShuChannel(Channel):
" 3. mcporter config add xiaohongshu http://localhost:18060/mcp\n"
" 详见 https://github.com/xpzouying/xiaohongshu-mcp"
)
- if not self._mcporter_ok():
- return "off", (
- "mcporter 已装但小红书 MCP 未配置。运行:\n"
- " docker run -d --name xiaohongshu-mcp -p 18060:18060 xpzouying/xiaohongshu-mcp\n"
- " mcporter config add xiaohongshu http://localhost:18060/mcp"
- )
try:
- out = self._call("xiaohongshu.check_login_status()", timeout=10)
- if "已登录" in out or "logged" in out.lower():
+ r = subprocess.run(
+ ["mcporter", "list"], capture_output=True, text=True, timeout=10
+ )
+ if "xiaohongshu" not in r.stdout:
+ return "off", (
+ "mcporter 已装但小红书 MCP 未配置。运行:\n"
+ " docker run -d --name xiaohongshu-mcp -p 18060:18060 xpzouying/xiaohongshu-mcp\n"
+ " mcporter config add xiaohongshu http://localhost:18060/mcp"
+ )
+ except Exception:
+ return "off", "mcporter 连接异常"
+ try:
+ r = subprocess.run(
+ ["mcporter", "call", "xiaohongshu.check_login_status()"],
+ capture_output=True, text=True, timeout=10
+ )
+ if "已登录" in r.stdout or "logged" in r.stdout.lower():
return "ok", "完整可用(阅读、搜索、发帖、评论、点赞)"
return "warn", "MCP 已连接但未登录,需扫码登录"
except Exception:
return "warn", "MCP 连接异常,检查 xiaohongshu-mcp 服务是否在运行"
-
- async def read(self, url: str, config=None) -> ReadResult:
- if not self._mcporter_ok():
- return ReadResult(
- title="XiaoHongShu",
- content=(
- "⚠️ 小红书需要 mcporter + xiaohongshu-mcp 才能使用。\n\n"
- "安装步骤:\n"
- "1. npm install -g mcporter\n"
- "2. docker run -d --name xiaohongshu-mcp -p 18060:18060 xpzouying/xiaohongshu-mcp\n"
- "3. mcporter config add xiaohongshu http://localhost:18060/mcp\n"
- "4. 运行 agent-reach doctor 检查状态\n\n"
- "详见 https://github.com/xpzouying/xiaohongshu-mcp"
- ),
- url=url, platform="xiaohongshu",
- )
-
- note_id = self._extract_note_id(url)
- if not note_id:
- return ReadResult(
- title="XiaoHongShu",
- content=f"⚠️ 无法从 URL 提取笔记 ID: {url}",
- url=url, platform="xiaohongshu",
- )
-
- # Step 1: try xsec_token from URL query param (e.g. from search results)
- xsec_token = self._extract_token_from_url(url)
-
- # Step 2: try homepage feeds
- if not xsec_token:
- xsec_token = self._find_token_in_feeds(note_id)
-
- # Step 3: search for the note to get a fresh token
- if not xsec_token:
- xsec_token = self._find_token_by_search(note_id)
-
- # If no token found, fallback to Jina Reader
- if not xsec_token:
- return await self._read_jina(url)
-
- # Get detail via MCP
- out = self._call(
- f'xiaohongshu.get_feed_detail(feed_id: "{note_id}", xsec_token: "{xsec_token}")',
- timeout=15,
- )
-
- return ReadResult(
- title=self._extract_title(out) or f"XHS {note_id}",
- content=out.strip(),
- url=url, platform="xiaohongshu",
- )
-
- async def search(self, query: str, config=None, **kwargs) -> List[SearchResult]:
- if not self._mcporter_ok():
- raise ValueError(
- "小红书搜索需要 mcporter + xiaohongshu-mcp。\n"
- "安装: npm install -g mcporter && mcporter config add xiaohongshu http://localhost:18060/mcp"
- )
- limit = kwargs.get("limit", 10)
- safe_q = query.replace('"', '\\"')
- out = self._call(f'xiaohongshu.search_feeds(keyword: "{safe_q}")', timeout=30)
-
- results = []
- try:
- data = json.loads(out)
- for item in data.get("feeds", [])[:limit]:
- card = item.get("noteCard", {})
- user = card.get("user", {})
- interact = card.get("interactInfo", {})
- note_id = item.get("id", "")
- xsec_token = item.get("xsecToken", "")
- note_url = f"https://www.xiaohongshu.com/explore/{note_id}"
- if xsec_token:
- note_url += f"?xsec_token={xsec_token}"
- results.append(SearchResult(
- title=card.get("displayTitle", ""),
- url=note_url,
- snippet=f"👤 {user.get('nickname', '')} · ❤ {interact.get('likedCount', '0')}",
- score=0,
- ))
- except (json.JSONDecodeError, KeyError):
- pass
- return results
-
- # ── Helpers ──
-
- def _extract_note_id(self, url: str) -> str:
- """Extract note ID from URL path, ignoring query params."""
- path = urlparse(url).path.strip("/").split("/")
- return path[-1] if path else ""
-
- def _extract_token_from_url(self, url: str) -> Optional[str]:
- """Extract xsec_token from URL query parameter if present."""
- qs = parse_qs(urlparse(url).query)
- tokens = qs.get("xsec_token", [])
- return tokens[0] if tokens else None
-
- def _find_token_in_feeds(self, note_id: str) -> Optional[str]:
- """Try to find xsec_token for a note from homepage feeds."""
- try:
- out = self._call("xiaohongshu.list_feeds()", timeout=15)
- data = json.loads(out)
- for feed in data.get("feeds", []):
- if feed.get("id") == note_id:
- return feed.get("xsecToken") or None
- except Exception:
- pass
- return None
-
- def _find_token_by_search(self, note_id: str) -> Optional[str]:
- """Search for the note ID to get a fresh xsec_token."""
- try:
- out = self._call(
- f'xiaohongshu.search_feeds(keyword: "{note_id}")', timeout=20
- )
- data = json.loads(out)
- for feed in data.get("feeds", []):
- if feed.get("id") == note_id:
- return feed.get("xsecToken") or None
- # If exact match not found but results exist, try the first one
- # (search by note_id sometimes returns the note with a different key)
- except Exception:
- pass
- return None
-
- def _extract_title(self, text: str) -> str:
- for line in text.split("\n"):
- line = line.strip()
- if line and not line.startswith(("{", "[", "#", "http")):
- return line[:80]
- return ""
-
- async def _read_jina(self, url: str) -> ReadResult:
- """Fallback: read XHS note via Jina Reader when xsec_token unavailable."""
- import requests
- try:
- resp = requests.get(
- f"https://r.jina.ai/{url}",
- headers={"Accept": "text/markdown"},
- timeout=15,
- )
- resp.raise_for_status()
- text = resp.text
- if len(text.strip()) < 50 or "登录" in text[:200]:
- return ReadResult(
- title="XiaoHongShu",
- content=(
- f"⚠️ 无法获取笔记详情: {url}\n\n"
- "小红书需要 xsec_token 才能通过 MCP 读取笔记。\n"
- "请尝试先搜索相关关键词,再从结果中读取。"
- ),
- url=url, platform="xiaohongshu",
- )
- title = ""
- for line in text.split("\n"):
- line = line.strip()
- if line and not line.startswith(("#", "http", "![", "[")):
- title = line[:80]
- break
- return ReadResult(
- title=title or "XiaoHongShu",
- content=text.strip(),
- url=url, platform="xiaohongshu",
- )
- except Exception:
- return ReadResult(
- title="XiaoHongShu",
- content=(
- f"⚠️ 无法获取笔记详情: {url}\n\n"
- "小红书需要 xsec_token 才能通过 MCP 读取笔记。\n"
- "请尝试先搜索相关关键词,再从结果中读取。"
- ),
- url=url, platform="xiaohongshu",
- )
diff --git a/agent_reach/channels/youtube.py b/agent_reach/channels/youtube.py
index a544f8c..aaae963 100644
--- a/agent_reach/channels/youtube.py
+++ b/agent_reach/channels/youtube.py
@@ -1,125 +1,22 @@
# -*- coding: utf-8 -*-
-"""YouTube — via yt-dlp (video info, subtitles, and search).
+"""YouTube — check if yt-dlp is available."""
-Backend: yt-dlp (https://github.com/yt-dlp/yt-dlp)
-Supports: read (info + subtitles), search (ytsearch)
-"""
-
-import json
import shutil
-import subprocess
-import tempfile
-from pathlib import Path
-from urllib.parse import urlparse
-from .base import Channel, ReadResult, SearchResult
-from typing import List
+from .base import Channel
class YouTubeChannel(Channel):
name = "youtube"
- description = "YouTube 视频字幕"
+ description = "YouTube 视频和字幕"
backends = ["yt-dlp"]
- requires_tools = ["yt-dlp"]
tier = 0
def can_handle(self, url: str) -> bool:
+ from urllib.parse import urlparse
d = urlparse(url).netloc.lower()
return "youtube.com" in d or "youtu.be" in d
- async def read(self, url: str, config=None) -> ReadResult:
- if not shutil.which("yt-dlp"):
- raise RuntimeError("yt-dlp not installed. Install: pip install yt-dlp")
-
- with tempfile.TemporaryDirectory() as tmpdir:
- info = self._get_info(url)
- title = info.get("title", url)
- author = info.get("uploader", "")
-
- transcript = self._get_subtitles(url, tmpdir)
- if not transcript:
- transcript = f"[Video: {title}]\n[No subtitles available.]"
-
- return ReadResult(
- title=title, content=transcript, url=url,
- author=author, platform="youtube",
- extra={
- "duration": info.get("duration_string"),
- "view_count": info.get("view_count"),
- "upload_date": info.get("upload_date"),
- },
- )
-
- async def search(self, query: str, config=None, **kwargs) -> List[SearchResult]:
- """Search YouTube via yt-dlp's ytsearch."""
- if not shutil.which("yt-dlp"):
- raise RuntimeError("yt-dlp not installed. Install: pip install yt-dlp")
-
- limit = kwargs.get("limit", 10)
-
- try:
- r = subprocess.run(
- ["yt-dlp", "--dump-json", "--flat-playlist",
- f"ytsearch{limit}:{query}"],
- capture_output=True, text=True, timeout=30,
- )
- results = []
- for line in r.stdout.strip().split("\n"):
- if not line.strip():
- continue
- try:
- d = json.loads(line)
- vid = d.get("id", "")
- results.append(SearchResult(
- title=d.get("title", ""),
- url=f"https://youtube.com/watch?v={vid}" if vid else "",
- snippet=(
- f"👤 {d.get('channel', '?')} · "
- f"⏱ {d.get('duration_string', '?')} · "
- f"👁 {d.get('view_count', '?')}"
- ),
- extra={
- "channel": d.get("channel"),
- "duration": d.get("duration_string"),
- "view_count": d.get("view_count"),
- },
- ))
- except json.JSONDecodeError:
- continue
- return results
- except subprocess.TimeoutExpired:
- return []
-
- def _get_info(self, url: str) -> dict:
- try:
- r = subprocess.run(
- ["yt-dlp", "--dump-json", "--no-download", url],
- capture_output=True, text=True, timeout=30,
- )
- if r.returncode == 0:
- return json.loads(r.stdout)
- except (subprocess.TimeoutExpired, json.JSONDecodeError):
- pass
- return {}
-
- def _get_subtitles(self, url: str, tmpdir: str) -> str:
- try:
- subprocess.run(
- ["yt-dlp", "--write-auto-sub", "--write-sub",
- "--sub-lang", "en,zh-Hans,zh",
- "--skip-download", "--sub-format", "vtt",
- "-o", f"{tmpdir}/%(id)s.%(ext)s", url],
- capture_output=True, text=True, timeout=30,
- )
- for f in Path(tmpdir).glob("*.vtt"):
- text = f.read_text(errors="replace")
- lines = []
- for line in text.split("\n"):
- line = line.strip()
- if not line or line.startswith("WEBVTT") or "-->" in line or line.isdigit():
- continue
- if line not in lines[-1:]:
- lines.append(line)
- return "\n".join(lines)
- except subprocess.TimeoutExpired:
- pass
- return ""
+ def check(self, config=None):
+ if shutil.which("yt-dlp"):
+ return "ok", "可提取视频信息和字幕"
+ return "off", "yt-dlp 未安装。安装:pip install yt-dlp"
diff --git a/agent_reach/cli.py b/agent_reach/cli.py
index 8e1f36e..f5f3e7e 100644
--- a/agent_reach/cli.py
+++ b/agent_reach/cli.py
@@ -1,20 +1,15 @@
# -*- coding: utf-8 -*-
"""
-Agent Reach CLI — command-line interface.
+Agent Reach CLI — installer, doctor, and configuration tool.
Usage:
- agent-reach read
- agent-reach search
- agent-reach search-reddit [--sub ]
- agent-reach search-github [--lang ]
- agent-reach search-twitter
- agent-reach setup
+ agent-reach install --env=auto
agent-reach doctor
- agent-reach version
+ agent-reach configure twitter-cookies "auth_token=xxx; ct0=yyy"
+ agent-reach setup
"""
import sys
-import asyncio
import argparse
import json
import os
@@ -48,57 +43,6 @@ def main():
sub = parser.add_subparsers(dest="command", help="Available commands")
# ── read ──
- p_read = sub.add_parser("read", help="Read content from a URL")
- p_read.add_argument("url", help="URL to read")
- p_read.add_argument("--json", dest="as_json", action="store_true", help="Output as JSON")
-
- # ── search ──
- p_search = sub.add_parser("search", help="Search the web (Exa)")
- p_search.add_argument("query", nargs="+", help="Search query")
- p_search.add_argument("-n", "--num", type=int, default=5, help="Number of results")
-
- # ── search-reddit ──
- p_sr = sub.add_parser("search-reddit", help="Search Reddit")
- p_sr.add_argument("query", nargs="+", help="Search query")
- p_sr.add_argument("--sub", help="Subreddit filter")
- p_sr.add_argument("-n", "--num", type=int, default=10, help="Number of results")
-
- # ── search-github ──
- p_sg = sub.add_parser("search-github", help="Search GitHub")
- p_sg.add_argument("query", nargs="+", help="Search query")
- p_sg.add_argument("--lang", help="Language filter")
- p_sg.add_argument("-n", "--num", type=int, default=5, help="Number of results")
-
- # ── search-twitter ──
- p_st = sub.add_parser("search-twitter", help="Search Twitter")
- p_st.add_argument("query", nargs="+", help="Search query")
- p_st.add_argument("-n", "--num", type=int, default=10, help="Number of results")
-
- # ── search-youtube ──
- p_sy = sub.add_parser("search-youtube", help="Search YouTube")
- p_sy.add_argument("query", nargs="+", help="Search query")
- p_sy.add_argument("-n", "--num", type=int, default=5, help="Number of results")
-
- # ── search-bilibili ──
- p_sb = sub.add_parser("search-bilibili", help="Search Bilibili")
- p_sb.add_argument("query", nargs="+", help="Search query")
- p_sb.add_argument("-n", "--num", type=int, default=5, help="Number of results")
-
- # ── search-xhs ──
- p_sx = sub.add_parser("search-xhs", help="Search XiaoHongShu")
- p_sx.add_argument("query", nargs="+", help="Search query")
- p_sx.add_argument("-n", "--num", type=int, default=10, help="Number of results")
-
- # ── search-linkedin ──
- p_sl = sub.add_parser("search-linkedin", help="Search LinkedIn")
- p_sl.add_argument("query", nargs="+", help="Search query")
- p_sl.add_argument("-n", "--num", type=int, default=10, help="Number of results")
-
- # ── search-bosszhipin ──
- p_sbz = sub.add_parser("search-bosszhipin", help="Search Boss直聘")
- p_sbz.add_argument("query", nargs="+", help="Search query")
- p_sbz.add_argument("-n", "--num", type=int, default=10, help="Number of results")
-
# ── setup ──
sub.add_parser("setup", help="Interactive configuration wizard")
@@ -161,10 +105,6 @@ def main():
_cmd_install(args)
elif args.command == "configure":
_cmd_configure(args)
- elif args.command == "read":
- asyncio.run(_cmd_read(args))
- elif args.command.startswith("search"):
- asyncio.run(_cmd_search(args))
# ── Command handlers ────────────────────────────────
@@ -849,98 +789,6 @@ def _cmd_setup():
print()
-async def _cmd_read(args):
- from agent_reach.core import AgentReach
- eyes = AgentReach()
- try:
- result = await eyes.read(args.url)
- if args.as_json:
- print(json.dumps(result, ensure_ascii=False, indent=2))
- else:
- print(f"\n📖 {result.get('title', 'Untitled')}")
- print(f"🔗 {result.get('url', '')}")
- if result.get("author"):
- print(f"👤 {result['author']}")
- print(f"\n{result.get('content', '')}")
- except Exception as e:
- error_str = str(e)
- if "400" in error_str and "Bad Request" in error_str:
- print(f"❌ Invalid URL: {args.url}", file=sys.stderr)
- print(" Please provide a valid URL (e.g., https://example.com)", file=sys.stderr)
- elif "ConnectionError" in type(e).__name__ or "Timeout" in type(e).__name__:
- print(f"❌ Could not connect to: {args.url}", file=sys.stderr)
- print(" Check your internet connection or the URL.", file=sys.stderr)
- else:
- print(f"❌ Error: {e}", file=sys.stderr)
- sys.exit(1)
-
-
-async def _cmd_search(args):
- from agent_reach.core import AgentReach
- eyes = AgentReach()
- query = " ".join(args.query).strip()
- num = args.num
-
- if not query:
- print("Please provide a search query.", file=sys.stderr)
- sys.exit(1)
-
- try:
- if args.command == "search":
- results = await eyes.search(query, num_results=num)
- elif args.command == "search-reddit":
- results = await eyes.search_reddit(query, subreddit=getattr(args, "sub", None), limit=num)
- elif args.command == "search-github":
- results = await eyes.search_github(query, language=getattr(args, "lang", None), limit=num)
- elif args.command == "search-twitter":
- results = await eyes.search_twitter(query, limit=num)
- elif args.command == "search-youtube":
- results = await eyes.search_youtube(query, limit=num)
- elif args.command == "search-bilibili":
- results = await eyes.search_bilibili(query, limit=num)
- elif args.command == "search-xhs":
- results = await eyes.search_xhs(query, limit=num)
- elif args.command == "search-linkedin":
- results = await eyes.search_linkedin(query, limit=num)
- elif args.command == "search-bosszhipin":
- results = await eyes.search_bosszhipin(query, limit=num)
- else:
- print(f"Unknown command: {args.command}", file=sys.stderr)
- sys.exit(1)
- except Exception as e:
- error_str = str(e)
- if "401" in error_str or "Unauthorized" in error_str:
- print("⚠️ Exa API key not configured or invalid.")
- print("Get a free key at https://exa.ai (1000 searches/month free)")
- print("Then run: agent-reach configure exa-key YOUR_KEY")
- sys.exit(1)
- elif "exa" in error_str.lower() or "api_key" in error_str.lower():
- print("⚠️ Exa API key not configured.")
- print("Get a free key at https://exa.ai")
- print("Then run: agent-reach configure exa-key YOUR_KEY")
- sys.exit(1)
- else:
- print(f"❌ Error: {e}", file=sys.stderr)
- sys.exit(1)
-
- if not results:
- print("No results found.")
- return
-
- for i, r in enumerate(results, 1):
- title = r.get("title") or r.get("name") or r.get("text", "")[:60]
- url = r.get("url", "")
- snippet = r.get("snippet") or r.get("description") or r.get("text", "")
- print(f"\n{i}. {title}")
- print(f" 🔗 {url}")
- if snippet:
- print(f" {snippet[:200]}")
- # Extra info for GitHub
- extra = r.get("extra", {})
- if extra.get("stars"):
- print(f" ⭐ {extra['stars']} 🍴 {extra.get('forks', 0)} 📝 {extra.get('language', '')}")
-
-
def _cmd_check_update():
"""Check for newer versions on GitHub."""
import requests
diff --git a/agent_reach/core.py b/agent_reach/core.py
index c5e8a3c..fd8caae 100644
--- a/agent_reach/core.py
+++ b/agent_reach/core.py
@@ -1,120 +1,36 @@
# -*- coding: utf-8 -*-
"""
-AgentReach — the unified entry point.
+AgentReach — installer, doctor, and configuration tool.
-Pure glue: routes URLs to the right channel, routes searches to the right engine.
-Every channel is a thin wrapper around an external tool. Swap any backend anytime.
+Agent Reach helps AI agents install and configure upstream platform tools
+(bird CLI, yt-dlp, mcporter, gh CLI, etc.). After installation, agents
+call the upstream tools directly — no wrapper layer needed.
Usage:
- from agent_reach import AgentReach
+ from agent_reach.doctor import check_all, format_report
+ from agent_reach.config import Config
- eyes = AgentReach()
- content = await eyes.read("https://github.com/openai/gpt-4")
- results = await eyes.search("AI agent framework")
+ config = Config()
+ results = check_all(config)
+ print(format_report(results))
"""
-import asyncio
-from typing import Any, Dict, List, Optional
+from typing import Dict, Optional
from agent_reach.config import Config
-from agent_reach.channels import get_channel_for_url, get_channel, get_all_channels
class AgentReach:
- """Give your AI Agent eyes to see the entire internet."""
+ """Give your AI Agent eyes to see the entire internet.
+
+ This class provides health-check functionality.
+ For reading/searching, use the upstream tools directly
+ (see SKILL.md for commands).
+ """
def __init__(self, config: Optional[Config] = None):
self.config = config or Config()
- # ── Reading ─────────────────────────────────────────
-
- async def read(self, url: str) -> Dict[str, Any]:
- """
- Read content from any URL. Auto-detects platform.
-
- Supported: Web, GitHub, Reddit, Twitter, YouTube,
- Bilibili, RSS, and more.
-
- Returns:
- Dict with title, content, url, author, platform, etc.
- """
- if not url.startswith(("http://", "https://")):
- url = f"https://{url}"
-
- channel = get_channel_for_url(url)
- result = await channel.read(url, config=self.config)
- return result.to_dict()
-
- async def read_batch(self, urls: List[str]) -> List[Dict[str, Any]]:
- """Read multiple URLs concurrently."""
- tasks = [self.read(url) for url in urls]
- results = await asyncio.gather(*tasks, return_exceptions=True)
- return [r for r in results if not isinstance(r, Exception)]
-
- def detect_platform(self, url: str) -> str:
- """Detect what platform a URL belongs to."""
- channel = get_channel_for_url(url)
- return channel.name
-
- # ── Searching ───────────────────────────────────────
-
- async def search(self, query: str, num_results: int = 5) -> List[Dict[str, Any]]:
- """Semantic web search via Exa."""
- ch = get_channel("exa_search")
- results = await ch.search(query, config=self.config, limit=num_results)
- return [r.to_dict() for r in results]
-
- async def search_reddit(self, query: str, subreddit: Optional[str] = None, limit: int = 10) -> List[Dict[str, Any]]:
- """Search Reddit via Exa (bypasses IP blocks)."""
- ch = get_channel("exa_search")
- q = f"site:reddit.com/r/{subreddit} {query}" if subreddit else f"site:reddit.com {query}"
- results = await ch.search(q, config=self.config, limit=limit)
- return [r.to_dict() for r in results]
-
- async def search_github(self, query: str, language: Optional[str] = None, limit: int = 5) -> List[Dict[str, Any]]:
- """Search GitHub repositories."""
- ch = get_channel("github")
- results = await ch.search(query, config=self.config, language=language, limit=limit)
- return [r.to_dict() for r in results]
-
- async def search_twitter(self, query: str, limit: int = 10) -> List[Dict[str, Any]]:
- """Search Twitter. Uses bird CLI if available, else Exa."""
- ch = get_channel("twitter")
- results = await ch.search(query, config=self.config, limit=limit)
- return [r.to_dict() for r in results]
-
- async def search_youtube(self, query: str, limit: int = 5) -> List[Dict[str, Any]]:
- """Search YouTube via yt-dlp."""
- ch = get_channel("youtube")
- results = await ch.search(query, config=self.config, limit=limit)
- return [r.to_dict() for r in results]
-
- async def search_bilibili(self, query: str, limit: int = 5) -> List[Dict[str, Any]]:
- """Search Bilibili. Tries yt-dlp first, falls back to Exa."""
- ch = get_channel("bilibili")
- results = await ch.search(query, config=self.config, limit=limit)
- return [r.to_dict() for r in results]
-
- async def search_xhs(self, query: str, limit: int = 10) -> List[Dict[str, Any]]:
- """Search XiaoHongShu via mcporter."""
- ch = get_channel("xiaohongshu")
- results = await ch.search(query, config=self.config, limit=limit)
- return [r.to_dict() for r in results]
-
- async def search_linkedin(self, query: str, limit: int = 10) -> List[Dict[str, Any]]:
- """Search LinkedIn via MCP or Exa."""
- ch = get_channel("linkedin")
- results = await ch.search(query, config=self.config, limit=limit)
- return [r.to_dict() for r in results]
-
- async def search_bosszhipin(self, query: str, limit: int = 10) -> List[Dict[str, Any]]:
- """Search Boss直聘 via MCP or Exa."""
- ch = get_channel("bosszhipin")
- results = await ch.search(query, config=self.config, limit=limit)
- return [r.to_dict() for r in results]
-
- # ── Health ──────────────────────────────────────────
-
def doctor(self) -> Dict[str, dict]:
"""Check all channel availability."""
from agent_reach.doctor import check_all
@@ -124,13 +40,3 @@ class AgentReach:
"""Get formatted health report."""
from agent_reach.doctor import check_all, format_report
return format_report(check_all(self.config))
-
- # ── Sync wrappers ───────────────────────────────────
-
- def read_sync(self, url: str) -> Dict[str, Any]:
- """Synchronous version of read()."""
- return asyncio.run(self.read(url))
-
- def search_sync(self, query: str, num_results: int = 5) -> List[Dict[str, Any]]:
- """Synchronous version of search()."""
- return asyncio.run(self.search(query, num_results))
diff --git a/agent_reach/integrations/mcp_server.py b/agent_reach/integrations/mcp_server.py
index 13a91bc..636e4ee 100644
--- a/agent_reach/integrations/mcp_server.py
+++ b/agent_reach/integrations/mcp_server.py
@@ -1,10 +1,11 @@
# -*- coding: utf-8 -*-
"""
-Agent Reach MCP Server — expose all capabilities as MCP tools.
+Agent Reach MCP Server — expose doctor/status as an MCP tool.
Run: python -m agent_reach.integrations.mcp_server
-8 tools for any MCP-compatible AI Agent.
+Agent Reach is an installer + doctor tool. For actual reading/searching,
+agents should call upstream tools directly (bird, yt-dlp, mcporter, etc.).
"""
import asyncio
@@ -35,50 +36,15 @@ def create_server():
@server.list_tools()
async def list_tools():
return [
- Tool(name="read_url",
- description="Read content from any URL. Supports: web, GitHub, Reddit, Twitter, YouTube, Bilibili, RSS.",
- inputSchema={"type": "object", "properties": {"url": {"type": "string"}}, "required": ["url"]}),
- Tool(name="read_batch",
- description="Read multiple URLs concurrently.",
- inputSchema={"type": "object", "properties": {"urls": {"type": "array", "items": {"type": "string"}}}, "required": ["urls"]}),
- Tool(name="detect_platform",
- description="Detect what platform a URL belongs to.",
- inputSchema={"type": "object", "properties": {"url": {"type": "string"}}, "required": ["url"]}),
- Tool(name="search",
- description="Semantic web search via Exa.",
- inputSchema={"type": "object", "properties": {"query": {"type": "string"}, "num_results": {"type": "integer", "default": 5}}, "required": ["query"]}),
- Tool(name="search_reddit",
- description="Search Reddit posts.",
- inputSchema={"type": "object", "properties": {"query": {"type": "string"}, "subreddit": {"type": "string"}, "limit": {"type": "integer", "default": 10}}, "required": ["query"]}),
- Tool(name="search_github",
- description="Search GitHub repositories.",
- inputSchema={"type": "object", "properties": {"query": {"type": "string"}, "language": {"type": "string"}, "limit": {"type": "integer", "default": 5}}, "required": ["query"]}),
- Tool(name="search_twitter",
- description="Search Twitter/X posts.",
- inputSchema={"type": "object", "properties": {"query": {"type": "string"}, "limit": {"type": "integer", "default": 10}}, "required": ["query"]}),
Tool(name="get_status",
- description="Get Agent Reach status: which channels are active.",
+ description="Get Agent Reach status: which channels are installed and active.",
inputSchema={"type": "object", "properties": {}}),
]
@server.call_tool()
async def call_tool(name: str, arguments: dict):
try:
- if name == "read_url":
- result = await eyes.read(arguments["url"])
- elif name == "read_batch":
- result = await eyes.read_batch(arguments["urls"])
- elif name == "detect_platform":
- result = eyes.detect_platform(arguments["url"])
- elif name == "search":
- result = await eyes.search(arguments["query"], arguments.get("num_results", 5))
- elif name == "search_reddit":
- result = await eyes.search_reddit(arguments["query"], arguments.get("subreddit"), arguments.get("limit", 10))
- elif name == "search_github":
- result = await eyes.search_github(arguments["query"], arguments.get("language"), arguments.get("limit", 5))
- elif name == "search_twitter":
- result = await eyes.search_twitter(arguments["query"], arguments.get("limit", 10))
- elif name == "get_status":
+ if name == "get_status":
result = eyes.doctor_report()
else:
result = f"Unknown tool: {name}"
diff --git a/agent_reach/skill/SKILL.md b/agent_reach/skill/SKILL.md
index b7af5af..596cf42 100644
--- a/agent_reach/skill/SKILL.md
+++ b/agent_reach/skill/SKILL.md
@@ -1,72 +1,39 @@
---
name: agent-reach
description: >
- Give your AI agent eyes to see the entire internet. Read and search across
- Twitter/X, Reddit, YouTube, GitHub, Bilibili, XiaoHongShu, LinkedIn,
- Boss直聘, RSS, and any web page — all from a single CLI.
- Use when: (1) reading content from URLs (tweets, Reddit posts, articles, videos),
- (2) searching across platforms (web, Twitter, Reddit, GitHub, YouTube, Bilibili,
- XiaoHongShu, LinkedIn, Boss直聘),
- (3) user asks to configure/enable a platform channel,
- (4) checking channel health or updating Agent Reach.
- Triggers: "search Twitter/Reddit/YouTube", "read this URL", "find posts about",
- "搜索", "读取", "查一下", "看看这个链接",
- "帮我配", "帮我添加", "帮我安装".
+ Give your AI agent eyes to see the entire internet. Install and configure
+ upstream tools for Twitter/X, Reddit, YouTube, GitHub, Bilibili, XiaoHongShu,
+ LinkedIn, Boss直聘, RSS, and any web page — then call them directly.
+ Use when: (1) setting up platform access tools for the first time,
+ (2) checking which platforms are available,
+ (3) user asks to configure/enable a platform channel.
+ Triggers: "帮我配", "帮我添加", "帮我安装", "agent reach", "install channels",
+ "configure twitter", "enable reddit".
---
# Agent Reach
-Read and search the internet across 12+ platforms via unified CLI.
+Install and configure upstream tools for 12+ platforms. After setup, call them directly — no wrapper layer.
## Setup
-First check if agent-reach is installed:
-```bash
-agent-reach doctor
-```
-
-If command not found, install it:
```bash
pip install https://github.com/Panniantong/agent-reach/archive/main.zip
agent-reach install --env=auto
+agent-reach doctor
```
-`install` auto-detects your environment and installs core dependencies (Node.js, mcporter, bird CLI, gh CLI). Read the output and run `agent-reach doctor` to see what's active.
+`install` auto-detects your environment and installs core dependencies (Node.js, mcporter, bird CLI, gh CLI, yt-dlp, feedparser). Run `doctor` to see what's active.
-## Commands
-
-### Read any URL
-```bash
-agent-reach read
-agent-reach read --json # structured output
-```
-Handles: tweets, Reddit posts, articles, YouTube/Bilibili (transcripts), GitHub repos, LinkedIn profiles, Boss直聘 jobs, XiaoHongShu notes, RSS feeds, and any web page.
-
-### Search
-
-```bash
-agent-reach search "query" # web search (Exa)
-agent-reach search-twitter "query" # Twitter/X
-agent-reach search-reddit "query" # Reddit (--sub )
-agent-reach search-github "query" # GitHub (--lang )
-agent-reach search-youtube "query" # YouTube
-agent-reach search-bilibili "query" # Bilibili (B站)
-agent-reach search-xhs "query" # XiaoHongShu (小红书)
-agent-reach search-linkedin "query" # LinkedIn
-agent-reach search-bosszhipin "query" # Boss直聘
-```
-
-All search commands support `-n ` for number of results.
-
-### Management
+## Management
```bash
agent-reach doctor # channel status overview
-agent-reach watch # quick health + update check (for scheduled tasks)
+agent-reach watch # quick health + update check
agent-reach check-update # check for new versions
```
-### Configure channels
+## Configure channels
```bash
agent-reach configure twitter-cookies "auth_token=xxx; ct0=yyy"
@@ -76,45 +43,188 @@ agent-reach configure --from-browser chrome # auto-extract cookies from local
## Configuring a channel ("帮我配 XXX")
-All channels follow the same flow. When a user asks to configure/enable any channel:
+When a user asks to configure/enable any channel:
1. Run `agent-reach doctor`
-2. Find the channel in the output — it shows the current status (✅/⚠️/⬜) and **what to do next**
-3. Execute whatever you can automatically (install packages, start services, register MCP)
-4. For steps that require human action (scan QR code, paste cookies, login), tell the user exactly what to do
-5. After the user completes their part, run `agent-reach doctor` again to verify
+2. Find the channel — it shows status (✅/⚠️/⬜) and **what to do next**
+3. Execute what you can automatically (install packages, start services)
+4. For human-required steps (scan QR, paste cookies), tell the user what to do
+5. Run `agent-reach doctor` again to verify
-**Do NOT memorize per-channel steps.** Always rely on `doctor` output — it stays up-to-date even when backends change.
+**Do NOT memorize per-channel steps.** Always rely on `doctor` output.
### Common human actions
-These are things only the user can do. When `doctor` indicates one is needed, explain it clearly:
+- **Paste cookies:** User installs [Cookie-Editor](https://chromewebstore.google.com/detail/cookie-editor/hlkenndednhfkekhgcdicdfddnkalmdm) → goes to the website → exports Header String → sends it to you → you run `agent-reach configure <platform>-cookies "..."`
+- **Scan QR code:** User opens the URL on their phone/browser and scans with the platform's app
+- **Proxy:** Reddit/Bilibili/XiaoHongShu may block server IPs — suggest a residential proxy if on a server
-- **Paste cookies:** User installs [Cookie-Editor](https://chromewebstore.google.com/detail/cookie-editor/hlkenndednhfkekhgcdicdfddnkalmdm) Chrome extension → goes to the website → exports Header String → sends it to you → you run `agent-reach configure -cookies "..."`
-- **Scan QR code:** User opens the URL shown in `doctor` output on their phone/browser and scans with the platform's app
-- **Browser login:** Some MCP services need a one-time browser login; on servers without a display, user may need VNC
-- **Proxy:** Reddit/Bilibili/XiaoHongShu block server IPs — suggest a residential proxy (~$1/month) if on a server
+---
-## Tips
+## Using Upstream Tools Directly
-- Always try `agent-reach read ` first for any URL — it auto-detects the platform
-- If a channel is ⬜ but the user hasn't asked for it, don't push — let them opt in
-- If a channel breaks, run `agent-reach doctor` to diagnose
-- LinkedIn and Boss直聘 have Jina Reader fallback even without full setup
-- Twitter search automatically falls back to Exa search when bird fails
+After `agent-reach install`, call the upstream tools directly. No need for `agent-reach read` or `agent-reach search`.
+
+### Twitter/X (bird CLI)
+
+```bash
+# Search tweets
+bird search "query" --json -n 10
+
+# Read a specific tweet
+bird read https://x.com/user/status/123 --json
+
+# Read a user's timeline
+bird timeline @username --json -n 20
+```
+
+### YouTube (yt-dlp)
+
+```bash
+# Get video metadata
+yt-dlp --dump-json "https://www.youtube.com/watch?v=xxx"
+
+# Download subtitles only
+yt-dlp --write-sub --write-auto-sub --sub-lang "zh-Hans,zh,en" --sub-format vtt --skip-download -o "/tmp/%(id)s" "URL"
+# Then read the .vtt file
+
+# Search (yt-dlp ytsearch)
+yt-dlp --dump-json "ytsearch5:query"
+```
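+
+Since `agent-reach read` no longer cleans subtitles for you, here is a minimal Python sketch that turns a downloaded `.vtt` file into plain text. It assumes the `-o "/tmp/%(id)s"` template above (yt-dlp then typically writes files like `/tmp/<id>.<lang>.vtt`) and roughly follows what the removed `_get_subtitles` helper did: skip headers, cue numbers and timing lines, and drop consecutive duplicate lines. `vtt_to_text` and `read_subtitles` are hypothetical helper names, not part of Agent Reach or yt-dlp.
+
+```python
+import re
+from pathlib import Path
+
+# Inline cue tags such as <c>, </c> or <00:00:01.500> (common in auto-generated subtitles)
+INLINE_TAG = re.compile(r"<[^>]+>")
+
+
+def vtt_to_text(vtt: str) -> str:
+    """Strip WebVTT headers, cue numbers, timing lines and inline tags; drop consecutive repeats."""
+    lines: list[str] = []
+    for raw in vtt.splitlines():
+        line = INLINE_TAG.sub("", raw).strip()
+        if not line or line.startswith(("WEBVTT", "Kind:", "Language:")):
+            continue  # blank line or file header
+        if "-->" in line or line.isdigit():
+            continue  # cue timing ("00:00:01.000 --> 00:00:04.000") or cue number
+        if lines and lines[-1] == line:
+            continue  # auto-subs often repeat the previous line
+        lines.append(line)
+    return "\n".join(lines)
+
+
+def read_subtitles(video_id: str, directory: str = "/tmp") -> str:
+    """Return the text of the first <video_id>*.vtt file (in name order), or "" if none exists."""
+    for path in sorted(Path(directory).glob(f"{video_id}*.vtt")):
+        return vtt_to_text(path.read_text(encoding="utf-8", errors="replace"))
+    return ""
+```
+
+Only consecutive duplicates are removed, so rolling auto-captions may still contain some repetition.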
+
+### Bilibili (yt-dlp)
+
+```bash
+# Get video metadata
+yt-dlp --dump-json "https://www.bilibili.com/video/BVxxx"
+
+# Download subtitles
+yt-dlp --write-sub --write-auto-sub --sub-lang "zh-Hans,zh,en" --convert-subs vtt --skip-download -o "/tmp/%(id)s" "URL"
+```
+
+### Reddit (JSON API)
+
+```bash
+# Read a subreddit
+curl -s "https://www.reddit.com/r/python/hot.json?limit=10" -H "User-Agent: agent-reach/1.0"
+
+# Read a post with comments
+curl -s "https://www.reddit.com/r/python/comments/POST_ID.json" -H "User-Agent: agent-reach/1.0"
+
+# Search
+curl -s "https://www.reddit.com/search.json?q=query&limit=10" -H "User-Agent: agent-reach/1.0"
+```
+
+Note: On servers, Reddit may block your IP. Use a proxy, or search Reddit via Exa instead.
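+
+To post-process the Reddit JSON in Python instead of reading raw output, a minimal standard-library sketch follows. It assumes Reddit's usual `Listing` shape (`data.children[].data`, with posts carrying `title`, `score` and `permalink`); `fetch_listing` and `summarize_listing` are hypothetical helper names.
+
+```python
+import json
+import urllib.request
+
+USER_AGENT = "agent-reach/1.0"  # same User-Agent as the curl examples above
+
+
+def fetch_listing(url: str, timeout: float = 15):
+    """GET a Reddit .json URL and decode it (requires network access)."""
+    req = urllib.request.Request(url, headers={"User-Agent": USER_AGENT})
+    with urllib.request.urlopen(req, timeout=timeout) as resp:
+        return json.load(resp)
+
+
+def summarize_listing(listing: dict) -> list[tuple[str, int, str]]:
+    """Return (title, score, url) for each post in a Listing; children without a title are skipped."""
+    posts = []
+    for child in listing.get("data", {}).get("children", []):
+        post = child.get("data", {})
+        if "title" not in post:
+            continue  # comments (kind t1) have "body" instead of "title"
+        url = "https://www.reddit.com" + post.get("permalink", "")
+        posts.append((post["title"], post.get("score", 0), url))
+    return posts
+```
+
+For example, `summarize_listing(fetch_listing("https://www.reddit.com/r/python/hot.json?limit=10"))`. The `comments/POST_ID.json` endpoint returns a JSON array of two listings (the post, then its comments), so pass element `[0]` to get the post.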
+
+### 小红书 / XiaoHongShu (mcporter + xiaohongshu-mcp)
+
+```bash
+# Search notes
+mcporter call 'xiaohongshu.search_feeds(keyword: "query")'
+
+# Read a note
+mcporter call 'xiaohongshu.get_feed_detail(feed_id: "xxx", xsec_token: "yyy")'
+
+# Get comments
+mcporter call 'xiaohongshu.get_feed_comments(feed_id: "xxx", xsec_token: "yyy")'
+
+# Post a note
+mcporter call 'xiaohongshu.create_image_feed(title: "标题", desc: "内容", image_paths: ["/path/to/img.jpg"])'
+```
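+
+When calling these tools from Python, two details from the removed xiaohongshu channel code are worth keeping: escape double quotes inside string arguments, and carry each note's `xsecToken` from `search_feeds` into `get_feed_detail`. The sketch below assumes, as that code did, that `mcporter call` prints the tool's JSON result (a `feeds` list whose items have `id`, `xsecToken` and `noteCard.displayTitle`); `call_expr`, `note_links` and `search_notes` are hypothetical names.
+
+```python
+import json
+import subprocess
+
+
+def call_expr(tool: str, **args: str) -> str:
+    """Build e.g. xiaohongshu.search_feeds(keyword: "...") with double quotes escaped."""
+    parts = []
+    for key, value in args.items():
+        escaped = value.replace('"', '\\"')
+        parts.append(f'{key}: "{escaped}"')
+    return f"{tool}({', '.join(parts)})"
+
+
+def note_links(search_output: str) -> list[dict]:
+    """Extract feed_id, xsec_token, title and a browsable URL from search_feeds output."""
+    notes = []
+    for feed in json.loads(search_output).get("feeds", []):
+        note_id = feed.get("id", "")
+        token = feed.get("xsecToken", "")
+        url = f"https://www.xiaohongshu.com/explore/{note_id}"
+        if token:
+            url += f"?xsec_token={token}"
+        notes.append({
+            "feed_id": note_id,
+            "xsec_token": token,
+            "title": feed.get("noteCard", {}).get("displayTitle", ""),
+            "url": url,
+        })
+    return notes
+
+
+def search_notes(keyword: str, timeout: int = 30) -> list[dict]:
+    """Run mcporter (must be on PATH with the xiaohongshu server registered) and parse the result."""
+    expr = call_expr("xiaohongshu.search_feeds", keyword=keyword)
+    result = subprocess.run(["mcporter", "call", expr],
+                            capture_output=True, text=True, timeout=timeout, check=True)
+    return note_links(result.stdout)
+```
+
+To read a note, pass the pair back: `call_expr("xiaohongshu.get_feed_detail", feed_id=n["feed_id"], xsec_token=n["xsec_token"])`.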
+
+### GitHub (gh CLI)
+
+```bash
+# Search repos
+gh search repos "query" --sort stars --limit 10
+
+# View a repo
+gh repo view owner/repo
+
+# Search code
+gh search code "query" --language python
+
+# List issues
+gh issue list -R owner/repo --state open
+
+# View a specific issue/PR
+gh issue view 123 -R owner/repo
+```
+
+### Web — Any URL (Jina Reader)
+
+```bash
+# Read any webpage as markdown
+curl -s "https://r.jina.ai/URL" -H "Accept: text/markdown"
+
+# Search the web
+curl -s "https://s.jina.ai/query" -H "Accept: text/markdown"
+```
+
+### Exa Search (mcporter + exa MCP)
+
+```bash
+# Web search
+mcporter call 'exa.web_search_exa(query: "query", numResults: 5)'
+
+# Code search (GitHub, StackOverflow, docs)
+mcporter call 'exa.get_code_context_exa(query: "how to parse JSON in Python", tokensNum: 3000)'
+
+# Company research
+mcporter call 'exa.company_research_exa(companyName: "OpenAI")'
+```
+
+### LinkedIn (mcporter + linkedin-scraper-mcp)
+
+```bash
+# View a profile
+mcporter call 'linkedin.get_person_profile(linkedin_url: "https://linkedin.com/in/username")'
+
+# Search people
+mcporter call 'linkedin.search_people(keyword: "AI engineer", limit: 10)'
+
+# View company
+mcporter call 'linkedin.get_company_profile(linkedin_url: "https://linkedin.com/company/xxx")'
+```
+
+Fallback: `curl -s "https://r.jina.ai/https://linkedin.com/in/username"`
+
+### Boss直聘 (mcporter + mcp-bosszp)
+
+```bash
+# Browse recommended jobs
+mcporter call 'bosszhipin.get_recommend_jobs_tool(page: 1)'
+
+# Search jobs
+mcporter call 'bosszhipin.search_jobs_tool(keyword: "Python", city: "北京", page: 1)'
+
+# View job details
+mcporter call 'bosszhipin.get_job_detail_tool(job_url: "https://www.zhipin.com/job_detail/xxx")'
+```
+
+Fallback: `curl -s "https://r.jina.ai/https://www.zhipin.com/job_detail/xxx"`
+
+### RSS (feedparser)
+
+```bash
+python3 -c "
+import feedparser
+d = feedparser.parse('https://example.com/feed')
+for e in d.entries[:5]:
+ print(f'{e.title} — {e.link}')
+"
+```
## Troubleshooting
### Twitter "fetch failed"
-bird CLI uses Node.js's native `fetch()`, which ignores the `HTTP_PROXY`/`HTTPS_PROXY` environment variables.
-Agent Reach has a built-in workaround: it injects the proxy automatically via `undici`'s `EnvHttpProxyAgent`.
+bird CLI uses Node.js's native `fetch()`, which ignores the `HTTP_PROXY`/`HTTPS_PROXY` environment variables. Fixes:
+1. Ensure `undici` is installed: `npm install -g undici`
+2. Configure proxy: `agent-reach configure proxy http://user:pass@ip:port`
+3. If still failing, use transparent proxy (Clash TUN, Proxifier)
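-**You (the Agent) need to confirm:**
-1. `undici` is installed globally: `npm list -g undici`; if not, install it: `npm install -g undici`
-2. The user has configured a proxy: `agent-reach configure proxy http://user:pass@ip:port`
-3. Agent Reach handles the injection automatically; no extra action is needed from the user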
+### Channel broken?
-If all of the above is in place and you still get "fetch failed", suggest the user switch to a transparent proxy (e.g. Clash TUN mode, Proxifier).
-
-For details, see https://github.com/Panniantong/Agent-Reach/blob/main/docs/troubleshooting.md
+Run `agent-reach doctor` — it shows what's wrong and how to fix it.