Async Processing - C-Suite
This document describes the asynchronous processing support, built on Celery, for the C-Suite ecosystem.
Overview
The common/common_async.py module provides asynchronous processing support using Celery. It lets you move heavy processing into background tasks and improve API response times.
Features
- ✅ Celery integration
- ✅ Decorators for creating asynchronous tasks
- ✅ Helpers for running and monitoring tasks
- ✅ Synchronous fallback when Celery is unavailable
- ✅ Configuration through environment variables
Configuration
Environment Variables
# Celery Broker (Redis)
CELERY_BROKER_URL=redis://localhost:6379/0
# Celery Result Backend
CELERY_RESULT_BACKEND=redis://localhost:6379/0
# Serialization
CELERY_TASK_SERIALIZER=json
CELERY_RESULT_SERIALIZER=json
CELERY_ACCEPT_CONTENT=json
# Timezone
CELERY_TIMEZONE=UTC
CELERY_ENABLE_UTC=true
# Task Configuration
CELERY_TASK_TRACK_STARTED=true
CELERY_TASK_TIME_LIMIT=300 # 5 minutes
CELERY_TASK_SOFT_TIME_LIMIT=240 # 4 minutes
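The variables above map onto Celery's lowercase setting names. The following is a minimal sketch of how such a mapping could be built, not the actual code in common/common_async.py: the function name `load_celery_settings` and the helper `_as_bool` are hypothetical, and the defaults simply repeat the values listed above.

```python
import os
from typing import Any, Dict, Mapping


def _as_bool(value: str) -> bool:
    """Interpret common truthy strings ("true", "1", "yes", "on") as True."""
    return value.strip().lower() in {"1", "true", "yes", "on"}


def load_celery_settings(env: Mapping[str, str] = os.environ) -> Dict[str, Any]:
    """Build a Celery settings dict from environment variables (hypothetical helper)."""
    redis_default = "redis://localhost:6379/0"
    return {
        "broker_url": env.get("CELERY_BROKER_URL", redis_default),
        "result_backend": env.get("CELERY_RESULT_BACKEND", redis_default),
        "task_serializer": env.get("CELERY_TASK_SERIALIZER", "json"),
        "result_serializer": env.get("CELERY_RESULT_SERIALIZER", "json"),
        # accept_content is a list in Celery, so split a comma-separated value
        "accept_content": [
            item.strip()
            for item in env.get("CELERY_ACCEPT_CONTENT", "json").split(",")
        ],
        "timezone": env.get("CELERY_TIMEZONE", "UTC"),
        "enable_utc": _as_bool(env.get("CELERY_ENABLE_UTC", "true")),
        "task_track_started": _as_bool(env.get("CELERY_TASK_TRACK_STARTED", "true")),
        # Time limits are expressed in seconds
        "task_time_limit": int(env.get("CELERY_TASK_TIME_LIMIT", "300")),
        "task_soft_time_limit": int(env.get("CELERY_TASK_SOFT_TIME_LIMIT", "240")),
    }


settings = load_celery_settings({"CELERY_TASK_TIME_LIMIT": "120"})
print(settings["task_time_limit"], settings["accept_content"])
```

A dict like this could be passed to a Celery app through `app.conf.update(settings)`. Note that `int()` fails on a value such as `300 # 5 minutes`, so if the variables are loaded from a file, the loader needs to strip inline comments first.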
Installation
pip install celery redis
Usage
Create an Asynchronous Task
from common.common_async import async_task

@async_task(name="process_data")
def process_heavy_data(data: dict) -> dict:
    """Process heavy data asynchronously."""
    # Heavy processing goes here
    return {"processed": True}
Run a Task
from common.common_async import delay_task

# Execute the task asynchronously
result = delay_task(process_heavy_data, {"key": "value"})
if result:
    task_id = result.id
    print(f"Task started: {task_id}")
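The `if result:` check matters because the helper may return nothing usable when Celery is unavailable, in which case the caller can run the function directly. The sketch below illustrates that pattern with the standard library only. `submit_or_run` is a hypothetical name, and the logic is an assumption about how a synchronous fallback can be structured, not the module's actual implementation. A real helper would also need to handle broker connection errors, which are omitted here.

```python
from typing import Any, Callable, Dict


def submit_or_run(task: Callable[..., Any], *args: Any) -> Dict[str, Any]:
    """Queue `task` if it exposes a Celery-style .delay(), otherwise run it inline."""
    delay = getattr(task, "delay", None)
    if callable(delay):
        handle = delay(*args)  # asynchronous path: returns a result handle
        return {"task_id": handle.id, "status": "started"}
    # Synchronous fallback: no queueing support, so call the function directly
    return {"status": "completed", "result": task(*args)}


def process_heavy_data(data: dict) -> dict:
    """Plain function standing in for a task when Celery is not installed."""
    return {"processed": True, "keys": sorted(data)}


outcome = submit_or_run(process_heavy_data, {"key": "value"})
print(outcome)
```

Because the plain function has no `.delay` attribute, the call above takes the synchronous path and returns the computed result immediately.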
Check Status
from common.common_async import get_task_result
# Get task status
status = get_task_result(task_id)
print(f"Status: {status['status']}")
print(f"Result: {status['result']}")
Wait for the Result
from common.common_async import wait_for_task

# Wait for the task to complete (with timeout)
result = wait_for_task(task_id, timeout=60.0)
if result['status'] == 'SUCCESS':
    data = result['result']
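Waiting with a timeout usually comes down to polling the task status until it reaches a final state or the deadline passes. The following is a minimal sketch of that loop, under the assumption that the status is a dict with `status` and `result` keys as in the examples above. `poll_until_done` and the `"TIMEOUT"` marker are hypothetical and are not Celery states. The final states checked are Celery's ready states (SUCCESS, FAILURE, REVOKED).

```python
import time
from typing import Any, Callable, Dict

FINAL_STATES = {"SUCCESS", "FAILURE", "REVOKED"}


def poll_until_done(
    fetch_status: Callable[[], Dict[str, Any]],
    timeout: float,
    interval: float = 0.5,
) -> Dict[str, Any]:
    """Call fetch_status() repeatedly until a final state or the timeout is reached."""
    deadline = time.monotonic() + timeout
    while True:
        status = fetch_status()
        if status["status"] in FINAL_STATES:
            return status
        if time.monotonic() >= deadline:
            # Hypothetical marker for "gave up waiting"; the task may still be running
            return {"status": "TIMEOUT", "result": None}
        time.sleep(interval)


# Simulated status sequence standing in for repeated status lookups
_states = iter([
    {"status": "PENDING", "result": None},
    {"status": "STARTED", "result": None},
    {"status": "SUCCESS", "result": {"processed": True}},
])
final = poll_until_done(lambda: next(_states), timeout=5.0, interval=0.01)
print(final)
```

Using `time.monotonic()` for the deadline keeps the loop correct even if the system clock is adjusted while waiting.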
Examples
Heavy Processing
from common.common_async import async_task

@async_task(name="csuite.analyze_metrics")
def analyze_metrics(org_id: int, date_range: tuple):
    """Analyze metrics for an organization."""
    from sqlalchemy import text
    from common.common_db_pool import get_db_session

    session = get_db_session("csuite")
    # Heavy analysis. Assumes get_db_session returns a SQLAlchemy session:
    # SQLAlchemy 2.x requires raw SQL strings to be wrapped in text().
    results = session.execute(text("""
        SELECT
            DATE(created_at) AS date,
            COUNT(*) AS count,
            AVG(amount) AS avg_amount
        FROM transactions
        WHERE org_id = :org_id
          AND created_at BETWEEN :start AND :end
        GROUP BY DATE(created_at)
    """), {
        "org_id": org_id,
        "start": date_range[0],
        "end": date_range[1],
    })
    # Row objects expose their columns through ._mapping in SQLAlchemy 1.4+
    return [dict(row._mapping) for row in results]
Bulk Notifications
import logging

from common.common_async import async_task

logger = logging.getLogger(__name__)

@async_task(name="csuite.send_bulk_emails")
def send_bulk_emails(recipients: list, subject: str, body: str):
    """Send bulk emails asynchronously."""
    from common.common_notifications import send_notification, NotificationChannel

    sent = 0
    for recipient in recipients:
        try:
            send_notification(
                title=subject,
                message=body,
                channel=NotificationChannel.EMAIL,
                recipient=recipient,
            )
            sent += 1
        except Exception as e:
            # Log and continue so one failed recipient does not abort the batch
            logger.error(f"Failed to send to {recipient}: {e}")
    return {"sent": sent, "total": len(recipients)}
Report Generation
from datetime import datetime, timezone

from common.common_async import async_task

@async_task(name="csuite.generate_report")
def generate_report(org_id: int, report_type: str):
    """Generate a report asynchronously."""
    # Report generation logic
    report_data = {
        "org_id": org_id,
        "report_type": report_type,
        # datetime.utcnow() is deprecated since Python 3.12; use an aware UTC timestamp
        "generated_at": datetime.now(timezone.utc).isoformat(),
    }
    # Save report
    # ...
    return report_data
FastAPI Integration
Asynchronous Endpoint
from fastapi import FastAPI
from common.common_async import delay_task, get_task_result

# process_heavy_data is the task defined earlier in this document;
# import it from the module where it is defined in your project.
app = FastAPI()

@app.post("/process")
async def process_data(data: dict):
    """Start async processing."""
    result = delay_task(process_heavy_data, data)
    if result:
        return {"task_id": result.id, "status": "started"}
    # Fallback: process synchronously
    return {"result": process_heavy_data(data)}

@app.get("/task/{task_id}")
async def get_task_status(task_id: str):
    """Get task status."""
    status = get_task_result(task_id)
    if status is None:
        return {"error": "Task not found"}
    return status
Celery Worker
Create a Worker
# celery_worker.py
from common.common_async import get_celery_app

celery_app = get_celery_app()

if __name__ == '__main__':
    celery_app.start()
Run the Worker
celery -A celery_worker worker --loglevel=info
Run with Beat (Scheduled Tasks)
celery -A celery_worker beat --loglevel=info
Scheduled Tasks
Configure Scheduled Tasks
from celery.schedules import crontab
from common.common_async import get_celery_app

celery_app = get_celery_app()
celery_app.conf.beat_schedule = {
    'sync-4c-to-csuite': {
        'task': 'csuite.sync_4c_to_csuite',
        'schedule': crontab(minute='*/15'),  # Every 15 minutes
    },
    'backup-database': {
        'task': 'csuite.backup_database',
        'schedule': crontab(hour=2, minute=0),  # Daily at 2 AM
    },
}
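To make the two schedules concrete: `crontab(minute='*/15')` fires at minutes 0, 15, 30, and 45 of every hour, and `crontab(hour=2, minute=0)` fires once a day at 02:00 in the configured timezone. The sketch below reproduces that arithmetic with the standard library only. It approximates when the next run falls and is not Celery's scheduler; both function names are hypothetical.

```python
from datetime import datetime, timedelta


def next_quarter_hour(now: datetime) -> datetime:
    """Return the first time strictly after `now` whose minute is 0, 15, 30, or 45."""
    base = now.replace(second=0, microsecond=0)
    minutes_to_add = 15 - (base.minute % 15)
    return base + timedelta(minutes=minutes_to_add)


def next_daily_run(now: datetime, hour: int = 2, minute: int = 0) -> datetime:
    """Return the next occurrence of hour:minute strictly after `now`."""
    candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= now:
        candidate += timedelta(days=1)
    return candidate


now = datetime(2024, 1, 1, 10, 7, 30)
print(next_quarter_hour(now))  # 2024-01-01 10:15:00
print(next_daily_run(now))     # 2024-01-02 02:00:00
```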
Monitoring
Flower (Celery Monitoring)
pip install flower
celery -A celery_worker flower
Open http://localhost:5555 in a browser.
Prometheus Metrics
Integrate with common/common_metrics.py to expose task metrics.
Best Practices
- Use for heavy operations: Reserve tasks for operations that take longer than 1 second
- Appropriate timeouts: Configure adequate timeouts
- Idempotency: Make tasks idempotent whenever possible
- Error handling: Handle errors properly
- Logging: Use logging to trace execution
- Monitoring: Monitor workers and tasks
Troubleshooting
Tasks do not run
- Check that a worker is running
- Check the connection to the broker (Redis)
- Check the worker logs
Tasks fail
- Check the worker logs
- Check that dependencies are available
- Check the timeouts
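When checking timeouts, it helps to remember how the two limits interact in Celery. The soft limit raises an exception inside the task, which the task can catch to clean up. The hard limit terminates the process running the task. The soft limit therefore has to be lower than the hard limit, and the gap is the time available for cleanup. A minimal sketch of that check, using the values from the configuration section (`cleanup_window` is a hypothetical helper):

```python
def cleanup_window(soft_limit: int, hard_limit: int) -> int:
    """Return the seconds a task has for cleanup between the soft and hard limits."""
    if soft_limit <= 0 or hard_limit <= 0:
        raise ValueError("time limits must be positive")
    if soft_limit >= hard_limit:
        raise ValueError("soft limit must be lower than the hard limit")
    return hard_limit - soft_limit


window = cleanup_window(soft_limit=240, hard_limit=300)
print(window)  # 60
```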
Performance
- Configure an adequate number of workers
- Use prefetch for better throughput
- Configure appropriate pools
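As a rough illustration of the first two points, Celery's `worker_concurrency` and `worker_prefetch_multiplier` settings together determine how many messages a worker reserves in advance: the prefetch limit is the multiplier times the number of concurrent processes. The values below are illustrative assumptions, not recommendations from this project, and `max_reserved_tasks` is a hypothetical helper.

```python
from typing import Any, Dict

# Illustrative values only; tune them against the real workload
worker_tuning: Dict[str, Any] = {
    "worker_concurrency": 4,          # number of worker processes or threads
    "worker_prefetch_multiplier": 1,  # messages reserved per process
}


def max_reserved_tasks(settings: Dict[str, Any]) -> int:
    """Return how many tasks one worker may reserve from the broker at a time."""
    return settings["worker_concurrency"] * settings["worker_prefetch_multiplier"]


print(max_reserved_tasks(worker_tuning))  # 4
```

A low multiplier tends to suit long-running tasks, because no worker holds a backlog while others sit idle. A higher value tends to help throughput for many short tasks. The same options can be passed on the command line, for example `celery -A celery_worker worker --concurrency=4 --prefetch-multiplier=1 --pool=prefork`.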