diff --git a/packages/core/src/agent/async-agent.test.ts b/packages/core/src/agent/async-agent.test.ts index 26e7cc9d..e9700d03 100644 --- a/packages/core/src/agent/async-agent.test.ts +++ b/packages/core/src/agent/async-agent.test.ts @@ -272,6 +272,18 @@ describe("AsyncAgent internal flow", () => { agent.close(); }); + it("does not persist assistant summary when result text is a NO_REPLY variant", async () => { + runInternalMock.mockResolvedValueOnce({ text: "NO_REPLY.", thinking: undefined, error: undefined }); + const agent = new AsyncAgent(); + + agent.writeInternal("announce findings", { forwardAssistant: true, persistResponse: true }); + await agent.waitForIdle(); + + expect(persistAssistantSummaryMock).not.toHaveBeenCalled(); + + agent.close(); + }); + it("does not persist assistant summary when result text is empty", async () => { runInternalMock.mockResolvedValueOnce({ text: " ", thinking: undefined, error: undefined }); const agent = new AsyncAgent(); diff --git a/packages/core/src/agent/async-agent.ts b/packages/core/src/agent/async-agent.ts index 30e1f94d..41c33778 100644 --- a/packages/core/src/agent/async-agent.ts +++ b/packages/core/src/agent/async-agent.ts @@ -5,6 +5,7 @@ import { Channel } from "./channel.js"; import type { AgentOptions, Message } from "./types.js"; import type { MulticaEvent } from "./events.js"; import { injectMessageTimestamp } from "./message-timestamp.js"; +import { isSilentReplyText } from "./tokens.js"; const devNull = { write: () => true } as unknown as NodeJS.WritableStream; @@ -31,6 +32,7 @@ export class AsyncAgent { private pendingWrites = 0; private closeCallbacks: Array<() => void> = []; private forwardInternalAssistant = false; + private _lastRunError: string | undefined; readonly sessionId: string; constructor(options?: AgentOptions) { @@ -64,13 +66,19 @@ export class AsyncAgent { this.queue = this.queue .then(async () => { - if (this._closed) return; + if (this._closed) { + console.log(`[AsyncAgent:${this.sessionId.slice(0, 8)}] write() skipped — agent closed`); + return; + } + console.log(`[AsyncAgent:${this.sessionId.slice(0, 8)}] run() starting for message: ${content.slice(0, 80)}`); const result = await this.agent.run(message, { displayPrompt: content }); + console.log(`[AsyncAgent:${this.sessionId.slice(0, 8)}] run() completed, error=${result.error ?? "none"}`); // Flush pending session writes so waitForIdle() callers // can safely read session data from disk. await this.agent.flushSession(); // Normal text is delivered via message_end event; only handle errors here if (result.error) { + this._lastRunError = result.error; console.error(`[AsyncAgent] Agent run error: ${result.error}`); this.channel.send({ id: uuidv7(), content: `[error] ${result.error}` }); // Only emit agent_error for HTTP 401 from the LLM provider so the @@ -83,6 +91,7 @@ export class AsyncAgent { }) .catch((err) => { const message = err instanceof Error ? err.message : String(err); + this._lastRunError = message; console.error(`[AsyncAgent] Agent run exception: ${message}`); this.channel.send({ id: uuidv7(), content: `[error] ${message}` }); // Only emit agent_error for HTTP 401 from the LLM provider so the @@ -120,8 +129,11 @@ export class AsyncAgent { // Internal run errors are for diagnostics only; do not leak to user stream. console.error(`[AsyncAgent] Internal run error: ${result.error}`); } + // Stop forwarding BEFORE persist to avoid double-emitting the same + // assistant message (once from runInternal streaming, once from appendMessage). + this.forwardInternalAssistant = prevForward; // Persist the LLM summary so it remains in parent context for future turns - if (persistResponse && result.text?.trim() && result.text.trim() !== "NO_REPLY") { + if (persistResponse && result.text?.trim() && !isSilentReplyText(result.text)) { this.agent.persistAssistantSummary(result.text.trim()); await this.agent.flushSession(); } @@ -164,6 +176,43 @@ export class AsyncAgent { return this.queue; } + /** Error message from the last run, if it failed. */ + get lastRunError(): string | undefined { + return this._lastRunError; + } + + /** Whether the agent is currently executing a run (normal or internal). */ + get isRunning(): boolean { + return this.agent.isRunning; + } + + /** Whether the underlying LLM is currently streaming a response. */ + get isStreaming(): boolean { + return this.agent.isStreaming; + } + + /** + * Steer the agent mid-run. Bypasses the serial queue and injects a message + * directly into the PiAgentCore steering queue. The message is delivered + * after the current tool execution completes, skipping remaining tool calls. + */ + steer(content: string): void { + this.agent.steer(content); + } + + /** + * Queue a follow-up message for after the current run finishes. + * Delivered only when the agent has no more tool calls or steering messages. + */ + followUp(content: string): void { + this.agent.followUp(content); + } + + /** Whether the underlying PiAgentCore has queued steer/followUp messages. */ + hasQueuedMessages(): boolean { + return this.agent.hasQueuedMessages(); + } + private shouldForwardEvent(event: AgentEvent | MulticaEvent): boolean { if (!this.agent.isInternalRun) return true; if (!this.forwardInternalAssistant) return false; diff --git a/packages/core/src/agent/runner.ts b/packages/core/src/agent/runner.ts index beaa4faf..b5ac016f 100644 --- a/packages/core/src/agent/runner.ts +++ b/packages/core/src/agent/runner.ts @@ -91,6 +91,7 @@ export class Agent { // Internal run state private _internalRun = false; + private _isRunning = false; private _runMutex: Promise = Promise.resolve(); private currentUserDisplayPrompt: string | undefined; @@ -304,8 +305,8 @@ export class Agent { const mergedToolsConfig = mergeToolsConfig(profileToolsConfig, options.tools); const profileDir = this.profile?.getProfileDir(); this.toolsOptions = mergedToolsConfig - ? { ...options, tools: mergedToolsConfig, profileDir } - : { ...options, profileDir }; + ? { ...options, tools: mergedToolsConfig, profileDir, provider: this.resolvedProvider } + : { ...options, profileDir, provider: this.resolvedProvider }; const tools = resolveTools(this.toolsOptions); if (this.debug) { @@ -427,6 +428,7 @@ export class Agent { this.refreshAuthState(); this.output.state.lastAssistantText = ""; this.currentUserDisplayPrompt = options?.displayPrompt; + this._isRunning = true; try { // Early validation: check API key before calling PiAgentCore.prompt(), @@ -489,6 +491,7 @@ export class Agent { : undefined; return { text: this.output.state.lastAssistantText, thinking, error: this.agent.state.error }; } finally { + this._isRunning = false; this.currentUserDisplayPrompt = undefined; } } @@ -664,6 +667,40 @@ export class Agent { return this._internalRun; } + /** Whether a run (normal or internal) is currently executing inside _run(). */ + get isRunning(): boolean { + return this._isRunning; + } + + /** Whether the underlying PiAgentCore is currently streaming an LLM response. */ + get isStreaming(): boolean { + return this.agent.state.isStreaming; + } + + /** + * Queue a steering message to interrupt the agent mid-run. + * Delivered after current tool execution, skipping remaining tool calls. + * Safe to call from any context (does not require the run mutex). + */ + steer(content: string): void { + const msg: UserMessage = { role: "user", content, timestamp: Date.now() }; + this.agent.steer(msg); + } + + /** + * Queue a follow-up message for after the current run finishes. + * Delivered only when the agent has no more tool calls or steering messages. + */ + followUp(content: string): void { + const msg: UserMessage = { role: "user", content, timestamp: Date.now() }; + this.agent.followUp(msg); + } + + /** Whether the underlying PiAgentCore has queued steer/followUp messages. */ + hasQueuedMessages(): boolean { + return this.agent.hasQueuedMessages(); + } + /** * Persist a synthetic assistant message into both in-memory state and session JSONL. * Used after an internal run to keep the LLM summary visible in future turns @@ -895,6 +932,8 @@ export class Agent { // Update internal state this.resolvedProvider = providerId; + // Keep toolsOptions.provider in sync so sessions_spawn inherits the current provider + this.toolsOptions = { ...this.toolsOptions, provider: providerId }; // Update session metadata (save original providerId, not alias-resolved) this.session.saveMeta({