From ee2ad83b1206f439a0df2a22da0dcadc8bae783e Mon Sep 17 00:00:00 2001 From: Panniantong Date: Tue, 24 Feb 2026 03:00:05 +0100 Subject: [PATCH] Initial: forked from runesleo/x-reader (MIT License) - thank you @runes_leo! --- .env.example | 14 ++ .gitignore | 32 ++++ LICENSE | 21 +++ README.md | 247 ++++++++++++++++++++++++++++ mcp_server.py | 102 ++++++++++++ pyproject.toml | 38 +++++ skills/analyzer/skill.md | 131 +++++++++++++++ skills/video/skill.md | 292 ++++++++++++++++++++++++++++++++++ x_reader/__init__.py | 3 + x_reader/cli.py | 139 ++++++++++++++++ x_reader/fetchers/__init__.py | 0 x_reader/fetchers/bilibili.py | 46 ++++++ x_reader/fetchers/browser.py | 88 ++++++++++ x_reader/fetchers/jina.py | 63 ++++++++ x_reader/fetchers/rss.py | 47 ++++++ x_reader/fetchers/telegram.py | 71 +++++++++ x_reader/fetchers/twitter.py | 217 +++++++++++++++++++++++++ x_reader/fetchers/wechat.py | 62 ++++++++ x_reader/fetchers/xhs.py | 78 +++++++++ x_reader/fetchers/youtube.py | 211 ++++++++++++++++++++++++ x_reader/login.py | 91 +++++++++++ x_reader/reader.py | 154 ++++++++++++++++++ x_reader/schema.py | 277 ++++++++++++++++++++++++++++++++ x_reader/utils/__init__.py | 0 x_reader/utils/storage.py | 88 ++++++++++ 25 files changed, 2512 insertions(+) create mode 100644 .env.example create mode 100644 .gitignore create mode 100644 LICENSE create mode 100644 README.md create mode 100644 mcp_server.py create mode 100644 pyproject.toml create mode 100644 skills/analyzer/skill.md create mode 100644 skills/video/skill.md create mode 100644 x_reader/__init__.py create mode 100644 x_reader/cli.py create mode 100644 x_reader/fetchers/__init__.py create mode 100644 x_reader/fetchers/bilibili.py create mode 100644 x_reader/fetchers/browser.py create mode 100644 x_reader/fetchers/jina.py create mode 100644 x_reader/fetchers/rss.py create mode 100644 x_reader/fetchers/telegram.py create mode 100644 x_reader/fetchers/twitter.py create mode 100644 x_reader/fetchers/wechat.py create mode 100644 
x_reader/fetchers/xhs.py create mode 100644 x_reader/fetchers/youtube.py create mode 100644 x_reader/login.py create mode 100644 x_reader/reader.py create mode 100644 x_reader/schema.py create mode 100644 x_reader/utils/__init__.py create mode 100644 x_reader/utils/storage.py diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..248671e --- /dev/null +++ b/.env.example @@ -0,0 +1,14 @@ +# === Telegram (optional, for TG channel fetching) === +TG_API_ID= +TG_API_HASH= + +# === Groq Whisper (optional, for video/audio transcription) === +# Free API key: https://console.groq.com/keys +GROQ_API_KEY= + +# === Gemini (optional, for AI-powered analysis) === +GEMINI_API_KEY= + +# === Output === +OUTPUT_DIR=./output +INBOX_FILE=./unified_inbox.json diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..45357b9 --- /dev/null +++ b/.gitignore @@ -0,0 +1,32 @@ +# Python +__pycache__/ +*.py[cod] +*.egg-info/ +dist/ +build/ +.venv/ +venv/ + +# Environment +.env + +# Telethon session +*.session +tg_session* + +# Data +*.db +*.log +data/ +unified_inbox.json +output/ + +# Audio/video temp files +/tmp/media_* + +# OS +.DS_Store + +# IDE +.vscode/ +.idea/ diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..5fa59ae --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2026 Leo (@runes_leo) + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. 
+ +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..d28fc57 --- /dev/null +++ b/README.md @@ -0,0 +1,247 @@ +# x-reader + +[![Python 3.10+](https://img.shields.io/badge/python-3.10+-blue.svg)](https://www.python.org/downloads/) +[![License: MIT](https://img.shields.io/badge/License-MIT-green.svg)](LICENSE) + +Universal content reader — fetch, transcribe, and digest content from any platform. + +Give it a URL (article, video, podcast, tweet), get back structured content. Works as CLI, Python library, MCP server, or Claude Code skills. + +## What It Does + +``` +Any URL → Platform Detection → Fetch Content → Unified Output + ↓ ↓ + auto-detect text: Jina Reader + 7+ platforms video: yt-dlp subtitles + audio: Whisper transcription + API: Bilibili / RSS / Telegram +``` + +The Python layer handles text fetching and YouTube subtitle extraction. The **Claude Code skills** (optional) add full Whisper transcription for video/podcast and AI-powered content analysis. + +## Three Layers + +x-reader is composable. 
Use the layers you need: + +| Layer | What | Format | Install | +|-------|------|--------|---------| +| **Python CLI/Library** | Basic content fetching + unified schema | See [Install](#install) | Required | +| **Claude Code Skills** | Video transcription + AI analysis | Copy `skills/` to `~/.claude/skills/` | Optional | +| **MCP Server** | Expose reading as MCP tools | `python mcp_server.py` | Optional | + +### Layer 1: Python CLI + +```bash +# Fetch any URL +x-reader https://mp.weixin.qq.com/s/abc123 + +# Fetch a tweet +x-reader https://x.com/elonmusk/status/123456 + +# Fetch multiple URLs +x-reader https://url1.com https://url2.com + +# Login to a platform (one-time, for browser fallback) +x-reader login xhs + +# View inbox +x-reader list +``` + +### Layer 2: Claude Code Skills + +> Requires cloning the repo (not included in pip install). + +For video/podcast transcription and content analysis: + +``` +skills/ +├── video/ # YouTube/Bilibili/podcast → full transcript via Whisper +└── analyzer/ # Any content → structured analysis report +``` + +Install: +```bash +cp -r skills/video ~/.claude/skills/video +cp -r skills/analyzer ~/.claude/skills/analyzer +``` + +Then in Claude Code, just send a YouTube/Bilibili/podcast link — the video skill auto-triggers and produces a full transcript + summary. + +### Layer 3: MCP Server + +> Requires cloning the repo (mcp_server.py is not included in pip install). 
+ +```bash +git clone https://github.com/runesleo/x-reader.git +cd x-reader +pip install -e ".[mcp]" +python mcp_server.py +``` + +Tools exposed: +- `read_url(url)` — fetch any URL +- `read_batch(urls)` — fetch multiple URLs concurrently +- `list_inbox()` — view previously fetched content +- `detect_platform(url)` — identify platform from URL + +Claude Code config (`~/.claude/claude_desktop_config.json`): +```json +{ + "mcpServers": { + "x-reader": { + "command": "python", + "args": ["/path/to/x-reader/mcp_server.py"] + } + } +} +``` + +## Supported Platforms + +| Platform | Text Fetch | Video/Audio Transcript | +|----------|-----------|----------------------| +| YouTube | ✅ Jina | ✅ yt-dlp subtitles → Groq Whisper fallback | +| Bilibili (B站) | ✅ API | ✅ via Claude Code skill | +| X / Twitter | ✅ Jina → Playwright | — | +| WeChat (微信公众号) | ✅ Jina → Playwright | — | +| Xiaohongshu (小红书) | ✅ Jina → Playwright* | — | +| Telegram | ✅ Telethon | — | +| RSS | ✅ feedparser | — | +| 小宇宙 (Xiaoyuzhou) | — | ✅ via Claude Code skill | +| Apple Podcasts | — | ✅ via Claude Code skill | +| Any web page | ✅ Jina fallback | — | + +> \*XHS requires a one-time login: `x-reader login xhs` (saves session for Playwright fallback) +> +> YouTube Whisper transcription requires `GROQ_API_KEY` — get a free key from [Groq](https://console.groq.com/keys) + +## Install + +```bash +# From GitHub (recommended) +pip install git+https://github.com/runesleo/x-reader.git + +# With Telegram support +pip install "x-reader[telegram] @ git+https://github.com/runesleo/x-reader.git" + +# With browser fallback (Playwright — for XHS/WeChat anti-scraping) +pip install "x-reader[browser] @ git+https://github.com/runesleo/x-reader.git" +playwright install chromium + +# With all optional dependencies +pip install "x-reader[all] @ git+https://github.com/runesleo/x-reader.git" +playwright install chromium +``` + +Or clone and install locally: +```bash +git clone https://github.com/runesleo/x-reader.git +cd 
x-reader +pip install -e ".[all]" +playwright install chromium +``` + +### Dependencies for video/audio (optional) + +```bash +# macOS +brew install yt-dlp ffmpeg + +# Linux +pip install yt-dlp +apt install ffmpeg +``` + +For Whisper transcription, get a free API key from [Groq](https://console.groq.com/keys) and set: +```bash +export GROQ_API_KEY=your_key_here +``` + +## Use as Library + +```python +import asyncio +from x_reader.reader import UniversalReader + +async def main(): + reader = UniversalReader() + content = await reader.read("https://mp.weixin.qq.com/s/abc123") + print(content.title) + print(content.content[:200]) + +asyncio.run(main()) +``` + +## Configuration + +Copy `.env.example` to `.env`: + +```bash +cp .env.example .env +``` + +| Variable | Required | Description | +|----------|----------|-------------| +| `TG_API_ID` | Telegram only | From https://my.telegram.org | +| `TG_API_HASH` | Telegram only | From https://my.telegram.org | +| `GROQ_API_KEY` | Whisper only | From https://console.groq.com/keys (free) | +| `INBOX_FILE` | No | Path to inbox JSON (default: `./unified_inbox.json`) | +| `OUTPUT_DIR` | No | Directory for Markdown output (default: disabled) | +| `OBSIDIAN_VAULT` | No | Path to Obsidian vault (writes to `01-收集箱/x-reader-inbox.md`) | + +## Architecture + +``` +x-reader/ +├── x_reader/ # Python package +│ ├── cli.py # CLI entry point +│ ├── reader.py # URL dispatcher (UniversalReader) +│ ├── schema.py # Unified data model (UnifiedContent + Inbox) +│ ├── login.py # Browser login manager (saves sessions) +│ ├── fetchers/ +│ │ ├── jina.py # Jina Reader (universal fallback) +│ │ ├── browser.py # Playwright headless (anti-scraping fallback) +│ │ ├── bilibili.py # Bilibili API +│ │ ├── youtube.py # yt-dlp subtitle extraction +│ │ ├── rss.py # feedparser +│ │ ├── telegram.py # Telethon +│ │ ├── twitter.py # Jina-based +│ │ ├── wechat.py # Jina → Playwright fallback +│ │ └── xhs.py # Jina → Playwright + session fallback +│ └── utils/ +│ └── 
storage.py # JSON + Markdown dual output +├── skills/ # Claude Code skills +│ ├── video/ # Video/podcast → transcript + summary +│ └── analyzer/ # Content → structured analysis +├── mcp_server.py # MCP server entry point +└── pyproject.toml +``` + +## How the Layers Work Together + +``` +User sends URL + │ + ├─ Text content (article, tweet, WeChat) + │ └─ Python fetcher → UnifiedContent → inbox + │ + ├─ Video (YouTube, Bilibili, X video) + │ ├─ Python fetcher → metadata (title, description) + │ └─ Video skill → full transcript via subtitles/Whisper + │ + ├─ Podcast (小宇宙, Apple Podcasts) + │ └─ Video skill → full transcript via Whisper + │ + └─ Analysis requested + └─ Analyzer skill → structured report + action items +``` + +## Author + +Built by [@runes_leo](https://x.com/runes_leo) — more AI tools at [leolabs.me](https://leolabs.me) + +## License + +MIT diff --git a/mcp_server.py b/mcp_server.py new file mode 100644 index 0000000..1cde552 --- /dev/null +++ b/mcp_server.py @@ -0,0 +1,102 @@ +# -*- coding: utf-8 -*- +""" +x-reader MCP Server — expose content reading as MCP tools. + +Usage: + python mcp_server.py # stdio transport (for Claude Code) + python mcp_server.py --transport sse # SSE transport (for web clients) + +Claude Code config (~/.claude/claude_desktop_config.json): + { + "mcpServers": { + "x-reader": { + "command": "python", + "args": ["/path/to/x-reader/mcp_server.py"] + } + } + } +""" + +import asyncio +from dotenv import load_dotenv +from mcp.server.fastmcp import FastMCP + +load_dotenv() + +from x_reader.reader import UniversalReader +from x_reader.schema import UnifiedInbox + +mcp = FastMCP( + "x-reader", + instructions="Universal content reader — give it any URL, get structured content back.", +) + +reader = UniversalReader(inbox=UnifiedInbox()) + + +@mcp.tool() +async def read_url(url: str) -> str: + """ + Read content from any URL and return structured result. 
+ + Supports: YouTube, Bilibili, X/Twitter, WeChat, Xiaohongshu, + Telegram, RSS, and any generic web page. + + Returns JSON with: title, content, url, source_type, platform metadata. + """ + import json + + content = await reader.read(url) + result = content.to_dict() + # Keep it readable + return json.dumps(result, ensure_ascii=False, indent=2) + + +@mcp.tool() +async def read_batch(urls: list[str]) -> str: + """ + Read multiple URLs concurrently. Returns JSON array of results. + + Failed URLs are logged but don't block other results. + """ + import json + + contents = await reader.read_batch(urls) + results = [c.to_dict() for c in contents] + return json.dumps(results, ensure_ascii=False, indent=2) + + +@mcp.tool() +async def list_inbox() -> str: + """ + List all items in the content inbox. + + Returns JSON array of previously fetched content. + """ + import json + + items = [item.to_dict() for item in reader.inbox.items] + return json.dumps(items, ensure_ascii=False, indent=2) + + +@mcp.tool() +async def detect_platform(url: str) -> str: + """ + Detect which platform a URL belongs to. + + Returns the platform name: youtube, bilibili, twitter, wechat, + xhs, telegram, rss, or generic. 
+ """ + return reader._detect_platform(url) + + +if __name__ == "__main__": + import sys + + transport = "stdio" + if "--transport" in sys.argv: + idx = sys.argv.index("--transport") + if idx + 1 < len(sys.argv): + transport = sys.argv[idx + 1] + + mcp.run(transport=transport) diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..3155637 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,38 @@ +[project] +name = "x-reader" +version = "0.2.0" +description = "Universal content reader — fetch, normalize, and digest content from 7+ platforms" +readme = "README.md" +license = {text = "MIT"} +requires-python = ">=3.10" +authors = [ + {name = "Leo", email = "runes.leo@gmail.com"} +] +keywords = ["content-reader", "rss", "telegram", "bilibili", "xiaohongshu", "digest"] +dependencies = [ + "requests>=2.28", + "feedparser>=6.0", + "python-dotenv>=1.0", + "loguru>=0.7", +] + +[project.urls] +Homepage = "https://github.com/runesleo/x-reader" +Repository = "https://github.com/runesleo/x-reader" +Issues = "https://github.com/runesleo/x-reader/issues" + +[project.optional-dependencies] +telegram = ["telethon>=1.34"] +mcp = ["mcp[cli]>=1.0"] +browser = ["playwright>=1.40"] +all = ["telethon>=1.34", "mcp[cli]>=1.0", "playwright>=1.40"] + +[project.scripts] +x-reader = "x_reader.cli:main" + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.hatch.build.targets.wheel] +packages = ["x_reader"] diff --git a/skills/analyzer/skill.md b/skills/analyzer/skill.md new file mode 100644 index 0000000..ed1953b --- /dev/null +++ b/skills/analyzer/skill.md @@ -0,0 +1,131 @@ +# Content Analyzer Skill + +> Any content → structured analysis report with actionable insights + +## Trigger + +When user sends content (URL, text, or transcript) with analysis intent: +- `/analyze [URL]` +- "Analyze this article" +- "What are the key takeaways?" 
+- Auto-triggered after video/podcast transcription (from video skill) + +## Pipeline + +### Step 1: Get Content + +Choose tool based on input type: + +| Input | Tool | +|-------|------| +| Tweet URL | `fetch_tweet` or Jina Reader | +| Web URL | `WebFetch` or Jina Reader | +| Local file | Read file directly | +| Transcript from video skill | Use directly | + +### Step 2: Multi-Dimensional Analysis + +Scan content across these dimensions. Only output dimensions with actual content — skip empty ones. + +```markdown +## 📖 Summary + +[1-3 sentence core thesis] + +**Source**: [author/publisher] · [date] +**Type**: [tweet/article/video/podcast/report] + +--- + +## 💡 Key Insights + +### 🎯 Core Arguments +- **Thesis**: [Main argument or finding] +- **Evidence**: [Supporting data or reasoning] +- **Strength**: [How convincing? What's missing?] + +### 🤖 Tools & Methods +- **What**: [Tools, frameworks, or techniques mentioned] +- **How**: [How they're used or applied] +- **Relevance**: [Could you use this?] 
+ +### ⚙️ Workflow Ideas +- **Optimization**: [Process improvements mentioned] +- **Automation**: [What could be automated] +- **Integration**: [How to fit into existing workflow] + +### 📊 Data & Numbers +- **Key metrics**: [Important numbers mentioned] +- **Trends**: [Patterns in the data] +- **Gaps**: [What data is missing] + +### ⚠️ Risks & Warnings +- **Pitfalls**: [Explicitly mentioned risks] +- **Blind spots**: [What the author might be missing] +- **Counter-arguments**: [Alternative perspectives] + +### 🔗 Resources +- **Tools/APIs**: [Mentioned tools or data sources] +- **People**: [Worth following or referencing] +- **Further reading**: [Related content] + +### 🧠 Mental Model Shifts +- **Before**: [Common assumption] +- **After**: [New understanding from this content] +- **Impact**: [How this changes decisions] + +--- + +## ✅ Action Items + +### Quick Wins (< 30 min) +- [ ] [Action 1] — Impact: ★★★★ | Effort: Easy +- [ ] [Action 2] — Impact: ★★★ | Effort: Easy + +### Deeper Work (1-3 hours) +- [ ] [Action 3] — Impact: ★★★ | Effort: Medium +- [ ] [Action 4] — Impact: ★★ | Effort: Medium + +### Exploration (needs validation) +- [ ] [Action 5] — Impact: ★★★ | Effort: Hard | Nature: Exploratory +``` + +### Step 3: Personalized Relevance (Customizable) + +Map insights to YOUR context. Edit the dimensions below to match your own projects, interests, and systems. + +```markdown +## 🔄 How This Applies to Me + +### My Projects +- **[Project A]**: [How this insight connects] +- **[Project B]**: [What I could apply] + +### My Knowledge Base +- **Update**: [Which notes/docs to update] +- **New entry**: [What to add to my knowledge system] + +### My Decision Log +- **Changed my mind about**: [what and why] +- **Confirmed my belief that**: [what] +``` + +> **Customization**: Edit the dimensions in Step 2 and Step 3 to match your own +> domain. A trader might add "Market Impact" and "Risk Assessment". A developer +> might add "Architecture Patterns" and "Tech Debt". 
Make it yours. + +## Output Modes + +| Mode | Trigger | Output | +|------|---------|--------| +| **Full** (default) | `/analyze [URL]` | All dimensions | +| **Sparse** | `/analyze [URL] --sparse` | Only hit dimensions, skip empty | +| **Brief** | `/analyze [URL] --brief` | Action items only | + +## Best Practices + +1. **Scan all dimensions, but don't force-fill** — skip empty dimensions cleanly +2. **Actions must be specific** — not "learn about X" but "read X docs chapter Y" +3. **Distinguish fact from opinion** — mark the author's claims vs verified facts +4. **Source everything** — tag where each insight comes from in the original content +5. **ROI awareness** — not every action is worth doing, assess effort vs impact diff --git a/skills/video/skill.md b/skills/video/skill.md new file mode 100644 index 0000000..9729802 --- /dev/null +++ b/skills/video/skill.md @@ -0,0 +1,292 @@ +# Video & Podcast Digest Skill + +> Send a video/podcast link → get full transcript + structured summary + +## Supported Platforms + +| Platform | Type | Subtitles | Whisper Transcription | +|----------|------|-----------|----------------------| +| YouTube | Video | ✅ | ✅ | +| Bilibili | Video | ✅ | ✅ | +| X/Twitter | Video | ❌ | ✅ | +| Xiaoyuzhou (小宇宙) | Podcast | ❌ | ✅ | +| Apple Podcasts | Podcast | ❌ | ✅ | +| Direct links (mp3/mp4/m3u8) | Any | ❌ | ✅ | + +## Trigger + +Auto-triggered when a media URL is detected: +- YouTube: `youtube.com`, `youtu.be` +- Bilibili: `bilibili.com`, `b23.tv` +- X/Twitter: `x.com`, `twitter.com` (tweets with video) +- Xiaoyuzhou: `xiaoyuzhoufm.com` +- Apple Podcasts: `podcasts.apple.com` +- Direct: `.mp3`, `.mp4`, `.m3u8`, `.m4a`, `.webm` + +## Pipeline + +### Step 0: Detect Media Type + +| URL Pattern | Type | Pipeline | +|-------------|------|----------| +| `xiaoyuzhoufm.com/episode/` | Podcast | → Step 1b (Xiaoyuzhou) | +| `podcasts.apple.com` | Podcast | → Step 1c (Apple) | +| `bilibili.com`, `b23.tv` | Video | → Step 1d (Bilibili API) | +| `.mp3`, 
`.m4a` direct link | Audio | → Step 2b (direct download) | +| Other | Video | → Step 1a (subtitle extraction) | + +### Step 1a: Video — Extract Subtitles + +```bash +# Clean up temp files +rm -f /tmp/media_sub*.vtt /tmp/media_audio.mp3 /tmp/media_transcript*.json /tmp/media_segment_*.mp3 2>/dev/null || true + +# YouTube (prefer English, fallback Chinese) +yt-dlp --skip-download --write-auto-sub --sub-lang "en,zh-Hans" -o "/tmp/media_sub" "VIDEO_URL" + +# Bilibili +yt-dlp --skip-download --write-auto-sub --sub-lang "zh-Hans,zh" -o "/tmp/media_sub" "VIDEO_URL" +``` + +Check for subtitles: +```bash +ls /tmp/media_sub*.vtt 2>/dev/null +``` +- **Has subtitles** → Read VTT content, skip to Step 3 +- **No subtitles** → Step 2a (download audio) + +### Step 1b: Xiaoyuzhou (小宇宙) — Extract Audio URL + +```bash +# Extract CDN direct link from __NEXT_DATA__ +# Xiaoyuzhou is a Next.js SPA, but initial HTML contains audio URL in __NEXT_DATA__ +AUDIO_URL=$(curl -sL -H "User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36" \ + "EPISODE_URL" \ + | grep -oE 'https://media\.xyzcdn\.net/[^"]+\.(m4a|mp3)' \ + | head -1) + +echo "Audio URL: $AUDIO_URL" + +# Download audio +curl -L -o /tmp/media_audio.mp3 "$AUDIO_URL" +``` + +> If curl extraction is empty (rare), fallback: use Puppeteer/browser to get rendered page and extract. + +→ Step 2b (check size & transcribe) + +### Step 1c: Apple Podcasts — via yt-dlp + +```bash +yt-dlp -f "ba[ext=m4a]/ba/b" --extract-audio --audio-format mp3 --audio-quality 5 \ + -o "/tmp/media_audio.%(ext)s" "APPLE_PODCAST_URL" +``` + +→ Step 2b (check size & transcribe) + +### Step 1d: Bilibili — API Direct Audio Stream + +yt-dlp returns 412 for Bilibili even with cookies. Use Bilibili's API instead: + +```bash +# 1. Extract BV number from URL +BV="BV1xxxxx" # Replace with actual BV number + +# 2. 
Get video info (title, duration, CID) +curl -s "https://api.bilibili.com/x/web-interface/view?bvid=$BV" \ + -H "User-Agent: Mozilla/5.0" -H "Referer: https://www.bilibili.com/" \ + | python3 -c "import json,sys; d=json.load(sys.stdin)['data']; print(f\"Title: {d['title']}\nDuration: {d['duration']}s\nCID: {d['cid']}\")" + +# 3. Get audio stream URL +CID= +AUDIO_URL=$(curl -s "https://api.bilibili.com/x/player/playurl?bvid=$BV&cid=$CID&fnval=16&qn=64" \ + -H "User-Agent: Mozilla/5.0" -H "Referer: https://www.bilibili.com/" \ + | python3 -c "import json,sys; print(json.load(sys.stdin)['data']['dash']['audio'][0]['baseUrl'])") + +# 4. Download audio (Referer header required, otherwise 403) +curl -L -o /tmp/media_audio.m4s \ + -H "User-Agent: Mozilla/5.0" -H "Referer: https://www.bilibili.com/" "$AUDIO_URL" + +# 5. Convert to mp3 +ffmpeg -y -i /tmp/media_audio.m4s -acodec libmp3lame -q:a 5 /tmp/media_audio.mp3 +``` + +→ Step 2b (check size & transcribe) + +### Step 2a: Video — Download Audio (when no subtitles) + +```bash +# YouTube may need --cookies-from-browser chrome to bypass bot detection +yt-dlp --cookies-from-browser chrome -f "ba[ext=m4a]/ba/b" --extract-audio --audio-format mp3 --audio-quality 5 \ + -o "/tmp/media_audio.%(ext)s" "VIDEO_URL" +``` + +### Step 2b: Check Audio Size & Segment + +```bash +FILE_SIZE=$(stat -f%z /tmp/media_audio.* 2>/dev/null || stat -c%s /tmp/media_audio.* 2>/dev/null) +echo "File size: $FILE_SIZE bytes" +``` + +- **≤ 25MB (25000000)** → Step 2c (transcribe directly) +- **> 25MB** → Split first, then transcribe each segment + +**Splitting large audio (>25MB)**: +```bash +# Get total duration +DURATION=$(ffprobe -v error -show_entries format=duration -of csv=p=0 /tmp/media_audio.* | head -1) + +# Split into 10-minute segments (keeps each under 25MB) +SEGMENT_SEC=600 +SEGMENTS=$(python3 -c "import math; print(math.ceil(float('$DURATION')/$SEGMENT_SEC))") + +# Cut segments +for i in $(seq 0 $((SEGMENTS-1))); do + START=$((i * 
SEGMENT_SEC)) + ffmpeg -y -i /tmp/media_audio.* -ss $START -t $SEGMENT_SEC -acodec libmp3lame -q:a 5 \ + "/tmp/media_segment_${i}.mp3" 2>/dev/null +done +``` + +→ Call Step 2c for each segment **sequentially** (parallel triggers Groq 524 timeout), concatenate results + +### Step 2c: Whisper Transcription + +**Prerequisite**: `GROQ_API_KEY` environment variable + +```bash +# Check API key +if [ -z "$GROQ_API_KEY" ]; then + echo "❌ GROQ_API_KEY not set. Get one at: https://console.groq.com/keys" + exit 1 +fi + +# Transcribe single file (replace AUDIO_FILE with actual path) +curl -s -X POST "https://api.groq.com/openai/v1/audio/transcriptions" \ + -H "Authorization: Bearer $GROQ_API_KEY" \ + -H "Content-Type: multipart/form-data" \ + -F "file=@AUDIO_FILE" \ + -F "model=whisper-large-v3-turbo" \ + -F "response_format=verbose_json" \ + -F "language=zh" \ + > /tmp/media_transcript.json + +# Extract plain text +python3 -c "import json; print(json.load(open('/tmp/media_transcript.json'))['text'])" +``` + +**Whisper model options**: +| Model | Speed | Accuracy | Use Case | +|-------|-------|----------|----------| +| `whisper-large-v3-turbo` | 10x realtime | High | Default choice | +| `whisper-large-v3` | 5x realtime | Highest | Professional/noisy content | + +**Language parameter**: +- Chinese: `language=zh` +- English: `language=en` +- Auto-detect: omit language parameter + +### Step 3: Structured Summary + +Choose output format based on media type: + +**Video (≤20 min)**: +1. **Overview** (1-2 sentences) +2. **Key Points** (3-5 bullet points) +3. **Notable Quotes** (if any) +4. **Action Items** (if applicable) + +**Podcast (>20 min)**: +1. **Overview** (2-3 sentences: who discussed what) +2. **Chapter Summary** (segmented by topic, 2-3 sentences each) +3. **Key Points** (5-8 bullet points) +4. **Notable Quotes** +5. 
**Action Items** (if applicable) + +## Output Format + +### Video + +``` +## 📺 Video Digest + +**Title**: [Video Title] +**Duration**: [x minutes] +**Language**: [Chinese/English] + +### Overview +[1-2 sentence summary] + +### Key Points +1. [Point 1] +2. [Point 2] +... + +### Notable Quotes +> "xxx" — [timestamp] + +### Action Items +- [if applicable] +``` + +### Podcast + +``` +## 🎙️ Podcast Digest + +**Show**: [Podcast Name] +**Episode**: [Episode Title] +**Duration**: [x minutes] +**Guests**: [if any] + +### Overview +[2-3 sentences: who discussed what, core conclusions] + +### Chapter Summary +#### 1. [Topic] (~xx:xx-xx:xx) +[2-3 sentences of core content] + +#### 2. [Topic] (~xx:xx-xx:xx) +[2-3 sentences of core content] +... + +### Key Points +1. [Point 1] +2. [Point 2] +... + +### Notable Quotes +> "xxx" + +### Action Items +- [if applicable] +``` + +## Error Handling + +| Situation | Action | +|-----------|--------| +| No subtitles + no GROQ_API_KEY | Prompt user to set API key | +| No subtitles + has API key | Auto Whisper transcription | +| Xiaoyuzhou curl extraction empty | Use Puppeteer/browser to get rendered HTML | +| Audio >25MB | ffmpeg segment (10min/segment), transcribe sequentially | +| Podcast >2 hours | Warn user about duration, confirm before proceeding | +| Groq 524 timeout | Do NOT parallelize — transcribe sequentially, sleep 5-8s between segments | +| Groq 429 rate limit | 7200s/hour limit, wait for retry-after header, then retry | +| yt-dlp Bilibili 412 | Use Bilibili API instead (Step 1d) | +| yt-dlp YouTube bot detection | Add `--cookies-from-browser chrome` | +| Network timeout | Retry once | +| Spotify links | Inform user: not supported (DRM protected) | + +## Groq Whisper Limits + +- Max 25MB per request +- Free tier: 7200 seconds of audio/hour (rolling window), ~20 hours/day +- Supported formats: mp3, mp4, mpeg, mpga, m4a, wav, webm + +## Dependencies + +- `yt-dlp`: video download + subtitle extraction +- `ffmpeg`: audio conversion 
+ segmentation +- `curl`: Xiaoyuzhou audio download, Bilibili API +- `GROQ_API_KEY`: Whisper transcription API (free at https://console.groq.com/keys) diff --git a/x_reader/__init__.py b/x_reader/__init__.py new file mode 100644 index 0000000..8ab0f3e --- /dev/null +++ b/x_reader/__init__.py @@ -0,0 +1,3 @@ +"""x-reader: Universal content reader for 7+ platforms.""" + +__version__ = "0.1.0" diff --git a/x_reader/cli.py b/x_reader/cli.py new file mode 100644 index 0000000..c0b28d4 --- /dev/null +++ b/x_reader/cli.py @@ -0,0 +1,139 @@ +# -*- coding: utf-8 -*- +""" +x-reader CLI — fetch content from any platform. + +Usage: + x-reader # Fetch a single URL + x-reader ... # Fetch multiple URLs + x-reader list # Show inbox contents + x-reader clear # Clear inbox +""" + +import sys +import asyncio +import json +from pathlib import Path + +from dotenv import load_dotenv +load_dotenv() + +from x_reader.reader import UniversalReader +from x_reader.schema import UnifiedInbox, SourceType + + +def get_inbox_path() -> str: + import os + return os.getenv("INBOX_FILE", "unified_inbox.json") + + +def cmd_fetch(urls: list[str]): + """Fetch one or more URLs.""" + inbox = UnifiedInbox(get_inbox_path()) + reader = UniversalReader(inbox=inbox) + + async def run(): + if len(urls) == 1: + item = await reader.read(urls[0]) + print(f"✅ [{item.source_type.value}] {item.title[:60]}") + print(f" {item.url}") + print(f" {item.content[:200]}...") + else: + items = await reader.read_batch(urls) + for item in items: + print(f"✅ [{item.source_type.value}] {item.title[:60]}") + print(f"\n📦 Fetched {len(items)}/{len(urls)} URLs") + + try: + asyncio.run(run()) + except KeyboardInterrupt: + print("\n⏹ Cancelled") + except Exception as e: + print(f"❌ {e}") + sys.exit(1) + + +def cmd_list(): + """Show inbox contents.""" + inbox = UnifiedInbox(get_inbox_path()) + if not inbox.items: + print("📦 Inbox is empty") + return + + print(f"📦 Inbox: {len(inbox.items)} items\n") + + emoji_map = { + 
def main():
    """Command-line entry point: dispatch ``sys.argv`` to a sub-command.

    With no arguments (or ``-h`` / ``--help``) the usage text is printed.
    ``login``, ``list`` and ``clear`` map to their ``cmd_*`` handlers.
    Anything that looks like a URL is fetched together with every other
    URL-looking argument. Usage errors and unknown commands exit with
    status 1.
    """
    if len(sys.argv) < 2 or sys.argv[1] in ("-h", "--help"):
        print("""
📖 x-reader — Universal content reader

Usage:
  x-reader <url>              Fetch content from any URL
  x-reader <url1> <url2>      Fetch multiple URLs
  x-reader login <platform>   Login to a platform (saves session for browser fallback)
  x-reader list               Show inbox contents
  x-reader clear              Clear inbox

Supported platforms:
  WeChat, Telegram, X/Twitter, YouTube,
  Bilibili, Xiaohongshu, RSS, and any web page

Examples:
  x-reader https://mp.weixin.qq.com/s/abc123
  x-reader https://x.com/elonmusk/status/123456
  x-reader https://www.xiaohongshu.com/explore/abc123
  x-reader login xhs
""")
        return

    def looks_like_url(arg: str) -> bool:
        """Heuristic: explicit scheme / www. prefix, or contains a dot."""
        return arg.startswith(("http", "www.")) or "." in arg

    # Commands are matched case-insensitively; URLs are passed on unchanged.
    cmd = sys.argv[1].lower()

    if cmd == "login":
        if len(sys.argv) < 3:
            print("❌ Usage: x-reader login <platform>")
            print("   Supported: xhs, wechat, twitter")
            sys.exit(1)
        cmd_login(sys.argv[2])
    elif cmd == "list":
        cmd_list()
    elif cmd == "clear":
        cmd_clear()
    elif looks_like_url(cmd):
        urls = [arg for arg in sys.argv[1:] if looks_like_url(arg)]
        cmd_fetch(urls)
    else:
        print(f"❌ Unknown command: {cmd}")
        print("   Run 'x-reader' with no args for help")
        # Signal the failure to the shell, like the login usage error does.
        sys.exit(1)
-*- coding: utf-8 -*- +""" +Playwright browser fetcher — headless Chromium fallback for anti-scraping sites. + +Used when Jina Reader fails (403/451/timeout). Supports persistent login +sessions via Playwright's storage_state for platforms requiring authentication. + +Install: pip install "x-reader[browser]" && playwright install chromium +""" + +from loguru import logger +from pathlib import Path + +SESSION_DIR = Path.home() / ".x-reader" / "sessions" +TIMEOUT_MS = 30_000 + + +async def fetch_via_browser(url: str, storage_state: str = None) -> dict: + """ + Fetch a URL using headless Chromium via Playwright. + + Args: + url: Target URL to fetch. + storage_state: Path to a Playwright storage state JSON file (cookies/localStorage). + If provided, the browser context will load this session. + + Returns: + dict with keys: title, content, url, author + """ + try: + from playwright.async_api import async_playwright + except ImportError: + raise RuntimeError( + "Playwright is not installed. Run:\n" + ' pip install "x-reader[browser]"\n' + " playwright install chromium" + ) + + logger.info(f"Browser fetch: {url}") + + async with async_playwright() as p: + browser = await p.chromium.launch(headless=True) + + context_kwargs = {} + if storage_state and Path(storage_state).exists(): + context_kwargs["storage_state"] = storage_state + logger.info(f"Using session: {storage_state}") + + context = await browser.new_context( + user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) " + "AppleWebKit/537.36 (KHTML, like Gecko) " + "Chrome/120.0.0.0 Safari/537.36", + **context_kwargs, + ) + page = await context.new_page() + + try: + await page.goto(url, wait_until="domcontentloaded", timeout=TIMEOUT_MS) + # Extra wait for JS-heavy pages + await page.wait_for_timeout(2000) + + title = await page.title() + # Extract main text content, stripping scripts/styles + content = await page.evaluate("""() => { + const el = document.querySelector('article') + || 
document.querySelector('main') + || document.querySelector('.content') + || document.body; + return el ? el.innerText : ''; + }""") + + result = { + "title": (title or "").strip()[:200], + "content": (content or "").strip(), + "url": url, + "author": "", + } + logger.info(f"Browser fetch OK: {title[:60]}") + return result + + finally: + await context.close() + await browser.close() + + +def get_session_path(platform: str) -> str: + """Get the session file path for a platform.""" + return str(SESSION_DIR / f"{platform}.json") diff --git a/x_reader/fetchers/jina.py b/x_reader/fetchers/jina.py new file mode 100644 index 0000000..3ee6882 --- /dev/null +++ b/x_reader/fetchers/jina.py @@ -0,0 +1,63 @@ +# -*- coding: utf-8 -*- +""" +Jina Reader — universal fallback for content extraction. + +Uses https://r.jina.ai/{url} to extract markdown from any web page. +Free, no API key required, handles JS rendering and anti-scraping. +""" + +import requests +from loguru import logger + + +JINA_BASE = "https://r.jina.ai" +TIMEOUT = 30 + +HEADERS = { + "Accept": "text/markdown", + "User-Agent": "x-reader/0.1", +} + + +def fetch_via_jina(url: str) -> dict: + """ + Fetch any URL via Jina Reader and return structured data. 
async def fetch_rss(url: str, limit: int = 20) -> List[Dict[str, Any]]:
    """
    Fetch and parse an RSS/Atom feed.

    Args:
        url: RSS feed URL
        limit: Max number of entries to return (values below 0 are treated as 0)

    Returns:
        List of article dicts with: title, summary, url, source, published, platform

    Raises:
        ValueError: If feedparser flags the feed as malformed and no entries
            could be recovered.
    """
    import asyncio  # local import: the module header does not import asyncio

    logger.info(f"Fetching RSS: {url}")

    # feedparser.parse does blocking network I/O. Run it in a worker thread so
    # other coroutines (e.g. a concurrent batch of fetches) keep running.
    feed = await asyncio.to_thread(feedparser.parse, url)

    # bozo marks a malformed feed; it is only fatal when nothing was parsed.
    if feed.bozo and not feed.entries:
        raise ValueError(f"Failed to parse RSS feed: {feed.bozo_exception}")

    source_name = feed.feed.get("title", url)
    articles = []

    # A negative limit would slice from the end; clamp it to "no entries".
    for entry in feed.entries[:max(limit, 0)]:
        summary = ""
        if hasattr(entry, "summary"):
            summary = entry.summary
        elif hasattr(entry, "content"):
            summary = entry.content[0].get("value", "")

        articles.append({
            "title": entry.get("title", ""),
            "summary": summary,
            "url": entry.get("link", ""),
            "source": source_name,
            "published": entry.get("published", ""),
            "platform": "rss",
        })

    logger.info(f"RSS: {len(articles)} articles from {source_name}")
    return articles
" + "Install with: pip install x-reader[telegram]" + ) + + api_id = os.getenv("TG_API_ID", "") + api_hash = os.getenv("TG_API_HASH", "") + + if not api_id or not api_hash: + raise ValueError("TG_API_ID and TG_API_HASH must be set in .env") + + session = session_path or os.getenv("TG_SESSION_PATH", "./tg_session") + cutoff = datetime.now(timezone.utc) - timedelta(hours=hours) + + messages = [] + async with TelegramClient(session, int(api_id), api_hash) as client: + logger.info(f"Fetching TG channel: {channel}") + entity = await client.get_entity(channel) + + async for msg in client.iter_messages(entity, limit=limit): + if not isinstance(msg, Message) or not msg.text: + continue + if msg.date < cutoff: + break + + messages.append({ + "text": msg.text, + "views": msg.views or 0, + "date": msg.date.isoformat(), + "url": f"https://t.me/{channel}/{msg.id}", + "platform": "telegram", + }) + + logger.info(f"TG {channel}: {len(messages)} messages") + return messages diff --git a/x_reader/fetchers/twitter.py b/x_reader/fetchers/twitter.py new file mode 100644 index 0000000..362881b --- /dev/null +++ b/x_reader/fetchers/twitter.py @@ -0,0 +1,217 @@ +# -*- coding: utf-8 -*- +""" +X/Twitter fetcher — three-tier fallback: + +1. X oEmbed API (fast, reliable for individual tweets, no login needed) +2. Jina Reader (handles non-tweet X pages like profiles) +3. 
Playwright + saved session (handles login-required content) + +Install browser tier: pip install "x-reader[browser]" && playwright install chromium +Save X session: x-reader login twitter +""" + +import re +import requests +from loguru import logger +from typing import Dict, Any + +from x_reader.fetchers.jina import fetch_via_jina + + +OEMBED_URL = "https://publish.twitter.com/oembed" + + +def _extract_author(url: str) -> str: + """Extract @username from tweet URL.""" + match = re.search(r'x\.com/(\w+)/status', url) + return f"@{match.group(1)}" if match else "" + + +def _is_tweet_url(url: str) -> bool: + """Check if this is a direct tweet/status URL (vs profile or other X page).""" + return bool(re.search(r'x\.com/\w+/status/\d+', url)) + + +def _fetch_via_oembed(url: str) -> Dict[str, Any]: + """ + Fetch tweet text via X's oEmbed API. + Free, reliable, no auth needed. Works for public tweets. + Note: oEmbed requires twitter.com URLs (not x.com). + """ + # oEmbed API requires twitter.com format + oembed_query_url = url.replace("x.com", "twitter.com") + resp = requests.get( + OEMBED_URL, + params={"url": oembed_query_url, "omit_script": "true"}, + timeout=10, + ) + resp.raise_for_status() + data = resp.json() + + # Strip HTML tags from the embedded HTML to get clean text + html = data.get("html", "") + text = re.sub(r'<[^>]+>', ' ', html) + text = re.sub(r'\s+', ' ', text).strip() + + return { + "text": text, + "author": data.get("author_name", ""), + "author_url": data.get("author_url", ""), + "title": text[:100] if text else "", + } + + +async def _fetch_via_playwright(url: str) -> Dict[str, Any]: + """ + Fetch tweet via Playwright with X-specific DOM selectors. + Uses saved login session if available (~/.x-reader/sessions/twitter.json). + """ + try: + from playwright.async_api import async_playwright + except ImportError: + raise RuntimeError( + "Playwright not installed. 
Run:\n" + ' pip install "x-reader[browser]"\n' + " playwright install chromium" + ) + + from x_reader.fetchers.browser import get_session_path + from pathlib import Path + + session_path = get_session_path("twitter") + has_session = Path(session_path).exists() + if has_session: + logger.info(f"Using saved X session: {session_path}") + + async with async_playwright() as p: + browser = await p.chromium.launch(headless=True) + + context_kwargs = {} + if has_session: + context_kwargs["storage_state"] = session_path + + context = await browser.new_context( + user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) " + "AppleWebKit/537.36 (KHTML, like Gecko) " + "Chrome/120.0.0.0 Safari/537.36", + **context_kwargs, + ) + page = await context.new_page() + + try: + await page.goto(url, wait_until="domcontentloaded", timeout=30_000) + + # Wait for tweet text to render (X is a SPA, needs JS execution) + try: + await page.wait_for_selector( + '[data-testid="tweetText"]', timeout=10_000 + ) + except Exception: + pass # May not appear if login required + + # Extract tweet content with X-specific selectors + tweet_text = await page.evaluate("""() => { + // Priority 1: tweet text element + const tweetEl = document.querySelector('[data-testid="tweetText"]'); + if (tweetEl) return tweetEl.innerText; + + // Priority 2: article element (thread view) + const article = document.querySelector('article'); + if (article) return article.innerText; + + // Priority 3: main content area + const main = document.querySelector('main'); + if (main) return main.innerText; + + return ''; + }""") + + title = await page.title() + + return { + "text": (tweet_text or "").strip(), + "title": (title or "").strip()[:200], + } + finally: + await context.close() + await browser.close() + + +async def fetch_twitter(url: str) -> Dict[str, Any]: + """ + Fetch a tweet or X post with three-tier fallback. 
+ + Args: + url: Tweet URL (x.com or twitter.com) + + Returns: + Dict with: text, author, url, title, platform + """ + url = url.replace("twitter.com", "x.com") + author = _extract_author(url) + + # Tier 1: oEmbed API (best for individual tweets) + if _is_tweet_url(url): + try: + logger.info(f"[Twitter] Tier 1 — oEmbed: {url}") + data = _fetch_via_oembed(url) + if data.get("text") and len(data["text"].strip()) > 20: + return { + "text": data["text"], + "author": author or data.get("author", ""), + "url": url, + "title": data.get("title", ""), + "platform": "twitter", + } + logger.warning("[Twitter] oEmbed returned thin content") + except Exception as e: + logger.warning(f"[Twitter] oEmbed failed ({e})") + + # Tier 2: Jina Reader (handles profiles, threads, non-tweet pages) + try: + logger.info(f"[Twitter] Tier 2 — Jina: {url}") + data = fetch_via_jina(url) + content = data.get("content", "") + title = data.get("title", "") + jina_ok = ( + content + and len(content.strip()) > 100 + and "not yet fully loaded" not in content.lower() + and title.lower() not in ("x", "title: x", "") + ) + if jina_ok: + return { + "text": content, + "author": author, + "url": url, + "title": title, + "platform": "twitter", + } + logger.warning("[Twitter] Jina returned unusable content") + except Exception as e: + logger.warning(f"[Twitter] Jina failed ({e})") + + # Tier 3: Playwright + session with X-specific extraction + try: + logger.info(f"[Twitter] Tier 3 — Playwright: {url}") + data = await _fetch_via_playwright(url) + content = data.get("text", "") + if content and len(content.strip()) > 20: + return { + "text": content, + "author": author, + "url": url, + "title": data.get("title", ""), + "platform": "twitter", + } + logger.warning("[Twitter] Playwright returned empty content") + except RuntimeError: + raise + except Exception as e: + logger.error(f"[Twitter] All methods failed: {e}") + + raise RuntimeError( + f"❌ All Twitter fetch methods failed for: {url}\n" + f" Try: x-reader 
async def fetch_wechat(url: str) -> Dict[str, Any]:
    """
    Fetch a WeChat public account article with a two-tier fallback.

    Tier 1 asks Jina Reader; if that fails or returns no content, Tier 2
    loads the page in headless Chromium via ``fetch_via_browser``.

    Args:
        url: mp.weixin.qq.com article URL

    Returns:
        Dict with: title, content, author, url, platform

    Raises:
        RuntimeError: If Playwright is not installed (raised by
            ``fetch_via_browser``) or if the browser fetch fails as well.
    """
    import asyncio  # local import: the module header does not import asyncio

    # Tier 1: Jina Reader
    try:
        logger.info(f"[WeChat] Tier 1 — Jina: {url}")
        from x_reader.fetchers.jina import fetch_via_jina

        # fetch_via_jina is a blocking requests call; keep the event loop free.
        data = await asyncio.to_thread(fetch_via_jina, url)
        if data.get("content"):
            return {
                "title": data["title"],
                "content": data["content"],
                "author": data.get("author", ""),
                "url": url,
                "platform": "wechat",
            }
        logger.warning("[WeChat] Jina returned empty content, falling back to browser")
    except Exception as e:
        logger.warning(f"[WeChat] Jina failed ({e}), falling back to browser")

    # Tier 2: Playwright headless (no session needed)
    try:
        logger.info(f"[WeChat] Tier 2 — Playwright headless: {url}")
        from x_reader.fetchers.browser import fetch_via_browser

        data = await fetch_via_browser(url)
        if not data.get("content"):
            logger.warning("[WeChat] Browser returned empty content")
        return {
            "title": data["title"],
            "content": data["content"],
            "author": data.get("author", ""),
            "url": url,
            "platform": "wechat",
        }
    except RuntimeError:
        # fetch_via_browser raises RuntimeError with install instructions when
        # Playwright is missing; pass that message through unchanged.
        raise
    except Exception as e:
        logger.error(f"[WeChat] Browser fetch also failed: {e}")
        # Chain the cause so the original traceback stays attached.
        raise RuntimeError(
            f"❌ All WeChat fetch methods failed.\n"
            f"   Last error: {e}"
        ) from e
0000000..215a679 --- /dev/null +++ b/x_reader/fetchers/xhs.py @@ -0,0 +1,78 @@ +# -*- coding: utf-8 -*- +""" +Xiaohongshu (RED) note fetcher — three-tier fallback: + +1. Jina Reader (fast, no deps) +2. Playwright + saved session (handles 451/403) +3. Error with login instructions + +Install browser tier: pip install "x-reader[browser]" && playwright install chromium +""" + +from loguru import logger +from typing import Dict, Any +from pathlib import Path + +from x_reader.fetchers.jina import fetch_via_jina + + +async def fetch_xhs(url: str) -> Dict[str, Any]: + """ + Fetch a Xiaohongshu note with three-tier fallback. + + Args: + url: xiaohongshu.com or xhslink.com URL + + Returns: + Dict with: title, content, author, url, platform + """ + # Tier 1: Jina Reader + try: + logger.info(f"[XHS] Tier 1 — Jina: {url}") + data = fetch_via_jina(url) + if data.get("content"): + return { + "title": data["title"], + "content": data["content"], + "author": data.get("author", ""), + "url": url, + "platform": "xhs", + } + logger.warning("[XHS] Jina returned empty content, falling back to browser") + except Exception as e: + logger.warning(f"[XHS] Jina failed ({e}), falling back to browser") + + # Tier 2: Playwright with session + from x_reader.fetchers.browser import get_session_path, SESSION_DIR + + session_path = get_session_path("xhs") + if not Path(session_path).exists(): + # Tier 3: No session — guide user + raise RuntimeError( + f"❌ XHS blocked Jina and no saved session found.\n" + f" Run: x-reader login xhs\n" + f" Then retry this URL." 
+ ) + + try: + logger.info(f"[XHS] Tier 2 — Playwright with session: {url}") + from x_reader.fetchers.browser import fetch_via_browser + + data = await fetch_via_browser(url, storage_state=session_path) + return { + "title": data["title"], + "content": data["content"], + "author": data.get("author", ""), + "url": url, + "platform": "xhs", + } + except RuntimeError: + # Playwright not installed + raise + except Exception as e: + logger.error(f"[XHS] Browser fetch also failed: {e}") + raise RuntimeError( + f"❌ All XHS fetch methods failed.\n" + f" Last error: {e}\n" + f" Try: x-reader login xhs (to refresh session)" + ) diff --git a/x_reader/fetchers/youtube.py b/x_reader/fetchers/youtube.py new file mode 100644 index 0000000..1282760 --- /dev/null +++ b/x_reader/fetchers/youtube.py @@ -0,0 +1,211 @@ +# -*- coding: utf-8 -*- +""" +YouTube video fetcher — three-tier content extraction: + +1. yt-dlp auto-subtitles (fastest, best quality for subtitled videos) +2. yt-dlp audio download → Groq Whisper API transcription (for non-subtitled videos) +3. Jina Reader fallback (page description only) + +Requires: yt-dlp installed (brew install yt-dlp / pip install yt-dlp) +Optional: GROQ_API_KEY env var for Whisper transcription +""" + +import re +import os +import subprocess +import tempfile +from loguru import logger +from typing import Dict, Any + +from x_reader.fetchers.jina import fetch_via_jina + + +def _extract_video_id(url: str) -> str: + """Extract video ID from YouTube URL.""" + match = re.search(r'(?:v=|youtu\.be/)([a-zA-Z0-9_-]{11})', url) + return match.group(1) if match else "" + + +def _get_subtitles_via_ytdlp(url: str, lang: str = "en") -> str: + """ + Download auto-generated subtitles using yt-dlp. + Returns subtitle text, or empty string if unavailable. 
+ """ + with tempfile.TemporaryDirectory() as tmpdir: + output_path = os.path.join(tmpdir, "sub") + + cmd = [ + "yt-dlp", + "--write-auto-sub", + "--write-sub", + "--sub-lang", lang, + "--sub-format", "srt", + "--skip-download", + "-o", output_path, + url, + ] + + try: + subprocess.run(cmd, capture_output=True, text=True, timeout=60) + except FileNotFoundError: + logger.warning("yt-dlp not found. Install with: brew install yt-dlp") + return "" + except subprocess.TimeoutExpired: + logger.warning("yt-dlp subtitle download timed out") + return "" + + for ext in [f".{lang}.srt", f".{lang}.vtt"]: + sub_file = output_path + ext + if os.path.exists(sub_file): + return _parse_srt(sub_file) + + return "" + + +def _parse_srt(filepath: str) -> str: + """Parse SRT file into clean text (strip timestamps and sequence numbers).""" + with open(filepath, 'r', encoding='utf-8') as f: + lines = f.readlines() + + text_lines = [] + seen = set() + + for line in lines: + line = line.strip() + if not line or line.isdigit() or '-->' in line: + continue + if line.startswith('[') and line.endswith(']'): + continue + if line not in seen: + seen.add(line) + text_lines.append(line) + + return " ".join(text_lines) + + +def _transcribe_via_whisper(url: str) -> str: + """ + Download audio with yt-dlp and transcribe via Groq Whisper API. + + Requires: GROQ_API_KEY env var + yt-dlp + ffmpeg installed. + Groq Whisper limit: 25MB audio file. + Returns transcript text, or empty string if unavailable. 
async def fetch_youtube(url: str, sub_lang: str = "en") -> Dict[str, Any]:
    """
    Fetch YouTube video content with three-tier extraction.

    Strategy:
    1. yt-dlp auto-subtitles (full transcript, fastest)
    2. yt-dlp audio + Groq Whisper API (for non-subtitled videos)
    3. Jina Reader fallback (page description only)

    Jina is also used for the title. If Jina fails, extraction continues with
    yt-dlp; the Jina error is re-raised only when no transcript could be
    obtained either.

    Args:
        url: YouTube video URL
        sub_lang: Subtitle language code (default: "en")

    Returns:
        Dict with: title, description, author, url, video_id, has_transcript, platform
    """
    import asyncio  # local import: the module header does not import asyncio

    logger.info(f"Fetching YouTube: {url}")
    video_id = _extract_video_id(url)

    # Step 1: Get metadata via Jina. A failure here must not stop the
    # transcript tiers, so remember the error and carry on with empty metadata.
    jina_error = None
    try:
        # Blocking HTTP call: run it in a worker thread.
        jina_data = await asyncio.to_thread(fetch_via_jina, url)
    except Exception as e:
        jina_error = e
        jina_data = {"title": "", "content": "", "author": ""}
        logger.warning(f"Jina metadata fetch failed ({e}), continuing with yt-dlp")
    title = jina_data["title"]

    # Step 2: Try yt-dlp auto-subtitles (blocking subprocess → worker thread)
    logger.info(f"Extracting subtitles ({sub_lang})...")
    transcript = await asyncio.to_thread(_get_subtitles_via_ytdlp, url, lang=sub_lang)

    # Step 3: No subtitles? Try Whisper transcription
    if not transcript:
        logger.info("No subtitles available, trying Whisper transcription...")
        transcript = await asyncio.to_thread(_transcribe_via_whisper, url)

    if transcript:
        logger.info(f"Got transcript: {len(transcript)} chars")
        content = transcript
        has_transcript = True
    else:
        if jina_error is not None:
            # Nothing was retrieved from any tier: surface the original error.
            raise jina_error
        logger.info("No transcript available, using page description")
        content = jina_data["content"]
        has_transcript = False

    return {
        "title": title,
        "description": content,
        "author": jina_data.get("author", ""),
        "url": url,
        "video_id": video_id,
        "has_transcript": has_transcript,
        "platform": "youtube",
    }
