Plano Integracao Pricing Agent

📋 Plano de Integração: Pricing Agent → CSuite Pricing Engine

Data: 2025-01-03
Status: Planejamento
Versão: 1.0


🎯 Objetivo

Integrar o Pricing Agent (agents/pricing) com o CSuite Pricing Engine completo, alinhando a implementação técnica com a especificação de governança e o schema csuite_pricing existente.

Resultado Esperado


📊 Análise das Lacunas Atuais

1. Conceitos Não Implementados

Conceito CSuite Pricing Status Atual Necessário
Preço de Tela (PT) ❌ Não existe ✅ Consultar price_screen
Piso Técnico ❌ Não existe ✅ Consultar price_floor
Corredor de Decisão ❌ Não existe ✅ Calcular PT - Piso
Elasticidade (NONE/LOW/MEDIUM/HIGH/MAX) ⚠️ Numérico apenas ✅ Mapear para ENUM
Cliente Âncora ❌ Não existe ✅ Consultar pricing_anchor_prices
Marca Principal ❌ Não existe ✅ Consultar brands.brand_role
Classificação de Clientes ❌ Não existe ✅ Consultar customer_brand_profile
Motor SQL (sp_price_compute_v7/v8) ❌ Não usa ✅ Chamar stored procedure

2. Integrações Faltantes

3. Payload e Contexto

Payload Atual:

{
    "org_id": 0,
    "sku": {"id": "...", "cost": 8000, "margin": 20},
    "inventory": {"days_on_hand": 45, "turn_rate": 0.5},
    "market": {"elasticity": -1.2, "competitor_price": 10000},
    "policy": {"floor_margin": 18, "max_discount": 5}
}

Payload Necessário (CSuite Pricing):

{
    "org_id": 0,
    "brand_id": 1,
    "customer_id": 123,
    "sku_id": 456,
    "sku_qty": 5,
    "order_value": 50000,
    "payment_term": "standard",  # standard | short | extended
    "stock_level": "normal",     # normal | high
    "machine_curve": "A"          # A | B | C
}

🏗️ Arquitetura Proposta

Fluxo de Decisão Integrado

┌─────────────────┐
  Pricing Agent  
   (FastAPI)     
└────────┬────────┘
         
          1. Recebe payload
         
┌─────────────────┐
  Context Builder 
  (context.py)   
└────────┬────────┘
         
          2. Enriquece contexto
             - Consulta price_screen (PT)
             - Consulta price_floor (Piso)
             - Consulta customer_brand_profile
             - Verifica Cliente Âncora
             - Consulta brands.brand_role
         
┌─────────────────┐
  Anchor Check   
  (anchor.py)    
└────────┬────────┘
         
          3. Cliente Âncora?
         ├─── SIM  Consulta pricing_anchor_prices
                   Retorna preço estável
                   EXIT
         
         └─── NÃO  Continua para motor
         
┌─────────────────┐
  Pricing Engine 
  (engine.py)    
└────────┬────────┘
         
          4. Chama sp_price_compute_v8
             - Valida PT e Piso
             - Determina elasticidade
             - Calcula corredor
             - Retorna final_price
         
┌─────────────────┐
  Validation     
  (policies.py)  
└────────┬────────┘
         
          5. Valida decisão
             - PT  Piso?  PRICE_INCIDENT
             - Margem mínima respeitada?
             - Elasticidade dentro do corredor?
         
┌─────────────────┐
  Execution      
  (execution.py) 
└────────┬────────┘
         
          6. Registra em price_calculations
             - Log completo do cálculo
             - Auditoria
         
┌─────────────────┐
  Policy Engine   
  (instrumentation)
└─────────────────┘

📦 Fases de Implementação

Fase 1: Infraestrutura e Conexão (Prioridade: ALTA)

Objetivo: Estabelecer conexão com schema csuite_pricing e criar módulos base.

1.1. Configuração de Banco de Dados

Arquivo: agents/pricing/database.py (NOVO)

"""
CSuite Pricing Agent - Database Connection
"""
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, Session
from typing import Optional
import os
import logging

logger = logging.getLogger(__name__)

# Configuração de conexão com csuite_pricing
def get_pricing_db_url() -> str:
    """Retorna URL de conexão para schema csuite_pricing"""
    host = os.getenv("CSUITE_DB_HOST", "localhost")
    port = os.getenv("CSUITE_DB_PORT", "3306")
    user = os.getenv("CSUITE_DB_USER", "root")
    password = os.getenv("CSUITE_DB_PASSWORD", "")
    database = "csuite_pricing"  # Schema específico

    return f"mysql+pymysql://{user}:{password}@{host}:{port}/{database}?charset=utf8mb4"


# Engine e Session Factory
_pricing_engine = None
_PricingSession = None


def get_pricing_session() -> Session:
    """Retorna sessão SQLAlchemy para csuite_pricing"""
    global _pricing_engine, _PricingSession

    if _pricing_engine is None:
        db_url = get_pricing_db_url()
        _pricing_engine = create_engine(
            db_url,
            pool_pre_ping=True,
            pool_recycle=3600,
            echo=False
        )
        _PricingSession = sessionmaker(bind=_pricing_engine)

    return _PricingSession()


def close_pricing_session(session: Session):
    """Fecha sessão de banco"""
    try:
        session.close()
    except Exception as e:
        logger.warning(f"Erro ao fechar sessão: {e}")

1.2. Módulo de Consultas Base

Arquivo: agents/pricing/repository.py (NOVO)

"""
CSuite Pricing Agent - Repository (Consultas ao Schema)
"""
from sqlalchemy.orm import Session
from sqlalchemy import text
from typing import Optional, Dict, Any
import logging

from .database import get_pricing_session, close_pricing_session

logger = logging.getLogger(__name__)


def get_price_screen(org_id: int, sku_id: int) -> Optional[float]:
    """Consulta Preço de Tela (PT) para SKU"""
    db = get_pricing_session()
    try:
        result = db.execute(
            text("""
                SELECT screen_price
                FROM price_screen
                WHERE org_id = :org_id AND sku_id = :sku_id
                LIMIT 1
            """),
            {"org_id": org_id, "sku_id": sku_id}
        )
        row = result.fetchone()
        return float(row[0]) if row else None
    except Exception as e:
        logger.error(f"Erro ao consultar price_screen: {e}")
        return None
    finally:
        close_pricing_session(db)


def get_price_floor(org_id: int, sku_id: int) -> Optional[float]:
    """Consulta Piso Técnico para SKU"""
    db = get_pricing_session()
    try:
        result = db.execute(
            text("""
                SELECT floor_price
                FROM price_floor
                WHERE org_id = :org_id AND sku_id = :sku_id
                LIMIT 1
            """),
            {"org_id": org_id, "sku_id": sku_id}
        )
        row = result.fetchone()
        return float(row[0]) if row else None
    except Exception as e:
        logger.error(f"Erro ao consultar price_floor: {e}")
        return None
    finally:
        close_pricing_session(db)


def get_customer_brand_profile(org_id: int, customer_id: int, brand_id: int) -> Optional[Dict[str, Any]]:
    """Consulta perfil de marca do cliente"""
    db = get_pricing_session()
    try:
        result = db.execute(
            text("""
                SELECT 
                    customer_brand_role,
                    volume_tier,
                    payment_term_profile
                FROM customer_brand_profile
                WHERE org_id = :org_id 
                  AND customer_id = :customer_id
                  AND brand_id = :brand_id
                LIMIT 1
            """),
            {"org_id": org_id, "customer_id": customer_id, "brand_id": brand_id}
        )
        row = result.fetchone()
        if row:
            return {
                "customer_brand_role": row[0],  # anchor, strategic, motor, etc.
                "volume_tier": row[1],
                "payment_term_profile": row[2]
            }
        return None
    except Exception as e:
        logger.error(f"Erro ao consultar customer_brand_profile: {e}")
        return None
    finally:
        close_pricing_session(db)


def get_brand_role(brand_id: int) -> Optional[str]:
    """Consulta role da marca (principal, secondary_target, tertiary_flexible)"""
    db = get_pricing_session()
    try:
        result = db.execute(
            text("""
                SELECT brand_role
                FROM brands
                WHERE brand_id = :brand_id
                LIMIT 1
            """),
            {"brand_id": brand_id}
        )
        row = result.fetchone()
        return row[0] if row else None
    except Exception as e:
        logger.error(f"Erro ao consultar brand_role: {e}")
        return None
    finally:
        close_pricing_session(db)


