refactor: strip to installer + doctor + docs, remove read/search wrapper layer

BREAKING CHANGE: Remove all `agent-reach read` and `agent-reach search-*` commands.

Agent Reach is now an installer, configuration tool, and doctor —
not a wrapper layer. After installation, agents call upstream tools
directly (bird CLI, yt-dlp, mcporter, gh CLI, Jina Reader, etc.).

What's kept:
- agent-reach install: one-shot installer
- agent-reach doctor: channel status overview
- agent-reach configure: cookies, proxy, credentials
- agent-reach setup: interactive wizard
- SKILL.md: complete guide for agents to use upstream tools directly

What's removed:
- agent-reach read URL (and all channel read() methods)
- agent-reach search-* commands (and all channel search() methods)
- ReadResult / SearchResult data classes
- URL routing system (get_channel_for_url)
- All parsing/conversion logic (VTT, Reddit JSON, bird text parser, etc.)
- MCP server read/search tools (kept only get_status)

Net change: -1790 lines. Less code = fewer bugs.
Panniantong 2026-02-26 08:15:56 +01:00
parent 1cbf6a7b9c
commit a37e9aa190
18 changed files with 400 additions and 2190 deletions
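The commit message says agents now call upstream tools directly rather than going through a wrapper. A minimal sketch of what that could look like from Python for the yt-dlp case follows. The function names `build_ytdlp_info_command` and `fetch_video_info` are hypothetical and not part of Agent Reach; only the `--dump-json`, `--skip-download`, and `--proxy` flags come from yt-dlp itself.

```python
import json
import shutil
import subprocess
from typing import List, Optional


def build_ytdlp_info_command(url: str, proxy: Optional[str] = None) -> List[str]:
    """Build the yt-dlp command that prints video metadata as JSON (hypothetical helper)."""
    cmd = ["yt-dlp", "--dump-json", "--skip-download", url]
    if proxy:
        cmd += ["--proxy", proxy]
    return cmd


def fetch_video_info(url: str, proxy: Optional[str] = None, timeout: int = 30) -> dict:
    """Call yt-dlp directly and return its metadata, or {} on any failure."""
    if not shutil.which("yt-dlp"):
        return {}
    try:
        r = subprocess.run(
            build_ytdlp_info_command(url, proxy),
            capture_output=True, text=True, timeout=timeout,
        )
    except subprocess.TimeoutExpired:
        return {}
    if r.returncode != 0:
        return {}
    try:
        return json.loads(r.stdout)
    except json.JSONDecodeError:
        return {}
```

Building the argument list in a separate function keeps the command easy to inspect without touching the network.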


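@ -115,14 +115,14 @@ AI Agent can already write code, edit docs, and manage projects for you, but when you ask it to
No configuration needed; just tell the Agent:
- "Take a look at this link for me" → any web page
- "What does this GitHub repo do?" → GitHub repos, issues, code
- "What is this video about?" → YouTube / Bilibili subtitle extraction
- "Take a look at this tweet" → Twitter tweets
- "Subscribe to this RSS feed" → RSS / Atom feeds
- "Search GitHub for LLM frameworks" → GitHub search
- "Take a look at this link for me" → `curl https://r.jina.ai/URL` (any web page)
- "What does this GitHub repo do?" → `gh repo view owner/repo`
- "What is this video about?" → `yt-dlp --dump-json URL` to extract subtitles
- "Take a look at this tweet" → `bird read URL --json`
- "Subscribe to this RSS feed" → parsed with `feedparser`
- "Search GitHub for LLM frameworks" → `gh search repos "LLM framework"`
**No commands to memorize.** The Agent knows what to call on its own.
**No commands to memorize.** After reading SKILL.md, the Agent knows what to call on its own.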
---
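@ -134,9 +134,11 @@ AI Agent can already write code, edit docs, and manage projects for you, but when you ask it to
What Agent Reach does is simple: **it takes care of the tool selection and configuration work for you.**
After installation, the Agent calls the upstream tools directly (bird CLI, yt-dlp, mcporter, gh CLI, etc.) without going through an Agent Reach wrapper layer.
### 🔌 Every channel is pluggable
Each platform maps to a standalone Python file that implements a unified interface. **The backend tool can be swapped at any time**: when a better tool comes along, change one file and leave everything else untouched.
Behind each platform is a standalone upstream tool. **Not happy with it? Just swap it out.**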
```
channels/
@ -229,13 +231,13 @@ Give it a star so you can find it next time you need it. ⭐
<details>
<summary><strong>How can an AI Agent search Twitter / X without paying API fees?</strong></summary>
Agent Reach uses [bird CLI](https://www.npmjs.com/package/@steipete/bird) to access Twitter through cookie authentication, completely free. After installing Agent Reach, export your Twitter cookies with Cookie-Editor and run `agent-reach configure twitter-cookies "your_cookies"`. After that, the Agent can search tweets with `agent-reach search-twitter "keyword"`.
Agent Reach uses [bird CLI](https://www.npmjs.com/package/@steipete/bird) to access Twitter through cookie authentication, completely free. After installing Agent Reach, export your Twitter cookies with Cookie-Editor and run `agent-reach configure twitter-cookies "your_cookies"`. After that, the Agent can search tweets with `bird search "keyword" --json`.
</details>
<details>
<summary><strong>How to search Twitter/X with AI agent for free (no API)?</strong></summary>
Agent Reach uses the bird CLI with cookie auth — zero API fees. After installing, export your Twitter cookies with the Cookie-Editor extension, run `agent-reach configure twitter-cookies "your_cookies"`, then your agent can search with `agent-reach search-twitter "query"`.
Agent Reach uses the bird CLI with cookie auth — zero API fees. After installing, export your Twitter cookies with the Cookie-Editor extension, run `agent-reach configure twitter-cookies "your_cookies"`, then your agent can search with `bird search "query" --json`.
</details>
<details>
@ -247,19 +249,19 @@ Reddit blocks data-center IPs. Configuring a residential proxy fixes it: `agent-rea
<details>
<summary><strong>How to get YouTube video transcripts for AI?</strong></summary>
`agent-reach read https://youtube.com/watch?v=xxx` automatically extracts the transcript. Uses yt-dlp under the hood, supports multiple languages. No API key needed.
`yt-dlp --dump-json "https://youtube.com/watch?v=xxx"` extracts video metadata; `yt-dlp --write-sub --skip-download "URL"` extracts subtitles. Uses yt-dlp under the hood, supports multiple languages. No API key needed.
</details>
<details>
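<summary><strong>How do I let an AI Agent read Xiaohongshu?</strong></summary>
Xiaohongshu requires an MCP service running in Docker. After installing Docker, running `agent-reach install` configures it automatically. After that, the Agent can use `agent-reach read <Xiaohongshu URL>` or `agent-reach search-xhs "keyword"`.
Xiaohongshu requires an MCP service running in Docker. After installing Docker, running `agent-reach install` configures it automatically. After that, the Agent can read notes with `mcporter call 'xiaohongshu.get_feed_detail(...)'` or search with `mcporter call 'xiaohongshu.search_feeds(keyword: "keyword")'`.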
</details>
<details>
<summary><strong>Compatible with Claude Code / Cursor / OpenClaw / Windsurf?</strong></summary>
Yes! Agent Reach is a standard CLI tool — any AI coding agent that can run shell commands can use it. Works with Claude Code, Cursor, OpenClaw, Windsurf, Codex, and more. Just `pip install agent-reach` and the agent can start using it immediately.
Yes! Agent Reach is an installer + configuration tool — any AI coding agent that can run shell commands can use it. Works with Claude Code, Cursor, OpenClaw, Windsurf, Codex, and more. Just `pip install agent-reach`, run `agent-reach install`, and the agent can start using the upstream tools immediately.
</details>
<details>


@ -1,14 +1,10 @@
# -*- coding: utf-8 -*-
"""
Channel registry: routes URLs to the right channel.
This is the core of Agent Reach's pluggable architecture.
Add a new channel: just create a file and register it here.
Swap a backend: just change the implementation inside the channel file.
Channel registry: lists all supported platforms for doctor checks.
"""
from typing import Dict, List, Optional
from .base import Channel, ReadResult, SearchResult
from typing import List, Optional
from .base import Channel
# Import all channels
from .web import WebChannel
@ -24,7 +20,7 @@ from .linkedin import LinkedInChannel
from .bosszhipin import BossZhipinChannel
# Channel registry — order matters (first match wins, web is last as fallback)
# Channel registry
ALL_CHANNELS: List[Channel] = [
GitHubChannel(),
TwitterChannel(),
@ -36,22 +32,9 @@ ALL_CHANNELS: List[Channel] = [
BossZhipinChannel(),
RSSChannel(),
ExaSearchChannel(),
WebChannel(), # Fallback — handles any URL
WebChannel(),
]
# Search-capable channels
SEARCH_CHANNELS: Dict[str, Channel] = {
ch.name: ch for ch in ALL_CHANNELS if ch.can_search()
}
def get_channel_for_url(url: str) -> Channel:
"""Find the right channel for a URL."""
for channel in ALL_CHANNELS:
if channel.can_handle(url):
return channel
return WebChannel() # Should never reach here, but just in case
def get_channel(name: str) -> Optional[Channel]:
"""Get a channel by name."""
@ -67,7 +50,7 @@ def get_all_channels() -> List[Channel]:
__all__ = [
"Channel", "ReadResult", "SearchResult",
"ALL_CHANNELS", "SEARCH_CHANNELS",
"get_channel_for_url", "get_channel", "get_all_channels",
"Channel",
"ALL_CHANNELS",
"get_channel", "get_all_channels",
]
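After this change the registry is only a list plus name-based lookup. The diff elides the body of `get_channel`, so the following is a sketch under assumptions: the stub `Channel` classes and the linear scan are illustrative stand-ins, not the project's actual implementation.

```python
from typing import List, Optional


class Channel:
    """Stand-in for the project's Channel base class (assumption)."""
    name: str = ""


class GitHubChannel(Channel):
    name = "github"


class WebChannel(Channel):
    name = "web"


# With URL routing removed, order no longer decides which channel handles a URL.
ALL_CHANNELS: List[Channel] = [GitHubChannel(), WebChannel()]


def get_channel(name: str) -> Optional[Channel]:
    """Return the channel whose name matches, or None if it is unknown."""
    for ch in ALL_CHANNELS:
        if ch.name == name:
            return ch
    return None


def get_all_channels() -> List[Channel]:
    """Return a copy so callers cannot mutate the registry by accident."""
    return list(ALL_CHANNELS)
```

A doctor command would then iterate over `get_all_channels()` and call `check()` on each entry.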


@ -1,110 +1,28 @@
# -*- coding: utf-8 -*-
"""
Channel base class: the universal interface for all platforms.
Channel base class: platform availability checking.
Every channel (YouTube, Twitter, GitHub, etc.) implements this interface.
The backend tool can be swapped anytime without changing anything else.
Each channel represents a platform (YouTube, Twitter, GitHub, etc.)
and provides:
- can_handle(url): does this URL belong to this platform?
- check(config): is the upstream tool installed and configured?
Example:
class YouTubeChannel(Channel):
name = "youtube"
backends = ["yt-dlp"] # current backend, can be swapped
async def read(self, url, config):
# Just call yt-dlp, return standardized dict
...
After installation, agents call upstream tools directly.
"""
import shutil
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple
@dataclass
class ReadResult:
"""Standardized read result. Every channel returns this."""
title: str
content: str
url: str
author: str = ""
date: str = ""
platform: str = ""
extra: dict = None
def __post_init__(self):
self.extra = self.extra or {}
def to_dict(self) -> dict:
d = {
"title": self.title,
"content": self.content,
"url": self.url,
"platform": self.platform,
}
if self.author:
d["author"] = self.author
if self.date:
d["date"] = self.date
if self.extra:
d["extra"] = self.extra
return d
@dataclass
class SearchResult:
"""Standardized search result."""
title: str
url: str
snippet: str = ""
author: str = ""
date: str = ""
score: float = 0
extra: dict = None
def __post_init__(self):
self.extra = self.extra or {}
def to_dict(self) -> dict:
d = {
"title": self.title,
"url": self.url,
"snippet": self.snippet,
}
if self.author:
d["author"] = self.author
if self.date:
d["date"] = self.date
if self.extra:
d["extra"] = self.extra
return d
from typing import List, Tuple
class Channel(ABC):
"""
Base class for all channels.
Subclasses just need to implement:
- read(url, config) -> ReadResult
- can_handle(url) -> bool
- check(config) -> (status, message)
Optionally:
- search(query, config, **kwargs) -> list[SearchResult]
"""
"""Base class for all channels."""
name: str = "" # e.g. "youtube"
description: str = "" # e.g. "YouTube video transcripts"
backends: List[str] = [] # e.g. ["yt-dlp"] — what external tool is used
requires_config: List[str] = [] # e.g. ["reddit_proxy"]
requires_tools: List[str] = [] # e.g. ["yt-dlp"]
description: str = "" # e.g. "YouTube videos and subtitles"
backends: List[str] = [] # e.g. ["yt-dlp"] — what upstream tool is used
tier: int = 0 # 0=zero-config, 1=needs free key, 2=needs setup
@abstractmethod
async def read(self, url: str, config=None) -> ReadResult:
"""Read content from a URL. Must return ReadResult."""
...
@abstractmethod
def can_handle(self, url: str) -> bool:
"""Check if this channel can handle this URL."""
@ -112,29 +30,7 @@ class Channel(ABC):
def check(self, config=None) -> Tuple[str, str]:
"""
Check if this channel is available.
Check if this channel's upstream tool is available.
Returns (status, message) where status is 'ok'/'warn'/'off'/'error'.
"""
# Check required tools
for tool in self.requires_tools:
if not shutil.which(tool):
return "off", f"Install required: pip install {tool}"
# Check required config
for key in self.requires_config:
if config and not config.get(key):
return "off", f"Configuration required: {key}. Run agent-reach setup"
return "ok", f"{', '.join(self.backends) if self.backends else 'built-in'}"
async def search(self, query: str, config=None, **kwargs) -> List[SearchResult]:
"""Search this platform. Override if supported."""
raise NotImplementedError(f"{self.name} does not support search")
def can_search(self) -> bool:
"""Whether this channel supports search."""
try:
# Check if search is overridden
return type(self).search is not Channel.search
except:
return False
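With `read()` and `search()` gone, a channel reduces to URL matching plus an availability check. The sketch below shows one possible shape of such a subclass. `YouTubeChannel` here is a hypothetical illustration, and the default `check()` on the base class is an assumption, since the diff does not show the full new body.

```python
import shutil
from abc import ABC, abstractmethod
from typing import List, Tuple
from urllib.parse import urlparse


class Channel(ABC):
    """Slimmed-down base class: platform availability checking only."""
    name: str = ""
    description: str = ""
    backends: List[str] = []
    tier: int = 0  # 0=zero-config, 1=needs free key, 2=needs setup

    @abstractmethod
    def can_handle(self, url: str) -> bool:
        """Return True if the URL belongs to this platform."""

    def check(self, config=None) -> Tuple[str, str]:
        """Return (status, message); status is 'ok', 'warn', 'off', or 'error'."""
        return "ok", ", ".join(self.backends) if self.backends else "built-in"


class YouTubeChannel(Channel):
    """Hypothetical channel that only verifies yt-dlp is on PATH."""
    name = "youtube"
    description = "YouTube videos and subtitles"
    backends = ["yt-dlp"]

    def can_handle(self, url: str) -> bool:
        host = urlparse(url).netloc.lower()
        return "youtube.com" in host or "youtu.be" in host

    def check(self, config=None) -> Tuple[str, str]:
        if not shutil.which("yt-dlp"):
            return "off", "yt-dlp is not installed. Install: pip install yt-dlp"
        return "ok", "yt-dlp found on PATH"
```

The channel never runs yt-dlp itself; it only reports whether an agent could.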


@ -1,207 +1,26 @@
# -*- coding: utf-8 -*-
"""Bilibili — via yt-dlp (same backend as YouTube).
"""Bilibili — check if yt-dlp is available."""
Backend: yt-dlp (https://github.com/yt-dlp/yt-dlp)
yt-dlp natively supports Bilibili video info, subtitles, and search.
"""
import json
import os
import shutil
import subprocess
from urllib.parse import urlparse
from .base import Channel, ReadResult, SearchResult
from typing import List
from .base import Channel
class BilibiliChannel(Channel):
name = "bilibili"
description = "Bilibili video info and subtitles"
description = "Bilibili videos and subtitles"
backends = ["yt-dlp"]
requires_tools = ["yt-dlp"]
tier = 0
tier = 1
def can_handle(self, url: str) -> bool:
from urllib.parse import urlparse
d = urlparse(url).netloc.lower()
return "bilibili.com" in d or "b23.tv" in d
def check(self, config=None):
if not shutil.which("yt-dlp"):
return "off", "yt-dlp is not installed. Install: pip install yt-dlp"
proxy = config.get("bilibili_proxy") if config else None
proxy = (config.get("bilibili_proxy") if config else None) or os.environ.get("BILIBILI_PROXY")
if proxy:
return "ok", "Proxy configured; fully available"
import os
is_server = bool(os.environ.get("SSH_CONNECTION") or os.path.exists("/etc/cloud"))
if is_server:
return "warn", "Server IPs may be blocked; configuring a proxy fixes it: agent-reach configure proxy URL"
return "ok", "Direct local connection available"
async def read(self, url: str, config=None) -> ReadResult:
if not shutil.which("yt-dlp"):
raise RuntimeError("yt-dlp not installed. Install: pip install yt-dlp")
proxy = config.get("bilibili_proxy") if config else None
# Get video info via yt-dlp
info = self._get_info(url, proxy)
if not info:
return ReadResult(
title="Bilibili",
content=f"⚠️ Could not fetch video info: {url}\nThe server IP may be blocked; set up a proxy: agent-reach configure proxy URL",
url=url, platform="bilibili",
)
title = info.get("title", url)
author = info.get("uploader", "")
desc = info.get("description", "")
# Try subtitles
subtitle = self._get_subtitles(url, proxy)
content = desc
if subtitle:
content += f"\n\n## Subtitles\n{subtitle}"
return ReadResult(
title=title, content=content, url=url,
author=author, platform="bilibili",
extra={
"view_count": info.get("view_count"),
"like_count": info.get("like_count"),
"duration": info.get("duration_string"),
},
)
async def search(self, query: str, config=None, **kwargs) -> List[SearchResult]:
"""Search Bilibili.
Strategy:
1. Try yt-dlp bilisearch (works on local machines)
2. Fallback to Exa site:bilibili.com (works on servers)
"""
if not shutil.which("yt-dlp"):
raise RuntimeError("yt-dlp not installed. Install: pip install yt-dlp")
limit = kwargs.get("limit", 5)
proxy = config.get("bilibili_proxy") if config else None
# Strategy 1: yt-dlp bilisearch
results = self._search_ytdlp(query, limit, proxy)
if results:
return results
# Strategy 2: Exa fallback (server-friendly)
results = self._search_exa(query, limit)
if results:
return results
return []
def _search_ytdlp(self, query: str, limit: int, proxy: str = None) -> List[SearchResult]:
"""Search via yt-dlp bilisearch (needs local/Chinese IP)."""
cmd = [
"yt-dlp", "--dump-json", "--no-download",
f"bilisearch{limit}:{query}",
]
if proxy:
cmd += ["--proxy", proxy]
try:
r = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
if r.returncode != 0:
return []
results = []
for line in r.stdout.strip().split("\n"):
if not line.strip():
continue
try:
d = json.loads(line)
vid = d.get("id", "")
url = d.get("webpage_url", f"https://www.bilibili.com/video/av{vid}")
results.append(SearchResult(
title=d.get("title", f"av{vid}"),
url=url,
snippet=f"👤 {d.get('uploader', '?')} · 👁 {d.get('view_count', '?')}",
extra={
"view_count": d.get("view_count"),
"uploader": d.get("uploader"),
"duration": d.get("duration_string"),
},
))
except json.JSONDecodeError:
continue
return results
except subprocess.TimeoutExpired:
return []
def _search_exa(self, query: str, limit: int) -> List[SearchResult]:
"""Fallback: search via Exa (site:bilibili.com). Works on any IP."""
try:
r = subprocess.run(
["mcporter", "call",
f'exa.web_search_exa(query: "site:bilibili.com {query}", numResults: {limit})'],
capture_output=True, text=True, timeout=30,
)
if r.returncode != 0:
return []
results = []
# Parse mcporter output: Title: / Author: / URL: / Text: blocks
title, author, url = "", "", ""
for line in r.stdout.split("\n"):
if line.startswith("Title: "):
title = line[7:].strip()
elif line.startswith("Author: "):
author = line[8:].strip()
elif line.startswith("URL: "):
url = line[5:].strip()
if url and "bilibili.com" in url:
results.append(SearchResult(
title=title or url,
url=url,
snippet=f"👤 {author}" if author else "(via Exa search)",
))
title, author, url = "", "", ""
return results
except Exception:
return []
def _get_info(self, url: str, proxy: str = None) -> dict:
cmd = ["yt-dlp", "--dump-json", "--no-download", url]
if proxy:
cmd += ["--proxy", proxy]
try:
r = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
if r.returncode == 0:
return json.loads(r.stdout)
except (subprocess.TimeoutExpired, json.JSONDecodeError):
pass
return {}
def _get_subtitles(self, url: str, proxy: str = None) -> str:
import tempfile
from pathlib import Path
with tempfile.TemporaryDirectory() as tmpdir:
cmd = [
"yt-dlp", "--write-sub", "--write-auto-sub",
"--sub-lang", "zh-Hans,zh,en",
"--skip-download", "--sub-format", "vtt",
"-o", f"{tmpdir}/%(id)s.%(ext)s", url,
]
if proxy:
cmd += ["--proxy", proxy]
try:
subprocess.run(cmd, capture_output=True, text=True, timeout=30)
for f in Path(tmpdir).glob("*.vtt"):
text = f.read_text(errors="replace")
lines = []
for line in text.split("\n"):
line = line.strip()
if not line or line.startswith("WEBVTT") or "-->" in line or line.isdigit():
continue
if line not in lines[-1:]:
lines.append(line)
return "\n".join(lines)
except subprocess.TimeoutExpired:
pass
return ""
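return "ok", "Video info and subtitles can be extracted (proxy configured)"
return "ok", "Video info and subtitles can be extracted (local environment). Servers may need a proxy"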
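The new Bilibili check reads the proxy from the config first and falls back to the `BILIBILI_PROXY` environment variable. That precedence can be sketched as a small pure function. `resolve_bilibili_proxy` is a hypothetical name, and treating the config as a plain mapping is an assumption about the project's config object.

```python
import os
from typing import Mapping, Optional


def resolve_bilibili_proxy(
    config: Optional[Mapping[str, str]] = None,
    environ: Optional[Mapping[str, str]] = None,
) -> Optional[str]:
    """Return the proxy from config, else from BILIBILI_PROXY, else None."""
    env = os.environ if environ is None else environ
    from_config = config.get("bilibili_proxy") if config else None
    # An empty string in the config counts as "not set" and falls through.
    return from_config or env.get("BILIBILI_PROXY") or None
```

Passing `environ` explicitly keeps the function testable without modifying the real process environment.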


@ -1,62 +1,9 @@
# -*- coding: utf-8 -*-
"""Boss直聘 (BOSS Zhipin) — via mcp-bosszp (MCP) or Jina Reader fallback.
"""Boss直聘 — check if mcp-bosszp is available."""
Backend: mcp-bosszp (161 stars, FastMCP + Playwright)
Swap to: any Boss直聘 access tool
"""
import json
import shutil
import subprocess
from urllib.parse import urlparse
from .base import Channel, ReadResult, SearchResult
from typing import List
import requests
def _mcporter_has_bosszhipin() -> bool:
"""Check if mcporter has Boss直聘 MCP configured."""
if not shutil.which("mcporter"):
return False
try:
r = subprocess.run(
["mcporter", "list"], capture_output=True, text=True, timeout=10
)
# Check for various possible config names
out = r.stdout.lower()
return "boss" in out or "zhipin" in out or "bosszhipin" in out
except Exception:
return False
def _mcporter_call(expr: str, timeout: int = 30) -> str:
"""Call a Boss直聘 MCP tool via mcporter."""
r = subprocess.run(
["mcporter", "call", expr],
capture_output=True, text=True, timeout=timeout,
)
if r.returncode != 0:
raise RuntimeError(r.stderr or r.stdout)
return r.stdout
def _get_mcp_name() -> str:
"""Get the actual MCP server name configured in mcporter."""
try:
r = subprocess.run(
["mcporter", "list"], capture_output=True, text=True, timeout=10
)
for line in r.stdout.split("\n"):
line_lower = line.strip().lower()
for name in ["bosszhipin", "boss-zp", "bosszp", "boss"]:
if name in line_lower:
# Extract the actual server name
parts = line.strip().split()
if parts:
return parts[0]
return "bosszhipin"
except Exception:
return "bosszhipin"
from .base import Channel
class BossZhipinChannel(Channel):
@ -66,118 +13,29 @@ class BossZhipinChannel(Channel):
tier = 2
def can_handle(self, url: str) -> bool:
from urllib.parse import urlparse
domain = urlparse(url).netloc.lower()
return "zhipin.com" in domain or "boss.com" in domain
def check(self, config=None):
if _mcporter_has_bosszhipin():
return "ok", "Can search jobs and send greetings to HR"
if not shutil.which("mcporter"):
return "off", (
"Job pages can be read through Jina Reader. Full functionality requires:\n"
" 1. git clone https://github.com/mucsbr/mcp-bosszp.git\n"
" 2. cd mcp-bosszp && pip install -r requirements.txt && playwright install chromium\n"
" 3. python boss_zhipin_fastmcp_v2.py (scan the QR code to log in after it starts)\n"
" 4. mcporter config add bosszhipin http://localhost:8000/mcp"
)
try:
r = subprocess.run(
["mcporter", "list"], capture_output=True, text=True, timeout=10
)
out = r.stdout.lower()
if "boss" in out or "zhipin" in out:
return "ok", "Can search jobs and send greetings to HR"
except Exception:
pass
return "off", (
"Job pages can be read through Jina Reader. Full functionality requires:\n"
" 1. git clone https://github.com/mucsbr/mcp-bosszp.git\n"
" 2. cd mcp-bosszp && pip install -r requirements.txt && playwright install chromium\n"
" 3. python boss_zhipin_fastmcp_v2.py (scan the QR code to log in after it starts)\n"
" 4. mcporter config add bosszhipin http://localhost:8000/mcp\n"
" Or use Docker: docker-compose up -d\n"
"mcporter is installed but the Boss直聘 MCP is not configured.\n"
" See https://github.com/mucsbr/mcp-bosszp"
)
async def read(self, url: str, config=None) -> ReadResult:
# Boss直聘 pages mostly work with Jina Reader
return await self._read_jina(url)
async def _read_jina(self, url: str) -> ReadResult:
"""Read Boss直聘 page via Jina Reader."""
try:
resp = requests.get(
f"https://r.jina.ai/{url}",
headers={"Accept": "text/markdown"},
timeout=15,
)
resp.raise_for_status()
text = resp.text
if len(text.strip()) < 50:
return ReadResult(
title="Boss直聘",
content=(
f"⚠️ Could not read this page: {url}\n\n"
"Tips:\n"
"- Installing mcp-bosszp unlocks job search and automatic greetings\n"
"- See https://github.com/mucsbr/mcp-bosszp"
),
url=url,
platform="bosszhipin",
)
return ReadResult(
title=text[:100] if text else url,
content=text,
url=url,
platform="bosszhipin",
)
except Exception:
return ReadResult(
title="Boss直聘",
content=(
f"⚠️ Could not read this Boss直聘 page: {url}\n\n"
"Tips:\n"
"- Some Boss直聘 pages require login\n"
"- Installing mcp-bosszp unlocks full functionality\n"
"- See https://github.com/mucsbr/mcp-bosszp"
),
url=url,
platform="bosszhipin",
)
async def search(self, query: str, config=None, **kwargs) -> List[SearchResult]:
limit = kwargs.get("limit", 10)
# Try MCP search first
if _mcporter_has_bosszhipin():
try:
return await self._search_mcp(query, limit, config)
except Exception:
pass
# Fallback to Exa
from agent_reach.channels.exa_search import ExaSearchChannel
exa = ExaSearchChannel()
return await exa.search(f"site:zhipin.com {query}", config=config, limit=limit)
async def _search_mcp(self, query: str, limit: int, config=None) -> List[SearchResult]:
"""Search Boss直聘 via MCP."""
server = _get_mcp_name()
try:
out = _mcporter_call(
f'{server}.get_recommend_jobs_tool(page: 1)',
timeout=30,
)
return self._parse_jobs(out, limit)
except Exception:
return []
def _parse_jobs(self, text: str, limit: int) -> List[SearchResult]:
"""Parse MCP job search output into SearchResults."""
results = []
try:
data = json.loads(text)
jobs = data if isinstance(data, list) else data.get("jobs", data.get("results", []))
for job in jobs[:limit]:
if isinstance(job, dict):
title = job.get("title") or job.get("jobName", "")
company = job.get("company") or job.get("brandName", "")
salary = job.get("salary") or job.get("salaryDesc", "")
url = job.get("url", "")
snippet = f"🏢 {company}" if company else ""
if salary:
snippet += f" · 💰 {salary}"
results.append(SearchResult(
title=title,
url=url,
snippet=snippet,
))
except (json.JSONDecodeError, KeyError):
pass
return results
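The `can_handle` methods kept by this commit match platforms with a substring test on the URL's network location (for example `"zhipin.com" in domain`), which also accepts hosts such as `zhipin.com.evil.example`. A stricter variant, offered only as a possible alternative and not as what the project does, compares the hostname against a domain and its subdomains. `host_matches` is a hypothetical helper.

```python
from typing import Iterable
from urllib.parse import urlparse


def host_matches(url: str, domains: Iterable[str]) -> bool:
    """True if the URL's hostname equals one of the domains or is a subdomain of it."""
    host = (urlparse(url).hostname or "").lower()
    return any(host == d or host.endswith("." + d) for d in domains)
```

Using `hostname` instead of `netloc` also drops any port or userinfo before the comparison.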


@ -1,110 +1,36 @@
# -*- coding: utf-8 -*-
"""Exa semantic search — via mcporter + Exa MCP server.
"""Exa Search — check if mcporter + Exa MCP is available."""
Backend: Exa MCP at mcp.exa.ai (OAuth, no API key needed)
Requires: mcporter CLI
"""
import json
import shutil
import subprocess
from .base import Channel, SearchResult
from typing import List
from .base import Channel
class ExaSearchChannel(Channel):
name = "exa_search"
description = "Web-wide semantic search (also supports Reddit/Twitter search)"
backends = ["exa-mcp"]
tier = 1
def _mcporter_ok(self) -> bool:
if not shutil.which("mcporter"):
return False
try:
r = subprocess.run(
["mcporter", "list"], capture_output=True, text=True, timeout=10
)
return "exa" in r.stdout
except Exception:
return False
def _call(self, expr: str, timeout: int = 30) -> str:
r = subprocess.run(
["mcporter", "call", expr],
capture_output=True, text=True, timeout=timeout,
)
if r.returncode != 0:
raise RuntimeError(r.stderr or r.stdout)
return r.stdout
# ── Channel interface ──
description = "Web-wide semantic search"
backends = ["Exa via mcporter"]
tier = 0
def can_handle(self, url: str) -> bool:
return False # search-only
async def read(self, url: str, config=None):
raise NotImplementedError("Exa is a search engine, not a reader")
return False # Search-only channel
def check(self, config=None):
if not shutil.which("mcporter"):
return "off", (
"mcporter is required. Install: npm install -g mcporter && "
"mcporter config add exa https://mcp.exa.ai/mcp"
)
if not self._mcporter_ok():
return "off", "mcporter is installed but Exa is not configured. Run: mcporter config add exa https://mcp.exa.ai/mcp"
return "ok", "MCP connected; usable directly without a key (web search + Reddit + Twitter)"
async def search(self, query: str, config=None, **kwargs) -> List[SearchResult]:
if not self._mcporter_ok():
raise ValueError(
"Exa search requires mcporter. Install:\n"
"mcporter + Exa MCP are required. Install:\n"
" npm install -g mcporter\n"
" mcporter config add exa https://mcp.exa.ai/mcp"
)
limit = kwargs.get("limit", 5)
safe_q = query.replace('"', '\\"')
out = self._call(
f'exa.web_search_exa(query: "{safe_q}", numResults: {min(limit, 10)})',
timeout=30,
)
return self._parse_output(out, limit)
# ── Parse mcporter text output ──
def _parse_output(self, text: str, limit: int) -> List[SearchResult]:
"""Parse mcporter's Title/URL/Text block format."""
results = []
cur = {}
for line in text.split("\n"):
line = line.strip()
if line.startswith("Title: "):
if cur.get("title"):
results.append(self._make_result(cur))
cur = {"title": line[7:]}
elif line.startswith("URL: "):
cur["url"] = line[5:]
elif line.startswith("Published Date: "):
cur["date"] = line[16:]
elif line.startswith("Text: "):
cur["text"] = line[6:]
elif "text" in cur and line:
cur["text"] += " " + line
if cur.get("title"):
results.append(self._make_result(cur))
return results[:limit]
@staticmethod
def _make_result(d: dict) -> SearchResult:
return SearchResult(
title=d.get("title", ""),
url=d.get("url", ""),
snippet=d.get("text", "")[:500],
date=d.get("date", ""),
score=0,
)
try:
r = subprocess.run(
["mcporter", "list"], capture_output=True, text=True, timeout=10
)
if "exa" in r.stdout.lower():
return "ok", "Web-wide semantic search available (free, no API key needed)"
return "off", (
"mcporter is installed but Exa is not configured. Run:\n"
" mcporter config add exa https://mcp.exa.ai/mcp"
)
except Exception:
return "off", "mcporter connection error"
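The Exa, LinkedIn, and Boss直聘 checks in this commit all follow the same pattern: confirm `mcporter` is on PATH, run `mcporter list`, and look for a keyword in its output. One way to factor that pattern is sketched below. `check_mcp_server` and its injectable parameters are hypothetical and exist only to make the logic testable without mcporter installed.

```python
import shutil
import subprocess
from typing import Callable, Optional, Tuple


def _run_mcporter_list(timeout: int = 10) -> str:
    """Return the stdout of `mcporter list`."""
    r = subprocess.run(
        ["mcporter", "list"], capture_output=True, text=True, timeout=timeout
    )
    return r.stdout


def check_mcp_server(
    keyword: str,
    list_servers: Callable[[], str] = _run_mcporter_list,
    has_mcporter: Optional[bool] = None,
) -> Tuple[str, str]:
    """Report whether an MCP server matching `keyword` is configured in mcporter."""
    if has_mcporter is None:
        has_mcporter = shutil.which("mcporter") is not None
    if not has_mcporter:
        return "off", "mcporter is not installed"
    try:
        out = list_servers()
    except Exception:
        return "off", "mcporter list failed"
    if keyword.lower() in out.lower():
        return "ok", f"{keyword} MCP server is configured"
    return "off", f"mcporter is installed but {keyword} is not configured"
```

A keyword match on the listing is a heuristic: it cannot tell whether the configured server is actually reachable.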


@ -1,16 +1,9 @@
# -*- coding: utf-8 -*-
"""GitHub — via gh CLI.
"""GitHub — check if gh CLI is available."""
Backend: gh CLI (https://cli.github.com)
Swap to: GitHub REST API
"""
import json
import shutil
import subprocess
from urllib.parse import urlparse
from .base import Channel, ReadResult, SearchResult
from typing import List
from .base import Channel
class GitHubChannel(Channel):
@ -19,121 +12,18 @@ class GitHubChannel(Channel):
backends = ["gh CLI"]
tier = 0
def _gh(self, args: list, timeout: int = 15) -> str:
r = subprocess.run(
["gh"] + args,
capture_output=True, text=True, timeout=timeout,
)
if r.returncode != 0:
raise RuntimeError(r.stderr or r.stdout)
return r.stdout
def _gh_json(self, args: list, timeout: int = 15) -> dict:
return json.loads(self._gh(args + ["--json"], timeout))
def can_handle(self, url: str) -> bool:
from urllib.parse import urlparse
return "github.com" in urlparse(url).netloc.lower()
def check(self, config=None):
if not shutil.which("gh"):
return "warn", "gh CLI is not installed. Install: https://cli.github.com . Public repositories can still be read through Jina Reader"
return "warn", "gh CLI is not installed. Install: https://cli.github.com"
try:
self._gh(["auth", "status"], timeout=5)
subprocess.run(
["gh", "auth", "status"],
capture_output=True, text=True, timeout=5
)
return "ok", "Fully available (read, search, fork, issues, PRs, etc.)"
except Exception:
return "ok", "gh CLI is installed but not authenticated. Run gh auth login to unlock full functionality"
async def read(self, url: str, config=None) -> ReadResult:
if not shutil.which("gh"):
# Fallback to Jina Reader for public repos
from agent_reach.channels.web import WebChannel
return await WebChannel().read(url, config)
path = urlparse(url).path.strip("/").split("/")
if len(path) < 2:
from agent_reach.channels.web import WebChannel
return await WebChannel().read(url, config)
owner, repo = path[0], path[1]
# Issues / PRs
if len(path) >= 4 and path[2] in ("issues", "pull"):
return await self._read_issue(owner, repo, path[3], url)
# Repo
return await self._read_repo(owner, repo, url)
async def _read_repo(self, owner: str, repo: str, url: str) -> ReadResult:
slug = f"{owner}/{repo}"
try:
# Get repo info
info = self._gh(["repo", "view", slug])
# Get README
try:
readme = self._gh(
["api", f"repos/{slug}/readme", "--jq", ".content"],
timeout=10,
)
import base64
readme_text = base64.b64decode(readme).decode("utf-8", errors="replace")
except Exception:
readme_text = ""
content = readme_text or info
return ReadResult(
title=slug, content=content, url=url,
author=owner, platform="github",
)
except Exception:
from agent_reach.channels.web import WebChannel
return await WebChannel().read(url)
async def _read_issue(self, owner: str, repo: str, num: str, url: str) -> ReadResult:
slug = f"{owner}/{repo}"
try:
out = self._gh(["issue", "view", num, "-R", slug])
return ReadResult(
title=f"{slug}#{num}", content=out, url=url,
platform="github",
)
except Exception:
# Might be a PR
try:
out = self._gh(["pr", "view", num, "-R", slug])
return ReadResult(
title=f"{slug}#{num}", content=out, url=url,
platform="github",
)
except Exception:
from agent_reach.channels.web import WebChannel
return await WebChannel().read(url)
async def search(self, query: str, config=None, **kwargs) -> List[SearchResult]:
if not shutil.which("gh"):
raise ValueError("GitHub search requires gh CLI. Install: https://cli.github.com")
language = kwargs.get("language")
limit = kwargs.get("limit", 5)
args = ["search", "repos", query, "--sort", "stars", f"--limit={limit}"]
if language:
args += [f"--language={language}"]
out = self._gh(args, timeout=15)
results = []
for line in out.strip().split("\n"):
if not line.strip():
continue
parts = line.split("\t")
if len(parts) >= 1:
slug = parts[0].strip()
desc = parts[1].strip() if len(parts) > 1 else ""
stars = parts[3].strip() if len(parts) > 3 else ""
lang = parts[5].strip() if len(parts) > 5 else ""
results.append(SearchResult(
title=slug,
url=f"https://github.com/{slug}",
snippet=desc,
extra={"stars": stars, "language": lang},
))
return results
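One detail in the rewritten GitHub check: the removed `_gh` helper raised on a non-zero exit code, whereas `subprocess.run` without `check=True` does not, so an unauthenticated `gh auth status` would likely still be reported as fully available. A sketch of a check that inspects the return code explicitly follows. `check_gh_auth` and the injectable `run` parameter are hypothetical, and the messages are illustrative.

```python
import subprocess
from typing import Callable, Sequence, Tuple


def _exit_code(cmd: Sequence[str], timeout: int = 5) -> int:
    """Run a command quietly and return its exit code."""
    r = subprocess.run(list(cmd), capture_output=True, text=True, timeout=timeout)
    return r.returncode


def check_gh_auth(run: Callable[[Sequence[str]], int] = _exit_code) -> Tuple[str, str]:
    """Distinguish 'authenticated' from 'installed but not authenticated'."""
    try:
        code = run(["gh", "auth", "status"])
    except (OSError, subprocess.TimeoutExpired):
        return "warn", "gh CLI could not be run"
    if code == 0:
        return "ok", "gh CLI is installed and authenticated"
    return "ok", "gh CLI is installed but not authenticated; run gh auth login"
```

Passing `check=True` to `subprocess.run` inside the existing `try` block would be a smaller change with the same effect, since `CalledProcessError` is caught by `except Exception`.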


@ -1,268 +1,39 @@
# -*- coding: utf-8 -*-
"""LinkedIn — via linkedin-scraper-mcp (MCP) or Jina Reader fallback.
Backend: linkedin-scraper-mcp (916 stars, Patchright browser automation)
Swap to: any LinkedIn access tool
"""
"""LinkedIn — check if linkedin-scraper-mcp is available."""
import shutil
import subprocess
from urllib.parse import urlparse
from .base import Channel, ReadResult, SearchResult
from typing import List
import requests
def _mcporter_has_linkedin() -> bool:
"""Check if mcporter has linkedin MCP configured."""
if not shutil.which("mcporter"):
return False
try:
r = subprocess.run(
["mcporter", "list"], capture_output=True, text=True, timeout=10
)
return "linkedin" in r.stdout.lower()
except Exception:
return False
def _mcporter_call(expr: str, timeout: int = 30) -> str:
"""Call a LinkedIn MCP tool via mcporter."""
r = subprocess.run(
["mcporter", "call", expr],
capture_output=True, text=True, timeout=timeout,
)
if r.returncode != 0:
raise RuntimeError(r.stderr or r.stdout)
return r.stdout
from .base import Channel
class LinkedInChannel(Channel):
name = "linkedin"
description = "LinkedIn personal/company profiles and jobs"
description = "LinkedIn professional networking"
backends = ["linkedin-scraper-mcp", "Jina Reader"]
tier = 2
def can_handle(self, url: str) -> bool:
domain = urlparse(url).netloc.lower()
return "linkedin.com" in domain
from urllib.parse import urlparse
return "linkedin.com" in urlparse(url).netloc.lower()
def check(self, config=None):
if _mcporter_has_linkedin():
return "ok", "Fully available (profiles, companies, job search)"
# Check if linkedin-scraper-mcp is installed as CLI
if shutil.which("linkedin-scraper-mcp"):
return "warn", (
"linkedin-scraper-mcp is installed but not connected to mcporter. Run:\n"
" 1. linkedin-scraper-mcp --login (log in on a machine with a browser)\n"
" 2. linkedin-scraper-mcp --transport streamable-http --port 8001\n"
" 3. mcporter config add linkedin http://localhost:8001/mcp"
if not shutil.which("mcporter"):
return "off", (
"Basic content can be read through Jina Reader. Full functionality requires:\n"
" pip install linkedin-scraper-mcp\n"
" mcporter config add linkedin http://localhost:3000/mcp\n"
" See https://github.com/stickerdaniel/linkedin-mcp-server"
)
try:
r = subprocess.run(
["mcporter", "list"], capture_output=True, text=True, timeout=10
)
if "linkedin" in r.stdout.lower():
return "ok", "Fully available (profiles, companies, job search)"
except Exception:
pass
return "off", (
"Some content can be read through Jina Reader. Full functionality requires:\n"
" 1. pip install linkedin-scraper-mcp\n"
" 2. linkedin-scraper-mcp --login (log in on a machine with a browser)\n"
" 3. linkedin-scraper-mcp --transport streamable-http --port 8001\n"
" 4. mcporter config add linkedin http://localhost:8001/mcp\n"
" See https://github.com/stickerdaniel/linkedin-mcp-server"
"mcporter is installed but the LinkedIn MCP is not configured. Run:\n"
" pip install linkedin-scraper-mcp\n"
" mcporter config add linkedin http://localhost:3000/mcp"
)
async def read(self, url: str, config=None) -> ReadResult:
path = urlparse(url).path.strip("/")
# Try MCP first
if _mcporter_has_linkedin():
try:
if "/in/" in url:
return await self._read_profile_mcp(url)
elif "/company/" in url:
return await self._read_company_mcp(url)
elif "/jobs/view/" in url:
return await self._read_job_mcp(url)
except Exception:
pass # Fall through to Jina
# Fallback: Jina Reader
return await self._read_jina(url)
async def _read_profile_mcp(self, url: str) -> ReadResult:
"""Read a LinkedIn profile via MCP."""
import re
# Extract username from URL: /in/username/
match = re.search(r"/in/([^/]+)", url)
if not match:
return await self._read_jina(url)
username = match.group(1)
safe_username = username.replace('"', '\\"')
out = _mcporter_call(
f'linkedin.get_person_profile(linkedin_username: "{safe_username}")',
timeout=60,
)
return ReadResult(
title=self._extract_title(out) or f"LinkedIn Profile - {username}",
content=out.strip(),
url=url,
platform="linkedin",
)
async def _read_company_mcp(self, url: str) -> ReadResult:
"""Read a LinkedIn company page via MCP."""
import re
# Extract company name from URL: /company/name/
match = re.search(r"/company/([^/]+)", url)
if not match:
return await self._read_jina(url)
company = match.group(1)
safe_company = company.replace('"', '\\"')
out = _mcporter_call(
f'linkedin.get_company_profile(company_name: "{safe_company}")',
timeout=60,
)
return ReadResult(
title=self._extract_title(out) or "LinkedIn Company",
content=out.strip(),
url=url,
platform="linkedin",
)
async def _read_job_mcp(self, url: str) -> ReadResult:
"""Read a LinkedIn job posting via MCP."""
import re
match = re.search(r"/jobs/view/(\d+)", url)
if not match:
return await self._read_jina(url)
job_id = match.group(1)
out = _mcporter_call(
f'linkedin.get_job_details(job_id: "{job_id}")',
timeout=30,
)
return ReadResult(
title=self._extract_title(out) or f"LinkedIn Job {job_id}",
content=out.strip(),
url=url,
platform="linkedin",
)
async def _read_jina(self, url: str) -> ReadResult:
"""Fallback: use Jina Reader."""
try:
resp = requests.get(
f"https://r.jina.ai/{url}",
headers={"Accept": "text/markdown"},
timeout=15,
)
resp.raise_for_status()
text = resp.text
# Check if content is usable
if len(text.strip()) < 100 or "Sign in" in text[:200]:
return ReadResult(
title="LinkedIn",
content=(
f"⚠️ This LinkedIn page requires login to view in full.\n\n"
f"URL: {url}\n\n"
"Full functionality requires linkedin-scraper-mcp:\n"
" pip install linkedin-scraper-mcp\n"
" uvx linkedin-scraper-mcp --login\n"
" See https://github.com/stickerdaniel/linkedin-mcp-server"
),
url=url,
platform="linkedin",
)
return ReadResult(
title=text[:100] if text else url,
content=text,
url=url,
platform="linkedin",
)
except Exception:
return ReadResult(
title="LinkedIn",
content=(
f"⚠️ Unable to read this LinkedIn page: {url}\n\n"
"Tips:\n"
"- LinkedIn requires login to view most content\n"
"- Install linkedin-scraper-mcp to unlock full functionality\n"
"- See https://github.com/stickerdaniel/linkedin-mcp-server"
),
url=url,
platform="linkedin",
)
async def search(self, query: str, config=None, **kwargs) -> List[SearchResult]:
limit = kwargs.get("limit", 10)
# Try MCP search first
if _mcporter_has_linkedin():
try:
return await self._search_mcp(query, limit)
except Exception:
pass
# Fallback to Exa
from agent_reach.channels.exa_search import ExaSearchChannel
exa = ExaSearchChannel()
return await exa.search(f"site:linkedin.com {query}", config=config, limit=limit)
async def _search_mcp(self, query: str, limit: int) -> List[SearchResult]:
"""Search LinkedIn via MCP."""
safe_q = query.replace('"', '\\"')
# Try job search first (most common use case)
try:
out = _mcporter_call(
f'linkedin.search_jobs(keywords: "{safe_q}")',
timeout=60,
)
results = self._parse_search_results(out, "job")
if results:
return results[:limit]
except Exception:
pass
# Try people search
try:
out = _mcporter_call(
f'linkedin.search_people(keywords: "{safe_q}")',
timeout=60,
)
results = self._parse_search_results(out, "people")
if results:
return results
except Exception:
pass
return []
def _parse_search_results(self, text: str, result_type: str) -> List[SearchResult]:
"""Parse MCP search output into SearchResults."""
import json
results = []
try:
data = json.loads(text)
items = data if isinstance(data, list) else data.get("results", data.get("jobs", []))
for item in items:
if isinstance(item, dict):
title = item.get("title") or item.get("name") or item.get("headline", "")
url = item.get("url") or item.get("link", "")
snippet = item.get("description") or item.get("company", "")
results.append(SearchResult(
title=title,
url=url,
snippet=snippet[:200] if snippet else "",
))
except (json.JSONDecodeError, KeyError):
# Try line-by-line parsing
pass
return results
def _extract_title(self, text: str) -> str:
"""Extract a title from MCP output."""
for line in text.split("\n"):
line = line.strip()
if line and not line.startswith(("{", "[", "#", "http")):
return line[:80]
return ""


@ -1,178 +1,26 @@
# -*- coding: utf-8 -*-
"""Reddit — via Reddit JSON API + optional proxy.
Backend: Reddit public JSON API (append .json to any URL)
Swap to: any Reddit access method
"""
"""Reddit — check if proxy and credentials are configured."""
import os
import requests
from urllib.parse import urlparse
from .base import Channel, ReadResult
from .base import Channel
class RedditChannel(Channel):
name = "reddit"
description = "Reddit posts and comments"
backends = ["Reddit JSON API"]
tier = 2
USER_AGENT = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36"
backends = ["JSON API", "Exa"]
tier = 1
def can_handle(self, url: str) -> bool:
domain = urlparse(url).netloc.lower()
return "reddit.com" in domain or "redd.it" in domain
from urllib.parse import urlparse
d = urlparse(url).netloc.lower()
return "reddit.com" in d or "redd.it" in d
def check(self, config=None):
proxy = config.get("reddit_proxy") if config else None
has_bot = bool(os.environ.get("REDDIT_CLIENT_ID"))
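if proxy and has_bot:
return "ok", "Fully available (proxy + OAuth bot)"
elif proxy:
return "ok", "Proxy configured; posts can be read. Set REDDIT_CLIENT_ID/SECRET to unlock advanced search and posting"
elif has_bot:
return "warn", "OAuth bot is configured, but direct connections from servers may be blocked. A proxy is more reliable: agent-reach configure proxy URL"
else:
return "off", "Search is available for free via Exa. Reading posts requires a proxy: agent-reach configure proxy URL"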
async def read(self, url: str, config=None) -> ReadResult:
proxy = config.get("reddit_proxy") if config else None
proxies = {"http": proxy, "https": proxy} if proxy else None
# Clean URL: remove query params, trailing slash, then add .json
parsed = urlparse(url)
clean_path = parsed.path.rstrip("/")
# Remove trailing .json if already present (avoid double .json)
if clean_path.endswith(".json"):
clean_path = clean_path[:-5]
json_url = f"https://www.reddit.com{clean_path}.json"
try:
resp = requests.get(
json_url,
headers={"User-Agent": self.USER_AGENT},
proxies=proxies,
params={"limit": 50},
timeout=15,
)
resp.raise_for_status()
except requests.exceptions.HTTPError as e:
status = e.response.status_code if e.response is not None else 0
if status in (403, 429):
return ReadResult(
title="Reddit",
content="⚠️ Reddit blocked this request (403 Forbidden). "
"Reddit blocks most server IPs.\n"
"Fix: agent-reach configure proxy http://user:pass@ip:port\n"
"Cheap option: https://www.webshare.io ($1/month)\n\n"
"Alternatively, search Reddit via Exa (free, no proxy needed): "
"agent-reach search-reddit \"your query\"",
url=url,
platform="reddit",
)
raise
data = resp.json()
# Subreddit listing page: /r/sub/, /r/sub/hot, /r/sub/new, /r/sub/top
if isinstance(data, dict) and data.get("kind") == "Listing":
return self._parse_listing(data, url)
if isinstance(data, list) and len(data) >= 1:
# Post page: [post_listing, comments_listing]
post = data[0]["data"]["children"][0]["data"]
title = post.get("title", "")
author = post.get("author", "")
selftext = post.get("selftext", "")
score = post.get("score", 0)
subreddit = post.get("subreddit", "")
# Extract comments
comments_text = ""
if len(data) >= 2:
comments_text = self._extract_comments(data[1])
content = selftext
if comments_text:
content += f"\n\n---\n## Comments\n{comments_text}"
return ReadResult(
title=title,
content=content,
url=url,
author=f"u/{author}",
platform="reddit",
extra={"subreddit": subreddit, "score": score},
)
raise ValueError(f"Could not parse Reddit response for: {url}")
def _parse_listing(self, data: dict, url: str) -> ReadResult:
"""Parse a subreddit listing (hot/new/top/rising)."""
children = data.get("data", {}).get("children", [])
# Extract subreddit name and sort from URL
parsed = urlparse(url)
path_parts = [p for p in parsed.path.strip("/").split("/") if p]
subreddit = path_parts[1] if len(path_parts) >= 2 else "reddit"
sort_type = path_parts[2] if len(path_parts) >= 3 else "hot"
lines = []
for i, child in enumerate(children, 1):
if child.get("kind") != "t3":
continue
post = child.get("data", {})
title = post.get("title", "")
author = post.get("author", "")
score = post.get("score", 0)
num_comments = post.get("num_comments", 0)
permalink = post.get("permalink", "")
post_url = post.get("url", "")
is_self = post.get("is_self", False)
lines.append(f"### {i}. {title}")
lines.append(f"👤 u/{author} · ⬆ {score} · 💬 {num_comments}")
if not is_self and post_url:
lines.append(f"🔗 {post_url}")
lines.append(f"📎 https://www.reddit.com{permalink}")
# Add selftext preview (first 200 chars)
selftext = post.get("selftext", "")
if selftext:
preview = selftext[:200].replace("\n", " ")
if len(selftext) > 200:
preview += "..."
lines.append(f"> {preview}")
lines.append("")
content = "\n".join(lines) if lines else "No posts found."
return ReadResult(
title=f"r/{subreddit}{sort_type}",
content=content,
url=url,
platform="reddit",
extra={"subreddit": subreddit, "sort": sort_type, "count": len(children)},
proxy = (config.get("reddit_proxy") if config else None) or os.environ.get("REDDIT_PROXY")
if proxy:
return "ok", "Proxy configured; posts can be read. Search goes through Exa"
return "warn", (
"No proxy configured. Server IPs may be blocked by Reddit. Configure a proxy:\n"
" agent-reach configure proxy http://user:pass@ip:port"
)
def _extract_comments(self, comments_data: dict, depth: int = 0, max_depth: int = 3) -> str:
"""Recursively extract comments."""
lines = []
children = comments_data.get("data", {}).get("children", [])
for child in children:
if child.get("kind") != "t1":
continue
data = child.get("data", {})
author = data.get("author", "[deleted]")
body = data.get("body", "")
score = data.get("score", 0)
indent = " " * depth
lines.append(f"{indent}**u/{author}** ({score} points):")
lines.append(f"{indent}{body}")
lines.append("")
# Recurse into replies
if depth < max_depth and data.get("replies") and isinstance(data["replies"], dict):
lines.append(self._extract_comments(data["replies"], depth + 1, max_depth))
return "\n".join(lines)
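
With the `read()` wrapper removed, an agent that wants a post's JSON has to build the endpoint itself. The following is a minimal sketch of the URL normalization the deleted code performed (drop query parameters and trailing slash, avoid a doubled `.json`). The helper name `reddit_json_url` is hypothetical and not part of Agent Reach.

```python
from urllib.parse import urlparse


def reddit_json_url(url: str) -> str:
    """Build the public JSON endpoint for a Reddit post or listing URL.

    Hypothetical helper for illustration only.
    """
    # Keep only the path: query parameters and the original host are dropped.
    path = urlparse(url).path.rstrip("/")
    # Avoid a doubled suffix when the caller already passed a .json URL.
    if path.endswith(".json"):
        path = path[: -len(".json")]
    return f"https://www.reddit.com{path}.json"
```

The result can then be fetched with any HTTP client, optionally through the configured proxy; as the status messages above note, requests from server IPs may be blocked without one.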


@ -1,13 +1,7 @@
# -*- coding: utf-8 -*-
"""RSS feeds — via feedparser (free, pip dependency).
"""RSS — check if feedparser is available."""
Backend: feedparser (https://github.com/kurtmckee/feedparser)
Swap to: any RSS parser
"""
import feedparser
from urllib.parse import urlparse
from .base import Channel, ReadResult
from .base import Channel
class RSSChannel(Channel):
@ -17,41 +11,11 @@ class RSSChannel(Channel):
tier = 0
def can_handle(self, url: str) -> bool:
lower = url.lower()
domain = urlparse(url).netloc.lower()
return (lower.endswith(".xml") or "/rss" in lower or "/feed" in lower
or "/atom" in lower or "rss" in domain)
return any(x in url.lower() for x in ["/feed", "/rss", ".xml", "atom"])
async def read(self, url: str, config=None) -> ReadResult:
feed = feedparser.parse(url)
if feed.bozo and not feed.entries:
raise ValueError(f"Failed to parse RSS feed: {url}")
if not feed.entries:
raise ValueError(f"No entries in RSS feed: {url}")
# Return latest entry
entry = feed.entries[0]
content = entry.get("summary", "") or entry.get("description", "")
# If multiple entries, summarize all
if len(feed.entries) > 1:
lines = [f"# {feed.feed.get('title', 'RSS Feed')}\n"]
for i, e in enumerate(feed.entries[:20], 1):
title = e.get("title", "Untitled")
link = e.get("link", "")
summary = e.get("summary", "")[:200]
lines.append(f"## {i}. {title}")
lines.append(f"🔗 {link}")
if summary:
lines.append(summary)
lines.append("")
content = "\n".join(lines)
return ReadResult(
title=feed.feed.get("title", entry.get("title", url)),
content=content,
url=url,
platform="rss",
)
def check(self, config=None):
try:
import feedparser
return "ok", "Can read RSS/Atom feeds"
except ImportError:
return "off", "feedparser is not installed. Install: pip install feedparser"
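
The new `check()` imports `feedparser` only to see whether it is installed. One possible alternative, sketched here under the assumption that a `(status, message)` tuple is all the doctor needs, is to probe for the module without importing it. The function name `check_module` and its parameters are hypothetical.

```python
import importlib.util


def check_module(module: str, ok_msg: str, install_hint: str):
    """Return a (status, message) tuple without importing the module.

    Hypothetical helper; Agent Reach itself uses a try/except import.
    """
    # find_spec returns None when a top-level module cannot be located.
    if importlib.util.find_spec(module) is not None:
        return "ok", ok_msg
    return "off", f"{module} is not installed. Install: {install_hint}"
```

For example, `check_module("feedparser", "Can read RSS/Atom feeds", "pip install feedparser")` would stand in for the body of the check above.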


@ -1,286 +1,38 @@
# -*- coding: utf-8 -*-
"""Twitter/X — via bird CLI (free) or Jina Reader fallback.
Backend: bird (@steipete/bird npm package) for search/timeline
Jina Reader for single tweets
Swap to: any Twitter access tool
"""
"""Twitter/X — check if bird CLI is available."""
import shutil
import subprocess
from urllib.parse import urlparse
from .base import Channel, ReadResult, SearchResult
from typing import List
import requests
def _bird_cmd():
"""Find bird CLI binary."""
return shutil.which("bird") or shutil.which("birdx")
def _bird_env(config=None):
"""Build env dict with Twitter cookies and proxy support for bird CLI.
Node.js native fetch() doesn't respect HTTP_PROXY/HTTPS_PROXY.
We inject undici's EnvHttpProxyAgent via NODE_OPTIONS so bird
automatically routes through the user's proxy.
"""
import os
import tempfile
env = os.environ.copy()
if config:
auth_token = config.get("twitter_auth_token")
ct0 = config.get("twitter_ct0")
if auth_token:
env["AUTH_TOKEN"] = auth_token
if ct0:
env["CT0"] = ct0
# Auto-inject undici proxy support if HTTP_PROXY/HTTPS_PROXY is set
has_proxy = env.get("HTTPS_PROXY") or env.get("HTTP_PROXY") or env.get("https_proxy") or env.get("http_proxy")
if has_proxy:
bootstrap = _get_proxy_bootstrap_path()
if bootstrap:
npm_root = subprocess.run(
["npm", "root", "-g"],
capture_output=True, text=True, timeout=5,
).stdout.strip()
existing_opts = env.get("NODE_OPTIONS", "")
env["NODE_OPTIONS"] = f"--require {bootstrap} {existing_opts}".strip()
env["NODE_PATH"] = npm_root
return env
def _get_proxy_bootstrap_path():
"""Create/return a bootstrap JS file that sets up undici proxy for fetch."""
import os
import tempfile
bootstrap_path = os.path.join(tempfile.gettempdir(), "agent-reach-undici-proxy.js")
if not os.path.exists(bootstrap_path):
# Check if undici is available
npm_root = subprocess.run(
["npm", "root", "-g"],
capture_output=True, text=True, timeout=5,
).stdout.strip()
undici_path = os.path.join(npm_root, "undici", "index.js")
if not os.path.exists(undici_path):
return None
with open(bootstrap_path, "w") as f:
f.write(
"try {\n"
" const { EnvHttpProxyAgent, setGlobalDispatcher } = require('undici');\n"
" if (process.env.HTTPS_PROXY || process.env.HTTP_PROXY) {\n"
" setGlobalDispatcher(new EnvHttpProxyAgent());\n"
" }\n"
"} catch(e) {}\n"
)
return bootstrap_path
from .base import Channel
class TwitterChannel(Channel):
name = "twitter"
description = "Twitter/X tweets"
backends = ["bird", "Jina Reader"]
tier = 0 # Single tweet reading is zero-config
backends = ["bird CLI"]
tier = 1
def can_handle(self, url: str) -> bool:
domain = urlparse(url).netloc.lower()
return "x.com" in domain or "twitter.com" in domain
from urllib.parse import urlparse
d = urlparse(url).netloc.lower()
return "x.com" in d or "twitter.com" in d
def check(self, config=None):
# Basic reading always works (Jina fallback)
bird = _bird_cmd()
if bird:
# Actually test bird connectivity
try:
result = subprocess.run(
[bird, "whoami"],
capture_output=True, timeout=15,
encoding='utf-8', errors='replace',
env=_bird_env(config),
)
if result.returncode == 0 and "fetch failed" not in result.stdout.lower() and "fetch failed" not in result.stderr.lower():
return "ok", "Search, timeline, and posting are all available"
else:
error_hint = (result.stderr or result.stdout).strip()[:100]
if "fetch failed" in (error_hint + result.stdout).lower():
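return "warn", (
f"bird is installed but the connection failed (fetch failed). Possible causes:\n"
" 1. The cookie is invalid or expired → re-export the cookie\n"
" 2. A proxy is needed but Node.js fetch ignores the system proxy → use a global/transparent proxy (e.g. Clash TUN mode, Proxifier)\n"
" 3. x.com cannot be reached directly from this network\n"
" Search is temporarily unavailable; Exa search will be used instead"
)
return "warn", f"bird connection error: {error_hint}. Search will fall back to Exa"
except (subprocess.TimeoutExpired, FileNotFoundError):
return "warn", "bird is installed but the connection timed out. Search will fall back to Exa"
return "ok", "Tweets can be read. Install bird and configure cookies to unlock search and posting"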
async def read(self, url: str, config=None) -> ReadResult:
# Try bird first
bird = _bird_cmd()
if bird:
return await self._read_bird(url, bird, config)
# Fallback: Jina Reader
return await self._read_jina(url)
async def _read_bird(self, url: str, bird: str, config=None) -> ReadResult:
result = subprocess.run(
[bird, "read", url],
capture_output=True, timeout=30,
encoding='utf-8', errors='replace',
env=_bird_env(config),
)
if result.returncode != 0:
return await self._read_jina(url)
text = result.stdout.strip()
# Extract author from first line
author = ""
lines = text.split("\n")
if lines and lines[0].startswith("@"):
author = lines[0].split()[0]
return ReadResult(
title=text[:100],
content=text,
url=url,
author=author,
platform="twitter",
)
async def _read_jina(self, url: str) -> ReadResult:
try:
resp = requests.get(
f"https://r.jina.ai/{url}",
headers={"Accept": "text/markdown"},
timeout=15,
bird = shutil.which("bird") or shutil.which("birdx")
if not bird:
return "warn", (
"bird CLI is not installed. Search can fall back to Exa. Install:\n"
" npm install -g @steipete/bird"
)
resp.raise_for_status()
text = resp.text
# Detect unusable Jina responses for X/Twitter (JS-required pages)
unusable_indicators = [
"page doesn", # "this page doesn't exist" (handles both ' and ')
"miss what", # "Don't miss what's happening"
"Something went wrong. Try reloading",
"Log in](", # Markdown link: [Log in](...)
]
if any(indicator in text for indicator in unusable_indicators):
return ReadResult(
title="Twitter/X",
content="⚠️ Could not read this tweet.\n"
"The tweet may have been deleted, or the account is private.\n\n"
"Tips:\n"
"- Make sure the URL is correct\n"
"- Try: bird read <url> (if bird CLI is installed)\n"
"- For protected tweets, configure Twitter cookies: "
"agent-reach configure twitter-cookies AUTH_TOKEN CT0",
url=url,
platform="twitter",
)
title = text[:100] if text else url
return ReadResult(
title=title,
content=text,
url=url,
platform="twitter",
try:
r = subprocess.run(
[bird, "whoami"], capture_output=True, text=True, timeout=10
)
if r.returncode == 0:
return "ok", "Fully available (read and search tweets)"
return "warn", (
"bird CLI is installed but cookies are not configured. Run:\n"
" agent-reach configure twitter-cookies \"auth_token=xxx; ct0=yyy\""
)
except Exception:
return ReadResult(
title="Twitter/X",
content="⚠️ Could not read this tweet.\n"
"The tweet may have been deleted, or the account is private.\n\n"
"Tips:\n"
"- Make sure the URL is correct\n"
"- Try: bird read <url> (if bird CLI is installed)\n"
"- For protected tweets, configure Twitter cookies: "
"agent-reach configure twitter-cookies AUTH_TOKEN CT0",
url=url,
platform="twitter",
)
async def search(self, query: str, config=None, **kwargs) -> List[SearchResult]:
limit = kwargs.get("limit", 10)
bird = _bird_cmd()
if bird:
return await self._search_bird(query, limit, bird, config)
# Fallback to Exa
return await self._search_exa(query, limit, config)
async def _search_bird(self, query: str, limit: int, bird: str, config=None) -> List[SearchResult]:
try:
result = subprocess.run(
[bird, "search", query, "-n", str(limit)],
capture_output=True, timeout=30,
encoding='utf-8', errors='replace',
env=_bird_env(config),
)
if result.returncode != 0:
stderr = (result.stderr or "").strip()
if "fetch failed" in stderr.lower() or "fetch failed" in (result.stdout or "").lower():
# bird can't connect — fall back to Exa silently
return await self._search_exa(query, limit, config)
return await self._search_exa(query, limit, config)
parsed = self._parse_bird_output(result.stdout)
if not parsed:
# bird returned nothing — try Exa
return await self._search_exa(query, limit, config)
return parsed
except (subprocess.TimeoutExpired, FileNotFoundError):
return await self._search_exa(query, limit, config)
def _parse_bird_output(self, text: str) -> List[SearchResult]:
"""Parse bird text output into SearchResults."""
results = []
current = {}
text_lines = []
for line in text.strip().split("\n"):
line = line.strip()
if line.startswith(""):
if current:
current["text"] = "\n".join(text_lines).strip()
results.append(SearchResult(
title=current.get("text", "")[:80],
url=current.get("url", ""),
snippet=current.get("text", ""),
author=current.get("author", ""),
date=current.get("date", ""),
))
current = {}
text_lines = []
continue
if line.startswith("@") and line.endswith(":") and "(" in line:
current["author"] = line.split()[0]
continue
if line.startswith("date:"):
current["date"] = line[5:].strip()
continue
if line.startswith("url:"):
current["url"] = line[4:].strip()
continue
if current is not None:
text_lines.append(line)
if current and text_lines:
current["text"] = "\n".join(text_lines).strip()
results.append(SearchResult(
title=current.get("text", "")[:80],
url=current.get("url", ""),
snippet=current.get("text", ""),
author=current.get("author", ""),
date=current.get("date", ""),
))
return results
async def _search_exa(self, query: str, limit: int, config=None) -> List[SearchResult]:
from agent_reach.channels.exa_search import ExaSearchChannel
exa = ExaSearchChannel()
return await exa.search(f"site:x.com {query}", config=config, limit=limit)
return "warn", "bird CLI is installed but the connection failed"
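
The `can_handle` methods in this commit test domains by substring (for example `"x.com" in d`), which would also accept hosts such as `netflix.com`. A stricter comparison could look like the sketch below. The helper `host_matches` is hypothetical and is not what the commit implements.

```python
from urllib.parse import urlparse


def host_matches(url: str, *domains: str) -> bool:
    """True if the URL's host equals a domain or is a subdomain of it.

    Hypothetical helper for illustration only.
    """
    # hostname is None for strings that do not parse as a URL with a host.
    host = (urlparse(url).hostname or "").lower()
    return any(host == d or host.endswith("." + d) for d in domains)
```

A Twitter channel could then use `host_matches(url, "x.com", "twitter.com")`, which accepts `mobile.twitter.com` but rejects unrelated hosts that merely end in the same letters.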


@ -1,49 +1,17 @@
# -*- coding: utf-8 -*-
"""Web pages — via Jina Reader API (free, no config needed).
"""Web — any URL via Jina Reader. Always available."""
Backend: Jina Reader (https://r.jina.ai)
Swap to: Firecrawl, Trafilatura, or any other reader API
"""
import requests
from .base import Channel, ReadResult
from .base import Channel
class WebChannel(Channel):
name = "web"
description = "Web pages (any URL)"
backends = ["Jina Reader API"]
description = "Any web page"
backends = ["Jina Reader"]
tier = 0
JINA_URL = "https://r.jina.ai/"
def can_handle(self, url: str) -> bool:
# Fallback — handles any URL not matched by other channels
return True
return True # Fallback — handles any URL
async def read(self, url: str, config=None) -> ReadResult:
resp = requests.get(
f"{self.JINA_URL}{url}",
headers={"Accept": "text/markdown"},
timeout=15,
)
resp.raise_for_status()
text = resp.text
# Extract title from first markdown heading
title = url
for line in text.split("\n"):
line = line.strip()
if line.startswith("# "):
title = line[2:].strip()
break
if line.startswith("Title:"):
title = line[6:].strip()
break
return ReadResult(
title=title,
content=text,
url=url,
platform="web",
)
def check(self, config=None):
return "ok", "Reads any web page via Jina Reader: curl https://r.jina.ai/URL"
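
Since the `read()` method is gone, the `curl https://r.jina.ai/URL` call is now the agent's job. For agents that prefer Python over `curl`, a rough standard-library equivalent might look like this. The function names are hypothetical; the `Accept: text/markdown` header and the 15-second timeout are taken from the removed code.

```python
import urllib.request

JINA_PREFIX = "https://r.jina.ai/"


def jina_reader_url(url: str) -> str:
    """Prefix a target URL with the Jina Reader endpoint."""
    return f"{JINA_PREFIX}{url}"


def fetch_markdown(url: str, timeout: float = 15.0) -> str:
    """Fetch a page as Markdown via Jina Reader (performs a network request)."""
    req = urllib.request.Request(
        jina_reader_url(url),
        headers={"Accept": "text/markdown"},
    )
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return resp.read().decode("utf-8", errors="replace")
```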


@ -1,16 +1,9 @@
# -*- coding: utf-8 -*-
"""XiaoHongShu (小红书) — via mcporter + xiaohongshu MCP server.
"""XiaoHongShu — check if mcporter + xiaohongshu MCP is available."""
Backend: xiaohongshu-mcp server (internal API, reliable)
Requires: mcporter CLI + xiaohongshu MCP server running
"""
import json
import shutil
import subprocess
from urllib.parse import urlparse, parse_qs, urlencode
from .base import Channel, ReadResult, SearchResult
from typing import List, Optional
from .base import Channel
class XiaoHongShuChannel(Channel):
@ -19,30 +12,8 @@ class XiaoHongShuChannel(Channel):
backends = ["xiaohongshu-mcp"]
tier = 2
def _mcporter_ok(self) -> bool:
"""Check if mcporter + xiaohongshu MCP is available."""
if not shutil.which("mcporter"):
return False
try:
r = subprocess.run(
["mcporter", "list"], capture_output=True, text=True, timeout=10
)
return "xiaohongshu" in r.stdout
except Exception:
return False
def _call(self, expr: str, timeout: int = 30) -> str:
r = subprocess.run(
["mcporter", "call", expr],
capture_output=True, text=True, timeout=timeout,
)
if r.returncode != 0:
raise RuntimeError(r.stderr or r.stdout)
return r.stdout
# ── Channel interface ──
def can_handle(self, url: str) -> bool:
from urllib.parse import urlparse
d = urlparse(url).netloc.lower()
return "xiaohongshu.com" in d or "xhslink.com" in d
@ -55,190 +26,25 @@ class XiaoHongShuChannel(Channel):
" 3. mcporter config add xiaohongshu http://localhost:18060/mcp\n"
" See https://github.com/xpzouying/xiaohongshu-mcp"
)
if not self._mcporter_ok():
return "off", (
"mcporter is installed but the XiaoHongShu MCP is not configured. Run:\n"
" docker run -d --name xiaohongshu-mcp -p 18060:18060 xpzouying/xiaohongshu-mcp\n"
" mcporter config add xiaohongshu http://localhost:18060/mcp"
)
try:
out = self._call("xiaohongshu.check_login_status()", timeout=10)
if "已登录" in out or "logged" in out.lower():
r = subprocess.run(
["mcporter", "list"], capture_output=True, text=True, timeout=10
)
if "xiaohongshu" not in r.stdout:
return "off", (
"mcporter is installed but the XiaoHongShu MCP is not configured. Run:\n"
" docker run -d --name xiaohongshu-mcp -p 18060:18060 xpzouying/xiaohongshu-mcp\n"
" mcporter config add xiaohongshu http://localhost:18060/mcp"
)
except Exception:
return "off", "mcporter connection error"
try:
r = subprocess.run(
["mcporter", "call", "xiaohongshu.check_login_status()"],
capture_output=True, text=True, timeout=10
)
if "已登录" in r.stdout or "logged" in r.stdout.lower():
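return "ok", "Fully available (read, search, post, comment, like)"
return "warn", "MCP is connected but not logged in; scan the QR code to log in"
except Exception:
return "warn", "MCP connection error; check that the xiaohongshu-mcp service is running"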
async def read(self, url: str, config=None) -> ReadResult:
if not self._mcporter_ok():
return ReadResult(
title="XiaoHongShu",
content=(
"⚠️ XiaoHongShu requires mcporter + xiaohongshu-mcp.\n\n"
"Installation steps:\n"
"1. npm install -g mcporter\n"
"2. docker run -d --name xiaohongshu-mcp -p 18060:18060 xpzouying/xiaohongshu-mcp\n"
"3. mcporter config add xiaohongshu http://localhost:18060/mcp\n"
"4. Run agent-reach doctor to check the status\n\n"
"See https://github.com/xpzouying/xiaohongshu-mcp"
),
url=url, platform="xiaohongshu",
)
note_id = self._extract_note_id(url)
if not note_id:
return ReadResult(
title="XiaoHongShu",
content=f"⚠️ Could not extract the note ID from the URL: {url}",
url=url, platform="xiaohongshu",
)
# Step 1: try xsec_token from URL query param (e.g. from search results)
xsec_token = self._extract_token_from_url(url)
# Step 2: try homepage feeds
if not xsec_token:
xsec_token = self._find_token_in_feeds(note_id)
# Step 3: search for the note to get a fresh token
if not xsec_token:
xsec_token = self._find_token_by_search(note_id)
# If no token found, fallback to Jina Reader
if not xsec_token:
return await self._read_jina(url)
# Get detail via MCP
out = self._call(
f'xiaohongshu.get_feed_detail(feed_id: "{note_id}", xsec_token: "{xsec_token}")',
timeout=15,
)
return ReadResult(
title=self._extract_title(out) or f"XHS {note_id}",
content=out.strip(),
url=url, platform="xiaohongshu",
)
async def search(self, query: str, config=None, **kwargs) -> List[SearchResult]:
if not self._mcporter_ok():
raise ValueError(
"XiaoHongShu search requires mcporter + xiaohongshu-mcp.\n"
"Install: npm install -g mcporter && mcporter config add xiaohongshu http://localhost:18060/mcp"
)
limit = kwargs.get("limit", 10)
safe_q = query.replace('"', '\\"')
out = self._call(f'xiaohongshu.search_feeds(keyword: "{safe_q}")', timeout=30)
results = []
try:
data = json.loads(out)
for item in data.get("feeds", [])[:limit]:
card = item.get("noteCard", {})
user = card.get("user", {})
interact = card.get("interactInfo", {})
note_id = item.get("id", "")
xsec_token = item.get("xsecToken", "")
note_url = f"https://www.xiaohongshu.com/explore/{note_id}"
if xsec_token:
note_url += f"?xsec_token={xsec_token}"
results.append(SearchResult(
title=card.get("displayTitle", ""),
url=note_url,
snippet=f"👤 {user.get('nickname', '')} · ❤ {interact.get('likedCount', '0')}",
score=0,
))
except (json.JSONDecodeError, KeyError):
pass
return results
# ── Helpers ──
def _extract_note_id(self, url: str) -> str:
"""Extract note ID from URL path, ignoring query params."""
path = urlparse(url).path.strip("/").split("/")
return path[-1] if path else ""
def _extract_token_from_url(self, url: str) -> Optional[str]:
"""Extract xsec_token from URL query parameter if present."""
qs = parse_qs(urlparse(url).query)
tokens = qs.get("xsec_token", [])
return tokens[0] if tokens else None
def _find_token_in_feeds(self, note_id: str) -> Optional[str]:
"""Try to find xsec_token for a note from homepage feeds."""
try:
out = self._call("xiaohongshu.list_feeds()", timeout=15)
data = json.loads(out)
for feed in data.get("feeds", []):
if feed.get("id") == note_id:
return feed.get("xsecToken") or None
except Exception:
pass
return None
def _find_token_by_search(self, note_id: str) -> Optional[str]:
"""Search for the note ID to get a fresh xsec_token."""
try:
out = self._call(
f'xiaohongshu.search_feeds(keyword: "{note_id}")', timeout=20
)
data = json.loads(out)
for feed in data.get("feeds", []):
if feed.get("id") == note_id:
return feed.get("xsecToken") or None
# If exact match not found but results exist, try the first one
# (search by note_id sometimes returns the note with a different key)
except Exception:
pass
return None
def _extract_title(self, text: str) -> str:
for line in text.split("\n"):
line = line.strip()
if line and not line.startswith(("{", "[", "#", "http")):
return line[:80]
return ""
async def _read_jina(self, url: str) -> ReadResult:
"""Fallback: read XHS note via Jina Reader when xsec_token unavailable."""
import requests
try:
resp = requests.get(
f"https://r.jina.ai/{url}",
headers={"Accept": "text/markdown"},
timeout=15,
)
resp.raise_for_status()
text = resp.text
if len(text.strip()) < 50 or "登录" in text[:200]:
return ReadResult(
title="XiaoHongShu",
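content=(
f"⚠️ Could not fetch note details: {url}\n\n"
"XiaoHongShu requires an xsec_token to read notes via MCP.\n"
"Try searching for related keywords first, then read the note from the results."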
),
url=url, platform="xiaohongshu",
)
title = ""
for line in text.split("\n"):
line = line.strip()
if line and not line.startswith(("#", "http", "![", "[")):
title = line[:80]
break
return ReadResult(
title=title or "XiaoHongShu",
content=text.strip(),
url=url, platform="xiaohongshu",
)
except Exception:
return ReadResult(
title="XiaoHongShu",
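content=(
f"⚠️ Could not fetch note details: {url}\n\n"
"XiaoHongShu requires an xsec_token to read notes via MCP.\n"
"Try searching for related keywords first, then read the note from the results."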
),
url=url, platform="xiaohongshu",
)


@ -1,125 +1,22 @@
# -*- coding: utf-8 -*-
"""YouTube — via yt-dlp (video info, subtitles, and search).
"""YouTube — check if yt-dlp is available."""
Backend: yt-dlp (https://github.com/yt-dlp/yt-dlp)
Supports: read (info + subtitles), search (ytsearch)
"""
import json
import shutil
import subprocess
import tempfile
from pathlib import Path
from urllib.parse import urlparse
from .base import Channel, ReadResult, SearchResult
from typing import List
from .base import Channel
class YouTubeChannel(Channel):
name = "youtube"
description = "YouTube video subtitles"
description = "YouTube videos and subtitles"
backends = ["yt-dlp"]
requires_tools = ["yt-dlp"]
tier = 0
def can_handle(self, url: str) -> bool:
from urllib.parse import urlparse
d = urlparse(url).netloc.lower()
return "youtube.com" in d or "youtu.be" in d
async def read(self, url: str, config=None) -> ReadResult:
if not shutil.which("yt-dlp"):
raise RuntimeError("yt-dlp not installed. Install: pip install yt-dlp")
with tempfile.TemporaryDirectory() as tmpdir:
info = self._get_info(url)
title = info.get("title", url)
author = info.get("uploader", "")
transcript = self._get_subtitles(url, tmpdir)
if not transcript:
transcript = f"[Video: {title}]\n[No subtitles available.]"
return ReadResult(
title=title, content=transcript, url=url,
author=author, platform="youtube",
extra={
"duration": info.get("duration_string"),
"view_count": info.get("view_count"),
"upload_date": info.get("upload_date"),
},
)
async def search(self, query: str, config=None, **kwargs) -> List[SearchResult]:
"""Search YouTube via yt-dlp's ytsearch."""
if not shutil.which("yt-dlp"):
raise RuntimeError("yt-dlp not installed. Install: pip install yt-dlp")
limit = kwargs.get("limit", 10)
try:
r = subprocess.run(
["yt-dlp", "--dump-json", "--flat-playlist",
f"ytsearch{limit}:{query}"],
capture_output=True, text=True, timeout=30,
)
results = []
for line in r.stdout.strip().split("\n"):
if not line.strip():
continue
try:
d = json.loads(line)
vid = d.get("id", "")
results.append(SearchResult(
title=d.get("title", ""),
url=f"https://youtube.com/watch?v={vid}" if vid else "",
snippet=(
f"👤 {d.get('channel', '?')} · "
f"{d.get('duration_string', '?')} · "
f"👁 {d.get('view_count', '?')}"
),
extra={
"channel": d.get("channel"),
"duration": d.get("duration_string"),
"view_count": d.get("view_count"),
},
))
except json.JSONDecodeError:
continue
return results
except subprocess.TimeoutExpired:
return []
def _get_info(self, url: str) -> dict:
try:
r = subprocess.run(
["yt-dlp", "--dump-json", "--no-download", url],
capture_output=True, text=True, timeout=30,
)
if r.returncode == 0:
return json.loads(r.stdout)
except (subprocess.TimeoutExpired, json.JSONDecodeError):
pass
return {}
def _get_subtitles(self, url: str, tmpdir: str) -> str:
try:
subprocess.run(
["yt-dlp", "--write-auto-sub", "--write-sub",
"--sub-lang", "en,zh-Hans,zh",
"--skip-download", "--sub-format", "vtt",
"-o", f"{tmpdir}/%(id)s.%(ext)s", url],
capture_output=True, text=True, timeout=30,
)
for f in Path(tmpdir).glob("*.vtt"):
text = f.read_text(errors="replace")
lines = []
for line in text.split("\n"):
line = line.strip()
if not line or line.startswith("WEBVTT") or "-->" in line or line.isdigit():
continue
if line not in lines[-1:]:
lines.append(line)
return "\n".join(lines)
except subprocess.TimeoutExpired:
pass
return ""
def check(self, config=None):
if shutil.which("yt-dlp"):
return "ok", "Can extract video info and subtitles"
return "off", "yt-dlp is not installed. Install: pip install yt-dlp"


@ -1,20 +1,15 @@
# -*- coding: utf-8 -*-
"""
Agent Reach CLI: command-line interface.
Agent Reach CLI: installer, doctor, and configuration tool.
Usage:
agent-reach read <url>
agent-reach search <query>
agent-reach search-reddit <query> [--sub <subreddit>]
agent-reach search-github <query> [--lang <language>]
agent-reach search-twitter <query>
agent-reach setup
agent-reach install --env=auto
agent-reach doctor
agent-reach version
agent-reach configure twitter-cookies "auth_token=xxx; ct0=yyy"
agent-reach setup
"""
import sys
import asyncio
import argparse
import json
import os
@ -48,57 +43,6 @@ def main():
sub = parser.add_subparsers(dest="command", help="Available commands")
# ── read ──
p_read = sub.add_parser("read", help="Read content from a URL")
p_read.add_argument("url", help="URL to read")
p_read.add_argument("--json", dest="as_json", action="store_true", help="Output as JSON")
# ── search ──
p_search = sub.add_parser("search", help="Search the web (Exa)")
p_search.add_argument("query", nargs="+", help="Search query")
p_search.add_argument("-n", "--num", type=int, default=5, help="Number of results")
# ── search-reddit ──
p_sr = sub.add_parser("search-reddit", help="Search Reddit")
p_sr.add_argument("query", nargs="+", help="Search query")
p_sr.add_argument("--sub", help="Subreddit filter")
p_sr.add_argument("-n", "--num", type=int, default=10, help="Number of results")
# ── search-github ──
p_sg = sub.add_parser("search-github", help="Search GitHub")
p_sg.add_argument("query", nargs="+", help="Search query")
p_sg.add_argument("--lang", help="Language filter")
p_sg.add_argument("-n", "--num", type=int, default=5, help="Number of results")
# ── search-twitter ──
p_st = sub.add_parser("search-twitter", help="Search Twitter")
p_st.add_argument("query", nargs="+", help="Search query")
p_st.add_argument("-n", "--num", type=int, default=10, help="Number of results")
# ── search-youtube ──
p_sy = sub.add_parser("search-youtube", help="Search YouTube")
p_sy.add_argument("query", nargs="+", help="Search query")
p_sy.add_argument("-n", "--num", type=int, default=5, help="Number of results")
# ── search-bilibili ──
p_sb = sub.add_parser("search-bilibili", help="Search Bilibili")
p_sb.add_argument("query", nargs="+", help="Search query")
p_sb.add_argument("-n", "--num", type=int, default=5, help="Number of results")
# ── search-xhs ──
p_sx = sub.add_parser("search-xhs", help="Search XiaoHongShu")
p_sx.add_argument("query", nargs="+", help="Search query")
p_sx.add_argument("-n", "--num", type=int, default=10, help="Number of results")
# ── search-linkedin ──
p_sl = sub.add_parser("search-linkedin", help="Search LinkedIn")
p_sl.add_argument("query", nargs="+", help="Search query")
p_sl.add_argument("-n", "--num", type=int, default=10, help="Number of results")
# ── search-bosszhipin ──
p_sbz = sub.add_parser("search-bosszhipin", help="Search Boss直聘")
p_sbz.add_argument("query", nargs="+", help="Search query")
p_sbz.add_argument("-n", "--num", type=int, default=10, help="Number of results")
# ── setup ──
sub.add_parser("setup", help="Interactive configuration wizard")
@ -161,10 +105,6 @@ def main():
_cmd_install(args)
elif args.command == "configure":
_cmd_configure(args)
elif args.command == "read":
asyncio.run(_cmd_read(args))
elif args.command.startswith("search"):
asyncio.run(_cmd_search(args))
# ── Command handlers ────────────────────────────────
@ -849,98 +789,6 @@ def _cmd_setup():
print()
async def _cmd_read(args):
from agent_reach.core import AgentReach
eyes = AgentReach()
try:
result = await eyes.read(args.url)
if args.as_json:
print(json.dumps(result, ensure_ascii=False, indent=2))
else:
print(f"\n📖 {result.get('title', 'Untitled')}")
print(f"🔗 {result.get('url', '')}")
if result.get("author"):
print(f"👤 {result['author']}")
print(f"\n{result.get('content', '')}")
except Exception as e:
error_str = str(e)
if "400" in error_str and "Bad Request" in error_str:
print(f"❌ Invalid URL: {args.url}", file=sys.stderr)
print(" Please provide a valid URL (e.g., https://example.com)", file=sys.stderr)
elif "ConnectionError" in type(e).__name__ or "Timeout" in type(e).__name__:
print(f"❌ Could not connect to: {args.url}", file=sys.stderr)
print(" Check your internet connection or the URL.", file=sys.stderr)
else:
print(f"❌ Error: {e}", file=sys.stderr)
sys.exit(1)
async def _cmd_search(args):
from agent_reach.core import AgentReach
eyes = AgentReach()
query = " ".join(args.query).strip()
num = args.num
if not query:
print("Please provide a search query.", file=sys.stderr)
sys.exit(1)
try:
if args.command == "search":
results = await eyes.search(query, num_results=num)
elif args.command == "search-reddit":
results = await eyes.search_reddit(query, subreddit=getattr(args, "sub", None), limit=num)
elif args.command == "search-github":
results = await eyes.search_github(query, language=getattr(args, "lang", None), limit=num)
elif args.command == "search-twitter":
results = await eyes.search_twitter(query, limit=num)
elif args.command == "search-youtube":
results = await eyes.search_youtube(query, limit=num)
elif args.command == "search-bilibili":
results = await eyes.search_bilibili(query, limit=num)
elif args.command == "search-xhs":
results = await eyes.search_xhs(query, limit=num)
elif args.command == "search-linkedin":
results = await eyes.search_linkedin(query, limit=num)
elif args.command == "search-bosszhipin":
results = await eyes.search_bosszhipin(query, limit=num)
else:
print(f"Unknown command: {args.command}", file=sys.stderr)
sys.exit(1)
except Exception as e:
error_str = str(e)
if "401" in error_str or "Unauthorized" in error_str:
print("⚠️ Exa API key not configured or invalid.")
print("Get a free key at https://exa.ai (1000 searches/month free)")
print("Then run: agent-reach configure exa-key YOUR_KEY")
sys.exit(1)
elif "exa" in error_str.lower() or "api_key" in error_str.lower():
print("⚠️ Exa API key not configured.")
print("Get a free key at https://exa.ai")
print("Then run: agent-reach configure exa-key YOUR_KEY")
sys.exit(1)
else:
print(f"❌ Error: {e}", file=sys.stderr)
sys.exit(1)
if not results:
print("No results found.")
return
for i, r in enumerate(results, 1):
title = r.get("title") or r.get("name") or r.get("text", "")[:60]
url = r.get("url", "")
snippet = r.get("snippet") or r.get("description") or r.get("text", "")
print(f"\n{i}. {title}")
print(f" 🔗 {url}")
if snippet:
print(f" {snippet[:200]}")
# Extra info for GitHub
extra = r.get("extra", {})
if extra.get("stars"):
print(f"{extra['stars']} 🍴 {extra.get('forks', 0)} 📝 {extra.get('language', '')}")
def _cmd_check_update():
"""Check for newer versions on GitHub."""
import requests


@ -1,120 +1,36 @@
# -*- coding: utf-8 -*-
"""
AgentReach: the unified entry point.
AgentReach: installer, doctor, and configuration tool.
Pure glue: routes URLs to the right channel, routes searches to the right engine.
Every channel is a thin wrapper around an external tool. Swap any backend anytime.
Agent Reach helps AI agents install and configure upstream platform tools
(bird CLI, yt-dlp, mcporter, gh CLI, etc.). After installation, agents
call the upstream tools directly; no wrapper layer needed.
Usage:
from agent_reach import AgentReach
from agent_reach.doctor import check_all, format_report
from agent_reach.config import Config
eyes = AgentReach()
content = await eyes.read("https://github.com/openai/gpt-4")
results = await eyes.search("AI agent framework")
config = Config()
results = check_all(config)
print(format_report(results))
"""
import asyncio
from typing import Any, Dict, List, Optional
from typing import Dict, Optional
from agent_reach.config import Config
from agent_reach.channels import get_channel_for_url, get_channel, get_all_channels
class AgentReach:
"""Give your AI Agent eyes to see the entire internet."""
"""Give your AI Agent eyes to see the entire internet.
This class provides health-check functionality.
For reading/searching, use the upstream tools directly
(see SKILL.md for commands).
"""
def __init__(self, config: Optional[Config] = None):
self.config = config or Config()
# ── Reading ─────────────────────────────────────────
async def read(self, url: str) -> Dict[str, Any]:
"""
Read content from any URL. Auto-detects platform.
Supported: Web, GitHub, Reddit, Twitter, YouTube,
Bilibili, RSS, and more.
Returns:
Dict with title, content, url, author, platform, etc.
"""
if not url.startswith(("http://", "https://")):
url = f"https://{url}"
channel = get_channel_for_url(url)
result = await channel.read(url, config=self.config)
return result.to_dict()
async def read_batch(self, urls: List[str]) -> List[Dict[str, Any]]:
"""Read multiple URLs concurrently."""
tasks = [self.read(url) for url in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
return [r for r in results if not isinstance(r, Exception)]
def detect_platform(self, url: str) -> str:
"""Detect what platform a URL belongs to."""
channel = get_channel_for_url(url)
return channel.name
# ── Searching ───────────────────────────────────────
async def search(self, query: str, num_results: int = 5) -> List[Dict[str, Any]]:
"""Semantic web search via Exa."""
ch = get_channel("exa_search")
results = await ch.search(query, config=self.config, limit=num_results)
return [r.to_dict() for r in results]
async def search_reddit(self, query: str, subreddit: Optional[str] = None, limit: int = 10) -> List[Dict[str, Any]]:
"""Search Reddit via Exa (bypasses IP blocks)."""
ch = get_channel("exa_search")
q = f"site:reddit.com/r/{subreddit} {query}" if subreddit else f"site:reddit.com {query}"
results = await ch.search(q, config=self.config, limit=limit)
return [r.to_dict() for r in results]
async def search_github(self, query: str, language: Optional[str] = None, limit: int = 5) -> List[Dict[str, Any]]:
"""Search GitHub repositories."""
ch = get_channel("github")
results = await ch.search(query, config=self.config, language=language, limit=limit)
return [r.to_dict() for r in results]
async def search_twitter(self, query: str, limit: int = 10) -> List[Dict[str, Any]]:
"""Search Twitter. Uses bird CLI if available, else Exa."""
ch = get_channel("twitter")
results = await ch.search(query, config=self.config, limit=limit)
return [r.to_dict() for r in results]
async def search_youtube(self, query: str, limit: int = 5) -> List[Dict[str, Any]]:
"""Search YouTube via yt-dlp."""
ch = get_channel("youtube")
results = await ch.search(query, config=self.config, limit=limit)
return [r.to_dict() for r in results]
async def search_bilibili(self, query: str, limit: int = 5) -> List[Dict[str, Any]]:
"""Search Bilibili. Tries yt-dlp first, falls back to Exa."""
ch = get_channel("bilibili")
results = await ch.search(query, config=self.config, limit=limit)
return [r.to_dict() for r in results]
async def search_xhs(self, query: str, limit: int = 10) -> List[Dict[str, Any]]:
"""Search XiaoHongShu via mcporter."""
ch = get_channel("xiaohongshu")
results = await ch.search(query, config=self.config, limit=limit)
return [r.to_dict() for r in results]
async def search_linkedin(self, query: str, limit: int = 10) -> List[Dict[str, Any]]:
"""Search LinkedIn via MCP or Exa."""
ch = get_channel("linkedin")
results = await ch.search(query, config=self.config, limit=limit)
return [r.to_dict() for r in results]
async def search_bosszhipin(self, query: str, limit: int = 10) -> List[Dict[str, Any]]:
"""Search Boss直聘 via MCP or Exa."""
ch = get_channel("bosszhipin")
results = await ch.search(query, config=self.config, limit=limit)
return [r.to_dict() for r in results]
# ── Health ──────────────────────────────────────────
def doctor(self) -> Dict[str, dict]:
"""Check all channel availability."""
from agent_reach.doctor import check_all
@ -124,13 +40,3 @@ class AgentReach:
"""Get formatted health report."""
from agent_reach.doctor import check_all, format_report
return format_report(check_all(self.config))
# ── Sync wrappers ───────────────────────────────────
def read_sync(self, url: str) -> Dict[str, Any]:
"""Synchronous version of read()."""
return asyncio.run(self.read(url))
def search_sync(self, query: str, num_results: int = 5) -> List[Dict[str, Any]]:
"""Synchronous version of search()."""
return asyncio.run(self.search(query, num_results))


@ -1,10 +1,11 @@
# -*- coding: utf-8 -*-
"""
Agent Reach MCP Server: expose all capabilities as MCP tools.
Agent Reach MCP Server: expose doctor/status as an MCP tool.
Run: python -m agent_reach.integrations.mcp_server
8 tools for any MCP-compatible AI Agent.
Agent Reach is an installer + doctor tool. For actual reading/searching,
agents should call upstream tools directly (bird, yt-dlp, mcporter, etc.).
"""
import asyncio
@ -35,50 +36,15 @@ def create_server():
@server.list_tools()
async def list_tools():
return [
Tool(name="read_url",
description="Read content from any URL. Supports: web, GitHub, Reddit, Twitter, YouTube, Bilibili, RSS.",
inputSchema={"type": "object", "properties": {"url": {"type": "string"}}, "required": ["url"]}),
Tool(name="read_batch",
description="Read multiple URLs concurrently.",
inputSchema={"type": "object", "properties": {"urls": {"type": "array", "items": {"type": "string"}}}, "required": ["urls"]}),
Tool(name="detect_platform",
description="Detect what platform a URL belongs to.",
inputSchema={"type": "object", "properties": {"url": {"type": "string"}}, "required": ["url"]}),
Tool(name="search",
description="Semantic web search via Exa.",
inputSchema={"type": "object", "properties": {"query": {"type": "string"}, "num_results": {"type": "integer", "default": 5}}, "required": ["query"]}),
Tool(name="search_reddit",
description="Search Reddit posts.",
inputSchema={"type": "object", "properties": {"query": {"type": "string"}, "subreddit": {"type": "string"}, "limit": {"type": "integer", "default": 10}}, "required": ["query"]}),
Tool(name="search_github",
description="Search GitHub repositories.",
inputSchema={"type": "object", "properties": {"query": {"type": "string"}, "language": {"type": "string"}, "limit": {"type": "integer", "default": 5}}, "required": ["query"]}),
Tool(name="search_twitter",
description="Search Twitter/X posts.",
inputSchema={"type": "object", "properties": {"query": {"type": "string"}, "limit": {"type": "integer", "default": 10}}, "required": ["query"]}),
Tool(name="get_status",
description="Get Agent Reach status: which channels are active.",
description="Get Agent Reach status: which channels are installed and active.",
inputSchema={"type": "object", "properties": {}}),
]
@server.call_tool()
async def call_tool(name: str, arguments: dict):
try:
if name == "read_url":
result = await eyes.read(arguments["url"])
elif name == "read_batch":
result = await eyes.read_batch(arguments["urls"])
elif name == "detect_platform":
result = eyes.detect_platform(arguments["url"])
elif name == "search":
result = await eyes.search(arguments["query"], arguments.get("num_results", 5))
elif name == "search_reddit":
result = await eyes.search_reddit(arguments["query"], arguments.get("subreddit"), arguments.get("limit", 10))
elif name == "search_github":
result = await eyes.search_github(arguments["query"], arguments.get("language"), arguments.get("limit", 5))
elif name == "search_twitter":
result = await eyes.search_twitter(arguments["query"], arguments.get("limit", 10))
elif name == "get_status":
if name == "get_status":
result = eyes.doctor_report()
else:
result = f"Unknown tool: {name}"


@ -1,72 +1,39 @@
---
name: agent-reach
description: >
Give your AI agent eyes to see the entire internet. Read and search across
Twitter/X, Reddit, YouTube, GitHub, Bilibili, XiaoHongShu, LinkedIn,
Boss直聘, RSS, and any web page — all from a single CLI.
Use when: (1) reading content from URLs (tweets, Reddit posts, articles, videos),
(2) searching across platforms (web, Twitter, Reddit, GitHub, YouTube, Bilibili,
XiaoHongShu, LinkedIn, Boss直聘),
(3) user asks to configure/enable a platform channel,
(4) checking channel health or updating Agent Reach.
Triggers: "search Twitter/Reddit/YouTube", "read this URL", "find posts about",
"搜索", "读取", "查一下", "看看这个链接",
"帮我配", "帮我添加", "帮我安装".
Give your AI agent eyes to see the entire internet. Install and configure
upstream tools for Twitter/X, Reddit, YouTube, GitHub, Bilibili, XiaoHongShu,
LinkedIn, Boss直聘, RSS, and any web page — then call them directly.
Use when: (1) setting up platform access tools for the first time,
(2) checking which platforms are available,
(3) user asks to configure/enable a platform channel.
Triggers: "帮我配", "帮我添加", "帮我安装", "agent reach", "install channels",
"configure twitter", "enable reddit".
---
# Agent Reach
Read and search the internet across 12+ platforms via unified CLI.
Install and configure upstream tools for 12+ platforms. After setup, call them directly — no wrapper layer.
## Setup
First check if agent-reach is installed:
```bash
agent-reach doctor
```
If command not found, install it:
```bash
pip install https://github.com/Panniantong/agent-reach/archive/main.zip
agent-reach install --env=auto
agent-reach doctor
```
`install` auto-detects your environment and installs core dependencies (Node.js, mcporter, bird CLI, gh CLI). Read the output and run `agent-reach doctor` to see what's active.
`install` auto-detects your environment and installs core dependencies (Node.js, mcporter, bird CLI, gh CLI, yt-dlp, feedparser). Run `doctor` to see what's active.
## Commands
### Read any URL
```bash
agent-reach read <url>
agent-reach read <url> --json # structured output
```
Handles: tweets, Reddit posts, articles, YouTube/Bilibili (transcripts), GitHub repos, LinkedIn profiles, Boss直聘 jobs, XiaoHongShu notes, RSS feeds, and any web page.
### Search
```bash
agent-reach search "query" # web search (Exa)
agent-reach search-twitter "query" # Twitter/X
agent-reach search-reddit "query" # Reddit (--sub <subreddit>)
agent-reach search-github "query" # GitHub (--lang <language>)
agent-reach search-youtube "query" # YouTube
agent-reach search-bilibili "query" # Bilibili (B站)
agent-reach search-xhs "query" # XiaoHongShu (小红书)
agent-reach search-linkedin "query" # LinkedIn
agent-reach search-bosszhipin "query" # Boss直聘
```
All search commands support `-n <count>` for number of results.
### Management
## Management
```bash
agent-reach doctor # channel status overview
agent-reach watch # quick health + update check (for scheduled tasks)
agent-reach watch # quick health + update check
agent-reach check-update # check for new versions
```
### Configure channels
## Configure channels
```bash
agent-reach configure twitter-cookies "auth_token=xxx; ct0=yyy"
@ -76,45 +43,188 @@ agent-reach configure --from-browser chrome # auto-extract cookies from local
## Configuring a channel ("帮我配 XXX")
All channels follow the same flow. When a user asks to configure/enable any channel:
When a user asks to configure/enable any channel:
1. Run `agent-reach doctor`
2. Find the channel in the output — it shows the current status (✅/⚠️/⬜) and **what to do next**
3. Execute whatever you can automatically (install packages, start services, register MCP)
4. For steps that require human action (scan QR code, paste cookies, login), tell the user exactly what to do
5. After the user completes their part, run `agent-reach doctor` again to verify
2. Find the channel — it shows status (✅/⚠️/⬜) and **what to do next**
3. Execute what you can automatically (install packages, start services)
4. For human-required steps (scan QR, paste cookies), tell the user what to do
5. Run `agent-reach doctor` again to verify
**Do NOT memorize per-channel steps.** Always rely on `doctor` output — it stays up-to-date even when backends change.
**Do NOT memorize per-channel steps.** Always rely on `doctor` output.
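
The status check that step 1 relies on can be approximated with a simple PATH lookup. The sketch below is only an illustration under assumptions, not the actual `doctor` implementation: the function name `tool_status` and the tool list are hypothetical, and a real check may also verify cookies, proxies, or MCP registration.

```python
import shutil
from typing import Dict, Iterable, Tuple


def tool_status(tools: Iterable[str]) -> Dict[str, Tuple[str, str]]:
    """Map each tool name to ("ok" | "off", detail) using a PATH lookup."""
    report: Dict[str, Tuple[str, str]] = {}
    for tool in tools:
        path = shutil.which(tool)  # None when the executable is not on PATH
        if path:
            report[tool] = ("ok", path)
        else:
            report[tool] = ("off", f"{tool} not found on PATH")
    return report


if __name__ == "__main__":
    # Hypothetical tool list; adjust to the channels you care about.
    for name, (state, detail) in tool_status(["yt-dlp", "gh", "bird", "mcporter"]).items():
        print(f"{name}: {state} ({detail})")
```

A lookup like this only tells you whether a binary exists; `agent-reach doctor` remains the source of truth for what to do next.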
### Common human actions
These are things only the user can do. When `doctor` indicates one is needed, explain it clearly:
- **Paste cookies:** User installs [Cookie-Editor](https://chromewebstore.google.com/detail/cookie-editor/hlkenndednhfkekhgcdicdfddnkalmdm) → goes to the website → exports Header String → sends it to you → you run `agent-reach configure <platform>-cookies "..."`
- **Scan QR code:** User opens the URL on their phone/browser and scans with the platform's app
- **Proxy:** Reddit/Bilibili/XiaoHongShu may block server IPs — suggest a residential proxy if on a server
- **Paste cookies:** User installs [Cookie-Editor](https://chromewebstore.google.com/detail/cookie-editor/hlkenndednhfkekhgcdicdfddnkalmdm) Chrome extension → goes to the website → exports Header String → sends it to you → you run `agent-reach configure <platform>-cookies "..."`
- **Scan QR code:** User opens the URL shown in `doctor` output on their phone/browser and scans with the platform's app
- **Browser login:** Some MCP services need a one-time browser login; on servers without a display, user may need VNC
- **Proxy:** Reddit/Bilibili/XiaoHongShu block server IPs — suggest a residential proxy (~$1/month) if on a server
---
## Tips
## Using Upstream Tools Directly
- Always try `agent-reach read <url>` first for any URL — it auto-detects the platform
- If a channel is ⬜ but the user hasn't asked for it, don't push — let them opt in
- If a channel breaks, run `agent-reach doctor` to diagnose
- LinkedIn and Boss直聘 have Jina Reader fallback even without full setup
- Twitter search automatically falls back to Exa search when bird fails
After `agent-reach install`, call the upstream tools directly. No need for `agent-reach read` or `agent-reach search`.
### Twitter/X (bird CLI)
```bash
# Search tweets
bird search "query" --json -n 10
# Read a specific tweet
bird read https://x.com/user/status/123 --json
# Read a user's timeline
bird timeline @username --json -n 20
```
### YouTube (yt-dlp)
```bash
# Get video metadata
yt-dlp --dump-json "https://www.youtube.com/watch?v=xxx"
# Download subtitles only
yt-dlp --write-sub --write-auto-sub --sub-lang "zh-Hans,zh,en" --skip-download -o "/tmp/%(id)s" "URL"
# Then read the .vtt file
# Search (yt-dlp ytsearch)
yt-dlp --dump-json "ytsearch5:query"
```
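
With `ytsearchN:`, `yt-dlp --dump-json` prints one JSON object per line, so the output has to be parsed line by line. A minimal sketch, assuming the fields `id`, `title`, and `channel` are present (they are read with defaults in case they are not); the function name `parse_ytdlp_jsonl` is hypothetical:

```python
import json
from typing import Dict, List


def parse_ytdlp_jsonl(stdout: str) -> List[Dict[str, str]]:
    """Turn line-delimited yt-dlp JSON output into a list of small dicts."""
    results: List[Dict[str, str]] = []
    for line in stdout.splitlines():
        line = line.strip()
        if not line:
            continue
        try:
            item = json.loads(line)
        except json.JSONDecodeError:
            continue  # skip non-JSON noise such as warnings
        if not isinstance(item, dict):
            continue
        video_id = item.get("id", "")
        results.append({
            "title": item.get("title", ""),
            "channel": item.get("channel") or "",
            "url": f"https://www.youtube.com/watch?v={video_id}" if video_id else "",
        })
    return results
```

Feed it the captured standard output of the search command above, for example from `subprocess.run(..., capture_output=True, text=True).stdout`.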
### Bilibili (yt-dlp)
```bash
# Get video metadata
yt-dlp --dump-json "https://www.bilibili.com/video/BVxxx"
# Download subtitles
yt-dlp --write-sub --write-auto-sub --sub-lang "zh-Hans,zh,en" --convert-subs vtt --skip-download -o "/tmp/%(id)s" "URL"
```
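
Once a `.vtt` file is on disk, an agent usually wants plain text rather than timed cues. The sketch below mirrors the cleanup the removed channel code performed (drop the header, cue numbers, and timestamp lines, then collapse consecutive duplicates). The name `vtt_to_text` is hypothetical, and inline styling tags or `Kind:`/`Language:` header lines found in some auto-generated subtitles are not handled here.

```python
from typing import List


def vtt_to_text(vtt: str) -> str:
    """Strip WebVTT scaffolding and return the caption text, one line per cue line."""
    lines: List[str] = []
    for raw in vtt.splitlines():
        line = raw.strip()
        # Skip blank lines, the WEBVTT header, timestamp lines, and numeric cue IDs.
        if not line or line.startswith("WEBVTT") or "-->" in line or line.isdigit():
            continue
        # Auto-generated subtitles often repeat the previous line; keep one copy.
        if lines and lines[-1] == line:
            continue
        lines.append(line)
    return "\n".join(lines)
```

Typical use would be `vtt_to_text(Path("/tmp/VIDEO_ID.zh-Hans.vtt").read_text(errors="replace"))`, where the file name is a placeholder for whatever yt-dlp actually wrote.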
### Reddit (JSON API)
```bash
# Read a subreddit
curl -s "https://www.reddit.com/r/python/hot.json?limit=10" -H "User-Agent: agent-reach/1.0"
# Read a post with comments
curl -s "https://www.reddit.com/r/python/comments/POST_ID.json" -H "User-Agent: agent-reach/1.0"
# Search
curl -s "https://www.reddit.com/search.json?q=query&limit=10" -H "User-Agent: agent-reach/1.0"
```
Note: On servers, Reddit may block your IP. Use proxy or search via Exa instead.
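
The `.json` endpoints return a nested "Listing" object, with posts under `data.children[].data`. A rough sketch of pulling out the useful fields, assuming the response has already been fetched and decoded; `extract_posts` is a hypothetical helper and only a few fields are shown:

```python
from typing import Any, Dict, List


def extract_posts(listing: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Flatten a Reddit Listing (already decoded from JSON) into simple post dicts."""
    children = listing.get("data", {}).get("children", [])
    posts: List[Dict[str, Any]] = []
    for child in children:
        data = child.get("data", {})
        posts.append({
            "title": data.get("title", ""),
            "score": data.get("score", 0),
            # permalink is site-relative, so prefix the host
            "url": "https://www.reddit.com" + data.get("permalink", ""),
        })
    return posts
```

A post-with-comments response is a two-element array (the post listing, then the comment listing), so pass the first element to this helper in that case.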
### 小红书 / XiaoHongShu (mcporter + xiaohongshu-mcp)
```bash
# Search notes
mcporter call 'xiaohongshu.search_feeds(keyword: "query")'
# Read a note
mcporter call 'xiaohongshu.get_feed_detail(feed_id: "xxx", xsec_token: "yyy")'
# Get comments
mcporter call 'xiaohongshu.get_feed_comments(feed_id: "xxx", xsec_token: "yyy")'
# Post a note
mcporter call 'xiaohongshu.create_image_feed(title: "标题", desc: "内容", image_paths: ["/path/to/img.jpg"])'
```
### GitHub (gh CLI)
```bash
# Search repos
gh search repos "query" --sort stars --limit 10
# View a repo
gh repo view owner/repo
# Search code
gh search code "query" --language python
# List issues
gh issue list -R owner/repo --state open
# View a specific issue/PR
gh issue view 123 -R owner/repo
```
### Web — Any URL (Jina Reader)
```bash
# Read any webpage as markdown
curl -s "https://r.jina.ai/URL" -H "Accept: text/markdown"
# Search the web
curl -s "https://s.jina.ai/query" -H "Accept: text/markdown"
```
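
Both endpoints are plain URL prefixes, so building the request URL is a matter of string handling. A small sketch with hypothetical helper names; percent-encoding the search query is an assumption made here so that spaces and non-ASCII text survive the request, and the service may apply its own limits or require a key:

```python
from urllib.parse import quote


def jina_reader_url(url: str) -> str:
    """Prefix a page URL with the Jina Reader endpoint."""
    if not url.startswith(("http://", "https://")):
        url = "https://" + url  # assume https when no scheme is given
    return "https://r.jina.ai/" + url


def jina_search_url(query: str) -> str:
    """Build a Jina search URL; the query is percent-encoded (an assumption)."""
    return "https://s.jina.ai/" + quote(query)
```

The resulting strings can be passed to `curl` exactly as in the commands above.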
### Exa Search (mcporter + exa MCP)
```bash
# Web search
mcporter call 'exa.web_search_exa(query: "query", numResults: 5)'
# Code search (GitHub, StackOverflow, docs)
mcporter call 'exa.get_code_context_exa(query: "how to parse JSON in Python", tokensNum: 3000)'
# Company research
mcporter call 'exa.company_research_exa(companyName: "OpenAI")'
```
### LinkedIn (mcporter + linkedin-scraper-mcp)
```bash
# View a profile
mcporter call 'linkedin.get_person_profile(linkedin_url: "https://linkedin.com/in/username")'
# Search people
mcporter call 'linkedin.search_people(keyword: "AI engineer", limit: 10)'
# View company
mcporter call 'linkedin.get_company_profile(linkedin_url: "https://linkedin.com/company/xxx")'
```
Fallback: `curl -s "https://r.jina.ai/https://linkedin.com/in/username"`
### Boss直聘 (mcporter + mcp-bosszp)
```bash
# Browse recommended jobs
mcporter call 'bosszhipin.get_recommend_jobs_tool(page: 1)'
# Search jobs
mcporter call 'bosszhipin.search_jobs_tool(keyword: "Python", city: "北京", page: 1)'
# View job details
mcporter call 'bosszhipin.get_job_detail_tool(job_url: "https://www.zhipin.com/job_detail/xxx")'
```
Fallback: `curl -s "https://r.jina.ai/https://www.zhipin.com/job_detail/xxx"`
### RSS (feedparser)
```bash
python3 -c "
import feedparser
d = feedparser.parse('https://example.com/feed')
for e in d.entries[:5]:
    print(f'{e.title} - {e.link}')
"
```
## Troubleshooting
### Twitter "fetch failed"
bird CLI uses Node.js native `fetch()`, which ignores the `HTTP_PROXY`/`HTTPS_PROXY` environment variables.
Agent Reach has a built-in workaround: it injects the proxy automatically through `undici`'s `EnvHttpProxyAgent`.
bird CLI uses Node.js native `fetch()`, which doesn't respect `HTTP_PROXY`. Solutions:
1. Ensure `undici` is installed: `npm install -g undici`
2. Configure proxy: `agent-reach configure proxy http://user:pass@ip:port`
3. If still failing, use transparent proxy (Clash TUN, Proxifier)
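**You (the Agent) need to confirm:**
1. `undici` is installed globally: check with `npm list -g undici`; if it is missing, run `npm install -g undici`
2. The user has configured a proxy: `agent-reach configure proxy http://user:pass@ip:port`
3. Agent Reach handles the injection automatically; no extra action is needed from the user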
### Channel broken?
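If everything above checks out and you still get "fetch failed", suggest that the user switch to a transparent proxy (e.g. Clash TUN mode or Proxifier).
For details, see https://github.com/Panniantong/Agent-Reach/blob/main/docs/troubleshooting.md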
Run `agent-reach doctor` — it shows what's wrong and how to fix it.