claude-code-ultimate-guide/examples/scripts/cc-sessions.py
Florian BRUNIAUX 13efb5a774 docs: add cc-sessions discover + GitHub repo (v1.0.0)
- New subsection "Session Pattern Discovery" in §2.x (Session Management):
  n-gram mode, --llm mode via claude --print, example output, 20% rule framework
- Cross-reference added after the 20% rule callout in §5.1 Skills
- examples/scripts/cc-sessions.py synced: 498 → 1225 lines (full discover subcommand)
- examples/scripts/README.md: discover examples + curl install + GitHub link
- machine-readable/reference.yaml: cc_sessions_github + cc_sessions_discover entries
- GitHub repo created: https://github.com/FlorianBruniaux/cc-sessions (v1.0.0 released)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-13 15:28:31 +01:00

1231 lines
41 KiB
Python
Executable file
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# Source: https://github.com/FlorianBruniaux/cc-sessions
"""
cc-sessions — Fast CLI to search, browse & resume Claude Code session history
OVERVIEW
--------
Claude Code stores all conversation history locally in ~/.claude/projects/ as JSONL files.
This tool indexes those sessions for fast search and provides a clean CLI interface to:
- Search by keyword across all conversations
- Filter by date, branch, or project
- View recent sessions
- Resume past sessions with partial ID matching
- Discover recurring patterns to extract as skills, commands, or CLAUDE.md rules
FEATURES
--------
- ⚡ Incremental indexing: ~200ms search on 1300+ sessions (vs ~5s full scan)
- 📁 Project-scoped by default: auto-detects current project from pwd
- 🔍 Powerful filters: --since 7d, --branch develop, --limit N
- 🎯 Partial ID matching: 'cc-sessions resume 8d472d' finds full session ID
- 🌳 Worktree support: includes git worktree sessions automatically
- 📊 JSON output: pipe to jq/fzf for advanced workflows
- 🔭 Pattern discovery: analyze sessions to suggest skills/commands/rules
- 🐍 Zero dependencies: Python stdlib only (json, argparse, pathlib)
USAGE
-----
# Search in current project
cc-sessions search "notion"
# Search all projects
cc-sessions --all search "stripe"
# Filter by date and branch
cc-sessions search "auth" --since 7d --branch develop --limit 5
# Recent sessions
cc-sessions recent 10
# Session details (partial ID match)
cc-sessions info 8d472d2c
# Resume session (execs: claude --resume <full-id>)
cc-sessions resume 8d472d2c
# Force rebuild index
cc-sessions reindex
# Discover patterns (all projects, last 90 days)
cc-sessions discover
# Discover with custom filters
cc-sessions --all discover --since 60d --min-count 2 --top 15
# JSON output for composition
cc-sessions --json search "prisma" | jq -r '.[].id'
INSTALLATION
------------
1. Save this script to ~/bin/cc-sessions
2. chmod +x ~/bin/cc-sessions
3. Run: cc-sessions recent 5
(First run builds index ~10s for 1500 sessions, then <200ms)
Or install from GitHub:
curl -sL https://raw.githubusercontent.com/FlorianBruniaux/cc-sessions/main/cc-sessions \
-o ~/.local/bin/cc-sessions && chmod +x ~/.local/bin/cc-sessions
INDEX ARCHITECTURE
------------------
- Location: ~/.claude/sessions-index.jsonl (~360KB for 1300 sessions)
- Format: One JSON object per line with session metadata
- Update strategy: Incremental (only re-parses modified files)
- Rebuild: Automatic on search/recent, manual with 'reindex'
Session metadata extracted:
- id: Full session UUID
- project: Encoded project path
- branch: Git branch from JSONL gitBranch field
- context: First significant user message (60 chars)
- timestamp: ISO 8601 datetime
- mtime: File modification time (for incremental updates)
FILTERING RULES
---------------
Significant user message = all 3 conditions:
1. entry['type'] == 'user'
2. content is string (not array = tool_result)
3. content doesn't start with '<' (excludes XML internal tags)
This covers all current and future Claude Code internal messages:
- <command-name>, <command-message>, <local-command-stdout>
- <bash-input>, <bash-stdout>, <task-notification>
- Any future XML-formatted system messages
Subagent sessions (prefix 'agent-') are excluded by default.
PERFORMANCE
-----------
- First run (build index): ~10s for 1500 sessions
- Subsequent searches: ~200ms (reads index)
- Incremental rebuild: <1s if no changes
- Index size: ~280 bytes per session
OUTPUT FORMAT
-------------
2026-02-05 17:15 8d472d2c-128b-4d9b-824d-3944e3409984 develop "Migration Support Slack → Notion..."
│ │ │ │
Date/Time Full Session ID (for --resume) Branch First user message (60 chars)
ECOSYSTEM
---------
Similar tools:
- claude-history (Rust): Fuzzy search with fzf
- cclog (Go): JSONL → HTML/Markdown + TUI
- claude-code-history-viewer (Tauri): Desktop GUI
- fast-resume (Rust): Tantivy index + TUI
cc-sessions positioning: Unix-style CLI, fast search, powerful filters, no dependencies.
AUTHOR
------
Created for terminal power users who prefer CLI over GUI.
GitHub: https://github.com/FlorianBruniaux/cc-sessions
Gist: https://gist.github.com/FlorianBruniaux/992d4d1107592d9e98ca9d89838871c6
"""
import argparse
import json
import os
import re
import sys
import urllib.error
import urllib.request
from collections import Counter, defaultdict
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional, Tuple
CLAUDE_DIR = Path.home() / ".claude"
INDEX_PATH = CLAUDE_DIR / "sessions-index.jsonl"
DISCOVER_CACHE_PATH = CLAUDE_DIR / "discover-cache.jsonl"
LLM_BATCH_SIZE = 60
LLM_DEFAULT_MODEL = '' # empty = use claude CLI default model
def parse_duration(duration_str: str) -> datetime:
"""Parse duration string like '7d', '30d' or ISO date."""
if duration_str.endswith('d'):
days = int(duration_str[:-1])
return datetime.now() - timedelta(days=days)
return datetime.fromisoformat(duration_str)
def encode_project_path(path: Path) -> str:
"""Encode project path to match Claude's format."""
return str(path).replace('/', '-') # Keep leading - from root /
def detect_project() -> Optional[str]:
"""Detect current project from pwd."""
pwd = Path.cwd()
encoded = encode_project_path(pwd)
project_dir = CLAUDE_DIR / "projects" / encoded
if not project_dir.exists():
return None
return encoded
def get_project_dirs(all_projects: bool = False) -> List[Path]:
"""Get project directories to scan."""
if all_projects:
projects_dir = CLAUDE_DIR / "projects"
if not projects_dir.exists():
return []
return [d for d in projects_dir.iterdir() if d.is_dir()]
current = detect_project()
if not current:
return []
dirs = []
base = CLAUDE_DIR / "projects" / current
dirs.append(base)
# Include worktrees (glob pattern: --worktrees*)
worktrees = list(base.parent.glob(f"{current}--worktrees*"))
dirs.extend(worktrees)
return dirs
def get_first_user_message(filepath: Path) -> Optional[str]:
"""Extract first significant user message from session JSONL."""
try:
with open(filepath, 'r') as f:
for line in f:
if not line.strip():
continue
try:
entry = json.loads(line)
# Rule 1: Must be user message
if entry.get('type') != 'user':
continue
content = entry.get('message', {}).get('content', '')
# Rule 2: Must be string (not array = tool_result)
if not isinstance(content, str):
continue
# Rule 3: Not internal XML message
if content.startswith('<'):
continue
# Found significant user message
return content[:60].replace('\n', ' ')
except json.JSONDecodeError:
continue
except Exception:
pass
return None
def extract_all_user_messages(filepath: Path) -> List[str]:
"""Extract all significant user messages from a session JSONL file."""
messages = []
try:
with open(filepath, 'r') as f:
for line in f:
if not line.strip():
continue
try:
entry = json.loads(line)
if entry.get('type') != 'user':
continue
content = entry.get('message', {}).get('content', '')
if not isinstance(content, str):
continue
if content.startswith('<'):
continue
stripped = content.strip()
# Skip very short messages (likely acknowledgements)
if len(stripped) < 10:
continue
# Skip system-injected messages (compact summaries, reminders)
# These are injected as plain-text user messages but aren't real user input
if len(stripped) > 800:
continue
if _is_system_injection(stripped):
continue
messages.append(stripped)
except json.JSONDecodeError:
continue
except Exception:
pass
return messages
def parse_session(filepath: Path) -> Optional[Dict]:
"""Extract session metadata."""
session_id = filepath.stem
# Skip subagent sessions
if session_id.startswith('agent-'):
return None
mtime = filepath.stat().st_mtime
context = get_first_user_message(filepath)
if not context:
return None
# Extract branch from gitBranch field in JSONL
branch = "unknown"
try:
with open(filepath, 'r') as f:
for line in f:
if not line.strip():
continue
try:
entry = json.loads(line)
git_branch = entry.get('gitBranch')
if git_branch:
branch = git_branch
break
except json.JSONDecodeError:
continue
except Exception:
pass
project = filepath.parent.name
return {
"id": session_id,
"project": project,
"mtime": mtime,
"branch": branch,
"context": context,
"timestamp": datetime.fromtimestamp(mtime).isoformat()
}
def build_index(project_dirs: List[Path], existing_index: Dict[str, Dict]) -> Dict[str, Dict]:
"""Build or update index incrementally."""
index = existing_index.copy()
for project_dir in project_dirs:
jsonl_files = list(project_dir.glob("*.jsonl"))
for filepath in jsonl_files:
session_id = filepath.stem
file_mtime = filepath.stat().st_mtime
# Skip if already indexed and not modified
if session_id in index and index[session_id]['mtime'] >= file_mtime:
continue
# Parse session
session = parse_session(filepath)
if session:
index[session_id] = session
return index
def load_index() -> Dict[str, Dict]:
"""Load existing index."""
if not INDEX_PATH.exists():
return {}
index = {}
try:
with open(INDEX_PATH, 'r') as f:
for line in f:
if not line.strip():
continue
entry = json.loads(line)
index[entry['id']] = entry
except Exception as e:
print(f"Warning: Failed to load index: {e}", file=sys.stderr)
return {}
return index
def save_index(index: Dict[str, Dict]):
"""Save index to disk."""
CLAUDE_DIR.mkdir(exist_ok=True)
with open(INDEX_PATH, 'w') as f:
for entry in index.values():
f.write(json.dumps(entry) + '\n')
def cmd_search(keyword: str, project_dirs: List[Path], limit: int = 10,
since: Optional[str] = None, branch: Optional[str] = None,
json_output: bool = False):
"""Search sessions by keyword."""
# Build/update index
existing = load_index()
index = build_index(project_dirs, existing)
save_index(index)
# Filter
matches = []
since_dt = parse_duration(since) if since else None
for entry in index.values():
# Filter by project
if not any(entry['project'] == d.name for d in project_dirs):
continue
# Filter by keyword (case-insensitive in context)
if keyword.lower() not in entry['context'].lower():
continue
# Filter by date
if since_dt:
entry_dt = datetime.fromisoformat(entry['timestamp'])
if entry_dt < since_dt:
continue
# Filter by branch
if branch and entry['branch'] != branch:
continue
matches.append(entry)
# Sort by timestamp desc
matches.sort(key=lambda x: x['timestamp'], reverse=True)
matches = matches[:limit]
# Output
if json_output:
print(json.dumps(matches, indent=2))
else:
for m in matches:
dt = datetime.fromisoformat(m['timestamp'])
print(f"{dt.strftime('%Y-%m-%d %H:%M')} {m['id']} {m['branch']:12} \"{m['context']}\"")
def cmd_recent(project_dirs: List[Path], limit: int = 10, json_output: bool = False):
"""Show recent sessions."""
# Build/update index
existing = load_index()
index = build_index(project_dirs, existing)
save_index(index)
# Filter by project
sessions = [e for e in index.values()
if any(e['project'] == d.name for d in project_dirs)]
# Sort by timestamp desc
sessions.sort(key=lambda x: x['timestamp'], reverse=True)
sessions = sessions[:limit]
# Output
if json_output:
print(json.dumps(sessions, indent=2))
else:
for s in sessions:
dt = datetime.fromisoformat(s['timestamp'])
print(f"{dt.strftime('%Y-%m-%d %H:%M')} {s['id']} {s['branch']:12} \"{s['context']}\"")
def cmd_info(session_id: str):
"""Show session details."""
# Match partial ID
index = load_index()
matches = [s for s in index.values() if s['id'].startswith(session_id)]
if not matches:
print(f"Error: Session not found: {session_id}", file=sys.stderr)
sys.exit(1)
if len(matches) > 1:
print(f"Error: Ambiguous ID, multiple matches:", file=sys.stderr)
for m in matches:
print(f" {m['id']}", file=sys.stderr)
sys.exit(1)
session = matches[0]
dt = datetime.fromisoformat(session['timestamp'])
print(f"Session: {session['id']}")
print(f"Date: {dt.strftime('%Y-%m-%d %H:%M:%S')}")
print(f"Project: {session['project']}")
print(f"Branch: {session['branch']}")
print(f"Context: {session['context']}")
def cmd_resume(session_id: str):
"""Resume a session."""
# Match partial ID
index = load_index()
matches = [s for s in index.values() if s['id'].startswith(session_id)]
if not matches:
print(f"Error: Session not found: {session_id}", file=sys.stderr)
sys.exit(1)
if len(matches) > 1:
print(f"Error: Ambiguous ID, multiple matches:", file=sys.stderr)
for m in matches:
print(f" {m['id']}", file=sys.stderr)
sys.exit(1)
full_id = matches[0]['id']
# exec claude --resume
os.execvp('claude', ['claude', '--resume', full_id])
def cmd_reindex():
"""Force rebuild of entire index."""
print("Rebuilding index...", file=sys.stderr)
projects_dir = CLAUDE_DIR / "projects"
if not projects_dir.exists():
print("Error: No projects directory found", file=sys.stderr)
sys.exit(1)
all_dirs = [d for d in projects_dir.iterdir() if d.is_dir()]
index = build_index(all_dirs, {})
save_index(index)
print(f"Indexed {len(index)} sessions", file=sys.stderr)
# ─── DISCOVER subcommand ──────────────────────────────────────────────────────
# Boilerplate phrases that identify system-injected messages (compact summaries,
# system-reminder injections, plan mode prompts, task tool notifications...).
# These appear as plain-text user messages in JSONL but are not real user input.
_SYSTEM_INJECTION_MARKERS = (
'this session is being continued',
'read the full transcript',
'context summary below covers',
'exact snippets error messages content',
'exiting plan mode',
'task tools haven',
'teamcreate tool team parallelize',
'florianbruniaux/.claude/projects', # path fragments in compact prompts
'florianbruniaux/sites/', # project path fragments
'-users-florianbruniaux-', # encoded path in compact messages
)
def _is_system_injection(text: str) -> bool:
"""Return True if this looks like a Claude Code system message, not real user input."""
lower = text.lower()
return any(marker in lower for marker in _SYSTEM_INJECTION_MARKERS)
# Stop words to exclude from n-gram analysis
_STOP_WORDS = frozenset({
'a', 'an', 'the', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for',
'of', 'with', 'by', 'from', 'is', 'it', 'its', 'be', 'as', 'was',
'are', 'were', 'been', 'have', 'has', 'had', 'do', 'does', 'did',
'will', 'would', 'could', 'should', 'may', 'might', 'can', 'shall',
'this', 'that', 'these', 'those', 'i', 'you', 'we', 'they', 'he',
'she', 'my', 'your', 'our', 'their', 'his', 'her', 'me', 'us', 'them',
'so', 'if', 'then', 'than', 'when', 'what', 'how', 'why', 'where',
'who', 'which', 'not', 'no', 'also', 'just', 'now', 'up', 'out',
'about', 'into', 'after', 'before', 'all', 'any', 'some', 'more',
'new', 'add', 'use', 'make', 'get', 'go', 'run', 'see', 'here',
'there', 'need', 'want', 'please', 'ok', 'okay', 'yes', 'yeah',
'let', 'can', 'help', 'look', 'check', 'same', 'like', 'very',
'much', 'only', 'other', 'also', 'each', 'file', 'code', 'create',
'update', 'change', 'think', 'know', 'give', 'take', 'put', 'keep',
})
def normalize_text(text: str) -> List[str]:
"""Lowercase, strip punctuation, tokenize, remove stop words."""
text = text.lower()
# Replace punctuation/special chars with spaces (keep alphanumeric and hyphens)
text = re.sub(r'[^a-z0-9\s\-]', ' ', text)
# Collapse whitespace
tokens = text.split()
# Filter stop words and very short tokens
return [t for t in tokens if t not in _STOP_WORDS and len(t) > 2]
def extract_ngrams(tokens: List[str], n: int) -> List[Tuple[str, ...]]:
"""Extract n-grams from a token list."""
return [tuple(tokens[i:i+n]) for i in range(len(tokens) - n + 1)]
def token_overlap(tokens_a: List[str], tokens_b: List[str]) -> float:
"""Jaccard similarity between two token sets."""
if not tokens_a or not tokens_b:
return 0.0
set_a, set_b = set(tokens_a), set(tokens_b)
return len(set_a & set_b) / len(set_a | set_b)
def load_discover_cache() -> Dict[str, Dict]:
"""Load the discover cache (session_id -> {mtime, messages[]})."""
if not DISCOVER_CACHE_PATH.exists():
return {}
cache = {}
try:
with open(DISCOVER_CACHE_PATH, 'r') as f:
for line in f:
if not line.strip():
continue
entry = json.loads(line)
cache[entry['id']] = entry
except Exception:
return {}
return cache
def save_discover_cache(cache: Dict[str, Dict]):
"""Persist the discover cache to disk."""
CLAUDE_DIR.mkdir(exist_ok=True)
with open(DISCOVER_CACHE_PATH, 'w') as f:
for entry in cache.values():
f.write(json.dumps(entry) + '\n')
def collect_sessions_data(
project_dirs: List[Path],
since_dt: Optional[datetime],
) -> List[Dict]:
"""
Collect {session_id, project, mtime, messages[]} for all sessions.
Uses mtime-based cache to avoid re-reading unchanged files.
Returns one dict per session that has at least one user message.
"""
cache = load_discover_cache()
updated_cache = {}
sessions_data = []
for project_dir in project_dirs:
project_name = project_dir.name
jsonl_files = list(project_dir.glob("*.jsonl"))
for filepath in jsonl_files:
session_id = filepath.stem
# Skip subagent sessions
if session_id.startswith('agent-'):
continue
try:
file_mtime = filepath.stat().st_mtime
except OSError:
continue
# Apply date filter: skip if file is older than since_dt
if since_dt:
file_dt = datetime.fromtimestamp(file_mtime)
if file_dt < since_dt:
continue
# Cache hit: file unchanged since last analysis
if session_id in cache and cache[session_id].get('mtime', 0) >= file_mtime:
entry = cache[session_id]
if entry.get('messages'):
sessions_data.append({
'session_id': session_id,
'project': project_name,
'mtime': file_mtime,
'messages': entry['messages'],
})
updated_cache[session_id] = entry
continue
# Cache miss: parse file
messages = extract_all_user_messages(filepath)
cache_entry = {
'id': session_id,
'mtime': file_mtime,
'messages': messages,
}
updated_cache[session_id] = cache_entry
if messages:
sessions_data.append({
'session_id': session_id,
'project': project_name,
'mtime': file_mtime,
'messages': messages,
})
save_discover_cache(updated_cache)
return sessions_data
def discover_patterns(
sessions_data: List[Dict],
min_count: int = 3,
top: int = 20,
) -> List[Dict]:
"""
Analyze sessions data and return pattern suggestions.
Each suggestion has:
- pattern: human-readable phrase
- count: number of occurrences
- session_count: number of distinct sessions
- project_count: number of distinct projects
- cross_project: bool
- category: 'CLAUDE.md rule' | 'skill' | 'command'
- score: float (frequency × cross-project bonus)
- example_sessions: list of up to 2 session_ids
"""
total_sessions = len(sessions_data)
if total_sessions == 0:
return []
# ── Step 1: Build per-session token lists and n-gram index ────────────────
# ngram_index: ngram_tuple -> list of {session_id, project, original_message}
ngram_index: Dict[Tuple, List[Dict]] = defaultdict(list)
for sd in sessions_data:
session_id = sd['session_id']
project = sd['project']
for msg in sd['messages']:
tokens = normalize_text(msg)
if len(tokens) < 3:
continue
for n in range(3, 7): # 3-6 word n-grams
for ngram in extract_ngrams(tokens, n):
# Filter n-grams that are all stop words (shouldn't happen after normalize)
if all(t in _STOP_WORDS for t in ngram):
continue
ngram_index[ngram].append({
'session_id': session_id,
'project': project,
'msg': msg[:80],
})
# ── Step 2: Filter n-grams below min_count ────────────────────────────────
frequent_ngrams = {
ng: occurrences
for ng, occurrences in ngram_index.items()
if len(occurrences) >= min_count
}
# ── Step 3: Deduplicate — prefer longer n-gram if it subsumes shorter ─────
# Sort by length desc, then count desc
sorted_ngrams = sorted(
frequent_ngrams.items(),
key=lambda x: (len(x[0]), len(x[1])),
reverse=True,
)
kept_ngrams: List[Tuple[Tuple, List[Dict]]] = []
subsumed: set = set()
for ngram, occurrences in sorted_ngrams:
if ngram in subsumed:
continue
kept_ngrams.append((ngram, occurrences))
# Mark all sub-ngrams of this ngram as subsumed
for n in range(3, len(ngram)):
for i in range(len(ngram) - n + 1):
sub = ngram[i:i+n]
subsumed.add(sub)
# ── Step 4: Similarity clustering — merge near-duplicate phrases ──────────
# Group kept_ngrams by token overlap > 60%
clusters: List[List[int]] = []
assigned = set()
for i, (ng_i, _) in enumerate(kept_ngrams):
if i in assigned:
continue
cluster = [i]
for j, (ng_j, _) in enumerate(kept_ngrams):
if j <= i or j in assigned:
continue
overlap = token_overlap(list(ng_i), list(ng_j))
if overlap > 0.6:
cluster.append(j)
assigned.add(j)
clusters.append(cluster)
assigned.add(i)
# ── Step 5: Build suggestion per cluster (representative = highest count) ──
suggestions = []
for cluster in clusters:
# Pick representative: longest ngram with most occurrences
best_idx = max(cluster, key=lambda i: (len(kept_ngrams[i][0]), len(kept_ngrams[i][1])))
ngram, occurrences = kept_ngrams[best_idx]
# Aggregate across cluster members
all_occurrences = []
for idx in cluster:
all_occurrences.extend(kept_ngrams[idx][1])
distinct_sessions = list({o['session_id'] for o in all_occurrences})
distinct_projects = list({o['project'] for o in all_occurrences})
count = len(all_occurrences)
session_count = len(distinct_sessions)
project_count = len(distinct_projects)
cross_project = project_count >= 2
if session_count < min_count:
continue
session_pct = session_count / total_sessions
# Categorize
if session_pct > 0.20:
category = 'CLAUDE.md rule'
elif session_pct >= 0.05:
category = 'skill'
else:
category = 'command'
# Score: base = session_pct, bonus × 1.5 if cross-project
score = session_pct * (1.5 if cross_project else 1.0)
phrase = ' '.join(ngram)
suggestions.append({
'pattern': phrase,
'count': count,
'session_count': session_count,
'project_count': project_count,
'cross_project': cross_project,
'category': category,
'score': round(score, 4),
'example_sessions': distinct_sessions[:2],
})
# ── Step 6: Sort by score desc, truncate ──────────────────────────────────
suggestions.sort(key=lambda x: x['score'], reverse=True)
return suggestions[:top]
def cmd_discover(
project_dirs: List[Path],
since: str = '90d',
min_count: int = 3,
top: int = 20,
json_output: bool = False,
):
"""Analyze sessions and surface patterns worth extracting as skills/commands/rules."""
since_dt = parse_duration(since)
print(f"Scanning sessions since {since_dt.strftime('%Y-%m-%d')}...", file=sys.stderr)
sessions_data = collect_sessions_data(project_dirs, since_dt)
if not sessions_data:
print("No sessions found in the given time range.", file=sys.stderr)
return
print(f"Analyzing {len(sessions_data)} sessions across "
f"{len({sd['project'] for sd in sessions_data})} project(s)...", file=sys.stderr)
suggestions = discover_patterns(sessions_data, min_count=min_count, top=top)
if not suggestions:
print("No recurring patterns found (try --min-count 2 or --since 180d).", file=sys.stderr)
return
if json_output:
print(json.dumps(suggestions, indent=2))
return
# ── Human-readable output ─────────────────────────────────────────────────
by_category: Dict[str, List[Dict]] = defaultdict(list)
for s in suggestions:
by_category[s['category']].append(s)
category_order = ['CLAUDE.md rule', 'skill', 'command']
category_icons = {
'CLAUDE.md rule': '📋',
'skill': '🧩',
'command': '',
}
total_sessions = len(sessions_data)
total_projects = len({sd['project'] for sd in sessions_data})
print()
print(f" cc-sessions discover — {total_sessions} sessions · {total_projects} project(s) · since {since}")
print()
for cat in category_order:
items = by_category.get(cat, [])
if not items:
continue
icon = category_icons[cat]
print(f" {icon} {cat.upper()}")
print(f" {'' * 60}")
for item in items:
tag = ' [cross-project]' if item['cross_project'] else ''
pct = item['session_count'] / total_sessions * 100
print(
f" {item['pattern']}"
f"{tag}"
)
print(
f" {item['session_count']} sessions ({pct:.0f}%) · "
f"{item['count']} occurrences · "
f"score {item['score']:.3f}"
)
for ex in item['example_sessions']:
print(f"{ex[:36]}")
print()
print()
print(f" Run with --json to pipe to jq for further processing.")
print()
# ─── DISCOVER --llm subcommand ────────────────────────────────────────────────
def deduplicate_messages_for_llm(sessions_data: List[Dict], max_messages: int = 300) -> List[Dict]:
"""
Deduplicate semantically similar messages using Jaccard similarity.
Returns list of {text, count, projects} sorted by frequency desc.
"""
all_msgs = []
for sd in sessions_data:
for msg in sd.get('messages', []):
all_msgs.append({
'text': msg[:500],
'project': sd['project'],
'tokens': normalize_text(msg),
})
if not all_msgs:
return []
clusters: List[List[int]] = []
assigned = set()
for i, m in enumerate(all_msgs):
if i in assigned:
continue
cluster = [i]
# Limit comparison window for performance on large sets
window_end = min(i + 300, len(all_msgs))
for j in range(i + 1, window_end):
if j in assigned:
continue
if token_overlap(m['tokens'], all_msgs[j]['tokens']) > 0.65:
cluster.append(j)
assigned.add(j)
clusters.append(cluster)
assigned.add(i)
deduped = []
for cluster in clusters:
representative = all_msgs[cluster[0]]
projects = list({all_msgs[i]['project'] for i in cluster})
deduped.append({
'text': representative['text'],
'count': len(cluster),
'projects': projects,
})
deduped.sort(key=lambda x: x['count'], reverse=True)
return deduped[:max_messages]
def build_analysis_prompt(messages: List[Dict]) -> str:
lines = []
for i, m in enumerate(messages, 1):
count_info = f" (x{m['count']})" if m['count'] > 1 else ""
cross = " [multi-project]" if len(m['projects']) > 1 else ""
text = m['text'][:200].replace('\n', ' ')
lines.append(f"{i}. {text}{count_info}{cross}")
messages_block = '\n'.join(lines)
return f"""You are analyzing a developer's Claude Code session history to find recurring patterns worth extracting as reusable configurations.
Below are user messages (deduplicated). Numbers in parentheses show how many times a similar message appeared. [multi-project] means it appeared across different codebases.
MESSAGES:
{messages_block}
Identify recurring patterns and suggest what to extract. For each suggestion, choose the category:
- CLAUDE.md rule: a behavioral instruction that should always be active (broad constraint or guideline)
- skill: specialized expertise loaded on-demand (domain-specific, not always needed)
- command: a repeatable step-by-step workflow with clear inputs/outputs
Return ONLY a JSON array, no prose outside it:
[
{{
"pattern": "short description of the recurring intent (max 60 chars)",
"category": "CLAUDE.md rule",
"suggested_name": "kebab-case-name",
"rationale": "one sentence explaining why this should be extracted",
"frequency": "high",
"example_messages": ["example 1", "example 2"],
"suggested_content": "what the skill/command/rule would contain (2-3 sentences)"
}}
]
Rules:
- Only include genuinely recurring patterns (at least 2 messages with similar intent)
- Prefer specific, actionable suggestions over generic ones
- Maximum 15 suggestions, sorted by impact (most valuable first)"""
def call_claude_cli(messages_batch: List[Dict], model: str) -> List[Dict]:
"""
Call the local `claude --print` CLI (uses your existing subscription).
No API key required.
"""
import subprocess
import tempfile
prompt = build_analysis_prompt(messages_batch)
# claude --print accepts the prompt as a positional argument
cmd = ['claude', '--print', prompt]
if model:
cmd += ['--model', model]
# Remove CLAUDECODE so the subprocess isn't blocked by nested-session detection
env = os.environ.copy()
env.pop('CLAUDECODE', None)
env.pop('CLAUDE_CODE_ENTRYPOINT', None)
try:
result = subprocess.run(
cmd,
env=env,
capture_output=True,
text=True,
timeout=120,
)
except FileNotFoundError:
raise RuntimeError("'claude' CLI not found. Make sure Claude Code is installed and in PATH.")
except subprocess.TimeoutExpired:
raise RuntimeError("claude CLI timed out after 120s.")
if result.returncode != 0:
detail = (result.stderr or result.stdout)[:500]
raise RuntimeError(f"claude CLI error (exit {result.returncode}):\n{detail}")
text = result.stdout.strip()
# Catch runtime errors reported on stdout (returncode 0 but failed)
if text.lower().startswith('execution error') or text.lower().startswith('error:'):
stderr_hint = f"\nstderr: {result.stderr[:300]}" if result.stderr.strip() else ""
raise RuntimeError(f"claude CLI reported an error:\n{text[:300]}{stderr_hint}")
# Strip markdown code fences if present
if text.startswith('```'):
text = re.sub(r'^```(?:json)?\n?', '', text)
text = re.sub(r'\n?```$', '', text)
# Extract JSON array if surrounded by prose
match = re.search(r'\[.*\]', text, re.DOTALL)
if match:
text = match.group(0)
try:
suggestions = json.loads(text)
except json.JSONDecodeError as e:
raise RuntimeError(f"Failed to parse CLI response as JSON: {e}\nRaw: {text[:500]}") from e
if not isinstance(suggestions, list):
raise RuntimeError(f"Expected JSON array, got: {type(suggestions)}")
return suggestions
def cmd_discover_llm(
project_dirs: List[Path],
since: str = '90d',
top: int = 15,
model: str = LLM_DEFAULT_MODEL,
json_output: bool = False,
):
"""LLM-powered pattern discovery via `claude --print` (uses your subscription)."""
since_dt = parse_duration(since)
print(f"Scanning sessions since {since_dt.strftime('%Y-%m-%d')}...", file=sys.stderr)
sessions_data = collect_sessions_data(project_dirs, since_dt)
if not sessions_data:
print("No sessions found in the given time range.", file=sys.stderr)
return
print(f"Collected {len(sessions_data)} sessions — deduplicating messages...", file=sys.stderr)
deduped = deduplicate_messages_for_llm(sessions_data, max_messages=300)
if not deduped:
print("No user messages found.", file=sys.stderr)
return
batch = deduped[:LLM_BATCH_SIZE]
print(f"Sending {len(batch)} unique messages to claude --print ({model})...", file=sys.stderr)
try:
suggestions = call_claude_cli(batch, model)
except RuntimeError as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
suggestions = suggestions[:top]
if json_output:
print(json.dumps(suggestions, indent=2))
return
total_sessions = len(sessions_data)
total_projects = len({sd['project'] for sd in sessions_data})
print()
print(f" cc-sessions discover --llm — {total_sessions} sessions · {total_projects} project(s) · {model}")
print()
by_category: Dict[str, List[Dict]] = defaultdict(list)
for s in suggestions:
by_category[s.get('category', 'command')].append(s)
category_order = ['CLAUDE.md rule', 'skill', 'command']
category_icons = {'CLAUDE.md rule': '📋', 'skill': '🧩', 'command': ''}
for cat in category_order:
items = by_category.get(cat, [])
if not items:
continue
icon = category_icons.get(cat, '')
print(f" {icon} {cat.upper()}")
print(f" {'' * 70}")
for item in items:
freq = item.get('frequency', '')
freq_tag = f" [{freq}]" if freq else ""
print(f" {item.get('pattern', '?')}{freq_tag}")
print(f" -> /{item.get('suggested_name', '?')}")
print(f" {item.get('rationale', '')}")
if item.get('suggested_content'):
print(f" Content: {item['suggested_content']}")
for ex in item.get('example_messages', [])[:2]:
print(f" e.g. \"{ex[:100].replace(chr(10), ' ')}\"")
print()
print()
print(f" Run with --json to pipe to jq.")
print()
# ─── main ─────────────────────────────────────────────────────────────────────
def main():
parser = argparse.ArgumentParser(description="Search Claude Code session history")
parser.add_argument('--all', action='store_true', help="Search all projects")
parser.add_argument('--json', action='store_true', help="JSON output")
subparsers = parser.add_subparsers(dest='command', required=True)
# search
search_parser = subparsers.add_parser('search', help="Search sessions by keyword")
search_parser.add_argument('keyword', help="Search keyword")
search_parser.add_argument('--limit', type=int, default=10, help="Max results")
search_parser.add_argument('--since', help="Filter by date (7d, 30d, or ISO date)")
search_parser.add_argument('--branch', help="Filter by git branch")
# recent
recent_parser = subparsers.add_parser('recent', help="Show recent sessions")
recent_parser.add_argument('limit', nargs='?', type=int, default=10, help="Number of sessions")
# info
info_parser = subparsers.add_parser('info', help="Show session details")
info_parser.add_argument('session_id', help="Session ID (partial match)")
# resume
resume_parser = subparsers.add_parser('resume', help="Resume a session")
resume_parser.add_argument('session_id', help="Session ID (partial match)")
# reindex
subparsers.add_parser('reindex', help="Force rebuild index")
# discover
discover_parser = subparsers.add_parser(
'discover',
help="Analyze sessions to suggest skills, commands, and CLAUDE.md rules",
)
discover_parser.add_argument(
'--since', default='90d',
help="Time window to analyze (default: 90d)",
)
discover_parser.add_argument(
'--min-count', type=int, default=3,
help="Minimum occurrences to surface a pattern (default: 3)",
)
discover_parser.add_argument(
'--top', type=int, default=20,
help="Maximum number of suggestions to show (default: 20)",
)
discover_parser.add_argument(
'--llm', action='store_true',
help="Use 'claude --print' for semantic analysis (uses your subscription)",
)
discover_parser.add_argument(
'--model', default=LLM_DEFAULT_MODEL,
help="Claude model to use with --llm, e.g. 'haiku' or 'sonnet' (default: CLI default)",
)
args = parser.parse_args()
# Get project dirs
if args.command in ['search', 'recent', 'discover']:
project_dirs = get_project_dirs(args.all)
if not project_dirs:
if args.all:
print("Error: No projects found", file=sys.stderr)
else:
print("Error: Not in a Claude project directory", file=sys.stderr)
print("Hint: Use --all to search all projects", file=sys.stderr)
sys.exit(1)
# Execute command
if args.command == 'search':
cmd_search(args.keyword, project_dirs, args.limit, args.since, args.branch, args.json)
elif args.command == 'recent':
cmd_recent(project_dirs, args.limit, args.json)
elif args.command == 'info':
cmd_info(args.session_id)
elif args.command == 'resume':
cmd_resume(args.session_id)
elif args.command == 'reindex':
cmd_reindex()
elif args.command == 'discover':
if args.llm:
cmd_discover_llm(
project_dirs,
since=args.since,
top=args.top,
model=args.model,
json_output=args.json,
)
else:
cmd_discover(
project_dirs,
since=args.since,
min_count=args.min_count,
top=args.top,
json_output=args.json,
)
if __name__ == '__main__':
main()