def get_anchor_price(org_id: int, sku_id: int, policy_version: str = "v1.0") -> Optional[Dict[str, Any]]:
    """Consulta Preço Âncora para Cliente Âncora"""
    db = get_pricing_session()
    try:
        result = db.execute(
            text("""
                SELECT 
                    anchor_price,
                    screen_price_pt,
                    floor_price,
                    elasticity_used,
                    discount_pct
                FROM pricing_anchor_prices
                WHERE org_id = :org_id 
                  AND sku_id = :sku_id
                  AND policy_version = :policy_version
                LIMIT 1
            """),
            {"org_id": org_id, "sku_id": sku_id, "policy_version": policy_version}
        )
        row = result.fetchone()
        if row:
            return {
                "anchor_price": float(row[0]),
                "screen_price_pt": float(row[1]),
                "floor_price": float(row[2]),
                "elasticity_used": row[3],
                "discount_pct": float(row[4])
            }
        return None
    except Exception as e:
        logger.error(f"Erro ao consultar pricing_anchor_prices: {e}")
        return None
    finally:
        close_pricing_session(db)

Checklist Fase 1:
- [ ] Criar agents/pricing/database.py
- [ ] Criar agents/pricing/repository.py
- [ ] Adicionar variáveis de ambiente para conexão
- [ ] Testar conexão com schema csuite_pricing
- [ ] Validar consultas básicas (PT, Piso, Brand Role)


Fase 2: Suporte a Cliente Âncora (Prioridade: ALTA)

Objetivo: Implementar lógica de Cliente Âncora conforme especificação.

2.1. Módulo de Cliente Âncora

Arquivo: agents/pricing/anchor.py (NOVO)

"""
CSuite Pricing Agent - Cliente Âncora Handler
"""
from typing import Dict, Any, Optional
import logging

from .repository import get_customer_brand_profile, get_anchor_price

logger = logging.getLogger(__name__)


def is_anchor_customer(org_id: int, customer_id: int, brand_id: int) -> bool:
    """
    Verifica se cliente é Âncora para a marca

    Regra: customer_brand_role == 'anchor'
    """
    profile = get_customer_brand_profile(org_id, customer_id, brand_id)
    if not profile:
        return False

    return profile.get("customer_brand_role") == "anchor"


def get_anchor_price_for_sku(
    org_id: int,
    customer_id: int,
    brand_id: int,
    sku_id: int,
    policy_version: str = "v1.0"
) -> Optional[Dict[str, Any]]:
    """
    Retorna Preço Âncora para Cliente Âncora

    Regra de Prioridade (conforme especificação):
    IF customer_type == 'anchor' THEN
        price = pricing_anchor_prices[sku_id, policy_version]
        EXIT
    END IF
    """
    if not is_anchor_customer(org_id, customer_id, brand_id):
        return None

    anchor_data = get_anchor_price(org_id, sku_id, policy_version)

    if not anchor_data:
        logger.warning(
            f"Cliente {customer_id} é Âncora mas não há Preço Âncora "
            f"para SKU {sku_id} (policy_version: {policy_version})"
        )
        return None

    return {
        "price": anchor_data["anchor_price"],
        "screen_price_pt": anchor_data["screen_price_pt"],
        "floor_price": anchor_data["floor_price"],
        "elasticity_used": anchor_data["elasticity_used"],
        "discount_pct": anchor_data["discount_pct"],
        "mode": "ANCHOR_TABLE",
        "reason": "Preço Âncora derivado da política vigente"
    }

2.2. Integração no Context Builder

Modificar: agents/pricing/context.py

# Adicionar no início
from .anchor import is_anchor_customer, get_anchor_price_for_sku
from .repository import get_price_screen, get_price_floor, get_brand_role, get_customer_brand_profile

def build_context(payload: Dict[str, Any]) -> Dict[str, Any]:
    """
    Contexto enriquecido com dados do schema csuite_pricing

    Agora inclui:
    - PT (Preço de Tela)
    - Piso Técnico
    - Brand Role
    - Customer Brand Profile
    - Cliente Âncora (flag)
    """
    org_id = payload.get("org_id", 0)
    brand_id = payload.get("brand_id")
    customer_id = payload.get("customer_id")
    sku_id = payload.get("sku_id")

    # Consultas ao schema
    pt = get_price_screen(org_id, sku_id) if sku_id else None
    piso = get_price_floor(org_id, sku_id) if sku_id else None
    brand_role = get_brand_role(brand_id) if brand_id else None
    customer_profile = None

    if org_id and customer_id and brand_id:
        customer_profile = get_customer_brand_profile(org_id, customer_id, brand_id)

    is_anchor = is_anchor_customer(org_id, customer_id, brand_id) if (org_id and customer_id and brand_id) else False

    # Corredor de Decisão
    decision_corridor = None
    if pt and piso:
        decision_corridor = pt - piso

    return {
        "org_id": org_id,
        "brand_id": brand_id,
        "customer_id": customer_id,
        "sku_id": sku_id,
        "price_screen_pt": pt,
        "price_floor": piso,
        "decision_corridor": decision_corridor,
        "brand_role": brand_role,
        "customer_profile": customer_profile,
        "is_anchor_customer": is_anchor,
        "sku_qty": payload.get("sku_qty", 1),
        "order_value": payload.get("order_value", 0),
        "payment_term": payload.get("payment_term", "standard"),
        "stock_level": payload.get("stock_level", "normal"),
        "machine_curve": payload.get("machine_curve", "B")
    }

Checklist Fase 2:
- [ ] Criar agents/pricing/anchor.py
- [ ] Modificar context.py para enriquecer contexto
- [ ] Testar detecção de Cliente Âncora
- [ ] Testar consulta de Preço Âncora
- [ ] Validar fluxo de saída antecipada (EXIT) para Âncora


Fase 3: Integração com Motor SQL (Prioridade: CRÍTICA)

Objetivo: Substituir lógica Python por chamada a sp_price_compute_v8.

3.1. Módulo do Motor de Preços

Arquivo: agents/pricing/engine.py (NOVO)

"""
CSuite Pricing Agent - Pricing Engine (Motor SQL)
"""
from sqlalchemy.orm import Session
from sqlalchemy import text
from typing import Dict, Any, Optional
import logging

from .database import get_pricing_session, close_pricing_session

logger = logging.getLogger(__name__)


def compute_price_v8(
    org_id: int,
    brand_id: int,
    customer_id: Optional[int],
    sku_id: int,
    sku_qty: int,
    order_value: float,
    payment_term: str,  # standard | short | extended
    stock_level: str,   # normal | high
    machine_curve: str  # A | B | C
) -> Dict[str, Any]:
    """
    Chama stored procedure sp_price_compute_v8

    Retorna:
    {
        "status": "OK" | "INCIDENT",
        "reason": "...",
        "final_price": float,
        "discount_allowed": float,
        "applied_mode": "ANCHOR_TABLE" | "CORRIDOR_PRICE",
        "elasticity": "none" | "low" | "medium" | "high" | "max",
        "screen_price_pt": float,
        "floor_price": float
    }
    """
    db = get_pricing_session()

    try:
        result = db.execute(
            text("""
                CALL sp_price_compute_v8(
                    :p_brand_id,
                    :p_customer_id,
                    :p_sku_id,
                    :p_sku_qty,
                    :p_order_value,
                    :p_payment_term,
                    :p_stock_level,
                    :p_machine_curve,
                    @p_status,
                    @p_reason,
                    @p_final_price,
                    @p_discount_allowed,
                    @p_applied_mode,
                    @p_elasticity,
                    @p_screen_price_pt,
                    @p_floor_price
                )
            """),
            {
                "p_brand_id": brand_id,
                "p_customer_id": customer_id,
                "p_sku_id": sku_id,
                "p_sku_qty": sku_qty,
                "p_order_value": order_value,
                "p_payment_term": payment_term,
                "p_stock_level": stock_level,
                "p_machine_curve": machine_curve
            }
        )

        # Buscar variáveis de saída
        output = db.execute(text("""
            SELECT 
                @p_status AS status,
                @p_reason AS reason,
                @p_final_price AS final_price,
                @p_discount_allowed AS discount_allowed,
                @p_applied_mode AS applied_mode,
                @p_elasticity AS elasticity,
                @p_screen_price_pt AS screen_price_pt,
                @p_floor_price AS floor_price
        """))

        row = output.fetchone()

        if not row:
            raise ValueError("Stored procedure não retornou resultado")

        return {
            "status": row[0],
            "reason": row[1],
            "final_price": float(row[2]) if row[2] else None,
            "discount_allowed": float(row[3]) if row[3] else None,
            "applied_mode": row[4],
            "elasticity": row[5],
            "screen_price_pt": float(row[6]) if row[6] else None,
            "floor_price": float(row[7]) if row[7] else None
        }

    except Exception as e:
        logger.error(f"Erro ao executar sp_price_compute_v8: {e}", exc_info=True)
        raise
    finally:
        close_pricing_session(db)

3.2. Modificar Decision Engine

Modificar: agents/pricing/decision.py

# Substituir _determine_decision_type, _calculate_confidence, _propose_actions
# por chamada ao motor SQL

from .anchor import get_anchor_price_for_sku
from .engine import compute_price_v8

async def run_decision(
    payload: Dict[str, Any],
    db: Optional[Session] = None
) -> Dict[str, Any]:
    org_id = payload.get("org_id", 0)
    agent_code = "CSuite.Pricing.Agent"

    context = build_context(payload)
    context_hash = get_context_hash(context)

    # 1. Verificar Cliente Âncora (prioridade)
    if context.get("is_anchor_customer"):
        anchor_result = get_anchor_price_for_sku(
            org_id=org_id,
            customer_id=context.get("customer_id"),
            brand_id=context.get("brand_id"),
            sku_id=context.get("sku_id")
        )

        if anchor_result:
            decision = {
                "decision_type": "PRICING.ANCHOR",
                "confidence": 1.0,  # Máxima confiança para Âncora
                "final_price": anchor_result["price"],
                "applied_mode": "ANCHOR_TABLE",
                "elasticity": anchor_result["elasticity_used"],
                "reason": anchor_result["reason"],
                "proposed_actions": [{
                    "type": "APPLY_ANCHOR_PRICE",
                    "price": anchor_result["price"],
                    "mode": "ANCHOR_TABLE"
                }]
            }

            # Registrar em price_calculations
            _log_price_calculation(context, decision, db)

            # Instrumentação Policy Engine
            decision_log_id = _instrument_decision(decision, context, agent_code, org_id, db)

            return {
                "decision": decision,
                "context": context,
                "execution": {"status": "EXECUTED", "actions": decision["proposed_actions"]},
                "decision_log_id": decision_log_id
            }

    # 2. Cliente não é Âncora → usar motor SQL
    try:
        engine_result = compute_price_v8(
            org_id=org_id,
            brand_id=context.get("brand_id"),
            customer_id=context.get("customer_id"),
            sku_id=context.get("sku_id"),
            sku_qty=context.get("sku_qty", 1),
            order_value=context.get("order_value", 0),
            payment_term=context.get("payment_term", "standard"),
            stock_level=context.get("stock_level", "normal"),
            machine_curve=context.get("machine_curve", "B")
        )

        # 3. Verificar incidente (PT ≤ Piso)
        if engine_result["status"] == "INCIDENT":
            _log_price_incident(context, engine_result, db)
            decision = {
                "decision_type": "PRICING.INCIDENT",
                "confidence": 0.0,
                "reason": engine_result["reason"],
                "proposed_actions": [{
                    "type": "BLOCK_PRICE",
                    "reason": engine_result["reason"]
                }]
            }
        else:
            decision = {
                "decision_type": "PRICING.COMPUTED",
                "confidence": 0.9,  # Alta confiança (motor SQL validado)
                "final_price": engine_result["final_price"],
                "discount_allowed": engine_result["discount_allowed"],
                "applied_mode": engine_result["applied_mode"],
                "elasticity": engine_result["elasticity"],
                "proposed_actions": [{
                    "type": "UPDATE_PRICE",
                    "new_price": engine_result["final_price"],
                    "discount_pct": engine_result["discount_allowed"] * 100,
                    "elasticity": engine_result["elasticity"]
                }]
            }

        # 4. Registrar cálculo
        _log_price_calculation(context, decision, engine_result, db)

        # 5. Instrumentação Policy Engine
        decision_log_id = _instrument_decision(decision, context, agent_code, org_id, db)

        return {
            "decision": decision,
            "context": context,
            "execution": {"status": "EXECUTED", "actions": decision["proposed_actions"]},
            "decision_log_id": decision_log_id
        }

    except Exception as e:
        logger.error(f"Erro ao calcular preço: {e}", exc_info=True)
        raise

Checklist Fase 3:
- [ ] Criar agents/pricing/engine.py
- [ ] Modificar decision.py para usar motor SQL
- [ ] Testar chamada a sp_price_compute_v8
- [ ] Validar tratamento de incidentes (PT ≤ Piso)
- [ ] Validar fluxo completo (Âncora → Motor → Validação)


