fix: auto-register skill on doctor + add XHS output formatter (#154, #134) (#199)

- Add 'agent-reach skill --install/--uninstall' command for explicit skill management
- Make 'agent-reach doctor' auto-install skill if not present (fixes #154)
- Add format_xhs_result() to strip bloated XHS JSON to essential fields (fixes #134)
- Add 'agent-reach format xhs' CLI command (pipe mcporter output to clean it)
- Update SKILL.md with XHS formatter usage tip
- Add tests for both features (11 new tests, 73/73 total pass)

Co-authored-by: Panniantong <panniantong@users.noreply.github.com>
This commit is contained in:
Pnant 2026-03-23 19:40:57 +08:00 committed by GitHub
parent 470c1288d0
commit 3be2a64120
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 421 additions and 0 deletions

View file

@ -8,6 +8,115 @@ import subprocess
from .base import Channel from .base import Channel
def format_xhs_result(data):
    """Clean an XHS API response, keeping only the useful fields.

    Accepts a single note dict, a list of notes, or a search wrapper shaped
    like ``{"items": [...]}``, ``{"data": {"items": [...]}}`` or
    ``{"data": {"notes": [...]}}``. Stripping the structural redundancy
    drastically reduces token usage (#134).

    Args:
        data: Parsed JSON from the XHS API.

    Returns:
        A list of cleaned notes for list or wrapper input (an empty list when
        the wrapper holds no results), a cleaned note dict for a single note,
        or ``data`` unchanged when it is neither a list nor a dict.
    """
    if isinstance(data, list):
        return [_clean_note(item) for item in data]
    if not isinstance(data, dict):
        return data

    # Collect every place a result list may live, in priority order.
    candidates = [data.get("items")]
    nested = data.get("data")
    if isinstance(nested, dict):
        candidates.extend((nested.get("items"), nested.get("notes")))

    result_lists = [c for c in candidates if isinstance(c, list)]
    if result_lists:
        # Prefer the first non-empty list. An empty wrapper must still yield
        # a list: treating it as a single note would return ``{}`` and hide
        # the fact that the search simply had no hits.
        items = next((c for c in result_lists if c), result_lists[0])
        return [_clean_note(item) for item in items]

    # No wrapper detected: treat the dict as a single note.
    return _clean_note(data)
def _clean_note(note):
    """Extract the useful fields from a single XHS note/feed item.

    Args:
        note: One note or feed item. Anything that is not a dict is returned
            unchanged.

    Returns:
        A new dict holding only the fields that were present: basic info
        (``id``, ``note_id``, ``xsec_token``, ``title``, ``desc``, ``type``,
        ``time``), ``content`` (only when there is no ``desc``), a trimmed
        ``user``, engagement counts, ``images`` (URLs only), ``tags`` (names
        only) and cleaned ``comments``.
    """
    if not isinstance(note, dict):
        return note

    # Some responses nest the note under "note_card" or "note".
    inner = note.get("note_card") or note.get("note") or note
    if not isinstance(inner, dict):
        # A non-dict payload under those keys cannot be unwrapped; fall back
        # to the outer object instead of crashing on ``.get``.
        inner = note

    result = {}

    # Basic info. Identifiers that sit on the wrapper rather than on the
    # nested payload are kept as a fallback; unwrapping would otherwise
    # discard them.
    wrapper_level_keys = ("id", "note_id", "xsec_token")
    for key in ("id", "note_id", "xsec_token", "title", "desc", "type", "time"):
        if key in inner:
            result[key] = inner[key]
        elif key in wrapper_level_keys and key in note:
            result[key] = note[key]

    # Content (may be in desc or content)
    if "content" in inner and "desc" not in result:
        result["content"] = inner["content"]

    # Author: keep only the identifying fields.
    user = inner.get("user") or inner.get("author")
    if isinstance(user, dict):
        result["user"] = {
            k: user[k] for k in ("nickname", "user_id", "nick_name") if k in user
        }

    # Engagement metrics: the nested interact block wins, then the note's own
    # top-level keys fill in whatever is still missing (some API formats).
    metric_keys = ("liked_count", "collected_count", "comment_count", "share_count")
    interact = inner.get("interact_info") or inner.get("note_interact_info") or {}
    for source in (interact, inner):
        if not isinstance(source, dict):
            continue
        for key in metric_keys:
            if key in source and key not in result:
                result[key] = source[key]

    # Images: just the URLs.
    images = inner.get("image_list") or inner.get("images_list") or []
    if isinstance(images, list):
        urls = []
        for img in images:
            if isinstance(img, dict):
                url = img.get("url") or img.get("url_default") or img.get("original")
                if url:
                    urls.append(url)
            elif isinstance(img, str):
                urls.append(img)
        if urls:
            result["images"] = urls

    # Tags: just the names.
    tags = inner.get("tag_list") or inner.get("tags") or []
    if isinstance(tags, list):
        tag_names = []
        for tag in tags:
            if isinstance(tag, dict) and "name" in tag:
                tag_names.append(tag["name"])
            elif isinstance(tag, str):
                tag_names.append(tag)
        if tag_names:
            result["tags"] = tag_names

    # Comments (if present, e.g. from get_feed_detail with comments)
    comments = inner.get("comments") or []
    if isinstance(comments, list) and comments:
        result["comments"] = [_clean_comment(c) for c in comments]

    return result
def _clean_comment(comment):
    """Extract the useful fields from a single comment.

    Args:
        comment: One comment object. Anything that is not a dict is returned
            unchanged.

    Returns:
        A new dict with ``content``, ``user`` (the author's display name as a
        string), ``like_count`` and ``sub_comment_count``, each included only
        when the source comment provides it.
    """
    if not isinstance(comment, dict):
        return comment

    result = {}
    if "content" in comment:
        result["content"] = comment["content"]

    user = comment.get("user_info") or comment.get("user")
    if isinstance(user, dict):
        # Always produce a string: a present-but-null "nick_name" would
        # otherwise bypass a ``.get`` default and leak ``None``.
        result["user"] = user.get("nickname") or user.get("nick_name") or ""

    for key in ("like_count", "sub_comment_count"):
        if key in comment:
            result[key] = comment[key]
    return result
def _is_arm64() -> bool: def _is_arm64() -> bool:
"""Detect ARM64 architecture (e.g. Apple Silicon).""" """Detect ARM64 architecture (e.g. Apple Silicon)."""
machine = platform.machine().lower() machine = platform.machine().lower()

View file

@ -91,6 +91,18 @@ def main():
p_uninstall.add_argument("--keep-config", action="store_true", p_uninstall.add_argument("--keep-config", action="store_true",
help="Remove skill files only, keep ~/.agent-reach/ config and tokens") help="Remove skill files only, keep ~/.agent-reach/ config and tokens")
# ── skill ──
p_skill = sub.add_parser("skill", help="Manage agent skill registration")
p_skill_group = p_skill.add_mutually_exclusive_group(required=True)
p_skill_group.add_argument("--install", action="store_true",
help="Install SKILL.md to agent skill directories")
p_skill_group.add_argument("--uninstall", action="store_true",
help="Remove SKILL.md from agent skill directories")
# ── format ──
p_format = sub.add_parser("format", help="Clean and format platform API output")
p_format.add_argument("platform", choices=["xhs"], help="Platform to format (xhs)")
# ── check-update ── # ── check-update ──
sub.add_parser("check-update", help="Check for new versions and changes") sub.add_parser("check-update", help="Check for new versions and changes")
@ -127,6 +139,10 @@ def main():
_cmd_configure(args) _cmd_configure(args)
elif args.command == "uninstall": elif args.command == "uninstall":
_cmd_uninstall(args) _cmd_uninstall(args)
elif args.command == "skill":
_cmd_skill(args)
elif args.command == "format":
_cmd_format(args)
# ── Command handlers ──────────────────────────────── # ── Command handlers ────────────────────────────────
@ -315,6 +331,69 @@ def _install_skill():
print(" -- Could not install agent skill (optional)") print(" -- Could not install agent skill (optional)")
def _uninstall_skill():
    """Remove the agent-reach skill from all known agent skill directories.

    Checks the OpenClaw, Claude Code and generic agent skill locations under
    the user's home directory, plus the OpenClaw location under
    ``$OPENCLAW_HOME`` when that variable is set. Progress is reported on
    stdout; removal failures are reported and do not abort the loop.
    """
    import shutil

    skill_dirs = [
        ("~/.openclaw/skills/agent-reach", "OpenClaw"),
        ("~/.claude/skills/agent-reach", "Claude Code"),
        ("~/.agents/skills/agent-reach", "Agent"),
    ]

    # Also check OPENCLAW_HOME
    openclaw_home = os.environ.get("OPENCLAW_HOME")
    if openclaw_home:
        skill_dirs.insert(
            0,
            (os.path.join(openclaw_home, ".openclaw", "skills", "agent-reach"), "OpenClaw"),
        )

    # Track "found" separately from "removed": when a removal fails, the
    # installation was still found, so "No skill installations found." would
    # contradict the error printed just above it.
    found = False
    for skill_path_template, platform_name in skill_dirs:
        skill_path = os.path.expanduser(skill_path_template)
        if not os.path.isdir(skill_path):
            continue
        found = True
        try:
            if os.path.islink(skill_path):
                # shutil.rmtree refuses symlinks; drop the link itself and
                # leave the directory it points to untouched.
                os.unlink(skill_path)
            else:
                shutil.rmtree(skill_path)
        except OSError as e:
            print(f" Could not remove {skill_path}: {e}")
        else:
            print(f" Removed {platform_name} skill: {skill_path}")

    if not found:
        print(" No skill installations found.")
def _cmd_skill(args):
    """Dispatch ``agent-reach skill`` to the install or uninstall helper.

    ``args.install`` is checked first, then ``args.uninstall``; when neither
    flag is set nothing happens.
    """
    if args.install:
        action = _install_skill
    elif args.uninstall:
        action = _uninstall_skill
    else:
        return
    action()
def _cmd_format(args):
    """Clean and format platform API output read from stdin.

    Reads one JSON document from stdin, runs it through the formatter for
    ``args.platform`` and prints the cleaned result as indented JSON with
    non-ASCII characters left unescaped. Only ``"xhs"`` is handled; any
    other platform is a no-op.

    Exits with status 1 (message on stderr) when stdin is empty or does not
    contain valid JSON.
    """
    import json
    import sys

    if args.platform != "xhs":
        return

    from agent_reach.channels.xiaohongshu import format_xhs_result

    # A leading byte-order mark is not whitespace, so strip() alone leaves it
    # in place and json.loads then rejects otherwise valid input.
    raw = sys.stdin.read().lstrip("\ufeff").strip()
    if not raw:
        print("Error: no input on stdin", file=sys.stderr)
        sys.exit(1)

    try:
        data = json.loads(raw)
    except json.JSONDecodeError as e:
        print(f"Error: invalid JSON: {e}", file=sys.stderr)
        sys.exit(1)

    cleaned = format_xhs_result(data)
    print(json.dumps(cleaned, ensure_ascii=False, indent=2))
def _install_system_deps(): def _install_system_deps():
"""Install system-level dependencies: gh CLI, Node.js (for mcporter).""" """Install system-level dependencies: gh CLI, Node.js (for mcporter)."""
import shutil import shutil
@ -1301,6 +1380,9 @@ def _cmd_doctor():
results = check_all(config) results = check_all(config)
rprint(format_report(results)) rprint(format_report(results))
# Auto-install skill if not already present (fixes #154)
_install_skill()
def _cmd_setup(): def _cmd_setup():
from agent_reach.config import Config from agent_reach.config import Config

View file

@ -97,6 +97,13 @@ mcporter call 'xiaohongshu.publish_content(title: "标题", content: "正文", i
> Requires login. Use Cookie-Editor to import cookies. > Requires login. Use Cookie-Editor to import cookies.
> **Tip: Clean bloated output.** The XHS API returns large JSON with many unused fields.
> Pipe the output through the formatter to save context:
> ```bash
> mcporter call 'xiaohongshu.search_feeds(keyword: "query")' | agent-reach format xhs
> ```
> This keeps only the essentials: note ID and `xsec_token`, title, content, type, time, author, engagement counts, image URLs, tags, and comments (when present).
## 抖音 / Douyin (mcporter) ## 抖音 / Douyin (mcporter)
```bash ```bash

View file

@ -0,0 +1,90 @@
# -*- coding: utf-8 -*-
"""Tests for 'agent-reach skill' command and _install_skill / _uninstall_skill."""
import os
import tempfile
import unittest
from unittest.mock import patch
from agent_reach.cli import _install_skill, _uninstall_skill
class TestSkillCommand(unittest.TestCase):
    """Test skill install and uninstall via CLI helpers."""

    @staticmethod
    def _run_with_home(home, func):
        """Call ``func`` with ``~`` redirected to ``home`` and OPENCLAW_HOME unset.

        ``patch.dict`` snapshots ``os.environ`` and restores it on exit, so
        removing OPENCLAW_HOME here cannot leak into other tests.
        """
        with patch(
            "agent_reach.cli.os.path.expanduser",
            side_effect=lambda p: p.replace("~", home),
        ), patch.dict(os.environ):
            # Remove OPENCLAW_HOME to avoid interference
            os.environ.pop("OPENCLAW_HOME", None)
            func()

    def test_install_skill_creates_skill_md(self):
        """_install_skill should run cleanly; any SKILL.md it writes is valid."""
        with tempfile.TemporaryDirectory() as tmpdir:
            os.makedirs(os.path.join(tmpdir, "skills"))

            self._run_with_home(tmpdir, _install_skill)

            # Smoke test: _install_skill may or may not find a skill dir in
            # this sandbox, so only the content of files it did write is
            # checked. The important part is that it runs without error.
            for dirpath, _, filenames in os.walk(tmpdir):
                if "SKILL.md" in filenames:
                    with open(os.path.join(dirpath, "SKILL.md")) as f:
                        content = f.read()
                    self.assertIn("Agent Reach", content)

    def test_uninstall_skill_removes_dir(self):
        """_uninstall_skill should remove skill directories."""
        with tempfile.TemporaryDirectory() as tmpdir:
            # Create a fake skill installation
            skill_path = os.path.join(tmpdir, ".openclaw", "skills", "agent-reach")
            os.makedirs(skill_path)
            with open(os.path.join(skill_path, "SKILL.md"), "w") as f:
                f.write("test")
            self.assertTrue(os.path.exists(skill_path))

            self._run_with_home(tmpdir, _uninstall_skill)

            self.assertFalse(os.path.exists(skill_path))

    def test_install_creates_dir_if_parent_exists(self):
        """_install_skill should create agent-reach dir inside existing skill dir."""
        with tempfile.TemporaryDirectory() as tmpdir:
            # Create the .openclaw/skills parent but not agent-reach subdir
            skill_parent = os.path.join(tmpdir, ".openclaw", "skills")
            os.makedirs(skill_parent)

            self._run_with_home(tmpdir, _install_skill)

            target = os.path.join(skill_parent, "agent-reach", "SKILL.md")
            self.assertTrue(os.path.exists(target))
            with open(target) as f:
                content = f.read()
            self.assertIn("Agent Reach", content)
if __name__ == "__main__":
unittest.main()

133
tests/test_xhs_format.py Normal file
View file

@ -0,0 +1,133 @@
# -*- coding: utf-8 -*-
"""Tests for XiaoHongShu output formatter (issue #134)."""
import unittest
from agent_reach.channels.xiaohongshu import format_xhs_result
class TestFormatXhsResult(unittest.TestCase):
    """Verify that format_xhs_result keeps the essentials and drops the rest."""

    # A representative raw note carrying both useful and redundant fields.
    SAMPLE_NOTE = {
        "id": "abc123",
        "title": "测试笔记",
        "desc": "这是正文内容",
        "type": "normal",
        "xsec_token": "tok_xxx",
        "user": {
            "nickname": "小红",
            "user_id": "u123",
            "avatar": "https://example.com/avatar.jpg",
            "extra_field": "should be dropped",
        },
        "interact_info": {
            "liked_count": "100",
            "collected_count": "50",
            "comment_count": "20",
            "share_count": "10",
            "sticky_count": "0",
            "relation": "none",
        },
        "image_list": [
            {
                "url": "https://img.example.com/1.jpg",
                "info_list": [{"url": "https://img.example.com/1_small.jpg", "image_scene": "WB_DFT"}],
                "width": 1080,
                "height": 1440,
                "trace_id": "tr_123",
            },
            {
                "url": "https://img.example.com/2.jpg",
                "info_list": [{"url": "https://img.example.com/2_small.jpg"}],
                "width": 1080,
                "height": 1080,
            },
        ],
        "tag_list": [
            {"id": "t1", "name": "旅行", "type": "topic"},
            {"id": "t2", "name": "美食", "type": "topic"},
        ],
        "at_user_list": [],
        "geo_info": {"latitude": 0, "longitude": 0},
        "audit_info": {"audit_status": 0},
        "model_type": None,
        "note_flow_source": "search",
    }

    def test_single_note_keeps_useful_fields(self):
        """A single note retains its core fields in flattened form."""
        cleaned = format_xhs_result(self.SAMPLE_NOTE)
        expected_scalars = {
            "id": "abc123",
            "title": "测试笔记",
            "desc": "这是正文内容",
            "type": "normal",
        }
        for field, value in expected_scalars.items():
            self.assertEqual(cleaned[field], value)
        self.assertEqual(cleaned["user"]["nickname"], "小红")
        self.assertEqual(cleaned["liked_count"], "100")
        self.assertEqual(cleaned["collected_count"], "50")
        self.assertEqual(
            cleaned["images"],
            ["https://img.example.com/1.jpg", "https://img.example.com/2.jpg"],
        )
        self.assertEqual(cleaned["tags"], ["旅行", "美食"])

    def test_single_note_drops_useless_fields(self):
        """Redundant note-level and user-level fields are stripped."""
        cleaned = format_xhs_result(self.SAMPLE_NOTE)
        for dropped in (
            "at_user_list",
            "geo_info",
            "audit_info",
            "model_type",
            "note_flow_source",
        ):
            self.assertNotIn(dropped, cleaned)
        # User should not have extra fields
        author = cleaned.get("user", {})
        self.assertNotIn("avatar", author)
        self.assertNotIn("extra_field", author)

    def test_search_results_wrapper(self):
        """Handle {"items": [...]} wrapper from search_feeds."""
        payload = {"items": [self.SAMPLE_NOTE, self.SAMPLE_NOTE]}
        cleaned = format_xhs_result(payload)
        self.assertIsInstance(cleaned, list)
        self.assertEqual(len(cleaned), 2)
        self.assertEqual(cleaned[0]["title"], "测试笔记")

    def test_list_input(self):
        """A bare list of notes is cleaned element by element."""
        cleaned = format_xhs_result([self.SAMPLE_NOTE])
        self.assertIsInstance(cleaned, list)
        self.assertEqual(len(cleaned), 1)
        self.assertEqual(cleaned[0]["title"], "测试笔记")

    def test_note_card_wrapper(self):
        """Handle notes nested under 'note_card'."""
        cleaned = format_xhs_result({"note_card": self.SAMPLE_NOTE})
        self.assertEqual(cleaned["title"], "测试笔记")

    def test_with_comments(self):
        """Comments are kept, reduced to content, author name and counts."""
        raw_comment = {
            "content": "写得好!",
            "user_info": {"nickname": "路人甲", "user_id": "u456"},
            "like_count": 5,
            "sub_comment_count": 1,
            "ip_location": "上海",
            "status": 0,
        }
        note = dict(self.SAMPLE_NOTE)
        note["comments"] = [raw_comment]
        cleaned = format_xhs_result(note)
        self.assertEqual(len(cleaned["comments"]), 1)
        first = cleaned["comments"][0]
        self.assertEqual(first["content"], "写得好!")
        self.assertEqual(first["user"], "路人甲")
        self.assertEqual(first["like_count"], 5)
        self.assertNotIn("ip_location", first)

    def test_empty_input(self):
        """Empty containers come back as empty containers of the same kind."""
        self.assertEqual(format_xhs_result({}), {})
        self.assertEqual(format_xhs_result([]), [])

    def test_non_dict_passthrough(self):
        """Values that are neither dict nor list are returned untouched."""
        self.assertEqual(format_xhs_result("hello"), "hello")
        self.assertIsNone(format_xhs_result(None))
if __name__ == "__main__":
unittest.main()