diff --git a/agent_eyes/channels/__init__.py b/agent_eyes/channels/__init__.py new file mode 100644 index 0000000..999eb68 --- /dev/null +++ b/agent_eyes/channels/__init__.py @@ -0,0 +1,67 @@ +# -*- coding: utf-8 -*- +""" +Channel registry — routes URLs to the right channel. + +This is the core of Agent Eyes' pluggable architecture. +Add a new channel: just create a file and register it here. +Swap a backend: just change the implementation inside the channel file. +""" + +from typing import Dict, List, Optional +from .base import Channel, ReadResult, SearchResult + +# Import all channels +from .web import WebChannel +from .github import GitHubChannel +from .twitter import TwitterChannel +from .youtube import YouTubeChannel +from .reddit import RedditChannel +from .rss import RSSChannel +from .bilibili import BilibiliChannel +from .exa_search import ExaSearchChannel + + +# Channel registry — order matters (first match wins, web is last as fallback) +ALL_CHANNELS: List[Channel] = [ + GitHubChannel(), + TwitterChannel(), + YouTubeChannel(), + RedditChannel(), + BilibiliChannel(), + RSSChannel(), + ExaSearchChannel(), + WebChannel(), # Fallback — handles any URL +] + +# Search-capable channels +SEARCH_CHANNELS: Dict[str, Channel] = { + ch.name: ch for ch in ALL_CHANNELS if ch.can_search() +} + + +def get_channel_for_url(url: str) -> Channel: + """Find the right channel for a URL.""" + for channel in ALL_CHANNELS: + if channel.can_handle(url): + return channel + return WebChannel() # Should never reach here, but just in case + + +def get_channel(name: str) -> Optional[Channel]: + """Get a channel by name.""" + for ch in ALL_CHANNELS: + if ch.name == name: + return ch + return None + + +def get_all_channels() -> List[Channel]: + """Get all registered channels.""" + return ALL_CHANNELS + + +__all__ = [ + "Channel", "ReadResult", "SearchResult", + "ALL_CHANNELS", "SEARCH_CHANNELS", + "get_channel_for_url", "get_channel", "get_all_channels", +] diff --git a/agent_eyes/channels/base.py b/agent_eyes/channels/base.py new file mode 100644 index 0000000..ff62bc7 --- /dev/null +++ b/agent_eyes/channels/base.py @@ -0,0 +1,140 @@ +# -*- coding: utf-8 -*- +""" +Channel base class — the universal interface for all platforms. + +Every channel (YouTube, Twitter, GitHub, etc.) implements this interface. +The backend tool can be swapped anytime without changing anything else. + +Example: + class YouTubeChannel(Channel): + name = "youtube" + backends = ["yt-dlp"] # current backend, can be swapped + + async def read(self, url, config): + # Just call yt-dlp, return standardized dict + ... +""" + +import shutil +from abc import ABC, abstractmethod +from dataclasses import dataclass +from typing import Any, Dict, List, Optional, Tuple + + +@dataclass +class ReadResult: + """Standardized read result. Every channel returns this.""" + title: str + content: str + url: str + author: str = "" + date: str = "" + platform: str = "" + extra: dict = None + + def __post_init__(self): + self.extra = self.extra or {} + + def to_dict(self) -> dict: + d = { + "title": self.title, + "content": self.content, + "url": self.url, + "platform": self.platform, + } + if self.author: + d["author"] = self.author + if self.date: + d["date"] = self.date + if self.extra: + d["extra"] = self.extra + return d + + +@dataclass +class SearchResult: + """Standardized search result.""" + title: str + url: str + snippet: str = "" + author: str = "" + date: str = "" + score: float = 0 + extra: dict = None + + def __post_init__(self): + self.extra = self.extra or {} + + def to_dict(self) -> dict: + d = { + "title": self.title, + "url": self.url, + "snippet": self.snippet, + } + if self.author: + d["author"] = self.author + if self.date: + d["date"] = self.date + if self.extra: + d["extra"] = self.extra + return d + + +class Channel(ABC): + """ + Base class for all channels. + + Subclasses just need to implement: + - read(url, config) → ReadResult + - can_handle(url) → bool + - check(config) → (status, message) + + Optionally: + - search(query, config, **kwargs) → list[SearchResult] + """ + + name: str = "" # e.g. "youtube" + description: str = "" # e.g. "YouTube video transcripts" + backends: List[str] = [] # e.g. ["yt-dlp"] — what external tool is used + requires_config: List[str] = [] # e.g. ["reddit_proxy"] + requires_tools: List[str] = [] # e.g. ["yt-dlp"] + tier: int = 0 # 0=zero-config, 1=needs free key, 2=needs setup + + @abstractmethod + async def read(self, url: str, config=None) -> ReadResult: + """Read content from a URL. Must return ReadResult.""" + ... + + @abstractmethod + def can_handle(self, url: str) -> bool: + """Check if this channel can handle this URL.""" + ... + + def check(self, config=None) -> Tuple[str, str]: + """ + Check if this channel is available. + Returns (status, message) where status is 'ok'/'warn'/'off'/'error'. + """ + # Check required tools + for tool in self.requires_tools: + if not shutil.which(tool): + return "off", f"Install: pip install {tool}" + + # Check required config + for key in self.requires_config: + if config and not config.get(key): + return "off", f"Need config: {key}. Run: agent-eyes setup" + + return "ok", f"{', '.join(self.backends) if self.backends else 'built-in'}" + + async def search(self, query: str, config=None, **kwargs) -> List[SearchResult]: + """Search this platform. Override if supported.""" + raise NotImplementedError(f"{self.name} does not support search") + + def can_search(self) -> bool: + """Whether this channel supports search.""" + try: + # Check if search is overridden + return type(self).search is not Channel.search + except: + return False diff --git a/agent_eyes/channels/bilibili.py b/agent_eyes/channels/bilibili.py new file mode 100644 index 0000000..09eb39e --- /dev/null +++ b/agent_eyes/channels/bilibili.py @@ -0,0 +1,77 @@ +# -*- coding: utf-8 -*- +"""Bilibili — via public API (free, no config needed). + +Backend: Bilibili public API +Swap to: any Bilibili access method +""" + +import requests +from urllib.parse import urlparse, parse_qs +from .base import Channel, ReadResult + + +class BilibiliChannel(Channel): + name = "bilibili" + description = "Bilibili video info and subtitles" + backends = ["Bilibili API"] + tier = 0 + + def can_handle(self, url: str) -> bool: + domain = urlparse(url).netloc.lower() + return "bilibili.com" in domain or "b23.tv" in domain + + async def read(self, url: str, config=None) -> ReadResult: + # Extract BV id from URL + path = urlparse(url).path + bv_id = "" + for part in path.split("/"): + if part.startswith("BV"): + bv_id = part + break + + if not bv_id: + # Fallback to Jina Reader + from agent_eyes.channels.web import WebChannel + return await WebChannel().read(url, config) + + # Get video info + resp = requests.get( + "https://api.bilibili.com/x/web-interface/view", + params={"bvid": bv_id}, + headers={"User-Agent": "Mozilla/5.0"}, + timeout=15, + ) + resp.raise_for_status() + data = resp.json().get("data", {}) + + title = data.get("title", "") + desc = data.get("desc", "") + author = data.get("owner", {}).get("name", "") + + # Try to get subtitles + subtitle_text = "" + subtitle_list = data.get("subtitle", {}).get("list", []) + if subtitle_list: + sub_url = subtitle_list[0].get("subtitle_url", "") + if sub_url: + if sub_url.startswith("//"): + sub_url = "https:" + sub_url + sr = requests.get(sub_url, timeout=10) + if sr.ok: + sub_data = sr.json() + lines = [item.get("content", "") for item in sub_data.get("body", [])] + subtitle_text = "\n".join(lines) + + content = desc + if subtitle_text: + content += f"\n\n## Transcript\n{subtitle_text}" + + return ReadResult( + title=title, + content=content, + url=url, + author=author, + platform="bilibili", + extra={"view": data.get("stat", {}).get("view", 0), + "like": data.get("stat", {}).get("like", 0)}, + ) diff --git a/agent_eyes/channels/exa_search.py b/agent_eyes/channels/exa_search.py new file mode 100644 index 0000000..3b94edb --- /dev/null +++ b/agent_eyes/channels/exa_search.py @@ -0,0 +1,69 @@ +# -*- coding: utf-8 -*- +"""Exa semantic search — the search backbone for Agent Eyes. + +Backend: Exa API (https://exa.ai) — free 1000 searches/month +Swap to: Tavily, SerpAPI, or any search API +""" + +import os +import requests +from .base import Channel, SearchResult +from typing import List + + +class ExaSearchChannel(Channel): + name = "exa_search" + description = "Semantic web search (powers Reddit/Twitter search too)" + backends = ["Exa API"] + requires_config = ["exa_api_key"] + tier = 1 + + API_URL = "https://api.exa.ai/search" + + def can_handle(self, url: str) -> bool: + return False # Search-only channel, doesn't read URLs + + async def read(self, url: str, config=None) -> None: + raise NotImplementedError("Exa is a search engine, not a reader") + + def _get_key(self, config=None) -> str: + if config: + key = config.get("exa_api_key") + if key: + return key + key = os.environ.get("EXA_API_KEY") + if key: + return key + raise ValueError( + "Exa API key not configured.\n" + "Get a free key at https://exa.ai (1000 searches/month free)\n" + "Then run: agent-eyes setup" + ) + + async def search(self, query: str, config=None, **kwargs) -> List[SearchResult]: + api_key = self._get_key(config) + limit = kwargs.get("limit", 5) + + resp = requests.post( + self.API_URL, + headers={"Content-Type": "application/json", "x-api-key": api_key}, + json={ + "query": query, + "numResults": min(limit, 10), + "type": "auto", + "contents": {"text": {"maxCharacters": 500}}, + }, + timeout=15, + ) + resp.raise_for_status() + + results = [] + for item in resp.json().get("results", []): + results.append(SearchResult( + title=item.get("title", ""), + url=item.get("url", ""), + snippet=item.get("text", ""), + date=item.get("publishedDate", ""), + score=item.get("score", 0), + )) + return results diff --git a/agent_eyes/channels/github.py b/agent_eyes/channels/github.py new file mode 100644 index 0000000..000840d --- /dev/null +++ b/agent_eyes/channels/github.py @@ -0,0 +1,120 @@ +# -*- coding: utf-8 -*- +"""GitHub — via GitHub REST API (free, no config needed). + +Backend: GitHub API v3 +Swap to: gh CLI, or any GitHub API wrapper +""" + +import requests +from urllib.parse import urlparse +from .base import Channel, ReadResult, SearchResult +from typing import List + + +class GitHubChannel(Channel): + name = "github" + description = "GitHub repos, issues, PRs, code" + backends = ["GitHub API"] + tier = 0 + + API = "https://api.github.com" + + def _headers(self, config=None): + h = {"Accept": "application/vnd.github+json"} + token = config.get("github_token") if config else None + if token: + h["Authorization"] = f"Bearer {token}" + return h + + def can_handle(self, url: str) -> bool: + domain = urlparse(url).netloc.lower() + return "github.com" in domain + + async def read(self, url: str, config=None) -> ReadResult: + path = urlparse(url).path.strip("/").split("/") + + if len(path) < 2: + raise ValueError(f"Invalid GitHub URL: {url}") + + owner, repo = path[0], path[1] + headers = self._headers(config) + + # Issues/PRs + if len(path) >= 4 and path[2] in ("issues", "pull"): + num = path[3] + resp = requests.get(f"{self.API}/repos/{owner}/{repo}/issues/{num}", headers=headers, timeout=15) + resp.raise_for_status() + data = resp.json() + + # Get comments + comments_text = "" + if data.get("comments", 0) > 0: + cr = requests.get(f"{self.API}/repos/{owner}/{repo}/issues/{num}/comments", + headers=headers, params={"per_page": 20}, timeout=15) + if cr.ok: + for c in cr.json(): + comments_text += f"\n\n---\n**{c.get('user', {}).get('login', '')}** ({c.get('created_at', '')}):\n{c.get('body', '')}" + + return ReadResult( + title=data.get("title", ""), + content=(data.get("body", "") or "") + comments_text, + url=url, + author=data.get("user", {}).get("login", ""), + date=data.get("created_at", ""), + platform="github", + extra={"state": data.get("state"), "comments": data.get("comments", 0), + "reactions": data.get("reactions", {}).get("total_count", 0)}, + ) + + # Repo + resp = requests.get(f"{self.API}/repos/{owner}/{repo}", headers=headers, timeout=15) + resp.raise_for_status() + data = resp.json() + + # Get README + readme_text = "" + rr = requests.get(f"{self.API}/repos/{owner}/{repo}/readme", headers=headers, timeout=15) + if rr.ok: + import base64 + readme_data = rr.json() + if readme_data.get("encoding") == "base64": + readme_text = base64.b64decode(readme_data["content"]).decode("utf-8", errors="replace") + + return ReadResult( + title=f"{owner}/{repo}", + content=readme_text or data.get("description", ""), + url=url, + author=owner, + platform="github", + extra={"stars": data.get("stargazers_count", 0), "forks": data.get("forks_count", 0), + "language": data.get("language", ""), "description": data.get("description", "")}, + ) + + async def search(self, query: str, config=None, **kwargs) -> List[SearchResult]: + language = kwargs.get("language") + limit = kwargs.get("limit", 5) + + q = query + if language: + q += f" language:{language}" + + resp = requests.get( + f"{self.API}/search/repositories", + headers=self._headers(config), + params={"q": q, "sort": "stars", "per_page": min(limit, 30)}, + timeout=15, + ) + resp.raise_for_status() + + results = [] + for repo in resp.json().get("items", []): + results.append(SearchResult( + title=repo.get("full_name", ""), + url=repo.get("html_url", ""), + snippet=repo.get("description", ""), + date=repo.get("updated_at", ""), + extra={"stars": repo.get("stargazers_count", 0), + "forks": repo.get("forks_count", 0), + "language": repo.get("language", "")}, + )) + return results diff --git a/agent_eyes/channels/reddit.py b/agent_eyes/channels/reddit.py new file mode 100644 index 0000000..e854246 --- /dev/null +++ b/agent_eyes/channels/reddit.py @@ -0,0 +1,96 @@ +# -*- coding: utf-8 -*- +"""Reddit — via Reddit JSON API + optional proxy. + +Backend: Reddit public JSON API (append .json to any URL) +Swap to: any Reddit access method +""" + +import requests +from urllib.parse import urlparse +from .base import Channel, ReadResult + + +class RedditChannel(Channel): + name = "reddit" + description = "Reddit posts and comments" + backends = ["Reddit JSON API"] + requires_config = ["reddit_proxy"] + tier = 2 + + USER_AGENT = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36" + + def can_handle(self, url: str) -> bool: + domain = urlparse(url).netloc.lower() + return "reddit.com" in domain or "redd.it" in domain + + async def read(self, url: str, config=None) -> ReadResult: + proxy = config.get("reddit_proxy") if config else None + proxies = {"http": proxy, "https": proxy} if proxy else None + + # Ensure URL ends with .json + json_url = url.rstrip("/") + if not json_url.endswith(".json"): + json_url += ".json" + + resp = requests.get( + json_url, + headers={"User-Agent": self.USER_AGENT}, + proxies=proxies, + params={"limit": 50}, + timeout=15, + ) + resp.raise_for_status() + data = resp.json() + + if isinstance(data, list) and len(data) >= 1: + # Post page: [post_listing, comments_listing] + post = data[0]["data"]["children"][0]["data"] + title = post.get("title", "") + author = post.get("author", "") + selftext = post.get("selftext", "") + score = post.get("score", 0) + subreddit = post.get("subreddit", "") + + # Extract comments + comments_text = "" + if len(data) >= 2: + comments_text = self._extract_comments(data[1]) + + content = selftext + if comments_text: + content += f"\n\n---\n## Comments\n{comments_text}" + + return ReadResult( + title=title, + content=content, + url=url, + author=f"u/{author}", + platform="reddit", + extra={"subreddit": subreddit, "score": score}, + ) + + raise ValueError(f"Could not parse Reddit response for: {url}") + + def _extract_comments(self, comments_data: dict, depth: int = 0, max_depth: int = 3) -> str: + """Recursively extract comments.""" + lines = [] + children = comments_data.get("data", {}).get("children", []) + + for child in children: + if child.get("kind") != "t1": + continue + data = child.get("data", {}) + author = data.get("author", "[deleted]") + body = data.get("body", "") + score = data.get("score", 0) + indent = " " * depth + + lines.append(f"{indent}**u/{author}** ({score} points):") + lines.append(f"{indent}{body}") + lines.append("") + + # Recurse into replies + if depth < max_depth and data.get("replies") and isinstance(data["replies"], dict): + lines.append(self._extract_comments(data["replies"], depth + 1, max_depth)) + + return "\n".join(lines) diff --git a/agent_eyes/channels/rss.py b/agent_eyes/channels/rss.py new file mode 100644 index 0000000..8beee50 --- /dev/null +++ b/agent_eyes/channels/rss.py @@ -0,0 +1,57 @@ +# -*- coding: utf-8 -*- +"""RSS feeds — via feedparser (free, pip dependency). + +Backend: feedparser (https://github.com/kurtmckee/feedparser) +Swap to: any RSS parser +""" + +import feedparser +from urllib.parse import urlparse +from .base import Channel, ReadResult + + +class RSSChannel(Channel): + name = "rss" + description = "RSS and Atom feeds" + backends = ["feedparser"] + tier = 0 + + def can_handle(self, url: str) -> bool: + lower = url.lower() + domain = urlparse(url).netloc.lower() + return (lower.endswith(".xml") or "/rss" in lower or "/feed" in lower + or "/atom" in lower or "rss" in domain) + + async def read(self, url: str, config=None) -> ReadResult: + feed = feedparser.parse(url) + + if feed.bozo and not feed.entries: + raise ValueError(f"Failed to parse RSS feed: {url}") + + if not feed.entries: + raise ValueError(f"No entries in RSS feed: {url}") + + # Return latest entry + entry = feed.entries[0] + content = entry.get("summary", "") or entry.get("description", "") + + # If multiple entries, summarize all + if len(feed.entries) > 1: + lines = [f"# {feed.feed.get('title', 'RSS Feed')}\n"] + for i, e in enumerate(feed.entries[:20], 1): + title = e.get("title", "Untitled") + link = e.get("link", "") + summary = e.get("summary", "")[:200] + lines.append(f"## {i}. {title}") + lines.append(f"🔗 {link}") + if summary: + lines.append(summary) + lines.append("") + content = "\n".join(lines) + + return ReadResult( + title=feed.feed.get("title", entry.get("title", url)), + content=content, + url=url, + platform="rss", + ) diff --git a/agent_eyes/channels/twitter.py b/agent_eyes/channels/twitter.py new file mode 100644 index 0000000..99ccb26 --- /dev/null +++ b/agent_eyes/channels/twitter.py @@ -0,0 +1,149 @@ +# -*- coding: utf-8 -*- +"""Twitter/X — via birdx CLI (free) or Jina Reader fallback. + +Backend: birdx (https://github.com/runesleo/birdx) for search/timeline + Jina Reader for single tweets +Swap to: any Twitter access tool +""" + +import shutil +import subprocess +from urllib.parse import urlparse +from .base import Channel, ReadResult, SearchResult +from typing import List +import requests + + +class TwitterChannel(Channel): + name = "twitter" + description = "Twitter/X posts, search, timelines" + backends = ["birdx", "Jina Reader"] + tier = 0 # Single tweet reading is zero-config + + def can_handle(self, url: str) -> bool: + domain = urlparse(url).netloc.lower() + return "x.com" in domain or "twitter.com" in domain + + def check(self, config=None): + # Basic reading always works (Jina fallback) + if shutil.which("birdx"): + return "ok", "birdx (full: search + timeline + threads)" + return "ok", "Jina Reader (single tweets only)" + + async def read(self, url: str, config=None) -> ReadResult: + # Try birdx first + if shutil.which("birdx"): + return await self._read_birdx(url) + # Fallback: Jina Reader + return await self._read_jina(url) + + async def _read_birdx(self, url: str) -> ReadResult: + result = subprocess.run( + ["birdx", "read", url], + capture_output=True, text=True, timeout=30, + ) + if result.returncode != 0: + return await self._read_jina(url) + + text = result.stdout.strip() + # Extract author from first line + author = "" + lines = text.split("\n") + if lines and lines[0].startswith("@"): + author = lines[0].split()[0] + + return ReadResult( + title=text[:100], + content=text, + url=url, + author=author, + platform="twitter", + ) + + async def _read_jina(self, url: str) -> ReadResult: + resp = requests.get( + f"https://r.jina.ai/{url}", + headers={"Accept": "text/markdown"}, + timeout=15, + ) + resp.raise_for_status() + text = resp.text + title = text[:100] if text else url + + return ReadResult( + title=title, + content=text, + url=url, + platform="twitter", + ) + + async def search(self, query: str, config=None, **kwargs) -> List[SearchResult]: + limit = kwargs.get("limit", 10) + + if shutil.which("birdx"): + return await self._search_birdx(query, limit) + + # Fallback to Exa + return await self._search_exa(query, limit, config) + + async def _search_birdx(self, query: str, limit: int) -> List[SearchResult]: + try: + result = subprocess.run( + ["birdx", "search", query, "-n", str(limit)], + capture_output=True, text=True, timeout=30, + ) + if result.returncode != 0: + return [] + + return self._parse_birdx_output(result.stdout) + except (subprocess.TimeoutExpired, FileNotFoundError): + return [] + + def _parse_birdx_output(self, text: str) -> List[SearchResult]: + """Parse birdx text output into SearchResults.""" + results = [] + current = {} + text_lines = [] + + for line in text.strip().split("\n"): + line = line.strip() + if line.startswith("─"): + if current: + current["text"] = "\n".join(text_lines).strip() + results.append(SearchResult( + title=current.get("text", "")[:80], + url=current.get("url", ""), + snippet=current.get("text", ""), + author=current.get("author", ""), + date=current.get("date", ""), + )) + current = {} + text_lines = [] + continue + if line.startswith("@") and line.endswith(":") and "(" in line: + current["author"] = line.split()[0] + continue + if line.startswith("date:"): + current["date"] = line[5:].strip() + continue + if line.startswith("url:"): + current["url"] = line[4:].strip() + continue + if current is not None: + text_lines.append(line) + + if current and text_lines: + current["text"] = "\n".join(text_lines).strip() + results.append(SearchResult( + title=current.get("text", "")[:80], + url=current.get("url", ""), + snippet=current.get("text", ""), + author=current.get("author", ""), + date=current.get("date", ""), + )) + return results + + async def _search_exa(self, query: str, limit: int, config=None) -> List[SearchResult]: + from agent_eyes.channels.exa_search import ExaSearchChannel + exa = ExaSearchChannel() + return await exa.search(f"site:x.com {query}", config=config, limit=limit) diff --git a/agent_eyes/channels/web.py b/agent_eyes/channels/web.py new file mode 100644 index 0000000..c424d84 --- /dev/null +++ b/agent_eyes/channels/web.py @@ -0,0 +1,49 @@ +# -*- coding: utf-8 -*- +"""Web pages — via Jina Reader API (free, no config needed). + +Backend: Jina Reader (https://r.jina.ai) +Swap to: Firecrawl, Trafilatura, or any other reader API +""" + +import requests +from .base import Channel, ReadResult + + +class WebChannel(Channel): + name = "web" + description = "Web pages (any URL)" + backends = ["Jina Reader API"] + tier = 0 + + JINA_URL = "https://r.jina.ai/" + + def can_handle(self, url: str) -> bool: + # Fallback — handles any URL not matched by other channels + return True + + async def read(self, url: str, config=None) -> ReadResult: + resp = requests.get( + f"{self.JINA_URL}{url}", + headers={"Accept": "text/markdown"}, + timeout=15, + ) + resp.raise_for_status() + text = resp.text + + # Extract title from first markdown heading + title = url + for line in text.split("\n"): + line = line.strip() + if line.startswith("# "): + title = line[2:].strip() + break + if line.startswith("Title:"): + title = line[6:].strip() + break + + return ReadResult( + title=title, + content=text, + url=url, + platform="web", + ) diff --git a/agent_eyes/channels/youtube.py b/agent_eyes/channels/youtube.py new file mode 100644 index 0000000..f938ba4 --- /dev/null +++ b/agent_eyes/channels/youtube.py @@ -0,0 +1,94 @@ +# -*- coding: utf-8 -*- +"""YouTube — via yt-dlp (free, pip install yt-dlp). + +Backend: yt-dlp (https://github.com/yt-dlp/yt-dlp) +Swap to: any YouTube subtitle extractor +""" + +import json +import shutil +import subprocess +import tempfile +from pathlib import Path +from urllib.parse import urlparse, parse_qs +from .base import Channel, ReadResult + + +class YouTubeChannel(Channel): + name = "youtube" + description = "YouTube video transcripts" + backends = ["yt-dlp"] + requires_tools = ["yt-dlp"] + tier = 0 + + def can_handle(self, url: str) -> bool: + domain = urlparse(url).netloc.lower() + return "youtube.com" in domain or "youtu.be" in domain + + async def read(self, url: str, config=None) -> ReadResult: + if not shutil.which("yt-dlp"): + raise RuntimeError("yt-dlp not installed. Install: pip install yt-dlp") + + with tempfile.TemporaryDirectory() as tmpdir: + # Get video info + info = self._get_info(url) + title = info.get("title", url) + author = info.get("uploader", "") + + # Try to get subtitles + transcript = self._get_subtitles(url, tmpdir) + + if not transcript: + transcript = f"[Video: {title}]\n[No subtitles available. Use Groq Whisper for transcription.]" + + return ReadResult( + title=title, + content=transcript, + url=url, + author=author, + platform="youtube", + extra={ + "duration": info.get("duration"), + "view_count": info.get("view_count"), + "upload_date": info.get("upload_date"), + }, + ) + + def _get_info(self, url: str) -> dict: + try: + result = subprocess.run( + ["yt-dlp", "--dump-json", "--no-download", url], + capture_output=True, text=True, timeout=30, + ) + if result.returncode == 0: + return json.loads(result.stdout) + except (subprocess.TimeoutExpired, json.JSONDecodeError): + pass + return {} + + def _get_subtitles(self, url: str, tmpdir: str) -> str: + """Extract subtitles using yt-dlp.""" + try: + subprocess.run( + ["yt-dlp", "--write-auto-sub", "--write-sub", + "--sub-lang", "en,zh-Hans,zh", + "--skip-download", "--sub-format", "vtt", + "-o", f"{tmpdir}/%(id)s.%(ext)s", url], + capture_output=True, text=True, timeout=30, + ) + + # Find and read subtitle file + for f in Path(tmpdir).glob("*.vtt"): + text = f.read_text(errors="replace") + # Strip VTT headers and timestamps + lines = [] + for line in text.split("\n"): + line = line.strip() + if not line or line.startswith("WEBVTT") or "-->" in line or line.isdigit(): + continue + if line not in lines[-1:]: # deduplicate + lines.append(line) + return "\n".join(lines) + except subprocess.TimeoutExpired: + pass + return "" diff --git a/agent_eyes/core.py b/agent_eyes/core.py index 7293858..b45101c 100644 --- a/agent_eyes/core.py +++ b/agent_eyes/core.py @@ -2,6 +2,9 @@ """ AgentEyes — the unified entry point. +Pure glue: routes URLs to the right channel, routes searches to the right engine. +Every channel is a thin wrapper around an external tool. Swap any backend anytime. + Usage: from agent_eyes import AgentEyes @@ -14,7 +17,7 @@ import asyncio from typing import Any, Dict, List, Optional from agent_eyes.config import Config -from agent_eyes.reader import UniversalReader +from agent_eyes.channels import get_channel_for_url, get_channel, get_all_channels class AgentEyes: @@ -22,7 +25,6 @@ class AgentEyes: def __init__(self, config: Optional[Config] = None): self.config = config or Config() - self.reader = UniversalReader() # ── Reading ───────────────────────────────────────── @@ -31,89 +33,60 @@ class AgentEyes: Read content from any URL. Auto-detects platform. Supported: Web, GitHub, Reddit, Twitter, YouTube, - Bilibili, WeChat, XiaoHongShu, RSS, Telegram, etc. + Bilibili, RSS, and more. Returns: - Dict with title, content, url, author, etc. + Dict with title, content, url, author, platform, etc. """ - content = await self.reader.read(url) - return content.to_dict() + if not url.startswith(("http://", "https://")): + url = f"https://{url}" + + channel = get_channel_for_url(url) + result = await channel.read(url, config=self.config) + return result.to_dict() async def read_batch(self, urls: List[str]) -> List[Dict[str, Any]]: """Read multiple URLs concurrently.""" - contents = await self.reader.read_batch(urls) - return [c.to_dict() for c in contents] + tasks = [self.read(url) for url in urls] + results = await asyncio.gather(*tasks, return_exceptions=True) + return [r for r in results if not isinstance(r, Exception)] def detect_platform(self, url: str) -> str: """Detect what platform a URL belongs to.""" - return self.reader._detect_platform(url) + channel = get_channel_for_url(url) + return channel.name # ── Searching ─────────────────────────────────────── async def search(self, query: str, num_results: int = 5) -> List[Dict[str, Any]]: - """ - Semantic web search via Exa. Requires Exa API key. + """Semantic web search via Exa.""" + ch = get_channel("exa_search") + results = await ch.search(query, config=self.config, limit=num_results) + return [r.to_dict() for r in results] - Args: - query: Search query - num_results: Number of results (max 10) - """ - from agent_eyes.search.exa import search_web - return await search_web(query, num_results=num_results, config=self.config) + async def search_reddit(self, query: str, subreddit: Optional[str] = None, limit: int = 10) -> List[Dict[str, Any]]: + """Search Reddit via Exa (bypasses IP blocks).""" + ch = get_channel("exa_search") + q = f"site:reddit.com/r/{subreddit} {query}" if subreddit else f"site:reddit.com {query}" + results = await ch.search(q, config=self.config, limit=limit) + return [r.to_dict() for r in results] - async def search_reddit( - self, - query: str, - subreddit: Optional[str] = None, - limit: int = 10, - ) -> List[Dict[str, Any]]: - """ - Search Reddit via Exa (bypasses IP blocks). + async def search_github(self, query: str, language: Optional[str] = None, limit: int = 5) -> List[Dict[str, Any]]: + """Search GitHub repositories.""" + ch = get_channel("github") + results = await ch.search(query, config=self.config, language=language, limit=limit) + return [r.to_dict() for r in results] - Args: - query: Search query - subreddit: Optional subreddit filter - limit: Number of results - """ - from agent_eyes.search.reddit import search_reddit - return await search_reddit(query, subreddit=subreddit, num_results=limit, config=self.config) - - async def search_github( - self, - query: str, - language: Optional[str] = None, - limit: int = 5, - ) -> List[Dict[str, Any]]: - """ - Search GitHub repositories. - - Args: - query: Search query - language: Filter by language - limit: Number of results - """ - from agent_eyes.search.github import search_github - return await search_github(query, language=language, limit=limit, config=self.config) - - async def search_twitter( - self, - query: str, - limit: int = 10, - ) -> List[Dict[str, Any]]: - """ - Search Twitter. Uses birdx if available, else Exa. - - Args: - query: Search query - limit: Number of results - """ - from agent_eyes.search.twitter import search_twitter - return await search_twitter(query, limit=limit, config=self.config) + async def search_twitter(self, query: str, limit: int = 10) -> List[Dict[str, Any]]: + """Search Twitter. Uses birdx if available, else Exa.""" + ch = get_channel("twitter") + results = await ch.search(query, config=self.config, limit=limit) + return [r.to_dict() for r in results] # ── Health ────────────────────────────────────────── def doctor(self) -> Dict[str, dict]: - """Check all platform/feature availability.""" + """Check all channel availability.""" from agent_eyes.doctor import check_all return check_all(self.config) diff --git a/agent_eyes/doctor.py b/agent_eyes/doctor.py index 08c8742..becee0d 100644 --- a/agent_eyes/doctor.py +++ b/agent_eyes/doctor.py @@ -1,187 +1,26 @@ # -*- coding: utf-8 -*- -"""Environment health checker for Agent Eyes. +"""Environment health checker — powered by channels. -Checks all platforms, tools, and API keys. -Outputs a rich-formatted status report. +Each channel knows how to check itself. Doctor just collects the results. """ -import shutil -import subprocess from typing import Dict - from agent_eyes.config import Config - - -STATUS_OK = "ok" -STATUS_WARN = "warn" -STATUS_OFF = "off" -STATUS_ERROR = "error" - - -def _check_command(cmd: str) -> bool: - """Check if a command exists on PATH.""" - return shutil.which(cmd) is not None - - -def _check_python_import(module: str) -> bool: - """Check if a Python module is importable.""" - try: - __import__(module) - return True - except ImportError: - return False +from agent_eyes.channels import get_all_channels def check_all(config: Config) -> Dict[str, dict]: - """Check all features and return status dict.""" + """Check all channels and return status dict.""" results = {} - - # === Zero-config (always available) === - results["web"] = { - "status": STATUS_OK, - "name": "Web Pages", - "message": "Jina Reader (built-in)", - "tier": 0, - } - results["github_read"] = { - "status": STATUS_OK, - "name": "GitHub", - "message": "Public API (built-in)", - "tier": 0, - } - results["bilibili"] = { - "status": STATUS_OK, - "name": "Bilibili", - "message": "Public API (built-in)", - "tier": 0, - } - results["rss"] = { - "status": STATUS_OK, - "name": "RSS", - "message": "feedparser (built-in)", - "tier": 0, - } - results["tweet_read"] = { - "status": STATUS_OK, - "name": "Tweet (single)", - "message": "Jina Reader (built-in)", - "tier": 0, - } - - # YouTube — needs yt-dlp - if _check_command("yt-dlp"): - results["youtube"] = { - "status": STATUS_OK, - "name": "YouTube", - "message": "yt-dlp found", - "tier": 0, + for ch in get_all_channels(): + status, message = ch.check(config) + results[ch.name] = { + "status": status, + "name": ch.description, + "message": message, + "tier": ch.tier, + "backends": ch.backends, } - else: - results["youtube"] = { - "status": STATUS_WARN, - "name": "YouTube", - "message": "Install yt-dlp: pip install yt-dlp", - "tier": 0, - } - - # === Needs API key (Tier 1) === - exa_key = config.get("exa_api_key") - if exa_key: - results["search_web"] = { - "status": STATUS_OK, - "name": "Web Search", - "message": "Exa API configured", - "tier": 1, - } - results["search_reddit"] = { - "status": STATUS_OK, - "name": "Reddit Search", - "message": "Via Exa (site:reddit.com)", - "tier": 1, - } - results["search_twitter"] = { - "status": STATUS_OK, - "name": "Twitter Search", - "message": "Via Exa (site:x.com)", - "tier": 1, - } - else: - for key in ("search_web", "search_reddit", "search_twitter"): - results[key] = { - "status": STATUS_OFF, - "name": {"search_web": "Web Search", "search_reddit": "Reddit Search", - "search_twitter": "Twitter Search"}[key], - "message": "Need Exa API key (free). Run: agent-eyes setup", - "tier": 1, - } - - # GitHub search (always works, token optional for higher limits) - github_token = config.get("github_token") - results["search_github"] = { - "status": STATUS_OK, - "name": "GitHub Search", - "message": f"API ({'authenticated' if github_token else 'public, 60 req/hr'})", - "tier": 0, - } - - # === Optional tools (Tier 2) === - - # Twitter advanced (birdx) - if _check_command("birdx"): - results["twitter_advanced"] = { - "status": STATUS_OK, - "name": "Twitter Advanced", - "message": "birdx found", - "tier": 2, - } - else: - results["twitter_advanced"] = { - "status": STATUS_OFF, - "name": "Twitter Advanced", - "message": "Install birdx for timeline/deep search", - "tier": 2, - } - - # Reddit full reader - reddit_proxy = config.get("reddit_proxy") - if reddit_proxy: - results["reddit_full"] = { - "status": STATUS_OK, - "name": "Reddit Reader", - "message": "Proxy configured", - "tier": 2, - } - else: - results["reddit_full"] = { - "status": STATUS_OFF, - "name": "Reddit Reader", - "message": "Need proxy for full post reading", - "tier": 2, - } - - # WeChat / XHS (playwright) - if _check_python_import("playwright"): - results["wechat"] = { - "status": STATUS_OK, - "name": "WeChat", - "message": "Playwright available", - "tier": 2, - } - results["xhs"] = { - "status": STATUS_OK, - "name": "XiaoHongShu", - "message": "Playwright available", - "tier": 2, - } - else: - for key in ("wechat", "xhs"): - results[key] = { - "status": STATUS_OFF, - "name": "WeChat" if key == "wechat" else "XiaoHongShu", - "message": "pip install agent-eyes[browser]", - "tier": 2, - } - return results @@ -191,35 +30,39 @@ def format_report(results: Dict[str, dict]) -> str: lines.append("👁️ Agent Eyes Status") lines.append("=" * 40) - # Count stats - ok_count = sum(1 for r in results.values() if r["status"] == STATUS_OK) + ok_count = sum(1 for r in results.values() if r["status"] == "ok") total = len(results) - # Group by tier + # Tier 0 — zero config lines.append("") lines.append("✅ Ready (no setup needed):") for key, r in results.items(): - if r["tier"] == 0 and r["status"] == STATUS_OK: - lines.append(f" ✅ {r['name']}") - elif r["tier"] == 0 and r["status"] == STATUS_WARN: + if r["tier"] == 0 and r["status"] == "ok": + backends = ", ".join(r["backends"]) if r["backends"] else "built-in" + lines.append(f" ✅ {r['name']} [{backends}]") + elif r["tier"] == 0 and r["status"] in ("warn", "off"): lines.append(f" ⚠️ {r['name']} — {r['message']}") - lines.append("") - lines.append("🔍 Search (need free Exa API key):") - for key, r in results.items(): - if r["tier"] == 1: - icon = "✅" if r["status"] == STATUS_OK else "⬜" + # Tier 1 — needs free key + tier1 = {k: r for k, r in results.items() if r["tier"] == 1} + if tier1: + lines.append("") + lines.append("🔍 Search (need free Exa API key):") + for key, r in tier1.items(): + icon = "✅" if r["status"] == "ok" else "⬜" lines.append(f" {icon} {r['name']}") - lines.append("") - lines.append("🔧 Optional (advanced setup):") - for key, r in results.items(): - if r["tier"] == 2: - icon = "✅" if r["status"] == STATUS_OK else "⬜" + # Tier 2 — optional setup + tier2 = {k: r for k, r in results.items() if r["tier"] == 2} + if tier2: + lines.append("") + lines.append("🔧 Optional (advanced setup):") + for key, r in tier2.items(): + icon = "✅" if r["status"] == "ok" else "⬜" lines.append(f" {icon} {r['name']} — {r['message']}") lines.append("") - lines.append(f"Status: {ok_count}/{total} platforms active") + lines.append(f"Status: {ok_count}/{total} channels active") if ok_count < total: lines.append("Run `agent-eyes setup` to unlock more!") diff --git a/agent_eyes/integrations/mcp_server.py b/agent_eyes/integrations/mcp_server.py index 7542255..399faec 100644 --- a/agent_eyes/integrations/mcp_server.py +++ b/agent_eyes/integrations/mcp_server.py @@ -3,9 +3,8 @@ Agent Eyes MCP Server — expose all capabilities as MCP tools. Run: python -m agent_eyes.integrations.mcp_server -Or: agent-eyes serve (after pip install) -10 tools for any MCP-compatible AI Agent. +8 tools for any MCP-compatible AI Agent. """ import asyncio @@ -25,7 +24,6 @@ except ImportError: def create_server(): - """Create and configure the MCP server.""" if not HAS_MCP: print("MCP not installed. Install: pip install agent-eyes[mcp]", file=sys.stderr) sys.exit(1) @@ -37,97 +35,30 @@ def create_server(): @server.list_tools() async def list_tools(): return [ - Tool( - name="read_url", - description="Read content from any URL. Supports: web pages, GitHub, Reddit, Twitter, YouTube, Bilibili, WeChat, XiaoHongShu, RSS, Telegram.", - inputSchema={ - "type": "object", - "properties": { - "url": {"type": "string", "description": "URL to read"}, - }, - "required": ["url"], - }, - ), - Tool( - name="read_batch", - description="Read multiple URLs concurrently.", - inputSchema={ - "type": "object", - "properties": { - "urls": {"type": "array", "items": {"type": "string"}, "description": "List of URLs"}, - }, - "required": ["urls"], - }, - ), - Tool( - name="detect_platform", - description="Detect what platform a URL belongs to (github, reddit, twitter, youtube, etc).", - inputSchema={ - "type": "object", - "properties": { - "url": {"type": "string", "description": "URL to detect"}, - }, - "required": ["url"], - }, - ), - Tool( - name="search", - description="Semantic web search using Exa. Find any information on the internet.", - inputSchema={ - "type": "object", - "properties": { - "query": {"type": "string", "description": "Search query"}, - "num_results": {"type": "integer", "description": "Number of results (1-10)", "default": 5}, - }, - "required": ["query"], - }, - ), - Tool( - name="search_reddit", - description="Search Reddit posts and discussions. Works even when Reddit blocks your IP.", - inputSchema={ - "type": "object", - "properties": { - "query": {"type": "string", "description": "Search query"}, - "subreddit": {"type": "string", "description": "Optional subreddit filter (e.g. 'LocalLLaMA')"}, - "limit": {"type": "integer", "description": "Number of results", "default": 10}, - }, - "required": ["query"], - }, - ), - Tool( - name="search_github", - description="Search GitHub repositories by topic, keyword, or technology.", - inputSchema={ - "type": "object", - "properties": { - "query": {"type": "string", "description": "Search query"}, - "language": {"type": "string", "description": "Filter by language (e.g. 'python')"}, - "limit": {"type": "integer", "description": "Number of results", "default": 5}, - }, - "required": ["query"], - }, - ), - Tool( - name="search_twitter", - description="Search Twitter/X posts. Uses birdx if available, otherwise Exa.", - inputSchema={ - "type": "object", - "properties": { - "query": {"type": "string", "description": "Search query"}, - "limit": {"type": "integer", "description": "Number of results", "default": 10}, - }, - "required": ["query"], - }, - ), - Tool( - name="get_status", - description="Get Agent Eyes status: which platforms are active, which need configuration.", - inputSchema={ - "type": "object", - "properties": {}, - }, - ), + Tool(name="read_url", + description="Read content from any URL. Supports: web, GitHub, Reddit, Twitter, YouTube, Bilibili, RSS.", + inputSchema={"type": "object", "properties": {"url": {"type": "string"}}, "required": ["url"]}), + Tool(name="read_batch", + description="Read multiple URLs concurrently.", + inputSchema={"type": "object", "properties": {"urls": {"type": "array", "items": {"type": "string"}}}, "required": ["urls"]}), + Tool(name="detect_platform", + description="Detect what platform a URL belongs to.", + inputSchema={"type": "object", "properties": {"url": {"type": "string"}}, "required": ["url"]}), + Tool(name="search", + description="Semantic web search via Exa.", + inputSchema={"type": "object", "properties": {"query": {"type": "string"}, "num_results": {"type": "integer", "default": 5}}, "required": ["query"]}), + Tool(name="search_reddit", + description="Search Reddit posts.", + inputSchema={"type": "object", "properties": {"query": {"type": "string"}, "subreddit": {"type": "string"}, "limit": {"type": "integer", "default": 10}}, "required": ["query"]}), + Tool(name="search_github", + description="Search GitHub repositories.", + inputSchema={"type": "object", "properties": {"query": {"type": "string"}, "language": {"type": "string"}, "limit": {"type": "integer", "default": 5}}, "required": ["query"]}), + Tool(name="search_twitter", + description="Search Twitter/X posts.", + inputSchema={"type": "object", "properties": {"query": {"type": "string"}, "limit": {"type": "integer", "default": 10}}, "required": ["query"]}), + Tool(name="get_status", + description="Get Agent Eyes status: which channels are active.", + inputSchema={"type": "object", "properties": {}}), ] @server.call_tool() @@ -135,53 +66,25 @@ def create_server(): try: if name == "read_url": result = await eyes.read(arguments["url"]) - return [TextContent(type="text", text=json.dumps(result, ensure_ascii=False, indent=2))] - elif name == "read_batch": - results = await eyes.read_batch(arguments["urls"]) - return [TextContent(type="text", text=json.dumps(results, ensure_ascii=False, indent=2))] - + result = await eyes.read_batch(arguments["urls"]) elif name == "detect_platform": - platform = eyes.detect_platform(arguments["url"]) - return [TextContent(type="text", text=f"Platform: {platform}")] - + result = eyes.detect_platform(arguments["url"]) elif name == "search": - results = await eyes.search( - arguments["query"], - num_results=arguments.get("num_results", 5), - ) - return [TextContent(type="text", text=json.dumps(results, ensure_ascii=False, indent=2))] - + result = await eyes.search(arguments["query"], arguments.get("num_results", 5)) elif name == "search_reddit": - results = await eyes.search_reddit( - arguments["query"], - subreddit=arguments.get("subreddit"), - limit=arguments.get("limit", 10), - ) - return [TextContent(type="text", text=json.dumps(results, ensure_ascii=False, indent=2))] - + result = await eyes.search_reddit(arguments["query"], arguments.get("subreddit"), arguments.get("limit", 10)) elif name == "search_github": - results = await eyes.search_github( - arguments["query"], - language=arguments.get("language"), - limit=arguments.get("limit", 5), - ) - return [TextContent(type="text", text=json.dumps(results, ensure_ascii=False, indent=2))] - + result = await eyes.search_github(arguments["query"], arguments.get("language"), arguments.get("limit", 5)) elif name == "search_twitter": - results = await eyes.search_twitter( - arguments["query"], - limit=arguments.get("limit", 10), - ) - return [TextContent(type="text", text=json.dumps(results, ensure_ascii=False, indent=2))] - + result = await eyes.search_twitter(arguments["query"], arguments.get("limit", 10)) elif name == "get_status": - report = eyes.doctor_report() - return [TextContent(type="text", text=report)] - + result = eyes.doctor_report() else: - return [TextContent(type="text", text=f"Unknown tool: {name}")] + result = f"Unknown tool: {name}" + text = json.dumps(result, ensure_ascii=False, indent=2) if isinstance(result, (dict, list)) else str(result) + return [TextContent(type="text", text=text)] except Exception as e: return [TextContent(type="text", text=f"Error: {str(e)}")] diff --git a/agent_eyes/reader.py b/agent_eyes/reader.py deleted file mode 100644 index f1f7c16..0000000 --- a/agent_eyes/reader.py +++ /dev/null @@ -1,184 +0,0 @@ -# -*- coding: utf-8 -*- -""" -Universal Reader — routes any URL to the right fetcher. - -The core dispatcher: give it a URL, get back structured content. -""" - -import asyncio -from urllib.parse import urlparse -from loguru import logger -from typing import Dict, Any, Optional - -from agent_eyes.schema import ( - UnifiedContent, UnifiedInbox, SourceType, MediaType, - from_bilibili, from_twitter, from_wechat, - from_xiaohongshu, from_youtube, from_rss, from_telegram, -) -from agent_eyes.readers.jina import fetch_via_jina - - -class UniversalReader: - """ - Routes URLs to platform-specific fetchers. - Falls back to Jina Reader for unknown platforms. - """ - - def __init__(self, inbox: Optional[UnifiedInbox] = None): - self.inbox = inbox - - def _detect_platform(self, url: str) -> str: - """Detect platform from URL.""" - domain = urlparse(url).netloc.lower() - - if "mp.weixin.qq.com" in domain: - return "wechat" - if "x.com" in domain or "twitter.com" in domain: - return "twitter" - if "youtube.com" in domain or "youtu.be" in domain: - return "youtube" - if "xiaohongshu.com" in domain or "xhslink.com" in domain: - return "xhs" - if "bilibili.com" in domain or "b23.tv" in domain: - return "bilibili" - if "xiaoyuzhoufm.com" in domain: - return "podcast" - if "podcasts.apple.com" in domain: - return "podcast" - if "t.me" in domain or "telegram.org" in domain: - return "telegram" - if "reddit.com" in domain or "redd.it" in domain: - return "reddit" - if "github.com" in domain: - return "github" - if url.endswith(".xml") or "/rss" in url or "/feed" in url or "/atom" in url or "rss" in domain: - return "rss" - return "generic" - - async def read(self, url: str) -> UnifiedContent: - """ - Fetch content from any URL and return as UnifiedContent. - - The main entry point — give it a URL, get back structured content. - """ - # Ensure URL has scheme - if not url.startswith(("http://", "https://")): - url = f"https://{url}" - - platform = self._detect_platform(url) - logger.info(f"[{platform}] {url[:60]}...") - - try: - content = await self._fetch(platform, url) - - # Save to inbox if configured - if self.inbox: - if self.inbox.add(content): - self.inbox.save() - logger.info(f"Saved to inbox: {content.title[:50]}") - - # Save to markdown output if configured - from agent_eyes.utils.storage import save_to_markdown - save_to_markdown(content) - - return content - - except Exception as e: - logger.error(f"[{platform}] Failed: {e}") - raise - - async def _fetch(self, platform: str, url: str) -> UnifiedContent: - """Dispatch to platform-specific fetcher.""" - - if platform == "bilibili": - from agent_eyes.readers.bilibili import fetch_bilibili - data = await fetch_bilibili(url) - return from_bilibili(data) - - if platform == "twitter": - from agent_eyes.readers.twitter import fetch_twitter - data = await fetch_twitter(url) - return from_twitter(data) - - if platform == "wechat": - from agent_eyes.readers.wechat import fetch_wechat - data = await fetch_wechat(url) - return from_wechat(data) - - if platform == "xhs": - from agent_eyes.readers.xhs import fetch_xhs - data = await fetch_xhs(url) - return from_xiaohongshu(data) - - if platform == "youtube": - from agent_eyes.readers.youtube import fetch_youtube - data = await fetch_youtube(url) - return from_youtube(data) - - if platform == "rss": - from agent_eyes.readers.rss import fetch_rss - articles = await fetch_rss(url, limit=1) - if articles: - return from_rss(articles[0]) - raise ValueError(f"No articles found in RSS feed: {url}") - - if platform == "reddit": - from agent_eyes.readers.reddit import fetch_reddit - data = await fetch_reddit(url) - return UnifiedContent( - source_type=SourceType.REDDIT, - source_name=f"r/{data.get('subreddit', '')}", - title=data["title"], - content=data.get("content", ""), - url=data["url"], - media_type=MediaType.TEXT, - extra={"author": data.get("author", ""), "score": data.get("score", 0), "num_comments": data.get("num_comments", 0)}, - ) - - if platform == "github": - from agent_eyes.readers.github import fetch_github - data = await fetch_github(url) - return UnifiedContent( - source_type=SourceType.GITHUB, - source_name=data.get("title", ""), - title=data["title"], - content=data.get("content", ""), - url=data["url"], - media_type=MediaType.TEXT, - extra={k: v for k, v in data.items() if k not in ("title", "content", "url", "platform")}, - ) - - if platform == "telegram": - from agent_eyes.readers.telegram import fetch_telegram - # Extract channel username from t.me URL - path = urlparse(url).path.strip("/").split("/")[0] - channel = path if path else url - messages = await fetch_telegram(channel, limit=1) - if messages: - return from_telegram(messages[0], channel, channel) - raise ValueError(f"No messages from Telegram channel: {url}") - - # Fallback: Jina Reader for any unknown URL - logger.info(f"Using Jina fallback for: {url}") - data = fetch_via_jina(url) - return UnifiedContent( - source_type=SourceType.MANUAL, - source_name=urlparse(url).netloc, - title=data["title"], - content=data["content"], - url=url, - ) - - async def read_batch(self, urls: list[str]) -> list[UnifiedContent]: - """Fetch multiple URLs concurrently.""" - tasks = [self.read(url) for url in urls] - results = await asyncio.gather(*tasks, return_exceptions=True) - - contents = [] - for url, result in zip(urls, results): - if isinstance(result, Exception): - logger.error(f"Batch failed for {url}: {result}") - else: - contents.append(result) - - return contents diff --git a/agent_eyes/readers/__init__.py b/agent_eyes/readers/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/agent_eyes/readers/bilibili.py b/agent_eyes/readers/bilibili.py deleted file mode 100644 index 34305ae..0000000 --- a/agent_eyes/readers/bilibili.py +++ /dev/null @@ -1,46 +0,0 @@ -# -*- coding: utf-8 -*- -"""Bilibili video fetcher — uses official web API.""" - -import re -import requests -from loguru import logger -from typing import Dict, Any - - -API_URL = "https://api.bilibili.com/x/web-interface/view" -HEADERS = { - "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36" -} - - -async def fetch_bilibili(url_or_bv: str) -> Dict[str, Any]: - """Fetch Bilibili video metadata via official API.""" - logger.info(f"Fetching Bilibili: {url_or_bv}") - - bv_id = url_or_bv - if "bilibili.com" in url_or_bv or "b23.tv" in url_or_bv: - match = re.search(r'BV\w+', url_or_bv) - if match: - bv_id = match.group() - else: - raise ValueError(f"Cannot extract BV ID from: {url_or_bv}") - - resp = requests.get(API_URL, params={"bvid": bv_id}, headers=HEADERS, timeout=10) - resp.raise_for_status() - data = resp.json() - - if data.get("code") != 0: - raise ValueError(f"Bilibili API error: {data.get('message')}") - - video = data["data"] - return { - "title": video.get("title", ""), - "description": video.get("desc", ""), - "author": video.get("owner", {}).get("name", ""), - "url": f"https://www.bilibili.com/video/{bv_id}", - "cover": video.get("pic", ""), - "bvid": bv_id, - "duration": video.get("duration", 0), - "view_count": video.get("stat", {}).get("view", 0), - "platform": "bilibili", - } diff --git a/agent_eyes/readers/browser.py b/agent_eyes/readers/browser.py deleted file mode 100644 index 10ac347..0000000 --- a/agent_eyes/readers/browser.py +++ /dev/null @@ -1,88 +0,0 @@ -# -*- coding: utf-8 -*- -""" -Playwright browser fetcher — headless Chromium fallback for anti-scraping sites. - -Used when Jina Reader fails (403/451/timeout). Supports persistent login -sessions via Playwright's storage_state for platforms requiring authentication. - -Install: pip install "x-reader[browser]" && playwright install chromium -""" - -from loguru import logger -from pathlib import Path - -SESSION_DIR = Path.home() / ".x-reader" / "sessions" -TIMEOUT_MS = 30_000 - - -async def fetch_via_browser(url: str, storage_state: str = None) -> dict: - """ - Fetch a URL using headless Chromium via Playwright. - - Args: - url: Target URL to fetch. - storage_state: Path to a Playwright storage state JSON file (cookies/localStorage). - If provided, the browser context will load this session. - - Returns: - dict with keys: title, content, url, author - """ - try: - from playwright.async_api import async_playwright - except ImportError: - raise RuntimeError( - "Playwright is not installed. Run:\n" - ' pip install "x-reader[browser]"\n' - " playwright install chromium" - ) - - logger.info(f"Browser fetch: {url}") - - async with async_playwright() as p: - browser = await p.chromium.launch(headless=True) - - context_kwargs = {} - if storage_state and Path(storage_state).exists(): - context_kwargs["storage_state"] = storage_state - logger.info(f"Using session: {storage_state}") - - context = await browser.new_context( - user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) " - "AppleWebKit/537.36 (KHTML, like Gecko) " - "Chrome/120.0.0.0 Safari/537.36", - **context_kwargs, - ) - page = await context.new_page() - - try: - await page.goto(url, wait_until="domcontentloaded", timeout=TIMEOUT_MS) - # Extra wait for JS-heavy pages - await page.wait_for_timeout(2000) - - title = await page.title() - # Extract main text content, stripping scripts/styles - content = await page.evaluate("""() => { - const el = document.querySelector('article') - || document.querySelector('main') - || document.querySelector('.content') - || document.body; - return el ? el.innerText : ''; - }""") - - result = { - "title": (title or "").strip()[:200], - "content": (content or "").strip(), - "url": url, - "author": "", - } - logger.info(f"Browser fetch OK: {title[:60]}") - return result - - finally: - await context.close() - await browser.close() - - -def get_session_path(platform: str) -> str: - """Get the session file path for a platform.""" - return str(SESSION_DIR / f"{platform}.json") diff --git a/agent_eyes/readers/github.py b/agent_eyes/readers/github.py deleted file mode 100644 index bf54a74..0000000 --- a/agent_eyes/readers/github.py +++ /dev/null @@ -1,190 +0,0 @@ -# -*- coding: utf-8 -*- -"""GitHub fetcher — extracts repo info, issues, PRs, and README content. - -Uses GitHub public API (no token needed for public repos). -For higher rate limits, set GITHUB_TOKEN env var. -""" - -import os -import re -import base64 -import requests -from loguru import logger -from typing import Dict, Any, List, Optional - - -API_BASE = "https://api.github.com" - - -def _get_headers() -> Dict[str, str]: - """Get request headers, optionally with auth token.""" - headers = { - "Accept": "application/vnd.github.v3+json", - "User-Agent": "AgentEyes/1.0", - } - token = os.environ.get("GITHUB_TOKEN") - if token: - headers["Authorization"] = f"Bearer {token}" - return headers - - -def _parse_github_url(url: str) -> Dict[str, str]: - """Parse GitHub URL into components.""" - # Match: github.com/owner/repo[/type/number] - match = re.search( - r'github\.com/([^/]+)/([^/]+?)(?:\.git)?(?:/(issues|pull|tree|blob)/(.+))?/?$', - url - ) - if not match: - raise ValueError(f"Cannot parse GitHub URL: {url}") - - return { - "owner": match.group(1), - "repo": match.group(2), - "type": match.group(3), # issues, pull, tree, blob, or None - "ref": match.group(4), # issue number, branch, file path, or None - } - - -async def fetch_github(url: str) -> Dict[str, Any]: - """Fetch content from a GitHub URL.""" - logger.info(f"Fetching GitHub: {url}") - - parsed = _parse_github_url(url) - owner = parsed["owner"] - repo = parsed["repo"] - content_type = parsed["type"] - ref = parsed["ref"] - - headers = _get_headers() - - if content_type == "issues" and ref: - return await _fetch_issue(owner, repo, ref, headers) - elif content_type == "pull" and ref: - return await _fetch_pull(owner, repo, ref, headers) - else: - return await _fetch_repo(owner, repo, headers) - - -async def _fetch_repo(owner: str, repo: str, headers: Dict) -> Dict[str, Any]: - """Fetch repo info + README.""" - # Get repo info - repo_resp = requests.get(f"{API_BASE}/repos/{owner}/{repo}", headers=headers, timeout=10) - repo_resp.raise_for_status() - repo_data = repo_resp.json() - - # Get README - readme_content = "" - try: - readme_resp = requests.get( - f"{API_BASE}/repos/{owner}/{repo}/readme", - headers=headers, timeout=10, - ) - if readme_resp.status_code == 200: - readme_data = readme_resp.json() - readme_content = base64.b64decode(readme_data.get("content", "")).decode("utf-8") - except Exception as e: - logger.warning(f"Could not fetch README: {e}") - - return { - "title": f"{owner}/{repo}", - "content": readme_content or repo_data.get("description", ""), - "description": repo_data.get("description", ""), - "author": owner, - "url": repo_data.get("html_url", ""), - "stars": repo_data.get("stargazers_count", 0), - "forks": repo_data.get("forks_count", 0), - "language": repo_data.get("language", ""), - "topics": repo_data.get("topics", []), - "license": (repo_data.get("license") or {}).get("spdx_id", ""), - "platform": "github", - } - - -async def _fetch_issue(owner: str, repo: str, number: str, headers: Dict) -> Dict[str, Any]: - """Fetch a GitHub issue with comments.""" - issue_num = re.match(r'(\d+)', number).group(1) - - # Get issue - resp = requests.get( - f"{API_BASE}/repos/{owner}/{repo}/issues/{issue_num}", - headers=headers, timeout=10, - ) - resp.raise_for_status() - issue = resp.json() - - # Get comments - comments_text = "" - if issue.get("comments", 0) > 0: - c_resp = requests.get( - f"{API_BASE}/repos/{owner}/{repo}/issues/{issue_num}/comments", - headers=headers, params={"per_page": 20}, timeout=10, - ) - if c_resp.status_code == 200: - comments = c_resp.json() - parts = ["\n---\n## Comments\n"] - for c in comments: - parts.append(f"**@{c.get('user', {}).get('login', '?')}**:\n{c.get('body', '')}\n") - comments_text = "\n".join(parts) - - return { - "title": f"[{owner}/{repo}#{issue_num}] {issue.get('title', '')}", - "content": (issue.get("body", "") or "") + comments_text, - "author": issue.get("user", {}).get("login", ""), - "url": issue.get("html_url", ""), - "state": issue.get("state", ""), - "labels": [l.get("name", "") for l in issue.get("labels", [])], - "platform": "github", - } - - -async def _fetch_pull(owner: str, repo: str, number: str, headers: Dict) -> Dict[str, Any]: - """Fetch a GitHub pull request.""" - pr_num = re.match(r'(\d+)', number).group(1) - - resp = requests.get( - f"{API_BASE}/repos/{owner}/{repo}/pulls/{pr_num}", - headers=headers, timeout=10, - ) - resp.raise_for_status() - pr = resp.json() - - return { - "title": f"[{owner}/{repo}#{pr_num}] {pr.get('title', '')}", - "content": pr.get("body", "") or "", - "author": pr.get("user", {}).get("login", ""), - "url": pr.get("html_url", ""), - "state": pr.get("state", ""), - "merged": pr.get("merged", False), - "additions": pr.get("additions", 0), - "deletions": pr.get("deletions", 0), - "changed_files": pr.get("changed_files", 0), - "platform": "github", - } - - -async def search_github(query: str, limit: int = 5) -> List[Dict[str, Any]]: - """Search GitHub repositories.""" - logger.info(f"Searching GitHub: {query}") - - resp = requests.get( - f"{API_BASE}/search/repositories", - headers=_get_headers(), - params={"q": query, "sort": "stars", "per_page": limit}, - timeout=10, - ) - resp.raise_for_status() - data = resp.json() - - results = [] - for item in data.get("items", []): - results.append({ - "title": item.get("full_name", ""), - "description": item.get("description", ""), - "url": item.get("html_url", ""), - "stars": item.get("stargazers_count", 0), - "language": item.get("language", ""), - "updated_at": item.get("updated_at", ""), - }) - - return results diff --git a/agent_eyes/readers/jina.py b/agent_eyes/readers/jina.py deleted file mode 100644 index 3ee6882..0000000 --- a/agent_eyes/readers/jina.py +++ /dev/null @@ -1,63 +0,0 @@ -# -*- coding: utf-8 -*- -""" -Jina Reader — universal fallback for content extraction. - -Uses https://r.jina.ai/{url} to extract markdown from any web page. -Free, no API key required, handles JS rendering and anti-scraping. -""" - -import requests -from loguru import logger - - -JINA_BASE = "https://r.jina.ai" -TIMEOUT = 30 - -HEADERS = { - "Accept": "text/markdown", - "User-Agent": "x-reader/0.1", -} - - -def fetch_via_jina(url: str) -> dict: - """ - Fetch any URL via Jina Reader and return structured data. - - Returns: - dict with keys: title, content, url, author (best-effort) - """ - jina_url = f"{JINA_BASE}/{url}" - logger.info(f"Jina fetch: {url}") - - try: - resp = requests.get(jina_url, headers=HEADERS, timeout=TIMEOUT) - resp.raise_for_status() - text = resp.text - - # Jina returns markdown; first line is usually the title - lines = text.strip().split("\n") - title = "" - content_lines = [] - - for line in lines: - if not title and line.strip(): - # First non-empty line as title, strip markdown heading - title = line.lstrip("#").strip() - else: - content_lines.append(line) - - content = "\n".join(content_lines).strip() - - return { - "title": title[:200], - "content": content, - "url": url, - "author": "", - } - - except requests.Timeout: - logger.error(f"Jina timeout: {url}") - raise - except requests.RequestException as e: - logger.error(f"Jina fetch failed: {url} — {e}") - raise diff --git a/agent_eyes/readers/reddit.py b/agent_eyes/readers/reddit.py deleted file mode 100644 index a435b7b..0000000 --- a/agent_eyes/readers/reddit.py +++ /dev/null @@ -1,132 +0,0 @@ -# -*- coding: utf-8 -*- -"""Reddit fetcher — extracts posts and comments via JSON API. - -Supports optional proxy via REDDIT_PROXY env var (many IPs are blocked by Reddit). -Example: REDDIT_PROXY=http://user:pass@host:port -""" - -import os -import re -import requests -from loguru import logger -from typing import Dict, Any, List, Optional - - -HEADERS = { - "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) " - "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36" -} - - -def _get_proxies() -> Optional[Dict[str, str]]: - """Get proxy config from env.""" - proxy = os.environ.get("REDDIT_PROXY") - if proxy: - return {"http": proxy, "https": proxy} - return None - - -def _extract_post(post_data: Dict) -> Dict[str, Any]: - """Extract post info from Reddit JSON.""" - data = post_data.get("data", {}) - return { - "title": data.get("title", ""), - "author": data.get("author", "[deleted]"), - "selftext": data.get("selftext", ""), - "score": data.get("score", 0), - "num_comments": data.get("num_comments", 0), - "url": f"https://www.reddit.com{data.get('permalink', '')}", - "created_utc": data.get("created_utc", 0), - "subreddit": data.get("subreddit", ""), - } - - -def _extract_comments(comments_data: Dict, limit: int = 20) -> List[Dict[str, str]]: - """Extract top-level comments.""" - comments = [] - children = comments_data.get("data", {}).get("children", []) - for child in children[:limit]: - if child.get("kind") != "t1": - continue - data = child.get("data", {}) - comments.append({ - "author": data.get("author", "[deleted]"), - "body": data.get("body", ""), - "score": data.get("score", 0), - }) - return comments - - -async def fetch_reddit(url: str) -> Dict[str, Any]: - """Fetch Reddit post + comments via JSON API.""" - logger.info(f"Fetching Reddit: {url}") - - # Normalize URL and append .json - clean_url = re.sub(r'\?.*$', '', url.rstrip('/')) - json_url = f"{clean_url}.json" - - resp = requests.get( - json_url, - headers=HEADERS, - proxies=_get_proxies(), - timeout=15, - ) - resp.raise_for_status() - data = resp.json() - - # Reddit returns [post_listing, comments_listing] - if not isinstance(data, list) or len(data) < 2: - raise ValueError(f"Unexpected Reddit response format") - - post_listing = data[0].get("data", {}).get("children", []) - if not post_listing: - raise ValueError("No post found") - - post = _extract_post(post_listing[0]) - comments = _extract_comments(data[1]) - - # Build readable content - content_parts = [post["selftext"]] if post["selftext"] else [] - if comments: - content_parts.append("\n---\n## Top Comments\n") - for c in comments: - content_parts.append(f"**u/{c['author']}** ({c['score']} pts):\n{c['body']}\n") - - return { - "title": post["title"], - "content": "\n".join(content_parts), - "author": f"u/{post['author']}", - "url": post["url"], - "subreddit": post["subreddit"], - "score": post["score"], - "num_comments": post["num_comments"], - "platform": "reddit", - } - - -async def search_reddit(query: str, subreddit: str = None, limit: int = 10) -> List[Dict[str, Any]]: - """Search Reddit posts.""" - logger.info(f"Searching Reddit: {query} (sub={subreddit})") - - if subreddit: - search_url = f"https://www.reddit.com/r/{subreddit}/search.json" - params = {"q": query, "restrict_sr": "on", "limit": limit, "sort": "relevance"} - else: - search_url = "https://www.reddit.com/search.json" - params = {"q": query, "limit": limit, "sort": "relevance"} - - resp = requests.get( - search_url, - headers=HEADERS, - params=params, - proxies=_get_proxies(), - timeout=15, - ) - resp.raise_for_status() - data = resp.json() - - results = [] - for child in data.get("data", {}).get("children", []): - results.append(_extract_post(child)) - - return results diff --git a/agent_eyes/readers/rss.py b/agent_eyes/readers/rss.py deleted file mode 100644 index 666155d..0000000 --- a/agent_eyes/readers/rss.py +++ /dev/null @@ -1,47 +0,0 @@ -# -*- coding: utf-8 -*- -"""RSS feed fetcher — uses feedparser.""" - -import feedparser -from loguru import logger -from typing import Dict, Any, List - - -async def fetch_rss(url: str, limit: int = 20) -> List[Dict[str, Any]]: - """ - Fetch and parse an RSS/Atom feed. - - Args: - url: RSS feed URL - limit: Max number of entries to return - - Returns: - List of article dicts with: title, summary, url, source, published - """ - logger.info(f"Fetching RSS: {url}") - - feed = feedparser.parse(url) - - if feed.bozo and not feed.entries: - raise ValueError(f"Failed to parse RSS feed: {feed.bozo_exception}") - - source_name = feed.feed.get("title", url) - articles = [] - - for entry in feed.entries[:limit]: - summary = "" - if hasattr(entry, "summary"): - summary = entry.summary - elif hasattr(entry, "content"): - summary = entry.content[0].get("value", "") - - articles.append({ - "title": entry.get("title", ""), - "summary": summary, - "url": entry.get("link", ""), - "source": source_name, - "published": entry.get("published", ""), - "platform": "rss", - }) - - logger.info(f"RSS: {len(articles)} articles from {source_name}") - return articles diff --git a/agent_eyes/readers/telegram.py b/agent_eyes/readers/telegram.py deleted file mode 100644 index 803339d..0000000 --- a/agent_eyes/readers/telegram.py +++ /dev/null @@ -1,71 +0,0 @@ -# -*- coding: utf-8 -*- -""" -Telegram channel fetcher — uses Telethon. - -Requires: pip install x-reader[telegram] -Requires: TG_API_ID + TG_API_HASH in .env -""" - -import os -from datetime import datetime, timedelta, timezone -from loguru import logger -from typing import Dict, Any, List - - -async def fetch_telegram( - channel: str, - limit: int = 20, - hours: int = 24, - session_path: str = None, -) -> List[Dict[str, Any]]: - """ - Fetch recent messages from a Telegram channel. - - Args: - channel: Channel username (e.g. 'predictionmkt') - limit: Max messages per channel - hours: Only fetch messages from the last N hours - session_path: Path to Telethon session file - - Returns: - List of message dicts - """ - try: - from telethon import TelegramClient - from telethon.tl.types import Message - except ImportError: - raise ImportError( - "Telethon is required for Telegram fetching. " - "Install with: pip install x-reader[telegram]" - ) - - api_id = os.getenv("TG_API_ID", "") - api_hash = os.getenv("TG_API_HASH", "") - - if not api_id or not api_hash: - raise ValueError("TG_API_ID and TG_API_HASH must be set in .env") - - session = session_path or os.getenv("TG_SESSION_PATH", "./tg_session") - cutoff = datetime.now(timezone.utc) - timedelta(hours=hours) - - messages = [] - async with TelegramClient(session, int(api_id), api_hash) as client: - logger.info(f"Fetching TG channel: {channel}") - entity = await client.get_entity(channel) - - async for msg in client.iter_messages(entity, limit=limit): - if not isinstance(msg, Message) or not msg.text: - continue - if msg.date < cutoff: - break - - messages.append({ - "text": msg.text, - "views": msg.views or 0, - "date": msg.date.isoformat(), - "url": f"https://t.me/{channel}/{msg.id}", - "platform": "telegram", - }) - - logger.info(f"TG {channel}: {len(messages)} messages") - return messages diff --git a/agent_eyes/readers/twitter.py b/agent_eyes/readers/twitter.py deleted file mode 100644 index f9d508a..0000000 --- a/agent_eyes/readers/twitter.py +++ /dev/null @@ -1,217 +0,0 @@ -# -*- coding: utf-8 -*- -""" -X/Twitter fetcher — three-tier fallback: - -1. X oEmbed API (fast, reliable for individual tweets, no login needed) -2. Jina Reader (handles non-tweet X pages like profiles) -3. Playwright + saved session (handles login-required content) - -Install browser tier: pip install "x-reader[browser]" && playwright install chromium -Save X session: x-reader login twitter -""" - -import re -import requests -from loguru import logger -from typing import Dict, Any - -from agent_eyes.readers.jina import fetch_via_jina - - -OEMBED_URL = "https://publish.twitter.com/oembed" - - -def _extract_author(url: str) -> str: - """Extract @username from tweet URL.""" - match = re.search(r'x\.com/(\w+)/status', url) - return f"@{match.group(1)}" if match else "" - - -def _is_tweet_url(url: str) -> bool: - """Check if this is a direct tweet/status URL (vs profile or other X page).""" - return bool(re.search(r'x\.com/\w+/status/\d+', url)) - - -def _fetch_via_oembed(url: str) -> Dict[str, Any]: - """ - Fetch tweet text via X's oEmbed API. - Free, reliable, no auth needed. Works for public tweets. - Note: oEmbed requires twitter.com URLs (not x.com). - """ - # oEmbed API requires twitter.com format - oembed_query_url = url.replace("x.com", "twitter.com") - resp = requests.get( - OEMBED_URL, - params={"url": oembed_query_url, "omit_script": "true"}, - timeout=10, - ) - resp.raise_for_status() - data = resp.json() - - # Strip HTML tags from the embedded HTML to get clean text - html = data.get("html", "") - text = re.sub(r'<[^>]+>', ' ', html) - text = re.sub(r'\s+', ' ', text).strip() - - return { - "text": text, - "author": data.get("author_name", ""), - "author_url": data.get("author_url", ""), - "title": text[:100] if text else "", - } - - -async def _fetch_via_playwright(url: str) -> Dict[str, Any]: - """ - Fetch tweet via Playwright with X-specific DOM selectors. - Uses saved login session if available (~/.x-reader/sessions/twitter.json). - """ - try: - from playwright.async_api import async_playwright - except ImportError: - raise RuntimeError( - "Playwright not installed. Run:\n" - ' pip install "x-reader[browser]"\n' - " playwright install chromium" - ) - - from agent_eyes.readers.browser import get_session_path - from pathlib import Path - - session_path = get_session_path("twitter") - has_session = Path(session_path).exists() - if has_session: - logger.info(f"Using saved X session: {session_path}") - - async with async_playwright() as p: - browser = await p.chromium.launch(headless=True) - - context_kwargs = {} - if has_session: - context_kwargs["storage_state"] = session_path - - context = await browser.new_context( - user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) " - "AppleWebKit/537.36 (KHTML, like Gecko) " - "Chrome/120.0.0.0 Safari/537.36", - **context_kwargs, - ) - page = await context.new_page() - - try: - await page.goto(url, wait_until="domcontentloaded", timeout=30_000) - - # Wait for tweet text to render (X is a SPA, needs JS execution) - try: - await page.wait_for_selector( - '[data-testid="tweetText"]', timeout=10_000 - ) - except Exception: - pass # May not appear if login required - - # Extract tweet content with X-specific selectors - tweet_text = await page.evaluate("""() => { - // Priority 1: tweet text element - const tweetEl = document.querySelector('[data-testid="tweetText"]'); - if (tweetEl) return tweetEl.innerText; - - // Priority 2: article element (thread view) - const article = document.querySelector('article'); - if (article) return article.innerText; - - // Priority 3: main content area - const main = document.querySelector('main'); - if (main) return main.innerText; - - return ''; - }""") - - title = await page.title() - - return { - "text": (tweet_text or "").strip(), - "title": (title or "").strip()[:200], - } - finally: - await context.close() - await browser.close() - - -async def fetch_twitter(url: str) -> Dict[str, Any]: - """ - Fetch a tweet or X post with three-tier fallback. - - Args: - url: Tweet URL (x.com or twitter.com) - - Returns: - Dict with: text, author, url, title, platform - """ - url = url.replace("twitter.com", "x.com") - author = _extract_author(url) - - # Tier 1: oEmbed API (best for individual tweets) - if _is_tweet_url(url): - try: - logger.info(f"[Twitter] Tier 1 — oEmbed: {url}") - data = _fetch_via_oembed(url) - if data.get("text") and len(data["text"].strip()) > 20: - return { - "text": data["text"], - "author": author or data.get("author", ""), - "url": url, - "title": data.get("title", ""), - "platform": "twitter", - } - logger.warning("[Twitter] oEmbed returned thin content") - except Exception as e: - logger.warning(f"[Twitter] oEmbed failed ({e})") - - # Tier 2: Jina Reader (handles profiles, threads, non-tweet pages) - try: - logger.info(f"[Twitter] Tier 2 — Jina: {url}") - data = fetch_via_jina(url) - content = data.get("content", "") - title = data.get("title", "") - jina_ok = ( - content - and len(content.strip()) > 100 - and "not yet fully loaded" not in content.lower() - and title.lower() not in ("x", "title: x", "") - ) - if jina_ok: - return { - "text": content, - "author": author, - "url": url, - "title": title, - "platform": "twitter", - } - logger.warning("[Twitter] Jina returned unusable content") - except Exception as e: - logger.warning(f"[Twitter] Jina failed ({e})") - - # Tier 3: Playwright + session with X-specific extraction - try: - logger.info(f"[Twitter] Tier 3 — Playwright: {url}") - data = await _fetch_via_playwright(url) - content = data.get("text", "") - if content and len(content.strip()) > 20: - return { - "text": content, - "author": author, - "url": url, - "title": data.get("title", ""), - "platform": "twitter", - } - logger.warning("[Twitter] Playwright returned empty content") - except RuntimeError: - raise - except Exception as e: - logger.error(f"[Twitter] All methods failed: {e}") - - raise RuntimeError( - f"❌ All Twitter fetch methods failed for: {url}\n" - f" Try: x-reader login twitter (to save session for browser fallback)\n" - f" Then retry: x-reader {url}" - ) diff --git a/agent_eyes/readers/wechat.py b/agent_eyes/readers/wechat.py deleted file mode 100644 index def6d36..0000000 --- a/agent_eyes/readers/wechat.py +++ /dev/null @@ -1,62 +0,0 @@ -# -*- coding: utf-8 -*- -""" -WeChat article fetcher — two-tier fallback: - -1. Jina Reader (fast, no deps) -2. Playwright headless (no login needed for public articles) -""" - -from loguru import logger -from typing import Dict, Any - - -async def fetch_wechat(url: str) -> Dict[str, Any]: - """ - Fetch a WeChat public account article with fallback. - - Args: - url: mp.weixin.qq.com article URL - - Returns: - Dict with: title, content, author, url, platform - """ - # Tier 1: Jina Reader - try: - logger.info(f"[WeChat] Tier 1 — Jina: {url}") - from agent_eyes.readers.jina import fetch_via_jina - - data = fetch_via_jina(url) - if data.get("content"): - return { - "title": data["title"], - "content": data["content"], - "author": data.get("author", ""), - "url": url, - "platform": "wechat", - } - logger.warning("[WeChat] Jina returned empty content, falling back to browser") - except Exception as e: - logger.warning(f"[WeChat] Jina failed ({e}), falling back to browser") - - # Tier 2: Playwright headless (no session needed) - try: - logger.info(f"[WeChat] Tier 2 — Playwright headless: {url}") - from agent_eyes.readers.browser import fetch_via_browser - - data = await fetch_via_browser(url) - return { - "title": data["title"], - "content": data["content"], - "author": data.get("author", ""), - "url": url, - "platform": "wechat", - } - except RuntimeError: - # Playwright not installed — re-raise with original Jina error context - raise - except Exception as e: - logger.error(f"[WeChat] Browser fetch also failed: {e}") - raise RuntimeError( - f"❌ All WeChat fetch methods failed.\n" - f" Last error: {e}" - ) diff --git a/agent_eyes/readers/xhs.py b/agent_eyes/readers/xhs.py deleted file mode 100644 index da720b5..0000000 --- a/agent_eyes/readers/xhs.py +++ /dev/null @@ -1,78 +0,0 @@ -# -*- coding: utf-8 -*- -""" -Xiaohongshu (RED) note fetcher — three-tier fallback: - -1. Jina Reader (fast, no deps) -2. Playwright + saved session (handles 451/403) -3. Error with login instructions - -Install browser tier: pip install "x-reader[browser]" && playwright install chromium -""" - -from loguru import logger -from typing import Dict, Any -from pathlib import Path - -from agent_eyes.readers.jina import fetch_via_jina - - -async def fetch_xhs(url: str) -> Dict[str, Any]: - """ - Fetch a Xiaohongshu note with three-tier fallback. - - Args: - url: xiaohongshu.com or xhslink.com URL - - Returns: - Dict with: title, content, author, url, platform - """ - # Tier 1: Jina Reader - try: - logger.info(f"[XHS] Tier 1 — Jina: {url}") - data = fetch_via_jina(url) - if data.get("content"): - return { - "title": data["title"], - "content": data["content"], - "author": data.get("author", ""), - "url": url, - "platform": "xhs", - } - logger.warning("[XHS] Jina returned empty content, falling back to browser") - except Exception as e: - logger.warning(f"[XHS] Jina failed ({e}), falling back to browser") - - # Tier 2: Playwright with session - from agent_eyes.readers.browser import get_session_path, SESSION_DIR - - session_path = get_session_path("xhs") - if not Path(session_path).exists(): - # Tier 3: No session — guide user - raise RuntimeError( - f"❌ XHS blocked Jina and no saved session found.\n" - f" Run: x-reader login xhs\n" - f" Then retry this URL." - ) - - try: - logger.info(f"[XHS] Tier 2 — Playwright with session: {url}") - from agent_eyes.readers.browser import fetch_via_browser - - data = await fetch_via_browser(url, storage_state=session_path) - return { - "title": data["title"], - "content": data["content"], - "author": data.get("author", ""), - "url": url, - "platform": "xhs", - } - except RuntimeError: - # Playwright not installed - raise - except Exception as e: - logger.error(f"[XHS] Browser fetch also failed: {e}") - raise RuntimeError( - f"❌ All XHS fetch methods failed.\n" - f" Last error: {e}\n" - f" Try: x-reader login xhs (to refresh session)" - ) diff --git a/agent_eyes/readers/youtube.py b/agent_eyes/readers/youtube.py deleted file mode 100644 index f32f565..0000000 --- a/agent_eyes/readers/youtube.py +++ /dev/null @@ -1,211 +0,0 @@ -# -*- coding: utf-8 -*- -""" -YouTube video fetcher — three-tier content extraction: - -1. yt-dlp auto-subtitles (fastest, best quality for subtitled videos) -2. yt-dlp audio download → Groq Whisper API transcription (for non-subtitled videos) -3. Jina Reader fallback (page description only) - -Requires: yt-dlp installed (brew install yt-dlp / pip install yt-dlp) -Optional: GROQ_API_KEY env var for Whisper transcription -""" - -import re -import os -import subprocess -import tempfile -from loguru import logger -from typing import Dict, Any - -from agent_eyes.readers.jina import fetch_via_jina - - -def _extract_video_id(url: str) -> str: - """Extract video ID from YouTube URL.""" - match = re.search(r'(?:v=|youtu\.be/)([a-zA-Z0-9_-]{11})', url) - return match.group(1) if match else "" - - -def _get_subtitles_via_ytdlp(url: str, lang: str = "en") -> str: - """ - Download auto-generated subtitles using yt-dlp. - Returns subtitle text, or empty string if unavailable. - """ - with tempfile.TemporaryDirectory() as tmpdir: - output_path = os.path.join(tmpdir, "sub") - - cmd = [ - "yt-dlp", - "--write-auto-sub", - "--write-sub", - "--sub-lang", lang, - "--sub-format", "srt", - "--skip-download", - "-o", output_path, - url, - ] - - try: - subprocess.run(cmd, capture_output=True, text=True, timeout=60) - except FileNotFoundError: - logger.warning("yt-dlp not found. Install with: brew install yt-dlp") - return "" - except subprocess.TimeoutExpired: - logger.warning("yt-dlp subtitle download timed out") - return "" - - for ext in [f".{lang}.srt", f".{lang}.vtt"]: - sub_file = output_path + ext - if os.path.exists(sub_file): - return _parse_srt(sub_file) - - return "" - - -def _parse_srt(filepath: str) -> str: - """Parse SRT file into clean text (strip timestamps and sequence numbers).""" - with open(filepath, 'r', encoding='utf-8') as f: - lines = f.readlines() - - text_lines = [] - seen = set() - - for line in lines: - line = line.strip() - if not line or line.isdigit() or '-->' in line: - continue - if line.startswith('[') and line.endswith(']'): - continue - if line not in seen: - seen.add(line) - text_lines.append(line) - - return " ".join(text_lines) - - -def _transcribe_via_whisper(url: str) -> str: - """ - Download audio with yt-dlp and transcribe via Groq Whisper API. - - Requires: GROQ_API_KEY env var + yt-dlp + ffmpeg installed. - Groq Whisper limit: 25MB audio file. - Returns transcript text, or empty string if unavailable. - """ - api_key = os.getenv("GROQ_API_KEY") - if not api_key: - logger.info("GROQ_API_KEY not set, skipping Whisper transcription") - return "" - - with tempfile.TemporaryDirectory() as tmpdir: - output_template = os.path.join(tmpdir, "audio.%(ext)s") - - cmd = [ - "yt-dlp", - "-x", - "--audio-format", "m4a", - "--audio-quality", "5", - "-o", output_template, - "--no-playlist", - url, - ] - - try: - subprocess.run(cmd, capture_output=True, text=True, timeout=180) - except FileNotFoundError: - logger.warning("yt-dlp not found for audio download") - return "" - except subprocess.TimeoutExpired: - logger.warning("yt-dlp audio download timed out") - return "" - - # Find the downloaded audio file - audio_path = os.path.join(tmpdir, "audio.m4a") - if not os.path.exists(audio_path): - for f in os.listdir(tmpdir): - if f.startswith("audio."): - audio_path = os.path.join(tmpdir, f) - break - else: - logger.warning("No audio file downloaded") - return "" - - file_size = os.path.getsize(audio_path) - if file_size > 25 * 1024 * 1024: - logger.warning(f"Audio file too large ({file_size // 1024 // 1024}MB > 25MB limit)") - return "" - - logger.info(f"Transcribing {file_size // 1024}KB audio via Groq Whisper...") - - import requests - try: - with open(audio_path, "rb") as f: - response = requests.post( - "https://api.groq.com/openai/v1/audio/transcriptions", - headers={"Authorization": f"Bearer {api_key}"}, - files={"file": (os.path.basename(audio_path), f, "audio/mp4")}, - data={"model": "whisper-large-v3", "response_format": "text"}, - timeout=120, - ) - - if response.status_code == 200: - transcript = response.text.strip() - logger.info(f"Whisper transcript: {len(transcript)} chars") - return transcript - else: - logger.warning(f"Groq Whisper API error: {response.status_code} {response.text[:200]}") - return "" - except Exception as e: - logger.warning(f"Whisper transcription failed: {e}") - return "" - - -async def fetch_youtube(url: str, sub_lang: str = "en") -> Dict[str, Any]: - """ - Fetch YouTube video content with three-tier extraction. - - Strategy: - 1. yt-dlp auto-subtitles (full transcript, fastest) - 2. yt-dlp audio + Groq Whisper API (for non-subtitled videos) - 3. Jina Reader fallback (page description only) - - Args: - url: YouTube video URL - sub_lang: Subtitle language code (default: "en") - - Returns: - Dict with: title, description, author, url, video_id, has_transcript, platform - """ - logger.info(f"Fetching YouTube: {url}") - video_id = _extract_video_id(url) - - # Step 1: Get metadata via Jina (fast, always works) - jina_data = fetch_via_jina(url) - title = jina_data["title"] - - # Step 2: Try yt-dlp auto-subtitles - logger.info(f"Extracting subtitles ({sub_lang})...") - transcript = _get_subtitles_via_ytdlp(url, lang=sub_lang) - - # Step 3: No subtitles? Try Whisper transcription - if not transcript: - logger.info("No subtitles available, trying Whisper transcription...") - transcript = _transcribe_via_whisper(url) - - if transcript: - logger.info(f"Got transcript: {len(transcript)} chars") - content = transcript - has_transcript = True - else: - logger.info("No transcript available, using page description") - content = jina_data["content"] - has_transcript = False - - return { - "title": title, - "description": content, - "author": jina_data.get("author", ""), - "url": url, - "video_id": video_id, - "has_transcript": has_transcript, - "platform": "youtube", - } diff --git a/agent_eyes/schema.py b/agent_eyes/schema.py deleted file mode 100644 index 1582eb5..0000000 --- a/agent_eyes/schema.py +++ /dev/null @@ -1,283 +0,0 @@ -# -*- coding: utf-8 -*- -""" -Unified content schema for Agent Eyes. - -Defines the standard data format for all content sources: -- Telegram channels -- RSS feeds -- Bilibili videos -- Xiaohongshu (RED) notes -- WeChat articles -- X/Twitter posts -- YouTube videos -- Reddit posts -- GitHub repos/issues/PRs -- Web search results -- Manual input -""" - -from dataclasses import dataclass, field, asdict -from datetime import datetime, timedelta -from typing import Optional, List -from enum import Enum -import hashlib -import json - - -class SourceType(str, Enum): - """Content source types.""" - TELEGRAM = "telegram" - RSS = "rss" - BILIBILI = "bilibili" - XIAOHONGSHU = "xhs" - TWITTER = "twitter" - WECHAT = "wechat" - YOUTUBE = "youtube" - REDDIT = "reddit" - GITHUB = "github" - SEARCH = "search" - MANUAL = "manual" - - -class MediaType(str, Enum): - """Media types.""" - TEXT = "text" - VIDEO = "video" - AUDIO = "audio" - IMAGE = "image" - - -class Priority(str, Enum): - """Content priority levels.""" - HOT = "hot" - QUALITY = "quality" - DEEP = "deep" - NORMAL = "normal" - LOW = "low" - - -@dataclass -class UnifiedContent: - """Unified content format across all platforms.""" - - # === Required === - source_type: SourceType - source_name: str - title: str - content: str - url: str - - # === Auto-generated === - id: str = "" - fetched_at: str = "" - - # === Media === - media_type: MediaType = MediaType.TEXT - media_url: Optional[str] = None - - # === Scoring === - score: int = 0 - priority: Priority = Priority.NORMAL - category: str = "" - tags: List[str] = field(default_factory=list) - - # === Processing state === - processed: bool = False - digest_date: Optional[str] = None - - # === Translation === - title_cn: Optional[str] = None - content_cn: Optional[str] = None - - # === Metadata === - extra: dict = field(default_factory=dict) - - def __post_init__(self): - if not self.id: - self.id = hashlib.md5(self.url.encode()).hexdigest()[:12] - if not self.fetched_at: - self.fetched_at = datetime.now().isoformat() - - def to_dict(self) -> dict: - d = asdict(self) - d['source_type'] = self.source_type.value - d['media_type'] = self.media_type.value - d['priority'] = self.priority.value - return d - - @classmethod - def from_dict(cls, data: dict) -> 'UnifiedContent': - if isinstance(data.get('source_type'), str): - data['source_type'] = SourceType(data['source_type']) - if isinstance(data.get('media_type'), str): - data['media_type'] = MediaType(data['media_type']) - if isinstance(data.get('priority'), str): - data['priority'] = Priority(data['priority']) - known = {f.name for f in cls.__dataclass_fields__.values()} - data = {k: v for k, v in data.items() if k in known} - return cls(**data) - - -# ============================================================================= -# Converters: platform-specific dict → UnifiedContent -# ============================================================================= - -def from_telegram(msg: dict, channel_name: str, channel_username: str) -> UnifiedContent: - return UnifiedContent( - source_type=SourceType.TELEGRAM, - source_name=channel_name, - title=msg.get('text', '')[:100], - content=msg.get('text', ''), - url=msg.get('url', f"https://t.me/{channel_username}"), - extra={"views": msg.get('views', 0), "channel_username": channel_username}, - ) - - -def from_rss(article: dict) -> UnifiedContent: - return UnifiedContent( - source_type=SourceType.RSS, - source_name=article.get('source', ''), - title=article.get('title', ''), - content=article.get('summary', ''), - url=article.get('url', article.get('link', '')), - score=article.get('score', 0), - category=article.get('category', ''), - title_cn=article.get('title_cn'), - content_cn=article.get('summary_cn'), - ) - - -def from_bilibili(video: dict) -> UnifiedContent: - return UnifiedContent( - source_type=SourceType.BILIBILI, - source_name=video.get('author', ''), - title=video.get('title', ''), - content=video.get('description', ''), - url=video.get('url', ''), - media_type=MediaType.VIDEO, - media_url=video.get('cover', ''), - extra={ - "bvid": video.get('bvid', ''), - "duration": video.get('duration', 0), - "view_count": video.get('view_count', 0), - }, - ) - - -def from_twitter(data: dict) -> UnifiedContent: - return UnifiedContent( - source_type=SourceType.TWITTER, - source_name=data.get('author', ''), - title=data.get('text', '')[:100], - content=data.get('text', ''), - url=data.get('url', ''), - extra={ - "likes": data.get('likes', 0), - "retweets": data.get('retweets', 0), - }, - ) - - -def from_wechat(article: dict) -> UnifiedContent: - return UnifiedContent( - source_type=SourceType.WECHAT, - source_name=article.get('author', ''), - title=article.get('title', ''), - content=article.get('content', ''), - url=article.get('url', ''), - ) - - -def from_xiaohongshu(note: dict) -> UnifiedContent: - return UnifiedContent( - source_type=SourceType.XIAOHONGSHU, - source_name=note.get('author', ''), - title=note.get('title', ''), - content=note.get('content', ''), - url=note.get('url', ''), - media_type=MediaType.IMAGE if note.get('images') else MediaType.TEXT, - extra={ - "likes": note.get('likes', 0), - "collects": note.get('collects', 0), - }, - ) - - -def from_youtube(video: dict) -> UnifiedContent: - return UnifiedContent( - source_type=SourceType.YOUTUBE, - source_name=video.get('author', ''), - title=video.get('title', ''), - content=video.get('description', ''), - url=video.get('url', ''), - media_type=MediaType.VIDEO, - extra={ - "duration": video.get('duration', ''), - "view_count": video.get('view_count', 0), - }, - ) - - -def from_manual(title: str, content: str, url: str = "") -> UnifiedContent: - return UnifiedContent( - source_type=SourceType.MANUAL, - source_name="manual", - title=title, - content=content, - url=url or f"manual://{hashlib.md5(title.encode()).hexdigest()[:8]}", - ) - - -# ============================================================================= -# Unified Inbox -# ============================================================================= - -class UnifiedInbox: - """JSON-based content inbox with dedup.""" - - def __init__(self, filepath: str = "unified_inbox.json"): - self.filepath = filepath - self.items: List[UnifiedContent] = [] - self.load() - - def load(self): - import os - if os.path.exists(self.filepath): - try: - with open(self.filepath, 'r', encoding='utf-8') as f: - data = json.load(f) - self.items = [UnifiedContent.from_dict(d) for d in data] - except (json.JSONDecodeError, IOError): - self.items = [] - - def save(self): - with open(self.filepath, 'w', encoding='utf-8') as f: - json.dump([item.to_dict() for item in self.items], f, - ensure_ascii=False, indent=2) - - def add(self, item: UnifiedContent) -> bool: - if any(i.id == item.id for i in self.items): - return False - self.items.append(item) - return True - - def add_batch(self, items: List[UnifiedContent]) -> int: - return sum(1 for item in items if self.add(item)) - - def get_unprocessed(self) -> List[UnifiedContent]: - return [i for i in self.items if not i.processed] - - def get_by_source(self, source_type: SourceType) -> List[UnifiedContent]: - return [i for i in self.items if i.source_type == source_type] - - def mark_processed(self, item_id: str, digest_date: str = None): - for item in self.items: - if item.id == item_id: - item.processed = True - if digest_date: - item.digest_date = digest_date - break - - def clear_old(self, days: int = 7): - cutoff = (datetime.now() - timedelta(days=days)).isoformat() - self.items = [i for i in self.items if i.fetched_at > cutoff] diff --git a/agent_eyes/search/__init__.py b/agent_eyes/search/__init__.py deleted file mode 100644 index c7ccd8a..0000000 --- a/agent_eyes/search/__init__.py +++ /dev/null @@ -1,16 +0,0 @@ -# -*- coding: utf-8 -*- -"""Search module init.""" - -from agent_eyes.search.exa import search_web -from agent_eyes.search.reddit import search_reddit -from agent_eyes.search.github import search_github, search_github_issues -from agent_eyes.search.twitter import search_twitter, get_user_tweets - -__all__ = [ - "search_web", - "search_reddit", - "search_github", - "search_github_issues", - "search_twitter", - "get_user_tweets", -] diff --git a/agent_eyes/search/exa.py b/agent_eyes/search/exa.py deleted file mode 100644 index e41b7e2..0000000 --- a/agent_eyes/search/exa.py +++ /dev/null @@ -1,79 +0,0 @@ -# -*- coding: utf-8 -*- -"""Exa semantic web search. - -Get a free API key at https://exa.ai (1000 searches/month free). -""" - -import os -import requests -from loguru import logger -from typing import Any, Dict, List, Optional - - -EXA_API_URL = "https://api.exa.ai/search" - - -def _get_api_key(config=None) -> str: - """Get Exa API key from config or env.""" - if config: - key = config.get("exa_api_key") - if key: - return key - key = os.environ.get("EXA_API_KEY") - if key: - return key - raise ValueError( - "Exa API key not configured.\n" - "Get a free key at https://exa.ai (1000 searches/month free)\n" - "Then run: agent-eyes setup" - ) - - -async def search_web( - query: str, - num_results: int = 5, - search_type: str = "auto", - config=None, -) -> List[Dict[str, Any]]: - """ - Semantic web search via Exa. - - Args: - query: Search query (supports site: prefix) - num_results: Number of results (default 5, max 10) - search_type: "auto" / "neural" / "keyword" - config: Optional Config instance - - Returns: - List of {title, url, snippet, published_date, score} - """ - api_key = _get_api_key(config) - logger.info(f"Exa search: {query} (n={num_results})") - - resp = requests.post( - EXA_API_URL, - headers={ - "Content-Type": "application/json", - "x-api-key": api_key, - }, - json={ - "query": query, - "numResults": min(num_results, 10), - "type": search_type, - "contents": {"text": {"maxCharacters": 500}}, - }, - timeout=15, - ) - resp.raise_for_status() - data = resp.json() - - results = [] - for item in data.get("results", []): - results.append({ - "title": item.get("title", ""), - "url": item.get("url", ""), - "snippet": item.get("text", ""), - "published_date": item.get("publishedDate", ""), - "score": item.get("score", 0), - }) - return results diff --git a/agent_eyes/search/github.py b/agent_eyes/search/github.py deleted file mode 100644 index 2ed8204..0000000 --- a/agent_eyes/search/github.py +++ /dev/null @@ -1,118 +0,0 @@ -# -*- coding: utf-8 -*- -"""GitHub search via public API (no key required).""" - -import os -import requests -from loguru import logger -from typing import Any, Dict, List, Optional - - -GITHUB_API = "https://api.github.com" - - -def _get_headers(config=None) -> dict: - """Get GitHub API headers with optional auth token.""" - headers = {"Accept": "application/vnd.github+json"} - token = None - if config: - token = config.get("github_token") - if not token: - token = os.environ.get("GITHUB_TOKEN") - if token: - headers["Authorization"] = f"Bearer {token}" - return headers - - -async def search_github( - query: str, - language: Optional[str] = None, - sort: str = "stars", - limit: int = 5, - config=None, -) -> List[Dict[str, Any]]: - """ - Search GitHub repositories. - - Args: - query: Search query - language: Filter by language (e.g. "python") - sort: Sort by "stars" / "forks" / "updated" - limit: Number of results (max 30) - config: Optional Config instance - - Returns: - List of {name, url, description, stars, language, updated} - """ - q = query - if language: - q += f" language:{language}" - - logger.info(f"GitHub search: {q} (n={limit})") - - resp = requests.get( - f"{GITHUB_API}/search/repositories", - headers=_get_headers(config), - params={"q": q, "sort": sort, "per_page": min(limit, 30)}, - timeout=15, - ) - resp.raise_for_status() - data = resp.json() - - results = [] - for repo in data.get("items", []): - results.append({ - "name": repo.get("full_name", ""), - "url": repo.get("html_url", ""), - "description": repo.get("description", ""), - "stars": repo.get("stargazers_count", 0), - "forks": repo.get("forks_count", 0), - "language": repo.get("language", ""), - "updated": repo.get("updated_at", ""), - "topics": repo.get("topics", []), - }) - return results - - -async def search_github_issues( - query: str, - repo: Optional[str] = None, - state: str = "open", - limit: int = 5, - config=None, -) -> List[Dict[str, Any]]: - """ - Search GitHub issues and discussions. - - Args: - query: Search query - repo: Filter by repo (e.g. "owner/repo") - state: "open" / "closed" - limit: Number of results - config: Optional Config instance - """ - q = query - if repo: - q += f" repo:{repo}" - q += f" state:{state}" - - resp = requests.get( - f"{GITHUB_API}/search/issues", - headers=_get_headers(config), - params={"q": q, "sort": "reactions", "per_page": min(limit, 30)}, - timeout=15, - ) - resp.raise_for_status() - data = resp.json() - - results = [] - for issue in data.get("items", []): - results.append({ - "title": issue.get("title", ""), - "url": issue.get("html_url", ""), - "body": (issue.get("body", "") or "")[:500], - "state": issue.get("state", ""), - "comments": issue.get("comments", 0), - "reactions": issue.get("reactions", {}).get("total_count", 0), - "created": issue.get("created_at", ""), - }) - return results diff --git a/agent_eyes/search/reddit.py b/agent_eyes/search/reddit.py deleted file mode 100644 index 41e8a9f..0000000 --- a/agent_eyes/search/reddit.py +++ /dev/null @@ -1,30 +0,0 @@ -# -*- coding: utf-8 -*- -"""Reddit search via Exa (bypasses Reddit IP blocks).""" - -from typing import Any, Dict, List, Optional -from agent_eyes.search.exa import search_web - - -async def search_reddit( - query: str, - subreddit: Optional[str] = None, - num_results: int = 10, - config=None, -) -> List[Dict[str, Any]]: - """ - Search Reddit content via Exa semantic search. - - Args: - query: Search query - subreddit: Optional subreddit (e.g. "LocalLLaMA") - num_results: Number of results - config: Optional Config instance - - Returns: - List of {title, url, snippet, published_date, score} - """ - if subreddit: - full_query = f"site:reddit.com/r/{subreddit} {query}" - else: - full_query = f"site:reddit.com {query}" - return await search_web(full_query, num_results=num_results, config=config) diff --git a/agent_eyes/search/twitter.py b/agent_eyes/search/twitter.py deleted file mode 100644 index 1c28c8a..0000000 --- a/agent_eyes/search/twitter.py +++ /dev/null @@ -1,140 +0,0 @@ -# -*- coding: utf-8 -*- -"""Twitter search — uses birdx if available, falls back to Exa.""" - -import json -import shutil -import subprocess -from loguru import logger -from typing import Any, Dict, List, Optional - - -async def search_twitter( - query: str, - limit: int = 10, - config=None, -) -> List[Dict[str, Any]]: - """ - Search Twitter/X content. - - Strategy: - 1. If birdx is installed → use it (full search, timeline, threads) - 2. Otherwise → use Exa with site:x.com (basic search) - - Args: - query: Search query - limit: Number of results - config: Optional Config instance - - Returns: - List of {author, text, url, likes, retweets, date} - """ - if shutil.which("birdx"): - return await _search_birdx(query, limit) - else: - return await _search_exa(query, limit, config) - - -async def _search_birdx(query: str, limit: int) -> List[Dict[str, Any]]: - """Search Twitter via birdx CLI.""" - logger.info(f"birdx search: {query} (n={limit})") - try: - # birdx --json returns [] for search, so use plain text output - result = subprocess.run( - ["birdx", "search", query, "-n", str(limit)], - capture_output=True, text=True, timeout=30, - ) - if result.returncode != 0: - logger.error(f"birdx search failed: {result.stderr}") - return [] - return _parse_birdx_text(result.stdout) - except (subprocess.TimeoutExpired, FileNotFoundError) as e: - logger.error(f"birdx search failed: {e}") - return [] - - -def _parse_birdx_text(text: str) -> List[Dict[str, Any]]: - """Parse birdx plain text output into structured data. - - Format: - @handle (Display Name): - Tweet text here - possibly multiple lines - date: Mon Feb 24 12:00:00 +0000 2026 - url: https://x.com/handle/status/123 - ────────────────────────────────────────────────── - """ - results = [] - current: Dict[str, Any] = {} - text_lines = [] - - for line in text.strip().split("\n"): - line = line.strip() - - # Separator between tweets - if line.startswith("─"): - if current: - if text_lines: - current["text"] = "\n".join(text_lines).strip() - results.append(current) - current = {} - text_lines = [] - continue - - # Author line: @handle (Display Name): - if line.startswith("@") and line.endswith(":") and "(" in line: - handle = line.split()[0] - current["author"] = handle - continue - - # Date line - if line.startswith("date:"): - current["date"] = line[5:].strip() - continue - - # URL line - if line.startswith("url:"): - current["url"] = line[4:].strip() - continue - - # Content line - if current: - text_lines.append(line) - - # Last tweet - if current: - if text_lines: - current["text"] = "\n".join(text_lines).strip() - results.append(current) - - return results - - -async def _search_exa(query: str, limit: int, config=None) -> List[Dict[str, Any]]: - """Search Twitter via Exa (site:x.com).""" - from agent_eyes.search.exa import search_web - return await search_web( - f"site:x.com {query}", - num_results=limit, - config=config, - ) - - -async def get_user_tweets( - username: str, - limit: int = 10, -) -> List[Dict[str, Any]]: - """Get recent tweets from a user (requires birdx).""" - if not shutil.which("birdx"): - raise RuntimeError( - "birdx not installed. Install: pip install birdx\n" - "Then configure cookies: agent-eyes setup" - ) - try: - result = subprocess.run( - ["birdx", "user-tweets", f"@{username.lstrip('@')}", "-n", str(limit)], - capture_output=True, text=True, timeout=30, - ) - return _parse_birdx_text(result.stdout) - except subprocess.TimeoutExpired: - logger.error("birdx timed out") - return [] diff --git a/agent_eyes/utils/__init__.py b/agent_eyes/utils/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/agent_eyes/utils/storage.py b/agent_eyes/utils/storage.py deleted file mode 100644 index 4b481c0..0000000 --- a/agent_eyes/utils/storage.py +++ /dev/null @@ -1,88 +0,0 @@ -# -*- coding: utf-8 -*- -""" -Storage utilities — save content to JSON inbox and optional Markdown file. - -Implements the "atomic archiving" from the tweet: -- unified_inbox.json (for AI/programmatic use) -- markdown file (for human reading, e.g. Obsidian) -""" - -import json -import os -from datetime import datetime -from pathlib import Path -from loguru import logger - -from agent_eyes.schema import UnifiedContent - - -def save_to_json(item: UnifiedContent, filepath: str = "unified_inbox.json"): - """Append content to JSON inbox file.""" - path = Path(filepath) - data = [] - - if path.exists(): - try: - with open(path, 'r', encoding='utf-8') as f: - data = json.load(f) - except (json.JSONDecodeError, IOError): - data = [] - - data.append(item.to_dict()) - - # Keep last 500 entries to prevent unbounded growth - data = data[-500:] - - with open(path, 'w', encoding='utf-8') as f: - json.dump(data, f, ensure_ascii=False, indent=2) - - logger.info(f"Saved to JSON: {path}") - - -def save_to_markdown(item: UnifiedContent, filepath: str = None): - """ - Append content to a Markdown file (e.g. Obsidian vault). - - Supports two output modes: - - OUTPUT_DIR: Write to {OUTPUT_DIR}/content_hub.md - - OBSIDIAN_VAULT: Write to {OBSIDIAN_VAULT}/01-收集箱/x-reader-inbox.md - - If neither is set, skips markdown output. - """ - if not filepath: - # Priority 1: Obsidian vault - vault_path = os.getenv("OBSIDIAN_VAULT", "") - if vault_path: - filepath = os.path.join(vault_path, "01-收集箱", "x-reader-inbox.md") - else: - # Priority 2: generic output dir - output_dir = os.getenv("OUTPUT_DIR", "") - if not output_dir: - return - filepath = os.path.join(output_dir, "content_hub.md") - - path = Path(filepath) - path.parent.mkdir(parents=True, exist_ok=True) - - emoji = { - "telegram": "📢", "rss": "📰", "bilibili": "🎬", - "xhs": "📕", "twitter": "🐦", "wechat": "💬", - "youtube": "▶️", "manual": "✏️", - }.get(item.source_type.value, "📄") - - with open(path, 'a', encoding='utf-8') as f: - f.write(f"\n## {emoji} {item.title}\n") - f.write(f"- Source: {item.source_name} ({item.source_type.value})\n") - f.write(f"- URL: {item.url}\n") - f.write(f"- Fetched: {item.fetched_at[:16]}\n\n") - f.write(f"{item.content[:2000]}\n") - f.write("\n---\n") - - logger.info(f"Saved to Markdown: {path}") - - -def save_content(item: UnifiedContent, json_path: str = None, md_path: str = None): - """Save content to both JSON and Markdown.""" - inbox_file = json_path or os.getenv("INBOX_FILE", "unified_inbox.json") - save_to_json(item, inbox_file) - save_to_markdown(item, md_path) diff --git a/tests/test_channels.py b/tests/test_channels.py new file mode 100644 index 0000000..8dd1dc6 --- /dev/null +++ b/tests/test_channels.py @@ -0,0 +1,114 @@ +# -*- coding: utf-8 -*- +"""Tests for the channel system.""" + +import pytest +from unittest.mock import patch, MagicMock + +from agent_eyes.channels import get_channel_for_url, get_channel, get_all_channels +from agent_eyes.channels.base import ReadResult, SearchResult + + +class TestChannelRouting: + def test_github_url(self): + ch = get_channel_for_url("https://github.com/openai/gpt-4") + assert ch.name == "github" + + def test_twitter_url(self): + ch = get_channel_for_url("https://x.com/elonmusk/status/123") + assert ch.name == "twitter" + + def test_youtube_url(self): + ch = get_channel_for_url("https://youtube.com/watch?v=abc") + assert ch.name == "youtube" + + def test_reddit_url(self): + ch = get_channel_for_url("https://reddit.com/r/test") + assert ch.name == "reddit" + + def test_bilibili_url(self): + ch = get_channel_for_url("https://bilibili.com/video/BV1xx") + assert ch.name == "bilibili" + + def test_rss_url(self): + ch = get_channel_for_url("https://example.com/feed.xml") + assert ch.name == "rss" + + def test_generic_url_fallback(self): + ch = get_channel_for_url("https://example.com") + assert ch.name == "web" + + def test_get_channel_by_name(self): + ch = get_channel("github") + assert ch is not None + assert ch.name == "github" + + def test_all_channels_registered(self): + channels = get_all_channels() + names = [ch.name for ch in channels] + assert "web" in names + assert "github" in names + assert "twitter" in names + + +class TestReadResult: + def test_to_dict(self): + r = ReadResult(title="Test", content="Body", url="https://example.com", platform="web") + d = r.to_dict() + assert d["title"] == "Test" + assert d["content"] == "Body" + assert d["platform"] == "web" + + def test_to_dict_optional_fields(self): + r = ReadResult(title="T", content="C", url="u", author="A", date="2025-01-01") + d = r.to_dict() + assert d["author"] == "A" + assert d["date"] == "2025-01-01" + + +class TestSearchResult: + def test_to_dict(self): + r = SearchResult(title="Test", url="https://example.com", snippet="A snippet") + d = r.to_dict() + assert d["title"] == "Test" + assert d["snippet"] == "A snippet" + + +class TestGitHubChannel: + @patch("agent_eyes.channels.github.requests.get") + @pytest.mark.asyncio + async def test_search(self, mock_get): + mock_resp = MagicMock() + mock_resp.json.return_value = { + "items": [{"full_name": "test/repo", "html_url": "https://github.com/test/repo", + "description": "A test", "stargazers_count": 100, "forks_count": 10, + "language": "Python", "updated_at": "2025-01-01"}] + } + mock_resp.raise_for_status = MagicMock() + mock_get.return_value = mock_resp + + ch = get_channel("github") + results = await ch.search("test query") + assert len(results) == 1 + assert results[0].title == "test/repo" + + +class TestExaSearch: + @patch("agent_eyes.channels.exa_search.requests.post") + @pytest.mark.asyncio + async def test_search(self, mock_post): + from agent_eyes.config import Config + config = Config(config_path="/tmp/test-exa-config.yaml") + config.set("exa_api_key", "test-key") + + mock_resp = MagicMock() + mock_resp.json.return_value = { + "results": [{"title": "Result", "url": "https://example.com", + "text": "snippet", "publishedDate": "", "score": 0.9}] + } + mock_resp.raise_for_status = MagicMock() + mock_post.return_value = mock_resp + + ch = get_channel("exa_search") + results = await ch.search("test", config=config) + assert len(results) == 1 + assert results[0].title == "Result" diff --git a/tests/test_core.py b/tests/test_core.py index 99e4bff..08560a0 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -2,9 +2,6 @@ """Tests for AgentEyes core class.""" import pytest -from unittest.mock import patch, MagicMock, AsyncMock -from pathlib import Path - from agent_eyes.config import Config from agent_eyes.core import AgentEyes @@ -18,22 +15,20 @@ def eyes(tmp_path): class TestAgentEyes: def test_init(self, eyes): assert eyes.config is not None - assert eyes.reader is not None def test_detect_platform(self, eyes): - assert eyes.detect_platform("https://github.com/openai/gpt-4") == "github" + assert eyes.detect_platform("https://github.com/test/repo") == "github" assert eyes.detect_platform("https://reddit.com/r/test") == "reddit" - assert eyes.detect_platform("https://x.com/elonmusk/status/123") == "twitter" + assert eyes.detect_platform("https://x.com/user/status/123") == "twitter" assert eyes.detect_platform("https://youtube.com/watch?v=abc") == "youtube" assert eyes.detect_platform("https://bilibili.com/video/BV1xx") == "bilibili" - assert eyes.detect_platform("https://mp.weixin.qq.com/s/abc") == "wechat" - assert eyes.detect_platform("https://example.com") == "generic" + assert eyes.detect_platform("https://example.com") == "web" def test_doctor(self, eyes): results = eyes.doctor() assert isinstance(results, dict) assert "web" in results - assert "github_read" in results + assert "github" in results def test_doctor_report(self, eyes): report = eyes.doctor_report() diff --git a/tests/test_doctor.py b/tests/test_doctor.py index 8a1aa8b..e0a92a6 100644 --- a/tests/test_doctor.py +++ b/tests/test_doctor.py @@ -1,11 +1,9 @@ # -*- coding: utf-8 -*- -"""Tests for Agent Eyes doctor module.""" +"""Tests for doctor module.""" import pytest -from unittest.mock import patch, MagicMock - from agent_eyes.config import Config -from agent_eyes.doctor import check_all, format_report, STATUS_OK, STATUS_OFF +from agent_eyes.doctor import check_all, format_report @pytest.fixture @@ -14,54 +12,25 @@ def tmp_config(tmp_path): class TestDoctor: - def test_zero_config_platforms_always_ok(self, tmp_config): + def test_zero_config_channels_ok(self, tmp_config): results = check_all(tmp_config) - assert results["web"]["status"] == STATUS_OK - assert results["github_read"]["status"] == STATUS_OK - assert results["bilibili"]["status"] == STATUS_OK - assert results["rss"]["status"] == STATUS_OK - assert results["tweet_read"]["status"] == STATUS_OK + assert results["web"]["status"] == "ok" + assert results["github"]["status"] == "ok" + assert results["bilibili"]["status"] == "ok" + assert results["rss"]["status"] == "ok" - def test_search_off_without_exa_key(self, tmp_config): + def test_exa_off_without_key(self, tmp_config): results = check_all(tmp_config) - assert results["search_web"]["status"] == STATUS_OFF - assert results["search_reddit"]["status"] == STATUS_OFF - assert results["search_twitter"]["status"] == STATUS_OFF + assert results["exa_search"]["status"] == "off" - def test_search_on_with_exa_key(self, tmp_config): + def test_exa_on_with_key(self, tmp_config): tmp_config.set("exa_api_key", "test-key") results = check_all(tmp_config) - assert results["search_web"]["status"] == STATUS_OK - assert results["search_reddit"]["status"] == STATUS_OK - assert results["search_twitter"]["status"] == STATUS_OK + assert results["exa_search"]["status"] == "ok" - def test_github_search_always_on(self, tmp_config): - results = check_all(tmp_config) - assert results["search_github"]["status"] == STATUS_OK - - def test_reddit_full_off_without_proxy(self, tmp_config): - results = check_all(tmp_config) - assert results["reddit_full"]["status"] == STATUS_OFF - - def test_reddit_full_on_with_proxy(self, tmp_config): - tmp_config.set("reddit_proxy", "http://user:pass@ip:port") - results = check_all(tmp_config) - assert results["reddit_full"]["status"] == STATUS_OK - - @patch("agent_eyes.doctor._check_command") - def test_birdx_detection(self, mock_cmd, tmp_config): - mock_cmd.return_value = True - results = check_all(tmp_config) - assert results["twitter_advanced"]["status"] == STATUS_OK - - def test_format_report_is_string(self, tmp_config): + def test_format_report(self, tmp_config): results = check_all(tmp_config) report = format_report(results) - assert isinstance(report, str) assert "Agent Eyes" in report assert "✅" in report - - def test_format_report_shows_count(self, tmp_config): - results = check_all(tmp_config) - report = format_report(results) - assert "platforms active" in report + assert "channels active" in report diff --git a/tests/test_search.py b/tests/test_search.py deleted file mode 100644 index 7a74c89..0000000 --- a/tests/test_search.py +++ /dev/null @@ -1,142 +0,0 @@ -# -*- coding: utf-8 -*- -"""Tests for Agent Eyes search modules.""" - -import pytest -from unittest.mock import patch, MagicMock - -from agent_eyes.config import Config - - -@pytest.fixture -def tmp_config(tmp_path): - c = Config(config_path=tmp_path / "config.yaml") - c.set("exa_api_key", "test-key") - return c - - -class TestExaSearch: - @patch("agent_eyes.search.exa.requests.post") - @pytest.mark.asyncio - async def test_search_web(self, mock_post, tmp_config): - mock_resp = MagicMock() - mock_resp.json.return_value = { - "results": [ - { - "title": "Test Result", - "url": "https://example.com", - "text": "This is a test snippet", - "publishedDate": "2025-01-01", - "score": 0.95, - } - ] - } - mock_resp.raise_for_status = MagicMock() - mock_post.return_value = mock_resp - - from agent_eyes.search.exa import search_web - results = await search_web("test query", config=tmp_config) - - assert len(results) == 1 - assert results[0]["title"] == "Test Result" - assert results[0]["url"] == "https://example.com" - assert results[0]["snippet"] == "This is a test snippet" - - def test_search_web_requires_key(self): - from agent_eyes.search.exa import _get_api_key - with pytest.raises(ValueError, match="Exa API key"): - _get_api_key(Config(config_path="/tmp/nonexistent/config.yaml")) - - -class TestRedditSearch: - @patch("agent_eyes.search.exa.requests.post") - @pytest.mark.asyncio - async def test_search_reddit(self, mock_post, tmp_config): - mock_resp = MagicMock() - mock_resp.json.return_value = {"results": []} - mock_resp.raise_for_status = MagicMock() - mock_post.return_value = mock_resp - - from agent_eyes.search.reddit import search_reddit - results = await search_reddit("test", config=tmp_config) - - # Verify it searched site:reddit.com - call_args = mock_post.call_args - query = call_args[1]["json"]["query"] - assert "site:reddit.com" in query - - @patch("agent_eyes.search.exa.requests.post") - @pytest.mark.asyncio - async def test_search_reddit_with_sub(self, mock_post, tmp_config): - mock_resp = MagicMock() - mock_resp.json.return_value = {"results": []} - mock_resp.raise_for_status = MagicMock() - mock_post.return_value = mock_resp - - from agent_eyes.search.reddit import search_reddit - await search_reddit("test", subreddit="LocalLLaMA", config=tmp_config) - - call_args = mock_post.call_args - query = call_args[1]["json"]["query"] - assert "site:reddit.com/r/LocalLLaMA" in query - - -class TestGitHubSearch: - @patch("agent_eyes.search.github.requests.get") - @pytest.mark.asyncio - async def test_search_github(self, mock_get): - mock_resp = MagicMock() - mock_resp.json.return_value = { - "items": [ - { - "full_name": "owner/repo", - "html_url": "https://github.com/owner/repo", - "description": "A test repo", - "stargazers_count": 100, - "forks_count": 20, - "language": "Python", - "updated_at": "2025-01-01", - "topics": ["ai"], - } - ] - } - mock_resp.raise_for_status = MagicMock() - mock_get.return_value = mock_resp - - from agent_eyes.search.github import search_github - results = await search_github("test") - - assert len(results) == 1 - assert results[0]["name"] == "owner/repo" - assert results[0]["stars"] == 100 - - @patch("agent_eyes.search.github.requests.get") - @pytest.mark.asyncio - async def test_search_github_with_language(self, mock_get): - mock_resp = MagicMock() - mock_resp.json.return_value = {"items": []} - mock_resp.raise_for_status = MagicMock() - mock_get.return_value = mock_resp - - from agent_eyes.search.github import search_github - await search_github("test", language="python") - - call_args = mock_get.call_args - assert "language:python" in call_args[1]["params"]["q"] - - -class TestTwitterSearch: - @patch("agent_eyes.search.twitter.shutil.which", return_value=None) - @patch("agent_eyes.search.exa.requests.post") - @pytest.mark.asyncio - async def test_search_twitter_exa_fallback(self, mock_post, mock_which, tmp_config): - mock_resp = MagicMock() - mock_resp.json.return_value = {"results": []} - mock_resp.raise_for_status = MagicMock() - mock_post.return_value = mock_resp - - from agent_eyes.search.twitter import search_twitter - results = await search_twitter("test", config=tmp_config) - - call_args = mock_post.call_args - query = call_args[1]["json"]["query"] - assert "site:x.com" in query