Based on x-reader by @runes_leo (MIT License). Extended with: - Reddit support (posts + comments, proxy support) - GitHub support (repos, issues, PRs) - Web search via Exa semantic search - Reddit search (bypasses IP blocks via Exa) - GitHub search (repos by stars) - Renamed package: x_reader → agent_eyes - New MCP tools: search, search_reddit, search_github - Agent-first positioning and documentation
190 lines
6.1 KiB
Python
190 lines
6.1 KiB
Python
# -*- coding: utf-8 -*-
|
|
"""GitHub fetcher — extracts repo info, issues, PRs, and README content.
|
|
|
|
Uses GitHub public API (no token needed for public repos).
|
|
For higher rate limits, set GITHUB_TOKEN env var.
|
|
"""
|
|
|
|
import os
|
|
import re
|
|
import base64
|
|
import requests
|
|
from loguru import logger
|
|
from typing import Dict, Any, List, Optional
|
|
|
|
|
|
API_BASE = "https://api.github.com"
|
|
|
|
|
|
def _get_headers() -> Dict[str, str]:
|
|
"""Get request headers, optionally with auth token."""
|
|
headers = {
|
|
"Accept": "application/vnd.github.v3+json",
|
|
"User-Agent": "AgentEyes/1.0",
|
|
}
|
|
token = os.environ.get("GITHUB_TOKEN")
|
|
if token:
|
|
headers["Authorization"] = f"Bearer {token}"
|
|
return headers
|
|
|
|
|
|
def _parse_github_url(url: str) -> Dict[str, str]:
|
|
"""Parse GitHub URL into components."""
|
|
# Match: github.com/owner/repo[/type/number]
|
|
match = re.search(
|
|
r'github\.com/([^/]+)/([^/]+?)(?:\.git)?(?:/(issues|pull|tree|blob)/(.+))?/?$',
|
|
url
|
|
)
|
|
if not match:
|
|
raise ValueError(f"Cannot parse GitHub URL: {url}")
|
|
|
|
return {
|
|
"owner": match.group(1),
|
|
"repo": match.group(2),
|
|
"type": match.group(3), # issues, pull, tree, blob, or None
|
|
"ref": match.group(4), # issue number, branch, file path, or None
|
|
}
|
|
|
|
|
|
async def fetch_github(url: str) -> Dict[str, Any]:
|
|
"""Fetch content from a GitHub URL."""
|
|
logger.info(f"Fetching GitHub: {url}")
|
|
|
|
parsed = _parse_github_url(url)
|
|
owner = parsed["owner"]
|
|
repo = parsed["repo"]
|
|
content_type = parsed["type"]
|
|
ref = parsed["ref"]
|
|
|
|
headers = _get_headers()
|
|
|
|
if content_type == "issues" and ref:
|
|
return await _fetch_issue(owner, repo, ref, headers)
|
|
elif content_type == "pull" and ref:
|
|
return await _fetch_pull(owner, repo, ref, headers)
|
|
else:
|
|
return await _fetch_repo(owner, repo, headers)
|
|
|
|
|
|
async def _fetch_repo(owner: str, repo: str, headers: Dict) -> Dict[str, Any]:
|
|
"""Fetch repo info + README."""
|
|
# Get repo info
|
|
repo_resp = requests.get(f"{API_BASE}/repos/{owner}/{repo}", headers=headers, timeout=10)
|
|
repo_resp.raise_for_status()
|
|
repo_data = repo_resp.json()
|
|
|
|
# Get README
|
|
readme_content = ""
|
|
try:
|
|
readme_resp = requests.get(
|
|
f"{API_BASE}/repos/{owner}/{repo}/readme",
|
|
headers=headers, timeout=10,
|
|
)
|
|
if readme_resp.status_code == 200:
|
|
readme_data = readme_resp.json()
|
|
readme_content = base64.b64decode(readme_data.get("content", "")).decode("utf-8")
|
|
except Exception as e:
|
|
logger.warning(f"Could not fetch README: {e}")
|
|
|
|
return {
|
|
"title": f"{owner}/{repo}",
|
|
"content": readme_content or repo_data.get("description", ""),
|
|
"description": repo_data.get("description", ""),
|
|
"author": owner,
|
|
"url": repo_data.get("html_url", ""),
|
|
"stars": repo_data.get("stargazers_count", 0),
|
|
"forks": repo_data.get("forks_count", 0),
|
|
"language": repo_data.get("language", ""),
|
|
"topics": repo_data.get("topics", []),
|
|
"license": (repo_data.get("license") or {}).get("spdx_id", ""),
|
|
"platform": "github",
|
|
}
|
|
|
|
|
|
async def _fetch_issue(owner: str, repo: str, number: str, headers: Dict) -> Dict[str, Any]:
|
|
"""Fetch a GitHub issue with comments."""
|
|
issue_num = re.match(r'(\d+)', number).group(1)
|
|
|
|
# Get issue
|
|
resp = requests.get(
|
|
f"{API_BASE}/repos/{owner}/{repo}/issues/{issue_num}",
|
|
headers=headers, timeout=10,
|
|
)
|
|
resp.raise_for_status()
|
|
issue = resp.json()
|
|
|
|
# Get comments
|
|
comments_text = ""
|
|
if issue.get("comments", 0) > 0:
|
|
c_resp = requests.get(
|
|
f"{API_BASE}/repos/{owner}/{repo}/issues/{issue_num}/comments",
|
|
headers=headers, params={"per_page": 20}, timeout=10,
|
|
)
|
|
if c_resp.status_code == 200:
|
|
comments = c_resp.json()
|
|
parts = ["\n---\n## Comments\n"]
|
|
for c in comments:
|
|
parts.append(f"**@{c.get('user', {}).get('login', '?')}**:\n{c.get('body', '')}\n")
|
|
comments_text = "\n".join(parts)
|
|
|
|
return {
|
|
"title": f"[{owner}/{repo}#{issue_num}] {issue.get('title', '')}",
|
|
"content": (issue.get("body", "") or "") + comments_text,
|
|
"author": issue.get("user", {}).get("login", ""),
|
|
"url": issue.get("html_url", ""),
|
|
"state": issue.get("state", ""),
|
|
"labels": [l.get("name", "") for l in issue.get("labels", [])],
|
|
"platform": "github",
|
|
}
|
|
|
|
|
|
async def _fetch_pull(owner: str, repo: str, number: str, headers: Dict) -> Dict[str, Any]:
|
|
"""Fetch a GitHub pull request."""
|
|
pr_num = re.match(r'(\d+)', number).group(1)
|
|
|
|
resp = requests.get(
|
|
f"{API_BASE}/repos/{owner}/{repo}/pulls/{pr_num}",
|
|
headers=headers, timeout=10,
|
|
)
|
|
resp.raise_for_status()
|
|
pr = resp.json()
|
|
|
|
return {
|
|
"title": f"[{owner}/{repo}#{pr_num}] {pr.get('title', '')}",
|
|
"content": pr.get("body", "") or "",
|
|
"author": pr.get("user", {}).get("login", ""),
|
|
"url": pr.get("html_url", ""),
|
|
"state": pr.get("state", ""),
|
|
"merged": pr.get("merged", False),
|
|
"additions": pr.get("additions", 0),
|
|
"deletions": pr.get("deletions", 0),
|
|
"changed_files": pr.get("changed_files", 0),
|
|
"platform": "github",
|
|
}
|
|
|
|
|
|
async def search_github(query: str, limit: int = 5) -> List[Dict[str, Any]]:
|
|
"""Search GitHub repositories."""
|
|
logger.info(f"Searching GitHub: {query}")
|
|
|
|
resp = requests.get(
|
|
f"{API_BASE}/search/repositories",
|
|
headers=_get_headers(),
|
|
params={"q": query, "sort": "stars", "per_page": limit},
|
|
timeout=10,
|
|
)
|
|
resp.raise_for_status()
|
|
data = resp.json()
|
|
|
|
results = []
|
|
for item in data.get("items", []):
|
|
results.append({
|
|
"title": item.get("full_name", ""),
|
|
"description": item.get("description", ""),
|
|
"url": item.get("html_url", ""),
|
|
"stars": item.get("stargazers_count", 0),
|
|
"language": item.get("language", ""),
|
|
"updated_at": item.get("updated_at", ""),
|
|
})
|
|
|
|
return results
|