feat(agent): expose isRunning and lastRunError on Agent and AsyncAgent
Add isRunning flag to Agent (runner.ts) for detecting active runs. Add lastRunError to AsyncAgent for propagating child run errors to the registry. Fix duplicate message emission in writeInternal by resetting forwardInternalAssistant before persistAssistantSummary. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
45db13cbdd
commit
299c947893
3 changed files with 104 additions and 4 deletions
|
|
@ -272,6 +272,18 @@ describe("AsyncAgent internal flow", () => {
|
|||
agent.close();
|
||||
});
|
||||
|
||||
it("does not persist assistant summary when result text is a NO_REPLY variant", async () => {
|
||||
runInternalMock.mockResolvedValueOnce({ text: "NO_REPLY.", thinking: undefined, error: undefined });
|
||||
const agent = new AsyncAgent();
|
||||
|
||||
agent.writeInternal("announce findings", { forwardAssistant: true, persistResponse: true });
|
||||
await agent.waitForIdle();
|
||||
|
||||
expect(persistAssistantSummaryMock).not.toHaveBeenCalled();
|
||||
|
||||
agent.close();
|
||||
});
|
||||
|
||||
it("does not persist assistant summary when result text is empty", async () => {
|
||||
runInternalMock.mockResolvedValueOnce({ text: " ", thinking: undefined, error: undefined });
|
||||
const agent = new AsyncAgent();
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ import { Channel } from "./channel.js";
|
|||
import type { AgentOptions, Message } from "./types.js";
|
||||
import type { MulticaEvent } from "./events.js";
|
||||
import { injectMessageTimestamp } from "./message-timestamp.js";
|
||||
import { isSilentReplyText } from "./tokens.js";
|
||||
|
||||
const devNull = { write: () => true } as unknown as NodeJS.WritableStream;
|
||||
|
||||
|
|
@ -31,6 +32,7 @@ export class AsyncAgent {
|
|||
private pendingWrites = 0;
|
||||
private closeCallbacks: Array<() => void> = [];
|
||||
private forwardInternalAssistant = false;
|
||||
private _lastRunError: string | undefined;
|
||||
readonly sessionId: string;
|
||||
|
||||
constructor(options?: AgentOptions) {
|
||||
|
|
@ -64,13 +66,19 @@ export class AsyncAgent {
|
|||
|
||||
this.queue = this.queue
|
||||
.then(async () => {
|
||||
if (this._closed) return;
|
||||
if (this._closed) {
|
||||
console.log(`[AsyncAgent:${this.sessionId.slice(0, 8)}] write() skipped — agent closed`);
|
||||
return;
|
||||
}
|
||||
console.log(`[AsyncAgent:${this.sessionId.slice(0, 8)}] run() starting for message: ${content.slice(0, 80)}`);
|
||||
const result = await this.agent.run(message, { displayPrompt: content });
|
||||
console.log(`[AsyncAgent:${this.sessionId.slice(0, 8)}] run() completed, error=${result.error ?? "none"}`);
|
||||
// Flush pending session writes so waitForIdle() callers
|
||||
// can safely read session data from disk.
|
||||
await this.agent.flushSession();
|
||||
// Normal text is delivered via message_end event; only handle errors here
|
||||
if (result.error) {
|
||||
this._lastRunError = result.error;
|
||||
console.error(`[AsyncAgent] Agent run error: ${result.error}`);
|
||||
this.channel.send({ id: uuidv7(), content: `[error] ${result.error}` });
|
||||
// Only emit agent_error for HTTP 401 from the LLM provider so the
|
||||
|
|
@ -83,6 +91,7 @@ export class AsyncAgent {
|
|||
})
|
||||
.catch((err) => {
|
||||
const message = err instanceof Error ? err.message : String(err);
|
||||
this._lastRunError = message;
|
||||
console.error(`[AsyncAgent] Agent run exception: ${message}`);
|
||||
this.channel.send({ id: uuidv7(), content: `[error] ${message}` });
|
||||
// Only emit agent_error for HTTP 401 from the LLM provider so the
|
||||
|
|
@ -120,8 +129,11 @@ export class AsyncAgent {
|
|||
// Internal run errors are for diagnostics only; do not leak to user stream.
|
||||
console.error(`[AsyncAgent] Internal run error: ${result.error}`);
|
||||
}
|
||||
// Stop forwarding BEFORE persist to avoid double-emitting the same
|
||||
// assistant message (once from runInternal streaming, once from appendMessage).
|
||||
this.forwardInternalAssistant = prevForward;
|
||||
// Persist the LLM summary so it remains in parent context for future turns
|
||||
if (persistResponse && result.text?.trim() && result.text.trim() !== "NO_REPLY") {
|
||||
if (persistResponse && result.text?.trim() && !isSilentReplyText(result.text)) {
|
||||
this.agent.persistAssistantSummary(result.text.trim());
|
||||
await this.agent.flushSession();
|
||||
}
|
||||
|
|
@ -164,6 +176,43 @@ export class AsyncAgent {
|
|||
return this.queue;
|
||||
}
|
||||
|
||||
/** Error message from the last run, if it failed. */
|
||||
get lastRunError(): string | undefined {
|
||||
return this._lastRunError;
|
||||
}
|
||||
|
||||
/** Whether the agent is currently executing a run (normal or internal). */
|
||||
get isRunning(): boolean {
|
||||
return this.agent.isRunning;
|
||||
}
|
||||
|
||||
/** Whether the underlying LLM is currently streaming a response. */
|
||||
get isStreaming(): boolean {
|
||||
return this.agent.isStreaming;
|
||||
}
|
||||
|
||||
/**
|
||||
* Steer the agent mid-run. Bypasses the serial queue and injects a message
|
||||
* directly into the PiAgentCore steering queue. The message is delivered
|
||||
* after the current tool execution completes, skipping remaining tool calls.
|
||||
*/
|
||||
steer(content: string): void {
|
||||
this.agent.steer(content);
|
||||
}
|
||||
|
||||
/**
|
||||
* Queue a follow-up message for after the current run finishes.
|
||||
* Delivered only when the agent has no more tool calls or steering messages.
|
||||
*/
|
||||
followUp(content: string): void {
|
||||
this.agent.followUp(content);
|
||||
}
|
||||
|
||||
/** Whether the underlying PiAgentCore has queued steer/followUp messages. */
|
||||
hasQueuedMessages(): boolean {
|
||||
return this.agent.hasQueuedMessages();
|
||||
}
|
||||
|
||||
private shouldForwardEvent(event: AgentEvent | MulticaEvent): boolean {
|
||||
if (!this.agent.isInternalRun) return true;
|
||||
if (!this.forwardInternalAssistant) return false;
|
||||
|
|
|
|||
|
|
@ -91,6 +91,7 @@ export class Agent {
|
|||
|
||||
// Internal run state
|
||||
private _internalRun = false;
|
||||
private _isRunning = false;
|
||||
private _runMutex: Promise<void> = Promise.resolve();
|
||||
private currentUserDisplayPrompt: string | undefined;
|
||||
|
||||
|
|
@ -304,8 +305,8 @@ export class Agent {
|
|||
const mergedToolsConfig = mergeToolsConfig(profileToolsConfig, options.tools);
|
||||
const profileDir = this.profile?.getProfileDir();
|
||||
this.toolsOptions = mergedToolsConfig
|
||||
? { ...options, tools: mergedToolsConfig, profileDir }
|
||||
: { ...options, profileDir };
|
||||
? { ...options, tools: mergedToolsConfig, profileDir, provider: this.resolvedProvider }
|
||||
: { ...options, profileDir, provider: this.resolvedProvider };
|
||||
|
||||
const tools = resolveTools(this.toolsOptions);
|
||||
if (this.debug) {
|
||||
|
|
@ -427,6 +428,7 @@ export class Agent {
|
|||
this.refreshAuthState();
|
||||
this.output.state.lastAssistantText = "";
|
||||
this.currentUserDisplayPrompt = options?.displayPrompt;
|
||||
this._isRunning = true;
|
||||
|
||||
try {
|
||||
// Early validation: check API key before calling PiAgentCore.prompt(),
|
||||
|
|
@ -489,6 +491,7 @@ export class Agent {
|
|||
: undefined;
|
||||
return { text: this.output.state.lastAssistantText, thinking, error: this.agent.state.error };
|
||||
} finally {
|
||||
this._isRunning = false;
|
||||
this.currentUserDisplayPrompt = undefined;
|
||||
}
|
||||
}
|
||||
|
|
@ -664,6 +667,40 @@ export class Agent {
|
|||
return this._internalRun;
|
||||
}
|
||||
|
||||
/** Whether a run (normal or internal) is currently executing inside _run(). */
|
||||
get isRunning(): boolean {
|
||||
return this._isRunning;
|
||||
}
|
||||
|
||||
/** Whether the underlying PiAgentCore is currently streaming an LLM response. */
|
||||
get isStreaming(): boolean {
|
||||
return this.agent.state.isStreaming;
|
||||
}
|
||||
|
||||
/**
|
||||
* Queue a steering message to interrupt the agent mid-run.
|
||||
* Delivered after current tool execution, skipping remaining tool calls.
|
||||
* Safe to call from any context (does not require the run mutex).
|
||||
*/
|
||||
steer(content: string): void {
|
||||
const msg: UserMessage = { role: "user", content, timestamp: Date.now() };
|
||||
this.agent.steer(msg);
|
||||
}
|
||||
|
||||
/**
|
||||
* Queue a follow-up message for after the current run finishes.
|
||||
* Delivered only when the agent has no more tool calls or steering messages.
|
||||
*/
|
||||
followUp(content: string): void {
|
||||
const msg: UserMessage = { role: "user", content, timestamp: Date.now() };
|
||||
this.agent.followUp(msg);
|
||||
}
|
||||
|
||||
/** Whether the underlying PiAgentCore has queued steer/followUp messages. */
|
||||
hasQueuedMessages(): boolean {
|
||||
return this.agent.hasQueuedMessages();
|
||||
}
|
||||
|
||||
/**
|
||||
* Persist a synthetic assistant message into both in-memory state and session JSONL.
|
||||
* Used after an internal run to keep the LLM summary visible in future turns
|
||||
|
|
@ -895,6 +932,8 @@ export class Agent {
|
|||
|
||||
// Update internal state
|
||||
this.resolvedProvider = providerId;
|
||||
// Keep toolsOptions.provider in sync so sessions_spawn inherits the current provider
|
||||
this.toolsOptions = { ...this.toolsOptions, provider: providerId };
|
||||
|
||||
// Update session metadata (save original providerId, not alias-resolved)
|
||||
this.session.saveMeta({
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue