""" Speaker Diarization Module Pyannote-audio ile konuşmacı ayrımı (kim ne zaman konuşuyor). """ import os from typing import List, Tuple, Optional # PyTorch 2.6+ compatibility: Disable weights_only restriction for pyannote models os.environ["TORCH_FORCE_NO_WEIGHTS_ONLY_LOAD"] = "1" import torch # Check for GPU availability DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu") print(f"🔧 Diarization device: {DEVICE}") def get_diarization_pipeline(hf_token: Optional[str] = None): """ Load pyannote speaker diarization pipeline. Args: hf_token: Hugging Face token (required for pyannote models) Returns: Diarization pipeline or None if failed """ try: from pyannote.audio import Pipeline # Try to get token from environment if not provided token = hf_token or os.environ.get("HF_TOKEN") if not token: print("⚠️ HF_TOKEN bulunamadı. pyannote modeli yüklenemeyebilir.") pipeline = Pipeline.from_pretrained( "pyannote/speaker-diarization-3.1", token=token ) # Move to GPU if available pipeline.to(DEVICE) print("✅ Diarization pipeline yüklendi!") return pipeline except Exception as e: print(f"❌ Diarization pipeline yüklenemedi: {e}") return None def diarize_audio(audio_path: str, pipeline, num_speakers: int = None) -> List[Tuple[float, float, str]]: """ Perform speaker diarization on audio file. Args: audio_path: Path to audio file pipeline: Pyannote diarization pipeline num_speakers: Expected number of speakers (None for auto-detect) Returns: List of (start_time, end_time, speaker_label) tuples """ if pipeline is None: return [] try: # Run diarization - let pyannote auto-detect if num_speakers not specified print(f"🔍 Diarization parametreleri: num_speakers={num_speakers}") if num_speakers: # Use min/max range for better detection result = pipeline(audio_path, min_speakers=2, max_speakers=num_speakers) else: # Auto-detect number of speakers result = pipeline(audio_path) # Extract segments from DiarizeOutput object segments = [] # DiarizeOutput has speaker_diarization attribute which is the Annotation if hasattr(result, 'speaker_diarization'): diarization = result.speaker_diarization print(f"🔍 Using speaker_diarization attribute") else: diarization = result # Now iterate over the Annotation object unique_speakers = set() for segment, track, speaker in diarization.itertracks(yield_label=True): segments.append((segment.start, segment.end, speaker)) unique_speakers.add(speaker) print(f"✅ Diarization tamamlandı: {len(segments)} segment, {len(unique_speakers)} konuşmacı") print(f"🔍 Bulunan konuşmacılar: {unique_speakers}") return segments except Exception as e: print(f"❌ Diarization hatası: {e}") return [] def format_speaker_label(speaker: str) -> str: """ Convert pyannote speaker labels (SPEAKER_00, SPEAKER_01) to user-friendly format. """ speaker_map = { "SPEAKER_00": "Kişi 1", "SPEAKER_01": "Kişi 2", "SPEAKER_02": "Kişi 3", "SPEAKER_03": "Kişi 4", } return speaker_map.get(speaker, speaker) def format_timestamp(seconds: float) -> str: """ Convert seconds to [HH:MM:SS] or [MM:SS] format. """ hours = int(seconds // 3600) minutes = int((seconds % 3600) // 60) secs = int(seconds % 60) if hours > 0: return f"{hours:02d}:{minutes:02d}:{secs:02d}" else: return f"{minutes:02d}:{secs:02d}"