Merge remote-tracking branch 'origin/main' into feat/telegram-channel

# Conflicts:
#	apps/desktop/src/hooks/use-local-chat.ts
#	packages/sdk/src/actions/stream.ts
#	packages/ui/src/components/chat-view.tsx
#	src/agent/async-agent.ts
#	src/agent/events.ts
This commit is contained in:
Naiyuan Qing 2026-02-09 14:28:06 +08:00
commit 0895d42d3b
21 changed files with 462 additions and 60 deletions

View file

@ -4,8 +4,8 @@ import { AsyncAgent } from "./async-agent.js";
const subscribeCallbacks: Array<(event: any) => void> = [];
const internalRunState = { value: false };
const runMock = vi.fn(async () => ({ text: "", thinking: undefined, error: undefined }));
const runInternalMock = vi.fn(async () => ({ text: "", thinking: undefined, error: undefined }));
const runMock = vi.fn(async (_prompt: string) => ({ text: "", thinking: undefined, error: undefined as string | undefined }));
const runInternalMock = vi.fn(async (_prompt: string) => ({ text: "", thinking: undefined, error: undefined as string | undefined }));
const flushSessionMock = vi.fn(async () => {});
const persistAssistantSummaryMock = vi.fn();
const subscribeAllMock = vi.fn((fn: (event: any) => void) => {
@ -80,6 +80,8 @@ async function nextWithTimeout<T>(iter: AsyncIterator<T>, timeoutMs = 40): Promi
}
describe("AsyncAgent internal flow", () => {
const originalTz = process.env.TZ;
afterEach(() => {
subscribeCallbacks.length = 0;
internalRunState.value = false;
@ -91,6 +93,35 @@ describe("AsyncAgent internal flow", () => {
runMock.mockResolvedValue({ text: "", thinking: undefined, error: undefined });
runInternalMock.mockResolvedValue({ text: "", thinking: undefined, error: undefined });
flushSessionMock.mockResolvedValue(undefined);
vi.useRealTimers();
process.env.TZ = originalTz;
});
it("injects a timestamp prefix into external user writes", async () => {
vi.useFakeTimers();
vi.setSystemTime(new Date("2026-01-29T01:30:00.000Z"));
process.env.TZ = "America/New_York";
const agent = new AsyncAgent();
agent.write("recent news");
await agent.waitForIdle();
expect(runMock).toHaveBeenCalledTimes(1);
const [message] = runMock.mock.calls[0] ?? [];
expect(message).toMatch(/^\[Wed 2026-01-28 20:30 EST\] recent news$/);
agent.close();
});
it("allows disabling timestamp injection per write", async () => {
const agent = new AsyncAgent();
agent.write("raw heartbeat prompt", { injectTimestamp: false });
await agent.waitForIdle();
expect(runMock).toHaveBeenCalledWith("raw heartbeat prompt");
agent.close();
});
it("filters internal events in direct subscribe stream", () => {

View file

@ -4,6 +4,7 @@ import { Agent } from "./runner.js";
import { Channel } from "./channel.js";
import type { AgentOptions, Message } from "./types.js";
import type { MulticaEvent } from "./events.js";
import { injectMessageTimestamp } from "./message-timestamp.js";
const devNull = { write: () => true } as unknown as NodeJS.WritableStream;
@ -17,6 +18,11 @@ export interface WriteInternalOptions {
persistResponse?: boolean | undefined;
}
export interface WriteOptions {
/** Disable automatic message timestamp injection */
injectTimestamp?: boolean | undefined;
}
export class AsyncAgent {
private readonly agent: Agent;
private readonly channel = new Channel<ChannelItem>();
@ -48,19 +54,18 @@ export class AsyncAgent {
}
/** Write message to agent (non-blocking, serialized queue) */
write(content: string): void {
this.enqueue(() => this.agent.run(content));
}
/** Enqueue an agent run, handling errors and session flush */
private enqueue(runFn: () => ReturnType<Agent["run"]>): void {
write(content: string, options?: WriteOptions): void {
if (this._closed) throw new Error("Agent is closed");
this.pendingWrites += 1;
const message =
options?.injectTimestamp === false
? content
: injectMessageTimestamp(content);
this.queue = this.queue
.then(async () => {
if (this._closed) return;
const result = await runFn();
const result = await this.agent.run(message);
// Flush pending session writes so waitForIdle() callers
// can safely read session data from disk.
await this.agent.flushSession();
@ -68,14 +73,15 @@ export class AsyncAgent {
if (result.error) {
console.error(`[AsyncAgent] Agent run error: ${result.error}`);
this.channel.send({ id: uuidv7(), content: `[error] ${result.error}` });
this.agent.emitMulticaEvent({ type: "agent_error", error: result.error });
this.agent.emitError(result.error);
}
})
.catch((err) => {
const message = err instanceof Error ? err.message : String(err);
console.error(`[AsyncAgent] Agent run exception: ${message}`);
this.channel.send({ id: uuidv7(), content: `[error] ${message}` });
this.agent.emitMulticaEvent({ type: "agent_error", error: message });
// Also emit through subscriber mechanism so IPC listeners receive the error
this.agent.emitError(message);
})
.finally(() => {
this.pendingWrites = Math.max(0, this.pendingWrites - 1);

View file

@ -21,15 +21,15 @@ export type CompactionEndEvent = {
type: "compaction_end";
removed: number;
kept: number;
tokensRemoved?: number;
tokensKept?: number;
tokensRemoved?: number | undefined;
tokensKept?: number | undefined;
reason: "count" | "tokens" | "summary" | "pruning";
};
/** Emitted when the agent encounters an error (LLM failure, quota exceeded, etc.) */
/** Emitted when an agent encounters an error during execution */
export type AgentErrorEvent = {
type: "agent_error";
error: string;
message: string;
};
/** Union of all Multica-specific events */

View file

@ -0,0 +1,57 @@
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { injectMessageTimestamp, resolveMessageTimezone } from "./message-timestamp.js";
describe("injectMessageTimestamp", () => {
const originalTz = process.env.TZ;
beforeEach(() => {
vi.useFakeTimers();
vi.setSystemTime(new Date("2026-01-29T01:30:00.000Z"));
process.env.TZ = "America/New_York";
});
afterEach(() => {
vi.useRealTimers();
process.env.TZ = originalTz;
});
it("prepends a compact timestamp prefix", () => {
const result = injectMessageTimestamp("Is it the weekend?");
expect(result).toMatch(/^\[Wed 2026-01-28 20:30 EST\] Is it the weekend\?$/);
});
it("does not double-stamp already enveloped messages", () => {
const existing = "[Wed 2026-01-28 20:30 EST] hello";
expect(injectMessageTimestamp(existing)).toBe(existing);
});
it("does not stamp cron messages that already include current time lines", () => {
const existing = "Cron run\nCurrent time: Wednesday, January 28th, 2026 — 8:30 PM (America/New_York)";
expect(injectMessageTimestamp(existing)).toBe(existing);
});
it("returns empty/whitespace input unchanged", () => {
expect(injectMessageTimestamp("")).toBe("");
expect(injectMessageTimestamp(" ")).toBe(" ");
});
});
describe("resolveMessageTimezone", () => {
const originalTz = process.env.TZ;
afterEach(() => {
process.env.TZ = originalTz;
});
it("prefers explicit argument when valid", () => {
process.env.TZ = "UTC";
expect(resolveMessageTimezone("America/Chicago")).toBe("America/Chicago");
});
it("falls back to UTC for invalid values", () => {
process.env.TZ = "Invalid/Timezone";
const resolved = resolveMessageTimezone("also/invalid");
expect(resolved).not.toBe("Invalid/Timezone");
expect(resolved.length).toBeGreaterThan(0);
});
});

View file

@ -0,0 +1,100 @@
/**
* Message timestamp injection for time awareness.
*
* Keeps system prompt stable while giving the model a reliable "now"
* reference in each incoming user turn.
*/
const CRON_TIME_PATTERN = /Current time:\s/;
const TIMESTAMP_ENVELOPE_PATTERN = /^\[.*\d{4}-\d{2}-\d{2} \d{2}:\d{2}/;
export interface MessageTimestampOptions {
timeZone?: string;
now?: Date;
}
export function resolveMessageTimezone(configured?: string): string {
const fromArg = configured?.trim();
if (fromArg && isValidTimezone(fromArg)) {
return fromArg;
}
const fromEnv = process.env.TZ?.trim();
if (fromEnv && isValidTimezone(fromEnv)) {
return fromEnv;
}
const hostTimezone = Intl.DateTimeFormat().resolvedOptions().timeZone;
return hostTimezone?.trim() || "UTC";
}
function isValidTimezone(timeZone: string): boolean {
try {
new Intl.DateTimeFormat("en-US", { timeZone }).format(new Date());
return true;
} catch {
return false;
}
}
function formatZonedTimestamp(date: Date, timeZone: string): string | undefined {
const parts = new Intl.DateTimeFormat("en-US", {
timeZone,
year: "numeric",
month: "2-digit",
day: "2-digit",
hour: "2-digit",
minute: "2-digit",
hourCycle: "h23",
timeZoneName: "short",
}).formatToParts(date);
const pick = (type: string) => parts.find((part) => part.type === type)?.value;
const yyyy = pick("year");
const mm = pick("month");
const dd = pick("day");
const hh = pick("hour");
const min = pick("minute");
const tz = [...parts]
.reverse()
.find((part) => part.type === "timeZoneName")
?.value?.trim();
if (!yyyy || !mm || !dd || !hh || !min) {
return undefined;
}
return `${yyyy}-${mm}-${dd} ${hh}:${min}${tz ? ` ${tz}` : ""}`;
}
export function injectMessageTimestamp(
message: string,
opts?: MessageTimestampOptions,
): string {
if (!message.trim()) {
return message;
}
if (TIMESTAMP_ENVELOPE_PATTERN.test(message)) {
return message;
}
if (CRON_TIME_PATTERN.test(message)) {
return message;
}
const now = opts?.now ?? new Date();
const timeZone = resolveMessageTimezone(opts?.timeZone);
const formatted = formatZonedTimestamp(now, timeZone);
if (!formatted) {
return message;
}
const dow = new Intl.DateTimeFormat("en-US", {
timeZone,
weekday: "short",
}).format(now);
return `[${dow} ${formatted}] ${message}`;
}

View file

@ -1,7 +1,7 @@
import { Agent as PiAgentCore, type AgentEvent, type AgentMessage } from "@mariozechner/pi-agent-core";
import { v7 as uuidv7 } from "uuid";
import type { AgentOptions, AgentRunResult, ReasoningMode } from "./types.js";
import type { MulticaEvent } from "./events.js";
import type { MulticaEvent, CompactionEndEvent } from "./events.js";
import { createAgentOutput } from "./cli/output.js";
import { resolveModel, resolveTools, type ResolveToolsOptions } from "./tools.js";
import {
@ -163,11 +163,14 @@ export class Agent {
: 0;
}
this.agent = new PiAgentCore(
this.currentApiKey
? { getApiKey: (_provider: string) => this.currentApiKey! }
: {},
);
this.agent = new PiAgentCore({
getApiKey: (_provider: string) => {
if (!this.currentApiKey) {
throw new Error(`No API key configured for provider: ${this.resolvedProvider}`);
}
return this.currentApiKey;
},
});
// Load Agent Profile (if profileId is specified)
// Every Agent should have a Profile for memory, tools config, and other settings
@ -356,6 +359,11 @@ export class Agent {
}
}
/** Emit an error event through the subscriber mechanism */
emitError(message: string): void {
this.emitMulticaEvent({ type: "agent_error", message });
}
async run(prompt: string): Promise<AgentRunResult> {
// Run-level mutex: prevents concurrent run/runInternal from mis-tagging messages
return this.withRunMutex(() => this._run(prompt));
@ -402,6 +410,14 @@ export class Agent {
await this.ensureInitialized();
this.output.state.lastAssistantText = "";
// Early validation: check API key before calling PiAgentCore.prompt(),
// because getApiKey errors thrown inside PiAgentCore's internal async
// context result in UnhandledPromiseRejection instead of propagating.
if (!this.currentApiKey) {
const errorMsg = `No API key configured for provider: ${this.resolvedProvider}. Please configure a provider in Agent Settings.`;
return { text: "", error: errorMsg };
}
const canRotate = !this.pinnedProfile && this.profileCandidates.length > 1;
let lastError: unknown;
@ -509,14 +525,15 @@ export class Agent {
if (result?.kept) {
this.agent.replaceMessages(result.kept);
}
this.emitMulticaEvent({
const endEvent: CompactionEndEvent = {
type: "compaction_end",
removed: result?.removedCount ?? 0,
kept: result?.kept.length ?? messages.length,
tokensRemoved: result?.tokensRemoved,
tokensKept: result?.tokensKept,
reason: result?.reason ?? "tokens",
});
};
this.emitMulticaEvent(endEvent);
} catch (err) {
throw err;
}

View file

@ -20,6 +20,7 @@ import {
buildSafetySection,
buildSkillsSection,
buildSubagentSection,
buildTimeAwarenessSection,
buildToolCallStyleSection,
buildToolingSummary,
buildUserSection,
@ -66,6 +67,7 @@ export function buildSystemPromptWithReport(options: SystemPromptOptions): {
{ name: "conditional-tools", lines: buildConditionalToolSections(tools, mode) },
{ name: "skills", lines: buildSkillsSection(skillsPrompt, mode) },
{ name: "runtime", lines: buildRuntimeSection(runtime, mode) },
{ name: "time-awareness", lines: buildTimeAwarenessSection(tools, mode) },
{ name: "profile-dir", lines: buildProfileDirSection(profileDir, mode) },
{ name: "subagent", lines: buildSubagentSection(subagent, mode) },
{ name: "extra", lines: buildExtraPromptSection(extraSystemPrompt, mode) },

View file

@ -4,6 +4,7 @@
import os from "node:os";
import type { RuntimeInfo } from "./types.js";
import { resolveMessageTimezone } from "../message-timestamp.js";
/**
* Collect runtime environment information.
@ -16,6 +17,7 @@ export function collectRuntimeInfo(overrides?: Partial<RuntimeInfo>): RuntimeInf
os: overrides?.os ?? process.platform,
arch: overrides?.arch ?? process.arch,
nodeVersion: overrides?.nodeVersion ?? process.version,
timezone: overrides?.timezone ?? resolveMessageTimezone(),
provider: overrides?.provider,
model: overrides?.model,
cwd: overrides?.cwd ?? process.cwd(),
@ -38,6 +40,7 @@ export function formatRuntimeLine(info: RuntimeInfo): string {
parts.push(`arch=${info.arch}`);
}
if (info.nodeVersion) parts.push(`node=${info.nodeVersion}`);
if (info.timezone) parts.push(`tz=${info.timezone}`);
if (info.model) {
const modelStr = info.provider ? `${info.provider}/${info.model}` : info.model;
parts.push(`model=${modelStr}`);

View file

@ -8,6 +8,7 @@ import {
buildSafetySection,
buildSkillsSection,
buildSubagentSection,
buildTimeAwarenessSection,
buildToolCallStyleSection,
buildToolingSummary,
buildUserSection,
@ -209,7 +210,7 @@ describe("buildSkillsSection", () => {
describe("buildRuntimeSection", () => {
it("formats runtime info in full mode", () => {
const result = buildRuntimeSection(
{ agentName: "test", os: "darwin", arch: "arm64", nodeVersion: "v22.0.0", model: "claude", provider: "anthropic" },
{ agentName: "test", os: "darwin", arch: "arm64", nodeVersion: "v22.0.0", timezone: "UTC", model: "claude", provider: "anthropic" },
"full",
);
const text = result.join("\n");
@ -228,6 +229,25 @@ describe("buildRuntimeSection", () => {
});
});
describe("buildTimeAwarenessSection", () => {
it("includes time awareness in full mode", () => {
const result = buildTimeAwarenessSection(["exec"], "full");
const text = result.join("\n");
expect(text).toContain("## Time Awareness");
expect(text).toContain("latest prefixed timestamp");
expect(text).toContain("`exec`");
});
it("includes time awareness in minimal mode", () => {
const result = buildTimeAwarenessSection(undefined, "minimal");
expect(result.join("\n")).toContain("## Time Awareness");
});
it("omits time awareness in none mode", () => {
expect(buildTimeAwarenessSection(["exec"], "none")).toEqual([]);
});
});
describe("buildProfileDirSection", () => {
it("returns empty in all modes (merged into workspace section)", () => {
// Profile directory info is now part of buildWorkspaceSection

View file

@ -314,6 +314,30 @@ export function buildRuntimeSection(
return ["## Runtime", formatRuntimeLine(runtime)];
}
/**
* Time awareness section helps the agent reason about "now" safely.
* Included in full and minimal modes.
*/
export function buildTimeAwarenessSection(
tools: string[] | undefined,
mode: SystemPromptMode,
): string[] {
if (mode === "none") return [];
const hasExecTool = (tools ?? []).some((tool) => tool.toLowerCase() === "exec");
const fallbackLine = hasExecTool
? "If a turn lacks a timestamp and exact current time matters, use `exec` with `date`."
: "If a turn lacks a timestamp and exact current time matters, ask for clarification.";
return [
"## Time Awareness",
"Incoming user messages may include a prefix like `[Wed 2026-02-09 21:15 PST]`.",
"Treat the latest prefixed timestamp as your reference for relative time requests (today, recent, last month).",
fallbackLine,
"",
];
}
/**
* Profile directory section now merged into buildWorkspaceSection.
* Kept for backwards compatibility but returns empty.

View file

@ -31,6 +31,8 @@ export interface RuntimeInfo {
arch?: string | undefined;
/** Node.js version (e.g. "v22.0.0") */
nodeVersion?: string | undefined;
/** User-facing timezone for temporal reasoning (e.g. "America/Los_Angeles") */
timezone?: string | undefined;
/** Current working directory */
cwd?: string | undefined;
}

View file

@ -194,7 +194,7 @@ export class ChannelManager {
if (event.type === "agent_error") {
this.stopTyping();
this.removeAckReaction();
const errorMsg = (event as { error?: string }).error ?? "Unknown error";
const errorMsg = (event as { message?: string }).message ?? "Unknown error";
console.error(`[Channels] Agent error: ${errorMsg}`);
const route = this.lastRoute;
if (route) {

View file

@ -9,7 +9,7 @@ type StubAgent = {
sessionId: string;
ensureInitialized: () => Promise<void>;
getMessages: () => Array<any>;
write: (content: string) => void;
write: (content: string, options?: { injectTimestamp?: boolean }) => void;
waitForIdle: () => Promise<void>;
getHeartbeatConfig: () => { prompt?: string; ackMaxChars?: number; enabled?: boolean };
getPendingWrites: () => number;
@ -71,4 +71,19 @@ describe("heartbeat runner", () => {
expect(result.status).toBe("ran");
});
it("disables timestamp injection for heartbeat prompt writes", async () => {
const writes: Array<{ content: string; options?: { injectTimestamp?: boolean } }> = [];
const agent = createStubAgent({ replyText: "HEARTBEAT_OK" });
const originalWrite = agent.write;
agent.write = (content, options) => {
writes.push(options ? { content, options } : { content });
originalWrite(content, options);
};
await runHeartbeatOnce({ agent: agent as any, reason: "manual" });
expect(writes.length).toBeGreaterThan(0);
expect(writes[0]?.options?.injectTimestamp).toBe(false);
});
});

View file

@ -176,7 +176,7 @@ export async function runHeartbeatOnce(opts: {
? `${basePrompt}\n\nSystem events:\n${pendingEvents.map((line) => `- ${line}`).join("\n")}`
: basePrompt;
agent.write(prompt);
agent.write(prompt, { injectTimestamp: false });
await agent.waitForIdle();
const afterMessages = agent.getMessages();