fix: YouTube JS runtime check, Douyin health check, cli bare except, config permission race (#104)
- YouTube: warn when only Node.js is installed but yt-dlp config file is missing (previously returned "ok" incorrectly) - Douyin: use `mcporter list` instead of calling with a hardcoded invalid share URL that always fails - cli: replace bare `except:` with `except Exception:` in `_detect_environment` to avoid catching KeyboardInterrupt/SystemExit - cli: fix unclosed file handle for cloud VM detection - config: use `os.open()` with 0o600 mode to eliminate permission race window when saving credentials
This commit is contained in:
parent
4b7d55111f
commit
3a3d38acce
6 changed files with 123 additions and 19 deletions
|
|
@ -42,13 +42,15 @@ class DouyinChannel(Channel):
|
|||
)
|
||||
except Exception:
|
||||
return "off", "mcporter 连接异常"
|
||||
# Verify MCP connectivity by listing available tools instead of
|
||||
# calling with a hardcoded (invalid) share link that always fails.
|
||||
try:
|
||||
r = subprocess.run(
|
||||
[mcporter, "call", "douyin.parse_douyin_video_info(share_link: \"https://www.douyin.com\")"],
|
||||
[mcporter, "list", "douyin"],
|
||||
capture_output=True, encoding="utf-8", errors="replace", timeout=15
|
||||
)
|
||||
if r.returncode == 0:
|
||||
if r.returncode == 0 and r.stdout.strip():
|
||||
return "ok", "完整可用(视频解析、下载链接获取)"
|
||||
return "warn", "MCP 已连接但调用异常,检查 douyin-mcp-server 服务是否在运行"
|
||||
return "warn", "MCP 已连接但工具列表为空,检查 douyin-mcp-server 服务是否在运行"
|
||||
except Exception:
|
||||
return "warn", "MCP 连接异常,检查 douyin-mcp-server 服务是否在运行"
|
||||
|
|
|
|||
|
|
@ -28,13 +28,17 @@ class YouTubeChannel(Channel):
|
|||
" 安装 Node.js 或 deno,然后运行:agent-reach install"
|
||||
)
|
||||
# Check yt-dlp config for --js-runtimes
|
||||
ytdlp_config = os.path.expanduser("~/.config/yt-dlp/config")
|
||||
# Deno works out of the box; Node.js requires explicit config
|
||||
has_deno = shutil.which("deno")
|
||||
if not has_deno and os.path.exists(ytdlp_config):
|
||||
with open(ytdlp_config, "r") as f:
|
||||
if "--js-runtimes" not in f.read():
|
||||
return "warn", (
|
||||
"yt-dlp 已安装但未配置 JS runtime。运行:\n"
|
||||
" mkdir -p ~/.config/yt-dlp && echo '--js-runtimes node' >> ~/.config/yt-dlp/config"
|
||||
)
|
||||
if not has_deno:
|
||||
ytdlp_config = os.path.expanduser("~/.config/yt-dlp/config")
|
||||
has_js_config = False
|
||||
if os.path.exists(ytdlp_config):
|
||||
with open(ytdlp_config, "r") as f:
|
||||
has_js_config = "--js-runtimes" in f.read()
|
||||
if not has_js_config:
|
||||
return "warn", (
|
||||
"yt-dlp 已安装但未配置 JS runtime。运行:\n"
|
||||
" mkdir -p ~/.config/yt-dlp && echo '--js-runtimes node' >> ~/.config/yt-dlp/config"
|
||||
)
|
||||
return "ok", "可提取视频信息和字幕"
|
||||
|
|
|
|||
|
|
@ -720,10 +720,11 @@ def _detect_environment():
|
|||
for cloud_file in ["/sys/hypervisor/uuid", "/sys/class/dmi/id/product_name"]:
|
||||
if os.path.exists(cloud_file):
|
||||
try:
|
||||
content = open(cloud_file).read().lower()
|
||||
with open(cloud_file) as f:
|
||||
content = f.read().lower()
|
||||
if any(x in content for x in ["amazon", "google", "microsoft", "digitalocean", "linode", "vultr", "hetzner"]):
|
||||
indicators += 2
|
||||
except:
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# systemd-detect-virt
|
||||
|
|
@ -732,7 +733,7 @@ def _detect_environment():
|
|||
result = subprocess.run(["systemd-detect-virt"], capture_output=True, encoding="utf-8", errors="replace", timeout=3)
|
||||
if result.returncode == 0 and result.stdout.strip() != "none":
|
||||
indicators += 1
|
||||
except:
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return "server" if indicators >= 2 else "local"
|
||||
|
|
|
|||
|
|
@ -49,14 +49,22 @@ class Config:
|
|||
def save(self):
|
||||
"""Save config to YAML file."""
|
||||
self._ensure_dir()
|
||||
with open(self.config_path, "w", encoding="utf-8") as f:
|
||||
yaml.dump(self.data, f, default_flow_style=False, allow_unicode=True)
|
||||
# Restrict permissions — config may contain credentials
|
||||
# Create file with restricted permissions from the start to avoid
|
||||
# a race window where credentials are briefly world-readable.
|
||||
try:
|
||||
import stat
|
||||
self.config_path.chmod(stat.S_IRUSR | stat.S_IWUSR) # 0o600
|
||||
fd = os.open(
|
||||
str(self.config_path),
|
||||
os.O_WRONLY | os.O_CREAT | os.O_TRUNC,
|
||||
stat.S_IRUSR | stat.S_IWUSR, # 0o600
|
||||
)
|
||||
with os.fdopen(fd, "w", encoding="utf-8") as f:
|
||||
yaml.dump(self.data, f, default_flow_style=False, allow_unicode=True)
|
||||
except OSError:
|
||||
pass # Windows or permission edge cases
|
||||
# Fallback for Windows or other edge cases where os.open flags
|
||||
# are not fully supported.
|
||||
with open(self.config_path, "w", encoding="utf-8") as f:
|
||||
yaml.dump(self.data, f, default_flow_style=False, allow_unicode=True)
|
||||
|
||||
def get(self, key: str, default: Any = None) -> Any:
|
||||
"""Get a config value. Also checks environment variables (uppercase)."""
|
||||
|
|
|
|||
|
|
@ -29,6 +29,82 @@ def test_channel_check_contract_with_minimal_runtime(monkeypatch, tmp_path):
|
|||
assert isinstance(message, str) and message.strip()
|
||||
|
||||
|
||||
def test_youtube_warns_when_node_only_and_no_config(monkeypatch, tmp_path):
|
||||
"""YouTube should warn when only Node.js is installed but no yt-dlp config exists."""
|
||||
from agent_reach.channels.youtube import YouTubeChannel
|
||||
|
||||
def fake_which(cmd):
|
||||
if cmd == "yt-dlp":
|
||||
return "/usr/bin/yt-dlp"
|
||||
if cmd == "node":
|
||||
return "/usr/bin/node"
|
||||
return None # deno not installed
|
||||
|
||||
monkeypatch.setattr("shutil.which", fake_which)
|
||||
# Point to a non-existent config file
|
||||
monkeypatch.setattr("os.path.expanduser", lambda p: str(tmp_path / ".config/yt-dlp/config"))
|
||||
|
||||
ch = YouTubeChannel()
|
||||
status, message = ch.check()
|
||||
assert status == "warn"
|
||||
assert "--js-runtimes" in message
|
||||
|
||||
|
||||
def test_youtube_ok_when_deno_installed(monkeypatch):
|
||||
"""YouTube should return ok when Deno is installed (no config needed)."""
|
||||
from agent_reach.channels.youtube import YouTubeChannel
|
||||
|
||||
def fake_which(cmd):
|
||||
if cmd == "yt-dlp":
|
||||
return "/usr/bin/yt-dlp"
|
||||
if cmd == "deno":
|
||||
return "/usr/bin/deno"
|
||||
return None
|
||||
|
||||
monkeypatch.setattr("shutil.which", fake_which)
|
||||
|
||||
ch = YouTubeChannel()
|
||||
status, _msg = ch.check()
|
||||
assert status == "ok"
|
||||
|
||||
|
||||
def test_douyin_check_does_not_call_with_invalid_url(monkeypatch, tmp_path):
|
||||
"""Douyin check should use 'mcporter list' instead of calling with a hardcoded URL."""
|
||||
import subprocess
|
||||
|
||||
from agent_reach.channels.douyin import DouyinChannel
|
||||
|
||||
calls = []
|
||||
original_run = subprocess.run
|
||||
|
||||
def tracking_run(cmd, **kwargs):
|
||||
calls.append(cmd)
|
||||
# Simulate mcporter config list returning douyin
|
||||
if "config" in cmd and "list" in cmd:
|
||||
class R:
|
||||
stdout = "douyin http://localhost:18070/mcp"
|
||||
returncode = 0
|
||||
return R()
|
||||
# Simulate mcporter list douyin returning tools
|
||||
if "list" in cmd and "douyin" in cmd:
|
||||
class R:
|
||||
stdout = "parse_douyin_video_info"
|
||||
returncode = 0
|
||||
return R()
|
||||
return original_run(cmd, **kwargs)
|
||||
|
||||
monkeypatch.setattr("shutil.which", lambda cmd: "/usr/bin/mcporter" if cmd == "mcporter" else None)
|
||||
monkeypatch.setattr("subprocess.run", tracking_run)
|
||||
|
||||
ch = DouyinChannel()
|
||||
status, _msg = ch.check()
|
||||
|
||||
# Should NOT contain any hardcoded douyin.com URL in subprocess calls
|
||||
for call in calls:
|
||||
call_str = " ".join(call) if isinstance(call, list) else str(call)
|
||||
assert "https://www.douyin.com" not in call_str
|
||||
|
||||
|
||||
def test_channel_can_handle_contract():
|
||||
url_samples = {
|
||||
"github": "https://github.com/panniantong/agent-reach",
|
||||
|
|
|
|||
|
|
@ -78,3 +78,16 @@ class TestConfig:
|
|||
masked = tmp_config.to_dict()
|
||||
assert masked["exa_api_key"] == "super-se..."
|
||||
assert masked["normal_setting"] == "visible"
|
||||
|
||||
def test_save_creates_file_with_restricted_permissions(self, tmp_path):
|
||||
import stat
|
||||
import sys
|
||||
config_file = tmp_path / "secure_config.yaml"
|
||||
config = Config(config_path=config_file)
|
||||
config.set("secret_key", "my-secret")
|
||||
|
||||
if sys.platform != "win32":
|
||||
mode = config_file.stat().st_mode
|
||||
# File should be owner-only read/write (0o600)
|
||||
assert not (mode & stat.S_IRGRP), "group read should not be set"
|
||||
assert not (mode & stat.S_IROTH), "other read should not be set"
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue