Async Processing - C-Suite

This document describes asynchronous processing support using Celery for the C-Suite ecosystem.

Overview

The common/common_async.py module provides asynchronous processing support using Celery, allowing heavy workloads to be moved to background tasks and improving API response times.

Features

  1. @async_task decorator to register Celery tasks by name
  2. Helpers to dispatch tasks (delay_task), check status (get_task_result), and wait for results (wait_for_task)
  3. Graceful fallback to synchronous execution when no broker is available
  4. FastAPI integration, scheduled tasks via Beat, and monitoring via Flower

Configuration

Environment Variables

# Celery Broker (Redis)
CELERY_BROKER_URL=redis://localhost:6379/0

# Celery Result Backend
CELERY_RESULT_BACKEND=redis://localhost:6379/0

# Serialization
CELERY_TASK_SERIALIZER=json
CELERY_RESULT_SERIALIZER=json
CELERY_ACCEPT_CONTENT=json

# Timezone
CELERY_TIMEZONE=UTC
CELERY_ENABLE_UTC=true

# Task Configuration
CELERY_TASK_TRACK_STARTED=true
CELERY_TASK_TIME_LIMIT=300  # 5 minutes
CELERY_TASK_SOFT_TIME_LIMIT=240  # 4 minutes

Installation

pip install celery redis

Usage

Create an Async Task

from common.common_async import async_task

@async_task(name="process_data")
def process_heavy_data(data: dict) -> dict:
    """Process heavy data asynchronously"""
    # Heavy processing
    return {"processed": True}

Run a Task

from common.common_async import delay_task

# Execute task asynchronously
result = delay_task(process_heavy_data, {"key": "value"})

if result:
    task_id = result.id
    print(f"Task started: {task_id}")

Check Status

from common.common_async import get_task_result

# Get task status
status = get_task_result(task_id)
print(f"Status: {status['status']}")
print(f"Result: {status['result']}")

Wait for the Result

from common.common_async import wait_for_task

# Wait for task to complete (with timeout)
result = wait_for_task(task_id, timeout=60.0)

if result['status'] == 'SUCCESS':
    data = result['result']

Examples

Heavy Processing

from common.common_async import async_task

@async_task(name="csuite.analyze_metrics")
def analyze_metrics(org_id: int, date_range: tuple):
    """Analyze metrics for organization"""
    from common.common_db_pool import get_db_session
    from sqlalchemy import text  # required for raw SQL on SQLAlchemy 1.4+/2.x

    session = get_db_session("csuite")
    # Heavy analysis
    results = session.execute(text("""
        SELECT
            DATE(created_at) as date,
            COUNT(*) as count,
            AVG(amount) as avg_amount
        FROM transactions
        WHERE org_id = :org_id
        AND created_at BETWEEN :start AND :end
        GROUP BY DATE(created_at)
    """), {
        "org_id": org_id,
        "start": date_range[0],
        "end": date_range[1]
    })

    # row._mapping provides a dict view of each row (SQLAlchemy 1.4+)
    return [dict(row._mapping) for row in results]
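Dispatching this task, assuming delay_task forwards additional positional arguments to the task (the earlier example passes a single argument; the values here are illustrative):

from common.common_async import delay_task

# Hypothetical call: analyze January 2024 for organization 42
result = delay_task(analyze_metrics, 42, ("2024-01-01", "2024-01-31"))
if result:
    print(f"Analysis started: {result.id}")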

Bulk Notifications

import logging

from common.common_async import async_task

logger = logging.getLogger(__name__)

@async_task(name="csuite.send_bulk_emails")
def send_bulk_emails(recipients: list, subject: str, body: str):
    """Send bulk emails asynchronously"""
    from common.common_notifications import send_notification, NotificationChannel

    sent = 0
    for recipient in recipients:
        try:
            send_notification(
                title=subject,
                message=body,
                channel=NotificationChannel.EMAIL,
                recipient=recipient
            )
            sent += 1
        except Exception as e:
            logger.error(f"Failed to send to {recipient}: {e}")

    return {"sent": sent, "total": len(recipients)}

Report Generation

from common.common_async import async_task
from datetime import datetime, timezone

@async_task(name="csuite.generate_report")
def generate_report(org_id: int, report_type: str):
    """Generate report asynchronously"""
    # Report generation logic
    report_data = {
        "org_id": org_id,
        "report_type": report_type,
        "generated_at": datetime.utcnow().isoformat()
    }

    # Save report
    # ...

    return report_data

FastAPI Integration

Async Endpoint

from fastapi import FastAPI
from common.common_async import delay_task, get_task_result
# process_heavy_data is the @async_task defined in "Create an Async Task" above

app = FastAPI()

@app.post("/process")
async def process_data(data: dict):
    """Start async processing"""
    result = delay_task(process_heavy_data, data)

    if result:
        return {"task_id": result.id, "status": "started"}
    else:
        # Fallback: process synchronously
        return {"result": process_heavy_data(data)}

@app.get("/task/{task_id}")
async def get_task_status(task_id: str):
    """Get task status"""
    status = get_task_result(task_id)

    if status is None:
        return {"error": "Task not found"}

    return status
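Illustrative client calls against these endpoints (assuming the app is served on localhost:8000; replace <task_id> with the value returned by the first call):

curl -X POST http://localhost:8000/process \
     -H "Content-Type: application/json" \
     -d '{"key": "value"}'
# => {"task_id": "<task_id>", "status": "started"}

curl http://localhost:8000/task/<task_id>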

Celery Worker

Create the Worker

# celery_worker.py
from common.common_async import get_celery_app

celery_app = get_celery_app()

if __name__ == '__main__':
    celery_app.start()

Run the Worker

celery -A celery_worker worker --loglevel=info

Run with Beat (Scheduled Tasks)

celery -A celery_worker beat --loglevel=info

Scheduled Tasks

Configure Scheduled Tasks

from celery.schedules import crontab
from common.common_async import get_celery_app

celery_app = get_celery_app()

celery_app.conf.beat_schedule = {
    'sync-4c-to-csuite': {
        'task': 'csuite.sync_4c_to_csuite',
        'schedule': crontab(minute='*/15'),  # Every 15 minutes
    },
    'backup-database': {
        'task': 'csuite.backup_database',
        'schedule': crontab(hour=2, minute=0),  # Daily at 2 AM
    },
}
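For Beat to dispatch these entries, tasks registered under exactly those names must exist. A minimal placeholder sketch for one of them, using the project's own decorator (the body is illustrative):

from common.common_async import async_task

@async_task(name="csuite.sync_4c_to_csuite")
def sync_4c_to_csuite():
    """Periodic 4C -> C-Suite sync; the body here is a placeholder."""
    # Actual sync logic goes here
    return {"synced": True}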

Monitoring

Flower (Celery Monitoring)

pip install flower
celery -A celery_worker flower

Open http://localhost:5555 in your browser.

Prometheus Metrics

Integrate with common/common_metrics.py to expose task metrics.
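One way to wire this up is through Celery's task signals. A minimal sketch using prometheus_client directly (the metric names are illustrative, and common/common_metrics.py may already provide equivalent helpers):

from celery.signals import task_success, task_failure
from prometheus_client import Counter

# Illustrative metric names; adapt to common/common_metrics.py conventions
TASKS_SUCCEEDED = Counter("celery_tasks_succeeded_total",
                          "Tasks that completed successfully", ["task"])
TASKS_FAILED = Counter("celery_tasks_failed_total",
                       "Tasks that raised an exception", ["task"])

@task_success.connect
def on_task_success(sender=None, **kwargs):
    TASKS_SUCCEEDED.labels(task=sender.name).inc()

@task_failure.connect
def on_task_failure(sender=None, **kwargs):
    TASKS_FAILED.labels(task=sender.name).inc()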

Best Practices

  1. Use for heavy operations: reserve async tasks for operations that take longer than about 1 second
  2. Appropriate timeouts: set task time limits that match the expected workload (see CELERY_TASK_TIME_LIMIT above)
  3. Idempotency: make tasks idempotent whenever possible, so a retry cannot apply the same change twice
  4. Error handling: handle transient failures explicitly, e.g. with retries (see the sketch after this list)
  5. Logging: log task execution so runs can be traced
  6. Monitoring: monitor workers and tasks (e.g. with Flower, above)
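A hedged sketch combining idempotency with retries. This assumes async_task forwards extra keyword arguments (bind, max_retries, default_retry_delay) to Celery's task decorator; if it does not, the same options apply to a plain @celery_app.task. The task name is hypothetical:

from common.common_async import async_task

# Assumes async_task passes bind/retry options through to Celery
@async_task(name="csuite.charge_invoice", bind=True, max_retries=3, default_retry_delay=30)
def charge_invoice(self, invoice_id: int):
    """Idempotent: re-running for an already-charged invoice is a no-op."""
    try:
        ...  # check the invoice state first, then act exactly once
    except ConnectionError as exc:
        # Transient failure: let Celery retry up to max_retries, 30s apart
        raise self.retry(exc=exc)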

Troubleshooting

Tasks don't run

  1. Check that a worker is running
  2. Check the broker (Redis) connection (see the snippet below)
  3. Check the worker logs
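A quick way to check the broker connection from Python, using the same URL as the configuration above (the redis package is already installed as a dependency):

import redis

# Prints True if the broker is reachable; raises ConnectionError otherwise
client = redis.Redis.from_url("redis://localhost:6379/0")
print(client.ping())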

Tasks fail

  1. Check the worker logs
  2. Check that required dependencies (database, external services) are available
  3. Check the timeouts (see the soft-time-limit sketch below)
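For timeout failures specifically, Celery raises SoftTimeLimitExceeded inside the task when CELERY_TASK_SOFT_TIME_LIMIT (240s above) is reached; catching it lets the task clean up before the hard limit (300s) kills the process. A minimal sketch with a hypothetical task name:

from celery.exceptions import SoftTimeLimitExceeded
from common.common_async import async_task

@async_task(name="csuite.long_job")  # hypothetical task name
def long_job():
    try:
        ...  # long-running work
        return {"status": "done"}
    except SoftTimeLimitExceeded:
        # Soft limit hit: persist partial progress / release resources here
        return {"status": "aborted"}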

Performance

  1. Configure an appropriate number of worker processes (see the example below)
  2. Tune prefetching for better throughput
  3. Configure appropriate pools
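For example (illustrative values; match concurrency to CPU cores and lower the prefetch multiplier for long-running tasks):

celery -A celery_worker worker --loglevel=info --concurrency=8 --prefetch-multiplier=1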
