Ultravox Pipeline
Plataforma de comunicação em tempo real com IA conversacional (STT + LLM + TTS)
🎯 O que é?
Pipeline completo de conversação por voz usando:
- Speech-to-Text (Whisper)
- LLM (Ultravox multimodal ou Groq/OpenAI)
- Text-to-Speech (Kokoro PT-BR/EN)
- WebRTC para streaming de áudio
✨ Novidades V2.0:
- ⚡ Arquitetura Híbrida: Main process unificado + serviços isolados com venvs separados
- 🔗 Communication Service: Hub centralizado com gRPC (primary) + HTTP Binary (fallback) + JSON (last resort)
- 🔒 Isolated Services: STT, TTS, LLM rodando em subprocessos gerenciados com ambientes Python isolados
- 🤖 Auto-Management: Health monitoring, auto-restart, graceful shutdown
- 📊 Métricas Completas: Performance tracking por serviço e protocolo
🏗️ Arquitetura V2.0
Arquitetura Híbrida
┌──────────────────────────────────────────────────────────┐
│ MAIN PROCESS (Port 8080) │
│ ├─ API Gateway (REST) - in-process │
│ ├─ HTTP Polling - in-process │
│ ├─ WebRTC Server - in-process │
│ ├─ WebSocket Server - in-process │
│ ├─ Orchestrator Engine - in-process │
│ └─ Session Manager - in-process │
│ │
│ IsolatedServiceManager (gerencia subprocessos): │
│ ├─ spawns → STT Service (Port 8120, venv isolado) │
│ ├─ spawns → TTS Service (Port 8130, venv isolado) │
│ └─ spawns → LLM Service (Port 8110, venv isolado) │
└──────────────────────────────────────────────────────────┘
┌──────────────────────────────────────────────────────────┐
│ COMMUNICATION SERVICE (Port 8888 HTTP, 50888 gRPC) │
│ Hub centralizado para inter-service communication │
│ ├─ gRPC Server (PRIMARY) - ~2ms latency │
│ ├─ HTTP Binary (FALLBACK) - ~8ms latency │
│ └─ HTTP JSON (LAST RESORT) - ~22ms latency │
└──────────────────────────────────────────────────────────┘
3 Modos de Execução
1. LOCAL (Sem GPU)
└─> APIs Externas (Groq/OpenAI)
└─> Latência: ~800-1500ms
└─> Custo: Paga por uso
2. GPU LOCAL (24GB VRAM)
└─> Modelos carregados localmente
└─> Latência: ~150-300ms
└─> Custo: Apenas hardware
3. RUNPOD (Serverless)
└─> Auto-scaling em RunPod
└─> Latência: ~400-700ms
└─> Custo: Paga por uso + idle shutdown
4 Camadas de Context (Dependency Injection)
┌────────────────────────────────────────┐
│ 1. GlobalContext │
│ └─> GPU Manager │
│ └─> Metrics Collector │
│ └─> Profile Config │
├────────────────────────────────────────┤
│ 2. ProcessContext │
│ └─> GlobalContext (herda) │
│ └─> Communication Manager │
├────────────────────────────────────────┤
│ 3. ServiceContext (Service Manager) │
│ └─> ProcessContext (herda tudo) │
│ └─> Logger │
│ └─> Config │
├────────────────────────────────────────┤
│ 4. StandaloneContext (Standalone) │
│ └─> Logger apenas │
│ └─> Config apenas │
│ └─> SEM GPU/Metrics/Communication │
└────────────────────────────────────────┘
SERVICE MANAGER MODE:
└─> ServiceContext (completo com GPU, Metrics, Communication)
STANDALONE MODE:
└─> StandaloneContext (minimalista: só logger + config)
Fluxo de Conversação
Cliente (WebRTC/WebSocket/REST)
↓
Entry Point (4 opções: API Gateway, WebRTC, WebSocket, REST Polling)
↓
ConversationController (unified)
↓
OrchestratorClient
↓
Orchestrator Service
├─> External LLM → Groq/OpenAI API
├─> TTS Service
└─> STT Service
↓
Conversation Store
↓
Response → Cliente
🚀 Quick Start
1. Instalação
git clone https://github.com/your-org/ultravox-pipeline.git
cd ultravox-pipeline
pip install -r requirements.txt
cp .env.example .env
# Edite .env com suas API keys
2. Escolha o Modo
Sem GPU (APIs externas):
export PROFILE=local-pc
export GROQ_API_KEY=your_key
./start_service_manager.sh start
Com GPU local:
export PROFILE=gpu-machine
export CUDA_VISIBLE_DEVICES=0
./start_service_manager.sh start
Com RunPod:
export PROFILE=main-server
export RUNPOD_API_KEY=your_key
./start_service_manager.sh start
3. Testar
# Health check
curl http://localhost:8888/health
# Conversação
curl -X POST http://localhost:8900/api/orchestrator/conversation \
-H "Content-Type: application/json" \
-d '{
"session_id": "test",
"message": "Olá, como você está?",
"voice_id": "pf_dora"
}'
Testes Automatizados (Níveis)
Sistema de testes com 5 níveis para controlar custo e velocidade:
# Testes rápidos e gratuitos (padrão)
./main.sh test <service> # Standard: unit + integration
./main.sh test --all # Todos os serviços (paralelo)
# Níveis específicos
./main.sh test <service> --level unit # Só unit tests (super rápido)
./main.sh test <service> --level real # Validação com APIs reais (dry-run)
./main.sh test <service> --level expensive --confirm-real # Lança clusters reais ($$)
| Nível | Descrição | Custo | Tempo |
|---|---|---|---|
unit |
Testes isolados | Grátis | <1min |
integration |
Mocks de serviços | Grátis | ~2min |
standard |
Unit + integration (padrão) | Grátis | ~3-5min |
real |
Dry-run com APIs reais | Grátis | ~5-10min |
expensive |
Clusters/GPUs reais | PAGO | ~10-30min |
Exemplo - SkyPilot:
./main.sh test skypilot # Testes padrão (grátis)
./main.sh test skypilot --level real # Valida RunPod/VastAI (grátis)
./main.sh test skypilot --level expensive --confirm-real # Lança cluster (~$0.02)
4. Quick Start V2.0 (Novo)
Método simplificado com arquitetura híbrida:
# 1. Iniciar tudo (Main + Isolated Services)
./start_ultravox_v2.sh
# 2. Verificar status
curl http://localhost:8080/health
# 3. Testar conversação
curl -X POST http://localhost:8080/api/orchestrator/conversation \
-H "Content-Type: application/json" \
-d '{
"session_id": "test",
"message": "Hello",
"voice_id": "pf_dora"
}'
# 4. Parar tudo
./stop_ultravox_v2.sh
🔗 Communication Service (Novo)
Hub centralizado que gerencia TODA comunicação entre serviços:
Protocol Priority
- gRPC (Primary) - Latência ~2ms, binary protocol
- HTTP Binary (Fallback) - Latência ~8ms, msgpack/protobuf
- HTTP JSON (Last Resort) - Latência ~22ms, human-readable
Endpoints
# Health Check
curl http://localhost:8888/health
# Call Service via Communication Hub
curl -X POST http://localhost:8888/call \
-H "Content-Type: application/json" \
-d '{
"service_name": "stt",
"endpoint": "/transcribe",
"method": "POST",
"data": {"audio": "base64..."}
}'
# Proxy Request
curl -X POST http://localhost:8888/proxy/stt/transcribe \
-d '{"audio": "base64..."}'
# Metrics
curl http://localhost:8888/metrics
curl http://localhost:8888/metrics/stt
Uso Programático
from src.core.managers.communication_manager import get_communication_manager
comm_manager = get_communication_manager()
# Automatic protocol selection (gRPC → HTTP Binary → JSON)
result = await comm_manager.call_service(
service_name="stt",
endpoint="/transcribe",
method="POST",
json_data={"audio": audio_bytes}
)
🔒 Isolated Services (Novo)
O que são?
Serviços (STT, TTS, LLM) que rodam em subprocessos separados com venvs Python isolados, gerenciados pelo IsolatedServiceManager.
Benefícios
- ✅ Isolamento de Dependências: Cada serviço tem seu próprio venv
- ✅ Auto-Restart: Falhas são detectadas e serviço reinicia automaticamente
- ✅ Health Monitoring: Check automático a cada 10 segundos
- ✅ Graceful Shutdown: SIGTERM → 5s → SIGKILL
Como funciona
from src.core.managers.isolated_service_manager import IsolatedServiceManager
manager = IsolatedServiceManager(project_root)
# Registrar serviços
manager.register_service(ServiceConfig(
name="stt",
script_path="stt_server.py",
venv_path=".venvs/stt_service",
port=8120,
restart_on_failure=True
))
# Iniciar todos
await manager.start_all()
# Restart individual
await manager.restart_service("stt")
# Status
status = manager.get_all_status()
📦 Serviços Principais
Core IA
- LLM (:8100) - Ultravox local ou Groq/OpenAI externo
- STT (:8099) - Whisper local ou Groq externo
- TTS (:8101) - Kokoro local
Orquestração
- Service Manager (:8888) - Orquestração central
- Orchestrator (:8900) - Coordenação de conversação
- API Gateway (:8010) - Entry point REST
Comunicação
- WebRTC (:8020) - Streaming de áudio
- WebSocket (:8022) - Comunicação bidirecional
- REST Polling (:8106) - Long polling
Dados
- Database (:8102) - Vector store (FAISS + SQLite)
- Conversation Store (:8101) - Histórico
- Session (:8800) - Gerenciamento de sessões
🔧 Configuração
Variáveis de Ambiente Principais
# APIs Externas
GROQ_API_KEY=gsk_your_key
OPENAI_API_KEY=sk_your_key
RUNPOD_API_KEY=your_key
# Service Manager
SERVICE_MANAGER_PORT=8888
PROFILE=local-pc # ou gpu-machine, main-server
# Redis (opcional)
REDIS_URL=redis://localhost:6379
# GPU (se disponível)
CUDA_VISIBLE_DEVICES=0
LLM_GPU_MEMORY_UTILIZATION=0.9
Trocar Perfil
# Via CLI
sm profile activate gpu-machine
# Via API
curl -X POST http://localhost:8888/profiles/gpu-machine/activate
📊 Monitoramento
# Stack: Prometheus + Grafana + Loki
cd monitoring
./start_monitoring.sh
# Acessar
open http://localhost:3000 # Grafana (admin/admin)
open http://localhost:9090 # Prometheus
Métricas expostas: Cada serviço expõe em 9200+ (ex: LLM em :9210)
🛠️ CLI Tool
sm list # Lista serviços
sm describe llm # Detalhes do serviço
sm health --all # Health de todos
sm start llm # Iniciar serviço
sm stop llm # Parar serviço
sm call llm /health # Chamar endpoint
📁 Estrutura do Projeto
⚠️ Virtual Environments: Para evitar problemas de file watchers (ENOSPC) e reduzir tamanho do workspace, os virtual environments estão localizados em
~/.cache/ultravox-venvs/com symlinks no projeto:
.venv→~/.cache/ultravox-venvs/.venv(main venv - Python 3.13).venvs→~/.cache/ultravox-venvs/.venvs(service venvs - Python 3.11/3.13)
ultravox-pipeline/
├── 🎯 Servidores Standalone (Entry Points)
│ ├── api_gateway_server.py # REST API (:8010)
│ ├── webrtc_server.py # WebRTC (:8020)
│ ├── websocket_server.py # WebSocket (:8022)
│ ├── orchestrator_server.py # Orchestrator (:8900) ✨ Updated
│ ├── external_llm_server.py # External LLM (:8110) ✨ Updated
│ └── session_server.py # Session (:8800)
│
├── 📁 src/
│ ├── core/ # Components core
│ │ ├── base_service.py # Base para serviços
│ │ ├── context/ # Dependency Injection
│ │ │ ├── service_context.py # ServiceContext (Service Manager)
│ │ │ └── standalone_context.py # StandaloneContext (Standalone)
│ │ ├── controllers/ # ConversationController
│ │ ├── managers/ # Communication, Metrics
│ │ ├── service_manager/ # Service Manager core
│ │ │
│ │ ├── resilience/ # 🆕 Resilience Patterns
│ │ │ ├── circuit_breaker.py # Circuit Breaker Pattern
│ │ │ └── retry_policy.py # Retry com Exponential Backoff
│ │ │
│ │ ├── middleware/ # 🆕 FastAPI Middleware
│ │ │ ├── rate_limiting.py # Rate Limiting
│ │ │ ├── input_validation.py # Input Validation
│ │ │ └── authentication.py # JWT Authentication
│ │ │
│ │ └── logging/ # 🆕 Structured Logging
│ │ └── structured_logger.py # JSON Logs
│ │
│ └── services/ # 20+ Microserviços
│ ├── orchestrator/ # Coordenação
│ ├── external_llm/ # LLM externo
│ ├── llm/ # LLM local
│ ├── tts/ # TTS local
│ ├── database/ # Vector store
│ ├── conversation_store/ # Histórico
│ ├── session/ # Sessions
│ └── ...
│
├── 🔌 modules/ # Módulos reutilizáveis
│ ├── providers/ # LLM/STT/TTS providers
│ ├── pipeline/ # Circuit breaker
│ └── ultravox/ # Ultravox implementation
│
├── 📊 config/ # Configs YAML
│ ├── profiles.yaml # 4 perfis de execução
│ └── services.yaml # Config de serviços
│
├── 📈 monitoring/ # Stack de monitoramento
│ ├── prometheus.yml
│ └── grafana/dashboards/
│
├── 🎓 examples/ # 🆕 Exemplos
│ └── full_middleware_integration.py # Exemplo completo de middleware
│
├── IMPROVEMENTS_SUMMARY.md # 🆕 Documentação v2.0
└── README.md # ✨ Updated
🔄 Comparação de Modos
| Característica | Local (sem GPU) | GPU Local | RunPod |
|---|---|---|---|
| GPU | ❌ | ✅ 24GB | 🚀 Serverless |
| Latência | 800-1500ms | 150-300ms | 400-700ms |
| Custo | $ API | $ Hardware | $$ Uso |
| Ideal para | Dev local | Produção | Auto-scaling |
📊 Comparação V1 vs V2
| Aspecto | V1 (Anterior) | V2 (Atual) |
|---|---|---|
| Processos | 5+ processos separados | 1 Main + 3 subprocessos gerenciados |
| Startup | Scripts múltiplos | ./start_ultravox_v2.sh |
| Latência (entry points) | HTTP (~5ms) | In-memory (<1ms) |
| Isolamento de venv | ❌ Compartilhado | ✅ Isolado (STT/TTS/LLM) |
| Auto-restart | ❌ Manual | ✅ Automático |
| Communication | HTTP direto | Communication Service (gRPC/HTTP) |
| Monitoring | ❌ Manual | ✅ Automático (health checks) |
🛡️ Features Enterprise (v2.0)
Resilience & Fault Tolerance
Circuit Breaker Pattern
Previne cascading failures em serviços distribuídos.
from src.core.resilience import get_circuit_breaker_registry, CircuitBreakerConfig
# Configurar circuit breaker
registry = get_circuit_breaker_registry()
circuit = registry.get(
"groq_api",
config=CircuitBreakerConfig(
failure_threshold=5, # Abrir após 5 falhas
recovery_timeout=60.0, # Testar recovery após 60s
success_threshold=2 # Fechar após 2 sucessos
)
)
# Usar circuit breaker
result = await circuit.call(external_api_function, *args)
Estados:
- 🟢 CLOSED: Normal operation
- 🔴 OPEN: Service down, requests fail fast
- 🟡 HALF_OPEN: Testing recovery
Retry Policy com Exponential Backoff
Recuperação automática de falhas temporárias.
from src.core.resilience import get_retry_policy_registry, RetryPolicyConfig, RetryStrategy
# Configurar retry policy
registry = get_retry_policy_registry()
retry = registry.get(
"groq_api",
config=RetryPolicyConfig(
max_attempts=5,
initial_delay=1.0,
max_delay=60.0,
strategy=RetryStrategy.EXPONENTIAL, # 1s, 2s, 4s, 8s, ...
jitter=True # Adiciona variação aleatória
)
)
# Usar retry policy
result = await retry.execute(flaky_operation, *args)
Estratégias:
- FIXED: Delay constante (1s, 1s, 1s, ...)
- LINEAR: Delay linear (1s, 2s, 3s, ...)
- EXPONENTIAL: Delay exponencial (1s, 2s, 4s, 8s, ...)
Integração Automática no Communication Manager 🆕
Resilience patterns já integrados automaticamente em TODAS as chamadas entre serviços!
from src.core.managers.communication_manager import ServiceCommunicationManager
comm = ServiceCommunicationManager()
# Circuit Breaker + Retry AUTOMÁTICOS em todas as chamadas
result = await comm.call_service(
service_name="llm",
endpoint_path="/chat/completions",
method="POST",
json_data={"messages": [...]}
# enable_resilience=True (padrão)
)
Configuração padrão por serviço:
- Circuit Breaker: 5 falhas → OPEN por 60s
- Retry Policy: 3 tentativas, exponential backoff (1s, 2s, 4s)
- Jitter: 10% de variação aleatória
Controle global:
# Desabilitar resilience patterns
export ENABLE_RESILIENCE=false
# Desabilitar por chamada individual
result = await comm.call_service(..., enable_resilience=False)
Endpoints de Monitoring de Resilience 🆕
API Gateway expõe endpoints REST para monitorar e controlar resilience patterns.
# Ver estatísticas completas
curl http://localhost:8010/resilience/stats
# Ver todos os circuit breakers
curl http://localhost:8010/resilience/circuit-breakers
# Ver circuit breaker específico
curl http://localhost:8010/resilience/circuit-breakers/llm
# Resetar circuit breaker manualmente
curl -X POST http://localhost:8010/resilience/circuit-breakers/llm/reset
# Resetar todos os circuit breakers
curl -X POST http://localhost:8010/resilience/circuit-breakers/reset-all
# Ver políticas de retry
curl http://localhost:8010/resilience/retry-policies
# Ver política de retry específica
curl http://localhost:8010/resilience/retry-policies/llm
# Resetar estatísticas de retry
curl -X POST http://localhost:8010/resilience/retry-policies/reset-stats
Response de stats:
{
"circuit_breakers": {
"llm": {
"state": "closed",
"total_calls": 150,
"total_failures": 2,
"total_successes": 148,
"failure_rate": 0.013,
"last_failure_time": "2025-10-11T14:30:00Z"
}
},
"retry_policies": {
"llm": {
"total_attempts": 152,
"successful_attempts": 148,
"failed_attempts": 4,
"total_retries": 4,
"avg_attempts": 1.03
}
}
}
---
### **Security & Protection**
#### **JWT Authentication**
Autenticação enterprise-grade com tokens JWT.
```python
from src.core.middleware import AuthenticationMiddleware, AuthConfig, UserRole
app.add_middleware(
AuthenticationMiddleware,
config=AuthConfig(
secret_key="your-secret-key-change-in-production",
access_token_expire_minutes=30,
refresh_token_expire_days=7,
public_paths=["/health", "/docs", "/auth/login"],
api_keys={"service-key-123"} # API key alternativo
)
)
Features:
- ✅ Access + Refresh tokens
- ✅ Role-based access control (RBAC)
- ✅ Token blacklist (revocation)
- ✅ API Key authentication
- ✅ Public paths whitelist
Uso:
# Login
curl -X POST http://localhost:8900/auth/login \
-d '{"username":"[email protected]","password":"pass"}'
# Response: {"access_token": "eyJ...", "token_type": "bearer"}
# Chamar endpoint protegido
curl -H "Authorization: Bearer <token>" http://localhost:8900/protected
Input Validation
Validação e sanitização automática de requests.
from src.core.middleware import InputValidationMiddleware, ValidationConfig, ValidationLevel
app.add_middleware(
InputValidationMiddleware,
config=ValidationConfig(
max_content_length=50 * 1024 * 1024, # 50 MB
level=ValidationLevel.MODERATE,
sanitize_html=True, # Remove XSS
sanitize_sql=True, # Detecta SQL injection
validate_base64=True
)
)
Protege contra:
- ✅ XSS (Cross-Site Scripting)
- ✅ SQL Injection
- ✅ JSON bombs (deep nesting)
- ✅ Oversized payloads
- ✅ Invalid Content-Type
Rate Limiting
Proteção contra abuso de API.
from src.core.middleware import RateLimitMiddleware, RateLimitConfig, RateLimitStrategy
app.add_middleware(
RateLimitMiddleware,
config=RateLimitConfig(
requests_per_minute=60,
requests_per_hour=1000,
strategy=RateLimitStrategy.SLIDING_WINDOW, # Mais preciso
whitelist=["127.0.0.1"] # IPs permitidos
)
)
Estratégias:
- FIXED_WINDOW: Contador simples por janela de tempo
- SLIDING_WINDOW: Janela deslizante (mais preciso)
- TOKEN_BUCKET: Permite burst traffic
Response em caso de limite:
{
"error": "Rate limit exceeded",
"retry_after": 45.2,
"message": "Too many requests. Please retry after 45.2 seconds."
}
Observability
Structured Logging (JSON)
Logs estruturados para fácil parsing e análise.
from src.core.logging import get_structured_logger, LogLevel
logger = get_structured_logger("my_service", level=LogLevel.INFO)
# Log simples
logger.info("Request processed", endpoint="/api/chat", duration_ms=125.5)
# Com contexto (request tracking)
with logger.context(request_id="req-123", user_id="user-456"):
logger.info("Processing request")
# ... processar
logger.info("Request completed")
# Performance timing
with logger.timer("database_query"):
# ... query
pass # Logs automaticamente: "Operation completed (duration_ms: 45.2)"
Output (JSON):
{
"timestamp": "2025-10-11T14:05:00.123Z",
"level": "INFO",
"service": "orchestrator",
"message": "Request processed",
"request_id": "req-123",
"user_id": "user-456",
"endpoint": "/api/chat",
"duration_ms": 125.5,
"file": "app.py:42"
}
Benefícios:
- ✅ Fácil integração com Elasticsearch, Loki, CloudWatch
- ✅ Correlação de logs entre serviços
- ✅ Métricas de performance embebidas
- ✅ Rastreamento de erros com contexto completo
🚀 Exemplo de Integração Completa
Veja examples/full_middleware_integration.py para um exemplo completo usando TODOS os componentes.
from fastapi import FastAPI
from src.core.middleware import (
RateLimitMiddleware, RateLimitConfig,
InputValidationMiddleware, ValidationConfig,
AuthenticationMiddleware, AuthConfig
)
from src.core.logging import get_structured_logger
from src.core.resilience import get_circuit_breaker_registry
app = FastAPI()
# 1. Authentication (outermost)
app.add_middleware(AuthenticationMiddleware, config=AuthConfig(...))
# 2. Input Validation
app.add_middleware(InputValidationMiddleware, config=ValidationConfig(...))
# 3. Rate Limiting (innermost)
app.add_middleware(RateLimitMiddleware, config=RateLimitConfig(...))
# 4. Structured Logging
logger = get_structured_logger("my_service")
# 5. Circuit Breaker
circuit = get_circuit_breaker_registry().get("external_api")
@app.post("/api/process")
async def process(data: dict, request: Request):
with logger.context(user_id=request.state.user_id):
logger.info("Processing request")
result = await circuit.call(external_api, data)
return {"result": result}
Executar exemplo:
cd examples
python full_middleware_integration.py
# Open: http://localhost:8000/docs
📚 Documentação
- CLAUDE.md - Regras do projeto
- IMPROVEMENTS_SUMMARY.md - 🆕 Novas features v2.0
- ARCHITECTURE_V2.md - 🆕 Arquitetura híbrida detalhada
- COMMUNICATION_SERVICE.md - 🆕 Communication Service completo
- MONITORING_COMPLETE.md - Monitoramento completo
- Database Status - Vector store
🆘 Troubleshooting
# Service Manager não inicia
rm /tmp/service-manager-8888.pid
./start_service_manager.sh start
# Ver logs
./start_service_manager.sh logs
tail -f src/services/llm/tmp/logs/llm.log
# Health check
sm health --all
curl http://localhost:8888/health
📄 Licença
MIT License - Veja LICENSE
Feito com ❤️ pela equipe Ultravox