Add security sanitizer, opt-in telemetry, and contributor guidelines

Infrastructure:
- security/: PII sanitizer with scan/sanitize modes, pre-commit hook, configurable blocklists
- telemetry/: GStack-style opt-in usage analytics, local stats viewer, version checker
- CONTRIBUTING.md: Privacy-first contributor guidelines with anonymization rules
- VERSION: 1.0.0

README updated with Privacy & Security and Telemetry sections.
This commit is contained in:
Alfred Claw 2026-03-31 08:41:35 -07:00
parent 36d6ed83e7
commit d4c8c21cb3
12 changed files with 1402 additions and 4 deletions

107
CONTRIBUTING.md Normal file
View file

@ -0,0 +1,107 @@
# Contributing to AI Marketing Skills
AI Marketing Skills is an open-source collection of production marketing automation skills. Thanks for contributing.
- **Repo:** [github.com/singlegrain/ai-marketing-skills](https://github.com/singlegrain/ai-marketing-skills)
- **README:** [README.md](./README.md)
---
## 🔒 Data Privacy & Anonymization
**This is the #1 rule. No exceptions.**
ALL example outputs, training data, sample data, and test fixtures MUST be fully anonymized before commit. Real client data, revenue figures, or internal metrics are **never** acceptable in any commit.
| Data Type | Rule | Example |
|-----------|------|---------|
| Company names | Use fictional names | "Acme Corp", "TechStart Inc" |
| Person names | Use fictional names | "Jane Smith", "John Doe" |
| Email addresses | Use example.com domain | jane@example.com |
| Phone numbers | Use 555-xxxx format | 555-0142 |
| Dollar amounts | Use round fictional numbers | $50,000 |
| API keys/tokens | Use obvious placeholders | `sk-your-key-here` |
**Before every commit**, run the sanitizer:
```bash
python3 security/sanitizer.py --scan --dir . --recursive
```
The pre-commit hook will block commits with detected PII. See [security/README.md](./security/README.md) for setup.
---
## Skill Structure
Every skill category requires these files:
```
skill-category/
├── SKILL.md # Claude Code skill definition (name, description, steps)
├── README.md # Overview, quick start, architecture, examples
├── requirements.txt # Python dependencies
└── *.py # Implementation scripts
```
- **SKILL.md** follows Claude Code skill conventions: name, description, numbered steps.
- **README.md** includes: overview, quick start, architecture, examples, and the standard footer.
- **Python scripts** use `argparse` for CLI, include clear API stubs with comments, and handle missing dependencies gracefully.
---
## Code Standards
- **Python 3.10+**
- Use `argparse` for all CLI interfaces with `--help` documentation
- **Graceful failures:** Never crash on missing API keys. Show a helpful error message instead.
- Type hints encouraged but not required
- Prefer stdlib. No external dependencies without justification in your PR description.
```python
# Good
if not os.environ.get("API_KEY"):
print("Error: Set API_KEY environment variable. Get one at https://...")
sys.exit(1)
# Bad
api_key = os.environ["API_KEY"] # KeyError if missing
```
---
## Telemetry Integration
New skills **must** integrate telemetry logging. See [telemetry/README.md](./telemetry/README.md) for the integration guide.
- Add a version check to your SKILL.md preamble
- **Never log sensitive data through telemetry** (API keys, PII, client data)
---
## Pull Request Process
1. **Fork** the repo
2. **Branch** from `main` (use descriptive branch names: `feat/email-drip-skill`, `fix/sanitizer-regex`)
3. **Build** your changes
4. **Verify** before submitting:
- All Python files compile clean: `python3 -m py_compile your_file.py`
- Sanitizer scan passes: `python3 security/sanitizer.py --scan --dir . --recursive`
5. **Open a PR** with a description that includes:
- What it does
- Which skill category it affects
- ✅ Confirmation that all data is anonymized
---
## Reporting Security Issues
Found a vulnerability? **Do not open a public issue.**
Email [security@singlegrain.com](mailto:security@singlegrain.com) with details. We'll respond within 48 hours.
---
<p align="center">
Built by <a href="https://www.singlegrain.com/?utm_source=github&utm_medium=repo&utm_campaign=ai-marketing-skills">Single Grain</a>. Powered by <a href="https://www.singlebrain.com/?utm_source=github&utm_medium=repo&utm_campaign=ai-marketing-skills">Single Brain</a>.
</p>

View file

@ -137,15 +137,56 @@ ai-marketing-skills/
---
## 🔒 Privacy & Security
Every skill is built with data privacy in mind:
- **PII Sanitizer** scans code and data for sensitive information before commits (`security/sanitizer.py`)
- **Pre-commit hook** blocks commits containing detected PII patterns
- **Configurable blocklists** for company names, person names, and custom patterns
- See [`security/README.md`](./security/README.md) for setup
```bash
# Scan for sensitive data
python3 security/sanitizer.py --scan --dir . --recursive
# Install the pre-commit hook
cp security/pre-commit-hook.sh .git/hooks/pre-commit && chmod +x .git/hooks/pre-commit
```
---
## 📡 Telemetry (Opt-In)
Anonymous usage telemetry helps us understand which skills people actually use. Fully opt-in, privacy-first:
- **Local logging always** — see your own usage stats in `~/.ai-marketing-skills/analytics/`
- **Remote reporting optional** — only if you explicitly opt in on first run
- **Data collected:** skill name, duration, success/fail, version, OS. Nothing else. No code, no file paths, no repo content.
- **Version checks** — get notified when new skills are available
```bash
# View your local usage stats
python3 telemetry/telemetry_report.py
# Check for updates
python3 telemetry/version_check.py
```
See [`telemetry/README.md`](./telemetry/README.md) for details.
---
## 🤝 Contributing
Found a bug? Have an improvement? PRs welcome.
Found a bug? Have an improvement? PRs welcome. Read [`CONTRIBUTING.md`](./CONTRIBUTING.md) for guidelines.
1. Fork the repo
2. Create your feature branch (`git checkout -b feature/better-scoring`)
3. Commit your changes
4. Push to the branch
5. Open a Pull Request
3. Run `python3 security/sanitizer.py --scan` before committing
4. Commit your changes
5. Push to the branch
6. Open a Pull Request
---

1
VERSION Normal file
View file

@ -0,0 +1 @@
1.0.0

80
security/README.md Normal file
View file

@ -0,0 +1,80 @@
# Security Sanitizer
Scans and redacts PII / sensitive data from files in this repo.
## What It Catches
| Type | Examples |
|------|----------|
| **EMAIL** | user@example.com |
| **PHONE** | (555) 123-4567, +1-555-123-4567 |
| **SSN** | 123-45-6789 |
| **API_KEY** | sk-xxx, ghp_xxx, op_xxx, Bearer tokens, KEY=value patterns |
| **IP_ADDRESS** | Public IPv4 addresses (skips localhost/private ranges) |
| **URL_CREDENTIALS** | https://user:pass@host.com |
| **AMOUNT** | $1,234.56, $1.2M, $500K |
| **COMPANY** | Names from blocklist (configurable) |
| **PERSON** | Names from blocklist + title-prefixed names (Mr./Dr./etc.) |
| **CUSTOM** | Any regex you add to the config |
## Quick Start
```bash
# Scan the whole repo (dry run — changes nothing)
python3 security/sanitizer.py --scan --dir . --recursive
# Scan a single file
python3 security/sanitizer.py --scan --file path/to/file.py
# Redact PII in place
python3 security/sanitizer.py --sanitize --file path/to/file.py
# Redact everything recursively
python3 security/sanitizer.py --sanitize --dir . --recursive
```
Exit codes: `0` = clean, `1` = PII found (useful for CI).
## Configuration
Edit `security/sanitizer-config.json`:
```json
{
"company_blocklist": ["Single Grain", "ClickFlow", "Nextiva"],
"person_blocklist": ["Jane Doe"],
"custom_patterns": [
"ACME-\\d{6}",
{"label": "PROJECT_ID", "pattern": "proj_[A-Za-z0-9]{12}"}
],
"skip_paths": ["node_modules", ".git", "__pycache__", ".env.example"],
"placeholder_format": "bracket"
}
```
- **company_blocklist** — company names to always redact
- **person_blocklist** — person names to always redact
- **custom_patterns** — additional regex (string or `{label, pattern}` object)
- **skip_paths** — directory names to skip during recursive scan
- **placeholder_format**`"bracket"` for `[EMAIL]` or `"redacted"` for `[REDACTED]`
## Pre-Commit Hook
Install to block commits containing PII:
```bash
cp security/pre-commit-hook.sh .git/hooks/pre-commit
chmod +x .git/hooks/pre-commit
```
The hook scans staged files and blocks the commit if anything is detected.
To bypass in emergencies: `git commit --no-verify`
## Supported File Types
`.py`, `.md`, `.txt`, `.json`, `.yaml`, `.yml`, `.env`
## No External Dependencies
Uses only Python standard library (`re`, `json`, `os`, `sys`, `argparse`, `pathlib`).

72
security/pre-commit-hook.sh Executable file
View file

@ -0,0 +1,72 @@
#!/usr/bin/env bash
# Pre-commit hook: scan staged files for PII / sensitive data.
#
# Install:
# cp security/pre-commit-hook.sh .git/hooks/pre-commit
# chmod +x .git/hooks/pre-commit
#
# Bypass (emergency only):
# git commit --no-verify
set -euo pipefail
REPO_ROOT="$(git rev-parse --show-toplevel)"
SANITIZER="$REPO_ROOT/security/sanitizer.py"
if [ ! -f "$SANITIZER" ]; then
echo "⚠️ Sanitizer not found at $SANITIZER — skipping PII check."
exit 0
fi
# Get list of staged files
STAGED_FILES=$(git diff --cached --name-only --diff-filter=ACM)
if [ -z "$STAGED_FILES" ]; then
exit 0
fi
FOUND_PII=0
TEMP_REPORT=$(mktemp)
for FILE in $STAGED_FILES; do
FULL_PATH="$REPO_ROOT/$FILE"
# Only check supported extensions
case "$FILE" in
*.py|*.md|*.txt|*.json|*.yaml|*.yml|*.env)
;;
*)
continue
;;
esac
if [ ! -f "$FULL_PATH" ]; then
continue
fi
OUTPUT=$(python3 "$SANITIZER" --scan --file "$FULL_PATH" --quiet 2>&1) || true
if [ -n "$OUTPUT" ] && echo "$OUTPUT" | grep -q "issue"; then
echo "$FILE: $OUTPUT" >> "$TEMP_REPORT"
FOUND_PII=1
fi
done
if [ "$FOUND_PII" -eq 1 ]; then
echo ""
echo "🚫 COMMIT BLOCKED — PII / sensitive data detected in staged files:"
echo ""
cat "$TEMP_REPORT"
echo ""
echo "To fix:"
echo " 1. Run: python3 security/sanitizer.py --scan --dir . --recursive"
echo " 2. Review findings and redact manually, or run with --sanitize"
echo " 3. Stage the fixed files and commit again"
echo ""
echo "To bypass (emergency): git commit --no-verify"
rm -f "$TEMP_REPORT"
exit 1
fi
rm -f "$TEMP_REPORT"
exit 0

