refactor: replace better-sqlite3 with lowdb for request details storage
- Removed better-sqlite3 dependency and implemented lowdb for JSON-based storage. - Updated observability settings handling and configuration retrieval. - Cleaned up unused observability settings UI components in ProfilePage. - Adjusted contributors badge display in README for better visibility.
This commit is contained in:
parent
753a04b49e
commit
8c8eeecc70
5 changed files with 206 additions and 592 deletions
|
|
@ -1270,7 +1270,7 @@ Full architecture reference: [`docs/ARCHITECTURE.md`](docs/ARCHITECTURE.md)
|
|||
|
||||
Thanks to all contributors who helped make 9Router better!
|
||||
|
||||
[](https://github.com/decolua/9router/graphs/contributors)
|
||||
[](https://github.com/decolua/9router/graphs/contributors)
|
||||
|
||||
---
|
||||
|
||||
|
|
|
|||
|
|
@ -15,7 +15,6 @@
|
|||
"@monaco-editor/react": "^4.7.0",
|
||||
"@xyflow/react": "^12.10.1",
|
||||
"bcryptjs": "^3.0.3",
|
||||
"better-sqlite3": "^12.6.2",
|
||||
"express": "^5.2.1",
|
||||
"fs": "^0.0.1-security",
|
||||
"http-proxy-middleware": "^3.0.5",
|
||||
|
|
|
|||
|
|
@ -223,24 +223,6 @@ export default function ProfilePage() {
|
|||
}
|
||||
};
|
||||
|
||||
const updateObservabilitySetting = async (key, value) => {
|
||||
const numValue = parseInt(value);
|
||||
if (isNaN(numValue) || numValue < 1) return;
|
||||
|
||||
try {
|
||||
const res = await fetch("/api/settings", {
|
||||
method: "PATCH",
|
||||
headers: { "Content-Type": "application/json" },
|
||||
body: JSON.stringify({ [key]: numValue }),
|
||||
});
|
||||
if (res.ok) {
|
||||
setSettings(prev => ({ ...prev, [key]: numValue }));
|
||||
}
|
||||
} catch (err) {
|
||||
console.error(`Failed to update ${key}:`, err);
|
||||
}
|
||||
};
|
||||
|
||||
const updateObservabilityEnabled = async (enabled) => {
|
||||
try {
|
||||
const res = await fetch("/api/settings", {
|
||||
|
|
@ -623,102 +605,18 @@ export default function ProfilePage() {
|
|||
</div>
|
||||
<h3 className="text-lg font-semibold">Observability</h3>
|
||||
</div>
|
||||
<div className="flex flex-col gap-4">
|
||||
<div className="flex items-center justify-between">
|
||||
<div>
|
||||
<p className="font-medium">Enable Observability</p>
|
||||
<p className="text-sm text-text-muted">
|
||||
Turn request detail recording on/off globally
|
||||
</p>
|
||||
</div>
|
||||
<Toggle
|
||||
checked={observabilityEnabled}
|
||||
onChange={updateObservabilityEnabled}
|
||||
disabled={loading}
|
||||
/>
|
||||
</div>
|
||||
|
||||
<div className={cn("flex flex-col gap-4", !observabilityEnabled && "opacity-60")}>
|
||||
<div className="flex items-center justify-between">
|
||||
<div>
|
||||
<p className="font-medium">Max Records</p>
|
||||
<p className="text-sm text-text-muted">
|
||||
Maximum request detail records to keep (older records are auto-deleted)
|
||||
</p>
|
||||
</div>
|
||||
<Input
|
||||
type="number"
|
||||
min="100"
|
||||
max="10000"
|
||||
step="100"
|
||||
value={settings.observabilityMaxRecords || 1000}
|
||||
onChange={(e) => updateObservabilitySetting("observabilityMaxRecords", parseInt(e.target.value))}
|
||||
disabled={loading || !observabilityEnabled}
|
||||
className="w-28 text-center"
|
||||
/>
|
||||
</div>
|
||||
|
||||
<div className="flex items-center justify-between">
|
||||
<div>
|
||||
<p className="font-medium">Batch Size</p>
|
||||
<p className="text-sm text-text-muted">
|
||||
Number of items to accumulate before writing to database (higher = better performance)
|
||||
</p>
|
||||
</div>
|
||||
<Input
|
||||
type="number"
|
||||
min="5"
|
||||
max="100"
|
||||
step="5"
|
||||
value={settings.observabilityBatchSize || 20}
|
||||
onChange={(e) => updateObservabilitySetting("observabilityBatchSize", parseInt(e.target.value))}
|
||||
disabled={loading || !observabilityEnabled}
|
||||
className="w-28 text-center"
|
||||
/>
|
||||
</div>
|
||||
|
||||
<div className="flex items-center justify-between">
|
||||
<div>
|
||||
<p className="font-medium">Flush Interval (ms)</p>
|
||||
<p className="text-sm text-text-muted">
|
||||
Maximum time to wait before flushing buffer (prevents data loss during low traffic)
|
||||
</p>
|
||||
</div>
|
||||
<Input
|
||||
type="number"
|
||||
min="1000"
|
||||
max="30000"
|
||||
step="1000"
|
||||
value={settings.observabilityFlushIntervalMs || 5000}
|
||||
onChange={(e) => updateObservabilitySetting("observabilityFlushIntervalMs", parseInt(e.target.value))}
|
||||
disabled={loading || !observabilityEnabled}
|
||||
className="w-28 text-center"
|
||||
/>
|
||||
</div>
|
||||
|
||||
<div className="flex items-center justify-between">
|
||||
<div>
|
||||
<p className="font-medium">Max JSON Size (KB)</p>
|
||||
<p className="text-sm text-text-muted">
|
||||
Maximum size for each JSON field (request/response) before truncation
|
||||
</p>
|
||||
</div>
|
||||
<Input
|
||||
type="number"
|
||||
min="100"
|
||||
max="10240"
|
||||
step="100"
|
||||
value={settings.observabilityMaxJsonSize || 1024}
|
||||
onChange={(e) => updateObservabilitySetting("observabilityMaxJsonSize", parseInt(e.target.value))}
|
||||
disabled={loading || !observabilityEnabled}
|
||||
className="w-28 text-center"
|
||||
/>
|
||||
</div>
|
||||
|
||||
<p className="text-xs text-text-muted italic pt-2 border-t border-border/50">
|
||||
Current: Keeps {settings.observabilityMaxRecords || 1000} records, batches every {settings.observabilityBatchSize || 20} requests, max {settings.observabilityMaxJsonSize || 1024}KB per field
|
||||
</p>
|
||||
<div className="flex items-center justify-between">
|
||||
<div>
|
||||
<p className="font-medium">Enable Observability</p>
|
||||
<p className="text-sm text-text-muted">
|
||||
Record request details for inspection in the logs view
|
||||
</p>
|
||||
</div>
|
||||
<Toggle
|
||||
checked={observabilityEnabled}
|
||||
onChange={updateObservabilityEnabled}
|
||||
disabled={loading}
|
||||
/>
|
||||
</div>
|
||||
</Card>
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
import { NextResponse } from "next/server";
|
||||
import { getRequestDetailsDb } from "@/lib/requestDetailsDb";
|
||||
import { getRequestDetails } from "@/lib/requestDetailsDb";
|
||||
import { getProviderNodes } from "@/lib/localDb";
|
||||
import { AI_PROVIDERS, getProviderByAlias } from "@/shared/constants/providers";
|
||||
|
||||
|
|
@ -9,46 +9,26 @@ import { AI_PROVIDERS, getProviderByAlias } from "@/shared/constants/providers";
|
|||
*/
|
||||
export async function GET() {
|
||||
try {
|
||||
const db = await getRequestDetailsDb();
|
||||
const { details } = await getRequestDetails({ pageSize: 9999 });
|
||||
|
||||
const stmt = db.prepare(`
|
||||
SELECT DISTINCT provider
|
||||
FROM request_details
|
||||
WHERE provider IS NOT NULL AND provider != ''
|
||||
ORDER BY provider ASC
|
||||
`);
|
||||
// Extract unique providers
|
||||
const providerIds = [...new Set(details.map(r => r.provider).filter(Boolean))].sort();
|
||||
|
||||
const rows = stmt.all();
|
||||
|
||||
// Fetch all provider nodes to get names for custom providers
|
||||
const providerNodes = await getProviderNodes();
|
||||
const nodeMap = {};
|
||||
for (const node of providerNodes) {
|
||||
nodeMap[node.id] = node.name;
|
||||
}
|
||||
|
||||
const providers = rows.map(row => {
|
||||
const providerId = row.provider;
|
||||
|
||||
// Try to find name from various sources
|
||||
const providers = providerIds.map(providerId => {
|
||||
let name = providerId;
|
||||
|
||||
// 1. Check if it's a custom provider node
|
||||
if (nodeMap[providerId]) {
|
||||
name = nodeMap[providerId];
|
||||
}
|
||||
// 2. Check predefined providers
|
||||
else {
|
||||
} else {
|
||||
const providerConfig = getProviderByAlias(providerId) || AI_PROVIDERS[providerId];
|
||||
if (providerConfig?.name) {
|
||||
name = providerConfig.name;
|
||||
}
|
||||
if (providerConfig?.name) name = providerConfig.name;
|
||||
}
|
||||
|
||||
return {
|
||||
id: providerId,
|
||||
name
|
||||
};
|
||||
return { id: providerId, name };
|
||||
});
|
||||
|
||||
return NextResponse.json({ providers });
|
||||
|
|
|
|||
|
|
@ -1,19 +1,65 @@
|
|||
import Database from "better-sqlite3";
|
||||
import path from "path";
|
||||
import os from "os";
|
||||
import fs from "fs";
|
||||
import { Low } from "lowdb";
|
||||
import { JSONFile } from "lowdb/node";
|
||||
import path from "node:path";
|
||||
import os from "node:os";
|
||||
import fs from "node:fs";
|
||||
|
||||
const isCloud = typeof caches !== 'undefined' || typeof caches === 'object';
|
||||
const isCloud = typeof caches !== "undefined" && typeof caches === "object";
|
||||
|
||||
// ============================================================================
|
||||
// CONFIGURATION: Batch Processing Settings
|
||||
// ============================================================================
|
||||
const DEFAULT_MAX_RECORDS = 200;
|
||||
const DEFAULT_BATCH_SIZE = 20;
|
||||
const DEFAULT_FLUSH_INTERVAL_MS = 5000;
|
||||
const DEFAULT_MAX_JSON_SIZE = 5 * 1024; // 5KB default, configurable via settings
|
||||
const CONFIG_CACHE_TTL_MS = 5000;
|
||||
|
||||
function getAppName() {
|
||||
return "9router";
|
||||
}
|
||||
|
||||
function getUserDataDir() {
|
||||
if (isCloud) return "/tmp";
|
||||
if (process.env.DATA_DIR) return process.env.DATA_DIR;
|
||||
|
||||
const platform = process.platform;
|
||||
const homeDir = os.homedir();
|
||||
const appName = getAppName();
|
||||
|
||||
if (platform === "win32") {
|
||||
return path.join(process.env.APPDATA || path.join(homeDir, "AppData", "Roaming"), appName);
|
||||
}
|
||||
return path.join(homeDir, `.${appName}`);
|
||||
}
|
||||
|
||||
const DATA_DIR = getUserDataDir();
|
||||
const DB_FILE = isCloud ? null : path.join(DATA_DIR, "request-details.json");
|
||||
|
||||
if (!isCloud && !fs.existsSync(DATA_DIR)) {
|
||||
fs.mkdirSync(DATA_DIR, { recursive: true });
|
||||
}
|
||||
|
||||
let dbInstance = null;
|
||||
|
||||
async function getDb() {
|
||||
if (isCloud) return null;
|
||||
if (!dbInstance) {
|
||||
const adapter = new JSONFile(DB_FILE);
|
||||
const db = new Low(adapter, { records: [] });
|
||||
await db.read();
|
||||
if (!db.data?.records) db.data = { records: [] };
|
||||
dbInstance = db;
|
||||
}
|
||||
return dbInstance;
|
||||
}
|
||||
|
||||
// Config cache
|
||||
let cachedConfig = null;
|
||||
let cachedConfigTs = 0;
|
||||
|
||||
/**
|
||||
* Get observability configuration from settings.
|
||||
* Falls back to environment variables, then defaults.
|
||||
*/
|
||||
async function getObservabilityConfig() {
|
||||
if (cachedConfig && (Date.now() - cachedConfigTs) < CONFIG_CACHE_TTL_MS) {
|
||||
return cachedConfig;
|
||||
}
|
||||
|
||||
try {
|
||||
const { getSettings } = await import("@/lib/localDb");
|
||||
const settings = await getSettings();
|
||||
|
|
@ -22,266 +68,120 @@ async function getObservabilityConfig() {
|
|||
? settings.observabilityEnabled
|
||||
: envEnabled;
|
||||
|
||||
return {
|
||||
cachedConfig = {
|
||||
enabled,
|
||||
maxRecords: settings.observabilityMaxRecords || parseInt(process.env.OBSERVABILITY_MAX_RECORDS || '1000', 10),
|
||||
batchSize: settings.observabilityBatchSize || parseInt(process.env.OBSERVABILITY_BATCH_SIZE || '20', 10),
|
||||
flushIntervalMs: settings.observabilityFlushIntervalMs || parseInt(process.env.OBSERVABILITY_FLUSH_INTERVAL_MS || '5000', 10),
|
||||
maxJsonSize: (settings.observabilityMaxJsonSize || parseInt(process.env.OBSERVABILITY_MAX_JSON_SIZE || '1024', 10)) * 1024
|
||||
maxRecords: settings.observabilityMaxRecords || parseInt(process.env.OBSERVABILITY_MAX_RECORDS || String(DEFAULT_MAX_RECORDS), 10),
|
||||
batchSize: settings.observabilityBatchSize || parseInt(process.env.OBSERVABILITY_BATCH_SIZE || String(DEFAULT_BATCH_SIZE), 10),
|
||||
flushIntervalMs: settings.observabilityFlushIntervalMs || parseInt(process.env.OBSERVABILITY_FLUSH_INTERVAL_MS || String(DEFAULT_FLUSH_INTERVAL_MS), 10),
|
||||
maxJsonSize: (settings.observabilityMaxJsonSize || parseInt(process.env.OBSERVABILITY_MAX_JSON_SIZE || "5", 10)) * 1024,
|
||||
};
|
||||
} catch (error) {
|
||||
console.error("[requestDetailsDb] Failed to load observability config:", error);
|
||||
return {
|
||||
} catch {
|
||||
cachedConfig = {
|
||||
enabled: true,
|
||||
maxRecords: 1000,
|
||||
batchSize: 20,
|
||||
flushIntervalMs: 5000,
|
||||
maxJsonSize: 1024 * 1024
|
||||
maxRecords: DEFAULT_MAX_RECORDS,
|
||||
batchSize: DEFAULT_BATCH_SIZE,
|
||||
flushIntervalMs: DEFAULT_FLUSH_INTERVAL_MS,
|
||||
maxJsonSize: DEFAULT_MAX_JSON_SIZE,
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
// Cache config to avoid repeated database reads
|
||||
let cachedConfig = null;
|
||||
let cachedConfigTs = 0;
|
||||
const CONFIG_CACHE_TTL_MS = 5000;
|
||||
|
||||
async function getCachedObservabilityConfig() {
|
||||
if (!cachedConfig || (Date.now() - cachedConfigTs) > CONFIG_CACHE_TTL_MS) {
|
||||
cachedConfig = await getObservabilityConfig();
|
||||
cachedConfigTs = Date.now();
|
||||
}
|
||||
|
||||
cachedConfigTs = Date.now();
|
||||
return cachedConfig;
|
||||
}
|
||||
|
||||
let dbInstance = null;
|
||||
|
||||
// Get app name
|
||||
function getAppName() {
|
||||
return "9router";
|
||||
}
|
||||
|
||||
// Get user data directory based on platform
|
||||
function getUserDataDir() {
|
||||
if (isCloud) return "/tmp";
|
||||
|
||||
if (process.env.DATA_DIR) return process.env.DATA_DIR;
|
||||
|
||||
try {
|
||||
const platform = process.platform;
|
||||
const homeDir = os.homedir();
|
||||
const appName = getAppName();
|
||||
|
||||
if (platform === "win32") {
|
||||
return path.join(process.env.APPDATA || path.join(homeDir, "AppData", "Roaming"), appName);
|
||||
} else {
|
||||
return path.join(homeDir, `.${appName}`);
|
||||
}
|
||||
} catch (error) {
|
||||
console.error("[requestDetailsDb] Failed to get user data directory:", error.message);
|
||||
return path.join(process.cwd(), ".9router");
|
||||
}
|
||||
}
|
||||
|
||||
// Database file path
|
||||
const DATA_DIR = getUserDataDir();
|
||||
const DB_FILE = isCloud ? null : path.join(DATA_DIR, "request-details.sqlite");
|
||||
|
||||
// Ensure data directory exists
|
||||
if (!isCloud && fs && typeof fs.existsSync === "function") {
|
||||
try {
|
||||
if (!fs.existsSync(DATA_DIR)) {
|
||||
fs.mkdirSync(DATA_DIR, { recursive: true });
|
||||
}
|
||||
} catch (error) {
|
||||
console.error("[requestDetailsDb] Failed to create data directory:", error.message);
|
||||
}
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// BATCH WRITE QUEUE
|
||||
// ============================================================================
|
||||
|
||||
/**
|
||||
* In-memory buffer for batch writes.
|
||||
* Accumulates request details before flushing to database in a transaction.
|
||||
* @type {Array<object>}
|
||||
*/
|
||||
// Batch write queue
|
||||
let writeBuffer = [];
|
||||
|
||||
/**
|
||||
* Timer reference for auto-flush mechanism.
|
||||
* Ensures data is written even during low traffic periods.
|
||||
* @type {NodeJS.Timeout|null}
|
||||
*/
|
||||
let flushTimer = null;
|
||||
|
||||
/**
|
||||
* Flag indicating if a flush operation is currently in progress.
|
||||
* Prevents concurrent flushes.
|
||||
* @type {boolean}
|
||||
*/
|
||||
let isFlushing = false;
|
||||
|
||||
/**
|
||||
* Get SQLite database instance (singleton)
|
||||
*/
|
||||
export async function getRequestDetailsDb() {
|
||||
if (isCloud) {
|
||||
// In-memory mock for Workers
|
||||
if (!dbInstance) {
|
||||
dbInstance = {
|
||||
prepare: () => ({
|
||||
run: () => {},
|
||||
get: () => null,
|
||||
all: () => []
|
||||
}),
|
||||
exec: () => {},
|
||||
pragma: () => {}
|
||||
};
|
||||
function safeJsonStringify(obj, maxSize) {
|
||||
try {
|
||||
const str = JSON.stringify(obj);
|
||||
if (str.length > maxSize) {
|
||||
return JSON.stringify({ _truncated: true, _originalSize: str.length, _preview: str.substring(0, 200) });
|
||||
}
|
||||
return dbInstance;
|
||||
return str;
|
||||
} catch {
|
||||
return "{}";
|
||||
}
|
||||
|
||||
if (!dbInstance) {
|
||||
const db = new Database(DB_FILE);
|
||||
|
||||
// Configure for better concurrency
|
||||
db.pragma('journal_mode = WAL'); // Write-Ahead Logging for concurrent access
|
||||
db.pragma('synchronous = NORMAL'); // Faster than FULL, still safe
|
||||
db.pragma('cache_size = -64000'); // 64MB cache
|
||||
db.pragma('temp_store = MEMORY'); // Use memory for temp tables
|
||||
|
||||
// Create table with indexes
|
||||
db.exec(`
|
||||
CREATE TABLE IF NOT EXISTS request_details (
|
||||
id TEXT PRIMARY KEY,
|
||||
provider TEXT,
|
||||
model TEXT,
|
||||
connection_id TEXT,
|
||||
timestamp INTEGER NOT NULL,
|
||||
status TEXT,
|
||||
latency TEXT,
|
||||
tokens TEXT,
|
||||
request TEXT,
|
||||
provider_request TEXT,
|
||||
provider_response TEXT,
|
||||
response TEXT
|
||||
);
|
||||
|
||||
-- Indexes for common queries
|
||||
CREATE INDEX IF NOT EXISTS idx_timestamp
|
||||
ON request_details(timestamp DESC);
|
||||
CREATE INDEX IF NOT EXISTS idx_provider
|
||||
ON request_details(provider);
|
||||
CREATE INDEX IF NOT EXISTS idx_model
|
||||
ON request_details(model);
|
||||
CREATE INDEX IF NOT EXISTS idx_connection
|
||||
ON request_details(connection_id);
|
||||
CREATE INDEX IF NOT EXISTS idx_status
|
||||
ON request_details(status);
|
||||
`);
|
||||
|
||||
dbInstance = db;
|
||||
|
||||
// Register shutdown handler on first database initialization
|
||||
ensureShutdownHandler();
|
||||
}
|
||||
|
||||
return dbInstance;
|
||||
}
|
||||
|
||||
/**
|
||||
* Generate unique ID for request detail
|
||||
*/
|
||||
function sanitizeHeaders(headers) {
|
||||
if (!headers || typeof headers !== "object") return {};
|
||||
const sensitiveKeys = ["authorization", "x-api-key", "cookie", "token", "api-key"];
|
||||
const sanitized = { ...headers };
|
||||
for (const key of Object.keys(sanitized)) {
|
||||
if (sensitiveKeys.some(s => key.toLowerCase().includes(s))) {
|
||||
delete sanitized[key];
|
||||
}
|
||||
}
|
||||
return sanitized;
|
||||
}
|
||||
|
||||
function generateDetailId(model) {
|
||||
const timestamp = new Date().toISOString();
|
||||
const random = Math.random().toString(36).substring(2, 8);
|
||||
const modelPart = model ? model.replace(/[^a-zA-Z0-9-]/g, '-') : 'unknown';
|
||||
const modelPart = model ? model.replace(/[^a-zA-Z0-9-]/g, "-") : "unknown";
|
||||
return `${timestamp}-${random}-${modelPart}`;
|
||||
}
|
||||
|
||||
/**
|
||||
* Flush all buffered items to database in a single transaction.
|
||||
* This function is called automatically when:
|
||||
* 1. Buffer size reaches OBSERVABILITY_BATCH_SIZE
|
||||
* 2. OBSERVABILITY_FLUSH_INTERVAL_MS elapses
|
||||
* 3. Process is shutting down (graceful shutdown)
|
||||
*
|
||||
* @private
|
||||
*/
|
||||
async function flushToDatabase() {
|
||||
if (isCloud || isFlushing || writeBuffer.length === 0) {
|
||||
return;
|
||||
}
|
||||
if (isCloud || isFlushing || writeBuffer.length === 0) return;
|
||||
|
||||
isFlushing = true;
|
||||
|
||||
try {
|
||||
// Take a snapshot of the buffer and clear it immediately
|
||||
const itemsToSave = [...writeBuffer];
|
||||
writeBuffer = [];
|
||||
|
||||
const db = await getRequestDetailsDb();
|
||||
const db = await getDb();
|
||||
const config = await getObservabilityConfig();
|
||||
|
||||
// Prepare statements outside transaction for better performance
|
||||
const insertStmt = db.prepare(`
|
||||
INSERT OR REPLACE INTO request_details
|
||||
(id, provider, model, connection_id, timestamp, status, latency, tokens,
|
||||
request, provider_request, provider_response, response)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
`);
|
||||
for (const item of itemsToSave) {
|
||||
if (!item.id) item.id = generateDetailId(item.model);
|
||||
if (!item.timestamp) item.timestamp = new Date().toISOString();
|
||||
if (item.request?.headers) item.request.headers = sanitizeHeaders(item.request.headers);
|
||||
|
||||
const deleteStmt = db.prepare(`
|
||||
DELETE FROM request_details
|
||||
WHERE id NOT IN (
|
||||
SELECT id FROM request_details
|
||||
ORDER BY timestamp DESC
|
||||
LIMIT ?
|
||||
)
|
||||
`);
|
||||
// Serialize large fields
|
||||
const record = {
|
||||
id: item.id,
|
||||
provider: item.provider || null,
|
||||
model: item.model || null,
|
||||
connectionId: item.connectionId || null,
|
||||
timestamp: item.timestamp,
|
||||
status: item.status || null,
|
||||
latency: item.latency || {},
|
||||
tokens: item.tokens || {},
|
||||
request: item.request || {},
|
||||
providerRequest: item.providerRequest || {},
|
||||
providerResponse: item.providerResponse || {},
|
||||
response: item.response || {},
|
||||
};
|
||||
|
||||
// Execute all writes in a single transaction for atomicity
|
||||
const transaction = db.transaction((items) => {
|
||||
const maxJsonSize = config.maxJsonSize;
|
||||
|
||||
for (const item of items) {
|
||||
if (!item.id) {
|
||||
item.id = generateDetailId(item.model);
|
||||
// Truncate oversized JSON fields
|
||||
const maxSize = config.maxJsonSize;
|
||||
for (const field of ["request", "providerRequest", "providerResponse", "response"]) {
|
||||
const str = JSON.stringify(record[field]);
|
||||
if (str.length > maxSize) {
|
||||
record[field] = { _truncated: true, _originalSize: str.length, _preview: str.substring(0, 200) };
|
||||
}
|
||||
|
||||
if (!item.timestamp) {
|
||||
item.timestamp = new Date().toISOString();
|
||||
}
|
||||
|
||||
// Sanitize headers if present
|
||||
if (item.request && item.request.headers) {
|
||||
item.request.headers = sanitizeHeaders(item.request.headers);
|
||||
}
|
||||
|
||||
insertStmt.run(
|
||||
item.id,
|
||||
item.provider || null,
|
||||
item.model || null,
|
||||
item.connectionId || null,
|
||||
new Date(item.timestamp).getTime(),
|
||||
item.status || null,
|
||||
JSON.stringify(item.latency || {}),
|
||||
JSON.stringify(item.tokens || {}),
|
||||
safeJsonStringify(item.request || {}, maxJsonSize),
|
||||
safeJsonStringify(item.providerRequest || {}, maxJsonSize),
|
||||
safeJsonStringify(item.providerResponse || {}, maxJsonSize),
|
||||
safeJsonStringify(item.response || {}, maxJsonSize)
|
||||
);
|
||||
}
|
||||
|
||||
// Cleanup old records once per batch (not per item)
|
||||
deleteStmt.run(config.maxRecords);
|
||||
});
|
||||
// Upsert: replace existing record with same id
|
||||
const idx = db.data.records.findIndex(r => r.id === record.id);
|
||||
if (idx !== -1) {
|
||||
db.data.records[idx] = record;
|
||||
} else {
|
||||
db.data.records.push(record);
|
||||
}
|
||||
}
|
||||
|
||||
transaction(itemsToSave);
|
||||
// Keep only latest maxRecords (sorted by timestamp desc)
|
||||
db.data.records.sort((a, b) => new Date(b.timestamp) - new Date(a.timestamp));
|
||||
if (db.data.records.length > config.maxRecords) {
|
||||
db.data.records = db.data.records.slice(0, config.maxRecords);
|
||||
}
|
||||
|
||||
await db.write();
|
||||
} catch (error) {
|
||||
console.error("[requestDetailsDb] Batch write failed:", error);
|
||||
} finally {
|
||||
|
|
@ -289,68 +189,17 @@ async function flushToDatabase() {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Safely stringify an object with a size limit.
|
||||
* Truncates the result if it exceeds the limit.
|
||||
* @param {object} obj - Object to stringify
|
||||
* @param {number} maxSize - Maximum string size in bytes
|
||||
* @returns {string}
|
||||
*/
|
||||
function safeJsonStringify(obj, maxSize) {
|
||||
try {
|
||||
const str = JSON.stringify(obj);
|
||||
if (str.length > maxSize) {
|
||||
// Return valid JSON instead of truncated invalid string
|
||||
return JSON.stringify({ _truncated: true, _originalSize: str.length, _preview: str.substring(0, 200) });
|
||||
}
|
||||
return str;
|
||||
} catch (error) {
|
||||
return JSON.stringify({ error: "Failed to stringify object", message: error.message });
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Sanitize sensitive headers from request
|
||||
*/
|
||||
function sanitizeHeaders(headers) {
|
||||
if (!headers || typeof headers !== 'object') return {};
|
||||
|
||||
const sensitiveKeys = ['authorization', 'x-api-key', 'cookie', 'token', 'api-key'];
|
||||
const sanitized = { ...headers };
|
||||
|
||||
for (const key of Object.keys(sanitized)) {
|
||||
if (sensitiveKeys.some(sensitive => key.toLowerCase().includes(sensitive))) {
|
||||
delete sanitized[key];
|
||||
}
|
||||
}
|
||||
|
||||
return sanitized;
|
||||
}
|
||||
|
||||
/**
|
||||
* Save request detail to SQLite (batched for performance).
|
||||
* Details are accumulated in memory and flushed to database in batches.
|
||||
*
|
||||
* @param {object} detail - Request detail object
|
||||
* @see {@link flushToDatabase} for batch write implementation
|
||||
*/
|
||||
export async function saveRequestDetail(detail) {
|
||||
if (isCloud) return;
|
||||
|
||||
const config = await getCachedObservabilityConfig();
|
||||
if (!config.enabled) {
|
||||
return;
|
||||
}
|
||||
const config = await getObservabilityConfig();
|
||||
if (!config.enabled) return;
|
||||
|
||||
writeBuffer.push(detail);
|
||||
|
||||
if (writeBuffer.length >= config.batchSize) {
|
||||
await flushToDatabase();
|
||||
|
||||
if (flushTimer) {
|
||||
clearTimeout(flushTimer);
|
||||
flushTimer = null;
|
||||
}
|
||||
if (flushTimer) { clearTimeout(flushTimer); flushTimer = null; }
|
||||
} else if (!flushTimer) {
|
||||
flushTimer = setTimeout(() => {
|
||||
flushToDatabase().catch(() => {});
|
||||
|
|
@ -359,173 +208,61 @@ export async function saveRequestDetail(detail) {
|
|||
}
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// GRACEFUL SHUTDOWN HANDLER
|
||||
// ============================================================================
|
||||
|
||||
let shutdownHandlerRegistered = false;
|
||||
|
||||
/**
|
||||
* Register process shutdown handlers to flush remaining data before exit.
|
||||
* Should be called once when the module initializes.
|
||||
*/
|
||||
function ensureShutdownHandler() {
|
||||
if (shutdownHandlerRegistered || isCloud) {
|
||||
return;
|
||||
export async function getRequestDetails(filter = {}) {
|
||||
if (isCloud) {
|
||||
return { details: [], pagination: { page: 1, pageSize: 50, totalItems: 0, totalPages: 0, hasNext: false, hasPrev: false } };
|
||||
}
|
||||
|
||||
const handler = async () => {
|
||||
// Clear timer to prevent any pending flush
|
||||
if (flushTimer) {
|
||||
clearTimeout(flushTimer);
|
||||
flushTimer = null;
|
||||
}
|
||||
const db = await getDb();
|
||||
let records = [...db.data.records];
|
||||
|
||||
// Flush any remaining data in buffer
|
||||
if (writeBuffer.length > 0) {
|
||||
console.log(`[requestDetailsDb] Flushing ${writeBuffer.length} items before shutdown...`);
|
||||
await flushToDatabase();
|
||||
}
|
||||
// Apply filters
|
||||
if (filter.provider) records = records.filter(r => r.provider === filter.provider);
|
||||
if (filter.model) records = records.filter(r => r.model === filter.model);
|
||||
if (filter.connectionId) records = records.filter(r => r.connectionId === filter.connectionId);
|
||||
if (filter.status) records = records.filter(r => r.status === filter.status);
|
||||
if (filter.startDate) records = records.filter(r => new Date(r.timestamp) >= new Date(filter.startDate));
|
||||
if (filter.endDate) records = records.filter(r => new Date(r.timestamp) <= new Date(filter.endDate));
|
||||
|
||||
// Sort desc
|
||||
records.sort((a, b) => new Date(b.timestamp) - new Date(a.timestamp));
|
||||
|
||||
const totalItems = records.length;
|
||||
const page = filter.page || 1;
|
||||
const pageSize = filter.pageSize || 50;
|
||||
const totalPages = Math.ceil(totalItems / pageSize);
|
||||
const details = records.slice((page - 1) * pageSize, page * pageSize);
|
||||
|
||||
return {
|
||||
details,
|
||||
pagination: { page, pageSize, totalItems, totalPages, hasNext: page < totalPages, hasPrev: page > 1 },
|
||||
};
|
||||
}
|
||||
|
||||
export async function getRequestDetailById(id) {
|
||||
if (isCloud) return null;
|
||||
|
||||
const db = await getDb();
|
||||
return db.data.records.find(r => r.id === id) || null;
|
||||
}
|
||||
|
||||
// Graceful shutdown
|
||||
let shutdownHandlerRegistered = false;
|
||||
|
||||
function ensureShutdownHandler() {
|
||||
if (shutdownHandlerRegistered || isCloud) return;
|
||||
|
||||
const handler = async () => {
|
||||
if (flushTimer) { clearTimeout(flushTimer); flushTimer = null; }
|
||||
if (writeBuffer.length > 0) await flushToDatabase();
|
||||
};
|
||||
|
||||
// Register handlers for various termination signals
|
||||
process.on('beforeExit', handler);
|
||||
process.on('SIGINT', handler);
|
||||
process.on('SIGTERM', handler);
|
||||
process.on('exit', handler);
|
||||
process.on("beforeExit", handler);
|
||||
process.on("SIGINT", handler);
|
||||
process.on("SIGTERM", handler);
|
||||
process.on("exit", handler);
|
||||
|
||||
shutdownHandlerRegistered = true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get request details with filtering and pagination
|
||||
* @param {object} filter - Filter options
|
||||
* @returns {Promise<object>} Details with pagination info
|
||||
*/
|
||||
export async function getRequestDetails(filter = {}) {
|
||||
const db = await getRequestDetailsDb();
|
||||
|
||||
if (isCloud) {
|
||||
return { details: [], pagination: { page: 1, pageSize: filter.pageSize || 50, totalItems: 0, totalPages: 0, hasNext: false, hasPrev: false } };
|
||||
}
|
||||
|
||||
let query = 'SELECT * FROM request_details WHERE 1=1';
|
||||
const params = [];
|
||||
|
||||
if (filter.provider) {
|
||||
query += ' AND provider = ?';
|
||||
params.push(filter.provider);
|
||||
}
|
||||
|
||||
if (filter.model) {
|
||||
query += ' AND model = ?';
|
||||
params.push(filter.model);
|
||||
}
|
||||
|
||||
if (filter.connectionId) {
|
||||
query += ' AND connection_id = ?';
|
||||
params.push(filter.connectionId);
|
||||
}
|
||||
|
||||
if (filter.status) {
|
||||
query += ' AND status = ?';
|
||||
params.push(filter.status);
|
||||
}
|
||||
|
||||
if (filter.startDate) {
|
||||
query += ' AND timestamp >= ?';
|
||||
params.push(new Date(filter.startDate).getTime());
|
||||
}
|
||||
|
||||
if (filter.endDate) {
|
||||
query += ' AND timestamp <= ?';
|
||||
params.push(new Date(filter.endDate).getTime());
|
||||
}
|
||||
|
||||
// Get total count first
|
||||
const countQuery = query.replace('SELECT *', 'SELECT COUNT(*)');
|
||||
const countStmt = db.prepare(countQuery);
|
||||
const totalResult = countStmt.get(...params);
|
||||
const total = totalResult['COUNT(*)'];
|
||||
|
||||
// Add pagination
|
||||
query += ' ORDER BY timestamp DESC';
|
||||
const page = filter.page || 1;
|
||||
const pageSize = filter.pageSize || 50;
|
||||
query += ' LIMIT ? OFFSET ?';
|
||||
params.push(pageSize, (page - 1) * pageSize);
|
||||
|
||||
// Execute query
|
||||
const stmt = db.prepare(query);
|
||||
const rows = stmt.all(...params);
|
||||
|
||||
// Safe JSON parse — returns fallback on corrupt/truncated data
|
||||
const safeJsonParse = (str, fallback = {}) => {
|
||||
try { return JSON.parse(str || '{}'); }
|
||||
catch { return fallback; }
|
||||
};
|
||||
|
||||
// Convert back to original format
|
||||
const details = rows.map(row => ({
|
||||
id: row.id,
|
||||
provider: row.provider,
|
||||
model: row.model,
|
||||
connectionId: row.connection_id,
|
||||
timestamp: new Date(row.timestamp).toISOString(),
|
||||
status: row.status,
|
||||
latency: safeJsonParse(row.latency),
|
||||
tokens: safeJsonParse(row.tokens),
|
||||
request: safeJsonParse(row.request),
|
||||
providerRequest: safeJsonParse(row.provider_request),
|
||||
providerResponse: safeJsonParse(row.provider_response),
|
||||
response: safeJsonParse(row.response)
|
||||
}));
|
||||
|
||||
return {
|
||||
details,
|
||||
pagination: {
|
||||
page,
|
||||
pageSize,
|
||||
totalItems: total,
|
||||
totalPages: Math.ceil(total / pageSize),
|
||||
hasNext: page < Math.ceil(total / pageSize),
|
||||
hasPrev: page > 1
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Get single request detail by ID
|
||||
* @param {string} id - Request detail ID
|
||||
* @returns {Promise<object|null>} Request detail or null
|
||||
*/
|
||||
export async function getRequestDetailById(id) {
|
||||
const db = await getRequestDetailsDb();
|
||||
|
||||
if (isCloud) return null;
|
||||
|
||||
const stmt = db.prepare('SELECT * FROM request_details WHERE id = ?');
|
||||
const row = stmt.get(id);
|
||||
|
||||
if (!row) return null;
|
||||
|
||||
const safeJsonParse = (str, fallback = {}) => {
|
||||
try { return JSON.parse(str || '{}'); }
|
||||
catch { return fallback; }
|
||||
};
|
||||
|
||||
return {
|
||||
id: row.id,
|
||||
provider: row.provider,
|
||||
model: row.model,
|
||||
connectionId: row.connection_id,
|
||||
timestamp: new Date(row.timestamp).toISOString(),
|
||||
status: row.status,
|
||||
latency: safeJsonParse(row.latency),
|
||||
tokens: safeJsonParse(row.tokens),
|
||||
request: safeJsonParse(row.request),
|
||||
providerRequest: safeJsonParse(row.provider_request),
|
||||
providerResponse: safeJsonParse(row.provider_response),
|
||||
response: safeJsonParse(row.response)
|
||||
};
|
||||
}
|
||||
ensureShutdownHandler();
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue