v2.0.0 — Pure glue architecture: zero copied code, pluggable channels

BREAKING: Complete architectural rewrite.

Before: Copied x-reader's fetcher code into readers/ (1205 lines of borrowed code)
After: Pluggable channel system where each channel is a thin wrapper (~50 lines)
       around the best external tool for that platform. Zero copied code.

Architecture:
- channels/base.py — Universal Channel interface (read, search, check)
- channels/web.py — Jina Reader API (swappable)
- channels/github.py — GitHub API (swappable)
- channels/twitter.py — birdx + Jina fallback (swappable)
- channels/youtube.py — yt-dlp (swappable)
- channels/reddit.py — Reddit JSON API + proxy (swappable)
- channels/rss.py — feedparser (swappable)
- channels/bilibili.py — Bilibili API (swappable)
- channels/exa_search.py — Exa semantic search (swappable)

Key design: every backend can be swapped by changing ONE file.
YouTube dies? Change youtube.py. Exa sucks? Swap exa_search.py for Tavily.
Nothing else changes.

Removed: reader.py, schema.py, readers/, search/, utils/ (all x-reader code)
Tests: 36/36 passing
This commit is contained in:
Panniantong 2026-02-24 05:38:21 +01:00
parent 7e4cd961ee
commit 74c3df5c3d
38 changed files with 1155 additions and 2725 deletions

View file

@ -0,0 +1,67 @@
# -*- coding: utf-8 -*-
"""
Channel registry routes URLs to the right channel.
This is the core of Agent Eyes' pluggable architecture.
Add a new channel: just create a file and register it here.
Swap a backend: just change the implementation inside the channel file.
"""
from typing import Dict, List, Optional
from .base import Channel, ReadResult, SearchResult
# Import all channels
from .web import WebChannel
from .github import GitHubChannel
from .twitter import TwitterChannel
from .youtube import YouTubeChannel
from .reddit import RedditChannel
from .rss import RSSChannel
from .bilibili import BilibiliChannel
from .exa_search import ExaSearchChannel
# Channel registry — order matters (first match wins, web is last as fallback).
# get_channel_for_url() walks this list front to back, so platform-specific
# channels must stay ahead of WebChannel, whose can_handle() accepts every URL.
ALL_CHANNELS: List[Channel] = [
    GitHubChannel(),
    TwitterChannel(),
    YouTubeChannel(),
    RedditChannel(),
    BilibiliChannel(),
    RSSChannel(),
    ExaSearchChannel(),  # Search-only: its can_handle() always returns False
    WebChannel(),  # Fallback — handles any URL
]
# Search-capable channels, keyed by channel name.
# Built once at import time from the instances above, keeping only channels
# whose can_search() returns True.
SEARCH_CHANNELS: Dict[str, Channel] = {
    ch.name: ch for ch in ALL_CHANNELS if ch.can_search()
}
def get_channel_for_url(url: str) -> Channel:
    """Return the first registered channel whose ``can_handle`` accepts ``url``.

    Channels are probed in ``ALL_CHANNELS`` order. If none of them accepts the
    URL, a fresh ``WebChannel`` instance is returned as a last resort.
    """
    candidates = (entry for entry in ALL_CHANNELS if entry.can_handle(url))
    selected = next(candidates, None)
    if selected is None:
        # Should never reach here, but just in case
        return WebChannel()
    return selected
def get_channel(name: str) -> Optional[Channel]:
    """Look up a registered channel by its ``name`` attribute.

    Returns the first channel in ``ALL_CHANNELS`` whose name equals ``name``,
    or ``None`` when no registered channel carries that name.
    """
    return next((entry for entry in ALL_CHANNELS if entry.name == name), None)
def get_all_channels() -> List[Channel]:
    """Return the registry list itself (the ``ALL_CHANNELS`` object, not a copy)."""
    registry = ALL_CHANNELS
    return registry
# Names exported by ``from <package> import *``: the base types re-exported
# from .base, the two registries, and the lookup helpers defined above.
__all__ = [
    "Channel", "ReadResult", "SearchResult",
    "ALL_CHANNELS", "SEARCH_CHANNELS",
    "get_channel_for_url", "get_channel", "get_all_channels",
]

140
agent_eyes/channels/base.py Normal file
View file

@ -0,0 +1,140 @@
# -*- coding: utf-8 -*-
"""
Channel base class — the universal interface for all platforms.
Every channel (YouTube, Twitter, GitHub, etc.) implements this interface.
The backend tool can be swapped anytime without changing anything else.
Example:
class YouTubeChannel(Channel):
name = "youtube"
backends = ["yt-dlp"] # current backend, can be swapped
async def read(self, url, config):
# Just call yt-dlp, return standardized dict
...
"""
import shutil
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple
@dataclass
class ReadResult:
    """Standardized read result. Every channel returns this.

    ``title``, ``content`` and ``url`` are required; the remaining fields are
    optional and default to empty values.
    """

    title: str
    content: str
    url: str
    author: str = ""
    date: str = ""
    platform: str = ""
    # ``None`` (the default) is replaced by a fresh dict in __post_init__, so
    # instances never share one mutable default.
    extra: Optional[Dict[str, Any]] = None

    def __post_init__(self):
        # Any falsy value (None or an empty dict) becomes a new empty dict.
        self.extra = self.extra or {}

    def to_dict(self) -> dict:
        """Serialize to a plain dict.

        ``title``, ``content``, ``url`` and ``platform`` are always present;
        ``author``, ``date`` and ``extra`` are included only when non-empty.
        """
        d = {
            "title": self.title,
            "content": self.content,
            "url": self.url,
            "platform": self.platform,
        }
        if self.author:
            d["author"] = self.author
        if self.date:
            d["date"] = self.date
        if self.extra:
            d["extra"] = self.extra
        return d
@dataclass
class SearchResult:
    """Standardized search result.

    ``title`` and ``url`` are required; the remaining fields are optional and
    default to empty values.
    """

    title: str
    url: str
    snippet: str = ""
    author: str = ""
    date: str = ""
    score: float = 0
    # ``None`` (the default) is replaced by a fresh dict in __post_init__, so
    # instances never share one mutable default.
    extra: Optional[Dict[str, Any]] = None

    def __post_init__(self):
        # Any falsy value (None or an empty dict) becomes a new empty dict.
        self.extra = self.extra or {}

    def to_dict(self) -> dict:
        """Serialize to a plain dict.

        ``title``, ``url`` and ``snippet`` are always present; ``author``,
        ``date``, ``score`` and ``extra`` are included only when non-empty
        (for ``score``: non-zero).
        """
        d = {
            "title": self.title,
            "url": self.url,
            "snippet": self.snippet,
        }
        if self.author:
            d["author"] = self.author
        if self.date:
            d["date"] = self.date
        if self.score:
            # The score used to be stored on the object but silently dropped
            # here; emit it whenever a backend actually supplied one.
            d["score"] = self.score
        if self.extra:
            d["extra"] = self.extra
        return d
class Channel(ABC):
    """
    Base class for all channels.

    Subclasses just need to implement:
    - read(url, config) -> ReadResult
    - can_handle(url) -> bool

    Optionally:
    - check(config) -> (status, message)
    - search(query, config, **kwargs) -> list[SearchResult]
    """

    name: str = ""  # e.g. "youtube"
    description: str = ""  # e.g. "YouTube video transcripts"
    backends: List[str] = []  # e.g. ["yt-dlp"] — what external tool is used
    requires_config: List[str] = []  # e.g. ["reddit_proxy"]
    requires_tools: List[str] = []  # e.g. ["yt-dlp"]
    tier: int = 0  # 0=zero-config, 1=needs free key, 2=needs setup

    @abstractmethod
    async def read(self, url: str, config=None) -> ReadResult:
        """Read content from a URL. Must return ReadResult."""
        ...

    @abstractmethod
    def can_handle(self, url: str) -> bool:
        """Check if this channel can handle this URL."""
        ...

    def check(self, config=None) -> Tuple[str, str]:
        """
        Check if this channel is available.

        Every name in ``requires_tools`` must be found on PATH and every key
        in ``requires_config`` must have a truthy value in ``config``.

        Returns (status, message) where status is 'ok'/'warn'/'off'/'error'.
        This base implementation only ever returns 'ok' or 'off'.
        """
        # Check required tools
        for tool in self.requires_tools:
            if not shutil.which(tool):
                return "off", f"Install: pip install {tool}"
        # Check required config. A missing config object means the key cannot
        # be present, so it counts as "not configured" rather than as a pass.
        for key in self.requires_config:
            if config is None or not config.get(key):
                return "off", f"Need config: {key}. Run: agent-eyes setup"
        return "ok", (", ".join(self.backends) if self.backends else "built-in")

    async def search(self, query: str, config=None, **kwargs) -> List[SearchResult]:
        """Search this platform. Override if supported.

        Raises:
            NotImplementedError: always, in this base implementation.
        """
        raise NotImplementedError(f"{self.name} does not support search")

    def can_search(self) -> bool:
        """Whether this channel supports search (i.e. overrides ``search``)."""
        return type(self).search is not Channel.search

View file

@ -0,0 +1,77 @@
# -*- coding: utf-8 -*-
"""Bilibili — via public API (free, no config needed).
Backend: Bilibili public API
Swap to: any Bilibili access method
"""
import requests
from urllib.parse import urlparse, parse_qs
from .base import Channel, ReadResult
class BilibiliChannel(Channel):
    """Read Bilibili videos (title, description, first subtitle track).

    URLs whose path contains no ``BV...`` segment are delegated to
    ``WebChannel``.
    """

    name = "bilibili"
    description = "Bilibili video info and subtitles"
    backends = ["Bilibili API"]
    tier = 0

    # Registrable domains handled here; their subdomains are accepted too.
    _HOSTS = ("bilibili.com", "b23.tv")

    def can_handle(self, url: str) -> bool:
        """Return True when the URL's host is one of ``_HOSTS`` or a subdomain."""
        # Compare whole host labels: a substring test would also accept
        # unrelated hosts such as "notbilibili.com".
        host = (urlparse(url).hostname or "").lower()
        return any(host == known or host.endswith("." + known) for known in self._HOSTS)

    async def read(self, url: str, config=None) -> ReadResult:
        """Fetch video metadata and, when available, the first subtitle track.

        Raises:
            requests.HTTPError: if the video-info request returns an error status.
        """
        # Extract BV id from URL
        bv_id = next(
            (part for part in urlparse(url).path.split("/") if part.startswith("BV")),
            "",
        )
        if not bv_id:
            # Fallback to Jina Reader
            from agent_eyes.channels.web import WebChannel
            return await WebChannel().read(url, config)

        # Get video info
        resp = requests.get(
            "https://api.bilibili.com/x/web-interface/view",
            params={"bvid": bv_id},
            headers={"User-Agent": "Mozilla/5.0"},
            timeout=15,
        )
        resp.raise_for_status()
        # Any of these values may be JSON null; ``or {}`` keeps the lookups
        # below from failing on None.
        data = resp.json().get("data") or {}
        owner = data.get("owner") or {}
        stat = data.get("stat") or {}

        content = data.get("desc") or ""
        subtitle_text = self._fetch_subtitles(data)
        if subtitle_text:
            content += f"\n\n## Transcript\n{subtitle_text}"

        return ReadResult(
            title=data.get("title") or "",
            content=content,
            url=url,
            author=owner.get("name") or "",
            platform="bilibili",
            extra={"view": stat.get("view", 0), "like": stat.get("like", 0)},
        )

    def _fetch_subtitles(self, data: dict) -> str:
        """Download the first subtitle track listed in ``data``; '' if unavailable."""
        subtitle_list = (data.get("subtitle") or {}).get("list") or []
        if not subtitle_list:
            return ""
        sub_url = subtitle_list[0].get("subtitle_url") or ""
        if not sub_url:
            return ""
        if sub_url.startswith("//"):
            # Protocol-relative URL
            sub_url = "https:" + sub_url
        try:
            sr = requests.get(sub_url, timeout=10)
            if not sr.ok:
                return ""
            body = sr.json().get("body") or []
        except (requests.RequestException, ValueError):
            # Subtitles are optional: a network or JSON failure here must not
            # discard the video metadata that was already fetched.
            return ""
        return "\n".join(item.get("content", "") for item in body)

View file

@ -0,0 +1,69 @@
# -*- coding: utf-8 -*-
"""Exa semantic search — the search backbone for Agent Eyes.
Backend: Exa API (https://exa.ai) — free 1000 searches/month
Swap to: Tavily, SerpAPI, or any search API
"""
import os
import requests
from .base import Channel, SearchResult
from typing import List
class ExaSearchChannel(Channel):
    """Search-only channel backed by the Exa search API.

    It never claims a URL (``can_handle`` is always False) and ``read`` always
    raises; only ``search`` does real work.
    """

    name = "exa_search"
    description = "Semantic web search (powers Reddit/Twitter search too)"
    backends = ["Exa API"]
    requires_config = ["exa_api_key"]
    tier = 1
    API_URL = "https://api.exa.ai/search"

    def can_handle(self, url: str) -> bool:
        return False  # Search-only channel, doesn't read URLs

    async def read(self, url: str, config=None) -> None:
        """Not supported.

        Raises:
            NotImplementedError: always.
        """
        raise NotImplementedError("Exa is a search engine, not a reader")

    def _get_key(self, config=None) -> str:
        """Return the API key from ``config`` or the ``EXA_API_KEY`` env var.

        Raises:
            ValueError: if neither source provides a key.
        """
        if config:
            key = config.get("exa_api_key")
            if key:
                return key
        key = os.environ.get("EXA_API_KEY")
        if key:
            return key
        raise ValueError(
            "Exa API key not configured.\n"
            "Get a free key at https://exa.ai (1000 searches/month free)\n"
            "Then run: agent-eyes setup"
        )

    async def search(self, query: str, config=None, **kwargs) -> List[SearchResult]:
        """Run a search and return standardized results.

        Keyword Args:
            limit: desired number of results (default 5); clamped to 1..10.

        Raises:
            ValueError: if no API key is configured.
            requests.HTTPError: if the API responds with an error status.
        """
        api_key = self._get_key(config)
        limit = kwargs.get("limit", 5)
        resp = requests.post(
            self.API_URL,
            headers={"Content-Type": "application/json", "x-api-key": api_key},
            json={
                "query": query,
                # Clamp on both sides so a zero/negative limit is never sent.
                "numResults": max(1, min(limit, 10)),
                "type": "auto",
                "contents": {"text": {"maxCharacters": 500}},
            },
            timeout=15,
        )
        resp.raise_for_status()
        # ``or`` rather than a .get() default: a field that is present but
        # JSON null would otherwise reach SearchResult as None.
        return [
            SearchResult(
                title=item.get("title") or "",
                url=item.get("url") or "",
                snippet=item.get("text") or "",
                author=item.get("author") or "",
                date=item.get("publishedDate") or "",
                score=item.get("score") or 0,
            )
            for item in resp.json().get("results") or []
        ]

View file

@ -0,0 +1,120 @@
# -*- coding: utf-8 -*-
"""GitHub — via GitHub REST API (free, no config needed).
Backend: GitHub API v3
Swap to: gh CLI, or any GitHub API wrapper
"""
import requests
from urllib.parse import urlparse
from .base import Channel, ReadResult, SearchResult
from typing import List
class GitHubChannel(Channel):
    """Read GitHub repositories, issues and pull requests; search repositories.

    A ``github_token`` config value, when present, is sent as a Bearer token.
    """

    name = "github"
    description = "GitHub repos, issues, PRs, code"
    backends = ["GitHub API"]
    tier = 0
    API = "https://api.github.com"

    # Registrable domain handled here; subdomains are accepted too.
    _HOST = "github.com"

    def _headers(self, config=None):
        """Build request headers, adding Authorization when a token is configured."""
        h = {"Accept": "application/vnd.github+json"}
        token = config.get("github_token") if config else None
        if token:
            h["Authorization"] = f"Bearer {token}"
        return h

    def can_handle(self, url: str) -> bool:
        """Return True when the URL's host is github.com or one of its subdomains."""
        # Compare whole host labels instead of testing for a substring.
        host = (urlparse(url).hostname or "").lower()
        return host == self._HOST or host.endswith("." + self._HOST)

    async def read(self, url: str, config=None) -> ReadResult:
        """Read an issue/PR (``/owner/repo/issues|pull/N``) or a repository.

        Raises:
            ValueError: if the URL path has fewer than two segments.
            requests.HTTPError: if the primary API request fails.
        """
        path = urlparse(url).path.strip("/").split("/")
        if len(path) < 2:
            raise ValueError(f"Invalid GitHub URL: {url}")
        owner, repo = path[0], path[1]
        headers = self._headers(config)
        # Issues/PRs
        if len(path) >= 4 and path[2] in ("issues", "pull"):
            return self._read_issue(owner, repo, path[3], url, headers)
        # Repo
        return self._read_repo(owner, repo, url, headers)

    def _read_issue(self, owner: str, repo: str, num: str, url: str, headers: dict) -> ReadResult:
        """Fetch one issue/PR via the issues endpoint plus up to 20 comments."""
        resp = requests.get(f"{self.API}/repos/{owner}/{repo}/issues/{num}", headers=headers, timeout=15)
        resp.raise_for_status()
        data = resp.json()

        # Get comments (best-effort: a failed comments request is ignored)
        comment_blocks = []
        if data.get("comments", 0) > 0:
            cr = requests.get(f"{self.API}/repos/{owner}/{repo}/issues/{num}/comments",
                              headers=headers, params={"per_page": 20}, timeout=15)
            if cr.ok:
                for c in cr.json():
                    # "user" may be JSON null, hence ``or {}``.
                    login = (c.get("user") or {}).get("login", "")
                    comment_blocks.append(
                        f"\n\n---\n**{login}** ({c.get('created_at', '')}):\n{c.get('body') or ''}"
                    )

        return ReadResult(
            title=data.get("title") or "",
            content=(data.get("body") or "") + "".join(comment_blocks),
            url=url,
            author=(data.get("user") or {}).get("login", ""),
            date=data.get("created_at") or "",
            platform="github",
            extra={"state": data.get("state"), "comments": data.get("comments", 0),
                   "reactions": (data.get("reactions") or {}).get("total_count", 0)},
        )

    def _read_repo(self, owner: str, repo: str, url: str, headers: dict) -> ReadResult:
        """Fetch repository metadata and its README (description as fallback)."""
        resp = requests.get(f"{self.API}/repos/{owner}/{repo}", headers=headers, timeout=15)
        resp.raise_for_status()
        data = resp.json()
        # "description" may be JSON null.
        description = data.get("description") or ""

        # Get README (best-effort)
        readme_text = ""
        rr = requests.get(f"{self.API}/repos/{owner}/{repo}/readme", headers=headers, timeout=15)
        if rr.ok:
            import base64
            readme_data = rr.json()
            if readme_data.get("encoding") == "base64":
                readme_text = base64.b64decode(readme_data["content"]).decode("utf-8", errors="replace")

        return ReadResult(
            title=f"{owner}/{repo}",
            content=readme_text or description,
            url=url,
            author=owner,
            platform="github",
            extra={"stars": data.get("stargazers_count", 0), "forks": data.get("forks_count", 0),
                   "language": data.get("language") or "", "description": description},
        )

    async def search(self, query: str, config=None, **kwargs) -> List[SearchResult]:
        """Search repositories, sorted by stars.

        Keyword Args:
            language: optional language filter appended as ``language:<x>``.
            limit: number of results (default 5, capped at 30).

        Raises:
            requests.HTTPError: if the search request fails.
        """
        language = kwargs.get("language")
        limit = kwargs.get("limit", 5)
        q = f"{query} language:{language}" if language else query
        resp = requests.get(
            f"{self.API}/search/repositories",
            headers=self._headers(config),
            params={"q": q, "sort": "stars", "per_page": min(limit, 30)},
            timeout=15,
        )
        resp.raise_for_status()
        return [
            SearchResult(
                title=repo.get("full_name") or "",
                url=repo.get("html_url") or "",
                # "description" and "language" may be JSON null.
                snippet=repo.get("description") or "",
                date=repo.get("updated_at") or "",
                extra={"stars": repo.get("stargazers_count", 0),
                       "forks": repo.get("forks_count", 0),
                       "language": repo.get("language") or ""},
            )
            for repo in resp.json().get("items", [])
        ]

