📋 Plano de Integração: Pricing Agent → CSuite Pricing Engine
Data: 2025-01-03
Status: Planejamento
Versão: 1.0
🎯 Objetivo
Integrar o Pricing Agent (agents/pricing) com o CSuite Pricing Engine completo, alinhando a implementação técnica com a especificação de governança e o schema csuite_pricing existente.
Resultado Esperado
- ✅ Agente usa
sp_price_compute_v7/v8como motor de decisão - ✅ Suporte completo a Cliente Âncora e Preço Âncora
- ✅ Respeita PT, Piso Técnico e Corredores de Decisão
- ✅ Integra com schema
csuite_pricing(12 tabelas) - ✅ Mantém compatibilidade com Agent Loop v1 e Policy Engine
- ✅ Auditoria completa via
price_calculationseprice_incidents
📊 Análise das Lacunas Atuais
1. Conceitos Não Implementados
| Conceito CSuite Pricing | Status Atual | Necessário |
|---|---|---|
| Preço de Tela (PT) | ❌ Não existe | ✅ Consultar price_screen |
| Piso Técnico | ❌ Não existe | ✅ Consultar price_floor |
| Corredor de Decisão | ❌ Não existe | ✅ Calcular PT - Piso |
| Elasticidade (NONE/LOW/MEDIUM/HIGH/MAX) | ⚠️ Numérico apenas | ✅ Mapear para ENUM |
| Cliente Âncora | ❌ Não existe | ✅ Consultar pricing_anchor_prices |
| Marca Principal | ❌ Não existe | ✅ Consultar brands.brand_role |
| Classificação de Clientes | ❌ Não existe | ✅ Consultar customer_brand_profile |
Motor SQL (sp_price_compute_v7/v8) |
❌ Não usa | ✅ Chamar stored procedure |
2. Integrações Faltantes
- ❌ Conexão com schema
csuite_pricing - ❌ Chamada a
sp_price_compute_v7ousp_price_compute_v8 - ❌ Consulta a
pricing_anchor_pricespara Cliente Âncora - ❌ Registro em
price_calculationseprice_incidents - ❌ Validação de PT ≤ Piso (gera incidente)
3. Payload e Contexto
Payload Atual:
{
"org_id": 0,
"sku": {"id": "...", "cost": 8000, "margin": 20},
"inventory": {"days_on_hand": 45, "turn_rate": 0.5},
"market": {"elasticity": -1.2, "competitor_price": 10000},
"policy": {"floor_margin": 18, "max_discount": 5}
}
Payload Necessário (CSuite Pricing):
{
"org_id": 0,
"brand_id": 1,
"customer_id": 123,
"sku_id": 456,
"sku_qty": 5,
"order_value": 50000,
"payment_term": "standard", # standard | short | extended
"stock_level": "normal", # normal | high
"machine_curve": "A" # A | B | C
}
🏗️ Arquitetura Proposta
Fluxo de Decisão Integrado
┌─────────────────┐
│ Pricing Agent │
│ (FastAPI) │
└────────┬────────┘
│
│ 1. Recebe payload
▼
┌─────────────────┐
│ Context Builder │
│ (context.py) │
└────────┬────────┘
│
│ 2. Enriquece contexto
│ - Consulta price_screen (PT)
│ - Consulta price_floor (Piso)
│ - Consulta customer_brand_profile
│ - Verifica Cliente Âncora
│ - Consulta brands.brand_role
▼
┌─────────────────┐
│ Anchor Check │
│ (anchor.py) │
└────────┬────────┘
│
│ 3. Cliente Âncora?
├─── SIM → Consulta pricing_anchor_prices
│ Retorna preço estável
│ EXIT
│
└─── NÃO → Continua para motor
▼
┌─────────────────┐
│ Pricing Engine │
│ (engine.py) │
└────────┬────────┘
│
│ 4. Chama sp_price_compute_v8
│ - Valida PT e Piso
│ - Determina elasticidade
│ - Calcula corredor
│ - Retorna final_price
▼
┌─────────────────┐
│ Validation │
│ (policies.py) │
└────────┬────────┘
│
│ 5. Valida decisão
│ - PT ≤ Piso? → PRICE_INCIDENT
│ - Margem mínima respeitada?
│ - Elasticidade dentro do corredor?
▼
┌─────────────────┐
│ Execution │
│ (execution.py) │
└────────┬────────┘
│
│ 6. Registra em price_calculations
│ - Log completo do cálculo
│ - Auditoria
▼
┌─────────────────┐
│ Policy Engine │
│ (instrumentation)│
└─────────────────┘
📦 Fases de Implementação
Fase 1: Infraestrutura e Conexão (Prioridade: ALTA)
Objetivo: Estabelecer conexão com schema csuite_pricing e criar módulos base.
1.1. Configuração de Banco de Dados
Arquivo: agents/pricing/database.py (NOVO)
"""
CSuite Pricing Agent - Database Connection
"""
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, Session
from typing import Optional
import os
import logging
logger = logging.getLogger(__name__)
# Configuração de conexão com csuite_pricing
def get_pricing_db_url() -> str:
"""Retorna URL de conexão para schema csuite_pricing"""
host = os.getenv("CSUITE_DB_HOST", "localhost")
port = os.getenv("CSUITE_DB_PORT", "3306")
user = os.getenv("CSUITE_DB_USER", "root")
password = os.getenv("CSUITE_DB_PASSWORD", "")
database = "csuite_pricing" # Schema específico
return f"mysql+pymysql://{user}:{password}@{host}:{port}/{database}?charset=utf8mb4"
# Engine e Session Factory
_pricing_engine = None
_PricingSession = None
def get_pricing_session() -> Session:
"""Retorna sessão SQLAlchemy para csuite_pricing"""
global _pricing_engine, _PricingSession
if _pricing_engine is None:
db_url = get_pricing_db_url()
_pricing_engine = create_engine(
db_url,
pool_pre_ping=True,
pool_recycle=3600,
echo=False
)
_PricingSession = sessionmaker(bind=_pricing_engine)
return _PricingSession()
def close_pricing_session(session: Session):
"""Fecha sessão de banco"""
try:
session.close()
except Exception as e:
logger.warning(f"Erro ao fechar sessão: {e}")
1.2. Módulo de Consultas Base
Arquivo: agents/pricing/repository.py (NOVO)
"""
CSuite Pricing Agent - Repository (Consultas ao Schema)
"""
from sqlalchemy.orm import Session
from sqlalchemy import text
from typing import Optional, Dict, Any
import logging
from .database import get_pricing_session, close_pricing_session
logger = logging.getLogger(__name__)
def get_price_screen(org_id: int, sku_id: int) -> Optional[float]:
"""Consulta Preço de Tela (PT) para SKU"""
db = get_pricing_session()
try:
result = db.execute(
text("""
SELECT screen_price
FROM price_screen
WHERE org_id = :org_id AND sku_id = :sku_id
LIMIT 1
"""),
{"org_id": org_id, "sku_id": sku_id}
)
row = result.fetchone()
return float(row[0]) if row else None
except Exception as e:
logger.error(f"Erro ao consultar price_screen: {e}")
return None
finally:
close_pricing_session(db)
def get_price_floor(org_id: int, sku_id: int) -> Optional[float]:
"""Consulta Piso Técnico para SKU"""
db = get_pricing_session()
try:
result = db.execute(
text("""
SELECT floor_price
FROM price_floor
WHERE org_id = :org_id AND sku_id = :sku_id
LIMIT 1
"""),
{"org_id": org_id, "sku_id": sku_id}
)
row = result.fetchone()
return float(row[0]) if row else None
except Exception as e:
logger.error(f"Erro ao consultar price_floor: {e}")
return None
finally:
close_pricing_session(db)
def get_customer_brand_profile(org_id: int, customer_id: int, brand_id: int) -> Optional[Dict[str, Any]]:
"""Consulta perfil de marca do cliente"""
db = get_pricing_session()
try:
result = db.execute(
text("""
SELECT
customer_brand_role,
volume_tier,
payment_term_profile
FROM customer_brand_profile
WHERE org_id = :org_id
AND customer_id = :customer_id
AND brand_id = :brand_id
LIMIT 1
"""),
{"org_id": org_id, "customer_id": customer_id, "brand_id": brand_id}
)
row = result.fetchone()
if row:
return {
"customer_brand_role": row[0], # anchor, strategic, motor, etc.
"volume_tier": row[1],
"payment_term_profile": row[2]
}
return None
except Exception as e:
logger.error(f"Erro ao consultar customer_brand_profile: {e}")
return None
finally:
close_pricing_session(db)
def get_brand_role(brand_id: int) -> Optional[str]:
"""Consulta role da marca (principal, secondary_target, tertiary_flexible)"""
db = get_pricing_session()
try:
result = db.execute(
text("""
SELECT brand_role
FROM brands
WHERE brand_id = :brand_id
LIMIT 1
"""),
{"brand_id": brand_id}
)
row = result.fetchone()
return row[0] if row else None
except Exception as e:
logger.error(f"Erro ao consultar brand_role: {e}")
return None
finally:
close_pricing_session(db)
def get_anchor_price(org_id: int, sku_id: int, policy_version: str = "v1.0") -> Optional[Dict[str, Any]]:
"""Consulta Preço Âncora para Cliente Âncora"""
db = get_pricing_session()
try:
result = db.execute(
text("""
SELECT
anchor_price,
screen_price_pt,
floor_price,
elasticity_used,
discount_pct
FROM pricing_anchor_prices
WHERE org_id = :org_id
AND sku_id = :sku_id
AND policy_version = :policy_version
LIMIT 1
"""),
{"org_id": org_id, "sku_id": sku_id, "policy_version": policy_version}
)
row = result.fetchone()
if row:
return {
"anchor_price": float(row[0]),
"screen_price_pt": float(row[1]),
"floor_price": float(row[2]),
"elasticity_used": row[3],
"discount_pct": float(row[4])
}
return None
except Exception as e:
logger.error(f"Erro ao consultar pricing_anchor_prices: {e}")
return None
finally:
close_pricing_session(db)
Checklist Fase 1:
- [ ] Criar agents/pricing/database.py
- [ ] Criar agents/pricing/repository.py
- [ ] Adicionar variáveis de ambiente para conexão
- [ ] Testar conexão com schema csuite_pricing
- [ ] Validar consultas básicas (PT, Piso, Brand Role)
Fase 2: Suporte a Cliente Âncora (Prioridade: ALTA)
Objetivo: Implementar lógica de Cliente Âncora conforme especificação.
2.1. Módulo de Cliente Âncora
Arquivo: agents/pricing/anchor.py (NOVO)
"""
CSuite Pricing Agent - Cliente Âncora Handler
"""
from typing import Dict, Any, Optional
import logging
from .repository import get_customer_brand_profile, get_anchor_price
logger = logging.getLogger(__name__)
def is_anchor_customer(org_id: int, customer_id: int, brand_id: int) -> bool:
"""
Verifica se cliente é Âncora para a marca
Regra: customer_brand_role == 'anchor'
"""
profile = get_customer_brand_profile(org_id, customer_id, brand_id)
if not profile:
return False
return profile.get("customer_brand_role") == "anchor"
def get_anchor_price_for_sku(
org_id: int,
customer_id: int,
brand_id: int,
sku_id: int,
policy_version: str = "v1.0"
) -> Optional[Dict[str, Any]]:
"""
Retorna Preço Âncora para Cliente Âncora
Regra de Prioridade (conforme especificação):
IF customer_type == 'anchor' THEN
price = pricing_anchor_prices[sku_id, policy_version]
EXIT
END IF
"""
if not is_anchor_customer(org_id, customer_id, brand_id):
return None
anchor_data = get_anchor_price(org_id, sku_id, policy_version)
if not anchor_data:
logger.warning(
f"Cliente {customer_id} é Âncora mas não há Preço Âncora "
f"para SKU {sku_id} (policy_version: {policy_version})"
)
return None
return {
"price": anchor_data["anchor_price"],
"screen_price_pt": anchor_data["screen_price_pt"],
"floor_price": anchor_data["floor_price"],
"elasticity_used": anchor_data["elasticity_used"],
"discount_pct": anchor_data["discount_pct"],
"mode": "ANCHOR_TABLE",
"reason": "Preço Âncora derivado da política vigente"
}
2.2. Integração no Context Builder
Modificar: agents/pricing/context.py
# Adicionar no início
from .anchor import is_anchor_customer, get_anchor_price_for_sku
from .repository import get_price_screen, get_price_floor, get_brand_role, get_customer_brand_profile
def build_context(payload: Dict[str, Any]) -> Dict[str, Any]:
"""
Contexto enriquecido com dados do schema csuite_pricing
Agora inclui:
- PT (Preço de Tela)
- Piso Técnico
- Brand Role
- Customer Brand Profile
- Cliente Âncora (flag)
"""
org_id = payload.get("org_id", 0)
brand_id = payload.get("brand_id")
customer_id = payload.get("customer_id")
sku_id = payload.get("sku_id")
# Consultas ao schema
pt = get_price_screen(org_id, sku_id) if sku_id else None
piso = get_price_floor(org_id, sku_id) if sku_id else None
brand_role = get_brand_role(brand_id) if brand_id else None
customer_profile = None
if org_id and customer_id and brand_id:
customer_profile = get_customer_brand_profile(org_id, customer_id, brand_id)
is_anchor = is_anchor_customer(org_id, customer_id, brand_id) if (org_id and customer_id and brand_id) else False
# Corredor de Decisão
decision_corridor = None
if pt and piso:
decision_corridor = pt - piso
return {
"org_id": org_id,
"brand_id": brand_id,
"customer_id": customer_id,
"sku_id": sku_id,
"price_screen_pt": pt,
"price_floor": piso,
"decision_corridor": decision_corridor,
"brand_role": brand_role,
"customer_profile": customer_profile,
"is_anchor_customer": is_anchor,
"sku_qty": payload.get("sku_qty", 1),
"order_value": payload.get("order_value", 0),
"payment_term": payload.get("payment_term", "standard"),
"stock_level": payload.get("stock_level", "normal"),
"machine_curve": payload.get("machine_curve", "B")
}
Checklist Fase 2:
- [ ] Criar agents/pricing/anchor.py
- [ ] Modificar context.py para enriquecer contexto
- [ ] Testar detecção de Cliente Âncora
- [ ] Testar consulta de Preço Âncora
- [ ] Validar fluxo de saída antecipada (EXIT) para Âncora
Fase 3: Integração com Motor SQL (Prioridade: CRÍTICA)
Objetivo: Substituir lógica Python por chamada a sp_price_compute_v8.
3.1. Módulo do Motor de Preços
Arquivo: agents/pricing/engine.py (NOVO)
"""
CSuite Pricing Agent - Pricing Engine (Motor SQL)
"""
from sqlalchemy.orm import Session
from sqlalchemy import text
from typing import Dict, Any, Optional
import logging
from .database import get_pricing_session, close_pricing_session
logger = logging.getLogger(__name__)
def compute_price_v8(
org_id: int,
brand_id: int,
customer_id: Optional[int],
sku_id: int,
sku_qty: int,
order_value: float,
payment_term: str, # standard | short | extended
stock_level: str, # normal | high
machine_curve: str # A | B | C
) -> Dict[str, Any]:
"""
Chama stored procedure sp_price_compute_v8
Retorna:
{
"status": "OK" | "INCIDENT",
"reason": "...",
"final_price": float,
"discount_allowed": float,
"applied_mode": "ANCHOR_TABLE" | "CORRIDOR_PRICE",
"elasticity": "none" | "low" | "medium" | "high" | "max",
"screen_price_pt": float,
"floor_price": float
}
"""
db = get_pricing_session()
try:
result = db.execute(
text("""
CALL sp_price_compute_v8(
:p_brand_id,
:p_customer_id,
:p_sku_id,
:p_sku_qty,
:p_order_value,
:p_payment_term,
:p_stock_level,
:p_machine_curve,
@p_status,
@p_reason,
@p_final_price,
@p_discount_allowed,
@p_applied_mode,
@p_elasticity,
@p_screen_price_pt,
@p_floor_price
)
"""),
{
"p_brand_id": brand_id,
"p_customer_id": customer_id,
"p_sku_id": sku_id,
"p_sku_qty": sku_qty,
"p_order_value": order_value,
"p_payment_term": payment_term,
"p_stock_level": stock_level,
"p_machine_curve": machine_curve
}
)
# Buscar variáveis de saída
output = db.execute(text("""
SELECT
@p_status AS status,
@p_reason AS reason,
@p_final_price AS final_price,
@p_discount_allowed AS discount_allowed,
@p_applied_mode AS applied_mode,
@p_elasticity AS elasticity,
@p_screen_price_pt AS screen_price_pt,
@p_floor_price AS floor_price
"""))
row = output.fetchone()
if not row:
raise ValueError("Stored procedure não retornou resultado")
return {
"status": row[0],
"reason": row[1],
"final_price": float(row[2]) if row[2] else None,
"discount_allowed": float(row[3]) if row[3] else None,
"applied_mode": row[4],
"elasticity": row[5],
"screen_price_pt": float(row[6]) if row[6] else None,
"floor_price": float(row[7]) if row[7] else None
}
except Exception as e:
logger.error(f"Erro ao executar sp_price_compute_v8: {e}", exc_info=True)
raise
finally:
close_pricing_session(db)
3.2. Modificar Decision Engine
Modificar: agents/pricing/decision.py
# Substituir _determine_decision_type, _calculate_confidence, _propose_actions
# por chamada ao motor SQL
from .anchor import get_anchor_price_for_sku
from .engine import compute_price_v8
async def run_decision(
payload: Dict[str, Any],
db: Optional[Session] = None
) -> Dict[str, Any]:
org_id = payload.get("org_id", 0)
agent_code = "CSuite.Pricing.Agent"
context = build_context(payload)
context_hash = get_context_hash(context)
# 1. Verificar Cliente Âncora (prioridade)
if context.get("is_anchor_customer"):
anchor_result = get_anchor_price_for_sku(
org_id=org_id,
customer_id=context.get("customer_id"),
brand_id=context.get("brand_id"),
sku_id=context.get("sku_id")
)
if anchor_result:
decision = {
"decision_type": "PRICING.ANCHOR",
"confidence": 1.0, # Máxima confiança para Âncora
"final_price": anchor_result["price"],
"applied_mode": "ANCHOR_TABLE",
"elasticity": anchor_result["elasticity_used"],
"reason": anchor_result["reason"],
"proposed_actions": [{
"type": "APPLY_ANCHOR_PRICE",
"price": anchor_result["price"],
"mode": "ANCHOR_TABLE"
}]
}
# Registrar em price_calculations
_log_price_calculation(context, decision, db)
# Instrumentação Policy Engine
decision_log_id = _instrument_decision(decision, context, agent_code, org_id, db)
return {
"decision": decision,
"context": context,
"execution": {"status": "EXECUTED", "actions": decision["proposed_actions"]},
"decision_log_id": decision_log_id
}
# 2. Cliente não é Âncora → usar motor SQL
try:
engine_result = compute_price_v8(
org_id=org_id,
brand_id=context.get("brand_id"),
customer_id=context.get("customer_id"),
sku_id=context.get("sku_id"),
sku_qty=context.get("sku_qty", 1),
order_value=context.get("order_value", 0),
payment_term=context.get("payment_term", "standard"),
stock_level=context.get("stock_level", "normal"),
machine_curve=context.get("machine_curve", "B")
)
# 3. Verificar incidente (PT ≤ Piso)
if engine_result["status"] == "INCIDENT":
_log_price_incident(context, engine_result, db)
decision = {
"decision_type": "PRICING.INCIDENT",
"confidence": 0.0,
"reason": engine_result["reason"],
"proposed_actions": [{
"type": "BLOCK_PRICE",
"reason": engine_result["reason"]
}]
}
else:
decision = {
"decision_type": "PRICING.COMPUTED",
"confidence": 0.9, # Alta confiança (motor SQL validado)
"final_price": engine_result["final_price"],
"discount_allowed": engine_result["discount_allowed"],
"applied_mode": engine_result["applied_mode"],
"elasticity": engine_result["elasticity"],
"proposed_actions": [{
"type": "UPDATE_PRICE",
"new_price": engine_result["final_price"],
"discount_pct": engine_result["discount_allowed"] * 100,
"elasticity": engine_result["elasticity"]
}]
}
# 4. Registrar cálculo
_log_price_calculation(context, decision, engine_result, db)
# 5. Instrumentação Policy Engine
decision_log_id = _instrument_decision(decision, context, agent_code, org_id, db)
return {
"decision": decision,
"context": context,
"execution": {"status": "EXECUTED", "actions": decision["proposed_actions"]},
"decision_log_id": decision_log_id
}
except Exception as e:
logger.error(f"Erro ao calcular preço: {e}", exc_info=True)
raise
Checklist Fase 3:
- [ ] Criar agents/pricing/engine.py
- [ ] Modificar decision.py para usar motor SQL
- [ ] Testar chamada a sp_price_compute_v8
- [ ] Validar tratamento de incidentes (PT ≤ Piso)
- [ ] Validar fluxo completo (Âncora → Motor → Validação)
Fase 4: Auditoria e Logging (Prioridade: MÉDIA)
Objetivo: Registrar todas as decisões em price_calculations e price_incidents.
4.1. Módulo de Auditoria
Arquivo: agents/pricing/audit.py (NOVO)
"""
CSuite Pricing Agent - Audit Logging
"""
from sqlalchemy.orm import Session
from sqlalchemy import text
from typing import Dict, Any, Optional
import logging
from datetime import datetime
from .database import get_pricing_session, close_pricing_session
logger = logging.getLogger(__name__)
def log_price_calculation(
context: Dict[str, Any],
decision: Dict[str, Any],
engine_result: Optional[Dict[str, Any]] = None,
db: Optional[Session] = None
) -> Optional[int]:
"""
Registra cálculo em price_calculations
Retorna calc_id ou None em caso de erro
"""
use_external_db = db is not None
if not use_external_db:
db = get_pricing_session()
try:
result = db.execute(
text("""
INSERT INTO price_calculations (
brand_id, customer_id, sku_id,
PT, PISO,
stock_level, customer_brand_role, payment_term, machine_curve,
sku_qty, order_value,
applied_mode, elasticity,
score_final, discount_allowed, final_price
) VALUES (
:brand_id, :customer_id, :sku_id,
:pt, :piso,
:stock_level, :customer_brand_role, :payment_term, :machine_curve,
:sku_qty, :order_value,
:applied_mode, :elasticity,
:score_final, :discount_allowed, :final_price
)
"""),
{
"brand_id": context.get("brand_id"),
"customer_id": context.get("customer_id"),
"sku_id": context.get("sku_id"),
"pt": context.get("price_screen_pt"),
"piso": context.get("price_floor"),
"stock_level": context.get("stock_level", "normal"),
"customer_brand_role": context.get("customer_profile", {}).get("customer_brand_role", "operational"),
"payment_term": context.get("payment_term", "standard"),
"machine_curve": context.get("machine_curve", "B"),
"sku_qty": context.get("sku_qty", 1),
"order_value": context.get("order_value", 0),
"applied_mode": decision.get("applied_mode", "CORRIDOR_PRICE"),
"elasticity": decision.get("elasticity"),
"score_final": decision.get("confidence", 0.0) * 100,
"discount_allowed": decision.get("discount_allowed", 0.0),
"final_price": decision.get("final_price")
}
)
if not use_external_db:
db.commit()
calc_id = result.lastrowid
logger.info(f"Registrado price_calculation: calc_id={calc_id}")
return calc_id
except Exception as e:
logger.error(f"Erro ao registrar price_calculation: {e}", exc_info=True)
if not use_external_db:
db.rollback()
return None
finally:
if not use_external_db:
close_pricing_session(db)
def log_price_incident(
context: Dict[str, Any],
engine_result: Dict[str, Any],
db: Optional[Session] = None
) -> Optional[int]:
"""
Registra incidente em price_incidents
Retorna incident_id ou None em caso de erro
"""
use_external_db = db is not None
if not use_external_db:
db = get_pricing_session()
try:
result = db.execute(
text("""
INSERT INTO price_incidents (
brand_id, brand_role, customer_id, sku_id,
reason, details,
PT, PISO
) VALUES (
:brand_id, :brand_role, :customer_id, :sku_id,
:reason, :details,
:pt, :piso
)
"""),
{
"brand_id": context.get("brand_id"),
"brand_role": context.get("brand_role"),
"customer_id": context.get("customer_id"),
"sku_id": context.get("sku_id"),
"reason": engine_result.get("reason", "PT_LEQ_PISO"),
"details": f"PT ({context.get('price_screen_pt')}) <= Piso ({context.get('price_floor')})",
"pt": context.get("price_screen_pt"),
"piso": context.get("price_floor")
}
)
if not use_external_db:
db.commit()
incident_id = result.lastrowid
logger.warning(f"Registrado price_incident: incident_id={incident_id}, reason={engine_result.get('reason')}")
return incident_id
except Exception as e:
logger.error(f"Erro ao registrar price_incident: {e}", exc_info=True)
if not use_external_db:
db.rollback()
return None
finally:
if not use_external_db:
close_pricing_session(db)
Modificar: agents/pricing/decision.py
# Adicionar no início
from .audit import log_price_calculation, log_price_incident
# Substituir _log_price_calculation e _log_price_incident por chamadas reais
Checklist Fase 4:
- [ ] Criar agents/pricing/audit.py
- [ ] Integrar logging em decision.py
- [ ] Testar registro em price_calculations
- [ ] Testar registro em price_incidents
- [ ] Validar auditoria completa
Fase 5: Validação e Políticas (Prioridade: MÉDIA)
Objetivo: Alinhar validações com especificação CSuite Pricing.
5.1. Modificar Policies
Modificar: agents/pricing/policies.py
"""
CSuite Pricing Agent - Policy Guardrails (Alinhado com CSuite Pricing)
"""
from typing import Dict, Any
def validate_decision(decision: Dict[str, Any], context: Dict[str, Any]) -> bool:
"""
Valida decisão contra policies do CSuite Pricing
Policies:
- PT > Piso (obrigatório, senão gera incidente)
- Margem mínima respeitada
- Elasticidade dentro do corredor permitido
- Cliente Âncora não recebe desconto elástico
"""
# 1. Validação de PT e Piso
pt = context.get("price_screen_pt")
piso = context.get("price_floor")
if pt and piso and pt <= piso:
return False # Gera incidente (já tratado no engine)
# 2. Validação de Cliente Âncora
if context.get("is_anchor_customer"):
# Cliente Âncora só pode usar ANCHOR_TABLE
if decision.get("applied_mode") != "ANCHOR_TABLE":
return False
# Elasticidade deve ser none ou low
elasticity = decision.get("elasticity")
if elasticity not in ["none", "low"]:
return False
# 3. Validação de Marca Principal
brand_role = context.get("brand_role")
if brand_role == "principal":
# Marca Principal: elasticidade máxima LOW
elasticity = decision.get("elasticity")
if elasticity and elasticity not in ["none", "low"]:
return False
# 4. Validação de margem (se final_price disponível)
final_price = decision.get("final_price")
if final_price:
# TODO: Calcular margem real e validar contra piso
pass
return True
def get_autonomy_level(context: Dict[str, Any]) -> str:
"""
Determina nível de autonomia baseado em contexto CSuite Pricing
L0: Recomendação (Cliente Âncora, Marca Principal)
L1: Ajustes pequenos (elasticidade LOW)
L2: Ajustes médios (elasticidade MEDIUM)
L3: Ajustes agressivos (elasticidade HIGH/MAX)
"""
# Cliente Âncora: sempre L0
if context.get("is_anchor_customer"):
return "L0"
# Marca Principal: máximo L1
if context.get("brand_role") == "principal":
return "L1"
# Elasticidade determina autonomia
elasticity = context.get("elasticity")
if elasticity == "none":
return "L0"
elif elasticity == "low":
return "L1"
elif elasticity == "medium":
return "L2"
elif elasticity in ["high", "max"]:
return "L3"
return "L0" # Padrão conservador
Checklist Fase 5:
- [ ] Modificar policies.py com validações CSuite Pricing
- [ ] Testar validação de Cliente Âncora
- [ ] Testar validação de Marca Principal
- [ ] Testar cálculo de autonomia
- [ ] Validar todas as regras de negócio
Fase 6: Atualização de Payload e API (Prioridade: BAIXA)
Objetivo: Atualizar payload e documentação da API.
6.1. Modificar Main API
Modificar: agents/pricing/main.py
@app.post("/run")
async def run_agent(payload: Dict[str, Any]):
"""
Executa o Pricing Agent (CSuite Pricing Engine)
Payload esperado (CSuite Pricing):
{
"org_id": 1,
"brand_id": 1,
"customer_id": 123,
"sku_id": 456,
"sku_qty": 5,
"order_value": 50000,
"payment_term": "standard", # standard | short | extended
"stock_level": "normal", # normal | high
"machine_curve": "A" # A | B | C
}
Payload legado (compatibilidade):
{
"org_id": 0,
"sku": {"id": "...", "cost": 8000, "margin": 20},
"inventory": {"days_on_hand": 45, "turn_rate": 0.5},
"market": {"elasticity": -1.2, "competitor_price": 10000},
"policy": {"floor_margin": 18, "max_discount": 5}
}
"""
try:
# Validar payload mínimo
if not payload.get("org_id"):
raise HTTPException(status_code=400, detail="org_id é obrigatório")
# Detectar formato (novo vs legado)
is_legacy = "sku" in payload and "inventory" in payload
if is_legacy:
# Converter payload legado para novo formato
# TODO: Implementar conversão
raise HTTPException(
status_code=400,
detail="Payload legado não suportado. Use formato CSuite Pricing."
)
# Validar campos obrigatórios (novo formato)
required_fields = ["brand_id", "sku_id", "customer_id"]
missing = [f for f in required_fields if not payload.get(f)]
if missing:
raise HTTPException(
status_code=400,
detail=f"Campos obrigatórios faltando: {', '.join(missing)}"
)
result = await run_decision(payload)
return JSONResponse(content={
"status": "success",
"agent": AGENT_CODE,
"result": result
})
except HTTPException:
raise
except Exception as e:
logger.error(f"Erro ao executar Pricing Agent: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e))
Checklist Fase 6:
- [ ] Atualizar documentação da API
- [ ] Validar payload novo formato
- [ ] Adicionar validações de campos obrigatórios
- [ ] Atualizar exemplos de uso
- [ ] Testar compatibilidade (se necessário)
🧪 Plano de Testes
Testes Unitários
- Repository Tests
test_get_price_screen()- Consulta PTtest_get_price_floor()- Consulta Pisotest_get_customer_brand_profile()- Perfil cliente-
test_get_anchor_price()- Preço Âncora -
Anchor Tests
test_is_anchor_customer()- Detecção Âncoratest_get_anchor_price_for_sku()- Retorno Preço Âncora-
test_anchor_exit_flow()- Fluxo de saída antecipada -
Engine Tests
test_compute_price_v8()- Chamada stored proceduretest_incident_pt_leq_piso()- Tratamento incidente-
test_elasticity_mapping()- Mapeamento elasticidade -
Policy Tests
test_validate_anchor_customer()- Validação Âncoratest_validate_principal_brand()- Validação Marca Principaltest_autonomy_levels()- Níveis de autonomia
Testes de Integração
- Fluxo Completo
- Cliente Âncora → Preço Âncora
- Cliente Normal → Motor SQL → Preço Calculado
-
PT ≤ Piso → Incidente registrado
-
Auditoria
- Registro em
price_calculations - Registro em
price_incidents - Integração com Policy Engine
Testes End-to-End
- Cenários Reais
- Pedido Cliente Âncora
- Pedido Cliente Motor (elasticidade MEDIUM)
- Pedido Marca Principal (elasticidade LOW)
- Incidente PT ≤ Piso
📝 Checklist Geral de Implementação
Infraestrutura
- [ ] Fase 1: Database e Repository
- [ ] Fase 2: Cliente Âncora
- [ ] Fase 3: Motor SQL
- [ ] Fase 4: Auditoria
- [ ] Fase 5: Validações
- [ ] Fase 6: API e Payload
Testes
- [ ] Testes unitários (cobertura > 80%)
- [ ] Testes de integração
- [ ] Testes end-to-end
- [ ] Validação com dados reais
Documentação
- [ ] Atualizar README do agente
- [ ] Documentar novo payload
- [ ] Exemplos de uso
- [ ] Guia de migração (se necessário)
Deploy
- [ ] Variáveis de ambiente configuradas
- [ ] Conexão com
csuite_pricingtestada - [ ] Stored procedures disponíveis
- [ ] Monitoramento e logs
🚀 Ordem de Execução Recomendada
- Semana 1: Fases 1 e 2 (Infraestrutura + Cliente Âncora)
- Semana 2: Fase 3 (Motor SQL) - CRÍTICA
- Semana 3: Fases 4 e 5 (Auditoria + Validações)
- Semana 4: Fase 6 (API) + Testes + Documentação
📚 Referências
- Especificação:
csuite-pricing/csuite_core_docs_compete_set.md - Schema:
csuite_pricing(12 tabelas, 2 procedures) - Agent Loop:
docs/AGENT_LOOP_IMPLEMENTATION.md - Policy Engine:
csuite-executive/csuite-api/app/policy_engine/
⚠️ Riscos e Mitigações
| Risco | Mitigação |
|---|---|
Stored procedure sp_price_compute_v8 não existe |
Verificar schema, usar sp_price_compute_v7 como fallback |
| Performance de consultas SQL | Adicionar índices, cache de consultas frequentes |
| Incompatibilidade de payload | Manter compatibilidade temporária, migração gradual |
Falha na conexão com csuite_pricing |
Retry logic, fallback para lógica Python (modo degradado) |
Próximo Passo: Iniciar Fase 1 (Infraestrutura e Conexão)