diff --git a/apps/desktop/src/pipeline/providers/transcription/whisper-provider.ts b/apps/desktop/src/pipeline/providers/transcription/whisper-provider.ts index 711c1c3..ca9ed27 100644 --- a/apps/desktop/src/pipeline/providers/transcription/whisper-provider.ts +++ b/apps/desktop/src/pipeline/providers/transcription/whisper-provider.ts @@ -36,12 +36,19 @@ export class WhisperProvider implements TranscriptionProvider { await this.initializeWhisper(); } - async transcribe(params: TranscribeParams): Promise { + async transcribe( + params: TranscribeParams & { flush?: boolean }, + ): Promise { try { await this.initializeWhisper(); // Extract parameters from the new structure - const { audioData, speechProbability = 0, context } = params; + const { + audioData, + speechProbability = 0, + context, + flush = false, + } = params; const { vocabulary, previousChunk, aggregatedTranscription } = context; // Convert audio buffer to the format expected by smart-whisper @@ -67,7 +74,7 @@ export class WhisperProvider implements TranscriptionProvider { } // Determine if we should transcribe - const shouldTranscribe = this.shouldTranscribe(); + const shouldTranscribe = flush || this.shouldTranscribe(); if (!shouldTranscribe) { // Keep buffering @@ -77,12 +84,14 @@ export class WhisperProvider implements TranscriptionProvider { // Aggregate buffered frames const aggregatedAudio = this.aggregateFrames(); + // Clear buffers immediately after aggregation, before async operations + this.frameBuffer = []; + this.frameBufferSpeechProbabilities = []; + this.silenceFrameCount = 0; + // Skip if too short or only silence /* if (aggregatedAudio.length < this.FRAME_SIZE * 2) { logger.transcription.debug("Skipping transcription - audio too short"); - this.frameBuffer = []; - this.frameBufferSpeechProbabilities = []; - this.silenceFrameCount = 0; return ""; } */ @@ -124,11 +133,6 @@ export class WhisperProvider implements TranscriptionProvider { `Transcription completed, length: ${text.length}`, ); - // Clear buffer after successful transcription - this.frameBuffer = []; - this.frameBufferSpeechProbabilities = []; - this.silenceFrameCount = 0; - return text; } catch (error) { logger.transcription.error("Transcription failed:", error); @@ -225,23 +229,6 @@ export class WhisperProvider implements TranscriptionProvider { return audio.slice(startIdx, Math.min(endIdx, audio.length)); } - // Force transcription of any remaining frames - async flush(): Promise { - if (this.frameBuffer.length === 0) { - return ""; - } - - logger.transcription.error(`Flushing ${this.frameBuffer.length} frames`); - - // Force transcription by setting high silence count - this.silenceFrameCount = 999; - return this.transcribe({ - audioData: Buffer.alloc(0), // Empty buffer, we'll use the buffered frames - speechProbability: 0, - context: {}, - }); - } - private generateInitialPrompt( vocabulary?: Map, aggregatedTranscription?: string, diff --git a/apps/desktop/src/services/transcription-service.ts b/apps/desktop/src/services/transcription-service.ts index 07acad8..bece1a3 100644 --- a/apps/desktop/src/services/transcription-service.ts +++ b/apps/desktop/src/services/transcription-service.ts @@ -17,6 +17,7 @@ import { app } from "electron"; import * as fs from "node:fs"; import * as path from "node:path"; import { convertRawToWav } from "../utils/audio-converter"; +import { Mutex } from "async-mutex"; /** * Service for audio transcription and optional formatting @@ -34,6 +35,8 @@ export class TranscriptionService { private streamingSessions: Map = new Map(); private vadService: VADService | null; private settingsService: SettingsService; + private vadMutex: Mutex; + private transcriptionMutex: Mutex; constructor( modelManagerService: ModelManagerService, @@ -43,6 +46,8 @@ export class TranscriptionService { this.whisperProvider = new WhisperProvider(modelManagerService); this.vadService = vadService; this.settingsService = settingsService; + this.vadMutex = new Mutex(); + this.transcriptionMutex = new Mutex(); } async initialize(): Promise { @@ -167,18 +172,22 @@ export class TranscriptionService { isFinal?: boolean; }): Promise { const { sessionId, audioChunk, isFinal = false } = options; - console.error("processing streaming chunk", { - length: audioChunk.length, - }); // Run VAD on the audio chunk let speechProbability = 0; let isSpeaking = false; if (audioChunk.length > 0 && this.vadService) { + // Acquire VAD mutex + await this.vadMutex.acquire(); + const vadResult = await this.vadService.processAudioFrame( audioChunk.buffer as ArrayBuffer, ); + + // Release VAD mutex + this.vadMutex.release(); + speechProbability = vadResult.probability; isSpeaking = vadResult.isSpeaking; @@ -188,6 +197,8 @@ export class TranscriptionService { }); } + // Acquire transcription mutex + await this.transcriptionMutex.acquire(); // Auto-create session if it doesn't exist let session = this.streamingSessions.get(sessionId); if (!session) { @@ -248,6 +259,7 @@ export class TranscriptionService { previousChunk, aggregatedTranscription: aggregatedTranscription || undefined, }, + flush: isFinal, }); // Accumulate the result only if Whisper returned something @@ -269,38 +281,28 @@ export class TranscriptionService { }); } - // If this is the final chunk, flush any remaining audio and apply formatting - if (isFinal) { - if (!session) { - logger.transcription.error("No session found for final chunk", { - sessionId, - }); - return ""; - } - // Flush any remaining buffered audio in Whisper - if (this.whisperProvider.flush) { - const flushResult = await this.whisperProvider.flush(); - if (flushResult.trim()) { - session.transcriptionResults.push(flushResult); - logger.transcription.info("Flushed final audio", { - sessionId, - flushLength: flushResult.length, - }); - } - } + // Release transcription mutex + this.transcriptionMutex.release(); + let completeTranscriptionTillNow = session.transcriptionResults + .join(" ") + .trim(); - // Get complete transcription - let completeTranscription = session.transcriptionResults.join(" ").trim(); + // this is the final chunk, save the transcription + if (!isFinal) { + return completeTranscriptionTillNow; + } - logger.transcription.info("Finalizing streaming session", { - sessionId, - rawTranscriptionLength: completeTranscription.length, - chunkCount: session.transcriptionResults.length, - }); + let completeTranscription = completeTranscriptionTillNow; - // Format if enabled (currently disabled with && false) - // Commenting out to fix TypeScript errors since this code path is never executed - /* + logger.transcription.info("Finalizing streaming session", { + sessionId, + rawTranscriptionLength: completeTranscription.length, + chunkCount: session.transcriptionResults.length, + }); + + // Format if enabled (currently disabled with && false) + // Commenting out to fix TypeScript errors since this code path is never executed + /* if (this.formatterEnabled && this.openRouterProvider && false) { const style = session.context.sharedData.userPreferences?.formattingStyle; @@ -323,63 +325,59 @@ export class TranscriptionService { } */ - // Convert buffered audio to WAV and save - if ( - session.audioBuffers && - session.audioBuffers.length > 0 && - session.audioFilePath - ) { - // Concatenate all audio buffers - const totalLength = session.audioBuffers.reduce( - (acc, buf) => acc + buf.length, - 0, - ); - const combinedBuffer = Buffer.concat(session.audioBuffers, totalLength); + // Convert buffered audio to WAV and save + if ( + session.audioBuffers && + session.audioBuffers.length > 0 && + session.audioFilePath + ) { + // Concatenate all audio buffers + const totalLength = session.audioBuffers.reduce( + (acc, buf) => acc + buf.length, + 0, + ); + const combinedBuffer = Buffer.concat(session.audioBuffers, totalLength); - // Convert to WAV - const wavBuffer = convertRawToWav(combinedBuffer); + // Convert to WAV + const wavBuffer = convertRawToWav(combinedBuffer); - // Write WAV file - await fs.promises.writeFile(session.audioFilePath, wavBuffer); + // Write WAV file + await fs.promises.writeFile(session.audioFilePath, wavBuffer); - logger.transcription.info("Saved audio as WAV file", { - sessionId, - filePath: session.audioFilePath, - size: wavBuffer.length, - }); - } - - // Save directly to database - logger.transcription.info("Saving transcription with audio file", { + logger.transcription.info("Saved audio as WAV file", { sessionId, - audioFilePath: session.audioFilePath, - hasAudioFile: !!session.audioFilePath, + filePath: session.audioFilePath, + size: wavBuffer.length, }); - - await createTranscription({ - text: completeTranscription, - language: session.context.sharedData.userPreferences?.language || "en", - duration: session.context.sharedData.audioMetadata?.duration, - speechModel: "whisper-local", - formattingModel: this.formatterEnabled ? "openrouter" : undefined, - audioFile: session.audioFilePath, - meta: { - sessionId, - source: session.context.sharedData.audioMetadata?.source, - vocabularySize: session.context.sharedData.vocabulary?.size || 0, - formattingStyle: - session.context.sharedData.userPreferences?.formattingStyle, - }, - }); - - this.streamingSessions.delete(sessionId); - - logger.transcription.info("Streaming session completed", { sessionId }); - return completeTranscription; } - // Return accumulated transcription so far (for UI feedback) - return session ? session.transcriptionResults.join(" ") : ""; + // Save directly to database + logger.transcription.info("Saving transcription with audio file", { + sessionId, + audioFilePath: session.audioFilePath, + hasAudioFile: !!session.audioFilePath, + }); + + await createTranscription({ + text: completeTranscription, + language: session.context.sharedData.userPreferences?.language || "en", + duration: session.context.sharedData.audioMetadata?.duration, + speechModel: "whisper-local", + formattingModel: this.formatterEnabled ? "openrouter" : undefined, + audioFile: session.audioFilePath, + meta: { + sessionId, + source: session.context.sharedData.audioMetadata?.source, + vocabularySize: session.context.sharedData.vocabulary?.size || 0, + formattingStyle: + session.context.sharedData.userPreferences?.formattingStyle, + }, + }); + + this.streamingSessions.delete(sessionId); + + logger.transcription.info("Streaming session completed", { sessionId }); + return completeTranscription; } private async buildContext(): Promise {