From 299c947893e12a51a20fb1cafb22fbb431c83dcd Mon Sep 17 00:00:00 2001 From: Jiang Bohan Date: Wed, 11 Feb 2026 17:09:33 +0800 Subject: [PATCH] feat(agent): expose isRunning and lastRunError on Agent and AsyncAgent Add isRunning flag to Agent (runner.ts) for detecting active runs. Add lastRunError to AsyncAgent for propagating child run errors to the registry. Fix duplicate message emission in writeInternal by resetting forwardInternalAssistant before persistAssistantSummary. Co-Authored-By: Claude Opus 4.6 --- packages/core/src/agent/async-agent.test.ts | 12 +++++ packages/core/src/agent/async-agent.ts | 53 ++++++++++++++++++++- packages/core/src/agent/runner.ts | 43 ++++++++++++++++- 3 files changed, 104 insertions(+), 4 deletions(-) diff --git a/packages/core/src/agent/async-agent.test.ts b/packages/core/src/agent/async-agent.test.ts index 26e7cc9d..e9700d03 100644 --- a/packages/core/src/agent/async-agent.test.ts +++ b/packages/core/src/agent/async-agent.test.ts @@ -272,6 +272,18 @@ describe("AsyncAgent internal flow", () => { agent.close(); }); + it("does not persist assistant summary when result text is a NO_REPLY variant", async () => { + runInternalMock.mockResolvedValueOnce({ text: "NO_REPLY.", thinking: undefined, error: undefined }); + const agent = new AsyncAgent(); + + agent.writeInternal("announce findings", { forwardAssistant: true, persistResponse: true }); + await agent.waitForIdle(); + + expect(persistAssistantSummaryMock).not.toHaveBeenCalled(); + + agent.close(); + }); + it("does not persist assistant summary when result text is empty", async () => { runInternalMock.mockResolvedValueOnce({ text: " ", thinking: undefined, error: undefined }); const agent = new AsyncAgent(); diff --git a/packages/core/src/agent/async-agent.ts b/packages/core/src/agent/async-agent.ts index 30e1f94d..41c33778 100644 --- a/packages/core/src/agent/async-agent.ts +++ b/packages/core/src/agent/async-agent.ts @@ -5,6 +5,7 @@ import { Channel } from "./channel.js"; import type { AgentOptions, Message } from "./types.js"; import type { MulticaEvent } from "./events.js"; import { injectMessageTimestamp } from "./message-timestamp.js"; +import { isSilentReplyText } from "./tokens.js"; const devNull = { write: () => true } as unknown as NodeJS.WritableStream; @@ -31,6 +32,7 @@ export class AsyncAgent { private pendingWrites = 0; private closeCallbacks: Array<() => void> = []; private forwardInternalAssistant = false; + private _lastRunError: string | undefined; readonly sessionId: string; constructor(options?: AgentOptions) { @@ -64,13 +66,19 @@ export class AsyncAgent { this.queue = this.queue .then(async () => { - if (this._closed) return; + if (this._closed) { + console.log(`[AsyncAgent:${this.sessionId.slice(0, 8)}] write() skipped — agent closed`); + return; + } + console.log(`[AsyncAgent:${this.sessionId.slice(0, 8)}] run() starting for message: ${content.slice(0, 80)}`); const result = await this.agent.run(message, { displayPrompt: content }); + console.log(`[AsyncAgent:${this.sessionId.slice(0, 8)}] run() completed, error=${result.error ?? "none"}`); // Flush pending session writes so waitForIdle() callers // can safely read session data from disk. await this.agent.flushSession(); // Normal text is delivered via message_end event; only handle errors here if (result.error) { + this._lastRunError = result.error; console.error(`[AsyncAgent] Agent run error: ${result.error}`); this.channel.send({ id: uuidv7(), content: `[error] ${result.error}` }); // Only emit agent_error for HTTP 401 from the LLM provider so the @@ -83,6 +91,7 @@ export class AsyncAgent { }) .catch((err) => { const message = err instanceof Error ? err.message : String(err); + this._lastRunError = message; console.error(`[AsyncAgent] Agent run exception: ${message}`); this.channel.send({ id: uuidv7(), content: `[error] ${message}` }); // Only emit agent_error for HTTP 401 from the LLM provider so the @@ -120,8 +129,11 @@ export class AsyncAgent { // Internal run errors are for diagnostics only; do not leak to user stream. console.error(`[AsyncAgent] Internal run error: ${result.error}`); } + // Stop forwarding BEFORE persist to avoid double-emitting the same + // assistant message (once from runInternal streaming, once from appendMessage). + this.forwardInternalAssistant = prevForward; // Persist the LLM summary so it remains in parent context for future turns - if (persistResponse && result.text?.trim() && result.text.trim() !== "NO_REPLY") { + if (persistResponse && result.text?.trim() && !isSilentReplyText(result.text)) { this.agent.persistAssistantSummary(result.text.trim()); await this.agent.flushSession(); } @@ -164,6 +176,43 @@ export class AsyncAgent { return this.queue; } + /** Error message from the last run, if it failed. */ + get lastRunError(): string | undefined { + return this._lastRunError; + } + + /** Whether the agent is currently executing a run (normal or internal). */ + get isRunning(): boolean { + return this.agent.isRunning; + } + + /** Whether the underlying LLM is currently streaming a response. */ + get isStreaming(): boolean { + return this.agent.isStreaming; + } + + /** + * Steer the agent mid-run. Bypasses the serial queue and injects a message + * directly into the PiAgentCore steering queue. The message is delivered + * after the current tool execution completes, skipping remaining tool calls. + */ + steer(content: string): void { + this.agent.steer(content); + } + + /** + * Queue a follow-up message for after the current run finishes. + * Delivered only when the agent has no more tool calls or steering messages. + */ + followUp(content: string): void { + this.agent.followUp(content); + } + + /** Whether the underlying PiAgentCore has queued steer/followUp messages. */ + hasQueuedMessages(): boolean { + return this.agent.hasQueuedMessages(); + } + private shouldForwardEvent(event: AgentEvent | MulticaEvent): boolean { if (!this.agent.isInternalRun) return true; if (!this.forwardInternalAssistant) return false; diff --git a/packages/core/src/agent/runner.ts b/packages/core/src/agent/runner.ts index beaa4faf..b5ac016f 100644 --- a/packages/core/src/agent/runner.ts +++ b/packages/core/src/agent/runner.ts @@ -91,6 +91,7 @@ export class Agent { // Internal run state private _internalRun = false; + private _isRunning = false; private _runMutex: Promise = Promise.resolve(); private currentUserDisplayPrompt: string | undefined; @@ -304,8 +305,8 @@ export class Agent { const mergedToolsConfig = mergeToolsConfig(profileToolsConfig, options.tools); const profileDir = this.profile?.getProfileDir(); this.toolsOptions = mergedToolsConfig - ? { ...options, tools: mergedToolsConfig, profileDir } - : { ...options, profileDir }; + ? { ...options, tools: mergedToolsConfig, profileDir, provider: this.resolvedProvider } + : { ...options, profileDir, provider: this.resolvedProvider }; const tools = resolveTools(this.toolsOptions); if (this.debug) { @@ -427,6 +428,7 @@ export class Agent { this.refreshAuthState(); this.output.state.lastAssistantText = ""; this.currentUserDisplayPrompt = options?.displayPrompt; + this._isRunning = true; try { // Early validation: check API key before calling PiAgentCore.prompt(), @@ -489,6 +491,7 @@ export class Agent { : undefined; return { text: this.output.state.lastAssistantText, thinking, error: this.agent.state.error }; } finally { + this._isRunning = false; this.currentUserDisplayPrompt = undefined; } } @@ -664,6 +667,40 @@ export class Agent { return this._internalRun; } + /** Whether a run (normal or internal) is currently executing inside _run(). */ + get isRunning(): boolean { + return this._isRunning; + } + + /** Whether the underlying PiAgentCore is currently streaming an LLM response. */ + get isStreaming(): boolean { + return this.agent.state.isStreaming; + } + + /** + * Queue a steering message to interrupt the agent mid-run. + * Delivered after current tool execution, skipping remaining tool calls. + * Safe to call from any context (does not require the run mutex). + */ + steer(content: string): void { + const msg: UserMessage = { role: "user", content, timestamp: Date.now() }; + this.agent.steer(msg); + } + + /** + * Queue a follow-up message for after the current run finishes. + * Delivered only when the agent has no more tool calls or steering messages. + */ + followUp(content: string): void { + const msg: UserMessage = { role: "user", content, timestamp: Date.now() }; + this.agent.followUp(msg); + } + + /** Whether the underlying PiAgentCore has queued steer/followUp messages. */ + hasQueuedMessages(): boolean { + return this.agent.hasQueuedMessages(); + } + /** * Persist a synthetic assistant message into both in-memory state and session JSONL. * Used after an internal run to keep the LLM summary visible in future turns @@ -895,6 +932,8 @@ export class Agent { // Update internal state this.resolvedProvider = providerId; + // Keep toolsOptions.provider in sync so sessions_spawn inherits the current provider + this.toolsOptions = { ...this.toolsOptions, provider: providerId }; // Update session metadata (save original providerId, not alias-resolved) this.session.saveMeta({