Integracao Async Processing

🔄 Guia de Integração - Async Processing (Celery)

Data: 2025-12-01
Status:Guia Criado


🎯 Objetivo

Este guia mostra como integrar processamento assíncrono usando Celery nos apps do ecossistema C-Suite para mover processamentos pesados para background tasks.


📦 Módulo Disponível


🚀 Integração Passo a Passo

Passo 1: Configurar Variáveis de Ambiente

# Redis como broker e backend
CELERY_BROKER_URL=redis://localhost:6379/0
CELERY_RESULT_BACKEND=redis://localhost:6379/0

# Serialização
CELERY_TASK_SERIALIZER=json
CELERY_RESULT_SERIALIZER=json
CELERY_ACCEPT_CONTENT=json

# Timezone
CELERY_TIMEZONE=UTC
CELERY_ENABLE_UTC=true

# Limites de tempo
CELERY_TASK_TIME_LIMIT=300  # 5 minutos
CELERY_TASK_SOFT_TIME_LIMIT=240  # 4 minutos

Passo 2: Instalar Dependências

pip install celery redis

Passo 3: Importar e Configurar no App

import sys
from pathlib import Path

# Adiciona common ao path
COMMON_PATH = str(Path(__file__).resolve().parents[N])  # Ajuste N
sys.path.insert(0, COMMON_PATH)

try:
    from common.common_async import (
        async_task,
        delay_task,
        get_task_result,
        wait_for_task,
        get_celery_app
    )
    ASYNC_ENABLED = True
except ImportError:
    ASYNC_ENABLED = False
    print("⚠️  Celery não disponível, tasks executarão sincronamente")

Passo 4: Criar Tasks Assíncronas

from common.common_async import async_task
from common.common_logging import get_logger

logger = get_logger(__name__)

@async_task(name="app.process_heavy_data")
def process_heavy_data(data: dict) -> dict:
    """
    Processa dados pesados de forma assíncrona.

    Args:
        data: Dados a processar

    Returns:
        Dados processados
    """
    logger.info(f"Processando {len(data)} itens...")

    # Processamento pesado aqui
    # Exemplo: análise de dados, geração de relatórios, etc.

    result = {
        "processed": True,
        "items": len(data),
        "timestamp": datetime.utcnow().isoformat()
    }

    logger.info("Processamento concluído")
    return result

@async_task(name="app.send_bulk_emails")
def send_bulk_emails(recipients: list, subject: str, body: str) -> dict:
    """
    Envia emails em massa de forma assíncrona.

    Args:
        recipients: Lista de destinatários
        subject: Assunto do email
        body: Corpo do email

    Returns:
        Resultado do envio
    """
    from common.common_notifications import get_notification_service, NotificationChannel

    notification_service = get_notification_service()
    sent = 0
    failed = 0

    for recipient in recipients:
        try:
            await notification_service.send_email(
                recipient=recipient,
                subject=subject,
                body=body
            )
            sent += 1
        except Exception as e:
            logger.error(f"Falha ao enviar para {recipient}: {e}")
            failed += 1

    return {
        "sent": sent,
        "failed": failed,
        "total": len(recipients)
    }

@async_task(name="app.generate_report")
def generate_report(org_id: int, report_type: str, start_date: str, end_date: str) -> dict:
    """
    Gera relatório de forma assíncrona.

    Args:
        org_id: ID da organização
        report_type: Tipo de relatório
        start_date: Data inicial
        end_date: Data final

    Returns:
        Dados do relatório
    """
    logger.info(f"Gerando relatório {report_type} para org {org_id}")

    # Lógica de geração de relatório
    # Exemplo: consultas ao banco, agregações, etc.

    return {
        "org_id": org_id,
        "report_type": report_type,
        "start_date": start_date,
        "end_date": end_date,
        "generated_at": datetime.utcnow().isoformat(),
        "data": {}  # Dados do relatório
    }

Passo 5: Criar Endpoints FastAPI

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional

app = FastAPI()

class ProcessRequest(BaseModel):
    data: dict

class TaskStatusResponse(BaseModel):
    task_id: str
    status: str
    result: Optional[dict] = None
    error: Optional[str] = None

@app.post("/process")
async def start_processing(request: ProcessRequest):
    """
    Inicia processamento assíncrono.

    Retorna task_id para acompanhamento.
    """
    if not ASYNC_ENABLED:
        # Fallback: processar sincronamente
        result = process_heavy_data(request.data)
        return {"result": result, "sync": True}

    # Executar task assincronamente
    task_result = delay_task(process_heavy_data, request.data)

    if task_result is None:
        # Fallback se Celery não disponível
        result = process_heavy_data(request.data)
        return {"result": result, "sync": True}

    return {
        "task_id": task_result.id,
        "status": "started",
        "message": "Processamento iniciado. Use /task/{task_id} para verificar status."
    }

@app.get("/task/{task_id}", response_model=TaskStatusResponse)
async def get_task_status(task_id: str):
    """
    Verifica status de uma task assíncrona.
    """
    if not ASYNC_ENABLED:
        raise HTTPException(status_code=503, detail="Async processing not enabled")

    status = get_task_result(task_id)

    if status is None:
        raise HTTPException(status_code=404, detail="Task not found")

    return TaskStatusResponse(
        task_id=task_id,
        status=status["status"],
        result=status.get("result"),
        error=status.get("error")
    )

@app.post("/task/{task_id}/wait")
async def wait_for_task_completion(task_id: str, timeout: Optional[float] = 60.0):
    """
    Aguarda conclusão de uma task (com timeout).
    """
    if not ASYNC_ENABLED:
        raise HTTPException(status_code=503, detail="Async processing not enabled")

    result = wait_for_task(task_id, timeout=timeout)

    if result is None:
        raise HTTPException(status_code=404, detail="Task not found")

    if result["status"] == "FAILURE":
        raise HTTPException(status_code=500, detail=result.get("error", "Task failed"))

    return result

Passo 6: Criar Worker Celery

Criar arquivo celery_worker.py no app:

#!/usr/bin/env python3
"""
Celery Worker para processamento assíncrono
"""
import sys
from pathlib import Path

# Adiciona common ao path
COMMON_PATH = str(Path(__file__).resolve().parents[N])
sys.path.insert(0, COMMON_PATH)

from common.common_async import get_celery_app

# Importar tasks para registro
from app.tasks import process_heavy_data, send_bulk_emails, generate_report

celery_app = get_celery_app()

if __name__ == '__main__':
    celery_app.start()

Passo 7: Executar Worker

# Worker básico
celery -A celery_worker worker --loglevel=info

# Worker com múltiplos processos
celery -A celery_worker worker --loglevel=info --concurrency=4

# Worker com fila específica
celery -A celery_worker worker --loglevel=info --queues=heavy_tasks

🔄 Exemplo Completo - Integração em App Existente

Exemplo: 4C Decision API

from fastapi import FastAPI, HTTPException
from typing import Optional
import sys
from pathlib import Path

COMMON_PATH = str(Path(__file__).resolve().parents[N])
sys.path.insert(0, COMMON_PATH)

# Imports com fallback
try:
    from common.common_async import async_task, delay_task, get_task_result
    ASYNC_ENABLED = True
except ImportError:
    ASYNC_ENABLED = False

app = FastAPI()

# Task assíncrona para enriquecimento de features
@async_task(name="decision_api.enrich_features")
def enrich_features_async(customer_id: int, org_id: Optional[int] = None) -> dict:
    """
    Enriquece features de cliente de forma assíncrona.
    """
    from common.common_logging import get_logger
    logger = get_logger(__name__)

    logger.info(f"Enriquecendo features para customer {customer_id}")

    # Lógica de enriquecimento pesada
    # Exemplo: múltiplas consultas ao banco, cálculos complexos, etc.

    features = {
        "customer_id": customer_id,
        "enriched": True,
        "features": {}  # Features enriquecidas
    }

    return features

@app.post("/decide/async")
async def decide_async(customer_id: int, org_id: Optional[int] = None):
    """
    Inicia decisão assíncrona.
    """
    if not ASYNC_ENABLED:
        # Fallback: decisão síncrona
        return await decide_sync(customer_id, org_id)

    # Iniciar enriquecimento assíncrono
    task_result = delay_task(enrich_features_async, customer_id, org_id)

    if task_result is None:
        # Fallback se Celery não disponível
        return await decide_sync(customer_id, org_id)

    return {
        "task_id": task_result.id,
        "status": "processing",
        "message": "Decisão sendo processada. Use /decide/status/{task_id} para verificar."
    }

@app.get("/decide/status/{task_id}")
async def get_decision_status(task_id: str):
    """
    Verifica status de decisão assíncrona.
    """
    if not ASYNC_ENABLED:
        raise HTTPException(status_code=503, detail="Async processing not enabled")

    status = get_task_result(task_id)

    if status is None:
        raise HTTPException(status_code=404, detail="Task not found")

    return status

📅 Tarefas Agendadas (Celery Beat)

Configurar Tarefas Periódicas

# celery_beat.py
from celery.schedules import crontab
from common.common_async import get_celery_app

celery_app = get_celery_app()

celery_app.conf.beat_schedule = {
    'sync-4c-to-csuite': {
        'task': 'csuite.sync_4c_to_csuite',
        'schedule': crontab(minute='*/15'),  # A cada 15 minutos
    },
    'backup-database': {
        'task': 'csuite.backup_database',
        'schedule': crontab(hour=2, minute=0),  # Diariamente às 2h
    },
    'enrich-tasks': {
        'task': 'sales_manager.enrich_tasks',
        'schedule': crontab(minute='*/5'),  # A cada 5 minutos
    },
}

Executar Celery Beat

celery -A celery_beat beat --loglevel=info

📊 Monitoramento

Flower (Interface Web)

# Instalar
pip install flower

# Executar
celery -A celery_worker flower --port=5555

Acessar: http://localhost:5555

Métricas Prometheus

O Celery expõe métricas automaticamente quando configurado. Integrar com common_metrics.py:

from prometheus_client import Counter, Histogram
from common.common_metrics import create_metrics_router

# Métricas de tasks
task_total = Counter('celery_tasks_total', 'Total tasks', ['task_name', 'status'])
task_duration = Histogram('celery_task_duration_seconds', 'Task duration', ['task_name'])

✅ Checklist de Integração

Configuração

Código

Worker

Produção


🎯 Casos de Uso Recomendados

Quando Usar Async Processing

Use quando:
- Processamento pesado (> 1 segundo)
- Geração de relatórios grandes
- Envio de emails em massa
- Sincronização de dados
- Processamento de arquivos
- Análise de dados complexa

Não use quando:
- Resposta precisa ser imediata
- Processamento muito rápido (< 100ms)
- Operações críticas que precisam de resposta síncrona


🔧 Troubleshooting

Task não executa

Task falha silenciosamente

Resultado não disponível


📚 Recursos Adicionais


Última atualização: 2025-12-01

🔊 Text-to-Speech

1.0x
1.0
Pronto para reproduzir