Fase 4: Auditoria e Logging (Prioridade: MÉDIA)

Objetivo: Registrar todas as decisões em price_calculations e price_incidents.

4.1. Módulo de Auditoria

Arquivo: agents/pricing/audit.py (NOVO)

"""
CSuite Pricing Agent - Audit Logging
"""
from sqlalchemy.orm import Session
from sqlalchemy import text
from typing import Dict, Any, Optional
import logging
from datetime import datetime

from .database import get_pricing_session, close_pricing_session

logger = logging.getLogger(__name__)


def log_price_calculation(
    context: Dict[str, Any],
    decision: Dict[str, Any],
    engine_result: Optional[Dict[str, Any]] = None,
    db: Optional[Session] = None
) -> Optional[int]:
    """
    Registra cálculo em price_calculations

    Retorna calc_id ou None em caso de erro
    """
    use_external_db = db is not None
    if not use_external_db:
        db = get_pricing_session()

    try:
        result = db.execute(
            text("""
                INSERT INTO price_calculations (
                    brand_id, customer_id, sku_id,
                    PT, PISO,
                    stock_level, customer_brand_role, payment_term, machine_curve,
                    sku_qty, order_value,
                    applied_mode, elasticity,
                    score_final, discount_allowed, final_price
                ) VALUES (
                    :brand_id, :customer_id, :sku_id,
                    :pt, :piso,
                    :stock_level, :customer_brand_role, :payment_term, :machine_curve,
                    :sku_qty, :order_value,
                    :applied_mode, :elasticity,
                    :score_final, :discount_allowed, :final_price
                )
            """),
            {
                "brand_id": context.get("brand_id"),
                "customer_id": context.get("customer_id"),
                "sku_id": context.get("sku_id"),
                "pt": context.get("price_screen_pt"),
                "piso": context.get("price_floor"),
                "stock_level": context.get("stock_level", "normal"),
                "customer_brand_role": context.get("customer_profile", {}).get("customer_brand_role", "operational"),
                "payment_term": context.get("payment_term", "standard"),
                "machine_curve": context.get("machine_curve", "B"),
                "sku_qty": context.get("sku_qty", 1),
                "order_value": context.get("order_value", 0),
                "applied_mode": decision.get("applied_mode", "CORRIDOR_PRICE"),
                "elasticity": decision.get("elasticity"),
                "score_final": decision.get("confidence", 0.0) * 100,
                "discount_allowed": decision.get("discount_allowed", 0.0),
                "final_price": decision.get("final_price")
            }
        )

        if not use_external_db:
            db.commit()

        calc_id = result.lastrowid
        logger.info(f"Registrado price_calculation: calc_id={calc_id}")
        return calc_id

    except Exception as e:
        logger.error(f"Erro ao registrar price_calculation: {e}", exc_info=True)
        if not use_external_db:
            db.rollback()
        return None
    finally:
        if not use_external_db:
            close_pricing_session(db)


def log_price_incident(
    context: Dict[str, Any],
    engine_result: Dict[str, Any],
    db: Optional[Session] = None
) -> Optional[int]:
    """
    Registra incidente em price_incidents

    Retorna incident_id ou None em caso de erro
    """
    use_external_db = db is not None
    if not use_external_db:
        db = get_pricing_session()

    try:
        result = db.execute(
            text("""
                INSERT INTO price_incidents (
                    brand_id, brand_role, customer_id, sku_id,
                    reason, details,
                    PT, PISO
                ) VALUES (
                    :brand_id, :brand_role, :customer_id, :sku_id,
                    :reason, :details,
                    :pt, :piso
                )
            """),
            {
                "brand_id": context.get("brand_id"),
                "brand_role": context.get("brand_role"),
                "customer_id": context.get("customer_id"),
                "sku_id": context.get("sku_id"),
                "reason": engine_result.get("reason", "PT_LEQ_PISO"),
                "details": f"PT ({context.get('price_screen_pt')}) <= Piso ({context.get('price_floor')})",
                "pt": context.get("price_screen_pt"),
                "piso": context.get("price_floor")
            }
        )

        if not use_external_db:
            db.commit()

        incident_id = result.lastrowid
        logger.warning(f"Registrado price_incident: incident_id={incident_id}, reason={engine_result.get('reason')}")
        return incident_id

    except Exception as e:
        logger.error(f"Erro ao registrar price_incident: {e}", exc_info=True)
        if not use_external_db:
            db.rollback()
        return None
    finally:
        if not use_external_db:
            close_pricing_session(db)

