Event Sourcing

Event Sourcing - C-Suite

Este documento descreve a implementação do padrão Event Sourcing para decisões e outros eventos de domínio no ecossistema C-Suite.

Visão Geral

O módulo common/common_event_sourcing.py fornece uma implementação completa de Event Sourcing, permitindo que todas as decisões sejam eventos imutáveis que podem ser auditados e reexecutados.

Características

Conceitos

Event

Um evento representa algo que aconteceu no sistema:

from common.common_event_sourcing import Event, EventType

event = Event(
    event_id="uuid",
    event_type=EventType.DECISION_CREATED,
    aggregate_id="decision_123",
    aggregate_type="decision",
    payload={"org_id": 1, "offer": "10% discount"},
    metadata={"user_id": "user_456"},
    timestamp=datetime.utcnow()
)

Event Store

Armazena todos os eventos de forma imutável:

from common.common_event_sourcing import get_event_store

event_store = get_event_store("csuite")
event_store.append(event)

Aggregate

Um agregado é uma entidade de negócio (ex: Decision, Organization):

# Todos os eventos de uma decisão
events = event_store.get_events(
    aggregate_type="decision",
    aggregate_id="decision_123"
)

Uso

Registrar Evento de Decisão

from common.common_event_sourcing import record_event, EventType

# Quando uma decisão é criada
event_id = record_event(
    event_type=EventType.DECISION_CREATED,
    aggregate_id=f"decision_{decision_id}",
    aggregate_type="decision",
    payload={
        "org_id": org_id,
        "offer": offer,
        "channel": channel,
        "message": message,
        "timing": timing
    },
    metadata={
        "user_id": user_id,
        "request_id": request_id
    }
)

Registrar Evento de Execução

# Quando uma decisão é executada
record_event(
    event_type=EventType.DECISION_EXECUTED,
    aggregate_id=f"decision_{decision_id}",
    aggregate_type="decision",
    payload={
        "executed_at": datetime.utcnow().isoformat(),
        "result": "success"
    },
    metadata={"executor": "system"}
)

Registrar Violação de Política

# Quando uma política é violada
record_event(
    event_type=EventType.POLICY_VIOLATED,
    aggregate_id=f"decision_{decision_id}",
    aggregate_type="decision",
    payload={
        "policy_name": "min_margin",
        "violation_reason": "Margin below minimum"
    },
    metadata={"policy_engine": "4c"}
)

Obter Histórico de Decisão

from common.common_event_sourcing import get_decision_events

# Obter todos os eventos de uma decisão
events = get_decision_events("decision_123")

for event in events:
    print(f"{event.timestamp}: {event.event_type.value}")
    print(f"  Payload: {event.payload}")

Replay de Eventos

from common.common_event_sourcing import replay_decision

# Replay todos os eventos de uma decisão
def handle_event(event: Event):
    print(f"Processing {event.event_type.value}")
    # Reconstruir estado ou executar ação

events = replay_decision("decision_123", handle_event)

Tipos de Eventos

Eventos de Decisão

Eventos de Política

Eventos de Métricas

Eventos de Tasks

Eventos Customizados

Integração com Outros Módulos

Com Audit Log

from common.common_event_sourcing import record_event, EventType
from common.common_audit import audit_log

# Registrar evento e audit log
event_id = record_event(
    event_type=EventType.DECISION_CREATED,
    aggregate_id=f"decision_{decision_id}",
    aggregate_type="decision",
    payload=payload
)

audit_log(
    action="decision_created",
    user_id=user_id,
    org_id=org_id,
    resource_type="decision",
    resource_id=decision_id
)

Com Notificações

from common.common_event_sourcing import record_event, EventType
from common.common_notifications import send_alert

# Registrar evento e notificar
record_event(
    event_type=EventType.POLICY_VIOLATED,
    aggregate_id=f"decision_{decision_id}",
    aggregate_type="decision",
    payload={"policy_name": "min_margin"}
)

send_alert(
    title="Policy Violated",
    message=f"Decision {decision_id} violated policy min_margin"
)

Com Tracing

from common.common_event_sourcing import record_event, EventType
from common.common_tracing import add_trace_event

# Registrar evento e trace
event_id = record_event(
    event_type=EventType.DECISION_CREATED,
    aggregate_id=f"decision_{decision_id}",
    aggregate_type="decision",
    payload=payload
)

add_trace_event("decision_event_recorded", {
    "event_id": event_id,
    "decision_id": decision_id
})

Queries

Por Agregado

events = event_store.get_events(
    aggregate_type="decision",
    aggregate_id="decision_123"
)

Por Tipo de Evento

events = event_store.get_events(
    event_type=EventType.DECISION_CREATED
)

Por Período

from datetime import datetime, timedelta

start_time = datetime.utcnow() - timedelta(days=7)
events = event_store.get_events(
    start_time=start_time,
    event_type=EventType.DECISION_CREATED
)

Combinado

events = event_store.get_events(
    aggregate_type="decision",
    event_type=EventType.DECISION_EXECUTED,
    start_time=start_time,
    end_time=end_time,
    limit=100
)

Replay e Reconstrução

Reconstruir Estado

def rebuild_decision_state(decision_id: str) -> Dict[str, Any]:
    """Reconstruir estado de uma decisão a partir dos eventos"""
    events = get_decision_events(decision_id)

    state = {
        "created": False,
        "executed": False,
        "rejected": False,
        "policies_applied": [],
        "policies_violated": []
    }

    for event in events:
        if event.event_type == EventType.DECISION_CREATED:
            state["created"] = True
            state.update(event.payload)
        elif event.event_type == EventType.DECISION_EXECUTED:
            state["executed"] = True
        elif event.event_type == EventType.DECISION_REJECTED:
            state["rejected"] = True
        elif event.event_type == EventType.POLICY_APPLIED:
            state["policies_applied"].append(event.payload)
        elif event.event_type == EventType.POLICY_VIOLATED:
            state["policies_violated"].append(event.payload)

    return state

Schema do Banco

O módulo cria automaticamente a tabela event_store:

CREATE TABLE event_store (
    event_id VARCHAR(36) PRIMARY KEY,
    event_type VARCHAR(100) NOT NULL,
    aggregate_id VARCHAR(255) NOT NULL,
    aggregate_type VARCHAR(100) NOT NULL,
    payload JSON NOT NULL,
    metadata JSON NOT NULL,
    timestamp DATETIME NOT NULL,
    version INT NOT NULL DEFAULT 1,
    INDEX idx_aggregate (aggregate_type, aggregate_id),
    INDEX idx_timestamp (timestamp),
    INDEX idx_event_type (event_type)
);

Best Practices

  1. Eventos são imutáveis: Nunca modifique eventos existentes
  2. Payload completo: Inclua todos os dados necessários no payload
  3. Metadata útil: Use metadata para contexto adicional
  4. Versionamento: Use version para evolução de eventos
  5. Queries eficientes: Use índices para queries rápidas

Performance

Índices

A tabela tem índices em:
- aggregate_type, aggregate_id - Para queries por agregado
- timestamp - Para queries temporais
- event_type - Para filtros por tipo

Otimizações

Referências

🔊 Text-to-Speech

1.0x
1.0
Pronto para reproduzir