Autenticação e Autorização - C-Suite
Visão Geral
Este documento descreve o sistema de autenticação e autorização padronizado para o ecossistema C-Suite. O objetivo é fornecer segurança consistente em todos os apps usando JWT (JSON Web Tokens) e RBAC (Role-Based Access Control).
Módulo de Autenticação
Localização
O módulo principal está em: common_auth.py (raiz do projeto)
Características
- JWT Tokens: Tokens de acesso e refresh
- RBAC: Role-Based Access Control
- Middleware FastAPI: Integração nativa com FastAPI
- Validação automática: Validação de tokens em cada request
- Isolamento por organização: Verificação de acesso à organização
- Configuração via env vars: Fácil configuração
Uso Básico
1. Configuração
# Variáveis de ambiente
JWT_SECRET_KEY=your-secret-key-here # OBRIGATÓRIO em produção!
JWT_ALGORITHM=HS256
JWT_ACCESS_TOKEN_EXPIRE_MINUTES=30
JWT_REFRESH_TOKEN_EXPIRE_DAYS=7
JWT_ISSUER=csuite
JWT_AUDIENCE=csuite-api
2. Endpoint de Login
from fastapi import FastAPI, HTTPException
from common_auth import create_access_token, create_refresh_token, verify_password
from pydantic import BaseModel
app = FastAPI()
class LoginRequest(BaseModel):
email: str
password: str
@app.post("/auth/login")
async def login(credentials: LoginRequest):
# Verifica credenciais (exemplo - implementar lógica real)
user = get_user_by_email(credentials.email)
if not user or not verify_password(credentials.password, user.hashed_password):
raise HTTPException(status_code=401, detail="Credenciais inválidas")
# Cria tokens
access_token = create_access_token(
user_id=str(user.id),
email=user.email,
org_id=user.org_id,
roles=user.roles
)
refresh_token = create_refresh_token(
user_id=str(user.id),
email=user.email
)
return {
"access_token": access_token,
"refresh_token": refresh_token,
"token_type": "bearer"
}
3. Proteger Endpoints
from common_auth import get_current_user, require_role, require_org_access
# Endpoint protegido (requer autenticação)
@app.get("/protected")
async def protected_endpoint(current_user: dict = Depends(get_current_user)):
return {
"message": "Acesso autorizado",
"user": current_user
}
# Endpoint com role específica
@app.post("/admin/users")
async def admin_endpoint(
current_user: dict = Depends(get_current_user),
_: None = Depends(require_role("admin"))
):
return {"message": "Apenas admins podem acessar"}
# Endpoint com verificação de organização
@app.get("/orgs/{org_id}/data")
async def org_data(
org_id: int,
current_user: dict = Depends(get_current_user),
_: None = Depends(require_org_access("org_id"))
):
return {"data": f"Dados da organização {org_id}"}
4. Refresh Token
from common_auth import decode_token, create_access_token
@app.post("/auth/refresh")
async def refresh_token(refresh_token: str):
# Valida refresh token
payload = decode_token(refresh_token)
# Verifica se é refresh token
if payload.get("type") != "refresh":
raise HTTPException(status_code=401, detail="Token inválido")
# Cria novo access token
user = get_user_by_id(payload["sub"])
access_token = create_access_token(
user_id=str(user.id),
email=user.email,
org_id=user.org_id,
roles=user.roles
)
return {
"access_token": access_token,
"token_type": "bearer"
}
Roles e Permissões
Roles Padrão
- admin: Acesso total ao sistema
- org_admin: Administrador da organização
- user: Usuário comum
- viewer: Apenas leitura
Definindo Roles
# Ao criar token
access_token = create_access_token(
user_id="123",
email="user@example.com",
org_id=1,
roles=["org_admin", "user"] # Múltiplas roles
)
Verificando Roles
# Uma role específica
@app.post("/admin")
async def admin_only(
current_user: dict = Depends(get_current_user),
_: None = Depends(require_role("admin"))
):
return {"message": "Admin only"}
# Múltiplas roles (qualquer uma)
@app.post("/admin-or-org-admin")
async def admin_or_org_admin(
current_user: dict = Depends(get_current_user),
_: None = Depends(require_role("admin", "org_admin"))
):
return {"message": "Admin or org admin"}
Isolamento por Organização
Verificação Automática
@app.get("/orgs/{org_id}/metrics")
async def org_metrics(
org_id: int,
current_user: dict = Depends(get_current_user),
_: None = Depends(require_org_access("org_id"))
):
# Usuário só pode acessar dados da sua organização
# (exceto se for admin)
return get_metrics(org_id)
Verificação Manual
@app.get("/orgs/{org_id}/data")
async def org_data(
org_id: int,
current_user: dict = Depends(get_current_user)
):
user_org_id = current_user.get("org_id")
user_roles = current_user.get("roles", [])
# Verifica acesso
if user_org_id != org_id and "admin" not in user_roles:
raise HTTPException(status_code=403, detail="Acesso negado")
return get_data(org_id)
Estrutura do Token JWT
Access Token
{
"sub": "user_id",
"email": "user@example.com",
"org_id": 1,
"roles": ["org_admin", "user"],
"iat": 1234567890,
"exp": 1234571490,
"iss": "csuite",
"aud": "csuite-api"
}
Refresh Token
{
"sub": "user_id",
"email": "user@example.com",
"type": "refresh",
"iat": 1234567890,
"exp": 1235172690,
"iss": "csuite",
"aud": "csuite-api"
}
Segurança
Boas Práticas
- Use HTTPS: Sempre em produção
- Chave secreta forte: Use chave aleatória longa (32+ caracteres)
- TTL apropriado: Tokens de acesso curtos (30 min), refresh tokens longos (7 dias)
- Validação de audience: Verifica se token é para o app correto
- Validação de issuer: Verifica origem do token
- Não armazene senhas: Use hash (bcrypt)
- Revogação de tokens: Implemente blacklist para logout
Variáveis de Ambiente
# OBRIGATÓRIO em produção
JWT_SECRET_KEY=$(openssl rand -hex 32)
# Opcionais (com defaults)
JWT_ALGORITHM=HS256
JWT_ACCESS_TOKEN_EXPIRE_MINUTES=30
JWT_REFRESH_TOKEN_EXPIRE_DAYS=7
JWT_ISSUER=csuite
JWT_AUDIENCE=csuite-api
Integração com Apps
4c Decision API
from common_auth import get_current_user, require_role
@app.post("/decide")
async def decide(
req: DecideRequest,
current_user: dict = Depends(get_current_user)
):
# Usa org_id do token
org_id = current_user.get("org_id") or req.org_id
return await _decide_core(req.customer_id, req.candidates, org_id=org_id)
csuite-executive
from common_auth import get_current_user, require_role, require_org_access
@app.get("/policies/{org_id}")
async def list_policies(
org_id: int,
current_user: dict = Depends(get_current_user),
_: None = Depends(require_org_access("org_id"))
):
return get_policies(org_id)
4c-suite
from common_auth import get_current_user
@app.get("/organismo/{org_id}/status")
async def organismo_status(
org_id: int,
current_user: dict = Depends(get_current_user),
_: None = Depends(require_org_access("org_id"))
):
return get_organism_status(org_id)
Logout e Revogação
Implementação Básica
from common_cache import get_cache
# Blacklist de tokens (usando cache)
cache = get_cache()
@app.post("/auth/logout")
async def logout(
current_user: dict = Depends(get_current_user),
credentials: HTTPAuthorizationCredentials = Depends(security)
):
token = credentials.credentials
payload = decode_token(token)
# Adiciona token à blacklist (expira quando token expirar)
exp = payload.get("exp", 0)
ttl = exp - int(datetime.utcnow().timestamp())
if ttl > 0:
cache.set(f"blacklist:{token}", True, ttl=ttl)
return {"message": "Logout realizado"}
# Middleware para verificar blacklist
async def check_blacklist(request: Request, call_next):
auth_header = request.headers.get("Authorization")
if auth_header and auth_header.startswith("Bearer "):
token = auth_header.split(" ")[1]
cache = get_cache()
if cache.exists(f"blacklist:{token}"):
raise HTTPException(status_code=401, detail="Token revogado")
return await call_next(request)
Troubleshooting
Token inválido
- Verifique se
JWT_SECRET_KEYestá configurado - Verifique se token não expirou
- Verifique se issuer e audience estão corretos
Acesso negado
- Verifique se usuário tem role necessária
- Verifique se usuário tem acesso à organização
- Verifique se token contém claims necessárias
Token expirado
- Use refresh token para obter novo access token
- Aumente
JWT_ACCESS_TOKEN_EXPIRE_MINUTESse necessário - Implemente refresh automático no cliente
Próximos Passos
- ✅ Módulo de autenticação criado
- ⏳ Implementar endpoints de login/registro em cada app
- ⏳ Migrar apps existentes para usar
common_auth - ⏳ Implementar blacklist de tokens
- ⏳ Adicionar MFA (Multi-Factor Authentication)
- ⏳ Implementar rate limiting por usuário
- ⏳ Adicionar audit log de autenticação