View file

@ -0,0 +1,96 @@
# -*- coding: utf-8 -*-
"""Reddit — via Reddit JSON API + optional proxy.
Backend: Reddit public JSON API (append .json to any URL)
Swap to: any Reddit access method
"""
import requests
from urllib.parse import urlparse
from .base import Channel, ReadResult
class RedditChannel(Channel):
    """Read a Reddit post and its comment tree from the ``.json`` view of the URL.

    An optional ``reddit_proxy`` config value is used as the HTTP(S) proxy.
    """

    name = "reddit"
    description = "Reddit posts and comments"
    backends = ["Reddit JSON API"]
    requires_config = ["reddit_proxy"]
    tier = 2
    USER_AGENT = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36"

    # Registrable domains handled here; their subdomains are accepted too.
    _HOSTS = ("reddit.com", "redd.it")

    def can_handle(self, url: str) -> bool:
        """Return True when the URL's host is one of ``_HOSTS`` or a subdomain."""
        # Compare whole host labels instead of testing for a substring.
        host = (urlparse(url).hostname or "").lower()
        return any(host == known or host.endswith("." + known) for known in self._HOSTS)

    @staticmethod
    def _json_url(url: str) -> str:
        """Return ``url`` with ``.json`` appended to its *path*.

        Appending to the raw string would put ``.json`` after any query string
        (e.g. share links ending in ``?utm_source=...``). The query is kept,
        the fragment is dropped.
        """
        parsed = urlparse(url)
        path = parsed.path.rstrip("/")
        if not path.endswith(".json"):
            path += ".json"
        return parsed._replace(path=path, fragment="").geturl()

    async def read(self, url: str, config=None) -> ReadResult:
        """Fetch a post page and return the post body followed by its comments.

        Raises:
            requests.HTTPError: if the request returns an error status.
            ValueError: if the response is not a non-empty post listing.
        """
        proxy = config.get("reddit_proxy") if config else None
        proxies = {"http": proxy, "https": proxy} if proxy else None

        resp = requests.get(
            self._json_url(url),
            headers={"User-Agent": self.USER_AGENT},
            proxies=proxies,
            params={"limit": 50},
            timeout=15,
        )
        resp.raise_for_status()
        data = resp.json()

        # Post page: [post_listing, comments_listing]
        if not (isinstance(data, list) and len(data) >= 1):
            raise ValueError(f"Could not parse Reddit response for: {url}")
        try:
            post = data[0]["data"]["children"][0]["data"]
        except (KeyError, IndexError, TypeError) as err:
            # Unexpected shape (e.g. empty "children"): report it like any
            # other unparseable response instead of leaking a raw IndexError.
            raise ValueError(f"Could not parse Reddit response for: {url}") from err

        # Extract comments
        comments_text = self._extract_comments(data[1]) if len(data) >= 2 else ""
        content = post.get("selftext") or ""
        if comments_text:
            content += f"\n\n---\n## Comments\n{comments_text}"

        return ReadResult(
            title=post.get("title", ""),
            content=content,
            url=url,
            author=f"u/{post.get('author', '')}",
            platform="reddit",
            extra={"subreddit": post.get("subreddit", ""), "score": post.get("score", 0)},
        )

    def _extract_comments(self, comments_data: dict, depth: int = 0, max_depth: int = 3) -> str:
        """Recursively render a comment listing as indented text.

        Only children of kind ``t1`` are rendered. Replies are followed while
        ``depth < max_depth``; each level is indented by one more space.
        """
        lines = []
        children = comments_data.get("data", {}).get("children", [])
        indent = " " * depth
        for child in children:
            if child.get("kind") != "t1":
                continue
            data = child.get("data", {})
            author = data.get("author", "[deleted]")
            body = data.get("body", "")
            score = data.get("score", 0)
            lines.append(f"{indent}**u/{author}** ({score} points):")
            lines.append(f"{indent}{body}")
            lines.append("")
            # Recurse into replies; anything that is not a dict is skipped.
            replies = data.get("replies")
            if depth < max_depth and replies and isinstance(replies, dict):
                lines.append(self._extract_comments(replies, depth + 1, max_depth))
        return "\n".join(lines)

View file

@ -0,0 +1,57 @@
# -*- coding: utf-8 -*-
"""RSS feeds — via feedparser (free, pip dependency).
Backend: feedparser (https://github.com/kurtmckee/feedparser)
Swap to: any RSS parser
"""
import feedparser
from urllib.parse import urlparse
from .base import Channel, ReadResult
class RSSChannel(Channel):
    """Read RSS/Atom feeds through feedparser.

    A single-entry feed yields that entry's summary; a multi-entry feed yields
    a numbered digest of up to 20 entries.
    """

    name = "rss"
    description = "RSS and Atom feeds"
    backends = ["feedparser"]
    tier = 0

    def can_handle(self, url: str) -> bool:
        """Heuristically decide whether ``url`` looks like a feed URL."""
        host = urlparse(url).netloc.lower()
        lowered = url.lower()
        if lowered.endswith(".xml"):
            return True
        if any(marker in lowered for marker in ("/rss", "/feed", "/atom")):
            return True
        return "rss" in host

    async def read(self, url: str, config=None) -> ReadResult:
        """Parse the feed at ``url`` and return its content.

        Raises:
            ValueError: if the feed could not be parsed or has no entries.
        """
        parsed = feedparser.parse(url)
        if parsed.bozo and not parsed.entries:
            raise ValueError(f"Failed to parse RSS feed: {url}")
        if not parsed.entries:
            raise ValueError(f"No entries in RSS feed: {url}")

        first = parsed.entries[0]
        if len(parsed.entries) > 1:
            # Several entries: build a digest of the first 20.
            digest = [f"# {parsed.feed.get('title', 'RSS Feed')}\n"]
            for position, item in enumerate(parsed.entries[:20], 1):
                headline = item.get("title", "Untitled")
                href = item.get("link", "")
                blurb = item.get("summary", "")[:200]
                digest.append(f"## {position}. {headline}")
                digest.append(f"🔗 {href}")
                if blurb:
                    digest.append(blurb)
                digest.append("")
            body = "\n".join(digest)
        else:
            # Exactly one entry: return it directly.
            body = first.get("summary", "") or first.get("description", "")

        feed_title = parsed.feed.get("title", first.get("title", url))
        return ReadResult(
            title=feed_title,
            content=body,
            url=url,
            platform="rss",
        )

View file

@ -0,0 +1,149 @@
# -*- coding: utf-8 -*-
"""Twitter/X — via birdx CLI (free) or Jina Reader fallback.
Backend: birdx (https://github.com/runesleo/birdx) for search/timeline
Jina Reader for single tweets
Swap to: any Twitter access tool
"""
import shutil
import subprocess
from urllib.parse import urlparse
from .base import Channel, ReadResult, SearchResult
from typing import List
import requests
class TwitterChannel(Channel):
    """Read and search Twitter/X.

    Uses the ``birdx`` CLI when it is on PATH. Otherwise reads fall back to
    Jina Reader and searches fall back to Exa.
    """

    name = "twitter"
    description = "Twitter/X posts, search, timelines"
    backends = ["birdx", "Jina Reader"]
    tier = 0  # Single tweet reading is zero-config

    # Registrable domains handled here; their subdomains are accepted too.
    _HOSTS = ("x.com", "twitter.com")
    # Characters a horizontal-rule line between tweets may consist of.
    # NOTE(review): the separator literal in the code as received was an empty
    # string (which matches every line, so parsing always returned []). The
    # real separator is presumed to be a rule line — confirm against actual
    # birdx output.
    _RULE_CHARS = frozenset("─━═—–-=_*")

    def can_handle(self, url: str) -> bool:
        """Return True when the URL's host is one of ``_HOSTS`` or a subdomain."""
        # Must compare whole host labels: the substring test `"x.com" in host`
        # also matches netflix.com, dropbox.com, fedex.com, ...
        host = (urlparse(url).hostname or "").lower()
        return any(host == known or host.endswith("." + known) for known in self._HOSTS)

    def check(self, config=None):
        """Report which backend will be used; the status is always 'ok'."""
        # Basic reading always works (Jina fallback)
        if shutil.which("birdx"):
            return "ok", "birdx (full: search + timeline + threads)"
        return "ok", "Jina Reader (single tweets only)"

    async def read(self, url: str, config=None) -> ReadResult:
        """Read a tweet URL via birdx when installed, else via Jina Reader."""
        # Try birdx first
        if shutil.which("birdx"):
            return await self._read_birdx(url)
        # Fallback: Jina Reader
        return await self._read_jina(url)

    async def _read_birdx(self, url: str) -> ReadResult:
        """Run ``birdx read``; fall back to Jina on failure, timeout or empty output."""
        try:
            result = subprocess.run(
                ["birdx", "read", url],
                capture_output=True, text=True, timeout=30,
            )
        except (subprocess.TimeoutExpired, FileNotFoundError):
            # Same failure modes _search_birdx tolerates; for reads the Jina
            # fallback still gives the caller a result.
            return await self._read_jina(url)
        text = result.stdout.strip()
        if result.returncode != 0 or not text:
            return await self._read_jina(url)
        # Extract author from first line
        author = ""
        lines = text.split("\n")
        if lines and lines[0].startswith("@"):
            author = lines[0].split()[0]
        return ReadResult(
            title=text[:100],
            content=text,
            url=url,
            author=author,
            platform="twitter",
        )

    async def _read_jina(self, url: str) -> ReadResult:
        """Fetch the page as markdown through Jina Reader.

        Raises:
            requests.HTTPError: if Jina responds with an error status.
        """
        resp = requests.get(
            f"https://r.jina.ai/{url}",
            headers={"Accept": "text/markdown"},
            timeout=15,
        )
        resp.raise_for_status()
        text = resp.text
        title = text[:100] if text else url
        return ReadResult(
            title=title,
            content=text,
            url=url,
            platform="twitter",
        )

    async def search(self, query: str, config=None, **kwargs) -> List[SearchResult]:
        """Search tweets via birdx when installed, else via Exa (``site:x.com``).

        Keyword Args:
            limit: number of results (default 10).
        """
        limit = kwargs.get("limit", 10)
        if shutil.which("birdx"):
            return await self._search_birdx(query, limit)
        # Fallback to Exa
        return await self._search_exa(query, limit, config)

    async def _search_birdx(self, query: str, limit: int) -> List[SearchResult]:
        """Run ``birdx search``; return [] on failure, timeout or missing binary."""
        try:
            result = subprocess.run(
                ["birdx", "search", query, "-n", str(limit)],
                capture_output=True, text=True, timeout=30,
            )
            if result.returncode != 0:
                return []
            return self._parse_birdx_output(result.stdout)
        except (subprocess.TimeoutExpired, FileNotFoundError):
            return []

    @classmethod
    def _is_separator(cls, line: str) -> bool:
        """True for a rule line: at least 3 characters, all from ``_RULE_CHARS``."""
        return len(line) >= 3 and set(line) <= cls._RULE_CHARS

    @staticmethod
    def _to_result(meta: dict, text_lines: List[str]) -> SearchResult:
        """Build one SearchResult from collected metadata and text lines."""
        body = "\n".join(text_lines).strip()
        return SearchResult(
            title=body[:80],
            url=meta.get("url", ""),
            snippet=body,
            author=meta.get("author", ""),
            date=meta.get("date", ""),
        )

    def _parse_birdx_output(self, text: str) -> List[SearchResult]:
        """Parse birdx text output into SearchResults.

        Recognized lines: a rule line (record separator), ``@handle (...):``
        (author), ``date: ...`` and ``url: ...``. Every other line is tweet
        text. A record is emitted only if it carried at least one metadata
        field.
        """
        results: List[SearchResult] = []
        current: dict = {}
        text_lines: List[str] = []
        for raw in text.strip().split("\n"):
            line = raw.strip()
            if self._is_separator(line):
                if current:
                    results.append(self._to_result(current, text_lines))
                current = {}
                text_lines = []
            elif line.startswith("@") and line.endswith(":") and "(" in line:
                current["author"] = line.split()[0]
            elif line.startswith("date:"):
                current["date"] = line[5:].strip()
            elif line.startswith("url:"):
                current["url"] = line[4:].strip()
            else:
                text_lines.append(line)
        # Flush the last record (output need not end with a separator).
        if current:
            results.append(self._to_result(current, text_lines))
        return results

    async def _search_exa(self, query: str, limit: int, config=None) -> List[SearchResult]:
        """Delegate to ExaSearchChannel with a ``site:x.com`` restriction."""
        from agent_eyes.channels.exa_search import ExaSearchChannel
        exa = ExaSearchChannel()
        return await exa.search(f"site:x.com {query}", config=config, limit=limit)

View file

@ -0,0 +1,49 @@
# -*- coding: utf-8 -*-
"""Web pages — via Jina Reader API (free, no config needed).
Backend: Jina Reader (https://r.jina.ai)
Swap to: Firecrawl, Trafilatura, or any other reader API
"""
import requests
from .base import Channel, ReadResult
class WebChannel(Channel):
    """Catch-all channel: fetches any URL as markdown through Jina Reader."""

    name = "web"
    description = "Web pages (any URL)"
    backends = ["Jina Reader API"]
    tier = 0
    JINA_URL = "https://r.jina.ai/"

    def can_handle(self, url: str) -> bool:
        """Accept every URL."""
        # Fallback — handles any URL not matched by other channels
        return True

    async def read(self, url: str, config=None) -> ReadResult:
        """Fetch ``url`` via Jina Reader and return the markdown as content.

        The title is taken from the first line that starts with ``# `` or
        ``Title:``; when there is none, the URL itself is used.

        Raises:
            requests.HTTPError: if Jina responds with an error status.
        """
        response = requests.get(
            self.JINA_URL + url,
            headers={"Accept": "text/markdown"},
            timeout=15,
        )
        response.raise_for_status()
        markdown = response.text

        page_title = url
        for raw_line in markdown.split("\n"):
            stripped = raw_line.strip()
            # The two prefixes cannot both match one line, so probing them in
            # sequence is order-independent.
            matched = next(
                (prefix for prefix in ("# ", "Title:") if stripped.startswith(prefix)),
                None,
            )
            if matched is not None:
                page_title = stripped[len(matched):].strip()
                break

        return ReadResult(
            title=page_title,
            content=markdown,
            url=url,
            platform="web",
        )

View file

@ -0,0 +1,94 @@
# -*- coding: utf-8 -*-
"""YouTube — via yt-dlp (free, pip install yt-dlp).
Backend: yt-dlp (https://github.com/yt-dlp/yt-dlp)
Swap to: any YouTube subtitle extractor
"""
import json
import shutil
import subprocess
import tempfile
from pathlib import Path
from urllib.parse import urlparse, parse_qs
from .base import Channel, ReadResult
class YouTubeChannel(Channel):
    """Read YouTube videos: metadata plus a subtitle transcript, both via yt-dlp."""

    name = "youtube"
    description = "YouTube video transcripts"
    backends = ["yt-dlp"]
    requires_tools = ["yt-dlp"]
    tier = 0

    # Registrable domains handled here; their subdomains are accepted too.
    _HOSTS = ("youtube.com", "youtu.be")

    def can_handle(self, url: str) -> bool:
        """Return True when the URL's host is one of ``_HOSTS`` or a subdomain."""
        # Compare whole host labels instead of testing for a substring.
        host = (urlparse(url).hostname or "").lower()
        return any(host == known or host.endswith("." + known) for known in self._HOSTS)

    async def read(self, url: str, config=None) -> ReadResult:
        """Return the video's transcript, or a placeholder when none exists.

        Raises:
            RuntimeError: if ``yt-dlp`` is not on PATH.
        """
        if not shutil.which("yt-dlp"):
            raise RuntimeError("yt-dlp not installed. Install: pip install yt-dlp")

        # Get video info
        info = self._get_info(url)
        title = info.get("title") or url
        author = info.get("uploader") or ""

        # Try to get subtitles; the temp dir only has to live for this step.
        with tempfile.TemporaryDirectory() as tmpdir:
            transcript = self._get_subtitles(url, tmpdir)
        if not transcript:
            transcript = f"[Video: {title}]\n[No subtitles available. Use Groq Whisper for transcription.]"

        return ReadResult(
            title=title,
            content=transcript,
            url=url,
            author=author,
            platform="youtube",
            extra={
                "duration": info.get("duration"),
                "view_count": info.get("view_count"),
                "upload_date": info.get("upload_date"),
            },
        )

    def _get_info(self, url: str) -> dict:
        """Return yt-dlp's JSON metadata for ``url``; {} on any failure."""
        try:
            result = subprocess.run(
                ["yt-dlp", "--dump-json", "--no-download", url],
                capture_output=True, text=True, timeout=30,
            )
            if result.returncode == 0:
                return json.loads(result.stdout)
        except (subprocess.TimeoutExpired, json.JSONDecodeError, FileNotFoundError):
            pass
        return {}

    def _get_subtitles(self, url: str, tmpdir: str) -> str:
        """Extract subtitles using yt-dlp.

        Downloads manual and automatic subtitles as VTT into ``tmpdir`` and
        returns the cleaned text of the first file (in sorted name order) that
        has any; '' on timeout, missing binary, or no usable file.
        """
        try:
            subprocess.run(
                ["yt-dlp", "--write-auto-sub", "--write-sub",
                 "--sub-lang", "en,zh-Hans,zh",
                 "--skip-download", "--sub-format", "vtt",
                 "-o", str(Path(tmpdir) / "%(id)s.%(ext)s"), url],
                capture_output=True, text=True, timeout=30,
            )
        except (subprocess.TimeoutExpired, FileNotFoundError):
            return ""
        # Find and read subtitle file. Sorted so that, when several languages
        # were written, the pick does not depend on directory listing order.
        for vtt_file in sorted(Path(tmpdir).glob("*.vtt")):
            # Decode explicitly as UTF-8: the platform default encoding would
            # garble non-ASCII subtitles on some systems.
            cleaned = self._clean_vtt(vtt_file.read_text(encoding="utf-8", errors="replace"))
            if cleaned:
                return cleaned
        return ""

    @staticmethod
    def _clean_vtt(text: str) -> str:
        """Reduce a WebVTT document to its caption text.

        Drops everything before the first timing line (``WEBVTT``, ``Kind:``,
        ``Language:`` ...), timing lines, numeric cue ids, inline ``<...>``
        tags, and consecutive duplicate lines.
        """
        import re

        tag = re.compile(r"<[^>]+>")
        lines = []
        in_cues = False
        for raw in text.split("\n"):
            line = raw.strip()
            if "-->" in line:
                in_cues = True
                continue
            if not in_cues:
                # Still inside the file header.
                continue
            line = tag.sub("", line).strip()
            if not line or line.isdigit():
                continue
            if lines and lines[-1] == line:  # deduplicate
                continue
            lines.append(line)
        return "\n".join(lines)

View file

@ -2,6 +2,9 @@
"""
AgentEyes — the unified entry point.
Pure glue: routes URLs to the right channel, routes searches to the right engine.
Every channel is a thin wrapper around an external tool. Swap any backend anytime.
Usage:
from agent_eyes import AgentEyes
@ -14,7 +17,7 @@ import asyncio
from typing import Any, Dict, List, Optional
from agent_eyes.config import Config
from agent_eyes.reader import UniversalReader
from agent_eyes.channels import get_channel_for_url, get_channel, get_all_channels
class AgentEyes:
@ -22,7 +25,6 @@ class AgentEyes:
def __init__(self, config: Optional[Config] = None):
self.config = config or Config()
self.reader = UniversalReader()
# ── Reading ─────────────────────────────────────────
@ -31,89 +33,60 @@ class AgentEyes:
Read content from any URL. Auto-detects platform.
Supported: Web, GitHub, Reddit, Twitter, YouTube,
Bilibili, WeChat, XiaoHongShu, RSS, Telegram, etc.
Bilibili, RSS, and more.
Returns:
Dict with title, content, url, author, etc.
Dict with title, content, url, author, platform, etc.
"""
content = await self.reader.read(url)
return content.to_dict()
if not url.startswith(("http://", "https://")):
url = f"https://{url}"
channel = get_channel_for_url(url)
result = await channel.read(url, config=self.config)
return result.to_dict()
async def read_batch(self, urls: List[str]) -> List[Dict[str, Any]]:
"""Read multiple URLs concurrently."""
contents = await self.reader.read_batch(urls)
return [c.to_dict() for c in contents]
tasks = [self.read(url) for url in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
return [r for r in results if not isinstance(r, Exception)]
def detect_platform(self, url: str) -> str:
"""Detect what platform a URL belongs to."""
return self.reader._detect_platform(url)
channel = get_channel_for_url(url)
return channel.name
# ── Searching ───────────────────────────────────────
async def search(self, query: str, num_results: int = 5) -> List[Dict[str, Any]]:
"""
Semantic web search via Exa. Requires Exa API key.
"""Semantic web search via Exa."""
ch = get_channel("exa_search")
results = await ch.search(query, config=self.config, limit=num_results)
return [r.to_dict() for r in results]
Args:
query: Search query
num_results: Number of results (max 10)
"""
from agent_eyes.search.exa import search_web
return await search_web(query, num_results=num_results, config=self.config)
async def search_reddit(self, query: str, subreddit: Optional[str] = None, limit: int = 10) -> List[Dict[str, Any]]:
"""Search Reddit via Exa (bypasses IP blocks)."""
ch = get_channel("exa_search")
q = f"site:reddit.com/r/{subreddit} {query}" if subreddit else f"site:reddit.com {query}"
results = await ch.search(q, config=self.config, limit=limit)
return [r.to_dict() for r in results]
async def search_reddit(
self,
query: str,
subreddit: Optional[str] = None,
limit: int = 10,
) -> List[Dict[str, Any]]:
"""
Search Reddit via Exa (bypasses IP blocks).
async def search_github(self, query: str, language: Optional[str] = None, limit: int = 5) -> List[Dict[str, Any]]:
"""Search GitHub repositories."""
ch = get_channel("github")
results = await ch.search(query, config=self.config, language=language, limit=limit)
return [r.to_dict() for r in results]
Args:
query: Search query
subreddit: Optional subreddit filter
limit: Number of results
"""
from agent_eyes.search.reddit import search_reddit
return await search_reddit(query, subreddit=subreddit, num_results=limit, config=self.config)
async def search_github(
self,
query: str,
language: Optional[str] = None,
limit: int = 5,
) -> List[Dict[str, Any]]:
"""
Search GitHub repositories.
Args:
query: Search query
language: Filter by language
limit: Number of results
"""
from agent_eyes.search.github import search_github
return await search_github(query, language=language, limit=limit, config=self.config)
async def search_twitter(
self,
query: str,
limit: int = 10,
) -> List[Dict[str, Any]]:
"""
Search Twitter. Uses birdx if available, else Exa.
Args:
query: Search query
limit: Number of results
"""
from agent_eyes.search.twitter import search_twitter
return await search_twitter(query, limit=limit, config=self.config)
async def search_twitter(self, query: str, limit: int = 10) -> List[Dict[str, Any]]:
"""Search Twitter. Uses birdx if available, else Exa."""
ch = get_channel("twitter")
results = await ch.search(query, config=self.config, limit=limit)
return [r.to_dict() for r in results]
# ── Health ──────────────────────────────────────────
def doctor(self) -> Dict[str, dict]:
"""Check all platform/feature availability."""
"""Check all channel availability."""
from agent_eyes.doctor import check_all
return check_all(self.config)

View file

@ -1,187 +1,26 @@
# -*- coding: utf-8 -*-
"""Environment health checker for Agent Eyes.
"""Environment health checker — powered by channels.
Checks all platforms, tools, and API keys.
Outputs a rich-formatted status report.
Each channel knows how to check itself. Doctor just collects the results.
"""
import shutil
import subprocess
from typing import Dict
from agent_eyes.config import Config
STATUS_OK = "ok"
STATUS_WARN = "warn"
STATUS_OFF = "off"
STATUS_ERROR = "error"
def _check_command(cmd: str) -> bool:
"""Check if a command exists on PATH."""
return shutil.which(cmd) is not None
def _check_python_import(module: str) -> bool:
"""Check if a Python module is importable."""
try:
__import__(module)
return True
except ImportError:
return False
from agent_eyes.channels import get_all_channels
def check_all(config: Config) -> Dict[str, dict]:
"""Check all features and return status dict."""
"""Check all channels and return status dict."""
results = {}
# === Zero-config (always available) ===
results["web"] = {
"status": STATUS_OK,
"name": "Web Pages",
"message": "Jina Reader (built-in)",
"tier": 0,
}
results["github_read"] = {
"status": STATUS_OK,
"name": "GitHub",
"message": "Public API (built-in)",
"tier": 0,
}
results["bilibili"] = {
"status": STATUS_OK,
"name": "Bilibili",
"message": "Public API (built-in)",
"tier": 0,
}
results["rss"] = {
"status": STATUS_OK,
"name": "RSS",
"message": "feedparser (built-in)",
"tier": 0,
}
results["tweet_read"] = {
"status": STATUS_OK,
"name": "Tweet (single)",
"message": "Jina Reader (built-in)",
"tier": 0,
}
# YouTube — needs yt-dlp
if _check_command("yt-dlp"):
results["youtube"] = {
"status": STATUS_OK,
"name": "YouTube",
"message": "yt-dlp found",
"tier": 0,
for ch in get_all_channels():
status, message = ch.check(config)
results[ch.name] = {
"status": status,
"name": ch.description,
"message": message,
"tier": ch.tier,
"backends": ch.backends,
}
else:
results["youtube"] = {
"status": STATUS_WARN,
"name": "YouTube",
"message": "Install yt-dlp: pip install yt-dlp",
"tier": 0,
}
# === Needs API key (Tier 1) ===
exa_key = config.get("exa_api_key")
if exa_key:
results["search_web"] = {
"status": STATUS_OK,
"name": "Web Search",
"message": "Exa API configured",
"tier": 1,
}
results["search_reddit"] = {
"status": STATUS_OK,
"name": "Reddit Search",
"message": "Via Exa (site:reddit.com)",
"tier": 1,
}
results["search_twitter"] = {
"status": STATUS_OK,
"name": "Twitter Search",
"message": "Via Exa (site:x.com)",
"tier": 1,
}
else:
for key in ("search_web", "search_reddit", "search_twitter"):
results[key] = {
"status": STATUS_OFF,
"name": {"search_web": "Web Search", "search_reddit": "Reddit Search",
"search_twitter": "Twitter Search"}[key],
"message": "Need Exa API key (free). Run: agent-eyes setup",
"tier": 1,
}
# GitHub search (always works, token optional for higher limits)
github_token = config.get("github_token")
results["search_github"] = {
"status": STATUS_OK,
"name": "GitHub Search",
"message": f"API ({'authenticated' if github_token else 'public, 60 req/hr'})",
"tier": 0,
}
# === Optional tools (Tier 2) ===
# Twitter advanced (birdx)
if _check_command("birdx"):
results["twitter_advanced"] = {
"status": STATUS_OK,
"name": "Twitter Advanced",
"message": "birdx found",
"tier": 2,
}
else:
results["twitter_advanced"] = {
"status": STATUS_OFF,
"name": "Twitter Advanced",
"message": "Install birdx for timeline/deep search",
"tier": 2,
}
# Reddit full reader
reddit_proxy = config.get("reddit_proxy")
if reddit_proxy:
results["reddit_full"] = {
"status": STATUS_OK,
"name": "Reddit Reader",
"message": "Proxy configured",
"tier": 2,
}
else:
results["reddit_full"] = {
"status": STATUS_OFF,
"name": "Reddit Reader",
"message": "Need proxy for full post reading",
"tier": 2,
}
# WeChat / XHS (playwright)
if _check_python_import("playwright"):
results["wechat"] = {
"status": STATUS_OK,
"name": "WeChat",
"message": "Playwright available",
"tier": 2,
}
results["xhs"] = {
"status": STATUS_OK,
"name": "XiaoHongShu",
"message": "Playwright available",
"tier": 2,
}
else:
for key in ("wechat", "xhs"):
results[key] = {
"status": STATUS_OFF,
"name": "WeChat" if key == "wechat" else "XiaoHongShu",
"message": "pip install agent-eyes[browser]",
"tier": 2,
}
return results
@ -191,35 +30,39 @@ def format_report(results: Dict[str, dict]) -> str:
lines.append("👁️ Agent Eyes Status")
lines.append("=" * 40)
# Count stats
ok_count = sum(1 for r in results.values() if r["status"] == STATUS_OK)
ok_count = sum(1 for r in results.values() if r["status"] == "ok")
total = len(results)
# Group by tier
# Tier 0 — zero config
lines.append("")
lines.append("✅ Ready (no setup needed):")
for key, r in results.items():
if r["tier"] == 0 and r["status"] == STATUS_OK:
lines.append(f"{r['name']}")
elif r["tier"] == 0 and r["status"] == STATUS_WARN:
if r["tier"] == 0 and r["status"] == "ok":
backends = ", ".join(r["backends"]) if r["backends"] else "built-in"
lines.append(f"{r['name']} [{backends}]")
elif r["tier"] == 0 and r["status"] in ("warn", "off"):
lines.append(f" ⚠️ {r['name']}{r['message']}")
lines.append("")
lines.append("🔍 Search (need free Exa API key):")
for key, r in results.items():
if r["tier"] == 1:
icon = "" if r["status"] == STATUS_OK else ""
# Tier 1 — needs free key
tier1 = {k: r for k, r in results.items() if r["tier"] == 1}
if tier1:
lines.append("")
lines.append("🔍 Search (need free Exa API key):")
for key, r in tier1.items():
icon = "" if r["status"] == "ok" else ""
lines.append(f" {icon} {r['name']}")
lines.append("")
lines.append("🔧 Optional (advanced setup):")
for key, r in results.items():
if r["tier"] == 2:
icon = "" if r["status"] == STATUS_OK else ""
# Tier 2 — optional setup
tier2 = {k: r for k, r in results.items() if r["tier"] == 2}
if tier2:
lines.append("")
lines.append("🔧 Optional (advanced setup):")
for key, r in tier2.items():
icon = "" if r["status"] == "ok" else ""
lines.append(f" {icon} {r['name']}{r['message']}")
lines.append("")
lines.append(f"Status: {ok_count}/{total} platforms active")
lines.append(f"Status: {ok_count}/{total} channels active")
if ok_count < total:
lines.append("Run `agent-eyes setup` to unlock more!")

View file

@ -3,9 +3,8 @@
Agent Eyes MCP Server expose all capabilities as MCP tools.
Run: python -m agent_eyes.integrations.mcp_server
Or: agent-eyes serve (after pip install)
10 tools for any MCP-compatible AI Agent.
8 tools for any MCP-compatible AI Agent.
"""
import asyncio
@ -25,7 +24,6 @@ except ImportError:
def create_server():
"""Create and configure the MCP server."""
if not HAS_MCP:
print("MCP not installed. Install: pip install agent-eyes[mcp]", file=sys.stderr)
sys.exit(1)
@ -37,97 +35,30 @@ def create_server():
@server.list_tools()
async def list_tools():
return [
Tool(
name="read_url",
description="Read content from any URL. Supports: web pages, GitHub, Reddit, Twitter, YouTube, Bilibili, WeChat, XiaoHongShu, RSS, Telegram.",
inputSchema={
"type": "object",
"properties": {
"url": {"type": "string", "description": "URL to read"},
},
"required": ["url"],
},
),
Tool(
name="read_batch",
description="Read multiple URLs concurrently.",
inputSchema={
"type": "object",
"properties": {
"urls": {"type": "array", "items": {"type": "string"}, "description": "List of URLs"},
},
"required": ["urls"],
},
),
Tool(
name="detect_platform",
description="Detect what platform a URL belongs to (github, reddit, twitter, youtube, etc).",
inputSchema={
"type": "object",
"properties": {
"url": {"type": "string", "description": "URL to detect"},
},
"required": ["url"],
},
),
Tool(
name="search",
description="Semantic web search using Exa. Find any information on the internet.",
inputSchema={
"type": "object",
"properties": {
"query": {"type": "string", "description": "Search query"},
"num_results": {"type": "integer", "description": "Number of results (1-10)", "default": 5},
},
"required": ["query"],
},
),
Tool(
name="search_reddit",
description="Search Reddit posts and discussions. Works even when Reddit blocks your IP.",
inputSchema={
"type": "object",
"properties": {
"query": {"type": "string", "description": "Search query"},
"subreddit": {"type": "string", "description": "Optional subreddit filter (e.g. 'LocalLLaMA')"},
"limit": {"type": "integer", "description": "Number of results", "default": 10},
},
"required": ["query"],
},
),
Tool(
name="search_github",
description="Search GitHub repositories by topic, keyword, or technology.",
inputSchema={
"type": "object",
"properties": {
"query": {"type": "string", "description": "Search query"},
"language": {"type": "string", "description": "Filter by language (e.g. 'python')"},
"limit": {"type": "integer", "description": "Number of results", "default": 5},
},
"required": ["query"],
},
),
Tool(
name="search_twitter",
description="Search Twitter/X posts. Uses birdx if available, otherwise Exa.",
inputSchema={
"type": "object",
"properties": {
"query": {"type": "string", "description": "Search query"},
"limit": {"type": "integer", "description": "Number of results", "default": 10},
},
"required": ["query"],
},
),
Tool(
name="get_status",
description="Get Agent Eyes status: which platforms are active, which need configuration.",
inputSchema={
"type": "object",
"properties": {},
},
),
Tool(name="read_url",
description="Read content from any URL. Supports: web, GitHub, Reddit, Twitter, YouTube, Bilibili, RSS.",
inputSchema={"type": "object", "properties": {"url": {"type": "string"}}, "required": ["url"]}),
Tool(name="read_batch",
description="Read multiple URLs concurrently.",
inputSchema={"type": "object", "properties": {"urls": {"type": "array", "items": {"type": "string"}}}, "required": ["urls"]}),
Tool(name="detect_platform",
description="Detect what platform a URL belongs to.",
inputSchema={"type": "object", "properties": {"url": {"type": "string"}}, "required": ["url"]}),
Tool(name="search",
description="Semantic web search via Exa.",
inputSchema={"type": "object", "properties": {"query": {"type": "string"}, "num_results": {"type": "integer", "default": 5}}, "required": ["query"]}),
Tool(name="search_reddit",
description="Search Reddit posts.",
inputSchema={"type": "object", "properties": {"query": {"type": "string"}, "subreddit": {"type": "string"}, "limit": {"type": "integer", "default": 10}}, "required": ["query"]}),
Tool(name="search_github",
description="Search GitHub repositories.",
inputSchema={"type": "object", "properties": {"query": {"type": "string"}, "language": {"type": "string"}, "limit": {"type": "integer", "default": 5}}, "required": ["query"]}),
Tool(name="search_twitter",
description="Search Twitter/X posts.",
inputSchema={"type": "object", "properties": {"query": {"type": "string"}, "limit": {"type": "integer", "default": 10}}, "required": ["query"]}),
Tool(name="get_status",
description="Get Agent Eyes status: which channels are active.",
inputSchema={"type": "object", "properties": {}}),
]
@server.call_tool()
@ -135,53 +66,25 @@ def create_server():
try:
if name == "read_url":
result = await eyes.read(arguments["url"])
return [TextContent(type="text", text=json.dumps(result, ensure_ascii=False, indent=2))]
elif name == "read_batch":
results = await eyes.read_batch(arguments["urls"])
return [TextContent(type="text", text=json.dumps(results, ensure_ascii=False, indent=2))]
result = await eyes.read_batch(arguments["urls"])
elif name == "detect_platform":
platform = eyes.detect_platform(arguments["url"])
return [TextContent(type="text", text=f"Platform: {platform}")]
result = eyes.detect_platform(arguments["url"])
elif name == "search":
results = await eyes.search(
arguments["query"],
num_results=arguments.get("num_results", 5),
)
return [TextContent(type="text", text=json.dumps(results, ensure_ascii=False, indent=2))]
result = await eyes.search(arguments["query"], arguments.get("num_results", 5))
elif name == "search_reddit":
results = await eyes.search_reddit(
arguments["query"],
subreddit=arguments.get("subreddit"),
limit=arguments.get("limit", 10),
)
return [TextContent(type="text", text=json.dumps(results, ensure_ascii=False, indent=2))]
result = await eyes.search_reddit(arguments["query"], arguments.get("subreddit"), arguments.get("limit", 10))
elif name == "search_github":
results = await eyes.search_github(
arguments["query"],
language=arguments.get("language"),
limit=arguments.get("limit", 5),
)
return [TextContent(type="text", text=json.dumps(results, ensure_ascii=False, indent=2))]
result = await eyes.search_github(arguments["query"], arguments.get("language"), arguments.get("limit", 5))
elif name == "search_twitter":
results = await eyes.search_twitter(
arguments["query"],
limit=arguments.get("limit", 10),
)
return [TextContent(type="text", text=json.dumps(results, ensure_ascii=False, indent=2))]
result = await eyes.search_twitter(arguments["query"], arguments.get("limit", 10))
elif name == "get_status":
report = eyes.doctor_report()
return [TextContent(type="text", text=report)]
result = eyes.doctor_report()
else:
return [TextContent(type="text", text=f"Unknown tool: {name}")]
result = f"Unknown tool: {name}"
text = json.dumps(result, ensure_ascii=False, indent=2) if isinstance(result, (dict, list)) else str(result)
return [TextContent(type="text", text=text)]
except Exception as e:
return [TextContent(type="text", text=f"Error: {str(e)}")]

View file

@ -1,184 +0,0 @@
# -*- coding: utf-8 -*-
"""
Universal Reader routes any URL to the right fetcher.
The core dispatcher: give it a URL, get back structured content.
"""
import asyncio
from urllib.parse import urlparse
from loguru import logger
from typing import Dict, Any, Optional
from agent_eyes.schema import (
UnifiedContent, UnifiedInbox, SourceType, MediaType,
from_bilibili, from_twitter, from_wechat,
from_xiaohongshu, from_youtube, from_rss, from_telegram,
)
from agent_eyes.readers.jina import fetch_via_jina
class UniversalReader:
    """
    Routes URLs to platform-specific fetchers.

    Falls back to Jina Reader for unknown platforms. When an inbox is
    supplied, every successfully fetched item is also added to it.
    """

    # (platform, domains) pairs, checked in order; the first match wins.
    # A host matches a domain when it equals it or is a subdomain of it.
    _PLATFORM_DOMAINS = (
        ("wechat", ("mp.weixin.qq.com",)),
        ("twitter", ("x.com", "twitter.com")),
        ("youtube", ("youtube.com", "youtu.be")),
        ("xhs", ("xiaohongshu.com", "xhslink.com")),
        ("bilibili", ("bilibili.com", "b23.tv")),
        ("podcast", ("xiaoyuzhoufm.com", "podcasts.apple.com")),
        ("telegram", ("t.me", "telegram.org")),
        ("reddit", ("reddit.com", "redd.it")),
        ("github", ("github.com",)),
    )

    def __init__(self, inbox: Optional["UnifiedInbox"] = None):
        self.inbox = inbox

    def _detect_platform(self, url: str) -> str:
        """Detect platform from URL.

        Returns one of the platform keys in ``_PLATFORM_DOMAINS``, ``"rss"``
        for feed-looking URLs, or ``"generic"`` when nothing matches.
        """
        # hostname (not netloc) drops any port / userinfo and is lower-cased.
        host = (urlparse(url).hostname or "").lower()

        # Match whole domain labels only: a plain substring test would treat
        # "netflix.com" as "x.com" and "chat.me" as "t.me".
        for platform, domains in self._PLATFORM_DOMAINS:
            if any(host == d or host.endswith("." + d) for d in domains):
                return platform

        if url.endswith(".xml") or "/rss" in url or "/feed" in url or "/atom" in url or "rss" in host:
            return "rss"
        return "generic"

    async def read(self, url: str) -> "UnifiedContent":
        """
        Fetch content from any URL and return as UnifiedContent.

        The main entry point: give it a URL, get back structured content.
        Any fetcher error is logged and re-raised.
        """
        # Ensure URL has scheme
        if not url.startswith(("http://", "https://")):
            url = f"https://{url}"

        platform = self._detect_platform(url)
        logger.info(f"[{platform}] {url[:60]}...")

        try:
            content = await self._fetch(platform, url)

            # Save to inbox if configured
            if self.inbox:
                if self.inbox.add(content):
                    self.inbox.save()
                    logger.info(f"Saved to inbox: {content.title[:50]}")

            # Save to markdown output if configured
            from agent_eyes.utils.storage import save_to_markdown
            save_to_markdown(content)

            return content
        except Exception as e:
            logger.error(f"[{platform}] Failed: {e}")
            raise

    async def _fetch(self, platform: str, url: str) -> "UnifiedContent":
        """Dispatch to platform-specific fetcher.

        Fetcher modules are imported lazily so that optional dependencies are
        only needed for the platforms actually used.
        """
        if platform == "bilibili":
            from agent_eyes.readers.bilibili import fetch_bilibili
            data = await fetch_bilibili(url)
            return from_bilibili(data)

        if platform == "twitter":
            from agent_eyes.readers.twitter import fetch_twitter
            data = await fetch_twitter(url)
            return from_twitter(data)

        if platform == "wechat":
            from agent_eyes.readers.wechat import fetch_wechat
            data = await fetch_wechat(url)
            return from_wechat(data)

        if platform == "xhs":
            from agent_eyes.readers.xhs import fetch_xhs
            data = await fetch_xhs(url)
            return from_xiaohongshu(data)

        if platform == "youtube":
            from agent_eyes.readers.youtube import fetch_youtube
            data = await fetch_youtube(url)
            return from_youtube(data)

        if platform == "rss":
            from agent_eyes.readers.rss import fetch_rss
            articles = await fetch_rss(url, limit=1)
            if articles:
                return from_rss(articles[0])
            raise ValueError(f"No articles found in RSS feed: {url}")

        if platform == "reddit":
            from agent_eyes.readers.reddit import fetch_reddit
            data = await fetch_reddit(url)
            return UnifiedContent(
                source_type=SourceType.REDDIT,
                source_name=f"r/{data.get('subreddit', '')}",
                title=data["title"],
                content=data.get("content", ""),
                url=data["url"],
                media_type=MediaType.TEXT,
                extra={"author": data.get("author", ""), "score": data.get("score", 0), "num_comments": data.get("num_comments", 0)},
            )

        if platform == "github":
            from agent_eyes.readers.github import fetch_github
            data = await fetch_github(url)
            return UnifiedContent(
                source_type=SourceType.GITHUB,
                source_name=data.get("title", ""),
                title=data["title"],
                content=data.get("content", ""),
                url=data["url"],
                media_type=MediaType.TEXT,
                # Everything not already mapped to a first-class field goes to extra.
                extra={k: v for k, v in data.items() if k not in ("title", "content", "url", "platform")},
            )

        if platform == "telegram":
            from agent_eyes.readers.telegram import fetch_telegram
            # Extract channel username from t.me URL
            path = urlparse(url).path.strip("/").split("/")[0]
            channel = path if path else url
            messages = await fetch_telegram(channel, limit=1)
            if messages:
                return from_telegram(messages[0], channel, channel)
            raise ValueError(f"No messages from Telegram channel: {url}")

        # Fallback: Jina Reader for any unknown URL
        logger.info(f"Using Jina fallback for: {url}")
        data = fetch_via_jina(url)
        return UnifiedContent(
            source_type=SourceType.MANUAL,
            source_name=urlparse(url).netloc,
            title=data["title"],
            content=data["content"],
            url=url,
        )

    async def read_batch(self, urls: list[str]) -> list["UnifiedContent"]:
        """Fetch multiple URLs concurrently.

        URLs whose fetch raised are logged and omitted, so the returned list
        may be shorter than ``urls``.
        """
        tasks = [self.read(url) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        contents = []
        for url, result in zip(urls, results):
            if isinstance(result, Exception):
                logger.error(f"Batch failed for {url}: {result}")
            else:
                contents.append(result)
        return contents

View file

@ -1,46 +0,0 @@
# -*- coding: utf-8 -*-
"""Bilibili video fetcher — uses official web API."""
import re
import requests
from loguru import logger
from typing import Dict, Any
API_URL = "https://api.bilibili.com/x/web-interface/view"
HEADERS = {
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36"
}
async def fetch_bilibili(url_or_bv: str) -> Dict[str, Any]:
    """Look up a Bilibili video's metadata through the web-interface view API.

    Args:
        url_or_bv: Either a bare BV id or a bilibili.com / b23.tv URL that
            contains one.

    Returns:
        A dict with title, description, author, url, cover, bvid, duration,
        view_count and platform.

    Raises:
        ValueError: if a URL contains no BV id, or the API answers with a
            non-zero ``code``.
    """
    logger.info(f"Fetching Bilibili: {url_or_bv}")

    looks_like_url = "bilibili.com" in url_or_bv or "b23.tv" in url_or_bv
    if looks_like_url:
        found = re.search(r'BV\w+', url_or_bv)
        if not found:
            raise ValueError(f"Cannot extract BV ID from: {url_or_bv}")
        bv_id = found.group()
    else:
        # Anything that is not a recognised URL is used as the BV id as-is.
        bv_id = url_or_bv

    response = requests.get(API_URL, params={"bvid": bv_id}, headers=HEADERS, timeout=10)
    response.raise_for_status()
    payload = response.json()

    if payload.get("code") != 0:
        raise ValueError(f"Bilibili API error: {payload.get('message')}")

    video = payload["data"]
    result: Dict[str, Any] = {}
    result["title"] = video.get("title", "")
    result["description"] = video.get("desc", "")
    result["author"] = video.get("owner", {}).get("name", "")
    result["url"] = f"https://www.bilibili.com/video/{bv_id}"
    result["cover"] = video.get("pic", "")
    result["bvid"] = bv_id
    result["duration"] = video.get("duration", 0)
    result["view_count"] = video.get("stat", {}).get("view", 0)
    result["platform"] = "bilibili"
    return result

View file

@ -1,88 +0,0 @@
# -*- coding: utf-8 -*-
"""
Playwright browser fetcher headless Chromium fallback for anti-scraping sites.
Used when Jina Reader fails (403/451/timeout). Supports persistent login
sessions via Playwright's storage_state for platforms requiring authentication.
Install: pip install "x-reader[browser]" && playwright install chromium
"""
from loguru import logger
from pathlib import Path
SESSION_DIR = Path.home() / ".x-reader" / "sessions"
TIMEOUT_MS = 30_000
async def fetch_via_browser(url: str, storage_state: str = None) -> dict:
    """
    Load a page in headless Chromium (Playwright) and extract its text.

    Args:
        url: Target URL to fetch.
        storage_state: Path to a Playwright storage state JSON file
            (cookies/localStorage). Only used when the file exists.

    Returns:
        dict with keys: title, content, url, author (author is always "").

    Raises:
        RuntimeError: if Playwright is not installed.
    """
    try:
        from playwright.async_api import async_playwright
    except ImportError:
        raise RuntimeError(
            "Playwright is not installed. Run:\n"
            ' pip install "x-reader[browser]"\n'
            " playwright install chromium"
        )

    logger.info(f"Browser fetch: {url}")

    # Prefer the most specific content container; fall back to <body>.
    extract_main_text = (
        "() => {\n"
        "const el = document.querySelector('article')\n"
        "|| document.querySelector('main')\n"
        "|| document.querySelector('.content')\n"
        "|| document.body;\n"
        "return el ? el.innerText : '';\n"
        "}"
    )
    desktop_ua = (
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/120.0.0.0 Safari/537.36"
    )

    async with async_playwright() as pw:
        browser = await pw.chromium.launch(headless=True)

        extra_context_args = {}
        if storage_state and Path(storage_state).exists():
            extra_context_args["storage_state"] = storage_state
            logger.info(f"Using session: {storage_state}")

        context = await browser.new_context(user_agent=desktop_ua, **extra_context_args)
        page = await context.new_page()

        try:
            await page.goto(url, wait_until="domcontentloaded", timeout=TIMEOUT_MS)
            # Extra wait for JS-heavy pages
            await page.wait_for_timeout(2000)

            title = await page.title()
            content = await page.evaluate(extract_main_text)

            fetched = {
                "title": (title or "").strip()[:200],
                "content": (content or "").strip(),
                "url": url,
                "author": "",
            }
            logger.info(f"Browser fetch OK: {title[:60]}")
            return fetched
        finally:
            # Always release the context and the browser, success or not.
            await context.close()
            await browser.close()
def get_session_path(platform: str) -> str:
    """Return the path, as a string, of ``<platform>.json`` inside SESSION_DIR."""
    session_file = SESSION_DIR / f"{platform}.json"
    return str(session_file)

View file

@ -1,190 +0,0 @@
# -*- coding: utf-8 -*-
"""GitHub fetcher — extracts repo info, issues, PRs, and README content.
Uses GitHub public API (no token needed for public repos).
For higher rate limits, set GITHUB_TOKEN env var.
"""
import os
import re
import base64
import requests
from loguru import logger
from typing import Dict, Any, List, Optional
API_BASE = "https://api.github.com"
def _get_headers() -> Dict[str, str]:
    """Build the GitHub API request headers.

    A Bearer ``Authorization`` header is added only when the GITHUB_TOKEN
    environment variable is set to a non-empty value.
    """
    token = os.environ.get("GITHUB_TOKEN")
    auth = {"Authorization": f"Bearer {token}"} if token else {}
    return {
        "Accept": "application/vnd.github.v3+json",
        "User-Agent": "AgentEyes/1.0",
        **auth,
    }
def _parse_github_url(url: str) -> Dict[str, str]:
    """Parse a GitHub URL into its components.

    Recognised shape: ``github.com/owner/repo[/issues|pull|tree|blob/<ref>]``,
    with an optional ``.git`` suffix and trailing slash. Any query string or
    fragment is ignored.

    Returns:
        Dict with ``owner``, ``repo``, ``type`` (issues, pull, tree, blob, or
        None) and ``ref`` (issue number, branch, file path, or None).

    Raises:
        ValueError: if the URL does not have the shape above.
    """
    # Drop "?query" and "#fragment" first; otherwise they end up inside the
    # repo name or ref (e.g. "repo?tab=readme-ov-file", "repo#readme").
    path_part = re.split(r'[?#]', url, maxsplit=1)[0]

    # Match: github.com/owner/repo[/type/number]
    match = re.search(
        r'github\.com/([^/]+)/([^/]+?)(?:\.git)?(?:/(issues|pull|tree|blob)/(.+))?/?$',
        path_part
    )
    if not match:
        raise ValueError(f"Cannot parse GitHub URL: {url}")

    return {
        "owner": match.group(1),
        "repo": match.group(2),
        "type": match.group(3),  # issues, pull, tree, blob, or None
        "ref": match.group(4),  # issue number, branch, file path, or None
    }
async def fetch_github(url: str) -> Dict[str, Any]:
    """Fetch content from a GitHub URL.

    Issue and pull-request URLs go to their dedicated fetchers; every other
    parseable URL is treated as a repository.
    """
    logger.info(f"Fetching GitHub: {url}")

    parts = _parse_github_url(url)
    owner, repo = parts["owner"], parts["repo"]
    kind, ref = parts["type"], parts["ref"]
    headers = _get_headers()

    if ref:
        if kind == "issues":
            return await _fetch_issue(owner, repo, ref, headers)
        if kind == "pull":
            return await _fetch_pull(owner, repo, ref, headers)

    # tree/blob URLs and plain repo URLs all fall back to the repo view.
    return await _fetch_repo(owner, repo, headers)
async def _fetch_repo(owner: str, repo: str, headers: Dict) -> Dict[str, Any]:
    """Fetch repository metadata plus the decoded README.

    A README failure is logged and tolerated; the repo description is then
    used as the content. A failure of the repo request itself raises.
    """
    repo_endpoint = f"{API_BASE}/repos/{owner}/{repo}"

    # Get repo info
    info_response = requests.get(repo_endpoint, headers=headers, timeout=10)
    info_response.raise_for_status()
    info = info_response.json()

    # Get README (best effort)
    readme_text = ""
    try:
        readme_response = requests.get(
            f"{API_BASE}/repos/{owner}/{repo}/readme",
            headers=headers, timeout=10,
        )
        if readme_response.status_code == 200:
            encoded = readme_response.json().get("content", "")
            readme_text = base64.b64decode(encoded).decode("utf-8")
    except Exception as e:
        logger.warning(f"Could not fetch README: {e}")

    description = info.get("description", "")
    license_info = info.get("license") or {}  # "license" may be null
    return {
        "title": f"{owner}/{repo}",
        "content": readme_text or description,
        "description": description,
        "author": owner,
        "url": info.get("html_url", ""),
        "stars": info.get("stargazers_count", 0),
        "forks": info.get("forks_count", 0),
        "language": info.get("language", ""),
        "topics": info.get("topics", []),
        "license": license_info.get("spdx_id", ""),
        "platform": "github",
    }
async def _fetch_issue(owner: str, repo: str, number: str, headers: Dict) -> Dict[str, Any]:
    """Fetch a GitHub issue with up to 20 of its comments.

    Args:
        owner: Repository owner.
        repo: Repository name.
        number: Issue reference taken from the URL; only its leading digits
            are used.
        headers: Request headers to send to the API.

    Raises:
        ValueError: if ``number`` does not start with digits (e.g. the
            ``/issues/new`` page).
    """
    num_match = re.match(r'(\d+)', number)
    if num_match is None:
        # Previously this surfaced as AttributeError on a None match.
        raise ValueError(f"Not a numeric GitHub issue reference: {number!r}")
    issue_num = num_match.group(1)

    # Get issue
    resp = requests.get(
        f"{API_BASE}/repos/{owner}/{repo}/issues/{issue_num}",
        headers=headers, timeout=10,
    )
    resp.raise_for_status()
    issue = resp.json()

    # Get comments (only when the issue reports having any)
    comments_text = ""
    if issue.get("comments", 0) > 0:
        c_resp = requests.get(
            f"{API_BASE}/repos/{owner}/{repo}/issues/{issue_num}/comments",
            headers=headers, params={"per_page": 20}, timeout=10,
        )
        if c_resp.status_code == 200:
            parts = ["\n---\n## Comments\n"]
            for c in c_resp.json():
                # "user" and "body" can be present but null, so .get() defaults
                # alone are not enough.
                login = (c.get("user") or {}).get("login", "?")
                body = c.get("body") or ""
                parts.append(f"**@{login}**:\n{body}\n")
            comments_text = "\n".join(parts)

    return {
        "title": f"[{owner}/{repo}#{issue_num}] {issue.get('title', '')}",
        "content": (issue.get("body", "") or "") + comments_text,
        "author": (issue.get("user") or {}).get("login", ""),
        "url": issue.get("html_url", ""),
        "state": issue.get("state", ""),
        "labels": [label.get("name", "") for label in issue.get("labels", [])],
        "platform": "github",
    }
async def _fetch_pull(owner: str, repo: str, number: str, headers: Dict) -> Dict[str, Any]:
    """Fetch a GitHub pull request.

    Args:
        owner: Repository owner.
        repo: Repository name.
        number: Pull-request reference taken from the URL; only its leading
            digits are used (so "12/files" resolves to PR 12).
        headers: Request headers to send to the API.

    Raises:
        ValueError: if ``number`` does not start with digits.
    """
    num_match = re.match(r'(\d+)', number)
    if num_match is None:
        # Previously this surfaced as AttributeError on a None match.
        raise ValueError(f"Not a numeric GitHub pull request reference: {number!r}")
    pr_num = num_match.group(1)

    resp = requests.get(
        f"{API_BASE}/repos/{owner}/{repo}/pulls/{pr_num}",
        headers=headers, timeout=10,
    )
    resp.raise_for_status()
    pr = resp.json()

    return {
        "title": f"[{owner}/{repo}#{pr_num}] {pr.get('title', '')}",
        "content": pr.get("body", "") or "",
        # "user" can be present but null.
        "author": (pr.get("user") or {}).get("login", ""),
        "url": pr.get("html_url", ""),
        "state": pr.get("state", ""),
        "merged": pr.get("merged", False),
        "additions": pr.get("additions", 0),
        "deletions": pr.get("deletions", 0),
        "changed_files": pr.get("changed_files", 0),
        "platform": "github",
    }
async def search_github(query: str, limit: int = 5) -> List[Dict[str, Any]]:
    """Search GitHub repositories, sorted by stars.

    Args:
        query: Search query passed as ``q``.
        limit: Number of results requested (``per_page``).

    Returns:
        One dict per repository: title, description, url, stars, language,
        updated_at.
    """
    logger.info(f"Searching GitHub: {query}")

    search_params = {"q": query, "sort": "stars", "per_page": limit}
    response = requests.get(
        f"{API_BASE}/search/repositories",
        headers=_get_headers(),
        params=search_params,
        timeout=10,
    )
    response.raise_for_status()

    return [
        {
            "title": repo.get("full_name", ""),
            "description": repo.get("description", ""),
            "url": repo.get("html_url", ""),
            "stars": repo.get("stargazers_count", 0),
            "language": repo.get("language", ""),
            "updated_at": repo.get("updated_at", ""),
        }
        for repo in response.json().get("items", [])
    ]

View file

@ -1,63 +0,0 @@
# -*- coding: utf-8 -*-
"""
Jina Reader universal fallback for content extraction.
Uses https://r.jina.ai/{url} to extract markdown from any web page.
Free, no API key required, handles JS rendering and anti-scraping.
"""
import requests
from loguru import logger
JINA_BASE = "https://r.jina.ai"
TIMEOUT = 30
HEADERS = {
"Accept": "text/markdown",
"User-Agent": "x-reader/0.1",
}
def fetch_via_jina(url: str) -> dict:
    """
    Fetch any URL via Jina Reader and return structured data.

    The response is treated as markdown: the first non-empty line (with any
    leading '#' removed) becomes the title, the remaining lines the content.

    Returns:
        dict with keys: title (max 200 chars), content, url, author
        (author is always "").

    Raises:
        requests.Timeout / requests.RequestException: logged, then re-raised.
    """
    jina_url = f"{JINA_BASE}/{url}"
    logger.info(f"Jina fetch: {url}")

    try:
        resp = requests.get(jina_url, headers=HEADERS, timeout=TIMEOUT)
        resp.raise_for_status()
        text = resp.text

        # Jina returns markdown; first line is usually the title
        lines = text.strip().split("\n")
        title = ""
        content_lines = []
        for line in lines:
            if not title and line.strip():
                # First non-empty line as title, strip markdown heading
                title = line.lstrip("#").strip()
            else:
                content_lines.append(line)

        content = "\n".join(content_lines).strip()

        return {
            "title": title[:200],
            "content": content,
            "url": url,
            "author": "",
        }

    except requests.Timeout:
        logger.error(f"Jina timeout: {url}")
        raise
    except requests.RequestException as e:
        # Keep URL and error visually separate in the log line.
        logger.error(f"Jina fetch failed: {url} - {e}")
        raise

View file

@ -1,132 +0,0 @@
# -*- coding: utf-8 -*-
"""Reddit fetcher — extracts posts and comments via JSON API.
Supports optional proxy via REDDIT_PROXY env var (many IPs are blocked by Reddit).
Example: REDDIT_PROXY=http://user:pass@host:port
"""
import os
import re
import requests
from loguru import logger
from typing import Dict, Any, List, Optional
HEADERS = {
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
"AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36"
}
def _get_proxies() -> Optional[Dict[str, str]]:
    """Return a proxies mapping built from REDDIT_PROXY, or None if it is unset/empty."""
    proxy_url = os.environ.get("REDDIT_PROXY")
    if not proxy_url:
        return None
    # The same proxy is used for both schemes.
    return dict(http=proxy_url, https=proxy_url)
def _extract_post(post_data: Dict) -> Dict[str, Any]:
    """Flatten one Reddit listing child into a plain post dict.

    Reads the child's ``data`` object; missing fields fall back to empty
    defaults and a missing author becomes "[deleted]".
    """
    fields = post_data.get("data", {})
    permalink = fields.get("permalink", "")

    post: Dict[str, Any] = {}
    post["title"] = fields.get("title", "")
    post["author"] = fields.get("author", "[deleted]")
    post["selftext"] = fields.get("selftext", "")
    post["score"] = fields.get("score", 0)
    post["num_comments"] = fields.get("num_comments", 0)
    post["url"] = f"https://www.reddit.com{permalink}"
    post["created_utc"] = fields.get("created_utc", 0)
    post["subreddit"] = fields.get("subreddit", "")
    return post
def _extract_comments(comments_data: Dict, limit: int = 20) -> List[Dict[str, str]]:
    """Extract top-level comments from a Reddit comments listing.

    Only the first ``limit`` children are inspected, and of those only the
    ones whose kind is "t1" are kept, so fewer than ``limit`` comments may
    come back.
    """
    children = comments_data.get("data", {}).get("children", [])
    candidates = children[:limit]  # the limit applies before filtering by kind

    payloads = (child.get("data", {}) for child in candidates if child.get("kind") == "t1")
    return [
        {
            "author": payload.get("author", "[deleted]"),
            "body": payload.get("body", ""),
            "score": payload.get("score", 0),
        }
        for payload in payloads
    ]
async def fetch_reddit(url: str) -> Dict[str, Any]:
    """Fetch a Reddit post and its top-level comments via the ``.json`` endpoint.

    Returns:
        Dict with title, content (self text followed by a "Top Comments"
        section), author, url, subreddit, score, num_comments and platform.

    Raises:
        ValueError: if the response is not the expected two-element listing
            or contains no post.
    """
    logger.info(f"Fetching Reddit: {url}")

    # Normalize URL and append .json
    without_query = re.sub(r'\?.*$', '', url.rstrip('/'))
    response = requests.get(
        f"{without_query}.json",
        headers=HEADERS,
        proxies=_get_proxies(),
        timeout=15,
    )
    response.raise_for_status()
    payload = response.json()

    # Reddit returns [post_listing, comments_listing]
    if not isinstance(payload, list) or len(payload) < 2:
        raise ValueError("Unexpected Reddit response format")

    post_children = payload[0].get("data", {}).get("children", [])
    if not post_children:
        raise ValueError("No post found")

    post = _extract_post(post_children[0])
    comments = _extract_comments(payload[1])

    # Build readable content
    sections = []
    if post["selftext"]:
        sections.append(post["selftext"])
    if comments:
        sections.append("\n---\n## Top Comments\n")
        sections.extend(
            f"**u/{comment['author']}** ({comment['score']} pts):\n{comment['body']}\n"
            for comment in comments
        )

    return {
        "title": post["title"],
        "content": "\n".join(sections),
        "author": f"u/{post['author']}",
        "url": post["url"],
        "subreddit": post["subreddit"],
        "score": post["score"],
        "num_comments": post["num_comments"],
        "platform": "reddit",
    }
async def search_reddit(query: str, subreddit: str = None, limit: int = 10) -> List[Dict[str, Any]]:
    """Search Reddit posts, optionally restricted to one subreddit.

    Args:
        query: Search query.
        subreddit: When given, search only inside ``r/<subreddit>``.
        limit: Number of results requested.

    Returns:
        One post dict per result, as produced by ``_extract_post``.
    """
    logger.info(f"Searching Reddit: {query} (sub={subreddit})")

    if not subreddit:
        endpoint = "https://www.reddit.com/search.json"
        query_params = {"q": query, "limit": limit, "sort": "relevance"}
    else:
        endpoint = f"https://www.reddit.com/r/{subreddit}/search.json"
        query_params = {"q": query, "restrict_sr": "on", "limit": limit, "sort": "relevance"}

    response = requests.get(
        endpoint,
        headers=HEADERS,
        params=query_params,
        proxies=_get_proxies(),
        timeout=15,
    )
    response.raise_for_status()

    children = response.json().get("data", {}).get("children", [])
    return [_extract_post(child) for child in children]

View file

@ -1,47 +0,0 @@
# -*- coding: utf-8 -*-
"""RSS feed fetcher — uses feedparser."""
import feedparser
from loguru import logger
from typing import Dict, Any, List
async def fetch_rss(url: str, limit: int = 20) -> List[Dict[str, Any]]:
    """
    Parse an RSS/Atom feed with feedparser and return its newest entries.

    Args:
        url: RSS feed URL
        limit: Max number of entries to return

    Returns:
        List of article dicts with: title, summary, url, source, published,
        platform

    Raises:
        ValueError: if feedparser flags the feed as malformed and no entries
            could be read.
    """
    logger.info(f"Fetching RSS: {url}")

    parsed = feedparser.parse(url)
    # A "bozo" feed that still yielded entries is accepted.
    if parsed.bozo and not parsed.entries:
        raise ValueError(f"Failed to parse RSS feed: {parsed.bozo_exception}")

    source_name = parsed.feed.get("title", url)

    def _summary_of(entry) -> str:
        # Prefer the summary; otherwise take the first content block's value.
        if hasattr(entry, "summary"):
            return entry.summary
        if hasattr(entry, "content"):
            return entry.content[0].get("value", "")
        return ""

    articles = [
        {
            "title": entry.get("title", ""),
            "summary": _summary_of(entry),
            "url": entry.get("link", ""),
            "source": source_name,
            "published": entry.get("published", ""),
            "platform": "rss",
        }
        for entry in parsed.entries[:limit]
    ]

    logger.info(f"RSS: {len(articles)} articles from {source_name}")
    return articles

View file

@ -1,71 +0,0 @@
# -*- coding: utf-8 -*-
"""
Telegram channel fetcher uses Telethon.
Requires: pip install x-reader[telegram]
Requires: TG_API_ID + TG_API_HASH in .env
"""
import os
from datetime import datetime, timedelta, timezone
from loguru import logger
from typing import Dict, Any, List
async def fetch_telegram(
    channel: str,
    limit: int = 20,
    hours: int = 24,
    session_path: str = None,
) -> List[Dict[str, Any]]:
    """
    Collect recent text messages from a Telegram channel via Telethon.

    Args:
        channel: Channel username (e.g. 'predictionmkt')
        limit: Max messages per channel
        hours: Only fetch messages from the last N hours
        session_path: Path to Telethon session file; defaults to the
            TG_SESSION_PATH environment variable, then "./tg_session"

    Returns:
        List of message dicts with: text, views, date, url, platform

    Raises:
        ImportError: if Telethon is not installed.
        ValueError: if TG_API_ID or TG_API_HASH is not set.
    """
    try:
        from telethon import TelegramClient
        from telethon.tl.types import Message
    except ImportError:
        raise ImportError(
            "Telethon is required for Telegram fetching. "
            "Install with: pip install x-reader[telegram]"
        )

    api_id = os.getenv("TG_API_ID", "")
    api_hash = os.getenv("TG_API_HASH", "")
    if not (api_id and api_hash):
        raise ValueError("TG_API_ID and TG_API_HASH must be set in .env")

    session = session_path or os.getenv("TG_SESSION_PATH", "./tg_session")
    oldest_allowed = datetime.now(timezone.utc) - timedelta(hours=hours)

    collected = []
    async with TelegramClient(session, int(api_id), api_hash) as client:
        logger.info(f"Fetching TG channel: {channel}")
        entity = await client.get_entity(channel)

        async for msg in client.iter_messages(entity, limit=limit):
            # Skip anything that is not a Message or has no text.
            if not isinstance(msg, Message) or not msg.text:
                continue
            # Stop at the first message older than the cutoff.
            if msg.date < oldest_allowed:
                break

            collected.append({
                "text": msg.text,
                "views": msg.views or 0,
                "date": msg.date.isoformat(),
                "url": f"https://t.me/{channel}/{msg.id}",
                "platform": "telegram",
            })

    logger.info(f"TG {channel}: {len(collected)} messages")
    return collected

View file

@ -1,217 +0,0 @@
# -*- coding: utf-8 -*-
"""
X/Twitter fetcher three-tier fallback:
1. X oEmbed API (fast, reliable for individual tweets, no login needed)
2. Jina Reader (handles non-tweet X pages like profiles)
3. Playwright + saved session (handles login-required content)
Install browser tier: pip install "x-reader[browser]" && playwright install chromium
Save X session: x-reader login twitter
"""
import re
import requests
from loguru import logger
from typing import Dict, Any
from agent_eyes.readers.jina import fetch_via_jina
OEMBED_URL = "https://publish.twitter.com/oembed"
def _extract_author(url: str) -> str:
    """Extract @username from a tweet URL.

    Accepts both x.com and twitter.com hosts (including subdomains such as
    mobile.twitter.com). Returns "" when the URL is not a status URL.
    """
    # The host must start at a URL boundary ('/', '.', or start of string) so
    # that look-alikes such as "netflix.com" do not match "x.com".
    match = re.search(r'(?:^|[/.])(?:x|twitter)\.com/(\w+)/status', url)
    return f"@{match.group(1)}" if match else ""
def _is_tweet_url(url: str) -> bool:
"""Check if this is a direct tweet/status URL (vs profile or other X page)."""
return bool(re.search(r'x\.com/\w+/status/\d+', url))
def _fetch_via_oembed(url: str) -> Dict[str, Any]:
    """
    Fetch tweet text through the oEmbed endpoint at ``OEMBED_URL``.

    No authentication header is sent. The request is made with the
    twitter.com form of the URL.

    Args:
        url: Tweet URL (x.com form is rewritten to twitter.com).

    Returns:
        Dict with: text, author, author_url, title (first 100 chars of text).

    Raises:
        requests.HTTPError: If the endpoint answers with an error status.
    """
    # Local import: the name `html` would otherwise clash with the payload field.
    from html import unescape

    # oEmbed API requires twitter.com format
    oembed_query_url = url.replace("x.com", "twitter.com")
    resp = requests.get(
        OEMBED_URL,
        params={"url": oembed_query_url, "omit_script": "true"},
        timeout=10,
    )
    resp.raise_for_status()
    data = resp.json()

    # Strip tags first, THEN decode entities, so an escaped "&lt;b&gt;"
    # in the tweet body is not mistaken for markup and removed.
    markup = data.get("html", "")
    text = re.sub(r'<[^>]+>', ' ', markup)
    text = unescape(text)
    text = re.sub(r'\s+', ' ', text).strip()

    return {
        "text": text,
        "author": data.get("author_name", ""),
        "author_url": data.get("author_url", ""),
        "title": text[:100] if text else "",
    }
async def _fetch_via_playwright(url: str) -> Dict[str, Any]:
    """
    Render the page in headless Chromium and pull the tweet text from the DOM.

    If a stored session file for "twitter" exists it is loaded as the
    browser context's storage state.

    Args:
        url: Page to open.

    Returns:
        Dict with: text (stripped), title (stripped, at most 200 chars).

    Raises:
        RuntimeError: If Playwright is not importable.
    """
    try:
        from playwright.async_api import async_playwright
    except ImportError:
        raise RuntimeError(
            "Playwright not installed. Run:\n"
            ' pip install "x-reader[browser]"\n'
            " playwright install chromium"
        )
    from agent_eyes.readers.browser import get_session_path
    from pathlib import Path

    storage_file = get_session_path("twitter")
    context_options = {
        "user_agent": (
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
            "AppleWebKit/537.36 (KHTML, like Gecko) "
            "Chrome/120.0.0.0 Safari/537.36"
        ),
    }
    if Path(storage_file).exists():
        logger.info(f"Using saved X session: {storage_file}")
        context_options["storage_state"] = storage_file

    async with async_playwright() as pw:
        browser = await pw.chromium.launch(headless=True)
        context = await browser.new_context(**context_options)
        page = await context.new_page()
        try:
            await page.goto(url, wait_until="domcontentloaded", timeout=30_000)

            # The page is rendered client-side; give the tweet node a
            # chance to appear, but carry on if it never does.
            try:
                await page.wait_for_selector(
                    '[data-testid="tweetText"]', timeout=10_000
                )
            except Exception:
                pass  # May not appear if login required

            # Selector cascade: tweet text -> article -> main -> ''
            extracted = await page.evaluate("""() => {
                // Priority 1: tweet text element
                const tweetEl = document.querySelector('[data-testid="tweetText"]');
                if (tweetEl) return tweetEl.innerText;
                // Priority 2: article element (thread view)
                const article = document.querySelector('article');
                if (article) return article.innerText;
                // Priority 3: main content area
                const main = document.querySelector('main');
                if (main) return main.innerText;
                return '';
            }""")
            page_title = await page.title()

            return {
                "text": (extracted or "").strip(),
                "title": (page_title or "").strip()[:200],
            }
        finally:
            await context.close()
            await browser.close()
async def fetch_twitter(url: str) -> Dict[str, Any]:
    """
    Retrieve an X/Twitter post, trying three strategies in order.

    1. oEmbed — only attempted for direct status URLs
    2. Jina Reader
    3. Playwright (optionally with a saved session)

    Args:
        url: Tweet URL (x.com or twitter.com); rewritten to x.com form.

    Returns:
        Dict with: text, author, url, title, platform

    Raises:
        RuntimeError: Re-raised from the Playwright tier, or raised here
            when no tier produced usable content.
    """
    url = url.replace("twitter.com", "x.com")
    author = _extract_author(url)

    def _payload(text: str, who: str, title: str) -> Dict[str, Any]:
        # Single place that shapes the result dict shared by all tiers.
        return {
            "text": text,
            "author": who,
            "url": url,
            "title": title,
            "platform": "twitter",
        }

    # --- Tier 1: oEmbed API (individual tweets only) ---
    if _is_tweet_url(url):
        try:
            logger.info(f"[Twitter] Tier 1 — oEmbed: {url}")
            embedded = _fetch_via_oembed(url)
            body = embedded.get("text")
            if body and len(body.strip()) > 20:
                return _payload(
                    body,
                    author or embedded.get("author", ""),
                    embedded.get("title", ""),
                )
            logger.warning("[Twitter] oEmbed returned thin content")
        except Exception as e:
            logger.warning(f"[Twitter] oEmbed failed ({e})")

    # --- Tier 2: Jina Reader ---
    try:
        logger.info(f"[Twitter] Tier 2 — Jina: {url}")
        jina = fetch_via_jina(url)
        content = jina.get("content", "")
        title = jina.get("title", "")
        # Reject placeholder pages: too short, "not yet fully loaded",
        # or a title that is just the site name.
        usable = (
            bool(content)
            and len(content.strip()) > 100
            and "not yet fully loaded" not in content.lower()
            and title.lower() not in ("x", "title: x", "")
        )
        if usable:
            return _payload(content, author, title)
        logger.warning("[Twitter] Jina returned unusable content")
    except Exception as e:
        logger.warning(f"[Twitter] Jina failed ({e})")

    # --- Tier 3: Playwright + session ---
    try:
        logger.info(f"[Twitter] Tier 3 — Playwright: {url}")
        rendered = await _fetch_via_playwright(url)
        content = rendered.get("text", "")
        if content and len(content.strip()) > 20:
            return _payload(content, author, rendered.get("title", ""))
        logger.warning("[Twitter] Playwright returned empty content")
    except RuntimeError:
        # e.g. Playwright missing: the message already tells the user what to do.
        raise
    except Exception as e:
        logger.error(f"[Twitter] All methods failed: {e}")

    raise RuntimeError(
        f"❌ All Twitter fetch methods failed for: {url}\n"
        f" Try: x-reader login twitter (to save session for browser fallback)\n"
        f" Then retry: x-reader {url}"
    )

View file

@ -1,62 +0,0 @@
# -*- coding: utf-8 -*-
"""
WeChat article fetcher — two-tier fallback:
1. Jina Reader (fast, no deps)
2. Playwright headless (no login needed for public articles)
"""
from loguru import logger
from typing import Dict, Any
async def fetch_wechat(url: str) -> Dict[str, Any]:
    """
    Read a WeChat public-account article: Jina first, headless browser second.

    Args:
        url: mp.weixin.qq.com article URL

    Returns:
        Dict with: title, content, author, url, platform

    Raises:
        RuntimeError: Re-raised from the browser tier, or raised here
            when the browser tier fails for any other reason.
    """

    def _shape(raw: Dict[str, Any]) -> Dict[str, Any]:
        # "title" and "content" are mandatory; a missing key raises KeyError
        # inside the caller's try block, exactly like a failed fetch.
        return {
            "title": raw["title"],
            "content": raw["content"],
            "author": raw.get("author", ""),
            "url": url,
            "platform": "wechat",
        }

    # Tier 1: Jina Reader
    try:
        logger.info(f"[WeChat] Tier 1 — Jina: {url}")
        from agent_eyes.readers.jina import fetch_via_jina
        jina = fetch_via_jina(url)
        if jina.get("content"):
            return _shape(jina)
        logger.warning("[WeChat] Jina returned empty content, falling back to browser")
    except Exception as e:
        logger.warning(f"[WeChat] Jina failed ({e}), falling back to browser")

    # Tier 2: Playwright headless (no session needed)
    try:
        logger.info(f"[WeChat] Tier 2 — Playwright headless: {url}")
        from agent_eyes.readers.browser import fetch_via_browser
        rendered = await fetch_via_browser(url)
        return _shape(rendered)
    except RuntimeError:
        # Let RuntimeErrors from the browser helper through untouched.
        raise
    except Exception as e:
        logger.error(f"[WeChat] Browser fetch also failed: {e}")
        raise RuntimeError(
            f"❌ All WeChat fetch methods failed.\n"
            f" Last error: {e}"
        )

View file

@ -1,78 +0,0 @@
# -*- coding: utf-8 -*-
"""
Xiaohongshu (RED) note fetcher — three-tier fallback:
1. Jina Reader (fast, no deps)
2. Playwright + saved session (handles 451/403)
3. Error with login instructions
Install browser tier: pip install "x-reader[browser]" && playwright install chromium
"""
from loguru import logger
from typing import Dict, Any
from pathlib import Path
from agent_eyes.readers.jina import fetch_via_jina
async def fetch_xhs(url: str) -> Dict[str, Any]:
    """
    Fetch a Xiaohongshu note with three-tier fallback.

    1. Jina Reader
    2. Playwright with the saved "xhs" session
    3. If no session file exists, fail with login instructions

    Args:
        url: xiaohongshu.com or xhslink.com URL

    Returns:
        Dict with: title, content, author, url, platform

    Raises:
        RuntimeError: When no saved session exists, when the browser
            helper raises RuntimeError, or when the browser tier fails.
    """
    # Tier 1: Jina Reader
    try:
        logger.info(f"[XHS] Tier 1 — Jina: {url}")
        data = fetch_via_jina(url)
        if data.get("content"):
            return {
                "title": data["title"],
                "content": data["content"],
                "author": data.get("author", ""),
                "url": url,
                "platform": "xhs",
            }
        logger.warning("[XHS] Jina returned empty content, falling back to browser")
    except Exception as e:
        logger.warning(f"[XHS] Jina failed ({e}), falling back to browser")

    # Tier 2: Playwright with session.
    # Only the names actually used are imported (the unused SESSION_DIR
    # import was dropped so it can no longer fail this path).
    from agent_eyes.readers.browser import fetch_via_browser, get_session_path

    session_path = get_session_path("xhs")
    if not Path(session_path).exists():
        # Tier 3: No session — guide user
        raise RuntimeError(
            f"❌ XHS blocked Jina and no saved session found.\n"
            f" Run: x-reader login xhs\n"
            f" Then retry this URL."
        )

    try:
        logger.info(f"[XHS] Tier 2 — Playwright with session: {url}")
        data = await fetch_via_browser(url, storage_state=session_path)
        return {
            "title": data["title"],
            "content": data["content"],
            "author": data.get("author", ""),
            "url": url,
            "platform": "xhs",
        }
    except RuntimeError:
        # Let RuntimeErrors from the browser helper through untouched.
        raise
    except Exception as e:
        logger.error(f"[XHS] Browser fetch also failed: {e}")
        # Chain explicitly so the traceback names the underlying cause.
        raise RuntimeError(
            f"❌ All XHS fetch methods failed.\n"
            f" Last error: {e}\n"
            f" Try: x-reader login xhs (to refresh session)"
        ) from e

View file

@ -1,211 +0,0 @@
# -*- coding: utf-8 -*-
"""
YouTube video fetcher — three-tier content extraction:
1. yt-dlp auto-subtitles (fastest, best quality for subtitled videos)
2. yt-dlp audio download → Groq Whisper API transcription (for non-subtitled videos)
3. Jina Reader fallback (page description only)
Requires: yt-dlp installed (brew install yt-dlp / pip install yt-dlp)
Optional: GROQ_API_KEY env var for Whisper transcription
"""
import re
import os
import subprocess
import tempfile
from loguru import logger
from typing import Dict, Any
from agent_eyes.readers.jina import fetch_via_jina
def _extract_video_id(url: str) -> str:
"""Extract video ID from YouTube URL."""
match = re.search(r'(?:v=|youtu\.be/)([a-zA-Z0-9_-]{11})', url)
return match.group(1) if match else ""
def _get_subtitles_via_ytdlp(url: str, lang: str = "en") -> str:
    """
    Ask yt-dlp for subtitles (manual or auto-generated) without downloading video.

    Args:
        url: Video URL handed to yt-dlp.
        lang: Subtitle language code passed to ``--sub-lang``.

    Returns:
        Parsed subtitle text, or an empty string when yt-dlp is missing,
        times out, or leaves no ``.<lang>.srt`` / ``.<lang>.vtt`` file.
    """
    with tempfile.TemporaryDirectory() as workdir:
        stem = os.path.join(workdir, "sub")
        command = [
            "yt-dlp",
            "--write-auto-sub",
            "--write-sub",
            "--sub-lang", lang,
            "--sub-format", "srt",
            "--skip-download",
            "-o", stem,
            url,
        ]
        try:
            # Exit status is not inspected; success is judged by the file below.
            subprocess.run(command, capture_output=True, text=True, timeout=60)
        except FileNotFoundError:
            logger.warning("yt-dlp not found. Install with: brew install yt-dlp")
            return ""
        except subprocess.TimeoutExpired:
            logger.warning("yt-dlp subtitle download timed out")
            return ""

        candidates = (stem + f".{lang}.srt", stem + f".{lang}.vtt")
        found = next((p for p in candidates if os.path.exists(p)), None)
        if found is None:
            return ""
        return _parse_srt(found)
def _parse_srt(filepath: str) -> str:
"""Parse SRT file into clean text (strip timestamps and sequence numbers)."""
with open(filepath, 'r', encoding='utf-8') as f:
lines = f.readlines()
text_lines = []
seen = set()
for line in lines:
line = line.strip()
if not line or line.isdigit() or '-->' in line:
continue
if line.startswith('[') and line.endswith(']'):
continue
if line not in seen:
seen.add(line)
text_lines.append(line)
return " ".join(text_lines)
def _transcribe_via_whisper(url: str) -> str:
    """
    Download the audio track with yt-dlp and send it to Groq's Whisper endpoint.

    Skipped entirely when GROQ_API_KEY is unset. Files above 25 MB are
    not uploaded.

    Args:
        url: Video URL handed to yt-dlp.

    Returns:
        The transcript text, or an empty string on any failure.
    """
    api_key = os.getenv("GROQ_API_KEY")
    if not api_key:
        logger.info("GROQ_API_KEY not set, skipping Whisper transcription")
        return ""

    with tempfile.TemporaryDirectory() as workdir:
        command = [
            "yt-dlp",
            "-x",
            "--audio-format", "m4a",
            "--audio-quality", "5",
            "-o", os.path.join(workdir, "audio.%(ext)s"),
            "--no-playlist",
            url,
        ]
        try:
            subprocess.run(command, capture_output=True, text=True, timeout=180)
        except FileNotFoundError:
            logger.warning("yt-dlp not found for audio download")
            return ""
        except subprocess.TimeoutExpired:
            logger.warning("yt-dlp audio download timed out")
            return ""

        # Prefer audio.m4a; otherwise take the first "audio.*" file present.
        audio_path = os.path.join(workdir, "audio.m4a")
        if not os.path.exists(audio_path):
            fallback = next(
                (name for name in os.listdir(workdir) if name.startswith("audio.")),
                None,
            )
            if fallback is None:
                logger.warning("No audio file downloaded")
                return ""
            audio_path = os.path.join(workdir, fallback)

        file_size = os.path.getsize(audio_path)
        if file_size > 25 * 1024 * 1024:
            logger.warning(f"Audio file too large ({file_size // 1024 // 1024}MB > 25MB limit)")
            return ""

        logger.info(f"Transcribing {file_size // 1024}KB audio via Groq Whisper...")
        import requests
        try:
            with open(audio_path, "rb") as audio:
                response = requests.post(
                    "https://api.groq.com/openai/v1/audio/transcriptions",
                    headers={"Authorization": f"Bearer {api_key}"},
                    files={"file": (os.path.basename(audio_path), audio, "audio/mp4")},
                    data={"model": "whisper-large-v3", "response_format": "text"},
                    timeout=120,
                )
            if response.status_code != 200:
                logger.warning(f"Groq Whisper API error: {response.status_code} {response.text[:200]}")
                return ""
            transcript = response.text.strip()
            logger.info(f"Whisper transcript: {len(transcript)} chars")
            return transcript
        except Exception as e:
            logger.warning(f"Whisper transcription failed: {e}")
            return ""
async def fetch_youtube(url: str, sub_lang: str = "en") -> Dict[str, Any]:
    """
    Fetch YouTube video content with three-tier extraction.

    Strategy:
    1. yt-dlp subtitles (full transcript)
    2. yt-dlp audio + Groq Whisper API (when no subtitles were found)
    3. Jina Reader page content (when no transcript could be produced)

    Jina is also used for title/author metadata. A Jina failure no longer
    aborts the fetch while a transcript can still be obtained; it is
    re-raised only when there is no transcript to return either.

    Args:
        url: YouTube video URL
        sub_lang: Subtitle language code (default: "en")

    Returns:
        Dict with: title, description, author, url, video_id, has_transcript, platform

    Raises:
        Exception: Whatever ``fetch_via_jina`` raised, if no transcript
            was obtained either.
    """
    logger.info(f"Fetching YouTube: {url}")
    video_id = _extract_video_id(url)

    # Step 1: metadata via Jina. Best-effort: the transcript tiers below
    # do not depend on it.
    jina_error = None
    try:
        jina_data = fetch_via_jina(url)
    except Exception as e:
        logger.warning(f"Jina metadata fetch failed ({e}), continuing with yt-dlp")
        jina_data = {}
        jina_error = e

    # Step 2: Try yt-dlp subtitles
    logger.info(f"Extracting subtitles ({sub_lang})...")
    transcript = _get_subtitles_via_ytdlp(url, lang=sub_lang)

    # Step 3: No subtitles? Try Whisper transcription
    if not transcript:
        logger.info("No subtitles available, trying Whisper transcription...")
        transcript = _transcribe_via_whisper(url)

    if transcript:
        logger.info(f"Got transcript: {len(transcript)} chars")
        content = transcript
        has_transcript = True
    else:
        if jina_error is not None:
            # Nothing at all to return: surface the original failure,
            # as the function did before.
            raise jina_error
        logger.info("No transcript available, using page description")
        content = jina_data.get("content", "")
        has_transcript = False

    return {
        "title": jina_data.get("title", ""),
        "description": content,
        "author": jina_data.get("author", ""),
        "url": url,
        "video_id": video_id,
        "has_transcript": has_transcript,
        "platform": "youtube",
    }

View file

@ -1,283 +0,0 @@
# -*- coding: utf-8 -*-
"""
Unified content schema for Agent Eyes.
Defines the standard data format for all content sources:
- Telegram channels
- RSS feeds
- Bilibili videos
- Xiaohongshu (RED) notes
- WeChat articles
- X/Twitter posts
- YouTube videos
- Reddit posts
- GitHub repos/issues/PRs
- Web search results
- Manual input
"""
from dataclasses import dataclass, field, asdict
from datetime import datetime, timedelta
from typing import Optional, List
from enum import Enum
import hashlib
import json
class SourceType(str, Enum):
    """Platform a piece of content came from.

    Members are ``str`` subclasses, so they compare equal to their value
    and serialise as plain strings.
    """

    # Feeds and channels
    TELEGRAM = "telegram"
    RSS = "rss"
    # Social / media platforms
    BILIBILI = "bilibili"
    XIAOHONGSHU = "xhs"
    TWITTER = "twitter"
    WECHAT = "wechat"
    YOUTUBE = "youtube"
    REDDIT = "reddit"
    GITHUB = "github"
    # Not tied to a single platform
    SEARCH = "search"
    MANUAL = "manual"
class MediaType(str, Enum):
    """Primary medium of a content item (string-valued enum)."""

    TEXT = "text"
    VIDEO = "video"
    AUDIO = "audio"
    IMAGE = "image"
class Priority(str, Enum):
    """Priority label attached to a content item (string-valued enum)."""

    HOT = "hot"
    QUALITY = "quality"
    DEEP = "deep"
    NORMAL = "normal"
    LOW = "low"
@dataclass
class UnifiedContent:
    """Unified content format across all platforms.

    ``id`` defaults to the first 12 hex chars of the MD5 of ``url`` and
    ``fetched_at`` to the local time at construction (ISO-8601).
    """

    # === Required ===
    source_type: SourceType
    source_name: str
    title: str
    content: str
    url: str

    # === Auto-generated (filled in by __post_init__ when left empty) ===
    id: str = ""
    fetched_at: str = ""

    # === Media ===
    media_type: MediaType = MediaType.TEXT
    media_url: Optional[str] = None

    # === Scoring ===
    score: int = 0
    priority: Priority = Priority.NORMAL
    category: str = ""
    tags: List[str] = field(default_factory=list)

    # === Processing state ===
    processed: bool = False
    digest_date: Optional[str] = None

    # === Translation ===
    title_cn: Optional[str] = None
    content_cn: Optional[str] = None

    # === Metadata ===
    extra: dict = field(default_factory=dict)

    def __post_init__(self):
        """Derive ``id`` and ``fetched_at`` when the caller did not supply them."""
        if not self.id:
            # MD5 serves as a short, stable dedup key here.
            self.id = hashlib.md5(self.url.encode()).hexdigest()[:12]
        if not self.fetched_at:
            self.fetched_at = datetime.now().isoformat()

    def to_dict(self) -> dict:
        """Return a JSON-serialisable dict; enum fields become their string values."""
        d = asdict(self)
        d['source_type'] = self.source_type.value
        d['media_type'] = self.media_type.value
        d['priority'] = self.priority.value
        return d

    @classmethod
    def from_dict(cls, data: dict) -> 'UnifiedContent':
        """Build an instance from a dict such as the one ``to_dict`` produces.

        String values for the enum fields are converted to enum members and
        unknown keys are ignored. The input dict is left unmodified.

        Raises:
            ValueError: If an enum field holds an unrecognised string.
            TypeError: If a required field is missing.
        """
        # Work on a copy: converting in place would silently rewrite the
        # caller's dict (e.g. the list just loaded from JSON).
        payload = dict(data)
        for key, enum_cls in (
            ('source_type', SourceType),
            ('media_type', MediaType),
            ('priority', Priority),
        ):
            if isinstance(payload.get(key), str):
                payload[key] = enum_cls(payload[key])
        known = {f.name for f in cls.__dataclass_fields__.values()}
        return cls(**{k: v for k, v in payload.items() if k in known})
# =============================================================================
# Converters: platform-specific dict → UnifiedContent
# =============================================================================
def from_telegram(msg: dict, channel_name: str, channel_username: str) -> UnifiedContent:
    """Convert a Telegram message dict into UnifiedContent.

    The title is the first 100 characters of the message text; the URL
    falls back to the channel link when the message carries none.
    """
    body = msg.get('text', '')
    link = msg.get('url', f"https://t.me/{channel_username}")
    metadata = {"views": msg.get('views', 0), "channel_username": channel_username}
    return UnifiedContent(
        source_type=SourceType.TELEGRAM,
        source_name=channel_name,
        title=body[:100],
        content=body,
        url=link,
        extra=metadata,
    )
def from_rss(article: dict) -> UnifiedContent:
    """Convert an RSS article dict into UnifiedContent.

    ``url`` is preferred over ``link``; the summary becomes the content.
    """
    read = article.get
    link = read('url', read('link', ''))
    return UnifiedContent(
        source_type=SourceType.RSS,
        source_name=read('source', ''),
        title=read('title', ''),
        content=read('summary', ''),
        url=link,
        score=read('score', 0),
        category=read('category', ''),
        title_cn=read('title_cn'),
        content_cn=read('summary_cn'),
    )
def from_bilibili(video: dict) -> UnifiedContent:
    """Convert a Bilibili video dict into UnifiedContent (media type VIDEO).

    The cover image goes to ``media_url``; bvid, duration and view count
    are kept under ``extra``.
    """
    read = video.get
    stats = {
        "bvid": read('bvid', ''),
        "duration": read('duration', 0),
        "view_count": read('view_count', 0),
    }
    return UnifiedContent(
        source_type=SourceType.BILIBILI,
        source_name=read('author', ''),
        title=read('title', ''),
        content=read('description', ''),
        url=read('url', ''),
        media_type=MediaType.VIDEO,
        media_url=read('cover', ''),
        extra=stats,
    )
def from_twitter(data: dict) -> UnifiedContent:
    """Convert a tweet dict into UnifiedContent.

    The title is the first 100 characters of the tweet text; like and
    retweet counts are kept under ``extra``.
    """
    body = data.get('text', '')
    engagement = {
        "likes": data.get('likes', 0),
        "retweets": data.get('retweets', 0),
    }
    return UnifiedContent(
        source_type=SourceType.TWITTER,
        source_name=data.get('author', ''),
        title=body[:100],
        content=body,
        url=data.get('url', ''),
        extra=engagement,
    )
def from_wechat(article: dict) -> UnifiedContent:
    """Convert a WeChat article dict into UnifiedContent; missing keys become ''."""
    read = article.get
    return UnifiedContent(
        source_type=SourceType.WECHAT,
        source_name=read('author', ''),
        title=read('title', ''),
        content=read('content', ''),
        url=read('url', ''),
    )
def from_xiaohongshu(note: dict) -> UnifiedContent:
    """Convert a Xiaohongshu note dict into UnifiedContent.

    The media type is IMAGE when the note has a truthy ``images`` entry,
    TEXT otherwise; like and collect counts are kept under ``extra``.
    """
    read = note.get
    if read('images'):
        medium = MediaType.IMAGE
    else:
        medium = MediaType.TEXT
    engagement = {
        "likes": read('likes', 0),
        "collects": read('collects', 0),
    }
    return UnifiedContent(
        source_type=SourceType.XIAOHONGSHU,
        source_name=read('author', ''),
        title=read('title', ''),
        content=read('content', ''),
        url=read('url', ''),
        media_type=medium,
        extra=engagement,
    )
def from_youtube(video: dict) -> UnifiedContent:
    """Convert a YouTube video dict into UnifiedContent (media type VIDEO).

    The ``description`` entry becomes the content; duration and view
    count are kept under ``extra``.
    """
    read = video.get
    stats = {
        "duration": read('duration', ''),
        "view_count": read('view_count', 0),
    }
    return UnifiedContent(
        source_type=SourceType.YOUTUBE,
        source_name=read('author', ''),
        title=read('title', ''),
        content=read('description', ''),
        url=read('url', ''),
        media_type=MediaType.VIDEO,
        extra=stats,
    )
def from_manual(title: str, content: str, url: str = "") -> UnifiedContent:
    """Wrap manually entered text as UnifiedContent.

    Without a URL, a synthetic ``manual://<8 hex chars>`` one is derived
    from the MD5 of the title.
    """
    if not url:
        digest = hashlib.md5(title.encode()).hexdigest()[:8]
        url = f"manual://{digest}"
    return UnifiedContent(
        source_type=SourceType.MANUAL,
        source_name="manual",
        title=title,
        content=content,
        url=url,
    )
# =============================================================================
# Unified Inbox
# =============================================================================
class UnifiedInbox:
    """JSON-file-backed list of content items, de-duplicated by ``id``.

    The file is read once on construction; changes stay in memory until
    ``save()`` is called.
    """

    def __init__(self, filepath: str = "unified_inbox.json"):
        """Remember *filepath* and load any existing items from it."""
        self.filepath = filepath
        self.items: List[UnifiedContent] = []
        self.load()

    def load(self):
        """Populate ``items`` from the file.

        A missing file leaves the list untouched; an unreadable or
        invalid JSON file leaves it empty.
        """
        import os
        if not os.path.exists(self.filepath):
            return
        try:
            with open(self.filepath, 'r', encoding='utf-8') as f:
                data = json.load(f)
            self.items = [UnifiedContent.from_dict(d) for d in data]
        except (json.JSONDecodeError, IOError):
            self.items = []

    def save(self):
        """Write all items to the file as pretty-printed UTF-8 JSON."""
        with open(self.filepath, 'w', encoding='utf-8') as f:
            json.dump([item.to_dict() for item in self.items], f,
                      ensure_ascii=False, indent=2)

    def add(self, item: UnifiedContent) -> bool:
        """Append *item* unless one with the same id exists; return whether it was added."""
        if any(existing.id == item.id for existing in self.items):
            return False
        self.items.append(item)
        return True

    def add_batch(self, items: List[UnifiedContent]) -> int:
        """Add several items, skipping duplicate ids; return how many were added.

        Ids are indexed once up front so the batch costs O(n + m) rather
        than one linear scan per item. Note this no longer routes through
        ``add()``, so a subclass overriding ``add`` must override this too.
        """
        known_ids = {existing.id for existing in self.items}
        added = 0
        for item in items:
            if item.id in known_ids:
                continue
            known_ids.add(item.id)
            self.items.append(item)
            added += 1
        return added

    def get_unprocessed(self) -> List[UnifiedContent]:
        """Return the items whose ``processed`` flag is falsy."""
        return [i for i in self.items if not i.processed]

    def get_by_source(self, source_type: SourceType) -> List[UnifiedContent]:
        """Return the items whose ``source_type`` equals *source_type*."""
        return [i for i in self.items if i.source_type == source_type]

    def mark_processed(self, item_id: str, digest_date: Optional[str] = None):
        """Flag the first item with *item_id* as processed, optionally stamping ``digest_date``."""
        for item in self.items:
            if item.id == item_id:
                item.processed = True
                if digest_date:
                    item.digest_date = digest_date
                break

    def clear_old(self, days: int = 7):
        """Drop items fetched more than *days* days ago.

        Timestamps are compared as ISO-8601 strings against a naive
        local-time cutoff.
        """
        cutoff = (datetime.now() - timedelta(days=days)).isoformat()
        self.items = [i for i in self.items if i.fetched_at > cutoff]

View file

@ -1,16 +0,0 @@
# -*- coding: utf-8 -*-
"""Search module init."""
from agent_eyes.search.exa import search_web
from agent_eyes.search.reddit import search_reddit
from agent_eyes.search.github import search_github, search_github_issues
from agent_eyes.search.twitter import search_twitter, get_user_tweets
__all__ = [
"search_web",
"search_reddit",
"search_github",
"search_github_issues",
"search_twitter",
"get_user_tweets",
]

View file

@ -1,79 +0,0 @@
# -*- coding: utf-8 -*-
"""Exa semantic web search.
Get a free API key at https://exa.ai (1000 searches/month free).
"""
import os
import requests
from loguru import logger
from typing import Any, Dict, List, Optional
EXA_API_URL = "https://api.exa.ai/search"
def _get_api_key(config=None) -> str:
"""Get Exa API key from config or env."""
if config:
key = config.get("exa_api_key")
if key:
return key
key = os.environ.get("EXA_API_KEY")
if key:
return key
raise ValueError(
"Exa API key not configured.\n"
"Get a free key at https://exa.ai (1000 searches/month free)\n"
"Then run: agent-eyes setup"
)
async def search_web(
    query: str,
    num_results: int = 5,
    search_type: str = "auto",
    config=None,
) -> List[Dict[str, Any]]:
    """
    Run a semantic web search against the Exa API.

    Args:
        query: Search query (supports site: prefix)
        num_results: Number of results requested; capped at 10
        search_type: "auto" / "neural" / "keyword"
        config: Optional Config instance

    Returns:
        List of {title, url, snippet, published_date, score}

    Raises:
        ValueError: If no API key is configured.
        requests.HTTPError: If Exa answers with an error status.
    """
    api_key = _get_api_key(config)
    logger.info(f"Exa search: {query} (n={num_results})")

    request_headers = {
        "Content-Type": "application/json",
        "x-api-key": api_key,
    }
    request_body = {
        "query": query,
        "numResults": min(num_results, 10),
        "type": search_type,
        # Ask for up to 500 characters of page text per hit.
        "contents": {"text": {"maxCharacters": 500}},
    }
    resp = requests.post(
        EXA_API_URL,
        headers=request_headers,
        json=request_body,
        timeout=15,
    )
    resp.raise_for_status()

    return [
        {
            "title": hit.get("title", ""),
            "url": hit.get("url", ""),
            "snippet": hit.get("text", ""),
            "published_date": hit.get("publishedDate", ""),
            "score": hit.get("score", 0),
        }
        for hit in resp.json().get("results", [])
    ]

View file

@ -1,118 +0,0 @@
# -*- coding: utf-8 -*-
"""GitHub search via public API (no key required)."""
import os
import requests
from loguru import logger
from typing import Any, Dict, List, Optional
GITHUB_API = "https://api.github.com"
def _get_headers(config=None) -> dict:
"""Get GitHub API headers with optional auth token."""
headers = {"Accept": "application/vnd.github+json"}
token = None
if config:
token = config.get("github_token")
if not token:
token = os.environ.get("GITHUB_TOKEN")
if token:
headers["Authorization"] = f"Bearer {token}"
return headers
async def search_github(
    query: str,
    language: Optional[str] = None,
    sort: str = "stars",
    limit: int = 5,
    config=None,
) -> List[Dict[str, Any]]:
    """
    Search GitHub repositories.

    Args:
        query: Search query
        language: Filter by language (e.g. "python")
        sort: Sort by "stars" / "forks" / "updated"
        limit: Number of results (max 30)
        config: Optional Config instance

    Returns:
        List of {name, url, description, stars, forks, language, updated, topics}.
        ``description`` and ``language`` are always strings.

    Raises:
        requests.HTTPError: If GitHub answers with an error status.
    """
    q = query
    if language:
        q += f" language:{language}"
    logger.info(f"GitHub search: {q} (n={limit})")

    resp = requests.get(
        f"{GITHUB_API}/search/repositories",
        headers=_get_headers(config),
        params={"q": q, "sort": sort, "per_page": min(limit, 30)},
        timeout=15,
    )
    resp.raise_for_status()
    data = resp.json()

    results = []
    for repo in data.get("items", []):
        results.append({
            "name": repo.get("full_name", ""),
            "url": repo.get("html_url", ""),
            # `.get(key, "")` does not cover a key that is present with
            # a null value, so coerce explicitly (as the issue search
            # already does for "body").
            "description": repo.get("description") or "",
            "stars": repo.get("stargazers_count", 0),
            "forks": repo.get("forks_count", 0),
            "language": repo.get("language") or "",
            "updated": repo.get("updated_at", ""),
            "topics": repo.get("topics", []),
        })
    return results
async def search_github_issues(
    query: str,
    repo: Optional[str] = None,
    state: str = "open",
    limit: int = 5,
    config=None,
) -> List[Dict[str, Any]]:
    """
    Search GitHub issues via the ``/search/issues`` endpoint, sorted by reactions.

    Args:
        query: Search query
        repo: Filter by repo (e.g. "owner/repo")
        state: "open" / "closed"
        limit: Number of results (max 30)
        config: Optional Config instance

    Returns:
        List of {title, url, body, state, comments, reactions, created};
        ``body`` is truncated to 500 characters.

    Raises:
        requests.HTTPError: If GitHub answers with an error status.
    """
    q = query
    if repo:
        q += f" repo:{repo}"
    q += f" state:{state}"
    # Logged for parity with search_github.
    logger.info(f"GitHub issue search: {q} (n={limit})")

    resp = requests.get(
        f"{GITHUB_API}/search/issues",
        headers=_get_headers(config),
        params={"q": q, "sort": "reactions", "per_page": min(limit, 30)},
        timeout=15,
    )
    resp.raise_for_status()
    data = resp.json()

    results = []
    for issue in data.get("items", []):
        results.append({
            "title": issue.get("title", ""),
            "url": issue.get("html_url", ""),
            "body": (issue.get("body", "") or "")[:500],
            "state": issue.get("state", ""),
            "comments": issue.get("comments", 0),
            # Guard against a null "reactions" value as well as a missing key.
            "reactions": (issue.get("reactions") or {}).get("total_count", 0),
            "created": issue.get("created_at", ""),
        })
    return results

View file

@ -1,30 +0,0 @@
# -*- coding: utf-8 -*-
"""Reddit search via Exa (bypasses Reddit IP blocks)."""
from typing import Any, Dict, List, Optional
from agent_eyes.search.exa import search_web
async def search_reddit(
    query: str,
    subreddit: Optional[str] = None,
    num_results: int = 10,
    config=None,
) -> List[Dict[str, Any]]:
    """
    Search Reddit by delegating to ``search_web`` with a ``site:`` prefix.

    Args:
        query: Search query
        subreddit: Optional subreddit (e.g. "LocalLLaMA")
        num_results: Number of results
        config: Optional Config instance

    Returns:
        List of {title, url, snippet, published_date, score}
    """
    scope = f"site:reddit.com/r/{subreddit}" if subreddit else "site:reddit.com"
    return await search_web(f"{scope} {query}", num_results=num_results, config=config)

View file

@ -1,140 +0,0 @@
# -*- coding: utf-8 -*-
"""Twitter search — uses birdx if available, falls back to Exa."""
import json
import shutil
import subprocess
from loguru import logger
from typing import Any, Dict, List, Optional
async def search_twitter(
    query: str,
    limit: int = 10,
    config=None,
) -> List[Dict[str, Any]]:
    """
    Search Twitter/X content.

    Strategy:
    1. If the ``birdx`` executable is on PATH, use it
    2. Otherwise use Exa with a ``site:x.com`` query

    Args:
        query: Search query
        limit: Number of results
        config: Optional Config instance (only used by the Exa path)

    Returns:
        A list of result dicts; the keys depend on which backend answered.
    """
    if not shutil.which("birdx"):
        return await _search_exa(query, limit, config)
    return await _search_birdx(query, limit)
async def _search_birdx(query: str, limit: int) -> List[Dict[str, Any]]:
    """Run ``birdx search`` and parse its plain-text output.

    Returns an empty list when the command fails, times out, or is missing.
    """
    logger.info(f"birdx search: {query} (n={limit})")
    # birdx --json returns [] for search, so use plain text output
    command = ["birdx", "search", query, "-n", str(limit)]
    try:
        completed = subprocess.run(
            command,
            capture_output=True, text=True, timeout=30,
        )
    except (subprocess.TimeoutExpired, FileNotFoundError) as e:
        logger.error(f"birdx search failed: {e}")
        return []

    if completed.returncode != 0:
        logger.error(f"birdx search failed: {completed.stderr}")
        return []
    return _parse_birdx_text(completed.stdout)
def _parse_birdx_text(text: str) -> List[Dict[str, Any]]:
"""Parse birdx plain text output into structured data.
Format:
@handle (Display Name):
Tweet text here
possibly multiple lines
date: Mon Feb 24 12:00:00 +0000 2026
url: https://x.com/handle/status/123
"""
results = []
current: Dict[str, Any] = {}
text_lines = []
for line in text.strip().split("\n"):
line = line.strip()
# Separator between tweets
if line.startswith(""):
if current:
if text_lines:
current["text"] = "\n".join(text_lines).strip()
results.append(current)
current = {}
text_lines = []
continue
# Author line: @handle (Display Name):
if line.startswith("@") and line.endswith(":") and "(" in line:
handle = line.split()[0]
current["author"] = handle
continue
# Date line
if line.startswith("date:"):
current["date"] = line[5:].strip()
continue
# URL line
if line.startswith("url:"):
current["url"] = line[4:].strip()
continue
# Content line
if current:
text_lines.append(line)
# Last tweet
if current:
if text_lines:
current["text"] = "\n".join(text_lines).strip()
results.append(current)
return results
async def _search_exa(query: str, limit: int, config=None) -> List[Dict[str, Any]]:
    """Search X content through Exa by prefixing the query with ``site:x.com``."""
    from agent_eyes.search.exa import search_web

    scoped_query = f"site:x.com {query}"
    return await search_web(scoped_query, num_results=limit, config=config)
async def get_user_tweets(
    username: str,
    limit: int = 10,
) -> List[Dict[str, Any]]:
    """Get recent tweets from a user via ``birdx user-tweets``.

    Args:
        username: Handle, with or without the leading ``@``.
        limit: Number of tweets to request.

    Returns:
        Parsed tweets, or an empty list when birdx times out or exits
        with a non-zero status.

    Raises:
        RuntimeError: If the ``birdx`` executable is not on PATH.
    """
    if not shutil.which("birdx"):
        raise RuntimeError(
            "birdx not installed. Install: pip install birdx\n"
            "Then configure cookies: agent-eyes setup"
        )

    handle = f"@{username.lstrip('@')}"
    try:
        result = subprocess.run(
            ["birdx", "user-tweets", handle, "-n", str(limit)],
            capture_output=True, text=True, timeout=30,
        )
    except subprocess.TimeoutExpired:
        logger.error("birdx timed out")
        return []

    # Same handling as _search_birdx: report a failed run instead of
    # parsing whatever happened to reach stdout.
    if result.returncode != 0:
        logger.error(f"birdx user-tweets failed: {result.stderr}")
        return []
    return _parse_birdx_text(result.stdout)

View file

@ -1,88 +0,0 @@
# -*- coding: utf-8 -*-
"""
Storage utilities — save content to JSON inbox and optional Markdown file.
Implements the "atomic archiving" from the tweet:
- unified_inbox.json (for AI/programmatic use)
- markdown file (for human reading, e.g. Obsidian)
"""
import json
import os
from datetime import datetime
from pathlib import Path
from loguru import logger
from agent_eyes.schema import UnifiedContent
def save_to_json(item: UnifiedContent, filepath: str = "unified_inbox.json"):
    """Append *item* to the JSON inbox file, keeping only the newest 500 entries.

    An unreadable or invalid existing file is treated as empty and overwritten.
    """
    target = Path(filepath)

    entries = []
    if target.exists():
        try:
            with open(target, 'r', encoding='utf-8') as f:
                entries = json.load(f)
        except (json.JSONDecodeError, IOError):
            entries = []

    entries.append(item.to_dict())
    # Cap the file at the 500 most recent entries to prevent unbounded growth
    entries = entries[-500:]

    with open(target, 'w', encoding='utf-8') as f:
        json.dump(entries, f, ensure_ascii=False, indent=2)
    logger.info(f"Saved to JSON: {target}")
def save_to_markdown(item: UnifiedContent, filepath: str = None):
    """
    Append *item* as a Markdown section to a file (e.g. in an Obsidian vault).

    When *filepath* is not given, the destination is resolved as:
    - OBSIDIAN_VAULT set: {OBSIDIAN_VAULT}/01-收集箱/x-reader-inbox.md
    - else OUTPUT_DIR set: {OUTPUT_DIR}/content_hub.md
    - neither set: nothing is written

    Content is truncated to 2000 characters.
    """
    if not filepath:
        vault_path = os.getenv("OBSIDIAN_VAULT", "")
        output_dir = "" if vault_path else os.getenv("OUTPUT_DIR", "")
        if vault_path:
            # Priority 1: Obsidian vault
            filepath = os.path.join(vault_path, "01-收集箱", "x-reader-inbox.md")
        elif output_dir:
            # Priority 2: generic output dir
            filepath = os.path.join(output_dir, "content_hub.md")
        else:
            return

    target = Path(filepath)
    target.parent.mkdir(parents=True, exist_ok=True)

    icons = {
        "telegram": "📢", "rss": "📰", "bilibili": "🎬",
        "xhs": "📕", "twitter": "🐦", "wechat": "💬",
        "youtube": "▶️", "manual": "✏️",
    }
    # Source types without a dedicated icon get the generic one.
    emoji = icons.get(item.source_type.value, "📄")

    with open(target, 'a', encoding='utf-8') as f:
        f.writelines([
            f"\n## {emoji} {item.title}\n",
            f"- Source: {item.source_name} ({item.source_type.value})\n",
            f"- URL: {item.url}\n",
            f"- Fetched: {item.fetched_at[:16]}\n\n",
            f"{item.content[:2000]}\n",
            "\n---\n",
        ])
    logger.info(f"Saved to Markdown: {target}")
def save_content(item: UnifiedContent, json_path: str = None, md_path: str = None):
    """Store one item in the JSON inbox, then in the Markdown output.

    An empty ``json_path`` falls back to the INBOX_FILE environment variable
    and then to "unified_inbox.json". ``md_path`` is handed to
    save_to_markdown as given.
    """
    if json_path:
        inbox_target = json_path
    else:
        inbox_target = os.getenv("INBOX_FILE", "unified_inbox.json")
    save_to_json(item, inbox_target)
    save_to_markdown(item, md_path)

114
tests/test_channels.py Normal file
View file

@ -0,0 +1,114 @@
# -*- coding: utf-8 -*-
"""Tests for the channel system."""
import pytest
from unittest.mock import patch, MagicMock
from agent_eyes.channels import get_channel_for_url, get_channel, get_all_channels
from agent_eyes.channels.base import ReadResult, SearchResult
class TestChannelRouting:
    """URL-to-channel routing and name-based lookup in the channel registry."""

    def test_github_url(self):
        """A github.com URL resolves to the "github" channel."""
        assert get_channel_for_url("https://github.com/openai/gpt-4").name == "github"

    def test_twitter_url(self):
        """An x.com status URL resolves to the "twitter" channel."""
        assert get_channel_for_url("https://x.com/elonmusk/status/123").name == "twitter"

    def test_youtube_url(self):
        """A youtube.com watch URL resolves to the "youtube" channel."""
        assert get_channel_for_url("https://youtube.com/watch?v=abc").name == "youtube"

    def test_reddit_url(self):
        """A reddit.com URL resolves to the "reddit" channel."""
        assert get_channel_for_url("https://reddit.com/r/test").name == "reddit"

    def test_bilibili_url(self):
        """A bilibili.com video URL resolves to the "bilibili" channel."""
        assert get_channel_for_url("https://bilibili.com/video/BV1xx").name == "bilibili"

    def test_rss_url(self):
        """A feed.xml URL resolves to the "rss" channel."""
        assert get_channel_for_url("https://example.com/feed.xml").name == "rss"

    def test_generic_url_fallback(self):
        """A URL with no dedicated channel resolves to the "web" channel."""
        assert get_channel_for_url("https://example.com").name == "web"

    def test_get_channel_by_name(self):
        """Looking up "github" by name returns that channel."""
        channel = get_channel("github")
        assert channel is not None
        assert channel.name == "github"

    def test_all_channels_registered(self):
        """The registry lists at least the web, github and twitter channels."""
        registered = {channel.name for channel in get_all_channels()}
        for expected in ("web", "github", "twitter"):
            assert expected in registered
class TestReadResult:
    """Serialization of ReadResult through to_dict()."""

    def test_to_dict(self):
        """Title, content and platform appear under their own keys."""
        as_dict = ReadResult(
            title="Test",
            content="Body",
            url="https://example.com",
            platform="web",
        ).to_dict()
        assert as_dict["title"] == "Test"
        assert as_dict["content"] == "Body"
        assert as_dict["platform"] == "web"

    def test_to_dict_optional_fields(self):
        """Author and date are included in the dict when they are provided."""
        as_dict = ReadResult(
            title="T",
            content="C",
            url="u",
            author="A",
            date="2025-01-01",
        ).to_dict()
        assert as_dict["author"] == "A"
        assert as_dict["date"] == "2025-01-01"
class TestSearchResult:
    """Serialization of SearchResult through to_dict()."""

    def test_to_dict(self):
        """Title and snippet appear under their own keys."""
        as_dict = SearchResult(
            title="Test",
            url="https://example.com",
            snippet="A snippet",
        ).to_dict()
        assert as_dict["title"] == "Test"
        assert as_dict["snippet"] == "A snippet"
class TestGitHubChannel:
    """Search on the "github" channel with requests.get patched out."""

    @patch("agent_eyes.channels.github.requests.get")
    @pytest.mark.asyncio
    async def test_search(self, mock_get):
        """One item in the mocked payload produces one result titled by full_name."""
        repo_item = {
            "full_name": "test/repo",
            "html_url": "https://github.com/test/repo",
            "description": "A test",
            "stargazers_count": 100,
            "forks_count": 10,
            "language": "Python",
            "updated_at": "2025-01-01",
        }
        response = MagicMock()
        response.json.return_value = {"items": [repo_item]}
        response.raise_for_status = MagicMock()
        mock_get.return_value = response

        channel = get_channel("github")
        hits = await channel.search("test query")

        assert len(hits) == 1
        assert hits[0].title == "test/repo"
class TestExaSearch:
    """Search on the "exa_search" channel with requests.post patched out."""

    @patch("agent_eyes.channels.exa_search.requests.post")
    @pytest.mark.asyncio
    async def test_search(self, mock_post, tmp_path):
        """One entry in the mocked payload produces one result with its title.

        The config lives under pytest's per-test ``tmp_path`` rather than a
        fixed file in /tmp, so anything Config writes stays isolated between
        runs and the test does not depend on a /tmp directory existing.
        """
        from agent_eyes.config import Config

        config = Config(config_path=tmp_path / "config.yaml")
        config.set("exa_api_key", "test-key")

        mock_resp = MagicMock()
        mock_resp.json.return_value = {
            "results": [{"title": "Result", "url": "https://example.com",
                         "text": "snippet", "publishedDate": "", "score": 0.9}]
        }
        mock_resp.raise_for_status = MagicMock()
        mock_post.return_value = mock_resp

        ch = get_channel("exa_search")
        results = await ch.search("test", config=config)

        assert len(results) == 1
        assert results[0].title == "Result"

View file

@ -2,9 +2,6 @@
"""Tests for AgentEyes core class."""
import pytest
from unittest.mock import patch, MagicMock, AsyncMock
from pathlib import Path
from agent_eyes.config import Config
from agent_eyes.core import AgentEyes
@ -18,22 +15,20 @@ def eyes(tmp_path):
class TestAgentEyes:
def test_init(self, eyes):
assert eyes.config is not None
assert eyes.reader is not None
def test_detect_platform(self, eyes):
assert eyes.detect_platform("https://github.com/openai/gpt-4") == "github"
assert eyes.detect_platform("https://github.com/test/repo") == "github"
assert eyes.detect_platform("https://reddit.com/r/test") == "reddit"
assert eyes.detect_platform("https://x.com/elonmusk/status/123") == "twitter"
assert eyes.detect_platform("https://x.com/user/status/123") == "twitter"
assert eyes.detect_platform("https://youtube.com/watch?v=abc") == "youtube"
assert eyes.detect_platform("https://bilibili.com/video/BV1xx") == "bilibili"
assert eyes.detect_platform("https://mp.weixin.qq.com/s/abc") == "wechat"
assert eyes.detect_platform("https://example.com") == "generic"
assert eyes.detect_platform("https://example.com") == "web"
def test_doctor(self, eyes):
results = eyes.doctor()
assert isinstance(results, dict)
assert "web" in results
assert "github_read" in results
assert "github" in results
def test_doctor_report(self, eyes):
report = eyes.doctor_report()

View file

@ -1,11 +1,9 @@
# -*- coding: utf-8 -*-
"""Tests for Agent Eyes doctor module."""
"""Tests for doctor module."""
import pytest
from unittest.mock import patch, MagicMock
from agent_eyes.config import Config
from agent_eyes.doctor import check_all, format_report, STATUS_OK, STATUS_OFF
from agent_eyes.doctor import check_all, format_report
@pytest.fixture
@ -14,54 +12,25 @@ def tmp_config(tmp_path):
class TestDoctor:
def test_zero_config_platforms_always_ok(self, tmp_config):
def test_zero_config_channels_ok(self, tmp_config):
results = check_all(tmp_config)
assert results["web"]["status"] == STATUS_OK
assert results["github_read"]["status"] == STATUS_OK
assert results["bilibili"]["status"] == STATUS_OK
assert results["rss"]["status"] == STATUS_OK
assert results["tweet_read"]["status"] == STATUS_OK
assert results["web"]["status"] == "ok"
assert results["github"]["status"] == "ok"
assert results["bilibili"]["status"] == "ok"
assert results["rss"]["status"] == "ok"
def test_search_off_without_exa_key(self, tmp_config):
def test_exa_off_without_key(self, tmp_config):
results = check_all(tmp_config)
assert results["search_web"]["status"] == STATUS_OFF
assert results["search_reddit"]["status"] == STATUS_OFF
assert results["search_twitter"]["status"] == STATUS_OFF
assert results["exa_search"]["status"] == "off"
def test_search_on_with_exa_key(self, tmp_config):
def test_exa_on_with_key(self, tmp_config):
tmp_config.set("exa_api_key", "test-key")
results = check_all(tmp_config)
assert results["search_web"]["status"] == STATUS_OK
assert results["search_reddit"]["status"] == STATUS_OK
assert results["search_twitter"]["status"] == STATUS_OK
assert results["exa_search"]["status"] == "ok"
def test_github_search_always_on(self, tmp_config):
results = check_all(tmp_config)
assert results["search_github"]["status"] == STATUS_OK
def test_reddit_full_off_without_proxy(self, tmp_config):
results = check_all(tmp_config)
assert results["reddit_full"]["status"] == STATUS_OFF
def test_reddit_full_on_with_proxy(self, tmp_config):
tmp_config.set("reddit_proxy", "http://user:pass@ip:port")
results = check_all(tmp_config)
assert results["reddit_full"]["status"] == STATUS_OK
@patch("agent_eyes.doctor._check_command")
def test_birdx_detection(self, mock_cmd, tmp_config):
mock_cmd.return_value = True
results = check_all(tmp_config)
assert results["twitter_advanced"]["status"] == STATUS_OK
def test_format_report_is_string(self, tmp_config):
def test_format_report(self, tmp_config):
results = check_all(tmp_config)
report = format_report(results)
assert isinstance(report, str)
assert "Agent Eyes" in report
assert "" in report
def test_format_report_shows_count(self, tmp_config):
results = check_all(tmp_config)
report = format_report(results)
assert "platforms active" in report
assert "channels active" in report

View file

@ -1,142 +0,0 @@
# -*- coding: utf-8 -*-
"""Tests for Agent Eyes search modules."""
import pytest
from unittest.mock import patch, MagicMock
from agent_eyes.config import Config
@pytest.fixture
def tmp_config(tmp_path):
    """Config stored under pytest's tmp_path with a dummy Exa API key set."""
    config = Config(config_path=tmp_path / "config.yaml")
    config.set("exa_api_key", "test-key")
    return config
class TestExaSearch:
    """Tests for agent_eyes.search.exa with requests.post patched out."""

    @patch("agent_eyes.search.exa.requests.post")
    @pytest.mark.asyncio
    async def test_search_web(self, mock_post, tmp_config):
        """The mocked result is returned as a dict with title, url and snippet."""
        raw_hit = {
            "title": "Test Result",
            "url": "https://example.com",
            "text": "This is a test snippet",
            "publishedDate": "2025-01-01",
            "score": 0.95,
        }
        response = MagicMock()
        response.json.return_value = {"results": [raw_hit]}
        response.raise_for_status = MagicMock()
        mock_post.return_value = response

        from agent_eyes.search.exa import search_web
        hits = await search_web("test query", config=tmp_config)

        assert len(hits) == 1
        first = hits[0]
        assert first["title"] == "Test Result"
        assert first["url"] == "https://example.com"
        assert first["snippet"] == "This is a test snippet"

    def test_search_web_requires_key(self):
        """_get_api_key raises ValueError when the config has no Exa key."""
        from agent_eyes.search.exa import _get_api_key
        with pytest.raises(ValueError, match="Exa API key"):
            _get_api_key(Config(config_path="/tmp/nonexistent/config.yaml"))
class TestRedditSearch:
    """Tests for agent_eyes.search.reddit, checking the query sent to Exa."""

    @patch("agent_eyes.search.exa.requests.post")
    @pytest.mark.asyncio
    async def test_search_reddit(self, mock_post, tmp_config):
        """The posted query is restricted with site:reddit.com."""
        response = MagicMock()
        response.json.return_value = {"results": []}
        response.raise_for_status = MagicMock()
        mock_post.return_value = response

        from agent_eyes.search.reddit import search_reddit
        await search_reddit("test", config=tmp_config)

        # Verify it searched site:reddit.com
        _, post_kwargs = mock_post.call_args
        assert "site:reddit.com" in post_kwargs["json"]["query"]

    @patch("agent_eyes.search.exa.requests.post")
    @pytest.mark.asyncio
    async def test_search_reddit_with_sub(self, mock_post, tmp_config):
        """Passing a subreddit narrows the site: filter to that subreddit."""
        response = MagicMock()
        response.json.return_value = {"results": []}
        response.raise_for_status = MagicMock()
        mock_post.return_value = response

        from agent_eyes.search.reddit import search_reddit
        await search_reddit("test", subreddit="LocalLLaMA", config=tmp_config)

        _, post_kwargs = mock_post.call_args
        assert "site:reddit.com/r/LocalLLaMA" in post_kwargs["json"]["query"]
class TestGitHubSearch:
    """Tests for agent_eyes.search.github with requests.get patched out."""

    @patch("agent_eyes.search.github.requests.get")
    @pytest.mark.asyncio
    async def test_search_github(self, mock_get):
        """The mocked repository is returned with its name and star count."""
        repo_item = {
            "full_name": "owner/repo",
            "html_url": "https://github.com/owner/repo",
            "description": "A test repo",
            "stargazers_count": 100,
            "forks_count": 20,
            "language": "Python",
            "updated_at": "2025-01-01",
            "topics": ["ai"],
        }
        response = MagicMock()
        response.json.return_value = {"items": [repo_item]}
        response.raise_for_status = MagicMock()
        mock_get.return_value = response

        from agent_eyes.search.github import search_github
        hits = await search_github("test")

        assert len(hits) == 1
        first = hits[0]
        assert first["name"] == "owner/repo"
        assert first["stars"] == 100

    @patch("agent_eyes.search.github.requests.get")
    @pytest.mark.asyncio
    async def test_search_github_with_language(self, mock_get):
        """The language argument is added to the q parameter as language:<name>."""
        response = MagicMock()
        response.json.return_value = {"items": []}
        response.raise_for_status = MagicMock()
        mock_get.return_value = response

        from agent_eyes.search.github import search_github
        await search_github("test", language="python")

        _, get_kwargs = mock_get.call_args
        assert "language:python" in get_kwargs["params"]["q"]
class TestTwitterSearch:
@patch("agent_eyes.search.twitter.shutil.which", return_value=None)
@patch("agent_eyes.search.exa.requests.post")
@pytest.mark.asyncio
async def test_search_twitter_exa_fallback(self, mock_post, mock_which, tmp_config):
mock_resp = MagicMock()
mock_resp.json.return_value = {"results": []}
mock_resp.raise_for_status = MagicMock()
mock_post.return_value = mock_resp
from agent_eyes.search.twitter import search_twitter
results = await search_twitter("test", config=tmp_config)
call_args = mock_post.call_args
query = call_args[1]["json"]["query"]
assert "site:x.com" in query