Spaces:
Configuration error
Configuration error
| #!/usr/bin/env python3 | |
| """ | |
| Avatar em Tempo Real - StyleTTS2 + MuseTalk | |
| Mantém modelos carregados em memória para RTF < 1 | |
| Uso: | |
| python realtime_avatar.py --avatar video.mp4 --voice voice_ref.wav | |
| Uma vez carregado, você pode enviar textos e receber videos em tempo real. | |
| """ | |
| import argparse | |
| import os | |
| import sys | |
| import time | |
| import torch | |
| import numpy as np | |
| import scipy.io.wavfile as wavfile | |
| # Fix PyTorch 2.6 | |
| original_load = torch.load | |
| def patched_load(*args, **kwargs): | |
| kwargs['weights_only'] = False | |
| return original_load(*args, **kwargs) | |
| torch.load = patched_load | |
| class RealtimeAvatar: | |
| """Avatar em tempo real com TTS e Lip Sync pré-carregados.""" | |
| def __init__(self, voice_ref_path: str = None, diffusion_steps: int = 5): | |
| self.voice_ref_path = voice_ref_path | |
| self.diffusion_steps = diffusion_steps | |
| self.tts_model = None | |
| self.musetalk_loaded = False | |
| def load_tts(self): | |
| """Carrega StyleTTS2 em memória.""" | |
| print("[TTS] Carregando StyleTTS2...") | |
| start = time.time() | |
| from styletts2 import tts | |
| self.tts_model = tts.StyleTTS2() | |
| # Warm-up | |
| _ = self.tts_model.inference("Hello", diffusion_steps=3) | |
| torch.cuda.synchronize() | |
| print(f"[TTS] Carregado em {time.time() - start:.2f}s") | |
| def generate_audio(self, text: str, output_path: str = None) -> tuple: | |
| """ | |
| Gera audio a partir de texto. | |
| Retorna: (wav_array, audio_duration, synthesis_time, rtf) | |
| """ | |
| if self.tts_model is None: | |
| self.load_tts() | |
| start = time.time() | |
| if self.voice_ref_path: | |
| wav = self.tts_model.inference( | |
| text, | |
| target_voice_path=self.voice_ref_path, | |
| diffusion_steps=self.diffusion_steps | |
| ) | |
| else: | |
| wav = self.tts_model.inference( | |
| text, | |
| diffusion_steps=self.diffusion_steps | |
| ) | |
| torch.cuda.synchronize() | |
| synthesis_time = time.time() - start | |
| audio_duration = len(wav) / 24000 | |
| rtf = synthesis_time / audio_duration | |
| if output_path: | |
| wavfile.write(output_path, 24000, wav) | |
| return wav, audio_duration, synthesis_time, rtf | |
| def load_musetalk(self, avatar_video: str, bbox_shift: int = 5): | |
| """ | |
| Carrega MuseTalk e prepara avatar. | |
| O avatar é pre-processado uma vez e reutilizado. | |
| """ | |
| print("[LipSync] Carregando MuseTalk...") | |
| start = time.time() | |
| # Adicionar path do MuseTalk | |
| musetalk_path = os.environ.get('MUSETALK_DIR', '/root/musetalk-space') | |
| sys.path.insert(0, musetalk_path) | |
| os.chdir(musetalk_path) | |
| from musetalk.utils.utils import load_all_model | |
| from musetalk.utils.preprocessing import get_landmark_and_bbox | |
| # Carregar modelos | |
| self.audio_processor, self.vae, self.unet, self.pe = load_all_model() | |
| # Pre-processar avatar (isso é feito uma vez só) | |
| print("[LipSync] Pre-processando avatar...") | |
| # ... (código de pre-processamento do avatar) | |
| self.musetalk_loaded = True | |
| print(f"[LipSync] Carregado em {time.time() - start:.2f}s") | |
| def benchmark(self, test_text: str = "Hello, this is a real time test."): | |
| """Executa benchmark de RTF.""" | |
| print("\n" + "="*60) | |
| print("BENCHMARK RTF") | |
| print("="*60) | |
| if self.tts_model is None: | |
| self.load_tts() | |
| # Testar diferentes configurações | |
| for steps in [3, 5, 10]: | |
| self.diffusion_steps = steps | |
| # Warm-up | |
| self.generate_audio(test_text) | |
| # Benchmark (média de 3 runs) | |
| rtfs = [] | |
| for _ in range(3): | |
| _, duration, synth_time, rtf = self.generate_audio(test_text) | |
| rtfs.append(rtf) | |
| avg_rtf = np.mean(rtfs) | |
| print(f"diffusion_steps={steps:2d}: RTF={avg_rtf:.4f} ({1/avg_rtf:.1f}x tempo real)") | |
| print("="*60 + "\n") | |
| def main(): | |
| parser = argparse.ArgumentParser(description='Avatar em Tempo Real') | |
| parser.add_argument('--voice', '-v', help='Audio de referencia para clonagem') | |
| parser.add_argument('--steps', '-s', type=int, default=5, help='Diffusion steps (3-5 para tempo real)') | |
| parser.add_argument('--benchmark', '-b', action='store_true', help='Executar benchmark') | |
| parser.add_argument('--interactive', '-i', action='store_true', help='Modo interativo') | |
| args = parser.parse_args() | |
| avatar = RealtimeAvatar( | |
| voice_ref_path=args.voice, | |
| diffusion_steps=args.steps | |
| ) | |
| if args.benchmark: | |
| avatar.benchmark() | |
| return | |
| # Carregar modelos | |
| avatar.load_tts() | |
| if args.interactive: | |
| print("\n[MODO INTERATIVO]") | |
| print("Digite um texto para gerar audio (ou 'quit' para sair):\n") | |
| while True: | |
| text = input("> ") | |
| if text.lower() in ['quit', 'exit', 'q']: | |
| break | |
| wav, duration, synth_time, rtf = avatar.generate_audio(text) | |
| print(f" Audio: {duration:.2f}s | Sintese: {synth_time:.3f}s | RTF: {rtf:.4f} ({1/rtf:.1f}x)") | |
| else: | |
| # Teste rapido | |
| text = "Hello everyone, this is a real time test of the avatar system." | |
| wav, duration, synth_time, rtf = avatar.generate_audio(text, "test_output.wav") | |
| print(f"\nResultado:") | |
| print(f" Audio: {duration:.2f}s") | |
| print(f" Sintese: {synth_time:.3f}s") | |
| print(f" RTF: {rtf:.4f}") | |
| print(f" Velocidade: {1/rtf:.1f}x tempo real") | |
| if __name__ == '__main__': | |
| main() | |