docs: add cc-sessions discover + GitHub repo (v1.0.0)
- New subsection "Session Pattern Discovery" in §2.x (Session Management): n-gram mode, --llm mode via claude --print, example output, 20% rule framework
- Cross-reference added after the 20% rule callout in §5.1 Skills
- examples/scripts/cc-sessions.py synced: 498 → 1225 lines (full discover subcommand)
- examples/scripts/README.md: discover examples + curl install + GitHub link
- machine-readable/reference.yaml: cc_sessions_github + cc_sessions_discover entries
- GitHub repo created: https://github.com/FlorianBruniaux/cc-sessions (v1.0.0 released)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
parent 1728b6de39
commit 13efb5a774
5 changed files with 1029 additions and 9 deletions
@ -271,9 +271,11 @@ Search across all Claude Code session histories.
|
|||
|
||||
## Session Manager (Advanced)
|
||||
|
||||
Advanced CLI for session search, browse & resume with incremental indexing.
|
||||
Advanced CLI for session search, browse, resume & pattern discovery with incremental indexing.
|
||||
|
||||
**vs session-search.sh**: Faster search (~200ms vs ~400ms), partial ID resume, branch filter, worktree support, incremental JSONL index.
|
||||
**vs session-search.sh**: Faster search (~200ms vs ~400ms), partial ID resume, branch filter, worktree support, incremental JSONL index, and `discover` subcommand for automated config optimization.
|
||||
|
||||
**GitHub**: [FlorianBruniaux/cc-sessions](https://github.com/FlorianBruniaux/cc-sessions)
|
||||
|
||||
```bash
|
||||
# Search in current project
|
||||
|
|
@ -293,11 +295,26 @@ cc-sessions resume 8d472d
|
|||
|
||||
# JSON output for scripting
cc-sessions --json search "prisma" | jq -r '.[].id'

# Discover recurring patterns (n-gram, local, free)
cc-sessions --all discover

# Discover with semantic analysis via claude --print
cc-sessions --all discover --llm

# JSON output for scripting (--json is a global flag, so it precedes the subcommand)
cc-sessions --all --json discover | jq '.[] | select(.category == "skill")'
```
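
The `jq` filter above can also be written in Python when the suggestions feed a larger script. The sketch below is illustrative only: the field names follow the `discover_patterns()` docstring in this commit, while the sample values and the helper name `select_category` are invented for the example.

```python
import json

# Hypothetical sample of discover JSON output. Field names come from the
# discover_patterns() docstring; every value here is made up.
SAMPLE_OUTPUT = """
[
  {"pattern": "review pull request security", "count": 14, "session_count": 9,
   "project_count": 3, "cross_project": true, "category": "CLAUDE.md rule",
   "score": 0.3375, "example_sessions": ["8d472d2c"]},
  {"pattern": "generate prisma migration", "count": 6, "session_count": 4,
   "project_count": 1, "cross_project": false, "category": "skill",
   "score": 0.1, "example_sessions": ["1a2b3c4d"]},
  {"pattern": "write changelog entry", "count": 3, "session_count": 3,
   "project_count": 2, "cross_project": true, "category": "skill",
   "score": 0.1125, "example_sessions": ["5e6f7a8b"]}
]
"""


def select_category(raw_json: str, category: str) -> list:
    """Return the suggestions whose 'category' field equals `category`."""
    suggestions = json.loads(raw_json)
    return [s for s in suggestions if s.get("category") == category]


skills = select_category(SAMPLE_OUTPUT, "skill")
for s in skills:
    print(f"{s['pattern']}  ({s['session_count']} sessions, score {s['score']})")
```

In real use the JSON string would come from the command's standard output rather than from a literal.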
|
||||
|
||||
**Installation**: `cp cc-sessions.py ~/bin/cc-sessions && chmod +x ~/bin/cc-sessions`
|
||||
**Install from GitHub**:
|
||||
```bash
curl -sL https://raw.githubusercontent.com/FlorianBruniaux/cc-sessions/main/cc-sessions \
  -o ~/.local/bin/cc-sessions && chmod +x ~/.local/bin/cc-sessions
```
|
||||
|
||||
> [Gist source](https://gist.github.com/FlorianBruniaux/992d4d1107592d9e98ca9d89838871c6)
|
||||
**Or copy locally**: `cp cc-sessions.py ~/bin/cc-sessions && chmod +x ~/bin/cc-sessions`
|
||||
|
||||
> [GitHub repo](https://github.com/FlorianBruniaux/cc-sessions) · [Gist](https://gist.github.com/FlorianBruniaux/992d4d1107592d9e98ca9d89838871c6)
|
||||
|
||||
---
|
||||
|
||||
|
|
|
|||
|
|
@ -1,4 +1,5 @@
|
|||
#!/usr/bin/env python3
|
||||
# Source: https://github.com/FlorianBruniaux/cc-sessions
|
||||
"""
|
||||
cc-sessions — Fast CLI to search, browse & resume Claude Code session history
|
||||
|
||||
|
|
@ -10,6 +11,7 @@ This tool indexes those sessions for fast search and provides a clean CLI interf
|
|||
- Filter by date, branch, or project
|
||||
- View recent sessions
|
||||
- Resume past sessions with partial ID matching
|
||||
- Discover recurring patterns to extract as skills, commands, or CLAUDE.md rules
|
||||
|
||||
FEATURES
|
||||
--------
|
||||
|
|
@ -19,6 +21,7 @@ FEATURES
|
|||
- 🎯 Partial ID matching: 'cc-sessions resume 8d472d' finds full session ID
|
||||
- 🌳 Worktree support: includes git worktree sessions automatically
|
||||
- 📊 JSON output: pipe to jq/fzf for advanced workflows
|
||||
- 🔭 Pattern discovery: analyze sessions to suggest skills/commands/rules
|
||||
- 🐍 Zero dependencies: Python stdlib only (json, argparse, pathlib)
|
||||
|
||||
USAGE
|
||||
|
|
@ -44,6 +47,12 @@ cc-sessions resume 8d472d2c
|
|||
# Force rebuild index
|
||||
cc-sessions reindex
|
||||
|
||||
# Discover patterns (current project, last 90 days; add --all for every project)
|
||||
cc-sessions discover
|
||||
|
||||
# Discover with custom filters
|
||||
cc-sessions --all discover --since 60d --min-count 2 --top 15
|
||||
|
||||
# JSON output for composition
|
||||
cc-sessions --json search "prisma" | jq -r '.[].id'
|
||||
|
||||
|
|
@ -54,6 +63,10 @@ INSTALLATION
|
|||
3. Run: cc-sessions recent 5
|
||||
(First run builds index ~10s for 1500 sessions, then <200ms)
|
||||
|
||||
Or install from GitHub:
|
||||
curl -sL https://raw.githubusercontent.com/FlorianBruniaux/cc-sessions/main/cc-sessions \
|
||||
-o ~/.local/bin/cc-sessions && chmod +x ~/.local/bin/cc-sessions
|
||||
|
||||
INDEX ARCHITECTURE
|
||||
------------------
|
||||
- Location: ~/.claude/sessions-index.jsonl (~360KB for 1300 sessions)
|
||||
|
|
@ -109,19 +122,28 @@ cc-sessions positioning: Unix-style CLI, fast search, powerful filters, no depen
|
|||
AUTHOR
|
||||
------
|
||||
Created for terminal power users who prefer CLI over GUI.
|
||||
GitHub: https://github.com/FlorianBruniaux/cc-sessions
|
||||
Gist: https://gist.github.com/FlorianBruniaux/992d4d1107592d9e98ca9d89838871c6
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
import urllib.error
|
||||
import urllib.request
|
||||
from collections import Counter, defaultdict
|
||||
from datetime import datetime, timedelta
|
||||
from pathlib import Path
|
||||
from typing import Dict, List, Optional
|
||||
from typing import Dict, List, Optional, Tuple
|
||||
|
||||
CLAUDE_DIR = Path.home() / ".claude"
|
||||
INDEX_PATH = CLAUDE_DIR / "sessions-index.jsonl"
|
||||
DISCOVER_CACHE_PATH = CLAUDE_DIR / "discover-cache.jsonl"
|
||||
|
||||
LLM_BATCH_SIZE = 60
|
||||
LLM_DEFAULT_MODEL = '' # empty = use claude CLI default model
|
||||
|
||||
|
||||
def parse_duration(duration_str: str) -> datetime:
|
||||
|
|
@ -204,6 +226,49 @@ def get_first_user_message(filepath: Path) -> Optional[str]:
|
|||
return None
|
||||
|
||||
|
||||
def extract_all_user_messages(filepath: Path) -> List[str]:
|
||||
"""Extract all significant user messages from a session JSONL file."""
|
||||
messages = []
|
||||
try:
|
||||
with open(filepath, 'r') as f:
|
||||
for line in f:
|
||||
if not line.strip():
|
||||
continue
|
||||
try:
|
||||
entry = json.loads(line)
|
||||
|
||||
if entry.get('type') != 'user':
|
||||
continue
|
||||
|
||||
content = entry.get('message', {}).get('content', '')
|
||||
|
||||
if not isinstance(content, str):
|
||||
continue
|
||||
|
||||
if content.startswith('<'):
|
||||
continue
|
||||
|
||||
stripped = content.strip()
|
||||
|
||||
# Skip very short messages (likely acknowledgements)
|
||||
if len(stripped) < 10:
|
||||
continue
|
||||
|
||||
# Skip system-injected messages (compact summaries, reminders)
|
||||
# These are injected as plain-text user messages but aren't real user input
|
||||
if len(stripped) > 800:
|
||||
continue
|
||||
if _is_system_injection(stripped):
|
||||
continue
|
||||
|
||||
messages.append(stripped)
|
||||
except json.JSONDecodeError:
|
||||
continue
|
||||
except Exception:
|
||||
pass
|
||||
return messages
|
||||
|
||||
|
||||
def parse_session(filepath: Path) -> Optional[Dict]:
|
||||
"""Extract session metadata."""
|
||||
session_id = filepath.stem
|
||||
|
|
@ -438,6 +503,631 @@ def cmd_reindex():
|
|||
print(f"Indexed {len(index)} sessions", file=sys.stderr)
|
||||
|
||||
|
||||
# ─── DISCOVER subcommand ──────────────────────────────────────────────────────
|
||||
|
||||
# Boilerplate phrases that identify system-injected messages (compact summaries,
|
||||
# system-reminder injections, plan mode prompts, task tool notifications...).
|
||||
# These appear as plain-text user messages in JSONL but are not real user input.
|
||||
_SYSTEM_INJECTION_MARKERS = (
|
||||
'this session is being continued',
|
||||
'read the full transcript',
|
||||
'context summary below covers',
|
||||
'exact snippets error messages content',
|
||||
'exiting plan mode',
|
||||
'task tools haven',
|
||||
'teamcreate tool team parallelize',
|
||||
'florianbruniaux/.claude/projects', # path fragments in compact prompts
|
||||
'florianbruniaux/sites/', # project path fragments
|
||||
'-users-florianbruniaux-', # encoded path in compact messages
|
||||
)
|
||||
|
||||
|
||||
def _is_system_injection(text: str) -> bool:
    """Return True if this looks like a Claude Code system message, not real user input."""
    lower = text.lower()
    return any(marker in lower for marker in _SYSTEM_INJECTION_MARKERS)
|
||||
|
||||
|
||||
# Stop words to exclude from n-gram analysis
|
||||
_STOP_WORDS = frozenset({
|
||||
'a', 'an', 'the', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for',
|
||||
'of', 'with', 'by', 'from', 'is', 'it', 'its', 'be', 'as', 'was',
|
||||
'are', 'were', 'been', 'have', 'has', 'had', 'do', 'does', 'did',
|
||||
'will', 'would', 'could', 'should', 'may', 'might', 'can', 'shall',
|
||||
'this', 'that', 'these', 'those', 'i', 'you', 'we', 'they', 'he',
|
||||
'she', 'my', 'your', 'our', 'their', 'his', 'her', 'me', 'us', 'them',
|
||||
'so', 'if', 'then', 'than', 'when', 'what', 'how', 'why', 'where',
|
||||
'who', 'which', 'not', 'no', 'also', 'just', 'now', 'up', 'out',
|
||||
'about', 'into', 'after', 'before', 'all', 'any', 'some', 'more',
|
||||
'new', 'add', 'use', 'make', 'get', 'go', 'run', 'see', 'here',
|
||||
'there', 'need', 'want', 'please', 'ok', 'okay', 'yes', 'yeah',
|
||||
'let', 'can', 'help', 'look', 'check', 'same', 'like', 'very',
|
||||
'much', 'only', 'other', 'also', 'each', 'file', 'code', 'create',
|
||||
'update', 'change', 'think', 'know', 'give', 'take', 'put', 'keep',
|
||||
})
|
||||
|
||||
|
||||
def normalize_text(text: str) -> List[str]:
    """Lowercase, strip punctuation, tokenize, remove stop words."""
    text = text.lower()
    # Replace punctuation/special chars with spaces (keep alphanumeric and hyphens)
    text = re.sub(r'[^a-z0-9\s\-]', ' ', text)
    # Split on whitespace (runs of spaces collapse automatically)
    tokens = text.split()
    # Filter stop words and very short tokens
    return [t for t in tokens if t not in _STOP_WORDS and len(t) > 2]
|
||||
|
||||
|
||||
def extract_ngrams(tokens: List[str], n: int) -> List[Tuple[str, ...]]:
    """Extract n-grams from a token list."""
    return [tuple(tokens[i:i+n]) for i in range(len(tokens) - n + 1)]
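
A small worked example may help show what the sliding window yields. The function is copied from the diff above so the snippet runs on its own; the token list is invented for illustration.

```python
from typing import List, Tuple


def extract_ngrams(tokens: List[str], n: int) -> List[Tuple[str, ...]]:
    """Slide a window of width n over the tokens, one position at a time."""
    return [tuple(tokens[i:i + n]) for i in range(len(tokens) - n + 1)]


# Hypothetical tokens, as they might look after normalize_text().
tokens = ["review", "pull", "request", "security", "issues"]

trigrams = extract_ngrams(tokens, 3)
too_long = extract_ngrams(tokens, 6)  # window wider than the list

for gram in trigrams:
    print(" ".join(gram))
```

Five tokens give three trigrams, and a window wider than the list gives an empty result, which is why messages with fewer than three tokens contribute nothing to the index.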
|
||||
|
||||
|
||||
def token_overlap(tokens_a: List[str], tokens_b: List[str]) -> float:
    """Jaccard similarity between two token sets."""
    if not tokens_a or not tokens_b:
        return 0.0
    set_a, set_b = set(tokens_a), set(tokens_b)
    return len(set_a & set_b) / len(set_a | set_b)
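
To make the thresholds used later in this file concrete, here is a brief worked example of the Jaccard computation. The function body mirrors the one above; the phrases are made up.

```python
from typing import List


def token_overlap(tokens_a: List[str], tokens_b: List[str]) -> float:
    """Jaccard similarity: |A ∩ B| / |A ∪ B| over the two token sets."""
    if not tokens_a or not tokens_b:
        return 0.0
    set_a, set_b = set(tokens_a), set(tokens_b)
    return len(set_a & set_b) / len(set_a | set_b)


# Three shared tokens out of five distinct tokens in total.
borderline = token_overlap(
    ["fix", "failing", "unit", "tests"],
    ["fix", "failing", "integration", "tests"],
)

# Three shared tokens out of four distinct tokens in total.
close = token_overlap(
    ["fix", "failing", "unit", "tests"],
    ["fix", "failing", "tests"],
)

print(borderline, borderline > 0.6)
print(close, close > 0.6)
```

Because the clustering step compares with a strict `> 0.6`, the first pair (similarity 3/5) would stay separate while the second pair (3/4) would be merged.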
|
||||
|
||||
|
||||
def load_discover_cache() -> Dict[str, Dict]:
    """Load the discover cache (session_id -> {mtime, messages[]})."""
    if not DISCOVER_CACHE_PATH.exists():
        return {}
    cache = {}
    try:
        with open(DISCOVER_CACHE_PATH, 'r') as f:
            for line in f:
                if not line.strip():
                    continue
                entry = json.loads(line)
                cache[entry['id']] = entry
    except Exception:
        return {}
    return cache
|
||||
|
||||
|
||||
def save_discover_cache(cache: Dict[str, Dict]):
    """Persist the discover cache to disk."""
    CLAUDE_DIR.mkdir(exist_ok=True)
    with open(DISCOVER_CACHE_PATH, 'w') as f:
        for entry in cache.values():
            f.write(json.dumps(entry) + '\n')
|
||||
|
||||
|
||||
def collect_sessions_data(
|
||||
project_dirs: List[Path],
|
||||
since_dt: Optional[datetime],
|
||||
) -> List[Dict]:
|
||||
"""
|
||||
Collect {session_id, project, mtime, messages[]} for all sessions.
|
||||
|
||||
Uses mtime-based cache to avoid re-reading unchanged files.
|
||||
Returns one dict per session that has at least one user message.
|
||||
"""
|
||||
cache = load_discover_cache()
|
||||
updated_cache = {}
|
||||
sessions_data = []
|
||||
|
||||
for project_dir in project_dirs:
|
||||
project_name = project_dir.name
|
||||
jsonl_files = list(project_dir.glob("*.jsonl"))
|
||||
|
||||
for filepath in jsonl_files:
|
||||
session_id = filepath.stem
|
||||
|
||||
# Skip subagent sessions
|
||||
if session_id.startswith('agent-'):
|
||||
continue
|
||||
|
||||
try:
|
||||
file_mtime = filepath.stat().st_mtime
|
||||
except OSError:
|
||||
continue
|
||||
|
||||
# Apply date filter: skip if file is older than since_dt
|
||||
if since_dt:
|
||||
file_dt = datetime.fromtimestamp(file_mtime)
|
||||
if file_dt < since_dt:
|
||||
continue
|
||||
|
||||
# Cache hit: file unchanged since last analysis
|
||||
if session_id in cache and cache[session_id].get('mtime', 0) >= file_mtime:
|
||||
entry = cache[session_id]
|
||||
if entry.get('messages'):
|
||||
sessions_data.append({
|
||||
'session_id': session_id,
|
||||
'project': project_name,
|
||||
'mtime': file_mtime,
|
||||
'messages': entry['messages'],
|
||||
})
|
||||
updated_cache[session_id] = entry
|
||||
continue
|
||||
|
||||
# Cache miss: parse file
|
||||
messages = extract_all_user_messages(filepath)
|
||||
cache_entry = {
|
||||
'id': session_id,
|
||||
'mtime': file_mtime,
|
||||
'messages': messages,
|
||||
}
|
||||
updated_cache[session_id] = cache_entry
|
||||
|
||||
if messages:
|
||||
sessions_data.append({
|
||||
'session_id': session_id,
|
||||
'project': project_name,
|
||||
'mtime': file_mtime,
|
||||
'messages': messages,
|
||||
})
|
||||
|
||||
save_discover_cache(updated_cache)
|
||||
return sessions_data
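
The cache-hit test in `collect_sessions_data` can be tried in isolation. The sketch below is a simplified illustration, not the script's own code: `is_cache_hit` is a hypothetical helper that reproduces only the `mtime` comparison, and the session file is a throwaway temp file.

```python
import os
import tempfile
from pathlib import Path


def is_cache_hit(cache: dict, session_id: str, file_mtime: float) -> bool:
    """A cached entry is reusable if it is at least as new as the file on disk."""
    entry = cache.get(session_id)
    return entry is not None and entry.get("mtime", 0) >= file_mtime


with tempfile.TemporaryDirectory() as tmp:
    session_file = Path(tmp) / "abc123.jsonl"  # hypothetical session file
    session_file.write_text('{"type": "user"}\n')

    mtime = session_file.stat().st_mtime
    cache = {"abc123": {"id": "abc123", "mtime": mtime, "messages": []}}

    # File unchanged since it was cached: no re-parse needed.
    hit_before = is_cache_hit(cache, "abc123", session_file.stat().st_mtime)

    # Simulate a later write by moving the modification time 10 s forward.
    os.utime(session_file, (mtime + 10, mtime + 10))
    hit_after = is_cache_hit(cache, "abc123", session_file.stat().st_mtime)

print(hit_before, hit_after)
```

One consequence of this design is that a session whose file is touched without any content change is still re-parsed, which costs time but never produces stale results.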
|
||||
|
||||
|
||||
def discover_patterns(
|
||||
sessions_data: List[Dict],
|
||||
min_count: int = 3,
|
||||
top: int = 20,
|
||||
) -> List[Dict]:
|
||||
"""
|
||||
Analyze sessions data and return pattern suggestions.
|
||||
|
||||
Each suggestion has:
|
||||
- pattern: human-readable phrase
|
||||
- count: number of occurrences
|
||||
- session_count: number of distinct sessions
|
||||
- project_count: number of distinct projects
|
||||
- cross_project: bool
|
||||
- category: 'CLAUDE.md rule' | 'skill' | 'command'
|
||||
- score: float (frequency × cross-project bonus)
|
||||
- example_sessions: list of up to 2 session_ids
|
||||
"""
|
||||
total_sessions = len(sessions_data)
|
||||
if total_sessions == 0:
|
||||
return []
|
||||
|
||||
# ── Step 1: Build per-session token lists and n-gram index ────────────────
|
||||
# ngram_index: ngram_tuple -> list of {session_id, project, original_message}
|
||||
ngram_index: Dict[Tuple, List[Dict]] = defaultdict(list)
|
||||
|
||||
for sd in sessions_data:
|
||||
session_id = sd['session_id']
|
||||
project = sd['project']
|
||||
for msg in sd['messages']:
|
||||
tokens = normalize_text(msg)
|
||||
if len(tokens) < 3:
|
||||
continue
|
||||
for n in range(3, 7): # 3-6 word n-grams
|
||||
for ngram in extract_ngrams(tokens, n):
|
||||
# Filter n-grams that are all stop words (shouldn't happen after normalize)
|
||||
if all(t in _STOP_WORDS for t in ngram):
|
||||
continue
|
||||
ngram_index[ngram].append({
|
||||
'session_id': session_id,
|
||||
'project': project,
|
||||
'msg': msg[:80],
|
||||
})
|
||||
|
||||
# ── Step 2: Filter n-grams below min_count ────────────────────────────────
|
||||
frequent_ngrams = {
|
||||
ng: occurrences
|
||||
for ng, occurrences in ngram_index.items()
|
||||
if len(occurrences) >= min_count
|
||||
}
|
||||
|
||||
# ── Step 3: Deduplicate — prefer longer n-gram if it subsumes shorter ─────
|
||||
# Sort by length desc, then count desc
|
||||
sorted_ngrams = sorted(
|
||||
frequent_ngrams.items(),
|
||||
key=lambda x: (len(x[0]), len(x[1])),
|
||||
reverse=True,
|
||||
)
|
||||
|
||||
kept_ngrams: List[Tuple[Tuple, List[Dict]]] = []
|
||||
subsumed: set = set()
|
||||
|
||||
for ngram, occurrences in sorted_ngrams:
|
||||
if ngram in subsumed:
|
||||
continue
|
||||
kept_ngrams.append((ngram, occurrences))
|
||||
# Mark all sub-ngrams of this ngram as subsumed
|
||||
for n in range(3, len(ngram)):
|
||||
for i in range(len(ngram) - n + 1):
|
||||
sub = ngram[i:i+n]
|
||||
subsumed.add(sub)
|
||||
|
||||
# ── Step 4: Similarity clustering — merge near-duplicate phrases ──────────
|
||||
# Group kept_ngrams by token overlap > 60%
|
||||
clusters: List[List[int]] = []
|
||||
assigned = set()
|
||||
|
||||
for i, (ng_i, _) in enumerate(kept_ngrams):
|
||||
if i in assigned:
|
||||
continue
|
||||
cluster = [i]
|
||||
for j, (ng_j, _) in enumerate(kept_ngrams):
|
||||
if j <= i or j in assigned:
|
||||
continue
|
||||
overlap = token_overlap(list(ng_i), list(ng_j))
|
||||
if overlap > 0.6:
|
||||
cluster.append(j)
|
||||
assigned.add(j)
|
||||
clusters.append(cluster)
|
||||
assigned.add(i)
|
||||
|
||||
# ── Step 5: Build suggestion per cluster (representative = longest n-gram, ties broken by count) ──
|
||||
suggestions = []
|
||||
for cluster in clusters:
|
||||
# Pick representative: longest ngram with most occurrences
|
||||
best_idx = max(cluster, key=lambda i: (len(kept_ngrams[i][0]), len(kept_ngrams[i][1])))
|
||||
ngram, occurrences = kept_ngrams[best_idx]
|
||||
|
||||
# Aggregate across cluster members
|
||||
all_occurrences = []
|
||||
for idx in cluster:
|
||||
all_occurrences.extend(kept_ngrams[idx][1])
|
||||
|
||||
distinct_sessions = list({o['session_id'] for o in all_occurrences})
|
||||
distinct_projects = list({o['project'] for o in all_occurrences})
|
||||
count = len(all_occurrences)
|
||||
session_count = len(distinct_sessions)
|
||||
project_count = len(distinct_projects)
|
||||
cross_project = project_count >= 2
|
||||
|
||||
if session_count < min_count:
|
||||
continue
|
||||
|
||||
session_pct = session_count / total_sessions
|
||||
|
||||
# Categorize
|
||||
if session_pct > 0.20:
|
||||
category = 'CLAUDE.md rule'
|
||||
elif session_pct >= 0.05:
|
||||
category = 'skill'
|
||||
else:
|
||||
category = 'command'
|
||||
|
||||
# Score: base = session_pct, bonus × 1.5 if cross-project
|
||||
score = session_pct * (1.5 if cross_project else 1.0)
|
||||
|
||||
phrase = ' '.join(ngram)
|
||||
|
||||
suggestions.append({
|
||||
'pattern': phrase,
|
||||
'count': count,
|
||||
'session_count': session_count,
|
||||
'project_count': project_count,
|
||||
'cross_project': cross_project,
|
||||
'category': category,
|
||||
'score': round(score, 4),
|
||||
'example_sessions': distinct_sessions[:2],
|
||||
})
|
||||
|
||||
# ── Step 6: Sort by score desc, truncate ──────────────────────────────────
|
||||
suggestions.sort(key=lambda x: x['score'], reverse=True)
|
||||
return suggestions[:top]
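
The categorization and scoring rules in step 5 reduce to two small formulas. The following sketch isolates them with invented session counts; `categorize` and `score` are names chosen for this example, not functions in the script.

```python
def categorize(session_pct: float) -> str:
    """Map the share of sessions containing a pattern to a suggestion category."""
    if session_pct > 0.20:
        return "CLAUDE.md rule"
    if session_pct >= 0.05:
        return "skill"
    return "command"


def score(session_pct: float, cross_project: bool) -> float:
    """Session share, multiplied by 1.5 when the pattern spans 2+ projects."""
    return round(session_pct * (1.5 if cross_project else 1.0), 4)


total_sessions = 40  # hypothetical corpus size

# (sessions containing the pattern, seen in more than one project?)
examples = [(12, True), (8, False), (4, True), (1, False)]

results = []
for session_count, cross_project in examples:
    pct = session_count / total_sessions
    results.append((categorize(pct), score(pct, cross_project)))
    print(f"{session_count:>2}/{total_sessions} -> {results[-1]}")
```

A pattern at exactly 20% lands in `skill`, since the rule threshold is a strict `>`. In the real function, patterns seen in fewer than `min_count` sessions are dropped before this step, so the last row would only survive with `--min-count 1`.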
|
||||
|
||||
|
||||
def cmd_discover(
|
||||
project_dirs: List[Path],
|
||||
since: str = '90d',
|
||||
min_count: int = 3,
|
||||
top: int = 20,
|
||||
json_output: bool = False,
|
||||
):
|
||||
"""Analyze sessions and surface patterns worth extracting as skills/commands/rules."""
|
||||
since_dt = parse_duration(since)
|
||||
|
||||
print(f"Scanning sessions since {since_dt.strftime('%Y-%m-%d')}...", file=sys.stderr)
|
||||
|
||||
sessions_data = collect_sessions_data(project_dirs, since_dt)
|
||||
|
||||
if not sessions_data:
|
||||
print("No sessions found in the given time range.", file=sys.stderr)
|
||||
return
|
||||
|
||||
print(f"Analyzing {len(sessions_data)} sessions across "
|
||||
f"{len({sd['project'] for sd in sessions_data})} project(s)...", file=sys.stderr)
|
||||
|
||||
suggestions = discover_patterns(sessions_data, min_count=min_count, top=top)
|
||||
|
||||
if not suggestions:
|
||||
print("No recurring patterns found (try --min-count 2 or --since 180d).", file=sys.stderr)
|
||||
return
|
||||
|
||||
if json_output:
|
||||
print(json.dumps(suggestions, indent=2))
|
||||
return
|
||||
|
||||
# ── Human-readable output ─────────────────────────────────────────────────
|
||||
by_category: Dict[str, List[Dict]] = defaultdict(list)
|
||||
for s in suggestions:
|
||||
by_category[s['category']].append(s)
|
||||
|
||||
category_order = ['CLAUDE.md rule', 'skill', 'command']
|
||||
category_icons = {
|
||||
'CLAUDE.md rule': '📋',
|
||||
'skill': '🧩',
|
||||
'command': '⚡',
|
||||
}
|
||||
|
||||
total_sessions = len(sessions_data)
|
||||
total_projects = len({sd['project'] for sd in sessions_data})
|
||||
|
||||
print()
|
||||
print(f" cc-sessions discover — {total_sessions} sessions · {total_projects} project(s) · since {since}")
|
||||
print()
|
||||
|
||||
for cat in category_order:
|
||||
items = by_category.get(cat, [])
|
||||
if not items:
|
||||
continue
|
||||
|
||||
icon = category_icons[cat]
|
||||
print(f" {icon} {cat.upper()}")
|
||||
print(f" {'─' * 60}")
|
||||
|
||||
for item in items:
|
||||
tag = ' [cross-project]' if item['cross_project'] else ''
|
||||
pct = item['session_count'] / total_sessions * 100
|
||||
print(
|
||||
f" {item['pattern']}"
|
||||
f"{tag}"
|
||||
)
|
||||
print(
|
||||
f" {item['session_count']} sessions ({pct:.0f}%) · "
|
||||
f"{item['count']} occurrences · "
|
||||
f"score {item['score']:.3f}"
|
||||
)
|
||||
for ex in item['example_sessions']:
|
||||
print(f" → {ex[:36]}")
|
||||
print()
|
||||
|
||||
print()
|
||||
|
||||
print(f" Run with --json to pipe to jq for further processing.")
|
||||
print()
|
||||
|
||||
|
||||
# ─── DISCOVER --llm subcommand ────────────────────────────────────────────────
|
||||
|
||||
def deduplicate_messages_for_llm(sessions_data: List[Dict], max_messages: int = 300) -> List[Dict]:
|
||||
"""
|
||||
Deduplicate semantically similar messages using Jaccard similarity.
|
||||
Returns list of {text, count, projects} sorted by frequency desc.
|
||||
"""
|
||||
all_msgs = []
|
||||
for sd in sessions_data:
|
||||
for msg in sd.get('messages', []):
|
||||
all_msgs.append({
|
||||
'text': msg[:500],
|
||||
'project': sd['project'],
|
||||
'tokens': normalize_text(msg),
|
||||
})
|
||||
|
||||
if not all_msgs:
|
||||
return []
|
||||
|
||||
clusters: List[List[int]] = []
|
||||
assigned = set()
|
||||
|
||||
for i, m in enumerate(all_msgs):
|
||||
if i in assigned:
|
||||
continue
|
||||
cluster = [i]
|
||||
# Limit comparison window for performance on large sets
|
||||
window_end = min(i + 300, len(all_msgs))
|
||||
for j in range(i + 1, window_end):
|
||||
if j in assigned:
|
||||
continue
|
||||
if token_overlap(m['tokens'], all_msgs[j]['tokens']) > 0.65:
|
||||
cluster.append(j)
|
||||
assigned.add(j)
|
||||
clusters.append(cluster)
|
||||
assigned.add(i)
|
||||
|
||||
deduped = []
|
||||
for cluster in clusters:
|
||||
representative = all_msgs[cluster[0]]
|
||||
projects = list({all_msgs[i]['project'] for i in cluster})
|
||||
deduped.append({
|
||||
'text': representative['text'],
|
||||
'count': len(cluster),
|
||||
'projects': projects,
|
||||
})
|
||||
|
||||
deduped.sort(key=lambda x: x['count'], reverse=True)
|
||||
return deduped[:max_messages]
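
The greedy clustering used for deduplication can be illustrated on a handful of tokenized messages. This is a reduced sketch under stated assumptions: it keeps the 0.65 threshold from the function above but omits the 300-message comparison window, and `greedy_clusters` is a name invented for the example.

```python
from typing import List


def jaccard(a: List[str], b: List[str]) -> float:
    """Jaccard similarity between two token lists, treated as sets."""
    if not a or not b:
        return 0.0
    sa, sb = set(a), set(b)
    return len(sa & sb) / len(sa | sb)


def greedy_clusters(token_lists: List[List[str]], threshold: float = 0.65) -> List[List[int]]:
    """Each unassigned message seeds a cluster and absorbs later similar ones."""
    clusters: List[List[int]] = []
    assigned = set()
    for i, tokens_i in enumerate(token_lists):
        if i in assigned:
            continue
        cluster = [i]
        for j in range(i + 1, len(token_lists)):
            if j in assigned:
                continue
            if jaccard(tokens_i, token_lists[j]) > threshold:
                cluster.append(j)
                assigned.add(j)
        clusters.append(cluster)
        assigned.add(i)
    return clusters


# Hypothetical messages, already tokenized.
messages = [
    ["fix", "failing", "unit", "tests"],
    ["write", "changelog", "entry", "release"],
    ["fix", "failing", "unit", "tests", "ci"],
    ["write", "changelog", "entry"],
]

clusters = greedy_clusters(messages)
print(clusters)
```

Members are compared only against the cluster's seed message, so the outcome depends on input order. That trade-off keeps the pass simple but means two messages similar to each other may end up in different clusters.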
|
||||
|
||||
|
||||
def build_analysis_prompt(messages: List[Dict]) -> str:
|
||||
lines = []
|
||||
for i, m in enumerate(messages, 1):
|
||||
count_info = f" (x{m['count']})" if m['count'] > 1 else ""
|
||||
cross = " [multi-project]" if len(m['projects']) > 1 else ""
|
||||
text = m['text'][:200].replace('\n', ' ')
|
||||
lines.append(f"{i}. {text}{count_info}{cross}")
|
||||
|
||||
messages_block = '\n'.join(lines)
|
||||
|
||||
return f"""You are analyzing a developer's Claude Code session history to find recurring patterns worth extracting as reusable configurations.
|
||||
|
||||
Below are user messages (deduplicated). Numbers in parentheses show how many times a similar message appeared. [multi-project] means it appeared across different codebases.
|
||||
|
||||
MESSAGES:
|
||||
{messages_block}
|
||||
|
||||
Identify recurring patterns and suggest what to extract. For each suggestion, choose the category:
|
||||
- CLAUDE.md rule: a behavioral instruction that should always be active (broad constraint or guideline)
|
||||
- skill: specialized expertise loaded on-demand (domain-specific, not always needed)
|
||||
- command: a repeatable step-by-step workflow with clear inputs/outputs
|
||||
|
||||
Return ONLY a JSON array, no prose outside it:
|
||||
[
|
||||
{{
|
||||
"pattern": "short description of the recurring intent (max 60 chars)",
|
||||
"category": "CLAUDE.md rule",
|
||||
"suggested_name": "kebab-case-name",
|
||||
"rationale": "one sentence explaining why this should be extracted",
|
||||
"frequency": "high",
|
||||
"example_messages": ["example 1", "example 2"],
|
||||
"suggested_content": "what the skill/command/rule would contain (2-3 sentences)"
|
||||
}}
|
||||
]
|
||||
|
||||
Rules:
|
||||
- Only include genuinely recurring patterns (at least 2 messages with similar intent)
|
||||
- Prefer specific, actionable suggestions over generic ones
|
||||
- Maximum 15 suggestions, sorted by impact (most valuable first)"""
|
||||
|
||||
|
||||
def call_claude_cli(messages_batch: List[Dict], model: str) -> List[Dict]:
|
||||
"""
|
||||
Call the local `claude --print` CLI (uses your existing subscription).
|
||||
No API key required.
|
||||
"""
|
||||
import subprocess
|
||||
import tempfile
|
||||
|
||||
prompt = build_analysis_prompt(messages_batch)
|
||||
|
||||
# claude --print accepts the prompt as a positional argument
|
||||
cmd = ['claude', '--print', prompt]
|
||||
if model:
|
||||
cmd += ['--model', model]
|
||||
|
||||
# Remove CLAUDECODE so the subprocess isn't blocked by nested-session detection
|
||||
env = os.environ.copy()
|
||||
env.pop('CLAUDECODE', None)
|
||||
env.pop('CLAUDE_CODE_ENTRYPOINT', None)
|
||||
|
||||
try:
|
||||
result = subprocess.run(
|
||||
cmd,
|
||||
env=env,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=120,
|
||||
)
|
||||
except FileNotFoundError:
|
||||
raise RuntimeError("'claude' CLI not found. Make sure Claude Code is installed and in PATH.")
|
||||
except subprocess.TimeoutExpired:
|
||||
raise RuntimeError("claude CLI timed out after 120s.")
|
||||
|
||||
if result.returncode != 0:
|
||||
detail = (result.stderr or result.stdout)[:500]
|
||||
raise RuntimeError(f"claude CLI error (exit {result.returncode}):\n{detail}")
|
||||
|
||||
text = result.stdout.strip()
|
||||
|
||||
# Catch runtime errors reported on stdout (returncode 0 but failed)
|
||||
if text.lower().startswith('execution error') or text.lower().startswith('error:'):
|
||||
stderr_hint = f"\nstderr: {result.stderr[:300]}" if result.stderr.strip() else ""
|
||||
raise RuntimeError(f"claude CLI reported an error:\n{text[:300]}{stderr_hint}")
|
||||
|
||||
# Strip markdown code fences if present
|
||||
if text.startswith('```'):
|
||||
text = re.sub(r'^```(?:json)?\n?', '', text)
|
||||
text = re.sub(r'\n?```$', '', text)
|
||||
|
||||
# Extract JSON array if surrounded by prose
|
||||
match = re.search(r'\[.*\]', text, re.DOTALL)
|
||||
if match:
|
||||
text = match.group(0)
|
||||
|
||||
try:
|
||||
suggestions = json.loads(text)
|
||||
except json.JSONDecodeError as e:
|
||||
raise RuntimeError(f"Failed to parse CLI response as JSON: {e}\nRaw: {text[:500]}") from e
|
||||
|
||||
if not isinstance(suggestions, list):
|
||||
raise RuntimeError(f"Expected JSON array, got: {type(suggestions)}")
|
||||
|
||||
return suggestions
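
The response clean-up at the end of `call_claude_cli` can be exercised without calling the CLI at all. The sketch below replays the same steps (strip code fences, pull out the bracketed array, decode) on invented response strings; `parse_cli_response` is a hypothetical name for this example.

```python
import json
import re

FENCE = "`" * 3  # built at runtime so this example contains no literal fence


def parse_cli_response(text: str) -> list:
    """Strip optional code fences and surrounding prose, then decode a JSON array."""
    text = text.strip()
    if text.startswith(FENCE):
        text = re.sub(r'^`{3}(?:json)?\n?', '', text)
        text = re.sub(r'\n?`{3}$', '', text)
    match = re.search(r'\[.*\]', text, re.DOTALL)
    if match:
        text = match.group(0)
    parsed = json.loads(text)
    if not isinstance(parsed, list):
        raise ValueError(f"Expected JSON array, got: {type(parsed)}")
    return parsed


# Invented responses covering the two wrappers the code guards against.
fenced = FENCE + 'json\n[{"pattern": "review PR for security", "category": "command"}]\n' + FENCE
with_prose = 'Here are the patterns:\n[{"pattern": "write changelog", "category": "skill"}]\nDone.'

from_fenced = parse_cli_response(fenced)
from_prose = parse_cli_response(with_prose)
print(from_fenced[0]["category"], from_prose[0]["category"])
```

The `\[.*\]` match is greedy, so trailing prose that itself contains a `]` would be swallowed into the candidate text and fail to decode. The script surfaces that case as a `RuntimeError` rather than guessing.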
|
||||
|
||||
|
||||
def cmd_discover_llm(
|
||||
project_dirs: List[Path],
|
||||
since: str = '90d',
|
||||
top: int = 15,
|
||||
model: str = LLM_DEFAULT_MODEL,
|
||||
json_output: bool = False,
|
||||
):
|
||||
"""LLM-powered pattern discovery via `claude --print` (uses your subscription)."""
|
||||
since_dt = parse_duration(since)
|
||||
print(f"Scanning sessions since {since_dt.strftime('%Y-%m-%d')}...", file=sys.stderr)
|
||||
|
||||
sessions_data = collect_sessions_data(project_dirs, since_dt)
|
||||
if not sessions_data:
|
||||
print("No sessions found in the given time range.", file=sys.stderr)
|
||||
return
|
||||
|
||||
print(f"Collected {len(sessions_data)} sessions — deduplicating messages...", file=sys.stderr)
|
||||
deduped = deduplicate_messages_for_llm(sessions_data, max_messages=300)
|
||||
if not deduped:
|
||||
print("No user messages found.", file=sys.stderr)
|
||||
return
|
||||
|
||||
batch = deduped[:LLM_BATCH_SIZE]
|
||||
print(f"Sending {len(batch)} unique messages to claude --print ({model or 'CLI default model'})...", file=sys.stderr)
|
||||
|
||||
try:
|
||||
suggestions = call_claude_cli(batch, model)
|
||||
except RuntimeError as e:
|
||||
print(f"Error: {e}", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
suggestions = suggestions[:top]
|
||||
|
||||
if json_output:
|
||||
print(json.dumps(suggestions, indent=2))
|
||||
return
|
||||
|
||||
total_sessions = len(sessions_data)
|
||||
total_projects = len({sd['project'] for sd in sessions_data})
|
||||
|
||||
print()
|
||||
print(f" cc-sessions discover --llm — {total_sessions} sessions · {total_projects} project(s) · {model or 'CLI default model'}")
|
||||
print()
|
||||
|
||||
by_category: Dict[str, List[Dict]] = defaultdict(list)
|
||||
for s in suggestions:
|
||||
by_category[s.get('category', 'command')].append(s)
|
||||
|
||||
category_order = ['CLAUDE.md rule', 'skill', 'command']
|
||||
category_icons = {'CLAUDE.md rule': '📋', 'skill': '🧩', 'command': '⚡'}
|
||||
|
||||
for cat in category_order:
|
||||
items = by_category.get(cat, [])
|
||||
if not items:
|
||||
continue
|
||||
|
||||
icon = category_icons.get(cat, '•')
|
||||
print(f" {icon} {cat.upper()}")
|
||||
print(f" {'─' * 70}")
|
||||
|
||||
for item in items:
|
||||
freq = item.get('frequency', '')
|
||||
freq_tag = f" [{freq}]" if freq else ""
|
||||
print(f" {item.get('pattern', '?')}{freq_tag}")
|
||||
print(f" -> /{item.get('suggested_name', '?')}")
|
||||
print(f" {item.get('rationale', '')}")
|
||||
if item.get('suggested_content'):
|
||||
print(f" Content: {item['suggested_content']}")
|
||||
for ex in item.get('example_messages', [])[:2]:
|
||||
print(f" e.g. \"{ex[:100].replace(chr(10), ' ')}\"")
|
||||
print()
|
||||
|
||||
print()
|
||||
|
||||
print(f" Run with --json to pipe to jq.")
|
||||
print()
|
||||
|
||||
|
||||
# ─── main ─────────────────────────────────────────────────────────────────────
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="Search Claude Code session history")
|
||||
parser.add_argument('--all', action='store_true', help="Search all projects")
|
||||
|
|
@ -467,10 +1157,36 @@ def main():
|
|||
# reindex
|
||||
subparsers.add_parser('reindex', help="Force rebuild index")
|
||||
|
||||
# discover
|
||||
discover_parser = subparsers.add_parser(
|
||||
'discover',
|
||||
help="Analyze sessions to suggest skills, commands, and CLAUDE.md rules",
|
||||
)
|
||||
discover_parser.add_argument(
|
||||
'--since', default='90d',
|
||||
help="Time window to analyze (default: 90d)",
|
||||
)
|
||||
discover_parser.add_argument(
|
||||
'--min-count', type=int, default=3,
|
||||
help="Minimum occurrences to surface a pattern (default: 3)",
|
||||
)
|
||||
discover_parser.add_argument(
|
||||
'--top', type=int, default=20,
|
||||
help="Maximum number of suggestions to show (default: 20)",
|
||||
)
|
||||
discover_parser.add_argument(
|
||||
'--llm', action='store_true',
|
||||
help="Use 'claude --print' for semantic analysis (uses your subscription)",
|
||||
)
|
||||
discover_parser.add_argument(
|
||||
'--model', default=LLM_DEFAULT_MODEL,
|
||||
help="Claude model to use with --llm, e.g. 'haiku' or 'sonnet' (default: CLI default)",
|
||||
)
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
# Get project dirs
|
||||
if args.command in ['search', 'recent']:
|
||||
if args.command in ['search', 'recent', 'discover']:
|
||||
project_dirs = get_project_dirs(args.all)
|
||||
|
||||
if not project_dirs:
|
||||
|
|
@ -492,6 +1208,23 @@ def main():
|
|||
cmd_resume(args.session_id)
|
||||
elif args.command == 'reindex':
|
||||
cmd_reindex()
|
||||
elif args.command == 'discover':
|
||||
if args.llm:
|
||||
cmd_discover_llm(
|
||||
project_dirs,
|
||||
since=args.since,
|
||||
top=args.top,
|
||||
model=args.model,
|
||||
json_output=args.json,
|
||||
)
|
||||
else:
|
||||
cmd_discover(
|
||||
project_dirs,
|
||||
since=args.since,
|
||||
min_count=args.min_count,
|
||||
top=args.top,
|
||||
json_output=args.json,
|
||||
)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|