Modificar: agents/pricing/decision.py

# Adicionar no início
from .audit import log_price_calculation, log_price_incident

# Substituir _log_price_calculation e _log_price_incident por chamadas reais

Checklist Fase 4:
- [ ] Criar agents/pricing/audit.py
- [ ] Integrar logging em decision.py
- [ ] Testar registro em price_calculations
- [ ] Testar registro em price_incidents
- [ ] Validar auditoria completa


Fase 5: Validação e Políticas (Prioridade: MÉDIA)

Objetivo: Alinhar validações com especificação CSuite Pricing.

5.1. Modificar Policies

Modificar: agents/pricing/policies.py

"""
CSuite Pricing Agent - Policy Guardrails (Alinhado com CSuite Pricing)
"""
from typing import Dict, Any

def validate_decision(decision: Dict[str, Any], context: Dict[str, Any]) -> bool:
    """
    Valida decisão contra policies do CSuite Pricing

    Policies:
    - PT > Piso (obrigatório, senão gera incidente)
    - Margem mínima respeitada
    - Elasticidade dentro do corredor permitido
    - Cliente Âncora não recebe desconto elástico
    """
    # 1. Validação de PT e Piso
    pt = context.get("price_screen_pt")
    piso = context.get("price_floor")

    if pt and piso and pt <= piso:
        return False  # Gera incidente (já tratado no engine)

    # 2. Validação de Cliente Âncora
    if context.get("is_anchor_customer"):
        # Cliente Âncora só pode usar ANCHOR_TABLE
        if decision.get("applied_mode") != "ANCHOR_TABLE":
            return False

        # Elasticidade deve ser none ou low
        elasticity = decision.get("elasticity")
        if elasticity not in ["none", "low"]:
            return False

    # 3. Validação de Marca Principal
    brand_role = context.get("brand_role")
    if brand_role == "principal":
        # Marca Principal: elasticidade máxima LOW
        elasticity = decision.get("elasticity")
        if elasticity and elasticity not in ["none", "low"]:
            return False

    # 4. Validação de margem (se final_price disponível)
    final_price = decision.get("final_price")
    if final_price:
        # TODO: Calcular margem real e validar contra piso
        pass

    return True


def get_autonomy_level(context: Dict[str, Any]) -> str:
    """
    Determina nível de autonomia baseado em contexto CSuite Pricing

    L0: Recomendação (Cliente Âncora, Marca Principal)
    L1: Ajustes pequenos (elasticidade LOW)
    L2: Ajustes médios (elasticidade MEDIUM)
    L3: Ajustes agressivos (elasticidade HIGH/MAX)
    """
    # Cliente Âncora: sempre L0
    if context.get("is_anchor_customer"):
        return "L0"

    # Marca Principal: máximo L1
    if context.get("brand_role") == "principal":
        return "L1"

    # Elasticidade determina autonomia
    elasticity = context.get("elasticity")
    if elasticity == "none":
        return "L0"
    elif elasticity == "low":
        return "L1"
    elif elasticity == "medium":
        return "L2"
    elif elasticity in ["high", "max"]:
        return "L3"

    return "L0"  # Padrão conservador

Checklist Fase 5:
- [ ] Modificar policies.py com validações CSuite Pricing
- [ ] Testar validação de Cliente Âncora
- [ ] Testar validação de Marca Principal
- [ ] Testar cálculo de autonomia
- [ ] Validar todas as regras de negócio


Fase 6: Atualização de Payload e API (Prioridade: BAIXA)

Objetivo: Atualizar payload e documentação da API.

6.1. Modificar Main API

Modificar: agents/pricing/main.py

