When an agent is working on an issue, users can now see real-time output
in the issue detail page instead of waiting for completion.
Backend:
- Add task_message table and migration for persisting agent messages
- Add POST /api/daemon/tasks/{id}/messages endpoint for daemon to report
structured messages (tool_use, tool_result, text, error) in batches
- Add GET /api/daemon/tasks/{id}/messages for catch-up after reconnect
- Add GET /api/issues/{id}/active-task to check for running tasks
- Broadcast task:message events via WebSocket
- Daemon forwards agent session messages with 500ms text throttling
Frontend:
- Add AgentLiveCard component showing live tool calls, text output,
and progress indicators with auto-scroll
- Wire into issue detail timeline with WS subscription and HTTP catch-up
- Card appears when agent is working, disappears on completion/failure
1013 lines
28 KiB
Go
1013 lines
28 KiB
Go
package daemon
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log/slog"
|
|
"os"
|
|
"path/filepath"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/multica-ai/multica/server/internal/cli"
|
|
"github.com/multica-ai/multica/server/internal/daemon/execenv"
|
|
"github.com/multica-ai/multica/server/internal/daemon/repocache"
|
|
"github.com/multica-ai/multica/server/internal/daemon/usage"
|
|
"github.com/multica-ai/multica/server/pkg/agent"
|
|
)
|
|
|
|
// workspaceState tracks registered runtimes for a single workspace.
|
|
type workspaceState struct {
|
|
workspaceID string
|
|
runtimeIDs []string
|
|
}
|
|
|
|
// Daemon is the local agent runtime that polls for and executes tasks.
|
|
type Daemon struct {
|
|
cfg Config
|
|
client *Client
|
|
repoCache *repocache.Cache
|
|
logger *slog.Logger
|
|
|
|
mu sync.Mutex
|
|
workspaces map[string]*workspaceState
|
|
runtimeIndex map[string]Runtime // runtimeID -> Runtime for provider lookups
|
|
reloading sync.Mutex // prevents concurrent reloadWorkspaces
|
|
}
|
|
|
|
// New creates a new Daemon instance.
|
|
func New(cfg Config, logger *slog.Logger) *Daemon {
|
|
cacheRoot := filepath.Join(cfg.WorkspacesRoot, ".repos")
|
|
return &Daemon{
|
|
cfg: cfg,
|
|
client: NewClient(cfg.ServerBaseURL),
|
|
repoCache: repocache.New(cacheRoot, logger),
|
|
logger: logger,
|
|
workspaces: make(map[string]*workspaceState),
|
|
runtimeIndex: make(map[string]Runtime),
|
|
}
|
|
}
|
|
|
|
// Run starts the daemon: resolves auth, registers runtimes, then polls for tasks.
|
|
func (d *Daemon) Run(ctx context.Context) error {
|
|
// Bind health port early to detect another running daemon.
|
|
healthLn, err := d.listenHealth()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
agentNames := make([]string, 0, len(d.cfg.Agents))
|
|
for name := range d.cfg.Agents {
|
|
agentNames = append(agentNames, name)
|
|
}
|
|
logFields := []any{"agents", agentNames, "server", d.cfg.ServerBaseURL}
|
|
if d.cfg.Profile != "" {
|
|
logFields = append(logFields, "profile", d.cfg.Profile)
|
|
}
|
|
d.logger.Info("starting daemon", logFields...)
|
|
|
|
// Load auth token from CLI config.
|
|
if err := d.resolveAuth(); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Load and register watched workspaces.
|
|
if err := d.loadWatchedWorkspaces(ctx); err != nil {
|
|
return err
|
|
}
|
|
|
|
runtimeIDs := d.allRuntimeIDs()
|
|
if len(runtimeIDs) == 0 {
|
|
return fmt.Errorf("no runtimes registered")
|
|
}
|
|
|
|
// Deregister runtimes on shutdown (uses a fresh context since ctx will be cancelled).
|
|
defer d.deregisterRuntimes()
|
|
|
|
// Start config watcher for hot-reload.
|
|
go d.configWatchLoop(ctx)
|
|
|
|
// Start workspace sync loop to discover newly created workspaces.
|
|
go d.workspaceSyncLoop(ctx)
|
|
|
|
go d.heartbeatLoop(ctx)
|
|
go d.usageScanLoop(ctx)
|
|
go d.serveHealth(ctx, healthLn, time.Now())
|
|
return d.pollLoop(ctx)
|
|
}
|
|
|
|
// deregisterRuntimes notifies the server that all runtimes are going offline.
|
|
func (d *Daemon) deregisterRuntimes() {
|
|
runtimeIDs := d.allRuntimeIDs()
|
|
if len(runtimeIDs) == 0 {
|
|
return
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
defer cancel()
|
|
|
|
if err := d.client.Deregister(ctx, runtimeIDs); err != nil {
|
|
d.logger.Warn("failed to deregister runtimes on shutdown", "error", err)
|
|
} else {
|
|
d.logger.Info("deregistered runtimes", "count", len(runtimeIDs))
|
|
}
|
|
}
|
|
|
|
// resolveAuth loads the auth token from the CLI config for the active profile.
|
|
func (d *Daemon) resolveAuth() error {
|
|
cfg, err := cli.LoadCLIConfigForProfile(d.cfg.Profile)
|
|
if err != nil {
|
|
return fmt.Errorf("load CLI config: %w", err)
|
|
}
|
|
if cfg.Token == "" {
|
|
loginHint := "'multica login'"
|
|
if d.cfg.Profile != "" {
|
|
loginHint = fmt.Sprintf("'multica login --profile %s'", d.cfg.Profile)
|
|
}
|
|
d.logger.Warn("not authenticated — run " + loginHint + " to authenticate, then restart the daemon")
|
|
return fmt.Errorf("not authenticated: run %s first", loginHint)
|
|
}
|
|
d.client.SetToken(cfg.Token)
|
|
d.logger.Info("authenticated")
|
|
return nil
|
|
}
|
|
|
|
// loadWatchedWorkspaces reads watched workspaces from CLI config and registers runtimes.
|
|
func (d *Daemon) loadWatchedWorkspaces(ctx context.Context) error {
|
|
cfg, err := cli.LoadCLIConfigForProfile(d.cfg.Profile)
|
|
if err != nil {
|
|
return fmt.Errorf("load CLI config: %w", err)
|
|
}
|
|
|
|
if len(cfg.WatchedWorkspaces) == 0 {
|
|
return fmt.Errorf("no watched workspaces configured: run 'multica workspace watch <id>' to add one")
|
|
}
|
|
|
|
var registered int
|
|
for _, ws := range cfg.WatchedWorkspaces {
|
|
resp, err := d.registerRuntimesForWorkspace(ctx, ws.ID)
|
|
if err != nil {
|
|
d.logger.Error("failed to register runtimes", "workspace_id", ws.ID, "name", ws.Name, "error", err)
|
|
continue
|
|
}
|
|
runtimeIDs := make([]string, len(resp.Runtimes))
|
|
for i, rt := range resp.Runtimes {
|
|
runtimeIDs[i] = rt.ID
|
|
d.logger.Info("registered runtime", "workspace_id", ws.ID, "runtime_id", rt.ID, "provider", rt.Provider)
|
|
}
|
|
d.mu.Lock()
|
|
d.workspaces[ws.ID] = &workspaceState{workspaceID: ws.ID, runtimeIDs: runtimeIDs}
|
|
for _, rt := range resp.Runtimes {
|
|
d.runtimeIndex[rt.ID] = rt
|
|
}
|
|
d.mu.Unlock()
|
|
|
|
// Sync workspace repos to local cache.
|
|
if d.repoCache != nil && len(resp.Repos) > 0 {
|
|
if err := d.repoCache.Sync(ws.ID, repoDataToInfo(resp.Repos)); err != nil {
|
|
d.logger.Warn("repo cache sync failed", "workspace_id", ws.ID, "error", err)
|
|
}
|
|
}
|
|
|
|
d.logger.Info("watching workspace", "workspace_id", ws.ID, "name", ws.Name, "runtimes", len(resp.Runtimes), "repos", len(resp.Repos))
|
|
registered++
|
|
}
|
|
|
|
if registered == 0 {
|
|
return fmt.Errorf("failed to register runtimes for any of the %d watched workspace(s)", len(cfg.WatchedWorkspaces))
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// allRuntimeIDs returns all runtime IDs across all watched workspaces.
|
|
func (d *Daemon) allRuntimeIDs() []string {
|
|
d.mu.Lock()
|
|
defer d.mu.Unlock()
|
|
var ids []string
|
|
for _, ws := range d.workspaces {
|
|
ids = append(ids, ws.runtimeIDs...)
|
|
}
|
|
return ids
|
|
}
|
|
|
|
// findRuntime looks up a Runtime by its ID.
|
|
func (d *Daemon) findRuntime(id string) *Runtime {
|
|
d.mu.Lock()
|
|
defer d.mu.Unlock()
|
|
if rt, ok := d.runtimeIndex[id]; ok {
|
|
return &rt
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// providerToRuntimeMap returns a mapping from provider name to runtime ID.
|
|
func (d *Daemon) providerToRuntimeMap() map[string]string {
|
|
d.mu.Lock()
|
|
defer d.mu.Unlock()
|
|
m := make(map[string]string)
|
|
for id, rt := range d.runtimeIndex {
|
|
m[rt.Provider] = id
|
|
}
|
|
return m
|
|
}
|
|
|
|
func (d *Daemon) registerRuntimesForWorkspace(ctx context.Context, workspaceID string) (*RegisterResponse, error) {
|
|
var runtimes []map[string]string
|
|
for name, entry := range d.cfg.Agents {
|
|
version, err := agent.DetectVersion(ctx, entry.Path)
|
|
if err != nil {
|
|
d.logger.Warn("skip registering runtime", "name", name, "error", err)
|
|
continue
|
|
}
|
|
displayName := strings.ToUpper(name[:1]) + name[1:]
|
|
if d.cfg.DeviceName != "" {
|
|
displayName = fmt.Sprintf("%s (%s)", displayName, d.cfg.DeviceName)
|
|
}
|
|
runtimes = append(runtimes, map[string]string{
|
|
"name": displayName,
|
|
"type": name,
|
|
"version": version,
|
|
"status": "online",
|
|
})
|
|
}
|
|
if len(runtimes) == 0 {
|
|
return nil, fmt.Errorf("no agent runtimes could be registered")
|
|
}
|
|
|
|
req := map[string]any{
|
|
"workspace_id": workspaceID,
|
|
"daemon_id": d.cfg.DaemonID,
|
|
"device_name": d.cfg.DeviceName,
|
|
"runtimes": runtimes,
|
|
}
|
|
|
|
resp, err := d.client.Register(ctx, req)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("register runtimes: %w", err)
|
|
}
|
|
if len(resp.Runtimes) == 0 {
|
|
return nil, fmt.Errorf("register runtimes: empty response")
|
|
}
|
|
return resp, nil
|
|
}
|
|
|
|
// configWatchLoop periodically checks for config file changes and reloads workspaces.
|
|
func (d *Daemon) configWatchLoop(ctx context.Context) {
|
|
configPath, err := cli.CLIConfigPathForProfile(d.cfg.Profile)
|
|
if err != nil {
|
|
d.logger.Warn("cannot watch config file", "error", err)
|
|
return
|
|
}
|
|
|
|
var lastModTime time.Time
|
|
if info, err := os.Stat(configPath); err == nil {
|
|
lastModTime = info.ModTime()
|
|
}
|
|
|
|
ticker := time.NewTicker(DefaultConfigReloadInterval)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-ticker.C:
|
|
info, err := os.Stat(configPath)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
if !info.ModTime().After(lastModTime) {
|
|
continue
|
|
}
|
|
lastModTime = info.ModTime()
|
|
d.reloadWorkspaces(ctx)
|
|
}
|
|
}
|
|
}
|
|
|
|
// workspaceSyncLoop periodically fetches the user's workspaces from the API
|
|
// and adds any new ones to the CLI config. The configWatchLoop will then
|
|
// detect the config change and register runtimes for the new workspaces.
|
|
func (d *Daemon) workspaceSyncLoop(ctx context.Context) {
|
|
// Run immediately on startup before entering the periodic loop.
|
|
d.syncWorkspacesFromAPI(ctx)
|
|
|
|
ticker := time.NewTicker(DefaultWorkspaceSyncInterval)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-ticker.C:
|
|
d.syncWorkspacesFromAPI(ctx)
|
|
}
|
|
}
|
|
}
|
|
|
|
// syncWorkspacesFromAPI fetches all workspaces the user belongs to and adds
|
|
// any missing ones to the CLI config's watched list.
|
|
func (d *Daemon) syncWorkspacesFromAPI(ctx context.Context) {
|
|
apiCtx, cancel := context.WithTimeout(ctx, 15*time.Second)
|
|
defer cancel()
|
|
|
|
workspaces, err := d.client.ListWorkspaces(apiCtx)
|
|
if err != nil {
|
|
d.logger.Debug("workspace sync: failed to list workspaces", "error", err)
|
|
return
|
|
}
|
|
|
|
cfg, err := cli.LoadCLIConfigForProfile(d.cfg.Profile)
|
|
if err != nil {
|
|
d.logger.Warn("workspace sync: failed to load config", "error", err)
|
|
return
|
|
}
|
|
|
|
var added int
|
|
for _, ws := range workspaces {
|
|
if cfg.AddWatchedWorkspace(ws.ID, ws.Name) {
|
|
added++
|
|
d.logger.Info("workspace sync: discovered new workspace", "workspace_id", ws.ID, "name", ws.Name)
|
|
}
|
|
}
|
|
|
|
if added == 0 {
|
|
return
|
|
}
|
|
|
|
if err := cli.SaveCLIConfigForProfile(cfg, d.cfg.Profile); err != nil {
|
|
d.logger.Warn("workspace sync: failed to save config", "error", err)
|
|
return
|
|
}
|
|
d.logger.Info("workspace sync: added new workspace(s) to config", "count", added)
|
|
}
|
|
|
|
// reloadWorkspaces reconciles the active workspace set with the config file.
|
|
// NOTE: Token changes (e.g. re-login as a different user) are not picked up;
|
|
// the daemon must be restarted for a new auth token to take effect.
|
|
func (d *Daemon) reloadWorkspaces(ctx context.Context) {
|
|
d.reloading.Lock()
|
|
defer d.reloading.Unlock()
|
|
|
|
cfg, err := cli.LoadCLIConfigForProfile(d.cfg.Profile)
|
|
if err != nil {
|
|
d.logger.Warn("reload config failed", "error", err)
|
|
return
|
|
}
|
|
|
|
newIDs := make(map[string]string) // id -> name
|
|
for _, ws := range cfg.WatchedWorkspaces {
|
|
newIDs[ws.ID] = ws.Name
|
|
}
|
|
|
|
d.mu.Lock()
|
|
currentIDs := make(map[string]bool)
|
|
for id := range d.workspaces {
|
|
currentIDs[id] = true
|
|
}
|
|
d.mu.Unlock()
|
|
|
|
// Register runtimes for newly added workspaces.
|
|
for id, name := range newIDs {
|
|
if !currentIDs[id] {
|
|
resp, err := d.registerRuntimesForWorkspace(ctx, id)
|
|
if err != nil {
|
|
d.logger.Error("register runtimes for new workspace failed", "workspace_id", id, "error", err)
|
|
continue
|
|
}
|
|
runtimeIDs := make([]string, len(resp.Runtimes))
|
|
for i, rt := range resp.Runtimes {
|
|
runtimeIDs[i] = rt.ID
|
|
}
|
|
d.mu.Lock()
|
|
d.workspaces[id] = &workspaceState{workspaceID: id, runtimeIDs: runtimeIDs}
|
|
for _, rt := range resp.Runtimes {
|
|
d.runtimeIndex[rt.ID] = rt
|
|
}
|
|
d.mu.Unlock()
|
|
|
|
// Sync workspace repos to local cache.
|
|
if d.repoCache != nil && len(resp.Repos) > 0 {
|
|
if err := d.repoCache.Sync(id, repoDataToInfo(resp.Repos)); err != nil {
|
|
d.logger.Warn("repo cache sync failed", "workspace_id", id, "error", err)
|
|
}
|
|
}
|
|
|
|
d.logger.Info("now watching workspace", "workspace_id", id, "name", name)
|
|
}
|
|
}
|
|
|
|
// Remove workspaces no longer in config.
|
|
// NOTE: runtimes are not deregistered server-side; they will go offline
|
|
// after heartbeats stop arriving (within HeartbeatInterval).
|
|
for id := range currentIDs {
|
|
if _, ok := newIDs[id]; !ok {
|
|
d.mu.Lock()
|
|
if ws, exists := d.workspaces[id]; exists {
|
|
for _, rid := range ws.runtimeIDs {
|
|
delete(d.runtimeIndex, rid)
|
|
}
|
|
}
|
|
delete(d.workspaces, id)
|
|
d.mu.Unlock()
|
|
d.logger.Info("stopped watching workspace", "workspace_id", id)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (d *Daemon) heartbeatLoop(ctx context.Context) {
|
|
ticker := time.NewTicker(d.cfg.HeartbeatInterval)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-ticker.C:
|
|
for _, rid := range d.allRuntimeIDs() {
|
|
resp, err := d.client.SendHeartbeat(ctx, rid)
|
|
if err != nil {
|
|
d.logger.Warn("heartbeat failed", "runtime_id", rid, "error", err)
|
|
continue
|
|
}
|
|
|
|
// Handle pending ping requests.
|
|
if resp.PendingPing != nil {
|
|
rt := d.findRuntime(rid)
|
|
if rt != nil {
|
|
go d.handlePing(ctx, *rt, resp.PendingPing.ID)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (d *Daemon) handlePing(ctx context.Context, rt Runtime, pingID string) {
|
|
d.logger.Info("ping requested", "runtime_id", rt.ID, "ping_id", pingID, "provider", rt.Provider)
|
|
|
|
start := time.Now()
|
|
|
|
entry, ok := d.cfg.Agents[rt.Provider]
|
|
if !ok {
|
|
d.client.ReportPingResult(ctx, rt.ID, pingID, map[string]any{
|
|
"status": "failed",
|
|
"error": fmt.Sprintf("no agent configured for provider %q", rt.Provider),
|
|
"duration_ms": time.Since(start).Milliseconds(),
|
|
})
|
|
return
|
|
}
|
|
|
|
backend, err := agent.New(rt.Provider, agent.Config{
|
|
ExecutablePath: entry.Path,
|
|
Logger: d.logger,
|
|
})
|
|
if err != nil {
|
|
d.client.ReportPingResult(ctx, rt.ID, pingID, map[string]any{
|
|
"status": "failed",
|
|
"error": err.Error(),
|
|
"duration_ms": time.Since(start).Milliseconds(),
|
|
})
|
|
return
|
|
}
|
|
|
|
pingCtx, cancel := context.WithTimeout(ctx, 60*time.Second)
|
|
defer cancel()
|
|
|
|
session, err := backend.Execute(pingCtx, "Respond with exactly one word: pong", agent.ExecOptions{
|
|
MaxTurns: 1,
|
|
Timeout: 60 * time.Second,
|
|
})
|
|
if err != nil {
|
|
d.client.ReportPingResult(ctx, rt.ID, pingID, map[string]any{
|
|
"status": "failed",
|
|
"error": err.Error(),
|
|
"duration_ms": time.Since(start).Milliseconds(),
|
|
})
|
|
return
|
|
}
|
|
|
|
// Drain messages
|
|
go func() {
|
|
for range session.Messages {
|
|
}
|
|
}()
|
|
|
|
result := <-session.Result
|
|
durationMs := time.Since(start).Milliseconds()
|
|
|
|
if result.Status == "completed" {
|
|
d.logger.Info("ping completed", "runtime_id", rt.ID, "ping_id", pingID, "duration_ms", durationMs)
|
|
d.client.ReportPingResult(ctx, rt.ID, pingID, map[string]any{
|
|
"status": "completed",
|
|
"output": result.Output,
|
|
"duration_ms": durationMs,
|
|
})
|
|
} else {
|
|
errMsg := result.Error
|
|
if errMsg == "" {
|
|
errMsg = fmt.Sprintf("agent returned status: %s", result.Status)
|
|
}
|
|
d.logger.Warn("ping failed", "runtime_id", rt.ID, "ping_id", pingID, "error", errMsg)
|
|
d.client.ReportPingResult(ctx, rt.ID, pingID, map[string]any{
|
|
"status": "failed",
|
|
"error": errMsg,
|
|
"duration_ms": durationMs,
|
|
})
|
|
}
|
|
}
|
|
|
|
func (d *Daemon) usageScanLoop(ctx context.Context) {
|
|
scanner := usage.NewScanner(d.logger)
|
|
|
|
report := func() {
|
|
records := scanner.Scan()
|
|
if len(records) == 0 {
|
|
return
|
|
}
|
|
|
|
// Build provider -> runtime ID mapping from current state.
|
|
providerToRuntime := d.providerToRuntimeMap()
|
|
|
|
// Group records by provider to send to the correct runtime.
|
|
byProvider := make(map[string][]map[string]any)
|
|
for _, r := range records {
|
|
byProvider[r.Provider] = append(byProvider[r.Provider], map[string]any{
|
|
"date": r.Date,
|
|
"provider": r.Provider,
|
|
"model": r.Model,
|
|
"input_tokens": r.InputTokens,
|
|
"output_tokens": r.OutputTokens,
|
|
"cache_read_tokens": r.CacheReadTokens,
|
|
"cache_write_tokens": r.CacheWriteTokens,
|
|
})
|
|
}
|
|
|
|
for provider, entries := range byProvider {
|
|
runtimeID, ok := providerToRuntime[provider]
|
|
if !ok {
|
|
d.logger.Debug("no runtime for provider, skipping usage report", "provider", provider)
|
|
continue
|
|
}
|
|
if err := d.client.ReportUsage(ctx, runtimeID, entries); err != nil {
|
|
d.logger.Warn("usage report failed", "provider", provider, "runtime_id", runtimeID, "error", err)
|
|
} else {
|
|
d.logger.Info("usage reported", "provider", provider, "runtime_id", runtimeID, "entries", len(entries))
|
|
}
|
|
}
|
|
}
|
|
|
|
// Initial scan on startup.
|
|
report()
|
|
|
|
ticker := time.NewTicker(5 * time.Minute)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-ticker.C:
|
|
report()
|
|
}
|
|
}
|
|
}
|
|
|
|
func (d *Daemon) pollLoop(ctx context.Context) error {
|
|
sem := make(chan struct{}, d.cfg.MaxConcurrentTasks)
|
|
var wg sync.WaitGroup
|
|
|
|
pollOffset := 0
|
|
pollCount := 0
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
d.logger.Info("poll loop stopping, waiting for in-flight tasks", "max_wait", "30s")
|
|
waitDone := make(chan struct{})
|
|
go func() { wg.Wait(); close(waitDone) }()
|
|
select {
|
|
case <-waitDone:
|
|
case <-time.After(30 * time.Second):
|
|
d.logger.Warn("timed out waiting for in-flight tasks")
|
|
}
|
|
return ctx.Err()
|
|
default:
|
|
}
|
|
|
|
runtimeIDs := d.allRuntimeIDs()
|
|
if len(runtimeIDs) == 0 {
|
|
if err := sleepWithContext(ctx, d.cfg.PollInterval); err != nil {
|
|
wg.Wait()
|
|
return err
|
|
}
|
|
continue
|
|
}
|
|
|
|
claimed := false
|
|
n := len(runtimeIDs)
|
|
for i := 0; i < n; i++ {
|
|
// Check if we have capacity before claiming.
|
|
select {
|
|
case sem <- struct{}{}:
|
|
// Acquired a slot.
|
|
default:
|
|
// All slots occupied, stop trying to claim.
|
|
d.logger.Debug("poll: at capacity", "running", d.cfg.MaxConcurrentTasks)
|
|
goto sleep
|
|
}
|
|
|
|
rid := runtimeIDs[(pollOffset+i)%n]
|
|
task, err := d.client.ClaimTask(ctx, rid)
|
|
if err != nil {
|
|
<-sem // Release the slot.
|
|
d.logger.Warn("claim task failed", "runtime_id", rid, "error", err)
|
|
continue
|
|
}
|
|
if task != nil {
|
|
d.logger.Info("task received", "task", shortID(task.ID), "issue", task.IssueID)
|
|
wg.Add(1)
|
|
go func(t Task) {
|
|
defer wg.Done()
|
|
defer func() { <-sem }()
|
|
d.handleTask(ctx, t)
|
|
}(*task)
|
|
claimed = true
|
|
pollOffset = (pollOffset + i + 1) % n
|
|
break
|
|
}
|
|
// No task for this runtime, release the slot and try next.
|
|
<-sem
|
|
}
|
|
|
|
sleep:
|
|
if !claimed {
|
|
pollCount++
|
|
if pollCount%20 == 1 {
|
|
d.logger.Debug("poll: no tasks", "runtimes", runtimeIDs, "cycle", pollCount)
|
|
}
|
|
pollOffset = (pollOffset + 1) % n
|
|
if err := sleepWithContext(ctx, d.cfg.PollInterval); err != nil {
|
|
wg.Wait()
|
|
return err
|
|
}
|
|
} else {
|
|
pollCount = 0
|
|
}
|
|
}
|
|
}
|
|
|
|
func (d *Daemon) handleTask(ctx context.Context, task Task) {
|
|
d.mu.Lock()
|
|
rt := d.runtimeIndex[task.RuntimeID]
|
|
d.mu.Unlock()
|
|
provider := rt.Provider
|
|
|
|
// Task-scoped logger with short ID for readable concurrent logs.
|
|
taskLog := d.logger.With("task", shortID(task.ID))
|
|
agentName := "agent"
|
|
if task.Agent != nil {
|
|
agentName = task.Agent.Name
|
|
}
|
|
taskLog.Info("picked task", "issue", task.IssueID, "agent", agentName, "provider", provider)
|
|
|
|
if err := d.client.StartTask(ctx, task.ID); err != nil {
|
|
taskLog.Error("start task failed", "error", err)
|
|
if failErr := d.client.FailTask(ctx, task.ID, fmt.Sprintf("start task failed: %s", err.Error())); failErr != nil {
|
|
taskLog.Error("fail task after start error", "error", failErr)
|
|
}
|
|
return
|
|
}
|
|
|
|
_ = d.client.ReportProgress(ctx, task.ID, fmt.Sprintf("Launching %s", provider), 1, 2)
|
|
|
|
result, err := d.runTask(ctx, task, provider, taskLog)
|
|
if err != nil {
|
|
taskLog.Error("task failed", "error", err)
|
|
if failErr := d.client.FailTask(ctx, task.ID, err.Error()); failErr != nil {
|
|
taskLog.Error("fail task callback failed", "error", failErr)
|
|
}
|
|
return
|
|
}
|
|
|
|
_ = d.client.ReportProgress(ctx, task.ID, "Finishing task", 2, 2)
|
|
|
|
// Check if the task was cancelled while it was running (e.g. issue
|
|
// was reassigned). If so, skip reporting results — the server already
|
|
// moved the task to 'cancelled' so complete/fail would fail anyway.
|
|
if status, err := d.client.GetTaskStatus(ctx, task.ID); err == nil && status == "cancelled" {
|
|
taskLog.Info("task cancelled during execution, discarding result")
|
|
return
|
|
}
|
|
|
|
switch result.Status {
|
|
case "blocked":
|
|
if err := d.client.FailTask(ctx, task.ID, result.Comment); err != nil {
|
|
taskLog.Error("report blocked task failed", "error", err)
|
|
}
|
|
default:
|
|
taskLog.Info("task completed", "status", result.Status)
|
|
if err := d.client.CompleteTask(ctx, task.ID, result.Comment, result.BranchName, result.SessionID, result.WorkDir); err != nil {
|
|
taskLog.Error("complete task failed, falling back to fail", "error", err)
|
|
if failErr := d.client.FailTask(ctx, task.ID, fmt.Sprintf("complete task failed: %s", err.Error())); failErr != nil {
|
|
taskLog.Error("fail task fallback also failed", "error", failErr)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (d *Daemon) runTask(ctx context.Context, task Task, provider string, taskLog *slog.Logger) (TaskResult, error) {
|
|
entry, ok := d.cfg.Agents[provider]
|
|
if !ok {
|
|
return TaskResult{}, fmt.Errorf("no agent configured for provider %q", provider)
|
|
}
|
|
|
|
agentName := "agent"
|
|
var skills []SkillData
|
|
var instructions string
|
|
if task.Agent != nil {
|
|
agentName = task.Agent.Name
|
|
skills = task.Agent.Skills
|
|
instructions = task.Agent.Instructions
|
|
}
|
|
|
|
// Prepare isolated execution environment.
|
|
// Repos are passed as metadata only — the agent checks them out on demand
|
|
// via `multica repo checkout <url>`.
|
|
taskCtx := execenv.TaskContextForEnv{
|
|
IssueID: task.IssueID,
|
|
AgentName: agentName,
|
|
AgentInstructions: instructions,
|
|
AgentSkills: convertSkillsForEnv(skills),
|
|
Repos: convertReposForEnv(task.Repos),
|
|
}
|
|
|
|
// Try to reuse the workdir from a previous task on the same (agent, issue) pair.
|
|
var env *execenv.Environment
|
|
if task.PriorWorkDir != "" {
|
|
env = execenv.Reuse(task.PriorWorkDir, provider, taskCtx, d.logger)
|
|
}
|
|
if env == nil {
|
|
var err error
|
|
env, err = execenv.Prepare(execenv.PrepareParams{
|
|
WorkspacesRoot: d.cfg.WorkspacesRoot,
|
|
WorkspaceID: task.WorkspaceID,
|
|
TaskID: task.ID,
|
|
AgentName: agentName,
|
|
Provider: provider,
|
|
Task: taskCtx,
|
|
}, d.logger)
|
|
if err != nil {
|
|
return TaskResult{}, fmt.Errorf("prepare execution environment: %w", err)
|
|
}
|
|
}
|
|
|
|
// Inject runtime-specific config (meta skill) so the agent discovers .agent_context/.
|
|
if err := execenv.InjectRuntimeConfig(env.WorkDir, provider, taskCtx); err != nil {
|
|
d.logger.Warn("execenv: inject runtime config failed (non-fatal)", "error", err)
|
|
}
|
|
// NOTE: No cleanup — workdir is preserved for reuse by future tasks on
|
|
// the same (agent, issue) pair. The work_dir path is stored in DB on
|
|
// task completion and passed back via PriorWorkDir on the next claim.
|
|
|
|
prompt := BuildPrompt(task)
|
|
|
|
// Pass the daemon's auth credentials and context so the spawned agent CLI
|
|
// can call the Multica API and the local daemon (e.g. `multica repo checkout`).
|
|
agentEnv := map[string]string{
|
|
"MULTICA_TOKEN": d.client.Token(),
|
|
"MULTICA_SERVER_URL": d.cfg.ServerBaseURL,
|
|
"MULTICA_DAEMON_PORT": fmt.Sprintf("%d", d.cfg.HealthPort),
|
|
"MULTICA_WORKSPACE_ID": task.WorkspaceID,
|
|
"MULTICA_AGENT_NAME": agentName,
|
|
"MULTICA_AGENT_ID": task.AgentID,
|
|
"MULTICA_TASK_ID": task.ID,
|
|
}
|
|
// Point Codex to the per-task CODEX_HOME so it discovers skills natively
|
|
// without polluting the system ~/.codex/skills/.
|
|
if env.CodexHome != "" {
|
|
agentEnv["CODEX_HOME"] = env.CodexHome
|
|
}
|
|
backend, err := agent.New(provider, agent.Config{
|
|
ExecutablePath: entry.Path,
|
|
Env: agentEnv,
|
|
Logger: d.logger,
|
|
})
|
|
if err != nil {
|
|
return TaskResult{}, fmt.Errorf("create agent backend: %w", err)
|
|
}
|
|
|
|
reused := task.PriorWorkDir != "" && env.WorkDir == task.PriorWorkDir
|
|
taskLog.Info("starting agent",
|
|
"provider", provider,
|
|
"workdir", env.WorkDir,
|
|
"model", entry.Model,
|
|
"reused", reused,
|
|
)
|
|
if task.PriorSessionID != "" {
|
|
taskLog.Info("resuming session", "session_id", task.PriorSessionID)
|
|
}
|
|
|
|
taskStart := time.Now()
|
|
|
|
session, err := backend.Execute(ctx, prompt, agent.ExecOptions{
|
|
Cwd: env.WorkDir,
|
|
Model: entry.Model,
|
|
Timeout: d.cfg.AgentTimeout,
|
|
ResumeSessionID: task.PriorSessionID,
|
|
})
|
|
if err != nil {
|
|
return TaskResult{}, err
|
|
}
|
|
|
|
// Drain message channel — forward to server for live output + log locally.
|
|
var toolCount atomic.Int32
|
|
go func() {
|
|
var seq atomic.Int32
|
|
var mu sync.Mutex
|
|
var pendingText strings.Builder
|
|
var batch []TaskMessageData
|
|
|
|
flush := func() {
|
|
mu.Lock()
|
|
// Flush any accumulated text as a single message.
|
|
if pendingText.Len() > 0 {
|
|
s := seq.Add(1)
|
|
batch = append(batch, TaskMessageData{
|
|
Seq: int(s),
|
|
Type: "text",
|
|
Content: pendingText.String(),
|
|
})
|
|
pendingText.Reset()
|
|
}
|
|
toSend := batch
|
|
batch = nil
|
|
mu.Unlock()
|
|
|
|
if len(toSend) > 0 {
|
|
sendCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
if err := d.client.ReportTaskMessages(sendCtx, task.ID, toSend); err != nil {
|
|
taskLog.Debug("failed to report task messages", "error", err)
|
|
}
|
|
cancel()
|
|
}
|
|
}
|
|
|
|
// Periodically flush accumulated text messages.
|
|
ticker := time.NewTicker(500 * time.Millisecond)
|
|
defer ticker.Stop()
|
|
|
|
done := make(chan struct{})
|
|
go func() {
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
flush()
|
|
case <-done:
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
for msg := range session.Messages {
|
|
switch msg.Type {
|
|
case agent.MessageToolUse:
|
|
n := toolCount.Add(1)
|
|
taskLog.Info(fmt.Sprintf("tool #%d: %s", n, msg.Tool))
|
|
s := seq.Add(1)
|
|
mu.Lock()
|
|
batch = append(batch, TaskMessageData{
|
|
Seq: int(s),
|
|
Type: "tool_use",
|
|
Tool: msg.Tool,
|
|
Input: msg.Input,
|
|
})
|
|
mu.Unlock()
|
|
case agent.MessageToolResult:
|
|
s := seq.Add(1)
|
|
// Truncate large tool results for the live feed.
|
|
output := msg.Output
|
|
if len(output) > 8192 {
|
|
output = output[:8192]
|
|
}
|
|
mu.Lock()
|
|
batch = append(batch, TaskMessageData{
|
|
Seq: int(s),
|
|
Type: "tool_result",
|
|
Tool: msg.Tool,
|
|
Output: output,
|
|
})
|
|
mu.Unlock()
|
|
case agent.MessageText:
|
|
if msg.Content != "" {
|
|
taskLog.Debug("agent", "text", truncateLog(msg.Content, 200))
|
|
mu.Lock()
|
|
pendingText.WriteString(msg.Content)
|
|
mu.Unlock()
|
|
}
|
|
case agent.MessageError:
|
|
taskLog.Error("agent error", "content", msg.Content)
|
|
s := seq.Add(1)
|
|
mu.Lock()
|
|
batch = append(batch, TaskMessageData{
|
|
Seq: int(s),
|
|
Type: "error",
|
|
Content: msg.Content,
|
|
})
|
|
mu.Unlock()
|
|
}
|
|
}
|
|
|
|
close(done)
|
|
flush() // Final flush after channel closes.
|
|
}()
|
|
|
|
result := <-session.Result
|
|
elapsed := time.Since(taskStart).Round(time.Second)
|
|
taskLog.Info("agent finished",
|
|
"status", result.Status,
|
|
"duration", elapsed.String(),
|
|
"tools", toolCount.Load(),
|
|
)
|
|
|
|
switch result.Status {
|
|
case "completed":
|
|
if result.Output == "" {
|
|
return TaskResult{}, fmt.Errorf("%s returned empty output", provider)
|
|
}
|
|
return TaskResult{
|
|
Status: "completed",
|
|
Comment: result.Output,
|
|
SessionID: result.SessionID,
|
|
WorkDir: env.WorkDir,
|
|
}, nil
|
|
case "timeout":
|
|
return TaskResult{}, fmt.Errorf("%s timed out after %s", provider, d.cfg.AgentTimeout)
|
|
default:
|
|
errMsg := result.Error
|
|
if errMsg == "" {
|
|
errMsg = fmt.Sprintf("%s execution %s", provider, result.Status)
|
|
}
|
|
return TaskResult{Status: "blocked", Comment: errMsg}, nil
|
|
}
|
|
}
|
|
|
|
// repoDataToInfo converts daemon RepoData to repocache RepoInfo.
|
|
func repoDataToInfo(repos []RepoData) []repocache.RepoInfo {
|
|
info := make([]repocache.RepoInfo, len(repos))
|
|
for i, r := range repos {
|
|
info[i] = repocache.RepoInfo{URL: r.URL, Description: r.Description}
|
|
}
|
|
return info
|
|
}
|
|
|
|
func convertReposForEnv(repos []RepoData) []execenv.RepoContextForEnv {
|
|
if len(repos) == 0 {
|
|
return nil
|
|
}
|
|
result := make([]execenv.RepoContextForEnv, len(repos))
|
|
for i, r := range repos {
|
|
result[i] = execenv.RepoContextForEnv{URL: r.URL, Description: r.Description}
|
|
}
|
|
return result
|
|
}
|
|
|
|
// shortID returns the first 8 characters of an ID for readable logs.
|
|
func shortID(id string) string {
|
|
if len(id) <= 8 {
|
|
return id
|
|
}
|
|
return id[:8]
|
|
}
|
|
|
|
// truncateLog truncates a string to maxLen, appending "…" if truncated.
|
|
// Also collapses newlines to spaces for single-line log output.
|
|
func truncateLog(s string, maxLen int) string {
|
|
s = strings.ReplaceAll(s, "\n", " ")
|
|
s = strings.TrimSpace(s)
|
|
if len(s) <= maxLen {
|
|
return s
|
|
}
|
|
return s[:maxLen] + "…"
|
|
}
|
|
|
|
func convertSkillsForEnv(skills []SkillData) []execenv.SkillContextForEnv {
|
|
if len(skills) == 0 {
|
|
return nil
|
|
}
|
|
result := make([]execenv.SkillContextForEnv, len(skills))
|
|
for i, s := range skills {
|
|
result[i] = execenv.SkillContextForEnv{
|
|
Name: s.Name,
|
|
Content: s.Content,
|
|
}
|
|
for _, f := range s.Files {
|
|
result[i].Files = append(result[i].Files, execenv.SkillFileContextForEnv{
|
|
Path: f.Path,
|
|
Content: f.Content,
|
|
})
|
|
}
|
|
}
|
|
return result
|
|
}
|