Validação e Sanitização de Inputs - C-Suite
Visão Geral
Este documento descreve o padrão de validação e sanitização de inputs para prevenir vulnerabilidades como SQL injection, XSS, e outras ameaças de segurança.
Módulo de Validação
Localização
O módulo principal está em: common_validation.py (raiz do projeto)
Características
- Proteção SQL Injection: Validação de nomes de colunas e tabelas
- Sanitização de Strings: Remoção de caracteres perigosos
- Validação de Tipos: Validação de IDs, emails, telefones, etc.
- Pydantic Models: Models prontos para paginação, ordenação, busca
- Whitelist: Validação contra listas permitidas
Uso Básico
1. Validação de Colunas SQL
from common_validation import validate_sql_column
# Obter colunas válidas da tabela
allowed_columns = ["customer_id", "name", "email", "created_at"]
# Validar coluna antes de usar em SQL
sort_column = validate_sql_column(request.sort_column, allowed_columns)
# Agora seguro para usar em query
query = f"SELECT * FROM customers ORDER BY {sort_column} ASC"
2. Sanitização de Termos de Busca
from common_validation import sanitize_search_term
# Sanitizar termo de busca para SQL LIKE
search_term = sanitize_search_term(user_input)
# Usar em query (já escapado)
query = f"SELECT * FROM customers WHERE name LIKE '%{search_term}%'"
3. Validação de IDs
from common_validation import validate_id
# Validar ID de organização
org_id = validate_id(request.org_id, min_value=1)
# Validar ID de cliente
customer_id = validate_id(request.customer_id, min_value=1, max_value=999999)
4. Validação de Emails e Telefones
from common_validation import validate_email, validate_phone
# Validar email
email = validate_email(request.email)
# Validar telefone
phone = validate_phone(request.phone)
5. Usando Models Pydantic
from common_validation import PaginationParams, SortParams, SearchParams
from fastapi import Query
@app.get("/api/customers")
async def list_customers(
pagination: PaginationParams = Depends(),
sort: SortParams = Depends(),
search: SearchParams = Depends()
):
# Parâmetros já validados
limit = pagination.limit
offset = pagination.offset
sort_column = sort.sort_column
sort_direction = sort.sort_direction
search_term = search.search
# Construir query SQL segura
query = build_safe_query(limit, offset, sort_column, sort_direction, search_term)
return execute_query(query)
Proteção contra SQL Injection
1. Validação de Nomes de Colunas
❌ ERRADO:
# VULNERÁVEL A SQL INJECTION!
sort_column = request.sort_column
query = f"SELECT * FROM customers ORDER BY {sort_column}"
✅ CORRETO:
from common_validation import validate_sql_column
allowed_columns = ["customer_id", "name", "email", "created_at"]
sort_column = validate_sql_column(request.sort_column, allowed_columns)
query = f"SELECT * FROM customers ORDER BY {sort_column}"
2. Validação de Nomes de Tabelas
from common_validation import validate_sql_table
# Validar nome de tabela com prefixo
table_name = validate_sql_table(request.table_name, prefix="core_")
# Ou validar contra whitelist
allowed_tables = ["core_customers", "core_orders", "core_interactions"]
table_name = validate_sql_table(request.table_name, allowed_tables=allowed_tables)
3. Sanitização de Termos de Busca
from common_validation import sanitize_search_term
# Sanitizar e escapar para SQL LIKE
search_term = sanitize_search_term(user_input)
# Usar com parameterized query (ainda mais seguro)
query = "SELECT * FROM customers WHERE name LIKE :search"
params = {"search": f"%{search_term}%"}
4. Parameterized Queries (Recomendado)
Sempre use parameterized queries quando possível:
from sqlalchemy import text
# ✅ SEGURO: Parameterized query
query = text("SELECT * FROM customers WHERE customer_id = :id AND name LIKE :search")
result = db.execute(query, {"id": customer_id, "search": f"%{search_term}%"})
Validação de Inputs Comuns
IDs
from common_validation import validate_id
# Validar ID básico
org_id = validate_id(request.org_id)
# Validar ID com limites
customer_id = validate_id(request.customer_id, min_value=1, max_value=999999)
Emails
from common_validation import validate_email
try:
email = validate_email(request.email)
except ValidationError as e:
raise HTTPException(status_code=400, detail=str(e))
Telefones
from common_validation import validate_phone
phone = validate_phone(request.phone) # Retorna apenas dígitos
Paginação
from common_validation import validate_pagination, PaginationParams
# Validação manual
limit, offset = validate_pagination(request.limit, request.offset, max_limit=1000)
# Ou usando Pydantic model
pagination = PaginationParams(limit=request.limit, offset=request.offset)
Ordenação
from common_validation import validate_sort_direction, SortParams
# Validação manual
direction = validate_sort_direction(request.sort_direction)
# Ou usando Pydantic model
sort = SortParams(sort_column=request.sort_column, sort_direction=request.sort_direction)
Exemplo Completo: Endpoint com Busca e Ordenação
from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy import text
from common_validation import (
validate_sql_table,
validate_sql_column,
sanitize_search_term,
PaginationParams,
SortParams,
SearchParams,
ValidationError
)
app = FastAPI()
@app.get("/api/admin/tables/{table_name}/data")
async def get_table_data(
table_name: str,
pagination: PaginationParams = Depends(),
sort: SortParams = Depends(),
search: SearchParams = Depends(),
db: Session = Depends(get_db)
):
try:
# 1. Validar nome da tabela
table_name = validate_sql_table(table_name, prefix="core_")
# 2. Obter colunas válidas
cols_query = text("""
SELECT COLUMN_NAME
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = 'core' AND TABLE_NAME = :table_name
ORDER BY ORDINAL_POSITION
""")
cols_result = db.execute(cols_query, {"table_name": table_name})
allowed_columns = [row[0] for row in cols_result]
# 3. Validar coluna de ordenação
sort_column = None
if sort.sort_column:
sort_column = validate_sql_column(sort.sort_column, allowed_columns)
# 4. Sanitizar termo de busca
search_term = sanitize_search_term(search.search) if search.search else None
# 5. Construir query SQL segura
query_parts = [f"SELECT * FROM {table_name}"]
params = {}
# WHERE clause (busca)
if search_term:
search_conditions = []
for i, col in enumerate(allowed_columns):
param_name = f"search_{i}"
search_conditions.append(f"`{col}` LIKE :{param_name}")
params[param_name] = f"%{search_term}%"
query_parts.append("WHERE (" + " OR ".join(search_conditions) + ")")
# ORDER BY clause
if sort_column:
query_parts.append(f"ORDER BY `{sort_column}` {sort.sort_direction.upper()}")
# LIMIT/OFFSET
query_parts.append("LIMIT :limit OFFSET :offset")
params.update({
"limit": pagination.limit,
"offset": pagination.offset
})
# 6. Executar query
query = text(" ".join(query_parts))
result = db.execute(query, params)
return {"data": [dict(row) for row in result]}
except ValidationError as e:
raise HTTPException(status_code=400, detail=str(e))
Boas Práticas
- Sempre valide inputs: Nunca confie em dados do usuário
- Use whitelist: Valide contra listas permitidas, não blacklist
- Parameterized queries: Use sempre que possível
- Sanitize antes de usar: Sanitize termos de busca antes de usar em SQL
- Valide tipos: Sempre valide tipos antes de usar
- Limite comprimentos: Limite comprimento de strings
- Use Pydantic: Use models Pydantic para validação automática
Vulnerabilidades Prevenidas
SQL Injection
- ✅ Validação de nomes de colunas/tabelas
- ✅ Sanitização de termos de busca
- ✅ Parameterized queries
XSS (Cross-Site Scripting)
- ✅ Sanitização de HTML
- ✅ Escape de caracteres especiais
Command Injection
- ✅ Validação de tipos
- ✅ Sanitização de strings
Path Traversal
- ✅ Validação de nomes de arquivos/tabelas
- ✅ Prefixos obrigatórios
Troubleshooting
ValidationError levantada
- Verifique se input está no formato esperado
- Verifique se está na whitelist (se aplicável)
- Verifique limites (comprimento, valores)
Query ainda vulnerável
- Certifique-se de usar parameterized queries
- Certifique-se de validar todos os inputs
- Certifique-se de sanitizar termos de busca
Próximos Passos
- ✅ Módulo de validação criado
- ⏳ Migrar endpoints existentes para usar
common_validation - ⏳ Adicionar validação em todos os endpoints públicos
- ⏳ Implementar testes de segurança
- ⏳ Revisar queries SQL existentes