Event Sourcing - C-Suite
Este documento descreve a implementação do padrão Event Sourcing para decisões e outros eventos de domínio no ecossistema C-Suite.
Visão Geral
O módulo common/common_event_sourcing.py fornece uma implementação completa de Event Sourcing, permitindo que todas as decisões sejam eventos imutáveis que podem ser auditados e reexecutados.
Características
- ✅ Eventos imutáveis
- ✅ Armazenamento persistente
- ✅ Replay de eventos
- ✅ Queries por agregado
- ✅ Auditoria completa
- ✅ Rastreabilidade
Conceitos
Event
Um evento representa algo que aconteceu no sistema:
from common.common_event_sourcing import Event, EventType
event = Event(
event_id="uuid",
event_type=EventType.DECISION_CREATED,
aggregate_id="decision_123",
aggregate_type="decision",
payload={"org_id": 1, "offer": "10% discount"},
metadata={"user_id": "user_456"},
timestamp=datetime.utcnow()
)
Event Store
Armazena todos os eventos de forma imutável:
from common.common_event_sourcing import get_event_store
event_store = get_event_store("csuite")
event_store.append(event)
Aggregate
Um agregado é uma entidade de negócio (ex: Decision, Organization):
# Todos os eventos de uma decisão
events = event_store.get_events(
aggregate_type="decision",
aggregate_id="decision_123"
)
Uso
Registrar Evento de Decisão
from common.common_event_sourcing import record_event, EventType
# Quando uma decisão é criada
event_id = record_event(
event_type=EventType.DECISION_CREATED,
aggregate_id=f"decision_{decision_id}",
aggregate_type="decision",
payload={
"org_id": org_id,
"offer": offer,
"channel": channel,
"message": message,
"timing": timing
},
metadata={
"user_id": user_id,
"request_id": request_id
}
)
Registrar Evento de Execução
# Quando uma decisão é executada
record_event(
event_type=EventType.DECISION_EXECUTED,
aggregate_id=f"decision_{decision_id}",
aggregate_type="decision",
payload={
"executed_at": datetime.utcnow().isoformat(),
"result": "success"
},
metadata={"executor": "system"}
)
Registrar Violação de Política
# Quando uma política é violada
record_event(
event_type=EventType.POLICY_VIOLATED,
aggregate_id=f"decision_{decision_id}",
aggregate_type="decision",
payload={
"policy_name": "min_margin",
"violation_reason": "Margin below minimum"
},
metadata={"policy_engine": "4c"}
)
Obter Histórico de Decisão
from common.common_event_sourcing import get_decision_events
# Obter todos os eventos de uma decisão
events = get_decision_events("decision_123")
for event in events:
print(f"{event.timestamp}: {event.event_type.value}")
print(f" Payload: {event.payload}")
Replay de Eventos
from common.common_event_sourcing import replay_decision
# Replay todos os eventos de uma decisão
def handle_event(event: Event):
print(f"Processing {event.event_type.value}")
# Reconstruir estado ou executar ação
events = replay_decision("decision_123", handle_event)
Tipos de Eventos
Eventos de Decisão
DECISION_CREATED: Decisão foi criadaDECISION_EXECUTED: Decisão foi executadaDECISION_REJECTED: Decisão foi rejeitada
Eventos de Política
POLICY_APPLIED: Política foi aplicadaPOLICY_VIOLATED: Política foi violada
Eventos de Métricas
METRIC_RECORDED: Métrica foi registrada
Eventos de Tasks
TASK_CREATED: Task foi criadaTASK_COMPLETED: Task foi completada
Eventos Customizados
CUSTOM: Evento customizado
Integração com Outros Módulos
Com Audit Log
from common.common_event_sourcing import record_event, EventType
from common.common_audit import audit_log
# Registrar evento e audit log
event_id = record_event(
event_type=EventType.DECISION_CREATED,
aggregate_id=f"decision_{decision_id}",
aggregate_type="decision",
payload=payload
)
audit_log(
action="decision_created",
user_id=user_id,
org_id=org_id,
resource_type="decision",
resource_id=decision_id
)
Com Notificações
from common.common_event_sourcing import record_event, EventType
from common.common_notifications import send_alert
# Registrar evento e notificar
record_event(
event_type=EventType.POLICY_VIOLATED,
aggregate_id=f"decision_{decision_id}",
aggregate_type="decision",
payload={"policy_name": "min_margin"}
)
send_alert(
title="Policy Violated",
message=f"Decision {decision_id} violated policy min_margin"
)
Com Tracing
from common.common_event_sourcing import record_event, EventType
from common.common_tracing import add_trace_event
# Registrar evento e trace
event_id = record_event(
event_type=EventType.DECISION_CREATED,
aggregate_id=f"decision_{decision_id}",
aggregate_type="decision",
payload=payload
)
add_trace_event("decision_event_recorded", {
"event_id": event_id,
"decision_id": decision_id
})
Queries
Por Agregado
events = event_store.get_events(
aggregate_type="decision",
aggregate_id="decision_123"
)
Por Tipo de Evento
events = event_store.get_events(
event_type=EventType.DECISION_CREATED
)
Por Período
from datetime import datetime, timedelta
start_time = datetime.utcnow() - timedelta(days=7)
events = event_store.get_events(
start_time=start_time,
event_type=EventType.DECISION_CREATED
)
Combinado
events = event_store.get_events(
aggregate_type="decision",
event_type=EventType.DECISION_EXECUTED,
start_time=start_time,
end_time=end_time,
limit=100
)
Replay e Reconstrução
Reconstruir Estado
def rebuild_decision_state(decision_id: str) -> Dict[str, Any]:
"""Reconstruir estado de uma decisão a partir dos eventos"""
events = get_decision_events(decision_id)
state = {
"created": False,
"executed": False,
"rejected": False,
"policies_applied": [],
"policies_violated": []
}
for event in events:
if event.event_type == EventType.DECISION_CREATED:
state["created"] = True
state.update(event.payload)
elif event.event_type == EventType.DECISION_EXECUTED:
state["executed"] = True
elif event.event_type == EventType.DECISION_REJECTED:
state["rejected"] = True
elif event.event_type == EventType.POLICY_APPLIED:
state["policies_applied"].append(event.payload)
elif event.event_type == EventType.POLICY_VIOLATED:
state["policies_violated"].append(event.payload)
return state
Schema do Banco
O módulo cria automaticamente a tabela event_store:
CREATE TABLE event_store (
event_id VARCHAR(36) PRIMARY KEY,
event_type VARCHAR(100) NOT NULL,
aggregate_id VARCHAR(255) NOT NULL,
aggregate_type VARCHAR(100) NOT NULL,
payload JSON NOT NULL,
metadata JSON NOT NULL,
timestamp DATETIME NOT NULL,
version INT NOT NULL DEFAULT 1,
INDEX idx_aggregate (aggregate_type, aggregate_id),
INDEX idx_timestamp (timestamp),
INDEX idx_event_type (event_type)
);
Best Practices
- Eventos são imutáveis: Nunca modifique eventos existentes
- Payload completo: Inclua todos os dados necessários no payload
- Metadata útil: Use metadata para contexto adicional
- Versionamento: Use version para evolução de eventos
- Queries eficientes: Use índices para queries rápidas
Performance
Índices
A tabela tem índices em:
- aggregate_type, aggregate_id - Para queries por agregado
- timestamp - Para queries temporais
- event_type - Para filtros por tipo
Otimizações
- Use
limitem queries para evitar carregar muitos eventos - Use
start_timeeend_timepara queries temporais - Considere arquivamento de eventos antigos