Merge pull request #224 from multica-ai/codex/fix-multi-session-chat-sync

fix(chat): stabilize multi-session sync and tool flow
This commit is contained in:
Jiayuan Zhang 2026-02-17 16:05:53 +08:00 committed by GitHub
commit 7387216482
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
10 changed files with 354 additions and 268 deletions

View file

@ -194,11 +194,12 @@ interface ElectronAPI {
revokeDevice: (deviceId: string) => Promise<{ ok: boolean }>
onConnectionStateChanged: (callback: (state: string) => void) => void
offConnectionStateChanged: () => void
onDevicesChanged: (callback: () => void) => void
offDevicesChanged: () => void
onInboundMessage: (callback: (event: InboundMessageEvent) => void) => void
offInboundMessage: () => void
}
onDevicesChanged: (callback: () => void) => void
offDevicesChanged: () => void
onInboundMessage: (callback: (event: InboundMessageEvent) => void) => () => void
onConversationsChanged: (callback: () => void) => () => void
offInboundMessage: () => void
}
tools: {
list: () => Promise<ToolInfo[]>
toggle: (name: string) => Promise<unknown>
@ -253,18 +254,18 @@ interface ElectronAPI {
wake: (reason?: string) => Promise<{ ok: boolean; result?: unknown; error?: string }>
}
localChat: {
subscribe: (conversationId: string) => Promise<{ ok?: boolean; error?: string; alreadySubscribed?: boolean }>
unsubscribe: (conversationId: string) => Promise<{ ok: boolean }>
getHistory: (conversationId: string, options?: { offset?: number; limit?: number }) => Promise<{ messages: unknown[]; total: number; offset: number; limit: number; contextWindowTokens?: number }>
subscribe: (conversationId: string) => Promise<{ ok?: boolean; error?: string; alreadySubscribed?: boolean; token?: number; isRunning?: boolean }>
unsubscribe: (conversationId: string, token?: number) => Promise<{ ok: boolean; skipped?: boolean; alreadyUnsubscribed?: boolean }>
getHistory: (conversationId: string, options?: { offset?: number; limit?: number }) => Promise<{ messages: unknown[]; total: number; offset: number; limit: number; contextWindowTokens?: number; isRunning?: boolean }>
send: (conversationId: string, content: string) => Promise<{ ok?: boolean; error?: string }>
abort: (conversationId: string) => Promise<{ ok?: boolean; error?: string }>
resolveExecApproval: (approvalId: string, decision: string) => Promise<{ ok: boolean }>
onEvent: (callback: (event: LocalChatEvent) => void) => void
onEvent: (callback: (event: LocalChatEvent) => void) => () => void
offEvent: () => void
onApproval: (callback: (approval: LocalChatApproval) => void) => void
onApproval: (callback: (approval: LocalChatApproval) => void) => () => void
offApproval: () => void
}
}
}
// Used in Renderer process, expose in `preload.ts`
interface Window {

View file

@ -13,9 +13,21 @@ let hub: Hub | null = null
let defaultConversationId: string | null = null
let mainWindowRef: BrowserWindow | null = null
function isConversationBusy(conversation: AsyncAgent): boolean {
return conversation.isRunning
|| conversation.isStreaming
|| conversation.hasQueuedMessages()
|| conversation.getPendingWrites() > 0
}
interface IpcAgentSubscription {
token: number
unsubscribe: () => void
}
// Track which agents have active IPC subscriptions (for local direct chat)
// Value is the unsubscribe function returned by agent.subscribe()
const ipcAgentSubscriptions = new Map<string, () => void>()
const ipcAgentSubscriptions = new Map<string, IpcAgentSubscription>()
let nextIpcSubscriptionToken = 1
// Resolve gateway URL: GATEWAY_URL env > MAIN_VITE_GATEWAY_URL (.env file)
const gatewayUrl =
@ -258,9 +270,19 @@ export function registerHubIpcHandlers(): void {
}
const logicalAgentId = h.getConversationAgentId(conversationId) ?? conversationId
// Already subscribed?
if (ipcAgentSubscriptions.has(conversationId)) {
return { ok: true, alreadySubscribed: true }
const existingSubscription = ipcAgentSubscriptions.get(conversationId)
if (existingSubscription) {
const refreshedToken = nextIpcSubscriptionToken++
ipcAgentSubscriptions.set(conversationId, {
token: refreshedToken,
unsubscribe: existingSubscription.unsubscribe,
})
return {
ok: true,
alreadySubscribed: true,
token: refreshedToken,
isRunning: isConversationBusy(conversation),
}
}
// Track current stream ID for message grouping
@ -318,7 +340,8 @@ export function registerHubIpcHandlers(): void {
}
})
ipcAgentSubscriptions.set(conversationId, unsubscribe)
const token = nextIpcSubscriptionToken++
ipcAgentSubscriptions.set(conversationId, { token, unsubscribe })
// Register local approval handler so exec approval requests route via IPC
h.setLocalApprovalHandler(conversationId, (payload) => {
@ -329,17 +352,24 @@ export function registerHubIpcHandlers(): void {
safeLog(`[IPC] Local chat subscribed to conversation: ${conversationId}`)
return { ok: true }
return { ok: true, token, isRunning: isConversationBusy(conversation) }
})
/**
* Unsubscribe from local agent events.
*/
ipcMain.handle('localChat:unsubscribe', async (_event, conversationId: string) => {
const unsubscribe = ipcAgentSubscriptions.get(conversationId)
if (unsubscribe) {
unsubscribe()
ipcMain.handle('localChat:unsubscribe', async (_event, conversationId: string, token?: number) => {
const subscription = ipcAgentSubscriptions.get(conversationId)
if (!subscription) {
return { ok: true, alreadyUnsubscribed: true }
}
if (typeof token === 'number' && token !== subscription.token) {
safeLog(`[IPC] Skip stale local chat unsubscribe: conversation=${conversationId}, token=${token}`)
return { ok: true, skipped: true }
}
subscription.unsubscribe()
ipcAgentSubscriptions.delete(conversationId)
getHub().removeLocalApprovalHandler(conversationId)
safeLog(`[IPC] Local chat unsubscribed from conversation: ${conversationId}`)
@ -362,21 +392,36 @@ export function registerHubIpcHandlers(): void {
const h = getHub()
const agent = h.getConversation(conversationId)
if (!agent) {
return { messages: [], total: 0, offset: 0, limit: 0, contextWindowTokens: undefined }
return {
messages: [],
total: 0,
offset: 0,
limit: 0,
contextWindowTokens: undefined,
isRunning: false,
}
}
try {
await agent.ensureInitialized()
const allMessages = agent.loadSessionMessagesForDisplay()
const contextWindowTokens = agent.getContextWindowTokens()
const isRunning = isConversationBusy(agent)
const total = allMessages.length
// Must match DEFAULT_MESSAGES_LIMIT from @multica/sdk/actions/rpc
const limit = options?.limit ?? 200
const offset = options?.offset ?? Math.max(0, total - limit)
const sliced = allMessages.slice(offset, offset + limit)
return { messages: sliced, total, offset, limit, contextWindowTokens }
return { messages: sliced, total, offset, limit, contextWindowTokens, isRunning }
} catch {
return { messages: [], total: 0, offset: 0, limit: 0, contextWindowTokens: undefined }
return {
messages: [],
total: 0,
offset: 0,
limit: 0,
contextWindowTokens: undefined,
isRunning: false,
}
}
})
@ -528,6 +573,13 @@ export function setupDeviceConfirmation(mainWindow: Electron.BrowserWindow): voi
mainWindow.webContents.send('hub:inbound-message', event)
}
})
// Forward conversation list changes (e.g. created from Telegram / RPC).
h.onConversationsChanged(() => {
if (!mainWindow.isDestroyed()) {
mainWindow.webContents.send('hub:conversations-changed')
}
})
}
/**
@ -535,8 +587,8 @@ export function setupDeviceConfirmation(mainWindow: Electron.BrowserWindow): voi
*/
export function cleanupHub(): void {
// Unsubscribe all IPC listeners
for (const unsubscribe of ipcAgentSubscriptions.values()) {
unsubscribe()
for (const subscription of ipcAgentSubscriptions.values()) {
subscription.unsubscribe()
}
ipcAgentSubscriptions.clear()

View file

@ -207,13 +207,28 @@ const electronAPI = {
offDevicesChanged: () => {
ipcRenderer.removeAllListeners('hub:devices-changed')
},
onInboundMessage: (callback: (event: InboundMessageEvent) => void) => {
ipcRenderer.on('hub:inbound-message', (_event, data: InboundMessageEvent) => callback(data))
},
offInboundMessage: () => {
ipcRenderer.removeAllListeners('hub:inbound-message')
},
},
onInboundMessage: (callback: (event: InboundMessageEvent) => void) => {
const listener = (_event: Electron.IpcRendererEvent, data: InboundMessageEvent): void => {
callback(data)
}
ipcRenderer.on('hub:inbound-message', listener)
return (): void => {
ipcRenderer.removeListener('hub:inbound-message', listener)
}
},
onConversationsChanged: (callback: () => void) => {
const listener = (): void => {
callback()
}
ipcRenderer.on('hub:conversations-changed', listener)
return (): void => {
ipcRenderer.removeListener('hub:conversations-changed', listener)
}
},
offInboundMessage: () => {
ipcRenderer.removeAllListeners('hub:inbound-message')
},
},
// Tools management
tools: {
@ -334,7 +349,7 @@ const electronAPI = {
/** Subscribe to conversation events for local direct chat */
subscribe: (conversationId: string) => ipcRenderer.invoke('localChat:subscribe', conversationId),
/** Unsubscribe from conversation events */
unsubscribe: (conversationId: string) => ipcRenderer.invoke('localChat:unsubscribe', conversationId),
unsubscribe: (conversationId: string, token?: number) => ipcRenderer.invoke('localChat:unsubscribe', conversationId, token),
/** Get message history for local chat with pagination (returns raw AgentMessageItem[]) */
getHistory: (conversationId: string, options?: { offset?: number; limit?: number }) =>
ipcRenderer.invoke('localChat:getHistory', conversationId, options),
@ -347,18 +362,30 @@ const electronAPI = {
/** Resolve an exec approval request */
resolveExecApproval: (approvalId: string, decision: string) =>
ipcRenderer.invoke('localChat:resolveExecApproval', approvalId, decision),
/** Listen for agent events */
onEvent: (callback: (event: LocalChatEvent) => void) => {
ipcRenderer.on('localChat:event', (_event, data: LocalChatEvent) => callback(data))
},
/** Listen for agent events */
onEvent: (callback: (event: LocalChatEvent) => void) => {
const listener = (_event: Electron.IpcRendererEvent, data: LocalChatEvent): void => {
callback(data)
}
ipcRenderer.on('localChat:event', listener)
return (): void => {
ipcRenderer.removeListener('localChat:event', listener)
}
},
/** Remove event listener */
offEvent: () => {
ipcRenderer.removeAllListeners('localChat:event')
},
/** Listen for exec approval requests */
onApproval: (callback: (approval: LocalChatApproval) => void) => {
ipcRenderer.on('localChat:approval', (_event, data: LocalChatApproval) => callback(data))
},
/** Listen for exec approval requests */
onApproval: (callback: (approval: LocalChatApproval) => void) => {
const listener = (_event: Electron.IpcRendererEvent, data: LocalChatApproval): void => {
callback(data)
}
ipcRenderer.on('localChat:approval', listener)
return (): void => {
ipcRenderer.removeListener('localChat:approval', listener)
}
},
/** Remove approval listener */
offApproval: () => {
ipcRenderer.removeAllListeners('localChat:approval')

View file

@ -15,10 +15,34 @@ export interface QueuedLocalMessage {
createdAt: number
}
interface QueuedInboundMessage {
content: string
agentId: string
conversationId: string
source: MessageSource
}
interface UseLocalChatOptions {
conversationId?: string
}
interface LocalChatSubscribeResult {
ok?: boolean
error?: string
alreadySubscribed?: boolean
token?: number
isRunning?: boolean
}
interface LocalChatHistoryResult {
messages?: AgentMessageItem[]
total: number
offset: number
limit: number
contextWindowTokens?: number
isRunning?: boolean
}
function makeQueueId(): string {
return globalThis.crypto?.randomUUID?.() ?? `queued-${Date.now()}-${Math.random().toString(36).slice(2, 10)}`
}
@ -35,6 +59,7 @@ export function useLocalChat(options: UseLocalChatOptions = {}) {
const [isLoadingMore, setIsLoadingMore] = useState(false)
const isLoadingMoreRef = useRef(false)
const [queuedMessages, setQueuedMessages] = useState<QueuedLocalMessage[]>([])
const [queuedInboundMessages, setQueuedInboundMessages] = useState<QueuedInboundMessage[]>([])
const [initError, setInitError] = useState<string | null>(null)
const initRef = useRef(false)
const offsetRef = useRef<number | null>(null)
@ -68,20 +93,31 @@ export function useLocalChat(options: UseLocalChatOptions = {}) {
// Subscribe to events + fetch history once conversation is available
useEffect(() => {
if (!activeConversationId) return
let disposed = false
setQueuedMessages([])
setQueuedInboundMessages([])
offsetRef.current = null
setIsLoading(false)
setIsLoadingHistory(true)
chatRef.current.reset()
// Subscribe to agent events
window.electronAPI.localChat.subscribe(activeConversationId).catch(() => {})
const subscribePromise = window.electronAPI.localChat.subscribe(activeConversationId)
.then((result) => {
const typed = result as LocalChatSubscribeResult
if (!disposed && typed.isRunning) {
setIsLoading(true)
}
return typed
})
.catch(() => null)
// Listen for stream events
window.electronAPI.localChat.onEvent((data) => {
const unsubscribeEvent = window.electronAPI.localChat.onEvent((data) => {
// Cast IPC event to StreamPayload (same shape: { agentId, streamId, event })
const payload = data as unknown as StreamPayload
if (!payload.event) return
if (payload.conversationId !== activeConversationId) return
// Handle agent error events
if (payload.event.type === 'agent_error') {
@ -111,21 +147,33 @@ export function useLocalChat(options: UseLocalChatOptions = {}) {
})
// Listen for exec approval requests
window.electronAPI.localChat.onApproval((approval) => {
const unsubscribeApproval = window.electronAPI.localChat.onApproval((approval) => {
if (approval.conversationId !== activeConversationId) return
chatRef.current.addApproval(approval as ExecApprovalRequestPayload)
})
// Listen for inbound messages from all sources (gateway, channel)
// This allows the local UI to display messages from other sources
window.electronAPI.hub.onInboundMessage((event: InboundMessageEvent) => {
const unsubscribeInbound = window.electronAPI.hub.onInboundMessage((event: InboundMessageEvent) => {
const eventConversationId = event.conversationId
// Only add non-local messages (local messages are added by sendMessage)
if (event.source.type !== 'local' && eventConversationId === activeConversationId) {
const queuedInbound: QueuedInboundMessage = {
content: event.content,
agentId: event.agentId,
conversationId: eventConversationId,
source: event.source as MessageSource,
}
if (isLoadingRef.current) {
setQueuedInboundMessages((prev) => [...prev, queuedInbound])
return
}
chatRef.current.addUserMessage(
event.content,
event.agentId,
eventConversationId,
event.source as MessageSource,
queuedInbound.content,
queuedInbound.agentId,
queuedInbound.conversationId,
queuedInbound.source,
)
setIsLoading(true)
}
@ -136,6 +184,11 @@ export function useLocalChat(options: UseLocalChatOptions = {}) {
limit: DEFAULT_MESSAGES_LIMIT,
})
.then((result) => {
if (disposed) return
const typed = result as LocalChatHistoryResult
if (typed.isRunning) {
setIsLoading(true)
}
console.log('[LocalChat] getHistory result:', result.messages?.length, 'messages, total:', result.total)
if (result.messages?.length) {
chatRef.current.setHistory(result.messages as AgentMessageItem[], activeConversationId, activeConversationId, {
@ -153,13 +206,23 @@ export function useLocalChat(options: UseLocalChatOptions = {}) {
}
})
.catch(() => {})
.finally(() => setIsLoadingHistory(false))
.finally(() => {
if (!disposed) {
setIsLoadingHistory(false)
}
})
return () => {
window.electronAPI.localChat.offEvent()
window.electronAPI.localChat.offApproval()
window.electronAPI.hub.offInboundMessage()
window.electronAPI.localChat.unsubscribe(activeConversationId).catch(() => {})
disposed = true
unsubscribeEvent?.()
unsubscribeApproval?.()
unsubscribeInbound?.()
void subscribePromise
.then((result) => {
if (typeof result?.token !== 'number') return
return window.electronAPI.localChat.unsubscribe(activeConversationId, result.token)
})
.catch(() => {})
}
}, [activeConversationId])
@ -213,12 +276,28 @@ export function useLocalChat(options: UseLocalChatOptions = {}) {
}, [])
useEffect(() => {
if (!activeConversationId || isLoading || queuedMessages.length === 0) return
const next = queuedMessages[0]
if (!next) return
if (!activeConversationId || isLoading) return
// Inbound channel/gateway messages are already queued in backend.
// Render them first to keep frontend ordering aligned with agent run order.
const nextInbound = queuedInboundMessages[0]
if (nextInbound) {
setQueuedInboundMessages((prev) => prev.slice(1))
chatRef.current.addUserMessage(
nextInbound.content,
nextInbound.agentId,
nextInbound.conversationId,
nextInbound.source,
)
setIsLoading(true)
return
}
const nextLocal = queuedMessages[0]
if (!nextLocal) return
setQueuedMessages((prev) => prev.slice(1))
dispatchMessageNow(next.text)
}, [activeConversationId, isLoading, queuedMessages, dispatchMessageNow])
dispatchMessageNow(nextLocal.text)
}, [activeConversationId, isLoading, queuedInboundMessages, queuedMessages, dispatchMessageNow])
const abortGeneration = useCallback(() => {
if (!activeConversationId) return

View file

@ -63,6 +63,12 @@ export const useHubStore = create<HubStore>()((set, get) => ({
: prev.hubInfo,
}))
})
// Refresh conversation list when backend conversations change
// (e.g. Telegram / RPC creates a new session).
window.electronAPI.hub.onConversationsChanged(() => {
void get().refresh()
})
} catch (err) {
const message = err instanceof Error ? err.message : String(err)
set({ error: message })

View file

@ -106,6 +106,8 @@ const VERIFY_TIMEOUT_MS = 30_000;
const WELCOME_RPC_TIMEOUT_MS = 20_000;
const WELCOME_COOLDOWN_MS = 15_000;
const TYPING_TIMEOUT_MS = 60_000;
const INBOUND_MESSAGE_DEDUP_WINDOW_MS = 2 * 60_000;
const INBOUND_MESSAGE_DEDUP_MAX_ENTRIES = 10_000;
const MAX_CHARS_PER_MESSAGE = 4000; // Telegram limit is 4096; leave room for HTML overhead
const REPLY_CONTEXT_MAX_CHARS = 300; // Max chars of quoted text when user replies to a message
@ -195,6 +197,8 @@ export class TelegramService implements OnModuleInit, OnModuleDestroy {
private readonly messageContexts = new MessageContextQueue();
/** Deduplicate welcome sends when Telegram replays updates in a short window */
private welcomeSentAt = new Map<string, number>();
/** Deduplicate inbound Telegram message updates by chatId:messageId */
private inboundMessageSeenAt = new Map<string, number>();
private readonly logger = new Logger(TelegramService.name);
@ -725,6 +729,10 @@ export class TelegramService implements OnModuleInit, OnModuleDestroy {
private async handleTextMessage(ctx: Context): Promise<void> {
const msg = ctx.message;
if (!msg || !msg.text) return;
if (!this.shouldHandleInboundMessage(msg.chat.id, msg.message_id)) {
this.logger.debug(`Skipped duplicate inbound text update: chatId=${msg.chat.id} messageId=${msg.message_id}`);
return;
}
const telegramUserId = String(msg.from?.id);
const text = msg.text.trim();
@ -940,6 +948,10 @@ export class TelegramService implements OnModuleInit, OnModuleDestroy {
private async handleMediaMessage(ctx: Context, media: MediaAttachment): Promise<void> {
const msg = ctx.message;
if (!msg) return;
if (!this.shouldHandleInboundMessage(msg.chat.id, msg.message_id)) {
this.logger.debug(`Skipped duplicate inbound media update: chatId=${msg.chat.id} messageId=${msg.message_id}`);
return;
}
const telegramUserId = String(msg.from?.id);
const caption = (msg as any).caption as string | undefined;
@ -1418,6 +1430,44 @@ export class TelegramService implements OnModuleInit, OnModuleDestroy {
return true;
}
private shouldHandleInboundMessage(chatId: number, messageId: number): boolean {
const now = Date.now();
const key = `${chatId}:${messageId}`;
const seenAt = this.inboundMessageSeenAt.get(key);
if (seenAt && now - seenAt < INBOUND_MESSAGE_DEDUP_WINDOW_MS) {
return false;
}
this.inboundMessageSeenAt.set(key, now);
if (this.inboundMessageSeenAt.size <= INBOUND_MESSAGE_DEDUP_MAX_ENTRIES) {
return true;
}
// Opportunistic pruning to keep memory bounded.
for (const [candidate, ts] of this.inboundMessageSeenAt.entries()) {
if (now - ts > INBOUND_MESSAGE_DEDUP_WINDOW_MS) {
this.inboundMessageSeenAt.delete(candidate);
}
}
if (this.inboundMessageSeenAt.size <= INBOUND_MESSAGE_DEDUP_MAX_ENTRIES) {
return true;
}
const overflow = this.inboundMessageSeenAt.size - INBOUND_MESSAGE_DEDUP_MAX_ENTRIES;
if (overflow > 0) {
let removed = 0;
for (const candidate of this.inboundMessageSeenAt.keys()) {
this.inboundMessageSeenAt.delete(candidate);
removed += 1;
if (removed >= overflow) break;
}
}
return true;
}
private buildFallbackConnectWelcome(firstName: string | undefined, isReconnect: boolean): string {
const safeName = firstName?.trim() || "there";
if (isReconnect) {

View file

@ -43,13 +43,6 @@ import {
type SystemPromptMode,
} from "./system-prompt/index.js";
import type { AuthProfileFailureReason } from "./auth-profiles/index.js";
import {
analyzeCrossTurnWebFetchNeed,
resolveWebFetchRequirementFromPrompt,
shouldEnforceWebFetchAfterSearch,
summarizeWebToolUsage,
type ToolExecutionRecord,
} from "./web-tools-policy.js";
import {
sanitizeToolCallInputs,
sanitizeToolUseResultPairing,
@ -315,56 +308,6 @@ function formatRunLogToolSummary(tool: string, details: Record<string, unknown>
}
}
function buildWebSearchFetchEnforcementPrompt(params: {
requiredMinFetchSuccess: number;
fetchSuccess: number;
needsFollowupForLatestSearch: boolean;
}): { prompt: string; additionalFetchNeeded: number } {
const additionalFetchNeeded = Math.max(
1,
params.requiredMinFetchSuccess - params.fetchSuccess,
params.needsFollowupForLatestSearch ? 1 : 0,
);
const lines = [
"You used web_search, but web evidence coverage for this turn is still incomplete.",
"Search snippets are incomplete previews and are not sufficient evidence for detailed claims.",
];
if (params.requiredMinFetchSuccess > 1) {
lines.push(
`This task currently requires at least ${params.requiredMinFetchSuccess} successful web_fetch calls.`,
);
}
if (params.needsFollowupForLatestSearch) {
lines.push(
"You performed another successful web_search after your last successful web_fetch. " +
"You must fetch URLs from the latest search results before finalizing.",
);
}
lines.push(
"Before finalizing your answer, you MUST:",
"1) Pick the 1-3 most relevant URLs from the latest successful web_search results.",
`2) Complete at least ${additionalFetchNeeded} additional successful web_fetch call(s).`,
"3) Revise your answer based on fetched page content.",
"If all additional fetch attempts fail, explicitly say so and avoid relying on snippets for specific claims.",
);
return { prompt: lines.join("\n"), additionalFetchNeeded };
}
const CROSS_TURN_WEB_FETCH_ENFORCEMENT_PROMPT = [
"You are about to finalize a web-dependent answer, but no successful web_fetch happened in this turn.",
"Do not rely only on snippets or prior-turn memory for fresh factual claims.",
"Before finalizing your answer, you MUST:",
"1) If relevant URLs are already available in this conversation, call web_fetch on 1-3 of them.",
"2) If no URLs are available, call web_search to find candidates, then web_fetch on 1-3 relevant URLs.",
"3) Revise your answer using fetched page content as primary evidence.",
"If all fetch attempts fail, explicitly report that limitation and avoid specific claims not backed by fetched content.",
].join("\n");
export class Agent {
private readonly agent: PiAgentCore;
private output;
@ -379,7 +322,6 @@ export class Agent {
private readonly stderr: NodeJS.WritableStream;
private readonly runLog: RunLog;
private readonly toolStartTimes = new Map<string, number>();
private currentRunToolExecutions: ToolExecutionRecord[] = [];
private initialized = false;
private allowSkillInstallForCurrentRun = false;
private awaitingSkillInstallConfirmation = false;
@ -791,7 +733,6 @@ export class Agent {
this.currentUserSource = options?.source;
this._isRunning = true;
this._aborted = false;
this.currentRunToolExecutions = [];
if (this._internalRun) {
this.allowSkillInstallForCurrentRun = false;
@ -856,7 +797,6 @@ export class Agent {
// Loop to exhaust all candidate profiles on rotatable errors
while (true) {
const toolExecutionStartIndex = this.currentRunToolExecutions.length;
try {
const llmStart = Date.now();
this.runLog.log("llm_call", {
@ -866,14 +806,6 @@ export class Agent {
messages: this.agent.state.messages.length,
});
await this.agent.prompt(prompt);
await this.enforceWebFetchAfterSearchIfNeeded({
toolExecutionStartIndex,
userPrompt: prompt,
});
await this.enforceCrossTurnWebFetchIfNeeded({
toolExecutionStartIndex,
userPrompt: prompt,
});
this.runLog.log("llm_result", {
duration_ms: Date.now() - llmStart,
});
@ -1008,7 +940,6 @@ export class Agent {
this._lastEventSavedAssistant = undefined;
this.currentUserDisplayPrompt = undefined;
this.currentUserSource = undefined;
this.currentRunToolExecutions = [];
this.runLog.flush().catch(() => {});
}
}
@ -1183,125 +1114,6 @@ export class Agent {
this.session.setApiKey(this.currentApiKey);
}
private async enforceWebFetchAfterSearchIfNeeded(params: {
toolExecutionStartIndex: number;
userPrompt: string;
}): Promise<void> {
if (this._internalRun) return;
const activeTools = new Set(
(this.agent.state.tools ?? []).map((tool) => tool.name.toLowerCase()),
);
const webSearchAvailable = activeTools.has("web_search");
const webFetchAvailable = activeTools.has("web_fetch");
const currentTurnExecutions = this.currentRunToolExecutions.slice(
params.toolExecutionStartIndex,
);
const usage = summarizeWebToolUsage(currentTurnExecutions);
const requirement = resolveWebFetchRequirementFromPrompt(params.userPrompt);
if (
!shouldEnforceWebFetchAfterSearch({
usage,
webSearchAvailable,
webFetchAvailable,
requiredMinFetchSuccess: requirement.requiredMinFetchSuccess,
})
) {
return;
}
const { prompt, additionalFetchNeeded } = buildWebSearchFetchEnforcementPrompt({
requiredMinFetchSuccess: requirement.requiredMinFetchSuccess,
fetchSuccess: usage.fetchSuccess,
needsFollowupForLatestSearch: usage.searchNeedsFollowupFetch,
});
this.runLog.log("web_search_fetch_guard", {
search_calls: usage.searchCalls,
search_success: usage.searchSuccess,
search_with_results: usage.searchSuccessWithResults,
search_needs_followup_fetch: usage.searchNeedsFollowupFetch,
fetch_calls: usage.fetchCalls,
fetch_success: usage.fetchSuccess,
required_min_fetch_success: requirement.requiredMinFetchSuccess,
prompt_suggests_research_depth: requirement.promptSuggestsResearchDepth,
prompt_multi_source_cue: requirement.multiSourceCue,
prompt_explicit_min_fetch: requirement.explicitMinFetchFromPrompt,
});
try {
await this.agent.prompt(prompt);
this.runLog.log("web_search_fetch_guard_applied", {
search_with_results: usage.searchSuccessWithResults,
search_needs_followup_fetch: usage.searchNeedsFollowupFetch,
required_min_fetch_success: requirement.requiredMinFetchSuccess,
additional_fetch_needed: additionalFetchNeeded,
});
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
this.runLog.log("web_search_fetch_guard_failed", {
error: message.slice(0, 200),
});
if (this.debug) {
this.stderr.write(`[web-guard] Failed to enforce search->fetch: ${message}\n`);
}
}
}
private async enforceCrossTurnWebFetchIfNeeded(params: {
toolExecutionStartIndex: number;
userPrompt: string;
}): Promise<void> {
if (this._internalRun) return;
const activeTools = new Set(
(this.agent.state.tools ?? []).map((tool) => tool.name.toLowerCase()),
);
const webFetchAvailable = activeTools.has("web_fetch");
const currentTurnExecutions = this.currentRunToolExecutions.slice(
params.toolExecutionStartIndex,
);
const usage = summarizeWebToolUsage(currentTurnExecutions);
const analysis = analyzeCrossTurnWebFetchNeed({
usage,
webFetchAvailable,
userPrompt: params.userPrompt,
assistantText: this.output.state.lastAssistantText ?? "",
});
if (!analysis.shouldEnforce) return;
this.runLog.log("web_cross_turn_fetch_guard", {
fetch_calls: usage.fetchCalls,
fetch_success: usage.fetchSuccess,
explicit_fetch_request: analysis.explicitFetchRequest,
user_provides_url: analysis.userProvidesUrl,
freshness_cue: analysis.freshnessCue,
web_cue: analysis.webCue,
user_needs_fresh_web_evidence: analysis.userNeedsFreshWebEvidence,
user_blocks_web_fetch: analysis.userBlocksWebFetch,
assistant_web_claim_signal: analysis.assistantHasWebClaimSignal,
});
try {
await this.agent.prompt(CROSS_TURN_WEB_FETCH_ENFORCEMENT_PROMPT);
this.runLog.log("web_cross_turn_fetch_guard_applied", {
explicit_fetch_request: analysis.explicitFetchRequest,
user_provides_url: analysis.userProvidesUrl,
});
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
this.runLog.log("web_cross_turn_fetch_guard_failed", {
error: message.slice(0, 200),
});
if (this.debug) {
this.stderr.write(`[web-cross-turn-guard] Failed to enforce fetch: ${message}\n`);
}
}
}
private handleRunLogEvent(event: AgentEvent) {
if (event.type === "tool_execution_start") {
const toolName = (event as any).toolName ?? "unknown";
@ -1322,11 +1134,6 @@ export class Agent {
const resultChars = resultText?.length ?? 0;
const details = extractRunLogResultDetails(result);
const isError = inferRunLogToolIsError((event as any).isError, resultText, details);
this.currentRunToolExecutions.push({
toolName,
isError,
details,
});
const toolEndData: Record<string, unknown> = {
tool: toolName,

View file

@ -18,7 +18,7 @@ const ExecSchema = Type.Object({
yieldMs: Type.Optional(
Type.Number({
description:
"Auto-background threshold in milliseconds. If command doesn't complete within this time, it runs in background. Default 10000ms. Set to 0 to disable auto-backgrounding.",
"Auto-background threshold in milliseconds. If command doesn't complete within this time, it runs in background. Default 30000ms when timeoutMs is not set. If timeoutMs is set and yieldMs is omitted, auto-backgrounding is disabled. Set to 0 to disable auto-backgrounding.",
minimum: 0,
}),
),
@ -39,7 +39,7 @@ export type ExecResult = {
processId?: string;
};
const DEFAULT_YIELD_MS = 10000; // Changed from 5000 to 10000
const DEFAULT_YIELD_MS = 30000;
export function createExecTool(
defaultCwd?: string,
@ -49,10 +49,13 @@ export function createExecTool(
name: "exec",
label: "Exec",
description:
"Execute a shell command. If the command doesn't complete within yieldMs (default 10s), it automatically runs in background and returns a process ID with any output collected so far. Use 'process output <id>' to check output, 'process status <id>' to check status, 'process stop <id>' to terminate.",
"Execute a shell command. If the command doesn't complete within yieldMs, it automatically runs in background and returns a process ID with any output collected so far. Default yieldMs is 30s when timeoutMs is not provided. If timeoutMs is provided and yieldMs is omitted, auto-backgrounding is disabled. Use 'process output <id>' to check output, 'process status <id>' to check status, 'process stop <id>' to terminate.",
parameters: ExecSchema,
execute: async (_toolCallId, args, signal, onUpdate) => {
const { command, cwd, timeoutMs, yieldMs = DEFAULT_YIELD_MS } = args as ExecArgs;
const { command, cwd, timeoutMs, yieldMs } = args as ExecArgs;
const effectiveYieldMs = typeof yieldMs === "number"
? yieldMs
: (typeof timeoutMs === "number" && timeoutMs > 0 ? 0 : DEFAULT_YIELD_MS);
const effectiveCwd = cwd || defaultCwd;
// Exec approval: ask for permission before executing
@ -119,7 +122,7 @@ export function createExecTool(
}
// Yield window handling (auto-background)
if (yieldMs > 0) {
if (effectiveYieldMs > 0) {
yieldTimer = setTimeout(() => {
if (yielded) return;
yielded = true;
@ -152,7 +155,7 @@ export function createExecTool(
processId,
},
});
}, yieldMs);
}, effectiveYieldMs);
}
// Note: Output is now collected by process-registry, no local chunk collection needed

View file

@ -93,6 +93,7 @@ export class Hub {
private readonly suppressedStreamAgents = new Set<string>();
private readonly localApprovalHandlers = new Map<string, (payload: ExecApprovalRequest) => void>();
private readonly inboundListeners = new Set<(event: InboundMessageEvent) => void>();
private readonly conversationListeners = new Set<(conversationIds: string[]) => void>();
private readonly rpc: RpcDispatcher;
private readonly approvalManager: ExecApprovalManager;
private readonly heartbeatListeners = new Set<(event: HeartbeatEventPayload) => void>();
@ -523,6 +524,26 @@ export class Hub {
};
}
/** Subscribe to conversation list changes. Returns unsubscribe function. */
onConversationsChanged(callback: (conversationIds: string[]) => void): () => void {
this.conversationListeners.add(callback);
return () => {
this.conversationListeners.delete(callback);
};
}
/** Notify listeners that conversation list changed. */
private emitConversationsChanged(): void {
const conversationIds = this.listConversations();
for (const listener of this.conversationListeners) {
try {
listener([...conversationIds]);
} catch {
// Keep fanout resilient against listener errors.
}
}
}
/** Broadcast an inbound message to all listeners */
broadcastInbound(event: InboundMessageEvent): void {
for (const listener of this.inboundListeners) {
@ -659,6 +680,7 @@ export class Hub {
// Internally consume agent output (AgentEvent stream + error Messages)
void this.consumeAgent(agent);
this.heartbeatRunner?.updateConfig();
this.emitConversationsChanged();
console.log(`[Hub] Conversation created: ${conversationId} (agent: ${targetAgentId})`);
return agent;
@ -1122,6 +1144,7 @@ export class Hub {
this.agentProfiles.delete(resolvedAgentId);
removeAgentRecordById(resolvedAgentId);
this.heartbeatRunner?.updateConfig();
this.emitConversationsChanged();
return closedAny;
}
@ -1147,6 +1170,7 @@ export class Hub {
this.clearAgentIfNoConversation(agentId);
this.heartbeatRunner?.updateConfig();
this.emitConversationsChanged();
return true;
}
@ -1175,6 +1199,7 @@ export class Hub {
}
this.agentMainConversations.clear();
this.agentProfiles.clear();
this.conversationListeners.clear();
this.client.disconnect();
console.log("Hub shut down");
}

View file

@ -315,9 +315,27 @@ export function useChat() {
}
case "message_update": {
const content = extractContent(event);
setMessages((prev) =>
prev.map((m) => (m.id === payload.streamId ? { ...m, content } : m)),
);
setMessages((prev) => {
let found = false;
const updated = prev.map((m) => {
if (m.id !== payload.streamId) return m;
found = true;
return { ...m, content };
});
if (found) return updated;
// Session switches can miss message_start; recover by creating the stream message on first update.
return [
...updated,
{
id: payload.streamId,
role: "assistant",
content,
agentId: payload.agentId,
conversationId,
},
];
});
setStreamingIds((prev) => new Set(prev).add(payload.streamId));
break;
}
case "message_end": {
@ -327,9 +345,13 @@ export function useChat() {
? (event.message as { stopReason?: string })?.stopReason
: undefined;
setMessages((prev) =>
prev.map((m) => {
if (m.id === payload.streamId) return { ...m, content, stopReason };
setMessages((prev) => {
let found = false;
const updated = prev.map((m) => {
if (m.id === payload.streamId) {
found = true;
return { ...m, content, stopReason };
}
if (
m.role === "toolResult"
&& m.toolStatus === "running"
@ -339,8 +361,22 @@ export function useChat() {
return { ...m, toolStatus: "interrupted" as ToolStatus };
}
return m;
}),
);
});
if (found || content.length === 0) return updated;
// Session switches can miss message_start; recover final content from message_end.
return [
...updated,
{
id: payload.streamId,
role: "assistant",
content,
agentId: payload.agentId,
conversationId,
stopReason,
},
];
});
setStreamingIds((prev) => {
const next = new Set(prev);
next.delete(payload.streamId);