+""" + +from pathlib import Path +from loguru import logger + +SESSION_DIR = Path.home() / ".x-reader" / "sessions" + +PLATFORM_URLS = { + "xhs": "https://www.xiaohongshu.com/explore", + "xiaohongshu": "https://www.xiaohongshu.com/explore", + "wechat": "https://mp.weixin.qq.com", + "twitter": "https://x.com/login", + "x": "https://x.com/login", +} + + +def login(platform: str) -> None: + """ + Open a visible browser for the user to log in manually. + After login, saves cookies/localStorage to a session file. + + Args: + platform: Platform key (e.g. 'xhs', 'wechat') + """ + try: + from playwright.sync_api import sync_playwright + except ImportError: + print( + "❌ Playwright is not installed. Run:\n" + ' pip install "x-reader[browser]"\n' + " playwright install chromium" + ) + return + + platform = platform.lower() + login_url = PLATFORM_URLS.get(platform) + if not login_url: + supported = ", ".join(sorted(PLATFORM_URLS.keys())) + print(f"❌ Unknown platform: {platform}") + print(f" Supported: {supported}") + return + + SESSION_DIR.mkdir(parents=True, exist_ok=True) + session_path = SESSION_DIR / f"{platform}.json" + # Normalize alias to canonical name + if platform in ("xhs", "xiaohongshu"): + canonical = "xhs" + elif platform in ("twitter", "x"): + canonical = "twitter" + else: + canonical = platform + session_path = SESSION_DIR / f"{canonical}.json" + + print(f"🌐 Opening {platform} login page: {login_url}") + print(" Please log in manually in the browser window.") + print(" When done, close the browser or press Ctrl+C.\n") + + with sync_playwright() as p: + browser = p.chromium.launch(headless=False) + context = browser.new_context( + user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) " + "AppleWebKit/537.36 (KHTML, like Gecko) " + "Chrome/120.0.0.0 Safari/537.36", + ) + page = context.new_page() + page.goto(login_url) + + try: + # Wait for user to log in — blocks until browser is closed + page.wait_for_event("close", timeout=300_000) # 5 min max + except 
class UniversalReader:
    """
    Routes URLs to platform-specific fetchers.
    Falls back to Jina Reader for unknown platforms.
    """

    def __init__(self, inbox: Optional["UnifiedInbox"] = None):
        """Create a reader.

        Args:
            inbox: Optional inbox; when given, every successfully fetched
                item is added to it and the inbox is saved.
        """
        self.inbox = inbox

    @staticmethod
    def _host_matches(host: str, *domains: str) -> bool:
        """Return True if *host* is one of *domains* or a subdomain of one."""
        return any(host == d or host.endswith("." + d) for d in domains)

    def _detect_platform(self, url: str) -> str:
        """Detect platform from URL.

        The host is compared by exact name or subdomain, not by substring,
        so hosts that merely contain a platform domain (``netflix.com``
        contains ``x.com``, ``internet.media`` contains ``t.me``) are not
        misrouted.

        Returns:
            One of "wechat", "twitter", "youtube", "xhs", "bilibili",
            "podcast", "telegram", "rss" or "generic".
        """
        # hostname is lower-cased and excludes any port or credentials.
        host = urlparse(url).hostname or ""
        matches = self._host_matches

        if matches(host, "mp.weixin.qq.com"):
            return "wechat"
        if matches(host, "x.com", "twitter.com"):
            return "twitter"
        if matches(host, "youtube.com", "youtu.be"):
            return "youtube"
        if matches(host, "xiaohongshu.com", "xhslink.com"):
            return "xhs"
        if matches(host, "bilibili.com", "b23.tv"):
            return "bilibili"
        if matches(host, "xiaoyuzhoufm.com", "podcasts.apple.com"):
            return "podcast"
        if matches(host, "t.me", "telegram.org"):
            return "telegram"
        # Feed detection is a heuristic on the URL text itself.
        if url.endswith(".xml") or "/rss" in url or "/feed" in url or "/atom" in url:
            return "rss"
        return "generic"

    async def read(self, url: str) -> "UnifiedContent":
        """
        Fetch content from any URL and return as UnifiedContent.

        The main entry point — give it a URL, get back structured content.
        A missing scheme defaults to https. On success the item is added to
        the inbox (if one is configured) and passed to ``save_to_markdown``.
        Any fetch error is logged and re-raised.
        """
        # Ensure URL has scheme
        if not url.startswith(("http://", "https://")):
            url = f"https://{url}"

        platform = self._detect_platform(url)
        logger.info(f"[{platform}] {url[:60]}...")

        try:
            content = await self._fetch(platform, url)

            # Save to inbox if configured
            if self.inbox:
                if self.inbox.add(content):
                    self.inbox.save()
                    logger.info(f"Saved to inbox: {content.title[:50]}")

            # Save to markdown output if configured
            from x_reader.utils.storage import save_to_markdown
            save_to_markdown(content)

            return content

        except Exception as e:
            logger.error(f"[{platform}] Failed: {e}")
            raise

    async def _fetch(self, platform: str, url: str) -> "UnifiedContent":
        """Dispatch to platform-specific fetcher.

        Fetcher modules are imported lazily so optional dependencies are only
        needed for the platforms actually used. Platforms without a dedicated
        branch (including "podcast" and "generic") use the Jina fallback.

        Raises:
            ValueError: If an RSS feed or Telegram channel yields no items.
        """

        if platform == "bilibili":
            from x_reader.fetchers.bilibili import fetch_bilibili
            data = await fetch_bilibili(url)
            return from_bilibili(data)

        if platform == "twitter":
            from x_reader.fetchers.twitter import fetch_twitter
            data = await fetch_twitter(url)
            return from_twitter(data)

        if platform == "wechat":
            from x_reader.fetchers.wechat import fetch_wechat
            data = await fetch_wechat(url)
            return from_wechat(data)

        if platform == "xhs":
            from x_reader.fetchers.xhs import fetch_xhs
            data = await fetch_xhs(url)
            return from_xiaohongshu(data)

        if platform == "youtube":
            from x_reader.fetchers.youtube import fetch_youtube
            data = await fetch_youtube(url)
            return from_youtube(data)

        if platform == "rss":
            from x_reader.fetchers.rss import fetch_rss
            articles = await fetch_rss(url, limit=1)
            if articles:
                return from_rss(articles[0])
            raise ValueError(f"No articles found in RSS feed: {url}")

        if platform == "telegram":
            from x_reader.fetchers.telegram import fetch_telegram
            # Extract channel username from t.me URL
            path = urlparse(url).path.strip("/").split("/")[0]
            channel = path if path else url
            messages = await fetch_telegram(channel, limit=1)
            if messages:
                return from_telegram(messages[0], channel, channel)
            raise ValueError(f"No messages from Telegram channel: {url}")

        # Fallback: Jina Reader for any unknown URL
        logger.info(f"Using Jina fallback for: {url}")
        # fetch_via_jina is a blocking HTTP call; run it in a worker thread so
        # read_batch can overlap several generic fetches.
        data = await asyncio.to_thread(fetch_via_jina, url)
        return UnifiedContent(
            source_type=SourceType.MANUAL,
            source_name=urlparse(url).netloc,
            title=data["title"],
            content=data["content"],
            url=url,
        )

    async def read_batch(self, urls: list[str]) -> "list[UnifiedContent]":
        """Fetch multiple URLs concurrently.

        Failures are logged and skipped, so the returned list may be shorter
        than *urls*; results keep the input order.
        """
        tasks = [self.read(url) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        contents = []
        for url, result in zip(urls, results):
            if isinstance(result, Exception):
                logger.error(f"Batch failed for {url}: {result}")
            else:
                contents.append(result)

        return contents
# -*- coding: utf-8 -*-
"""
Unified content schema for x-reader.

Defines the standard data format for all content sources:
- Telegram channels
- RSS feeds
- Bilibili videos
- Xiaohongshu (RED) notes
- WeChat articles
- X/Twitter posts
- YouTube videos
- Manual input
"""

from dataclasses import dataclass, field, asdict
from datetime import datetime, timedelta
from typing import Optional, List
from enum import Enum
import hashlib
import json
import os


class SourceType(str, Enum):
    """Content source types."""
    TELEGRAM = "telegram"
    RSS = "rss"
    BILIBILI = "bilibili"
    XIAOHONGSHU = "xhs"
    TWITTER = "twitter"
    WECHAT = "wechat"
    YOUTUBE = "youtube"
    MANUAL = "manual"


class MediaType(str, Enum):
    """Media types."""
    TEXT = "text"
    VIDEO = "video"
    AUDIO = "audio"
    IMAGE = "image"


class Priority(str, Enum):
    """Content priority levels."""
    HOT = "hot"
    QUALITY = "quality"
    DEEP = "deep"
    NORMAL = "normal"
    LOW = "low"


@dataclass
class UnifiedContent:
    """Unified content format across all platforms.

    ``id`` and ``fetched_at`` are filled in by ``__post_init__`` when left
    empty, so callers normally pass only the five required fields.
    """

    # === Required ===
    source_type: SourceType
    source_name: str
    title: str
    content: str
    url: str

    # === Auto-generated ===
    id: str = ""
    fetched_at: str = ""

    # === Media ===
    media_type: MediaType = MediaType.TEXT
    media_url: Optional[str] = None

    # === Scoring ===
    score: int = 0
    priority: Priority = Priority.NORMAL
    category: str = ""
    tags: List[str] = field(default_factory=list)

    # === Processing state ===
    processed: bool = False
    digest_date: Optional[str] = None

    # === Translation ===
    title_cn: Optional[str] = None
    content_cn: Optional[str] = None

    # === Metadata ===
    extra: dict = field(default_factory=dict)

    def __post_init__(self):
        """Derive ``id`` and ``fetched_at`` when the caller left them empty."""
        if not self.id:
            # The id is the first 12 hex chars of md5(url). Several converters
            # default the URL to "", which would give every such item the same
            # id and make the inbox drop all but the first; hash the item's
            # identifying fields instead in that case.
            if self.url:
                basis = self.url
            else:
                kind = getattr(self.source_type, "value", self.source_type)
                basis = f"{kind}\n{self.source_name}\n{self.title}\n{self.content}"
            self.id = hashlib.md5(basis.encode()).hexdigest()[:12]
        if not self.fetched_at:
            self.fetched_at = datetime.now().isoformat()

    def to_dict(self) -> dict:
        """Return a dict of all fields with enum fields as their string values."""
        d = asdict(self)
        d['source_type'] = self.source_type.value
        d['media_type'] = self.media_type.value
        d['priority'] = self.priority.value
        return d

    @classmethod
    def from_dict(cls, data: dict) -> 'UnifiedContent':
        """Build an instance from a dict such as ``to_dict`` produces.

        Unknown keys are ignored and string enum values are converted to
        their enum members. ``data`` itself is left unmodified.

        Raises:
            ValueError: If an enum field holds an unrecognised value.
            TypeError: If a required field is missing.
        """
        known = cls.__dataclass_fields__
        # Work on a filtered copy so the caller's dict is never rewritten.
        kwargs = {k: v for k, v in data.items() if k in known}
        enum_fields = (
            ('source_type', SourceType),
            ('media_type', MediaType),
            ('priority', Priority),
        )
        for name, enum_cls in enum_fields:
            if isinstance(kwargs.get(name), str):
                kwargs[name] = enum_cls(kwargs[name])
        return cls(**kwargs)


# =============================================================================
# Converters: platform-specific dict → UnifiedContent
# =============================================================================

def from_telegram(msg: dict, channel_name: str, channel_username: str) -> UnifiedContent:
    """Convert a Telegram message dict; the title is the first 100 chars of its text."""
    text = msg.get('text') or ''  # tolerate an explicit None as well as a missing key
    return UnifiedContent(
        source_type=SourceType.TELEGRAM,
        source_name=channel_name,
        title=text[:100],
        content=text,
        url=msg.get('url', f"https://t.me/{channel_username}"),
        extra={"views": msg.get('views', 0), "channel_username": channel_username},
    )


def from_rss(article: dict) -> UnifiedContent:
    """Convert an RSS article dict; the URL falls back from 'url' to 'link'."""
    return UnifiedContent(
        source_type=SourceType.RSS,
        source_name=article.get('source', ''),
        title=article.get('title', ''),
        content=article.get('summary', ''),
        url=article.get('url', article.get('link', '')),
        score=article.get('score', 0),
        category=article.get('category', ''),
        title_cn=article.get('title_cn'),
        content_cn=article.get('summary_cn'),
    )


def from_bilibili(video: dict) -> UnifiedContent:
    """Convert a Bilibili video dict; the cover image is stored as ``media_url``."""
    return UnifiedContent(
        source_type=SourceType.BILIBILI,
        source_name=video.get('author', ''),
        title=video.get('title', ''),
        content=video.get('description', ''),
        url=video.get('url', ''),
        media_type=MediaType.VIDEO,
        media_url=video.get('cover', ''),
        extra={
            "bvid": video.get('bvid', ''),
            "duration": video.get('duration', 0),
            "view_count": video.get('view_count', 0),
        },
    )


def from_twitter(data: dict) -> UnifiedContent:
    """Convert a tweet dict; the title is the first 100 chars of its text."""
    text = data.get('text') or ''  # tolerate an explicit None as well as a missing key
    return UnifiedContent(
        source_type=SourceType.TWITTER,
        source_name=data.get('author', ''),
        title=text[:100],
        content=text,
        url=data.get('url', ''),
        extra={
            "likes": data.get('likes', 0),
            "retweets": data.get('retweets', 0),
        },
    )


def from_wechat(article: dict) -> UnifiedContent:
    """Convert a WeChat article dict."""
    return UnifiedContent(
        source_type=SourceType.WECHAT,
        source_name=article.get('author', ''),
        title=article.get('title', ''),
        content=article.get('content', ''),
        url=article.get('url', ''),
    )


def from_xiaohongshu(note: dict) -> UnifiedContent:
    """Convert a Xiaohongshu note dict; media type is IMAGE when 'images' is non-empty."""
    return UnifiedContent(
        source_type=SourceType.XIAOHONGSHU,
        source_name=note.get('author', ''),
        title=note.get('title', ''),
        content=note.get('content', ''),
        url=note.get('url', ''),
        media_type=MediaType.IMAGE if note.get('images') else MediaType.TEXT,
        extra={
            "likes": note.get('likes', 0),
            "collects": note.get('collects', 0),
        },
    )


def from_youtube(video: dict) -> UnifiedContent:
    """Convert a YouTube video dict."""
    return UnifiedContent(
        source_type=SourceType.YOUTUBE,
        source_name=video.get('author', ''),
        title=video.get('title', ''),
        content=video.get('description', ''),
        url=video.get('url', ''),
        media_type=MediaType.VIDEO,
        extra={
            "duration": video.get('duration', ''),
            "view_count": video.get('view_count', 0),
        },
    )


def from_manual(title: str, content: str, url: str = "") -> UnifiedContent:
    """Wrap manually entered text; a ``manual://`` URL is derived from the title if none is given."""
    return UnifiedContent(
        source_type=SourceType.MANUAL,
        source_name="manual",
        title=title,
        content=content,
        url=url or f"manual://{hashlib.md5(title.encode()).hexdigest()[:8]}",
    )


# =============================================================================
# Unified Inbox
# =============================================================================

class UnifiedInbox:
    """JSON-based content inbox with dedup."""

    def __init__(self, filepath: str = "unified_inbox.json"):
        """Open the inbox stored at ``filepath``, loading any existing items."""
        self.filepath = filepath
        self.items: List[UnifiedContent] = []
        self.load()

    def load(self):
        """Load items from ``filepath``.

        Best-effort: a missing file leaves ``items`` untouched; an unreadable
        file, invalid JSON, or a payload that is not a list yields an empty
        inbox; individual malformed records are skipped.
        """
        try:
            with open(self.filepath, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except FileNotFoundError:
            return
        except (OSError, ValueError):  # ValueError covers JSONDecodeError
            self.items = []
            return

        if not isinstance(data, list):
            self.items = []
            return

        loaded = []
        for record in data:
            if not isinstance(record, dict):
                continue
            try:
                loaded.append(UnifiedContent.from_dict(record))
            except (TypeError, ValueError):
                # Missing required field or unknown enum value: skip this
                # record rather than fail the whole inbox.
                continue
        self.items = loaded

    def save(self):
        """Write all items to ``filepath`` as JSON.

        The data goes to a sibling temp file that then replaces the target,
        so an interrupted or failed write cannot leave a truncated inbox.
        """
        payload = [item.to_dict() for item in self.items]
        tmp_path = f"{self.filepath}.tmp"
        try:
            with open(tmp_path, 'w', encoding='utf-8') as f:
                json.dump(payload, f, ensure_ascii=False, indent=2)
            os.replace(tmp_path, self.filepath)
        except BaseException:
            try:
                os.remove(tmp_path)
            except OSError:
                pass
            raise

    def add(self, item: UnifiedContent) -> bool:
        """Append ``item`` unless an item with the same id exists; return True if added."""
        if any(i.id == item.id for i in self.items):
            return False
        self.items.append(item)
        return True

    def add_batch(self, items: List[UnifiedContent]) -> int:
        """Add several items, skipping duplicates; return how many were added."""
        # One id set for the whole batch instead of rescanning per item.
        seen = {existing.id for existing in self.items}
        added = 0
        for item in items:
            if item.id in seen:
                continue
            seen.add(item.id)
            self.items.append(item)
            added += 1
        return added

    def get_unprocessed(self) -> List[UnifiedContent]:
        """Return items not yet marked as processed."""
        return [i for i in self.items if not i.processed]

    def get_by_source(self, source_type: SourceType) -> List[UnifiedContent]:
        """Return items whose source type equals ``source_type``."""
        return [i for i in self.items if i.source_type == source_type]

    def mark_processed(self, item_id: str, digest_date: Optional[str] = None):
        """Mark the first item with ``item_id`` as processed, optionally setting its digest date."""
        for item in self.items:
            if item.id == item_id:
                item.processed = True
                if digest_date:
                    item.digest_date = digest_date
                break

    def clear_old(self, days: int = 7):
        """Drop items fetched more than ``days`` days ago.

        Timestamps are compared as ISO-8601 strings, which orders correctly
        for the naive ``datetime.now().isoformat()`` values set in
        ``UnifiedContent.__post_init__``.
        """
        cutoff = (datetime.now() - timedelta(days=days)).isoformat()
        self.items = [i for i in self.items if i.fetched_at > cutoff]
# -*- coding: utf-8 -*-
"""
Storage utilities — save content to JSON inbox and optional Markdown file.

Implements the "atomic archiving" from the tweet:
- unified_inbox.json (for AI/programmatic use)
- markdown file (for human reading, e.g. Obsidian)
"""

import json
import os
from datetime import datetime
from pathlib import Path
from typing import Optional

from loguru import logger

from x_reader.schema import UnifiedContent


def save_to_json(item: UnifiedContent, filepath: str = "unified_inbox.json",
                 max_entries: int = 500):
    """Append content to JSON inbox file.

    Args:
        item: Content to append.
        filepath: Target JSON file; parent directories are created as needed.
        max_entries: Keep only this many most recent entries to prevent
            unbounded growth.

    Raises:
        ValueError: If ``max_entries`` is less than 1.
    """
    if max_entries < 1:
        raise ValueError(f"max_entries must be >= 1, got {max_entries}")

    path = Path(filepath)
    path.parent.mkdir(parents=True, exist_ok=True)

    # Best-effort read: a missing, unreadable or corrupt file starts a new list.
    try:
        with open(path, 'r', encoding='utf-8') as f:
            data = json.load(f)
    except (OSError, ValueError):  # ValueError covers JSONDecodeError
        data = []
    if not isinstance(data, list):
        # Valid JSON of the wrong shape would otherwise fail on .append().
        data = []

    data.append(item.to_dict())
    data = data[-max_entries:]

    # Write to a temp file and swap it in, so an interrupted write cannot
    # leave a truncated file that the next call would discard as corrupt.
    tmp_path = path.with_name(path.name + ".tmp")
    try:
        with open(tmp_path, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
        os.replace(tmp_path, path)
    except BaseException:
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        raise

    logger.info(f"Saved to JSON: {path}")


def save_to_markdown(item: UnifiedContent, filepath: Optional[str] = None):
    """
    Append content to a Markdown file (e.g. Obsidian vault).

    When ``filepath`` is not given, the target is chosen from the environment,
    in this order:
    - OBSIDIAN_VAULT: Write to {OBSIDIAN_VAULT}/01-收集箱/x-reader-inbox.md
    - OUTPUT_DIR: Write to {OUTPUT_DIR}/content_hub.md

    If neither is set, skips markdown output.
    """
    if not filepath:
        # Priority 1: Obsidian vault
        vault_path = os.getenv("OBSIDIAN_VAULT", "")
        if vault_path:
            filepath = os.path.join(vault_path, "01-收集箱", "x-reader-inbox.md")
        else:
            # Priority 2: generic output dir
            output_dir = os.getenv("OUTPUT_DIR", "")
            if not output_dir:
                return
            filepath = os.path.join(output_dir, "content_hub.md")

    path = Path(filepath)
    path.parent.mkdir(parents=True, exist_ok=True)

    emoji = {
        "telegram": "📢", "rss": "📰", "bilibili": "🎬",
        "xhs": "📕", "twitter": "🐦", "wechat": "💬",
        "youtube": "▶️", "manual": "✏️",
    }.get(item.source_type.value, "📄")

    # Titles can be the first 100 chars of a post and may contain newlines;
    # collapse whitespace so the heading stays on one line.
    heading = " ".join(item.title.split())

    entry = (
        f"\n## {emoji} {heading}\n"
        f"- Source: {item.source_name} ({item.source_type.value})\n"
        f"- URL: {item.url}\n"
        f"- Fetched: {item.fetched_at[:16]}\n\n"
        f"{item.content[:2000]}\n"
        "\n---\n"
    )
    with open(path, 'a', encoding='utf-8') as f:
        f.write(entry)

    logger.info(f"Saved to Markdown: {path}")


def save_content(item: UnifiedContent, json_path: Optional[str] = None,
                 md_path: Optional[str] = None):
    """Save content to both JSON and Markdown.

    Args:
        item: Content to store.
        json_path: JSON inbox path; defaults to the INBOX_FILE environment
            variable, then "unified_inbox.json".
        md_path: Markdown path, passed through to ``save_to_markdown``.
    """
    inbox_file = json_path or os.getenv("INBOX_FILE", "unified_inbox.json")
    save_to_json(item, inbox_file)
    save_to_markdown(item, md_path)