🔄 Guia de Integração - Async Processing (Celery)
Data: 2025-12-01
Status: ✅ Guia Criado
🎯 Objetivo
Este guia mostra como integrar processamento assíncrono usando Celery nos apps do ecossistema C-Suite para mover processamentos pesados para background tasks.
📦 Módulo Disponível
- Módulo:
common/common_async.py - Funções Principais:
async_task()- Decorator para criar tasks assíncronasdelay_task()- Executar task assincronamenteget_task_result()- Verificar status de taskwait_for_task()- Aguardar conclusão de taskget_celery_app()- Obter instância do Celery
🚀 Integração Passo a Passo
Passo 1: Configurar Variáveis de Ambiente
# Redis como broker e backend
CELERY_BROKER_URL=redis://localhost:6379/0
CELERY_RESULT_BACKEND=redis://localhost:6379/0
# Serialização
CELERY_TASK_SERIALIZER=json
CELERY_RESULT_SERIALIZER=json
CELERY_ACCEPT_CONTENT=json
# Timezone
CELERY_TIMEZONE=UTC
CELERY_ENABLE_UTC=true
# Limites de tempo
CELERY_TASK_TIME_LIMIT=300 # 5 minutos
CELERY_TASK_SOFT_TIME_LIMIT=240 # 4 minutos
Passo 2: Instalar Dependências
pip install celery redis
Passo 3: Importar e Configurar no App
import sys
from pathlib import Path
# Adiciona common ao path
COMMON_PATH = str(Path(__file__).resolve().parents[N]) # Ajuste N
sys.path.insert(0, COMMON_PATH)
try:
from common.common_async import (
async_task,
delay_task,
get_task_result,
wait_for_task,
get_celery_app
)
ASYNC_ENABLED = True
except ImportError:
ASYNC_ENABLED = False
print("⚠️ Celery não disponível, tasks executarão sincronamente")
Passo 4: Criar Tasks Assíncronas
from common.common_async import async_task
from common.common_logging import get_logger
logger = get_logger(__name__)
@async_task(name="app.process_heavy_data")
def process_heavy_data(data: dict) -> dict:
"""
Processa dados pesados de forma assíncrona.
Args:
data: Dados a processar
Returns:
Dados processados
"""
logger.info(f"Processando {len(data)} itens...")
# Processamento pesado aqui
# Exemplo: análise de dados, geração de relatórios, etc.
result = {
"processed": True,
"items": len(data),
"timestamp": datetime.utcnow().isoformat()
}
logger.info("Processamento concluído")
return result
@async_task(name="app.send_bulk_emails")
def send_bulk_emails(recipients: list, subject: str, body: str) -> dict:
"""
Envia emails em massa de forma assíncrona.
Args:
recipients: Lista de destinatários
subject: Assunto do email
body: Corpo do email
Returns:
Resultado do envio
"""
from common.common_notifications import get_notification_service, NotificationChannel
notification_service = get_notification_service()
sent = 0
failed = 0
for recipient in recipients:
try:
await notification_service.send_email(
recipient=recipient,
subject=subject,
body=body
)
sent += 1
except Exception as e:
logger.error(f"Falha ao enviar para {recipient}: {e}")
failed += 1
return {
"sent": sent,
"failed": failed,
"total": len(recipients)
}
@async_task(name="app.generate_report")
def generate_report(org_id: int, report_type: str, start_date: str, end_date: str) -> dict:
"""
Gera relatório de forma assíncrona.
Args:
org_id: ID da organização
report_type: Tipo de relatório
start_date: Data inicial
end_date: Data final
Returns:
Dados do relatório
"""
logger.info(f"Gerando relatório {report_type} para org {org_id}")
# Lógica de geração de relatório
# Exemplo: consultas ao banco, agregações, etc.
return {
"org_id": org_id,
"report_type": report_type,
"start_date": start_date,
"end_date": end_date,
"generated_at": datetime.utcnow().isoformat(),
"data": {} # Dados do relatório
}
Passo 5: Criar Endpoints FastAPI
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional
app = FastAPI()
class ProcessRequest(BaseModel):
data: dict
class TaskStatusResponse(BaseModel):
task_id: str
status: str
result: Optional[dict] = None
error: Optional[str] = None
@app.post("/process")
async def start_processing(request: ProcessRequest):
"""
Inicia processamento assíncrono.
Retorna task_id para acompanhamento.
"""
if not ASYNC_ENABLED:
# Fallback: processar sincronamente
result = process_heavy_data(request.data)
return {"result": result, "sync": True}
# Executar task assincronamente
task_result = delay_task(process_heavy_data, request.data)
if task_result is None:
# Fallback se Celery não disponível
result = process_heavy_data(request.data)
return {"result": result, "sync": True}
return {
"task_id": task_result.id,
"status": "started",
"message": "Processamento iniciado. Use /task/{task_id} para verificar status."
}
@app.get("/task/{task_id}", response_model=TaskStatusResponse)
async def get_task_status(task_id: str):
"""
Verifica status de uma task assíncrona.
"""
if not ASYNC_ENABLED:
raise HTTPException(status_code=503, detail="Async processing not enabled")
status = get_task_result(task_id)
if status is None:
raise HTTPException(status_code=404, detail="Task not found")
return TaskStatusResponse(
task_id=task_id,
status=status["status"],
result=status.get("result"),
error=status.get("error")
)
@app.post("/task/{task_id}/wait")
async def wait_for_task_completion(task_id: str, timeout: Optional[float] = 60.0):
"""
Aguarda conclusão de uma task (com timeout).
"""
if not ASYNC_ENABLED:
raise HTTPException(status_code=503, detail="Async processing not enabled")
result = wait_for_task(task_id, timeout=timeout)
if result is None:
raise HTTPException(status_code=404, detail="Task not found")
if result["status"] == "FAILURE":
raise HTTPException(status_code=500, detail=result.get("error", "Task failed"))
return result
Passo 6: Criar Worker Celery
Criar arquivo celery_worker.py no app:
#!/usr/bin/env python3
"""
Celery Worker para processamento assíncrono
"""
import sys
from pathlib import Path
# Adiciona common ao path
COMMON_PATH = str(Path(__file__).resolve().parents[N])
sys.path.insert(0, COMMON_PATH)
from common.common_async import get_celery_app
# Importar tasks para registro
from app.tasks import process_heavy_data, send_bulk_emails, generate_report
celery_app = get_celery_app()
if __name__ == '__main__':
celery_app.start()
Passo 7: Executar Worker
# Worker básico
celery -A celery_worker worker --loglevel=info
# Worker com múltiplos processos
celery -A celery_worker worker --loglevel=info --concurrency=4
# Worker com fila específica
celery -A celery_worker worker --loglevel=info --queues=heavy_tasks
🔄 Exemplo Completo - Integração em App Existente
Exemplo: 4C Decision API
from fastapi import FastAPI, HTTPException
from typing import Optional
import sys
from pathlib import Path
COMMON_PATH = str(Path(__file__).resolve().parents[N])
sys.path.insert(0, COMMON_PATH)
# Imports com fallback
try:
from common.common_async import async_task, delay_task, get_task_result
ASYNC_ENABLED = True
except ImportError:
ASYNC_ENABLED = False
app = FastAPI()
# Task assíncrona para enriquecimento de features
@async_task(name="decision_api.enrich_features")
def enrich_features_async(customer_id: int, org_id: Optional[int] = None) -> dict:
"""
Enriquece features de cliente de forma assíncrona.
"""
from common.common_logging import get_logger
logger = get_logger(__name__)
logger.info(f"Enriquecendo features para customer {customer_id}")
# Lógica de enriquecimento pesada
# Exemplo: múltiplas consultas ao banco, cálculos complexos, etc.
features = {
"customer_id": customer_id,
"enriched": True,
"features": {} # Features enriquecidas
}
return features
@app.post("/decide/async")
async def decide_async(customer_id: int, org_id: Optional[int] = None):
"""
Inicia decisão assíncrona.
"""
if not ASYNC_ENABLED:
# Fallback: decisão síncrona
return await decide_sync(customer_id, org_id)
# Iniciar enriquecimento assíncrono
task_result = delay_task(enrich_features_async, customer_id, org_id)
if task_result is None:
# Fallback se Celery não disponível
return await decide_sync(customer_id, org_id)
return {
"task_id": task_result.id,
"status": "processing",
"message": "Decisão sendo processada. Use /decide/status/{task_id} para verificar."
}
@app.get("/decide/status/{task_id}")
async def get_decision_status(task_id: str):
"""
Verifica status de decisão assíncrona.
"""
if not ASYNC_ENABLED:
raise HTTPException(status_code=503, detail="Async processing not enabled")
status = get_task_result(task_id)
if status is None:
raise HTTPException(status_code=404, detail="Task not found")
return status
📅 Tarefas Agendadas (Celery Beat)
Configurar Tarefas Periódicas
# celery_beat.py
from celery.schedules import crontab
from common.common_async import get_celery_app
celery_app = get_celery_app()
celery_app.conf.beat_schedule = {
'sync-4c-to-csuite': {
'task': 'csuite.sync_4c_to_csuite',
'schedule': crontab(minute='*/15'), # A cada 15 minutos
},
'backup-database': {
'task': 'csuite.backup_database',
'schedule': crontab(hour=2, minute=0), # Diariamente às 2h
},
'enrich-tasks': {
'task': 'sales_manager.enrich_tasks',
'schedule': crontab(minute='*/5'), # A cada 5 minutos
},
}
Executar Celery Beat
celery -A celery_beat beat --loglevel=info
📊 Monitoramento
Flower (Interface Web)
# Instalar
pip install flower
# Executar
celery -A celery_worker flower --port=5555
Acessar: http://localhost:5555
Métricas Prometheus
O Celery expõe métricas automaticamente quando configurado. Integrar com common_metrics.py:
from prometheus_client import Counter, Histogram
from common.common_metrics import create_metrics_router
# Métricas de tasks
task_total = Counter('celery_tasks_total', 'Total tasks', ['task_name', 'status'])
task_duration = Histogram('celery_task_duration_seconds', 'Task duration', ['task_name'])
✅ Checklist de Integração
Configuração
- [ ] Variáveis de ambiente configuradas
- [ ] Redis instalado e rodando
- [ ] Celery instalado (
pip install celery redis)
Código
- [ ] Tasks assíncronas criadas com
@async_task - [ ] Endpoints FastAPI criados para iniciar tasks
- [ ] Endpoints para verificar status de tasks
- [ ] Fallback síncrono implementado
Worker
- [ ] Arquivo
celery_worker.pycriado - [ ] Tasks importadas no worker
- [ ] Worker testado localmente
Produção
- [ ] Worker configurado no Docker/systemd
- [ ] Celery Beat configurado (se necessário)
- [ ] Monitoramento configurado (Flower/Prometheus)
- [ ] Health checks configurados
🎯 Casos de Uso Recomendados
Quando Usar Async Processing
✅ Use quando:
- Processamento pesado (> 1 segundo)
- Geração de relatórios grandes
- Envio de emails em massa
- Sincronização de dados
- Processamento de arquivos
- Análise de dados complexa
❌ Não use quando:
- Resposta precisa ser imediata
- Processamento muito rápido (< 100ms)
- Operações críticas que precisam de resposta síncrona
🔧 Troubleshooting
Task não executa
- Verificar se worker está rodando
- Verificar conexão com Redis
- Verificar logs do worker
Task falha silenciosamente
- Verificar logs do worker
- Verificar exceções na task
- Verificar limites de tempo
Resultado não disponível
- Verificar se
CELERY_RESULT_BACKENDestá configurado - Verificar retenção de resultados no Redis
- Verificar se task foi concluída
📚 Recursos Adicionais
- Documentação Celery: https://docs.celeryproject.org/
- Documentação Async Processing:
docs/ASYNC_PROCESSING.md - Flower: https://flower.readthedocs.io/
Última atualização: 2025-12-01