Bridge script enables local execution of Claude Code plans via LM Studio: - Python CLI with 5 components (DoobidooReader, LMStudioClient, Validator, StepExecutor, PlanExecutor) - JSON Schema for plan validation (bridge-plan-schema.json) - Cost optimization: Plan with Opus (~$0.50-2), execute free locally (80-90% savings) - 4 validation types: json, syntax_check, contains_keys, non_empty - CLI: --health, --list, --plan ID, -v verbose mode Documentation: - New section "Local Execution Bridge" in ultimate-guide.md §11.2 - scripts/README.md with full usage documentation - machine-readable/reference.yaml entries for discoverability Co-Authored-By: Claude <noreply@anthropic.com>
715 lines
23 KiB
Python
Executable file
715 lines
23 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
"""
|
|
Bridge Script: Claude Code → doobidoo → LM Studio
|
|
|
|
Orchestrates plan execution from Claude Code (stored in doobidoo SQLite)
|
|
to local LM Studio for cost-effective execution.
|
|
|
|
Usage:
|
|
python bridge.py # Execute all pending plans
|
|
python bridge.py --list # List pending plans
|
|
python bridge.py --plan ID # Execute specific plan
|
|
python bridge.py --health # Check LM Studio connectivity
|
|
|
|
Requires:
|
|
- doobidoo MCP server with SQLite backend (~/.mcp-memory-service/)
|
|
- LM Studio running on localhost:1234
|
|
- httpx: pip install httpx
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import ast
|
|
import json
|
|
import logging
|
|
import shutil
|
|
import sqlite3
|
|
import sys
|
|
from dataclasses import dataclass, field
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
try:
|
|
import httpx
|
|
except ImportError:
|
|
print("Error: httpx required. Install with: pip install httpx")
|
|
sys.exit(1)
|
|
|
|
|
|
# === Configuration ===
|
|
|
|
DOOBIDOO_DB = Path.home() / ".mcp-memory-service" / "memories.db"
|
|
LM_STUDIO_URL = "http://localhost:1234/v1/chat/completions"
|
|
LM_STUDIO_TIMEOUT = 120.0
|
|
DEFAULT_MODEL = "default" # LM Studio uses loaded model
|
|
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format="%(asctime)s | %(levelname)-7s | %(message)s",
|
|
datefmt="%H:%M:%S",
|
|
)
|
|
log = logging.getLogger("bridge")
|
|
|
|
|
|
# === Data Classes ===
|
|
|
|
|
|
@dataclass
|
|
class StepResult:
|
|
"""Result of executing a single step."""
|
|
|
|
step_id: int
|
|
success: bool
|
|
output: str
|
|
error: str | None = None
|
|
retries: int = 0
|
|
|
|
|
|
@dataclass
|
|
class PlanResult:
|
|
"""Result of executing a complete plan."""
|
|
|
|
plan_id: str
|
|
success: bool
|
|
steps: list[StepResult] = field(default_factory=list)
|
|
error: str | None = None
|
|
|
|
|
|
# === Doobidoo Reader ===
|
|
|
|
|
|
class DoobidooReader:
|
|
"""Direct SQLite access to doobidoo memory database."""
|
|
|
|
def __init__(self, db_path: Path = DOOBIDOO_DB):
|
|
self.db_path = db_path
|
|
if not db_path.exists():
|
|
raise FileNotFoundError(f"doobidoo database not found: {db_path}")
|
|
|
|
def _connect(self) -> sqlite3.Connection:
|
|
return sqlite3.connect(self.db_path)
|
|
|
|
def get_pending_plans(self) -> list[dict[str, Any]]:
|
|
"""Fetch all plans with status=pending."""
|
|
plans = []
|
|
with self._connect() as conn:
|
|
cursor = conn.execute(
|
|
"""
|
|
SELECT id, content, metadata, created_at
|
|
FROM memories
|
|
WHERE content LIKE '%"$schema": "bridge-plan-v1"%'
|
|
ORDER BY created_at DESC
|
|
"""
|
|
)
|
|
for row in cursor:
|
|
try:
|
|
content = json.loads(row[1])
|
|
if content.get("status") == "pending":
|
|
plans.append(
|
|
{
|
|
"db_id": row[0],
|
|
"plan": content,
|
|
"metadata": json.loads(row[2]) if row[2] else {},
|
|
"created_at": row[3],
|
|
}
|
|
)
|
|
except json.JSONDecodeError:
|
|
continue
|
|
return plans
|
|
|
|
def get_plan_by_id(self, plan_id: str) -> dict[str, Any] | None:
|
|
"""Fetch a specific plan by its plan ID."""
|
|
with self._connect() as conn:
|
|
cursor = conn.execute(
|
|
"""
|
|
SELECT id, content, metadata, created_at
|
|
FROM memories
|
|
WHERE content LIKE ?
|
|
ORDER BY created_at DESC
|
|
LIMIT 1
|
|
""",
|
|
(f'%"id": "{plan_id}"%',),
|
|
)
|
|
row = cursor.fetchone()
|
|
if row:
|
|
try:
|
|
content = json.loads(row[1])
|
|
if content.get("$schema") == "bridge-plan-v1":
|
|
return {
|
|
"db_id": row[0],
|
|
"plan": content,
|
|
"metadata": json.loads(row[2]) if row[2] else {},
|
|
"created_at": row[3],
|
|
}
|
|
except json.JSONDecodeError:
|
|
pass
|
|
return None
|
|
|
|
def update_plan_status(self, db_id: str, status: str) -> None:
|
|
"""Update plan status in the database."""
|
|
with self._connect() as conn:
|
|
cursor = conn.execute(
|
|
"SELECT content FROM memories WHERE id = ?", (db_id,)
|
|
)
|
|
row = cursor.fetchone()
|
|
if row:
|
|
content = json.loads(row[0])
|
|
content["status"] = status
|
|
conn.execute(
|
|
"UPDATE memories SET content = ? WHERE id = ?",
|
|
(json.dumps(content), db_id),
|
|
)
|
|
conn.commit()
|
|
|
|
def store_result(self, plan_id: str, result: PlanResult) -> None:
|
|
"""Store execution result as a new memory entry."""
|
|
result_data = {
|
|
"type": "bridge_result",
|
|
"plan_id": plan_id,
|
|
"success": result.success,
|
|
"executed_at": datetime.now().isoformat(),
|
|
"steps": [
|
|
{
|
|
"id": s.step_id,
|
|
"success": s.success,
|
|
"output": s.output[:2000], # Truncate for storage
|
|
"error": s.error,
|
|
"retries": s.retries,
|
|
}
|
|
for s in result.steps
|
|
],
|
|
"error": result.error,
|
|
}
|
|
|
|
with self._connect() as conn:
|
|
conn.execute(
|
|
"""
|
|
INSERT INTO memories (id, content, metadata, created_at)
|
|
VALUES (?, ?, ?, ?)
|
|
""",
|
|
(
|
|
f"result_{plan_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}",
|
|
json.dumps(result_data),
|
|
json.dumps({"tags": ["result", plan_id]}),
|
|
datetime.now().isoformat(),
|
|
),
|
|
)
|
|
conn.commit()
|
|
|
|
|
|
# === LM Studio Client ===
|
|
|
|
|
|
class LMStudioClient:
|
|
"""HTTP client for LM Studio API."""
|
|
|
|
def __init__(self, base_url: str = LM_STUDIO_URL, timeout: float = LM_STUDIO_TIMEOUT):
|
|
self.base_url = base_url
|
|
self.timeout = timeout
|
|
self.client = httpx.Client(timeout=timeout)
|
|
|
|
def health_check(self) -> bool:
|
|
"""Check if LM Studio is running and responsive."""
|
|
try:
|
|
# Try models endpoint first
|
|
response = self.client.get(
|
|
self.base_url.replace("/chat/completions", "/models")
|
|
)
|
|
return response.status_code == 200
|
|
except httpx.RequestError:
|
|
return False
|
|
|
|
def generate(self, prompt: str, system: str | None = None) -> str:
|
|
"""Generate completion from LM Studio."""
|
|
messages = []
|
|
if system:
|
|
messages.append({"role": "system", "content": system})
|
|
messages.append({"role": "user", "content": prompt})
|
|
|
|
response = self.client.post(
|
|
self.base_url,
|
|
json={
|
|
"model": DEFAULT_MODEL,
|
|
"messages": messages,
|
|
"temperature": 0.3,
|
|
"max_tokens": 4096,
|
|
},
|
|
)
|
|
response.raise_for_status()
|
|
data = response.json()
|
|
return data["choices"][0]["message"]["content"]
|
|
|
|
def close(self) -> None:
|
|
"""Close the HTTP client."""
|
|
self.client.close()
|
|
|
|
|
|
# === Validators ===
|
|
|
|
|
|
class Validator:
|
|
"""Validation functions for step outputs."""
|
|
|
|
@staticmethod
|
|
def validate(output: str, validation: dict[str, Any]) -> tuple[bool, str | None]:
|
|
"""Validate output based on validation config."""
|
|
val_type = validation.get("type", "non_empty")
|
|
|
|
if val_type == "non_empty":
|
|
return Validator.non_empty(output)
|
|
elif val_type == "json":
|
|
return Validator.json_valid(output)
|
|
elif val_type == "syntax_check":
|
|
return Validator.python_syntax(output)
|
|
elif val_type == "contains_keys":
|
|
keys = validation.get("keys", [])
|
|
return Validator.contains_keys(output, keys)
|
|
else:
|
|
return True, None
|
|
|
|
@staticmethod
|
|
def non_empty(output: str) -> tuple[bool, str | None]:
|
|
"""Check output is not empty or whitespace."""
|
|
if output and output.strip():
|
|
return True, None
|
|
return False, "Output is empty"
|
|
|
|
@staticmethod
|
|
def json_valid(output: str) -> tuple[bool, str | None]:
|
|
"""Check output is valid JSON."""
|
|
try:
|
|
# Try to extract JSON from markdown code blocks
|
|
content = output
|
|
if "```json" in output:
|
|
start = output.find("```json") + 7
|
|
end = output.find("```", start)
|
|
if end > start:
|
|
content = output[start:end].strip()
|
|
elif "```" in output:
|
|
start = output.find("```") + 3
|
|
end = output.find("```", start)
|
|
if end > start:
|
|
content = output[start:end].strip()
|
|
|
|
json.loads(content)
|
|
return True, None
|
|
except json.JSONDecodeError as e:
|
|
return False, f"Invalid JSON: {e}"
|
|
|
|
@staticmethod
|
|
def python_syntax(output: str) -> tuple[bool, str | None]:
|
|
"""Check output is valid Python syntax."""
|
|
try:
|
|
# Extract code from markdown blocks
|
|
content = output
|
|
if "```python" in output:
|
|
start = output.find("```python") + 9
|
|
end = output.find("```", start)
|
|
if end > start:
|
|
content = output[start:end].strip()
|
|
elif "```" in output:
|
|
start = output.find("```") + 3
|
|
end = output.find("```", start)
|
|
if end > start:
|
|
content = output[start:end].strip()
|
|
|
|
ast.parse(content)
|
|
return True, None
|
|
except SyntaxError as e:
|
|
return False, f"Syntax error: {e}"
|
|
|
|
@staticmethod
|
|
def contains_keys(output: str, keys: list[str]) -> tuple[bool, str | None]:
|
|
"""Check JSON output contains required keys."""
|
|
valid, error = Validator.json_valid(output)
|
|
if not valid:
|
|
return valid, error
|
|
|
|
try:
|
|
# Extract JSON content
|
|
content = output
|
|
if "```json" in output:
|
|
start = output.find("```json") + 7
|
|
end = output.find("```", start)
|
|
if end > start:
|
|
content = output[start:end].strip()
|
|
|
|
data = json.loads(content)
|
|
missing = [k for k in keys if k not in data]
|
|
if missing:
|
|
return False, f"Missing keys: {missing}"
|
|
return True, None
|
|
except (json.JSONDecodeError, TypeError) as e:
|
|
return False, f"JSON parse error: {e}"
|
|
|
|
|
|
# === Step Executor ===
|
|
|
|
|
|
class StepExecutor:
|
|
"""Executes individual plan steps."""
|
|
|
|
def __init__(self, lm_client: LMStudioClient, project_root: Path | None = None):
|
|
self.lm_client = lm_client
|
|
self.project_root = project_root
|
|
self.context_accumulator: dict[int, str] = {}
|
|
|
|
def load_file_context(self, files_context: dict[str, str]) -> str:
|
|
"""Load file contents for context injection."""
|
|
if not self.project_root or not files_context:
|
|
return ""
|
|
|
|
context_parts = []
|
|
for file_path, load_type in files_context.items():
|
|
full_path = self.project_root / file_path
|
|
if load_type == "LOAD" and full_path.exists():
|
|
try:
|
|
content = full_path.read_text()
|
|
context_parts.append(f"=== {file_path} ===\n{content}")
|
|
except Exception as e:
|
|
context_parts.append(f"=== {file_path} ===\n[Error loading: {e}]")
|
|
elif load_type == "REFERENCE":
|
|
context_parts.append(f"=== {file_path} ===\n[File reference only]")
|
|
|
|
return "\n\n".join(context_parts)
|
|
|
|
def build_prompt(self, step: dict[str, Any], file_context: str) -> str:
|
|
"""Build complete prompt with context."""
|
|
parts = []
|
|
|
|
# Add file context if available
|
|
if file_context:
|
|
parts.append("## File Context\n" + file_context)
|
|
|
|
# Add results from dependencies
|
|
depends_on = step.get("depends_on", [])
|
|
if depends_on:
|
|
dep_context = []
|
|
for dep_id in depends_on:
|
|
if dep_id in self.context_accumulator:
|
|
dep_context.append(
|
|
f"## Result from Step {dep_id}\n{self.context_accumulator[dep_id]}"
|
|
)
|
|
if dep_context:
|
|
parts.append("\n\n".join(dep_context))
|
|
|
|
# Add the actual prompt
|
|
parts.append("## Task\n" + step["prompt"])
|
|
|
|
return "\n\n".join(parts)
|
|
|
|
def execute(
|
|
self, step: dict[str, Any], file_context: str
|
|
) -> StepResult:
|
|
"""Execute a single step with retries."""
|
|
step_id = step["id"]
|
|
max_retries = step.get("max_retries", 2)
|
|
on_failure = step.get("on_failure", "retry_with_context")
|
|
validation = step.get("validation", {"type": "non_empty"})
|
|
|
|
prompt = self.build_prompt(step, file_context)
|
|
retries = 0
|
|
last_error = None
|
|
|
|
while retries <= max_retries:
|
|
try:
|
|
log.info(f"Step {step_id}: Executing (attempt {retries + 1})")
|
|
output = self.lm_client.generate(prompt)
|
|
|
|
# Validate output
|
|
valid, error = Validator.validate(output, validation)
|
|
if valid:
|
|
# Store in accumulator for dependent steps
|
|
self.context_accumulator[step_id] = output
|
|
|
|
# Write file if specified
|
|
file_output = step.get("file_output")
|
|
if file_output and self.project_root:
|
|
self._write_output(file_output, output)
|
|
|
|
log.info(f"Step {step_id}: Success")
|
|
return StepResult(
|
|
step_id=step_id,
|
|
success=True,
|
|
output=output,
|
|
retries=retries,
|
|
)
|
|
|
|
last_error = error
|
|
log.warning(f"Step {step_id}: Validation failed - {error}")
|
|
|
|
if on_failure == "retry_with_context" and retries < max_retries:
|
|
# Add error context to prompt for retry
|
|
prompt += f"\n\n## Previous Attempt Failed\nError: {error}\nPlease fix and try again."
|
|
retries += 1
|
|
elif on_failure == "skip":
|
|
return StepResult(
|
|
step_id=step_id,
|
|
success=False,
|
|
output=output,
|
|
error=f"Skipped after validation failure: {error}",
|
|
)
|
|
else:
|
|
break
|
|
|
|
except httpx.RequestError as e:
|
|
last_error = str(e)
|
|
log.error(f"Step {step_id}: Request error - {e}")
|
|
retries += 1
|
|
|
|
return StepResult(
|
|
step_id=step_id,
|
|
success=False,
|
|
output="",
|
|
error=last_error,
|
|
retries=retries,
|
|
)
|
|
|
|
def _write_output(self, file_path: str, output: str) -> None:
|
|
"""Write step output to file."""
|
|
if not self.project_root:
|
|
return
|
|
|
|
full_path = self.project_root / file_path
|
|
|
|
# Extract code from markdown blocks
|
|
content = output
|
|
if "```python" in output:
|
|
start = output.find("```python") + 9
|
|
end = output.find("```", start)
|
|
if end > start:
|
|
content = output[start:end].strip()
|
|
elif "```" in output:
|
|
start = output.find("```") + 3
|
|
# Skip language identifier if present
|
|
newline = output.find("\n", start)
|
|
if newline > start:
|
|
start = newline + 1
|
|
end = output.find("```", start)
|
|
if end > start:
|
|
content = output[start:end].strip()
|
|
|
|
# Create parent directories
|
|
full_path.parent.mkdir(parents=True, exist_ok=True)
|
|
|
|
# Backup existing file
|
|
if full_path.exists():
|
|
backup = full_path.with_suffix(full_path.suffix + ".bak")
|
|
shutil.copy2(full_path, backup)
|
|
|
|
full_path.write_text(content)
|
|
log.info(f"Wrote output to {file_path}")
|
|
|
|
|
|
# === Plan Executor ===
|
|
|
|
|
|
class PlanExecutor:
|
|
"""Executes complete plans."""
|
|
|
|
def __init__(self, reader: DoobidooReader, lm_client: LMStudioClient):
|
|
self.reader = reader
|
|
self.lm_client = lm_client
|
|
|
|
def execute(self, plan_data: dict[str, Any]) -> PlanResult:
|
|
"""Execute a complete plan."""
|
|
db_id = plan_data["db_id"]
|
|
plan = plan_data["plan"]
|
|
plan_id = plan["id"]
|
|
|
|
log.info(f"Executing plan: {plan_id}")
|
|
log.info(f"Objective: {plan['context'].get('objective', 'N/A')}")
|
|
|
|
# Update status to in_progress
|
|
self.reader.update_plan_status(db_id, "in_progress")
|
|
|
|
# Setup executor
|
|
project_root = None
|
|
if project_path := plan["context"].get("project"):
|
|
project_root = Path(project_path)
|
|
if not project_root.exists():
|
|
log.warning(f"Project root does not exist: {project_root}")
|
|
project_root = None
|
|
|
|
step_executor = StepExecutor(self.lm_client, project_root)
|
|
|
|
# Load file context
|
|
files_context = plan["context"].get("files_context", {})
|
|
file_context = step_executor.load_file_context(files_context)
|
|
|
|
# Track step results
|
|
results: list[StepResult] = []
|
|
failed_steps: set[int] = set()
|
|
|
|
# Execute steps in order
|
|
for step in plan["steps"]:
|
|
step_id = step["id"]
|
|
|
|
# Check dependencies
|
|
depends_on = step.get("depends_on", [])
|
|
if any(dep in failed_steps for dep in depends_on):
|
|
log.warning(f"Step {step_id}: Skipping due to failed dependency")
|
|
results.append(
|
|
StepResult(
|
|
step_id=step_id,
|
|
success=False,
|
|
output="",
|
|
error="Skipped: dependency failed",
|
|
)
|
|
)
|
|
failed_steps.add(step_id)
|
|
continue
|
|
|
|
# Execute step
|
|
result = step_executor.execute(step, file_context)
|
|
results.append(result)
|
|
|
|
if not result.success:
|
|
failed_steps.add(step_id)
|
|
on_failure = step.get("on_failure", "retry_with_context")
|
|
if on_failure == "halt":
|
|
log.error(f"Step {step_id} failed with halt policy - stopping plan")
|
|
break
|
|
|
|
# Determine overall success
|
|
success = all(r.success for r in results)
|
|
|
|
# Handle rollback if needed
|
|
if not success and plan.get("rollback_strategy") == "revert_files":
|
|
self._rollback(project_root, plan["steps"])
|
|
|
|
# Update final status
|
|
final_status = "completed" if success else "failed"
|
|
self.reader.update_plan_status(db_id, final_status)
|
|
|
|
# Create result
|
|
plan_result = PlanResult(
|
|
plan_id=plan_id,
|
|
success=success,
|
|
steps=results,
|
|
error=None if success else "One or more steps failed",
|
|
)
|
|
|
|
# Store result
|
|
self.reader.store_result(plan_id, plan_result)
|
|
|
|
log.info(f"Plan {plan_id}: {'Completed successfully' if success else 'Failed'}")
|
|
return plan_result
|
|
|
|
def _rollback(self, project_root: Path | None, steps: list[dict]) -> None:
|
|
"""Rollback file changes by restoring backups."""
|
|
if not project_root:
|
|
return
|
|
|
|
log.info("Rolling back file changes...")
|
|
for step in steps:
|
|
if file_output := step.get("file_output"):
|
|
backup = project_root / (file_output + ".bak")
|
|
original = project_root / file_output
|
|
if backup.exists():
|
|
shutil.move(str(backup), str(original))
|
|
log.info(f"Restored {file_output}")
|
|
|
|
|
|
# === CLI ===
|
|
|
|
|
|
def main() -> int:
|
|
"""Main entry point."""
|
|
parser = argparse.ArgumentParser(
|
|
description="Bridge: Claude Code → doobidoo → LM Studio"
|
|
)
|
|
parser.add_argument("--list", action="store_true", help="List pending plans")
|
|
parser.add_argument("--plan", type=str, help="Execute specific plan by ID")
|
|
parser.add_argument("--health", action="store_true", help="Check LM Studio health")
|
|
parser.add_argument("--verbose", "-v", action="store_true", help="Verbose output")
|
|
|
|
args = parser.parse_args()
|
|
|
|
if args.verbose:
|
|
log.setLevel(logging.DEBUG)
|
|
|
|
# Health check
|
|
if args.health:
|
|
client = LMStudioClient()
|
|
if client.health_check():
|
|
print("LM Studio: OK (running at localhost:1234)")
|
|
client.close()
|
|
return 0
|
|
else:
|
|
print("LM Studio: NOT RUNNING")
|
|
print("Start LM Studio and load a model first.")
|
|
client.close()
|
|
return 1
|
|
|
|
# Check doobidoo database
|
|
try:
|
|
reader = DoobidooReader()
|
|
except FileNotFoundError as e:
|
|
print(f"Error: {e}")
|
|
print("Ensure doobidoo MCP server has been used at least once.")
|
|
return 1
|
|
|
|
# List plans
|
|
if args.list:
|
|
plans = reader.get_pending_plans()
|
|
if not plans:
|
|
print("No pending plans found.")
|
|
return 0
|
|
|
|
print(f"Found {len(plans)} pending plan(s):\n")
|
|
for p in plans:
|
|
plan = p["plan"]
|
|
print(f" ID: {plan['id']}")
|
|
print(f" Objective: {plan['context'].get('objective', 'N/A')}")
|
|
print(f" Steps: {len(plan['steps'])}")
|
|
print(f" Created: {p['created_at']}")
|
|
print()
|
|
return 0
|
|
|
|
# Check LM Studio
|
|
lm_client = LMStudioClient()
|
|
if not lm_client.health_check():
|
|
print("Error: LM Studio is not running at localhost:1234")
|
|
print("Start LM Studio and load a model first.")
|
|
lm_client.close()
|
|
return 1
|
|
|
|
executor = PlanExecutor(reader, lm_client)
|
|
|
|
# Execute specific plan
|
|
if args.plan:
|
|
plan_data = reader.get_plan_by_id(args.plan)
|
|
if not plan_data:
|
|
print(f"Plan not found: {args.plan}")
|
|
lm_client.close()
|
|
return 1
|
|
|
|
result = executor.execute(plan_data)
|
|
lm_client.close()
|
|
return 0 if result.success else 1
|
|
|
|
# Execute all pending plans
|
|
plans = reader.get_pending_plans()
|
|
if not plans:
|
|
print("No pending plans to execute.")
|
|
lm_client.close()
|
|
return 0
|
|
|
|
print(f"Executing {len(plans)} pending plan(s)...\n")
|
|
all_success = True
|
|
|
|
for plan_data in plans:
|
|
result = executor.execute(plan_data)
|
|
if not result.success:
|
|
all_success = False
|
|
print()
|
|
|
|
lm_client.close()
|
|
return 0 if all_success else 1
|
|
|
|
|
|
if __name__ == "__main__":
|
|
sys.exit(main())
|