View file

@ -0,0 +1,22 @@
{
"company_blocklist": [
"ClickFlow",
"Nextiva"
],
"person_blocklist": [],
"custom_patterns": [],
"skip_paths": [
"node_modules",
".git",
"__pycache__",
".env.example"
],
"allow_patterns": [
"api_key=ANTHROPIC_API_KEY",
"api_key = get_",
"API_KEY=\"your-",
"0123456789"
],
"placeholder_format": "bracket",
"_comment": "Single Grain removed from blocklist since this is our repo. Fork users should add their own company names."
}

457
security/sanitizer.py Normal file
View file

@ -0,0 +1,457 @@
#!/usr/bin/env python3
"""
PII / Sensitive Data Sanitizer
Scans files for personally identifiable information and sensitive data.
Can report findings (--scan) or redact them in place (--sanitize).
Usage:
python3 security/sanitizer.py --scan --file path/to/file.py
python3 security/sanitizer.py --scan --dir . --recursive
python3 security/sanitizer.py --sanitize --file path/to/file.py
"""
import argparse
import json
import os
import re
import sys
from pathlib import Path
from collections import defaultdict
# ---------------------------------------------------------------------------
# Default configuration
# ---------------------------------------------------------------------------
SCRIPT_DIR = Path(__file__).resolve().parent
DEFAULT_CONFIG_PATH = SCRIPT_DIR / "sanitizer-config.json"
SUPPORTED_EXTENSIONS = {".py", ".md", ".txt", ".json", ".yaml", ".yml", ".env"}
DEFAULT_SKIP_PATHS = {"node_modules", ".git", "__pycache__", ".env.example"}
# ---------------------------------------------------------------------------
# Detection patterns
# ---------------------------------------------------------------------------
# Each entry: (label, placeholder_bracket, placeholder_redacted, regex, flags)
# Order matters more specific patterns should come first.
PATTERNS: list[tuple[str, str, str, str, int]] = [
# SSN (xxx-xx-xxxx)
(
"SSN",
"[SSN]",
"[REDACTED]",
r"\b\d{3}-\d{2}-\d{4}\b",
0,
),
# API keys / tokens (sk-..., ghp_..., op_..., Bearer ...)
(
"API_KEY",
"[API_KEY]",
"[REDACTED]",
r"(?i)\b(?:sk-[A-Za-z0-9_\-]{20,}|ghp_[A-Za-z0-9]{36,}|op_[A-Za-z0-9_\-]{20,}|gho_[A-Za-z0-9]{36,}|xox[bposarc]-[A-Za-z0-9\-]{10,})\b",
0,
),
# Bearer tokens
(
"API_KEY",
"[API_KEY]",
"[REDACTED]",
r"(?i)Bearer\s+[A-Za-z0-9_\-\.]{20,}",
0,
),
# Generic secret assignment patterns (API_KEY=..., SECRET=..., TOKEN=...)
(
"API_KEY",
"[API_KEY]",
"[REDACTED]",
r"""(?i)(?:api[_-]?key|secret[_-]?key|access[_-]?token|auth[_-]?token|private[_-]?key)\s*[=:]\s*["']?[A-Za-z0-9_\-\.\/\+]{16,}["']?""",
0,
),
# URLs with embedded credentials (https://user:pass@host)
(
"URL_CREDENTIALS",
"[URL_CREDENTIALS]",
"[REDACTED]",
r"https?://[^\s:]+:[^\s@]+@[^\s]+",
0,
),
# Email addresses
(
"EMAIL",
"[EMAIL]",
"[REDACTED]",
r"\b[A-Za-z0-9._%+\-]+@[A-Za-z0-9.\-]+\.[A-Za-z]{2,}\b",
0,
),
# IP addresses (IPv4) skip 0.0.0.0, 127.0.0.1, 255.255.255.255 common dev IPs
(
"IP_ADDRESS",
"[IP_ADDRESS]",
"[REDACTED]",
r"\b(?!0\.0\.0\.0|127\.0\.0\.1|255\.255\.255\.255|192\.168\.\d{1,3}\.\d{1,3}|10\.\d{1,3}\.\d{1,3}\.\d{1,3}|172\.(?:1[6-9]|2\d|3[01])\.\d{1,3}\.\d{1,3})\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b",
0,
),
# Phone numbers (US-style: +1, (xxx), xxx-xxx-xxxx, etc.)
(
"PHONE",
"[PHONE]",
"[REDACTED]",
r"(?<!\d)(?:\+?1[\s\-]?)?(?:\(\d{3}\)|\d{3})[\s\-]?\d{3}[\s\-]?\d{4}(?!\d)",
0,
),
# Dollar amounts / revenue figures ($1,234 $1,234.56 $1M $1.2B)
(
"AMOUNT",
"[AMOUNT]",
"[REDACTED]",
r"\$\s?\d[\d,]*(?:\.\d{1,2})?(?:\s?[MBKmkb](?:illion|illion)?)?",
0,
),
]
# Person-name heuristic: two or more capitalized words that look like names.
# Kept intentionally conservative to reduce false positives.
PERSON_NAME_PATTERN = re.compile(
r"\b(?:Mr\.|Mrs\.|Ms\.|Dr\.|Prof\.)\s+[A-Z][a-z]+(?:\s+[A-Z][a-z]+)+\b"
)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def load_config(config_path: Path | None = None) -> dict:
path = config_path or DEFAULT_CONFIG_PATH
if path.exists():
with open(path, "r") as f:
return json.load(f)
return {}
def should_skip_path(path: Path, skip_paths: set[str]) -> bool:
parts = path.parts
for skip in skip_paths:
if skip in parts:
return True
return False
def is_import_line(line: str) -> bool:
stripped = line.strip()
return stripped.startswith(("import ", "from ", "#!", "# ", "//", "/*"))
class Finding:
__slots__ = ("label", "match", "line_num", "line")
def __init__(self, label: str, match: str, line_num: int, line: str):
self.label = label
self.match = match
self.line_num = line_num
self.line = line
def __repr__(self) -> str:
return f"[{self.label}] line {self.line_num}: {self.match!r}"
def scan_line(
line: str,
line_num: int,
compiled_patterns: list[tuple[str, str, str, re.Pattern]],
company_patterns: list[tuple[re.Pattern, str, str]],
person_patterns: list[tuple[re.Pattern, str, str]],
placeholder_format: str,
) -> list[Finding]:
"""Return findings for a single line."""
if is_import_line(line):
return []
findings: list[Finding] = []
for label, ph_bracket, ph_redacted, pat in compiled_patterns:
for m in pat.finditer(line):
findings.append(Finding(label, m.group(), line_num, line.rstrip()))
# Company blocklist
for cpat, ph_bracket, ph_redacted in company_patterns:
for m in cpat.finditer(line):
findings.append(Finding("COMPANY", m.group(), line_num, line.rstrip()))
# Person blocklist
for ppat, ph_bracket, ph_redacted in person_patterns:
for m in ppat.finditer(line):
findings.append(Finding("PERSON", m.group(), line_num, line.rstrip()))
# Person-name heuristic
for m in PERSON_NAME_PATTERN.finditer(line):
findings.append(Finding("PERSON", m.group(), line_num, line.rstrip()))
return findings
def compile_patterns(
config: dict,
) -> tuple[
list[tuple[str, str, str, re.Pattern]],
list[tuple[re.Pattern, str, str]],
list[tuple[re.Pattern, str, str]],
]:
"""Compile all regex patterns from defaults + config."""
placeholder_fmt = config.get("placeholder_format", "bracket")
compiled = []
for label, ph_b, ph_r, raw, flags in PATTERNS:
compiled.append((label, ph_b, ph_r, re.compile(raw, flags)))
# Custom patterns from config
for entry in config.get("custom_patterns", []):
if isinstance(entry, str):
compiled.append(
("CUSTOM", "[CUSTOM]", "[REDACTED]", re.compile(entry))
)
elif isinstance(entry, dict):
compiled.append((
entry.get("label", "CUSTOM"),
f"[{entry.get('label', 'CUSTOM')}]",
"[REDACTED]",
re.compile(entry["pattern"]),
))
# Company blocklist
company_patterns = []
for name in config.get("company_blocklist", []):
company_patterns.append((
re.compile(re.escape(name), re.IGNORECASE),
"[COMPANY]",
"[REDACTED]",
))
# Person blocklist
person_patterns = []
for name in config.get("person_blocklist", []):
person_patterns.append((
re.compile(re.escape(name), re.IGNORECASE),
"[PERSON]",
"[REDACTED]",
))
return compiled, company_patterns, person_patterns
def get_placeholder(label: str, placeholder_format: str) -> str:
if placeholder_format == "redacted":
return "[REDACTED]"
return f"[{label}]"
# ---------------------------------------------------------------------------
# File processing
# ---------------------------------------------------------------------------
def scan_file(
filepath: Path,
compiled: list,
company_pats: list,
person_pats: list,
placeholder_format: str,
) -> list[Finding]:
try:
text = filepath.read_text(errors="replace")
except (PermissionError, OSError) as e:
print(f" ⚠️ Cannot read {filepath}: {e}", file=sys.stderr)
return []
findings = []
for i, line in enumerate(text.splitlines(), 1):
findings.extend(
scan_line(line, i, compiled, company_pats, person_pats, placeholder_format)
)
return findings
def sanitize_file(
filepath: Path,
compiled: list,
company_pats: list,
person_pats: list,
placeholder_format: str,
) -> list[Finding]:
"""Scan and replace PII in-place. Returns findings for reporting."""
try:
text = filepath.read_text(errors="replace")
except (PermissionError, OSError) as e:
print(f" ⚠️ Cannot read {filepath}: {e}", file=sys.stderr)
return []
findings = []
new_lines = []
for i, line in enumerate(text.splitlines(), 1):
line_findings = scan_line(
line, i, compiled, company_pats, person_pats, placeholder_format
)
findings.extend(line_findings)
if line_findings and not is_import_line(line):
# Replace matches (longest first to avoid partial replacements)
matches = sorted(
[(f.match, f.label) for f in line_findings],
key=lambda x: len(x[0]),
reverse=True,
)
for match_text, label in matches:
placeholder = get_placeholder(label, placeholder_format)
line = line.replace(match_text, placeholder)
new_lines.append(line)
if findings:
filepath.write_text("\n".join(new_lines) + ("\n" if text.endswith("\n") else ""))
return findings
def collect_files(target: Path, recursive: bool, skip_paths: set[str]) -> list[Path]:
"""Collect files with supported extensions."""
if target.is_file():
if target.suffix in SUPPORTED_EXTENSIONS:
return [target]
return []
files = []
if recursive:
for fp in target.rglob("*"):
if fp.is_file() and fp.suffix in SUPPORTED_EXTENSIONS and not should_skip_path(fp, skip_paths):
files.append(fp)
else:
for fp in target.iterdir():
if fp.is_file() and fp.suffix in SUPPORTED_EXTENSIONS and not should_skip_path(fp, skip_paths):
files.append(fp)
return sorted(files)
# ---------------------------------------------------------------------------
# Reporting
# ---------------------------------------------------------------------------
def print_report(
all_findings: dict[str, list[Finding]], mode: str
) -> None:
total = sum(len(f) for f in all_findings.values())
files_affected = len(all_findings)
if total == 0:
print("\n✅ No PII or sensitive data found.")
return
action = "Found" if mode == "scan" else "Sanitized"
print(f"\n{'=' * 60}")
print(f"🔍 {action} {total} issue(s) across {files_affected} file(s)")
print(f"{'=' * 60}")
# Aggregate by type
by_type: dict[str, int] = defaultdict(int)
for filepath, findings in all_findings.items():
for f in findings:
by_type[f.label] += 1
print("\nBy type:")
for label, count in sorted(by_type.items(), key=lambda x: -x[1]):
print(f" {label:20s} {count:>4d}")
print(f"\nBy file:")
for filepath, findings in sorted(all_findings.items()):
print(f"\n 📄 {filepath} ({len(findings)} finding(s))")
for f in findings:
print(f" Line {f.line_num:>4d} [{f.label}]: {f.match}")
if mode == "scan":
print(f"\n💡 Run with --sanitize to redact these findings.")
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main() -> int:
parser = argparse.ArgumentParser(
description="Scan or sanitize files for PII and sensitive data.",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
%(prog)s --scan --file config.py
%(prog)s --scan --dir . --recursive
%(prog)s --sanitize --dir src/ --recursive
%(prog)s --scan --dir . --recursive --config security/sanitizer-config.json
""",
)
mode_group = parser.add_mutually_exclusive_group(required=True)
mode_group.add_argument("--scan", action="store_true", help="Report findings without modifying files")
mode_group.add_argument("--sanitize", action="store_true", help="Replace PII with safe placeholders")
target_group = parser.add_mutually_exclusive_group(required=True)
target_group.add_argument("--file", type=str, help="Scan a single file")
target_group.add_argument("--dir", type=str, help="Scan a directory")
parser.add_argument("--recursive", "-r", action="store_true", help="Recurse into subdirectories (with --dir)")
parser.add_argument("--config", type=str, help="Path to config JSON (default: security/sanitizer-config.json)")
parser.add_argument("--quiet", "-q", action="store_true", help="Only print summary, not individual findings")
args = parser.parse_args()
# Load config
config_path = Path(args.config) if args.config else None
config = load_config(config_path)
placeholder_format = config.get("placeholder_format", "bracket")
skip_paths = set(config.get("skip_paths", [])) | DEFAULT_SKIP_PATHS
compiled, company_pats, person_pats = compile_patterns(config)
# Determine target
if args.file:
target = Path(args.file)
if not target.exists():
print(f"❌ File not found: {target}", file=sys.stderr)
return 2
else:
target = Path(args.dir)
if not target.exists():
print(f"❌ Directory not found: {target}", file=sys.stderr)
return 2
files = collect_files(target, args.recursive, skip_paths)
if not files:
print("No supported files found.")
return 0
mode = "scan" if args.scan else "sanitize"
process_fn = scan_file if args.scan else sanitize_file
all_findings: dict[str, list[Finding]] = {}
for fp in files:
findings = process_fn(fp, compiled, company_pats, person_pats, placeholder_format)
if findings:
all_findings[str(fp)] = findings
if not args.quiet:
print_report(all_findings, mode)
else:
total = sum(len(f) for f in all_findings.values())
if total > 0:
action = "Found" if mode == "scan" else "Sanitized"
print(f"{action} {total} issue(s) in {len(all_findings)} file(s)")
# Exit code: 0 clean, 1 PII found
return 1 if all_findings else 0
if __name__ == "__main__":
sys.exit(main())

94
telemetry/README.md Normal file
View file

@ -0,0 +1,94 @@
# Telemetry
Opt-in, local-first, privacy-respecting usage telemetry for AI Marketing Skills.
## What's Collected
When you opt in, the following **anonymous** data is sent:
| Field | Example | Purpose |
|-------|---------|---------|
| Skill name | `growth-engine` | Know which skills are used |
| Duration (ms) | `4500` | Track performance |
| Success/fail | `true` | Track reliability |
| Version | `1.0.0` | Know which versions are in use |
| OS | `Darwin` | Platform compatibility |
| Architecture | `arm64` | Platform compatibility |
| Python version | `3.12` | Runtime compatibility |
| Timestamp | `2026-03-31T12:00:00Z` | Usage patterns |
| Device ID | `<random-uuid>` | Deduplicate (not tied to identity) |
## What's NOT Collected — Ever
- ❌ Code content
- ❌ File paths
- ❌ Repository names
- ❌ Usernames or emails
- ❌ Environment variables
- ❌ API keys or secrets
- ❌ Any content you're working on
## How to Opt In or Out
### First run (interactive)
```bash
python3 telemetry/telemetry_init.py
```
You'll be asked to choose. Your choice is saved.
### Non-interactive
```bash
python3 telemetry/telemetry_init.py --yes # Opt in
python3 telemetry/telemetry_init.py --no # Opt out
```
### Change your mind later
Delete the config and re-run:
```bash
rm ~/.ai-marketing-skills/telemetry-config.json
python3 telemetry/telemetry_init.py
```
## Local Data — Always Available
**Regardless of opt-in**, all skill runs are logged locally so you can see your own usage:
```
~/.ai-marketing-skills/analytics/skill-usage.jsonl
```
This data never leaves your machine unless you opt in.
## View Your Stats
```bash
python3 telemetry/telemetry_report.py
```
Shows: total runs, runs per skill, success rates, average durations, most used skill, and more.
### Options
```bash
python3 telemetry/telemetry_report.py --json # Machine-readable JSON
python3 telemetry/telemetry_report.py --skill seo-bot # Filter to one skill
```
## Check for Updates
```bash
python3 telemetry/version_check.py
```
- Compares your local version against the latest GitHub release
- Silent when up to date
- Caches the result for 24 hours to avoid excess API calls
- Never blocks execution if offline
## Privacy Commitment
1. **Opt-in only** — nothing is sent without your explicit consent
2. **Local-first** — your data is always stored locally for your own use
3. **Minimal data** — only what's needed to improve the skills
4. **No PII** — no names, emails, paths, or content
5. **Transparent** — all telemetry code is right here, read it yourself
6. **Revocable** — opt out any time, delete your config file

View file

@ -0,0 +1,94 @@
#!/usr/bin/env python3
"""First-run opt-in prompt for anonymous usage telemetry."""
import argparse
import json
import os
import sys
import uuid
from datetime import datetime, timezone
from pathlib import Path
CONFIG_DIR = Path.home() / ".ai-marketing-skills"
CONFIG_FILE = CONFIG_DIR / "telemetry-config.json"
def load_config():
"""Load existing telemetry config, or return None if not found."""
if CONFIG_FILE.exists():
try:
with open(CONFIG_FILE, "r") as f:
return json.load(f)
except (json.JSONDecodeError, OSError):
return None
return None
def save_config(opted_in: bool) -> dict:
"""Save telemetry config and return it."""
CONFIG_DIR.mkdir(parents=True, exist_ok=True)
config = {
"opted_in": opted_in,
"device_id": str(uuid.uuid4()),
"created": datetime.now(timezone.utc).isoformat(),
}
with open(CONFIG_FILE, "w") as f:
json.dump(config, f, indent=2)
return config
def prompt_user() -> bool:
"""Interactive opt-in prompt. Returns True if user opts in."""
print(
"Would you like to opt into anonymous usage telemetry?\n"
"This helps us improve skills.\n"
"\n"
"Data collected: skill name, duration, success/fail, version, OS.\n"
"No code, file paths, or repo content is ever sent.\n"
)
while True:
answer = input("(y/n): ").strip().lower()
if answer in ("y", "yes"):
return True
if answer in ("n", "no"):
return False
print("Please enter y or n.")
def init_telemetry(yes: bool = False, no: bool = False) -> dict:
"""Initialize telemetry. Returns config dict.
Args:
yes: Non-interactive opt-in.
no: Non-interactive opt-out.
"""
existing = load_config()
if existing is not None:
return existing
if yes:
opted_in = True
elif no:
opted_in = False
else:
opted_in = prompt_user()
config = save_config(opted_in)
status = "enabled" if opted_in else "disabled"
print(f"Telemetry {status}. Config saved to {CONFIG_FILE}")
return config
def main():
parser = argparse.ArgumentParser(description="Initialize telemetry opt-in.")
group = parser.add_mutually_exclusive_group()
group.add_argument("--yes", action="store_true", help="Opt in non-interactively.")
group.add_argument("--no", action="store_true", help="Opt out non-interactively.")
args = parser.parse_args()
config = init_telemetry(yes=args.yes, no=args.no)
print(json.dumps(config, indent=2))
if __name__ == "__main__":
main()

118
telemetry/telemetry_log.py Normal file
View file

@ -0,0 +1,118 @@
#!/usr/bin/env python3
"""Log a skill run event. Called by each skill's preamble.
Usage:
python3 telemetry/telemetry_log.py --skill <name> --duration <ms> --success <true/false> --version <ver>
Always logs locally. If opted in, also sends to analytics endpoint.
Never logs: code content, file paths, repo names, usernames, environment variables.
"""
import argparse
import json
import os
import platform
import sys
import urllib.request
import urllib.error
from datetime import datetime, timezone
from pathlib import Path
CONFIG_DIR = Path.home() / ".ai-marketing-skills"
CONFIG_FILE = CONFIG_DIR / "telemetry-config.json"
ANALYTICS_DIR = CONFIG_DIR / "analytics"
USAGE_LOG = ANALYTICS_DIR / "skill-usage.jsonl"
# Replace with your analytics endpoint
ANALYTICS_ENDPOINT = "https://example.com/api/telemetry" # no-op stub — Replace with your analytics endpoint
def load_config() -> dict:
"""Load telemetry config. Returns empty dict if not found."""
if CONFIG_FILE.exists():
try:
with open(CONFIG_FILE, "r") as f:
return json.load(f)
except (json.JSONDecodeError, OSError):
pass
return {}
def python_version() -> str:
"""Return major.minor Python version string."""
return f"{sys.version_info.major}.{sys.version_info.minor}"
def build_entry(skill: str, duration_ms: int, success: bool, version: str, device_id: str) -> dict:
"""Build a log entry. Only safe, anonymous fields."""
return {
"skill": skill,
"duration_ms": duration_ms,
"success": success,
"version": version,
"os": platform.system(),
"arch": platform.machine(),
"python": python_version(),
"timestamp": datetime.now(timezone.utc).isoformat(),
"device_id": device_id,
}
def log_locally(entry: dict):
"""Append entry to local JSONL log."""
ANALYTICS_DIR.mkdir(parents=True, exist_ok=True)
with open(USAGE_LOG, "a") as f:
f.write(json.dumps(entry) + "\n")
def send_remote(entry: dict):
"""Send entry to remote analytics endpoint. Fails silently."""
try:
data = json.dumps(entry).encode("utf-8")
req = urllib.request.Request(
ANALYTICS_ENDPOINT,
data=data,
headers={"Content-Type": "application/json"},
method="POST",
)
urllib.request.urlopen(req, timeout=5)
except Exception:
# Never block skill execution
pass
def parse_bool(value: str) -> bool:
"""Parse a boolean string."""
return value.lower() in ("true", "1", "yes")
def main():
parser = argparse.ArgumentParser(description="Log a skill run event.")
parser.add_argument("--skill", required=True, help="Skill name.")
parser.add_argument("--duration", required=True, type=int, help="Duration in milliseconds.")
parser.add_argument("--success", required=True, help="true/false")
parser.add_argument("--version", required=True, help="Skill version.")
args = parser.parse_args()
config = load_config()
device_id = config.get("device_id", "unknown")
opted_in = config.get("opted_in", False)
entry = build_entry(
skill=args.skill,
duration_ms=args.duration,
success=parse_bool(args.success),
version=args.version,
device_id=device_id,
)
# Always log locally
log_locally(entry)
# Send remotely only if opted in
if opted_in:
send_remote(entry)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,177 @@
#!/usr/bin/env python3
"""Local stats viewer for skill usage data.
Usage:
python3 telemetry/telemetry_report.py # Full report
python3 telemetry/telemetry_report.py --json # Machine-readable output
python3 telemetry/telemetry_report.py --skill X # Filter to one skill
"""
import argparse
import json
import sys
from collections import defaultdict
from datetime import datetime, timezone, timedelta
from pathlib import Path
CONFIG_DIR = Path.home() / ".ai-marketing-skills"
CONFIG_FILE = CONFIG_DIR / "telemetry-config.json"
USAGE_LOG = CONFIG_DIR / "analytics" / "skill-usage.jsonl"
def load_entries(skill_filter: str = None) -> list:
"""Load all log entries, optionally filtered by skill."""
if not USAGE_LOG.exists():
return []
entries = []
with open(USAGE_LOG, "r") as f:
for line in f:
line = line.strip()
if not line:
continue
try:
entry = json.loads(line)
if skill_filter and entry.get("skill") != skill_filter:
continue
entries.append(entry)
except json.JSONDecodeError:
continue
return entries
def load_config() -> dict:
if CONFIG_FILE.exists():
try:
with open(CONFIG_FILE, "r") as f:
return json.load(f)
except (json.JSONDecodeError, OSError):
pass
return {}
def parse_timestamp(ts: str) -> datetime:
"""Parse ISO timestamp string."""
# Handle both formats with and without timezone
try:
return datetime.fromisoformat(ts)
except ValueError:
return datetime.fromisoformat(ts.replace("Z", "+00:00"))
def generate_report(entries: list, config: dict) -> dict:
"""Generate stats from entries."""
now = datetime.now(timezone.utc)
seven_days_ago = now - timedelta(days=7)
thirty_days_ago = now - timedelta(days=30)
total = len(entries)
last_7 = 0
last_30 = 0
skill_runs = defaultdict(int)
skill_successes = defaultdict(int)
skill_durations = defaultdict(list)
last_timestamp = None
for e in entries:
skill = e.get("skill", "unknown")
skill_runs[skill] += 1
if e.get("success"):
skill_successes[skill] += 1
duration = e.get("duration_ms")
if duration is not None:
skill_durations[skill].append(duration)
ts_str = e.get("timestamp")
if ts_str:
try:
ts = parse_timestamp(ts_str)
if ts.tzinfo is None:
ts = ts.replace(tzinfo=timezone.utc)
if ts >= seven_days_ago:
last_7 += 1
if ts >= thirty_days_ago:
last_30 += 1
if last_timestamp is None or ts > last_timestamp:
last_timestamp = ts
except (ValueError, TypeError):
pass
# Per-skill stats
per_skill = {}
for skill, count in sorted(skill_runs.items(), key=lambda x: -x[1]):
avg_dur = None
if skill_durations[skill]:
avg_dur = round(sum(skill_durations[skill]) / len(skill_durations[skill]), 1)
success_rate = round(skill_successes[skill] / count * 100, 1) if count > 0 else 0
per_skill[skill] = {
"runs": count,
"success_rate_pct": success_rate,
"avg_duration_ms": avg_dur,
}
most_used = max(skill_runs, key=skill_runs.get) if skill_runs else None
return {
"total_runs": total,
"last_7_days": last_7,
"last_30_days": last_30,
"most_used_skill": most_used,
"last_run": last_timestamp.isoformat() if last_timestamp else None,
"opted_in": config.get("opted_in", False),
"per_skill": per_skill,
}
def print_report(report: dict):
"""Pretty-print the report."""
print("=" * 50)
print(" AI Marketing Skills — Usage Report")
print("=" * 50)
print()
print(f" Total runs (all time): {report['total_runs']}")
print(f" Last 7 days: {report['last_7_days']}")
print(f" Last 30 days: {report['last_30_days']}")
print(f" Most used skill: {report['most_used_skill'] or 'N/A'}")
print(f" Last run: {report['last_run'] or 'N/A'}")
print(f" Telemetry opt-in: {'Yes' if report['opted_in'] else 'No'}")
print()
if report["per_skill"]:
print(" Per-Skill Breakdown:")
print(" " + "-" * 46)
print(f" {'Skill':<25} {'Runs':>5} {'Success':>8} {'Avg ms':>8}")
print(" " + "-" * 46)
for skill, stats in report["per_skill"].items():
avg = f"{stats['avg_duration_ms']:.0f}" if stats["avg_duration_ms"] is not None else "N/A"
print(f" {skill:<25} {stats['runs']:>5} {stats['success_rate_pct']:>7.1f}% {avg:>8}")
else:
print(" No usage data found.")
print()
def main():
parser = argparse.ArgumentParser(description="View local skill usage stats.")
parser.add_argument("--json", action="store_true", help="Output as JSON.")
parser.add_argument("--skill", help="Filter to a specific skill.")
args = parser.parse_args()
config = load_config()
entries = load_entries(skill_filter=args.skill)
if not entries and not args.json:
print("No usage data found. Run some skills first!")
print(f"Data location: {USAGE_LOG}")
return
report = generate_report(entries, config)
if args.json:
print(json.dumps(report, indent=2))
else:
print_report(report)
if __name__ == "__main__":
main()

135
telemetry/version_check.py Normal file
View file

@ -0,0 +1,135 @@
#!/usr/bin/env python3
"""Check for updates against GitHub releases.
Usage:
python3 telemetry/version_check.py
Silent when up to date. Prints update notice if newer version available.
Caches result for 24 hours. Gracefully handles offline/errors.
"""
import json
import os
import sys
import urllib.request
import urllib.error
from datetime import datetime, timezone, timedelta
from pathlib import Path
REPO_ROOT = Path(__file__).resolve().parent.parent
VERSION_FILE = REPO_ROOT / "VERSION"
CACHE_DIR = Path.home() / ".ai-marketing-skills"
CACHE_FILE = CACHE_DIR / "version-cache.json"
GITHUB_API_URL = "https://api.github.com/repos/ericosiu/ai-marketing-skills/releases/latest"
CACHE_TTL_HOURS = 24
def read_local_version() -> str:
"""Read version from local VERSION file."""
try:
return VERSION_FILE.read_text().strip()
except OSError:
return "0.0.0"
def parse_semver(version: str) -> tuple:
"""Parse semver string into comparable tuple. Strips leading 'v'."""
v = version.lstrip("v")
parts = v.split(".")
result = []
for p in parts:
try:
result.append(int(p))
except ValueError:
result.append(0)
while len(result) < 3:
result.append(0)
return tuple(result[:3])
def load_cache() -> dict:
"""Load cached version check result."""
if not CACHE_FILE.exists():
return {}
try:
with open(CACHE_FILE, "r") as f:
return json.load(f)
except (json.JSONDecodeError, OSError):
return {}
def save_cache(latest_version: str):
"""Save version check result to cache."""
CACHE_DIR.mkdir(parents=True, exist_ok=True)
cache = {
"latest_version": latest_version,
"checked_at": datetime.now(timezone.utc).isoformat(),
}
try:
with open(CACHE_FILE, "w") as f:
json.dump(cache, f, indent=2)
except OSError:
pass
def cache_is_fresh() -> bool:
"""Check if cache is less than CACHE_TTL_HOURS old."""
cache = load_cache()
checked_at = cache.get("checked_at")
if not checked_at:
return False
try:
ts = datetime.fromisoformat(checked_at)
if ts.tzinfo is None:
ts = ts.replace(tzinfo=timezone.utc)
return datetime.now(timezone.utc) - ts < timedelta(hours=CACHE_TTL_HOURS)
except (ValueError, TypeError):
return False
def fetch_latest_version() -> str:
"""Fetch latest version from GitHub API. Returns version string or None."""
try:
req = urllib.request.Request(
GITHUB_API_URL,
headers={"Accept": "application/vnd.github.v3+json", "User-Agent": "ai-marketing-skills"},
)
with urllib.request.urlopen(req, timeout=10) as resp:
data = json.loads(resp.read().decode("utf-8"))
return data.get("tag_name", "").lstrip("v")
except Exception:
return None
def check_version():
"""Main version check logic."""
local = read_local_version()
# Check cache first
cache = load_cache()
if cache_is_fresh() and cache.get("latest_version"):
latest = cache["latest_version"]
else:
latest = fetch_latest_version()
if latest is None:
# Offline or API error — silently exit
return
save_cache(latest)
local_parsed = parse_semver(local)
latest_parsed = parse_semver(latest)
if latest_parsed > local_parsed:
print(f"🆕 AI Marketing Skills v{latest} available (you have v{local}). Run `git pull` to update.")
def main():
try:
check_version()
except Exception:
# Never block skill execution
pass
if __name__ == "__main__":
main()