merge: resolve conflicts after merging main
Adapt runtime features (usage tracking, ping, heartbeat) to main's multi-workspace architecture. Update frontend imports from @multica/types to @/shared/types after the package consolidation. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
commit
6ee034c6e9
151 changed files with 3664 additions and 6579 deletions
|
|
@ -4,63 +4,164 @@ import (
|
|||
"context"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/multica-ai/multica/server/internal/cli"
|
||||
"github.com/multica-ai/multica/server/internal/daemon/execenv"
|
||||
"github.com/multica-ai/multica/server/internal/daemon/usage"
|
||||
"github.com/multica-ai/multica/server/pkg/agent"
|
||||
)
|
||||
|
||||
// workspaceState tracks registered runtimes for a single workspace.
|
||||
type workspaceState struct {
|
||||
workspaceID string
|
||||
runtimeIDs []string
|
||||
}
|
||||
|
||||
// Daemon is the local agent runtime that polls for and executes tasks.
|
||||
type Daemon struct {
|
||||
cfg Config
|
||||
client *Client
|
||||
logger *slog.Logger
|
||||
|
||||
mu sync.Mutex
|
||||
workspaces map[string]*workspaceState
|
||||
runtimeIndex map[string]Runtime // runtimeID -> Runtime for provider lookups
|
||||
reloading sync.Mutex // prevents concurrent reloadWorkspaces
|
||||
}
|
||||
|
||||
// New creates a new Daemon instance.
|
||||
func New(cfg Config, logger *slog.Logger) *Daemon {
|
||||
return &Daemon{
|
||||
cfg: cfg,
|
||||
client: NewClient(cfg.ServerBaseURL),
|
||||
logger: logger,
|
||||
cfg: cfg,
|
||||
client: NewClient(cfg.ServerBaseURL),
|
||||
logger: logger,
|
||||
workspaces: make(map[string]*workspaceState),
|
||||
runtimeIndex: make(map[string]Runtime),
|
||||
}
|
||||
}
|
||||
|
||||
// Run starts the daemon: pairs if needed, registers runtimes, then polls for tasks.
|
||||
// Run starts the daemon: resolves auth, registers runtimes, then polls for tasks.
|
||||
func (d *Daemon) Run(ctx context.Context) error {
|
||||
agentNames := make([]string, 0, len(d.cfg.Agents))
|
||||
for name := range d.cfg.Agents {
|
||||
agentNames = append(agentNames, name)
|
||||
}
|
||||
d.logger.Info("starting daemon", "agents", agentNames, "workspace_id", d.cfg.WorkspaceID, "server", d.cfg.ServerBaseURL, "repos_root", d.cfg.ReposRoot)
|
||||
d.logger.Info("starting daemon", "agents", agentNames, "server", d.cfg.ServerBaseURL)
|
||||
|
||||
if strings.TrimSpace(d.cfg.WorkspaceID) == "" {
|
||||
workspaceID, err := d.ensurePaired(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
d.cfg.WorkspaceID = workspaceID
|
||||
d.logger.Info("pairing completed", "workspace_id", workspaceID)
|
||||
}
|
||||
|
||||
runtimes, err := d.registerRuntimes(ctx)
|
||||
if err != nil {
|
||||
// Load auth token from CLI config.
|
||||
if err := d.resolveAuth(); err != nil {
|
||||
return err
|
||||
}
|
||||
runtimeIDs := make([]string, 0, len(runtimes))
|
||||
for _, rt := range runtimes {
|
||||
d.logger.Info("registered runtime", "id", rt.ID, "provider", rt.Provider, "status", rt.Status)
|
||||
runtimeIDs = append(runtimeIDs, rt.ID)
|
||||
|
||||
// Load and register watched workspaces.
|
||||
if err := d.loadWatchedWorkspaces(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
go d.heartbeatLoop(ctx, runtimes)
|
||||
go d.usageScanLoop(ctx, runtimes)
|
||||
return d.pollLoop(ctx, runtimeIDs)
|
||||
runtimeIDs := d.allRuntimeIDs()
|
||||
if len(runtimeIDs) == 0 {
|
||||
return fmt.Errorf("no runtimes registered")
|
||||
}
|
||||
|
||||
// Start config watcher for hot-reload.
|
||||
go d.configWatchLoop(ctx)
|
||||
|
||||
go d.heartbeatLoop(ctx)
|
||||
go d.usageScanLoop(ctx)
|
||||
return d.pollLoop(ctx)
|
||||
}
|
||||
|
||||
func (d *Daemon) registerRuntimes(ctx context.Context) ([]Runtime, error) {
|
||||
// resolveAuth loads the auth token from the CLI config.
|
||||
func (d *Daemon) resolveAuth() error {
|
||||
cfg, err := cli.LoadCLIConfig()
|
||||
if err != nil {
|
||||
return fmt.Errorf("load CLI config: %w", err)
|
||||
}
|
||||
if cfg.Token == "" {
|
||||
d.logger.Warn("not authenticated — run 'multica auth login' to authenticate, then restart the daemon")
|
||||
return fmt.Errorf("not authenticated: run 'multica auth login' first")
|
||||
}
|
||||
d.client.SetToken(cfg.Token)
|
||||
d.logger.Info("authenticated")
|
||||
return nil
|
||||
}
|
||||
|
||||
// loadWatchedWorkspaces reads watched workspaces from CLI config and registers runtimes.
|
||||
func (d *Daemon) loadWatchedWorkspaces(ctx context.Context) error {
|
||||
cfg, err := cli.LoadCLIConfig()
|
||||
if err != nil {
|
||||
return fmt.Errorf("load CLI config: %w", err)
|
||||
}
|
||||
|
||||
if len(cfg.WatchedWorkspaces) == 0 {
|
||||
return fmt.Errorf("no watched workspaces configured: run 'multica watch <id>' to add one")
|
||||
}
|
||||
|
||||
var registered int
|
||||
for _, ws := range cfg.WatchedWorkspaces {
|
||||
runtimes, err := d.registerRuntimesForWorkspace(ctx, ws.ID)
|
||||
if err != nil {
|
||||
d.logger.Error("failed to register runtimes", "workspace_id", ws.ID, "name", ws.Name, "error", err)
|
||||
continue
|
||||
}
|
||||
runtimeIDs := make([]string, len(runtimes))
|
||||
for i, rt := range runtimes {
|
||||
runtimeIDs[i] = rt.ID
|
||||
d.logger.Info("registered runtime", "workspace_id", ws.ID, "runtime_id", rt.ID, "provider", rt.Provider)
|
||||
}
|
||||
d.mu.Lock()
|
||||
d.workspaces[ws.ID] = &workspaceState{workspaceID: ws.ID, runtimeIDs: runtimeIDs}
|
||||
for _, rt := range runtimes {
|
||||
d.runtimeIndex[rt.ID] = rt
|
||||
}
|
||||
d.mu.Unlock()
|
||||
d.logger.Info("watching workspace", "workspace_id", ws.ID, "name", ws.Name, "runtimes", len(runtimes))
|
||||
registered++
|
||||
}
|
||||
|
||||
if registered == 0 {
|
||||
return fmt.Errorf("failed to register runtimes for any of the %d watched workspace(s)", len(cfg.WatchedWorkspaces))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// allRuntimeIDs returns all runtime IDs across all watched workspaces.
|
||||
func (d *Daemon) allRuntimeIDs() []string {
|
||||
d.mu.Lock()
|
||||
defer d.mu.Unlock()
|
||||
var ids []string
|
||||
for _, ws := range d.workspaces {
|
||||
ids = append(ids, ws.runtimeIDs...)
|
||||
}
|
||||
return ids
|
||||
}
|
||||
|
||||
// findRuntime looks up a Runtime by its ID.
|
||||
func (d *Daemon) findRuntime(id string) *Runtime {
|
||||
d.mu.Lock()
|
||||
defer d.mu.Unlock()
|
||||
if rt, ok := d.runtimeIndex[id]; ok {
|
||||
return &rt
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// providerToRuntimeMap returns a mapping from provider name to runtime ID.
|
||||
func (d *Daemon) providerToRuntimeMap() map[string]string {
|
||||
d.mu.Lock()
|
||||
defer d.mu.Unlock()
|
||||
m := make(map[string]string)
|
||||
for id, rt := range d.runtimeIndex {
|
||||
m[rt.Provider] = id
|
||||
}
|
||||
return m
|
||||
}
|
||||
|
||||
func (d *Daemon) registerRuntimesForWorkspace(ctx context.Context, workspaceID string) ([]Runtime, error) {
|
||||
var runtimes []map[string]string
|
||||
for name, entry := range d.cfg.Agents {
|
||||
version, err := agent.DetectVersion(ctx, entry.Path)
|
||||
|
|
@ -80,7 +181,7 @@ func (d *Daemon) registerRuntimes(ctx context.Context) ([]Runtime, error) {
|
|||
}
|
||||
|
||||
req := map[string]any{
|
||||
"workspace_id": d.cfg.WorkspaceID,
|
||||
"workspace_id": workspaceID,
|
||||
"daemon_id": d.cfg.DaemonID,
|
||||
"device_name": d.cfg.DeviceName,
|
||||
"runtimes": runtimes,
|
||||
|
|
@ -96,77 +197,106 @@ func (d *Daemon) registerRuntimes(ctx context.Context) ([]Runtime, error) {
|
|||
return rts, nil
|
||||
}
|
||||
|
||||
func (d *Daemon) ensurePaired(ctx context.Context) (string, error) {
|
||||
// Use a deterministic agent for the pairing session metadata (prefer codex for backward compat).
|
||||
var firstName string
|
||||
var firstEntry AgentEntry
|
||||
for _, preferred := range []string{"codex", "claude"} {
|
||||
if entry, ok := d.cfg.Agents[preferred]; ok {
|
||||
firstName = preferred
|
||||
firstEntry = entry
|
||||
break
|
||||
}
|
||||
}
|
||||
version, err := agent.DetectVersion(ctx, firstEntry.Path)
|
||||
// configWatchLoop periodically checks for config file changes and reloads workspaces.
|
||||
func (d *Daemon) configWatchLoop(ctx context.Context) {
|
||||
configPath, err := cli.CLIConfigPath()
|
||||
if err != nil {
|
||||
return "", err
|
||||
d.logger.Warn("cannot watch config file", "error", err)
|
||||
return
|
||||
}
|
||||
|
||||
session, err := d.client.CreatePairingSession(ctx, map[string]string{
|
||||
"daemon_id": d.cfg.DaemonID,
|
||||
"device_name": d.cfg.DeviceName,
|
||||
"runtime_name": d.cfg.RuntimeName,
|
||||
"runtime_type": firstName,
|
||||
"runtime_version": version,
|
||||
})
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("create pairing session: %w", err)
|
||||
}
|
||||
if session.LinkURL != nil {
|
||||
d.logger.Info("open this link to pair the daemon", "url", *session.LinkURL)
|
||||
} else {
|
||||
d.logger.Info("pairing session created", "token", session.Token)
|
||||
var lastModTime time.Time
|
||||
if info, err := os.Stat(configPath); err == nil {
|
||||
lastModTime = info.ModTime()
|
||||
}
|
||||
|
||||
ticker := time.NewTicker(DefaultConfigReloadInterval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return "", ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
current, err := d.client.GetPairingSession(ctx, session.Token)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("poll pairing session: %w", err)
|
||||
}
|
||||
|
||||
switch current.Status {
|
||||
case "approved", "claimed":
|
||||
if current.WorkspaceID == nil || strings.TrimSpace(*current.WorkspaceID) == "" {
|
||||
return "", fmt.Errorf("pairing session approved without workspace")
|
||||
return
|
||||
case <-ticker.C:
|
||||
info, err := os.Stat(configPath)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
if err := SavePersistedConfig(d.cfg.ConfigPath, PersistedConfig{
|
||||
WorkspaceID: strings.TrimSpace(*current.WorkspaceID),
|
||||
}); err != nil {
|
||||
return "", err
|
||||
if !info.ModTime().After(lastModTime) {
|
||||
continue
|
||||
}
|
||||
if current.Status != "claimed" {
|
||||
if _, err := d.client.ClaimPairingSession(ctx, current.Token); err != nil {
|
||||
return "", fmt.Errorf("claim pairing session: %w", err)
|
||||
}
|
||||
}
|
||||
return strings.TrimSpace(*current.WorkspaceID), nil
|
||||
case "expired":
|
||||
return "", fmt.Errorf("pairing session expired before approval")
|
||||
}
|
||||
|
||||
if err := sleepWithContext(ctx, d.cfg.PollInterval); err != nil {
|
||||
return "", err
|
||||
lastModTime = info.ModTime()
|
||||
d.reloadWorkspaces(ctx)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (d *Daemon) heartbeatLoop(ctx context.Context, runtimes []Runtime) {
|
||||
// reloadWorkspaces reconciles the active workspace set with the config file.
|
||||
// NOTE: Token changes (e.g. re-login as a different user) are not picked up;
|
||||
// the daemon must be restarted for a new auth token to take effect.
|
||||
func (d *Daemon) reloadWorkspaces(ctx context.Context) {
|
||||
d.reloading.Lock()
|
||||
defer d.reloading.Unlock()
|
||||
|
||||
cfg, err := cli.LoadCLIConfig()
|
||||
if err != nil {
|
||||
d.logger.Warn("reload config failed", "error", err)
|
||||
return
|
||||
}
|
||||
|
||||
newIDs := make(map[string]string) // id -> name
|
||||
for _, ws := range cfg.WatchedWorkspaces {
|
||||
newIDs[ws.ID] = ws.Name
|
||||
}
|
||||
|
||||
d.mu.Lock()
|
||||
currentIDs := make(map[string]bool)
|
||||
for id := range d.workspaces {
|
||||
currentIDs[id] = true
|
||||
}
|
||||
d.mu.Unlock()
|
||||
|
||||
// Register runtimes for newly added workspaces.
|
||||
for id, name := range newIDs {
|
||||
if !currentIDs[id] {
|
||||
runtimes, err := d.registerRuntimesForWorkspace(ctx, id)
|
||||
if err != nil {
|
||||
d.logger.Error("register runtimes for new workspace failed", "workspace_id", id, "error", err)
|
||||
continue
|
||||
}
|
||||
runtimeIDs := make([]string, len(runtimes))
|
||||
for i, rt := range runtimes {
|
||||
runtimeIDs[i] = rt.ID
|
||||
}
|
||||
d.mu.Lock()
|
||||
d.workspaces[id] = &workspaceState{workspaceID: id, runtimeIDs: runtimeIDs}
|
||||
for _, rt := range runtimes {
|
||||
d.runtimeIndex[rt.ID] = rt
|
||||
}
|
||||
d.mu.Unlock()
|
||||
d.logger.Info("now watching workspace", "workspace_id", id, "name", name)
|
||||
}
|
||||
}
|
||||
|
||||
// Remove workspaces no longer in config.
|
||||
// NOTE: runtimes are not deregistered server-side; they will go offline
|
||||
// after heartbeats stop arriving (within HeartbeatInterval).
|
||||
for id := range currentIDs {
|
||||
if _, ok := newIDs[id]; !ok {
|
||||
d.mu.Lock()
|
||||
if ws, exists := d.workspaces[id]; exists {
|
||||
for _, rid := range ws.runtimeIDs {
|
||||
delete(d.runtimeIndex, rid)
|
||||
}
|
||||
}
|
||||
delete(d.workspaces, id)
|
||||
d.mu.Unlock()
|
||||
d.logger.Info("stopped watching workspace", "workspace_id", id)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (d *Daemon) heartbeatLoop(ctx context.Context) {
|
||||
ticker := time.NewTicker(d.cfg.HeartbeatInterval)
|
||||
defer ticker.Stop()
|
||||
|
||||
|
|
@ -175,16 +305,19 @@ func (d *Daemon) heartbeatLoop(ctx context.Context, runtimes []Runtime) {
|
|||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
for _, rt := range runtimes {
|
||||
resp, err := d.client.SendHeartbeat(ctx, rt.ID)
|
||||
for _, rid := range d.allRuntimeIDs() {
|
||||
resp, err := d.client.SendHeartbeat(ctx, rid)
|
||||
if err != nil {
|
||||
d.logger.Warn("heartbeat failed", "runtime_id", rt.ID, "error", err)
|
||||
d.logger.Warn("heartbeat failed", "runtime_id", rid, "error", err)
|
||||
continue
|
||||
}
|
||||
|
||||
// Handle pending ping requests.
|
||||
if resp.PendingPing != nil {
|
||||
go d.handlePing(ctx, rt, resp.PendingPing.ID)
|
||||
rt := d.findRuntime(rid)
|
||||
if rt != nil {
|
||||
go d.handlePing(ctx, *rt, resp.PendingPing.ID)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -265,31 +398,28 @@ func (d *Daemon) handlePing(ctx context.Context, rt Runtime, pingID string) {
|
|||
}
|
||||
}
|
||||
|
||||
func (d *Daemon) usageScanLoop(ctx context.Context, runtimes []Runtime) {
|
||||
func (d *Daemon) usageScanLoop(ctx context.Context) {
|
||||
scanner := usage.NewScanner(d.logger)
|
||||
|
||||
// Build provider -> runtime ID mapping.
|
||||
providerToRuntime := make(map[string]string)
|
||||
for _, rt := range runtimes {
|
||||
providerToRuntime[rt.Provider] = rt.ID
|
||||
}
|
||||
|
||||
report := func() {
|
||||
records := scanner.Scan()
|
||||
if len(records) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
// Build provider -> runtime ID mapping from current state.
|
||||
providerToRuntime := d.providerToRuntimeMap()
|
||||
|
||||
// Group records by provider to send to the correct runtime.
|
||||
byProvider := make(map[string][]map[string]any)
|
||||
for _, r := range records {
|
||||
byProvider[r.Provider] = append(byProvider[r.Provider], map[string]any{
|
||||
"date": r.Date,
|
||||
"provider": r.Provider,
|
||||
"model": r.Model,
|
||||
"input_tokens": r.InputTokens,
|
||||
"output_tokens": r.OutputTokens,
|
||||
"cache_read_tokens": r.CacheReadTokens,
|
||||
"date": r.Date,
|
||||
"provider": r.Provider,
|
||||
"model": r.Model,
|
||||
"input_tokens": r.InputTokens,
|
||||
"output_tokens": r.OutputTokens,
|
||||
"cache_read_tokens": r.CacheReadTokens,
|
||||
"cache_write_tokens": r.CacheWriteTokens,
|
||||
})
|
||||
}
|
||||
|
|
@ -324,7 +454,7 @@ func (d *Daemon) usageScanLoop(ctx context.Context, runtimes []Runtime) {
|
|||
}
|
||||
}
|
||||
|
||||
func (d *Daemon) pollLoop(ctx context.Context, runtimeIDs []string) error {
|
||||
func (d *Daemon) pollLoop(ctx context.Context) error {
|
||||
pollOffset := 0
|
||||
pollCount := 0
|
||||
for {
|
||||
|
|
@ -334,6 +464,14 @@ func (d *Daemon) pollLoop(ctx context.Context, runtimeIDs []string) error {
|
|||
default:
|
||||
}
|
||||
|
||||
runtimeIDs := d.allRuntimeIDs()
|
||||
if len(runtimeIDs) == 0 {
|
||||
if err := sleepWithContext(ctx, d.cfg.PollInterval); err != nil {
|
||||
return err
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
claimed := false
|
||||
n := len(runtimeIDs)
|
||||
for i := 0; i < n; i++ {
|
||||
|
|
@ -421,7 +559,7 @@ func (d *Daemon) runTask(ctx context.Context, task Task) (TaskResult, error) {
|
|||
}
|
||||
env, err := execenv.Prepare(execenv.PrepareParams{
|
||||
WorkspacesRoot: d.cfg.WorkspacesRoot,
|
||||
ReposRoot: d.cfg.ReposRoot,
|
||||
RepoPath: task.Context.RepoPath,
|
||||
TaskID: task.ID,
|
||||
AgentName: task.Context.Agent.Name,
|
||||
Task: taskCtx,
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue