chore: handle race conditions during vad + transcription flow

This commit is contained in:
haritabh-z01 2025-07-06 21:51:05 +05:30
parent 522cd05a9e
commit 3be5f9b1a5
2 changed files with 95 additions and 110 deletions

View file

@ -36,12 +36,19 @@ export class WhisperProvider implements TranscriptionProvider {
await this.initializeWhisper();
}
async transcribe(params: TranscribeParams): Promise<string> {
async transcribe(
params: TranscribeParams & { flush?: boolean },
): Promise<string> {
try {
await this.initializeWhisper();
// Extract parameters from the new structure
const { audioData, speechProbability = 0, context } = params;
const {
audioData,
speechProbability = 0,
context,
flush = false,
} = params;
const { vocabulary, previousChunk, aggregatedTranscription } = context;
// Convert audio buffer to the format expected by smart-whisper
@ -67,7 +74,7 @@ export class WhisperProvider implements TranscriptionProvider {
}
// Determine if we should transcribe
const shouldTranscribe = this.shouldTranscribe();
const shouldTranscribe = flush || this.shouldTranscribe();
if (!shouldTranscribe) {
// Keep buffering
@ -77,12 +84,14 @@ export class WhisperProvider implements TranscriptionProvider {
// Aggregate buffered frames
const aggregatedAudio = this.aggregateFrames();
// Clear buffers immediately after aggregation, before async operations
this.frameBuffer = [];
this.frameBufferSpeechProbabilities = [];
this.silenceFrameCount = 0;
// Skip if too short or only silence
/* if (aggregatedAudio.length < this.FRAME_SIZE * 2) {
logger.transcription.debug("Skipping transcription - audio too short");
this.frameBuffer = [];
this.frameBufferSpeechProbabilities = [];
this.silenceFrameCount = 0;
return "";
} */
@ -124,11 +133,6 @@ export class WhisperProvider implements TranscriptionProvider {
`Transcription completed, length: ${text.length}`,
);
// Clear buffer after successful transcription
this.frameBuffer = [];
this.frameBufferSpeechProbabilities = [];
this.silenceFrameCount = 0;
return text;
} catch (error) {
logger.transcription.error("Transcription failed:", error);
@ -225,23 +229,6 @@ export class WhisperProvider implements TranscriptionProvider {
return audio.slice(startIdx, Math.min(endIdx, audio.length));
}
// Force transcription of any remaining frames
async flush(): Promise<string> {
if (this.frameBuffer.length === 0) {
return "";
}
logger.transcription.error(`Flushing ${this.frameBuffer.length} frames`);
// Force transcription by setting high silence count
this.silenceFrameCount = 999;
return this.transcribe({
audioData: Buffer.alloc(0), // Empty buffer, we'll use the buffered frames
speechProbability: 0,
context: {},
});
}
private generateInitialPrompt(
vocabulary?: Map<string, string>,
aggregatedTranscription?: string,

View file

@ -17,6 +17,7 @@ import { app } from "electron";
import * as fs from "node:fs";
import * as path from "node:path";
import { convertRawToWav } from "../utils/audio-converter";
import { Mutex } from "async-mutex";
/**
* Service for audio transcription and optional formatting
@ -34,6 +35,8 @@ export class TranscriptionService {
private streamingSessions: Map<string, ExtendedStreamingSession> = new Map();
private vadService: VADService | null;
private settingsService: SettingsService;
private vadMutex: Mutex;
private transcriptionMutex: Mutex;
constructor(
modelManagerService: ModelManagerService,
@ -43,6 +46,8 @@ export class TranscriptionService {
this.whisperProvider = new WhisperProvider(modelManagerService);
this.vadService = vadService;
this.settingsService = settingsService;
this.vadMutex = new Mutex();
this.transcriptionMutex = new Mutex();
}
async initialize(): Promise<void> {
@ -167,18 +172,22 @@ export class TranscriptionService {
isFinal?: boolean;
}): Promise<string> {
const { sessionId, audioChunk, isFinal = false } = options;
console.error("processing streaming chunk", {
length: audioChunk.length,
});
// Run VAD on the audio chunk
let speechProbability = 0;
let isSpeaking = false;
if (audioChunk.length > 0 && this.vadService) {
// Acquire VAD mutex
await this.vadMutex.acquire();
const vadResult = await this.vadService.processAudioFrame(
audioChunk.buffer as ArrayBuffer,
);
// Release VAD mutex
this.vadMutex.release();
speechProbability = vadResult.probability;
isSpeaking = vadResult.isSpeaking;
@ -188,6 +197,8 @@ export class TranscriptionService {
});
}
// Acquire transcription mutex
await this.transcriptionMutex.acquire();
// Auto-create session if it doesn't exist
let session = this.streamingSessions.get(sessionId);
if (!session) {
@ -248,6 +259,7 @@ export class TranscriptionService {
previousChunk,
aggregatedTranscription: aggregatedTranscription || undefined,
},
flush: isFinal,
});
// Accumulate the result only if Whisper returned something
@ -269,38 +281,28 @@ export class TranscriptionService {
});
}
// If this is the final chunk, flush any remaining audio and apply formatting
if (isFinal) {
if (!session) {
logger.transcription.error("No session found for final chunk", {
sessionId,
});
return "";
}
// Flush any remaining buffered audio in Whisper
if (this.whisperProvider.flush) {
const flushResult = await this.whisperProvider.flush();
if (flushResult.trim()) {
session.transcriptionResults.push(flushResult);
logger.transcription.info("Flushed final audio", {
sessionId,
flushLength: flushResult.length,
});
}
}
// Release transcription mutex
this.transcriptionMutex.release();
let completeTranscriptionTillNow = session.transcriptionResults
.join(" ")
.trim();
// Get complete transcription
let completeTranscription = session.transcriptionResults.join(" ").trim();
// this is the final chunk, save the transcription
if (!isFinal) {
return completeTranscriptionTillNow;
}
logger.transcription.info("Finalizing streaming session", {
sessionId,
rawTranscriptionLength: completeTranscription.length,
chunkCount: session.transcriptionResults.length,
});
let completeTranscription = completeTranscriptionTillNow;
// Format if enabled (currently disabled with && false)
// Commenting out to fix TypeScript errors since this code path is never executed
/*
logger.transcription.info("Finalizing streaming session", {
sessionId,
rawTranscriptionLength: completeTranscription.length,
chunkCount: session.transcriptionResults.length,
});
// Format if enabled (currently disabled with && false)
// Commenting out to fix TypeScript errors since this code path is never executed
/*
if (this.formatterEnabled && this.openRouterProvider && false) {
const style =
session.context.sharedData.userPreferences?.formattingStyle;
@ -323,63 +325,59 @@ export class TranscriptionService {
}
*/
// Convert buffered audio to WAV and save
if (
session.audioBuffers &&
session.audioBuffers.length > 0 &&
session.audioFilePath
) {
// Concatenate all audio buffers
const totalLength = session.audioBuffers.reduce(
(acc, buf) => acc + buf.length,
0,
);
const combinedBuffer = Buffer.concat(session.audioBuffers, totalLength);
// Convert buffered audio to WAV and save
if (
session.audioBuffers &&
session.audioBuffers.length > 0 &&
session.audioFilePath
) {
// Concatenate all audio buffers
const totalLength = session.audioBuffers.reduce(
(acc, buf) => acc + buf.length,
0,
);
const combinedBuffer = Buffer.concat(session.audioBuffers, totalLength);
// Convert to WAV
const wavBuffer = convertRawToWav(combinedBuffer);
// Convert to WAV
const wavBuffer = convertRawToWav(combinedBuffer);
// Write WAV file
await fs.promises.writeFile(session.audioFilePath, wavBuffer);
// Write WAV file
await fs.promises.writeFile(session.audioFilePath, wavBuffer);
logger.transcription.info("Saved audio as WAV file", {
sessionId,
filePath: session.audioFilePath,
size: wavBuffer.length,
});
}
// Save directly to database
logger.transcription.info("Saving transcription with audio file", {
logger.transcription.info("Saved audio as WAV file", {
sessionId,
audioFilePath: session.audioFilePath,
hasAudioFile: !!session.audioFilePath,
filePath: session.audioFilePath,
size: wavBuffer.length,
});
await createTranscription({
text: completeTranscription,
language: session.context.sharedData.userPreferences?.language || "en",
duration: session.context.sharedData.audioMetadata?.duration,
speechModel: "whisper-local",
formattingModel: this.formatterEnabled ? "openrouter" : undefined,
audioFile: session.audioFilePath,
meta: {
sessionId,
source: session.context.sharedData.audioMetadata?.source,
vocabularySize: session.context.sharedData.vocabulary?.size || 0,
formattingStyle:
session.context.sharedData.userPreferences?.formattingStyle,
},
});
this.streamingSessions.delete(sessionId);
logger.transcription.info("Streaming session completed", { sessionId });
return completeTranscription;
}
// Return accumulated transcription so far (for UI feedback)
return session ? session.transcriptionResults.join(" ") : "";
// Save directly to database
logger.transcription.info("Saving transcription with audio file", {
sessionId,
audioFilePath: session.audioFilePath,
hasAudioFile: !!session.audioFilePath,
});
await createTranscription({
text: completeTranscription,
language: session.context.sharedData.userPreferences?.language || "en",
duration: session.context.sharedData.audioMetadata?.duration,
speechModel: "whisper-local",
formattingModel: this.formatterEnabled ? "openrouter" : undefined,
audioFile: session.audioFilePath,
meta: {
sessionId,
source: session.context.sharedData.audioMetadata?.source,
vocabularySize: session.context.sharedData.vocabulary?.size || 0,
formattingStyle:
session.context.sharedData.userPreferences?.formattingStyle,
},
});
this.streamingSessions.delete(sessionId);
logger.transcription.info("Streaming session completed", { sessionId });
return completeTranscription;
}
private async buildContext(): Promise<PipelineContext> {