|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import logging
|
|
from pathlib import Path
|
|
from typing import Dict, List, Any
|
|
|
|
|
|
|
|
from App_Function_Libraries.Audio.Audio_Transcription_Lib import speech_to_text
|
|
|
|
|
|
from pyannote.audio.pipelines.speaker_diarization import SpeakerDiarization
|
|
import yaml
|
|
|
|
|
|
|
|
|
|
|
|
def load_pipeline_from_pretrained(path_to_config: str | Path) -> SpeakerDiarization:
|
|
path_to_config = Path(path_to_config).resolve()
|
|
logging.debug(f"Loading pyannote pipeline from {path_to_config}...")
|
|
|
|
if not path_to_config.exists():
|
|
raise FileNotFoundError(f"Config file not found: {path_to_config}")
|
|
|
|
|
|
with open(path_to_config, 'r') as config_file:
|
|
config = yaml.safe_load(config_file)
|
|
|
|
|
|
logging.debug(f"Loaded config: {config}")
|
|
|
|
|
|
try:
|
|
pipeline = SpeakerDiarization(
|
|
segmentation=config['pipeline']['params']['segmentation'],
|
|
embedding=config['pipeline']['params']['embedding'],
|
|
clustering=config['pipeline']['params']['clustering'],
|
|
)
|
|
except KeyError as e:
|
|
logging.error(f"Error accessing config key: {e}")
|
|
raise
|
|
|
|
|
|
try:
|
|
pipeline_params = {
|
|
"segmentation": {},
|
|
"clustering": {},
|
|
}
|
|
|
|
if 'params' in config and 'segmentation' in config['params']:
|
|
if 'min_duration_off' in config['params']['segmentation']:
|
|
pipeline_params["segmentation"]["min_duration_off"] = config['params']['segmentation']['min_duration_off']
|
|
|
|
if 'params' in config and 'clustering' in config['params']:
|
|
if 'method' in config['params']['clustering']:
|
|
pipeline_params["clustering"]["method"] = config['params']['clustering']['method']
|
|
if 'min_cluster_size' in config['params']['clustering']:
|
|
pipeline_params["clustering"]["min_cluster_size"] = config['params']['clustering']['min_cluster_size']
|
|
if 'threshold' in config['params']['clustering']:
|
|
pipeline_params["clustering"]["threshold"] = config['params']['clustering']['threshold']
|
|
|
|
if 'pipeline' in config and 'params' in config['pipeline']:
|
|
if 'embedding_batch_size' in config['pipeline']['params']:
|
|
pipeline_params["embedding_batch_size"] = config['pipeline']['params']['embedding_batch_size']
|
|
if 'embedding_exclude_overlap' in config['pipeline']['params']:
|
|
pipeline_params["embedding_exclude_overlap"] = config['pipeline']['params']['embedding_exclude_overlap']
|
|
if 'segmentation_batch_size' in config['pipeline']['params']:
|
|
pipeline_params["segmentation_batch_size"] = config['pipeline']['params']['segmentation_batch_size']
|
|
|
|
logging.debug(f"Pipeline params: {pipeline_params}")
|
|
pipeline.instantiate(pipeline_params)
|
|
except KeyError as e:
|
|
logging.error(f"Error accessing config key: {e}")
|
|
raise
|
|
except Exception as e:
|
|
logging.error(f"Error instantiating pipeline: {e}")
|
|
raise
|
|
|
|
return pipeline
|
|
|
|
|
|
def audio_diarization(audio_file_path: str) -> list:
|
|
logging.info('audio-diarization: Loading pyannote pipeline')
|
|
|
|
base_dir = Path(__file__).parent.resolve()
|
|
config_path = base_dir / 'models' / 'pyannote_diarization_config.yaml'
|
|
logging.info(f"audio-diarization: Loading pipeline from {config_path}")
|
|
|
|
try:
|
|
pipeline = load_pipeline_from_pretrained(config_path)
|
|
except Exception as e:
|
|
logging.error(f"Failed to load pipeline: {str(e)}")
|
|
raise
|
|
|
|
logging.info(f"audio-diarization: Audio file path: {audio_file_path}")
|
|
|
|
try:
|
|
logging.info('audio-diarization: Starting diarization...')
|
|
diarization_result = pipeline(audio_file_path)
|
|
|
|
segments = []
|
|
for turn, _, speaker in diarization_result.itertracks(yield_label=True):
|
|
segment = {
|
|
"start": turn.start,
|
|
"end": turn.end,
|
|
"speaker": speaker
|
|
}
|
|
logging.debug(f"Segment: {segment}")
|
|
segments.append(segment)
|
|
logging.info("audio-diarization: Diarization completed with pyannote")
|
|
|
|
return segments
|
|
|
|
except Exception as e:
|
|
logging.error(f"audio-diarization: Error performing diarization: {str(e)}")
|
|
raise RuntimeError("audio-diarization: Error performing diarization") from e
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def combine_transcription_and_diarization(audio_file_path: str) -> List[Dict[str, Any]]:
|
|
logging.info('combine-transcription-and-diarization: Starting transcription and diarization...')
|
|
|
|
try:
|
|
logging.info('Performing speech-to-text...')
|
|
transcription_result = speech_to_text(audio_file_path)
|
|
logging.info(f"Transcription result type: {type(transcription_result)}")
|
|
logging.info(f"Transcription result: {transcription_result[:3] if isinstance(transcription_result, list) and len(transcription_result) > 3 else transcription_result}")
|
|
|
|
logging.info('Performing audio diarization...')
|
|
diarization_result = audio_diarization(audio_file_path)
|
|
logging.info(f"Diarization result type: {type(diarization_result)}")
|
|
logging.info(f"Diarization result sample: {diarization_result[:3] if isinstance(diarization_result, list) and len(diarization_result) > 3 else diarization_result}")
|
|
|
|
if not transcription_result:
|
|
logging.error("Empty result from transcription")
|
|
return []
|
|
|
|
if not diarization_result:
|
|
logging.error("Empty result from diarization")
|
|
return []
|
|
|
|
|
|
if isinstance(transcription_result, dict) and 'segments' in transcription_result:
|
|
transcription_segments = transcription_result['segments']
|
|
elif isinstance(transcription_result, list):
|
|
transcription_segments = transcription_result
|
|
else:
|
|
logging.error(f"Unexpected transcription result format: {type(transcription_result)}")
|
|
return []
|
|
|
|
logging.info(f"Number of transcription segments: {len(transcription_segments)}")
|
|
logging.info(f"Transcription segments sample: {transcription_segments[:3] if len(transcription_segments) > 3 else transcription_segments}")
|
|
|
|
if not isinstance(diarization_result, list):
|
|
logging.error(f"Unexpected diarization result format: {type(diarization_result)}")
|
|
return []
|
|
|
|
combined_result = []
|
|
for transcription_segment in transcription_segments:
|
|
if not isinstance(transcription_segment, dict):
|
|
logging.warning(f"Unexpected transcription segment format: {transcription_segment}")
|
|
continue
|
|
|
|
for diarization_segment in diarization_result:
|
|
if not isinstance(diarization_segment, dict):
|
|
logging.warning(f"Unexpected diarization segment format: {diarization_segment}")
|
|
continue
|
|
|
|
try:
|
|
trans_start = transcription_segment.get('Time_Start', 0)
|
|
trans_end = transcription_segment.get('Time_End', 0)
|
|
diar_start = diarization_segment.get('start', 0)
|
|
diar_end = diarization_segment.get('end', 0)
|
|
|
|
if trans_start >= diar_start and trans_end <= diar_end:
|
|
combined_segment = {
|
|
"Time_Start": trans_start,
|
|
"Time_End": trans_end,
|
|
"Speaker": diarization_segment.get('speaker', 'Unknown'),
|
|
"Text": transcription_segment.get('Text', '')
|
|
}
|
|
combined_result.append(combined_segment)
|
|
break
|
|
except Exception as e:
|
|
logging.error(f"Error processing segment: {str(e)}")
|
|
logging.error(f"Transcription segment: {transcription_segment}")
|
|
logging.error(f"Diarization segment: {diarization_segment}")
|
|
continue
|
|
|
|
logging.info(f"Combined result length: {len(combined_result)}")
|
|
logging.info(f"Combined result sample: {combined_result[:3] if len(combined_result) > 3 else combined_result}")
|
|
return combined_result
|
|
|
|
except Exception as e:
|
|
logging.error(f"Error in combine_transcription_and_diarization: {str(e)}", exc_info=True)
|
|
return []
|
|
|
|
|
|
|
|
|
|
|