YAML Metadata Warning: empty or missing yaml metadata in repo card (https://huggingface.co/docs/hub/model-cards#model-card-metadata)

Ultravox Pipeline

Plataforma de comunicação em tempo real com IA conversacional (STT + LLM + TTS)

Services Architecture Version Communication Security Resilience Logging Isolated


🎯 O que é?

Pipeline completo de conversação por voz usando:

  • Speech-to-Text (Whisper)
  • LLM (Ultravox multimodal ou Groq/OpenAI)
  • Text-to-Speech (Kokoro PT-BR/EN)
  • WebRTC para streaming de áudio

Novidades V2.0:

  • Arquitetura Híbrida: Main process unificado + serviços isolados com venvs separados
  • 🔗 Communication Service: Hub centralizado com gRPC (primary) + HTTP Binary (fallback) + JSON (last resort)
  • 🔒 Isolated Services: STT, TTS, LLM rodando em subprocessos gerenciados com ambientes Python isolados
  • 🤖 Auto-Management: Health monitoring, auto-restart, graceful shutdown
  • 📊 Métricas Completas: Performance tracking por serviço e protocolo

🏗️ Arquitetura V2.0

Arquitetura Híbrida

┌──────────────────────────────────────────────────────────┐
│  MAIN PROCESS (Port 8080)                                │
│  ├─ API Gateway (REST) - in-process                     │
│  ├─ HTTP Polling - in-process                           │
│  ├─ WebRTC Server - in-process                          │
│  ├─ WebSocket Server - in-process                       │
│  ├─ Orchestrator Engine - in-process                    │
│  └─ Session Manager - in-process                        │
│                                                           │
│  IsolatedServiceManager (gerencia subprocessos):         │
│  ├─ spawns → STT Service (Port 8120, venv isolado)     │
│  ├─ spawns → TTS Service (Port 8130, venv isolado)     │
│  └─ spawns → LLM Service (Port 8110, venv isolado)     │
└──────────────────────────────────────────────────────────┘

┌──────────────────────────────────────────────────────────┐
│  COMMUNICATION SERVICE (Port 8888 HTTP, 50888 gRPC)      │
│  Hub centralizado para inter-service communication        │
│  ├─ gRPC Server (PRIMARY) - ~2ms latency                │
│  ├─ HTTP Binary (FALLBACK) - ~8ms latency               │
│  └─ HTTP JSON (LAST RESORT) - ~22ms latency             │
└──────────────────────────────────────────────────────────┘

3 Modos de Execução

1. LOCAL (Sem GPU)
   └─> APIs Externas (Groq/OpenAI)
   └─> Latência: ~800-1500ms
   └─> Custo: Paga por uso

2. GPU LOCAL (24GB VRAM)
   └─> Modelos carregados localmente
   └─> Latência: ~150-300ms
   └─> Custo: Apenas hardware

3. RUNPOD (Serverless)
   └─> Auto-scaling em RunPod
   └─> Latência: ~400-700ms
   └─> Custo: Paga por uso + idle shutdown

4 Camadas de Context (Dependency Injection)

┌────────────────────────────────────────┐
│ 1. GlobalContext                       │
│    └─> GPU Manager                     │
│    └─> Metrics Collector               │
│    └─> Profile Config                  │
├────────────────────────────────────────┤
│ 2. ProcessContext                      │
│    └─> GlobalContext (herda)           │
│    └─> Communication Manager           │
├────────────────────────────────────────┤
│ 3. ServiceContext (Service Manager)    │
│    └─> ProcessContext (herda tudo)     │
│    └─> Logger                          │
│    └─> Config                          │
├────────────────────────────────────────┤
│ 4. StandaloneContext (Standalone)      │
│    └─> Logger apenas                   │
│    └─> Config apenas                   │
│    └─> SEM GPU/Metrics/Communication   │
└────────────────────────────────────────┘

SERVICE MANAGER MODE:
  └─> ServiceContext (completo com GPU, Metrics, Communication)

STANDALONE MODE:
  └─> StandaloneContext (minimalista: só logger + config)

Fluxo de Conversação

Cliente (WebRTC/WebSocket/REST)
    ↓
Entry Point (4 opções: API Gateway, WebRTC, WebSocket, REST Polling)
    ↓
ConversationController (unified)
    ↓
OrchestratorClient
    ↓
Orchestrator Service
    ├─> External LLM → Groq/OpenAI API
    ├─> TTS Service
    └─> STT Service
    ↓
Conversation Store
    ↓
Response → Cliente

🚀 Quick Start

1. Instalação

git clone https://github.com/your-org/ultravox-pipeline.git
cd ultravox-pipeline
pip install -r requirements.txt
cp .env.example .env
# Edite .env com suas API keys

2. Escolha o Modo

Sem GPU (APIs externas):

export PROFILE=local-pc
export GROQ_API_KEY=your_key
./start_service_manager.sh start

Com GPU local:

export PROFILE=gpu-machine
export CUDA_VISIBLE_DEVICES=0
./start_service_manager.sh start

Com RunPod:

export PROFILE=main-server
export RUNPOD_API_KEY=your_key
./start_service_manager.sh start

3. Testar

# Health check
curl http://localhost:8888/health

# Conversação
curl -X POST http://localhost:8900/api/orchestrator/conversation \
  -H "Content-Type: application/json" \
  -d '{
    "session_id": "test",
    "message": "Olá, como você está?",
    "voice_id": "pf_dora"
  }'

Testes Automatizados (Níveis)

Sistema de testes com 5 níveis para controlar custo e velocidade:

# Testes rápidos e gratuitos (padrão)
./main.sh test <service>                    # Standard: unit + integration
./main.sh test --all                         # Todos os serviços (paralelo)

# Níveis específicos
./main.sh test <service> --level unit        # Só unit tests (super rápido)
./main.sh test <service> --level real        # Validação com APIs reais (dry-run)
./main.sh test <service> --level expensive --confirm-real  # Lança clusters reais ($$)
Nível Descrição Custo Tempo
unit Testes isolados Grátis <1min
integration Mocks de serviços Grátis ~2min
standard Unit + integration (padrão) Grátis ~3-5min
real Dry-run com APIs reais Grátis ~5-10min
expensive Clusters/GPUs reais PAGO ~10-30min

Exemplo - SkyPilot:

./main.sh test skypilot                     # Testes padrão (grátis)
./main.sh test skypilot --level real        # Valida RunPod/VastAI (grátis)
./main.sh test skypilot --level expensive --confirm-real  # Lança cluster (~$0.02)

4. Quick Start V2.0 (Novo)

Método simplificado com arquitetura híbrida:

# 1. Iniciar tudo (Main + Isolated Services)
./start_ultravox_v2.sh

# 2. Verificar status
curl http://localhost:8080/health

# 3. Testar conversação
curl -X POST http://localhost:8080/api/orchestrator/conversation \
  -H "Content-Type: application/json" \
  -d '{
    "session_id": "test",
    "message": "Hello",
    "voice_id": "pf_dora"
  }'

# 4. Parar tudo
./stop_ultravox_v2.sh

🔗 Communication Service (Novo)

Hub centralizado que gerencia TODA comunicação entre serviços:

Protocol Priority

  1. gRPC (Primary) - Latência ~2ms, binary protocol
  2. HTTP Binary (Fallback) - Latência ~8ms, msgpack/protobuf
  3. HTTP JSON (Last Resort) - Latência ~22ms, human-readable

Endpoints

# Health Check
curl http://localhost:8888/health

# Call Service via Communication Hub
curl -X POST http://localhost:8888/call \
  -H "Content-Type: application/json" \
  -d '{
    "service_name": "stt",
    "endpoint": "/transcribe",
    "method": "POST",
    "data": {"audio": "base64..."}
  }'

# Proxy Request
curl -X POST http://localhost:8888/proxy/stt/transcribe \
  -d '{"audio": "base64..."}'

# Metrics
curl http://localhost:8888/metrics
curl http://localhost:8888/metrics/stt

Uso Programático

from src.core.managers.communication_manager import get_communication_manager

comm_manager = get_communication_manager()

# Automatic protocol selection (gRPC → HTTP Binary → JSON)
result = await comm_manager.call_service(
    service_name="stt",
    endpoint="/transcribe",
    method="POST",
    json_data={"audio": audio_bytes}
)

🔒 Isolated Services (Novo)

O que são?

Serviços (STT, TTS, LLM) que rodam em subprocessos separados com venvs Python isolados, gerenciados pelo IsolatedServiceManager.

Benefícios

  • Isolamento de Dependências: Cada serviço tem seu próprio venv
  • Auto-Restart: Falhas são detectadas e serviço reinicia automaticamente
  • Health Monitoring: Check automático a cada 10 segundos
  • Graceful Shutdown: SIGTERM → 5s → SIGKILL

Como funciona

from src.core.managers.isolated_service_manager import IsolatedServiceManager

manager = IsolatedServiceManager(project_root)

# Registrar serviços
manager.register_service(ServiceConfig(
    name="stt",
    script_path="stt_server.py",
    venv_path=".venvs/stt_service",
    port=8120,
    restart_on_failure=True
))

# Iniciar todos
await manager.start_all()

# Restart individual
await manager.restart_service("stt")

# Status
status = manager.get_all_status()

📦 Serviços Principais

Core IA

  • LLM (:8100) - Ultravox local ou Groq/OpenAI externo
  • STT (:8099) - Whisper local ou Groq externo
  • TTS (:8101) - Kokoro local

Orquestração

  • Service Manager (:8888) - Orquestração central
  • Orchestrator (:8900) - Coordenação de conversação
  • API Gateway (:8010) - Entry point REST

Comunicação

  • WebRTC (:8020) - Streaming de áudio
  • WebSocket (:8022) - Comunicação bidirecional
  • REST Polling (:8106) - Long polling

Dados

  • Database (:8102) - Vector store (FAISS + SQLite)
  • Conversation Store (:8101) - Histórico
  • Session (:8800) - Gerenciamento de sessões

🔧 Configuração

Variáveis de Ambiente Principais

# APIs Externas
GROQ_API_KEY=gsk_your_key
OPENAI_API_KEY=sk_your_key
RUNPOD_API_KEY=your_key

# Service Manager
SERVICE_MANAGER_PORT=8888
PROFILE=local-pc  # ou gpu-machine, main-server

# Redis (opcional)
REDIS_URL=redis://localhost:6379

# GPU (se disponível)
CUDA_VISIBLE_DEVICES=0
LLM_GPU_MEMORY_UTILIZATION=0.9

Trocar Perfil

# Via CLI
sm profile activate gpu-machine

# Via API
curl -X POST http://localhost:8888/profiles/gpu-machine/activate

📊 Monitoramento

# Stack: Prometheus + Grafana + Loki
cd monitoring
./start_monitoring.sh

# Acessar
open http://localhost:3000  # Grafana (admin/admin)
open http://localhost:9090  # Prometheus

Métricas expostas: Cada serviço expõe em 9200+ (ex: LLM em :9210)


🛠️ CLI Tool

sm list                    # Lista serviços
sm describe llm            # Detalhes do serviço
sm health --all            # Health de todos
sm start llm               # Iniciar serviço
sm stop llm                # Parar serviço
sm call llm /health        # Chamar endpoint

📁 Estrutura do Projeto

⚠️ Virtual Environments: Para evitar problemas de file watchers (ENOSPC) e reduzir tamanho do workspace, os virtual environments estão localizados em ~/.cache/ultravox-venvs/ com symlinks no projeto:

  • .venv~/.cache/ultravox-venvs/.venv (main venv - Python 3.13)
  • .venvs~/.cache/ultravox-venvs/.venvs (service venvs - Python 3.11/3.13)
ultravox-pipeline/
├── 🎯 Servidores Standalone (Entry Points)
│   ├── api_gateway_server.py       # REST API (:8010)
│   ├── webrtc_server.py            # WebRTC (:8020)
│   ├── websocket_server.py         # WebSocket (:8022)
│   ├── orchestrator_server.py      # Orchestrator (:8900) ✨ Updated
│   ├── external_llm_server.py      # External LLM (:8110) ✨ Updated
│   └── session_server.py           # Session (:8800)
│
├── 📁 src/
│   ├── core/                       # Components core
│   │   ├── base_service.py         # Base para serviços
│   │   ├── context/                # Dependency Injection
│   │   │   ├── service_context.py  # ServiceContext (Service Manager)
│   │   │   └── standalone_context.py # StandaloneContext (Standalone)
│   │   ├── controllers/            # ConversationController
│   │   ├── managers/               # Communication, Metrics
│   │   ├── service_manager/        # Service Manager core
│   │   │
│   │   ├── resilience/             # 🆕 Resilience Patterns
│   │   │   ├── circuit_breaker.py  # Circuit Breaker Pattern
│   │   │   └── retry_policy.py     # Retry com Exponential Backoff
│   │   │
│   │   ├── middleware/             # 🆕 FastAPI Middleware
│   │   │   ├── rate_limiting.py    # Rate Limiting
│   │   │   ├── input_validation.py # Input Validation
│   │   │   └── authentication.py   # JWT Authentication
│   │   │
│   │   └── logging/                # 🆕 Structured Logging
│   │       └── structured_logger.py # JSON Logs
│   │
│   └── services/                   # 20+ Microserviços
│       ├── orchestrator/           # Coordenação
│       ├── external_llm/           # LLM externo
│       ├── llm/                    # LLM local
│       ├── tts/                    # TTS local
│       ├── database/               # Vector store
│       ├── conversation_store/     # Histórico
│       ├── session/                # Sessions
│       └── ...
│
├── 🔌 modules/                     # Módulos reutilizáveis
│   ├── providers/                  # LLM/STT/TTS providers
│   ├── pipeline/                   # Circuit breaker
│   └── ultravox/                   # Ultravox implementation
│
├── 📊 config/                      # Configs YAML
│   ├── profiles.yaml               # 4 perfis de execução
│   └── services.yaml               # Config de serviços
│
├── 📈 monitoring/                  # Stack de monitoramento
│   ├── prometheus.yml
│   └── grafana/dashboards/
│
├── 🎓 examples/                    # 🆕 Exemplos
│   └── full_middleware_integration.py  # Exemplo completo de middleware
│
├── IMPROVEMENTS_SUMMARY.md         # 🆕 Documentação v2.0
└── README.md                       # ✨ Updated

🔄 Comparação de Modos

Característica Local (sem GPU) GPU Local RunPod
GPU ✅ 24GB 🚀 Serverless
Latência 800-1500ms 150-300ms 400-700ms
Custo $ API $ Hardware $$ Uso
Ideal para Dev local Produção Auto-scaling

📊 Comparação V1 vs V2

Aspecto V1 (Anterior) V2 (Atual)
Processos 5+ processos separados 1 Main + 3 subprocessos gerenciados
Startup Scripts múltiplos ./start_ultravox_v2.sh
Latência (entry points) HTTP (~5ms) In-memory (<1ms)
Isolamento de venv ❌ Compartilhado ✅ Isolado (STT/TTS/LLM)
Auto-restart ❌ Manual ✅ Automático
Communication HTTP direto Communication Service (gRPC/HTTP)
Monitoring ❌ Manual ✅ Automático (health checks)

🛡️ Features Enterprise (v2.0)

Resilience & Fault Tolerance

Circuit Breaker Pattern

Previne cascading failures em serviços distribuídos.

from src.core.resilience import get_circuit_breaker_registry, CircuitBreakerConfig

# Configurar circuit breaker
registry = get_circuit_breaker_registry()
circuit = registry.get(
    "groq_api",
    config=CircuitBreakerConfig(
        failure_threshold=5,        # Abrir após 5 falhas
        recovery_timeout=60.0,      # Testar recovery após 60s
        success_threshold=2         # Fechar após 2 sucessos
    )
)

# Usar circuit breaker
result = await circuit.call(external_api_function, *args)

Estados:

  • 🟢 CLOSED: Normal operation
  • 🔴 OPEN: Service down, requests fail fast
  • 🟡 HALF_OPEN: Testing recovery

Retry Policy com Exponential Backoff

Recuperação automática de falhas temporárias.

from src.core.resilience import get_retry_policy_registry, RetryPolicyConfig, RetryStrategy

# Configurar retry policy
registry = get_retry_policy_registry()
retry = registry.get(
    "groq_api",
    config=RetryPolicyConfig(
        max_attempts=5,
        initial_delay=1.0,
        max_delay=60.0,
        strategy=RetryStrategy.EXPONENTIAL,  # 1s, 2s, 4s, 8s, ...
        jitter=True  # Adiciona variação aleatória
    )
)

# Usar retry policy
result = await retry.execute(flaky_operation, *args)

Estratégias:

  • FIXED: Delay constante (1s, 1s, 1s, ...)
  • LINEAR: Delay linear (1s, 2s, 3s, ...)
  • EXPONENTIAL: Delay exponencial (1s, 2s, 4s, 8s, ...)

Integração Automática no Communication Manager 🆕

Resilience patterns já integrados automaticamente em TODAS as chamadas entre serviços!

from src.core.managers.communication_manager import ServiceCommunicationManager

comm = ServiceCommunicationManager()

# Circuit Breaker + Retry AUTOMÁTICOS em todas as chamadas
result = await comm.call_service(
    service_name="llm",
    endpoint_path="/chat/completions",
    method="POST",
    json_data={"messages": [...]}
    # enable_resilience=True (padrão)
)

Configuração padrão por serviço:

  • Circuit Breaker: 5 falhas → OPEN por 60s
  • Retry Policy: 3 tentativas, exponential backoff (1s, 2s, 4s)
  • Jitter: 10% de variação aleatória

Controle global:

# Desabilitar resilience patterns
export ENABLE_RESILIENCE=false

# Desabilitar por chamada individual
result = await comm.call_service(..., enable_resilience=False)

Endpoints de Monitoring de Resilience 🆕

API Gateway expõe endpoints REST para monitorar e controlar resilience patterns.

# Ver estatísticas completas
curl http://localhost:8010/resilience/stats

# Ver todos os circuit breakers
curl http://localhost:8010/resilience/circuit-breakers

# Ver circuit breaker específico
curl http://localhost:8010/resilience/circuit-breakers/llm

# Resetar circuit breaker manualmente
curl -X POST http://localhost:8010/resilience/circuit-breakers/llm/reset

# Resetar todos os circuit breakers
curl -X POST http://localhost:8010/resilience/circuit-breakers/reset-all

# Ver políticas de retry
curl http://localhost:8010/resilience/retry-policies

# Ver política de retry específica
curl http://localhost:8010/resilience/retry-policies/llm

# Resetar estatísticas de retry
curl -X POST http://localhost:8010/resilience/retry-policies/reset-stats

Response de stats:

{
    "circuit_breakers": {
        "llm": {
            "state": "closed",
            "total_calls": 150,
            "total_failures": 2,
            "total_successes": 148,
            "failure_rate": 0.013,
            "last_failure_time": "2025-10-11T14:30:00Z"
        }
    },
    "retry_policies": {
        "llm": {
            "total_attempts": 152,
            "successful_attempts": 148,
            "failed_attempts": 4,
            "total_retries": 4,
            "avg_attempts": 1.03
        }
    }
}

---

### **Security & Protection**

#### **JWT Authentication**
Autenticação enterprise-grade com tokens JWT.

```python
from src.core.middleware import AuthenticationMiddleware, AuthConfig, UserRole

app.add_middleware(
    AuthenticationMiddleware,
    config=AuthConfig(
        secret_key="your-secret-key-change-in-production",
        access_token_expire_minutes=30,
        refresh_token_expire_days=7,
        public_paths=["/health", "/docs", "/auth/login"],
        api_keys={"service-key-123"}  # API key alternativo
    )
)

Features:

  • ✅ Access + Refresh tokens
  • ✅ Role-based access control (RBAC)
  • ✅ Token blacklist (revocation)
  • ✅ API Key authentication
  • ✅ Public paths whitelist

Uso:

# Login
curl -X POST http://localhost:8900/auth/login \
  -d '{"username":"[email protected]","password":"pass"}'

# Response: {"access_token": "eyJ...", "token_type": "bearer"}

# Chamar endpoint protegido
curl -H "Authorization: Bearer <token>" http://localhost:8900/protected

Input Validation

Validação e sanitização automática de requests.

from src.core.middleware import InputValidationMiddleware, ValidationConfig, ValidationLevel

app.add_middleware(
    InputValidationMiddleware,
    config=ValidationConfig(
        max_content_length=50 * 1024 * 1024,  # 50 MB
        level=ValidationLevel.MODERATE,
        sanitize_html=True,   # Remove XSS
        sanitize_sql=True,    # Detecta SQL injection
        validate_base64=True
    )
)

Protege contra:

  • ✅ XSS (Cross-Site Scripting)
  • ✅ SQL Injection
  • ✅ JSON bombs (deep nesting)
  • ✅ Oversized payloads
  • ✅ Invalid Content-Type

Rate Limiting

Proteção contra abuso de API.

from src.core.middleware import RateLimitMiddleware, RateLimitConfig, RateLimitStrategy

app.add_middleware(
    RateLimitMiddleware,
    config=RateLimitConfig(
        requests_per_minute=60,
        requests_per_hour=1000,
        strategy=RateLimitStrategy.SLIDING_WINDOW,  # Mais preciso
        whitelist=["127.0.0.1"]  # IPs permitidos
    )
)

Estratégias:

  • FIXED_WINDOW: Contador simples por janela de tempo
  • SLIDING_WINDOW: Janela deslizante (mais preciso)
  • TOKEN_BUCKET: Permite burst traffic

Response em caso de limite:

{
    "error": "Rate limit exceeded",
    "retry_after": 45.2,
    "message": "Too many requests. Please retry after 45.2 seconds."
}

Observability

Structured Logging (JSON)

Logs estruturados para fácil parsing e análise.

from src.core.logging import get_structured_logger, LogLevel

logger = get_structured_logger("my_service", level=LogLevel.INFO)

# Log simples
logger.info("Request processed", endpoint="/api/chat", duration_ms=125.5)

# Com contexto (request tracking)
with logger.context(request_id="req-123", user_id="user-456"):
    logger.info("Processing request")
    # ... processar
    logger.info("Request completed")

# Performance timing
with logger.timer("database_query"):
    # ... query
    pass  # Logs automaticamente: "Operation completed (duration_ms: 45.2)"

Output (JSON):

{
    "timestamp": "2025-10-11T14:05:00.123Z",
    "level": "INFO",
    "service": "orchestrator",
    "message": "Request processed",
    "request_id": "req-123",
    "user_id": "user-456",
    "endpoint": "/api/chat",
    "duration_ms": 125.5,
    "file": "app.py:42"
}

Benefícios:

  • ✅ Fácil integração com Elasticsearch, Loki, CloudWatch
  • ✅ Correlação de logs entre serviços
  • ✅ Métricas de performance embebidas
  • ✅ Rastreamento de erros com contexto completo

🚀 Exemplo de Integração Completa

Veja examples/full_middleware_integration.py para um exemplo completo usando TODOS os componentes.

from fastapi import FastAPI
from src.core.middleware import (
    RateLimitMiddleware, RateLimitConfig,
    InputValidationMiddleware, ValidationConfig,
    AuthenticationMiddleware, AuthConfig
)
from src.core.logging import get_structured_logger
from src.core.resilience import get_circuit_breaker_registry

app = FastAPI()

# 1. Authentication (outermost)
app.add_middleware(AuthenticationMiddleware, config=AuthConfig(...))

# 2. Input Validation
app.add_middleware(InputValidationMiddleware, config=ValidationConfig(...))

# 3. Rate Limiting (innermost)
app.add_middleware(RateLimitMiddleware, config=RateLimitConfig(...))

# 4. Structured Logging
logger = get_structured_logger("my_service")

# 5. Circuit Breaker
circuit = get_circuit_breaker_registry().get("external_api")

@app.post("/api/process")
async def process(data: dict, request: Request):
    with logger.context(user_id=request.state.user_id):
        logger.info("Processing request")
        result = await circuit.call(external_api, data)
        return {"result": result}

Executar exemplo:

cd examples
python full_middleware_integration.py

# Open: http://localhost:8000/docs

📚 Documentação


🆘 Troubleshooting

# Service Manager não inicia
rm /tmp/service-manager-8888.pid
./start_service_manager.sh start

# Ver logs
./start_service_manager.sh logs
tail -f src/services/llm/tmp/logs/llm.log

# Health check
sm health --all
curl http://localhost:8888/health

📄 Licença

MIT License - Veja LICENSE


Feito com ❤️ pela equipe Ultravox

Downloads last month

-

Downloads are not tracked for this model. How to track
Inference Providers NEW
This model isn't deployed by any Inference Provider. 🙋 Ask for provider support