@app.post("/run")
async def run_agent(payload: Dict[str, Any]):
    """
    Executa o Pricing Agent (CSuite Pricing Engine)

    Payload esperado (CSuite Pricing):
    {
        "org_id": 1,
        "brand_id": 1,
        "customer_id": 123,
        "sku_id": 456,
        "sku_qty": 5,
        "order_value": 50000,
        "payment_term": "standard",  # standard | short | extended
        "stock_level": "normal",     # normal | high
        "machine_curve": "A"         # A | B | C
    }

    Payload legado (compatibilidade):
    {
        "org_id": 0,
        "sku": {"id": "...", "cost": 8000, "margin": 20},
        "inventory": {"days_on_hand": 45, "turn_rate": 0.5},
        "market": {"elasticity": -1.2, "competitor_price": 10000},
        "policy": {"floor_margin": 18, "max_discount": 5}
    }
    """
    try:
        # Validar payload mínimo
        if not payload.get("org_id"):
            raise HTTPException(status_code=400, detail="org_id é obrigatório")

        # Detectar formato (novo vs legado)
        is_legacy = "sku" in payload and "inventory" in payload

        if is_legacy:
            # Converter payload legado para novo formato
            # TODO: Implementar conversão
            raise HTTPException(
                status_code=400,
                detail="Payload legado não suportado. Use formato CSuite Pricing."
            )

        # Validar campos obrigatórios (novo formato)
        required_fields = ["brand_id", "sku_id", "customer_id"]
        missing = [f for f in required_fields if not payload.get(f)]
        if missing:
            raise HTTPException(
                status_code=400,
                detail=f"Campos obrigatórios faltando: {', '.join(missing)}"
            )

        result = await run_decision(payload)

        return JSONResponse(content={
            "status": "success",
            "agent": AGENT_CODE,
            "result": result
        })

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Erro ao executar Pricing Agent: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))

Checklist Fase 6:
- [ ] Atualizar documentação da API
- [ ] Validar payload novo formato
- [ ] Adicionar validações de campos obrigatórios
- [ ] Atualizar exemplos de uso
- [ ] Testar compatibilidade (se necessário)


🧪 Plano de Testes

Testes Unitários

  1. Repository Tests
  2. test_get_price_screen() - Consulta PT
  3. test_get_price_floor() - Consulta Piso
  4. test_get_customer_brand_profile() - Perfil cliente
  5. test_get_anchor_price() - Preço Âncora

  6. Anchor Tests

  7. test_is_anchor_customer() - Detecção Âncora
  8. test_get_anchor_price_for_sku() - Retorno Preço Âncora
  9. test_anchor_exit_flow() - Fluxo de saída antecipada

  10. Engine Tests

  11. test_compute_price_v8() - Chamada stored procedure
  12. test_incident_pt_leq_piso() - Tratamento incidente
  13. test_elasticity_mapping() - Mapeamento elasticidade

  14. Policy Tests

  15. test_validate_anchor_customer() - Validação Âncora
  16. test_validate_principal_brand() - Validação Marca Principal
  17. test_autonomy_levels() - Níveis de autonomia

Testes de Integração

  1. Fluxo Completo
  2. Cliente Âncora → Preço Âncora
  3. Cliente Normal → Motor SQL → Preço Calculado
  4. PT ≤ Piso → Incidente registrado

  5. Auditoria

  6. Registro em price_calculations
  7. Registro em price_incidents
  8. Integração com Policy Engine

Testes End-to-End

  1. Cenários Reais
  2. Pedido Cliente Âncora
  3. Pedido Cliente Motor (elasticidade MEDIUM)
  4. Pedido Marca Principal (elasticidade LOW)
  5. Incidente PT ≤ Piso

📝 Checklist Geral de Implementação

Infraestrutura

Testes

Documentação

Deploy


🚀 Ordem de Execução Recomendada

  1. Semana 1: Fases 1 e 2 (Infraestrutura + Cliente Âncora)
  2. Semana 2: Fase 3 (Motor SQL) - CRÍTICA
  3. Semana 3: Fases 4 e 5 (Auditoria + Validações)
  4. Semana 4: Fase 6 (API) + Testes + Documentação

📚 Referências


⚠️ Riscos e Mitigações

Risco Mitigação
Stored procedure sp_price_compute_v8 não existe Verificar schema, usar sp_price_compute_v7 como fallback
Performance de consultas SQL Adicionar índices, cache de consultas frequentes
Incompatibilidade de payload Manter compatibilidade temporária, migração gradual
Falha na conexão com csuite_pricing Retry logic, fallback para lógica Python (modo degradado)

Próximo Passo: Iniciar Fase 1 (Infraestrutura e Conexão)

🔊 Text-to-Speech

1.0x
1.0
Pronto para reproduzir