Banco de dados¶
PostgreSQL é a única persistência local. Nenhum dado clínico fica aqui (tudo vai para o OTIMUS). O DB serve para: fila de mensagens, histórico do agente e telemetria.
Schema completo¶
-- init.sql — trocar 'projeto' pelo nome canônico (medcenter, clinfeto, ...)
-- Fila de mensagens (debounce de mensagens rápidas do WhatsApp)
CREATE TABLE IF NOT EXISTS n8n_fila_mensagens_projeto (
id SERIAL PRIMARY KEY,
id_mensagem TEXT NOT NULL DEFAULT '',
telefone TEXT NOT NULL,
mensagem TEXT NOT NULL,
timestamp TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_fila_projeto_telefone ON n8n_fila_mensagens_projeto(telefone);
CREATE INDEX IF NOT EXISTS idx_fila_projeto_timestamp ON n8n_fila_mensagens_projeto(timestamp);
-- Histórico (LangGraph PostgresSaver cria tabelas próprias: checkpoint*)
-- Esta é uma tabela adicional (opcional) para logar mensagens em formato simples
CREATE TABLE IF NOT EXISTS dados_cliente_projeto (
id SERIAL PRIMARY KEY,
session_id TEXT NOT NULL,
role TEXT NOT NULL,
content TEXT NOT NULL,
created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_chat_projeto_session ON dados_cliente_projeto(session_id);
-- Telemetria: sessões
CREATE TABLE IF NOT EXISTS dashboard_sessions (
id SERIAL PRIMARY KEY,
session_id TEXT NOT NULL,
phone_number TEXT NOT NULL,
contact_name TEXT NOT NULL DEFAULT '',
started_at TIMESTAMPTZ DEFAULT NOW(),
status TEXT NOT NULL DEFAULT 'active',
transferred BOOLEAN DEFAULT FALSE,
content_type TEXT DEFAULT 'TEXT'
);
CREATE INDEX IF NOT EXISTS idx_dash_sessions_started ON dashboard_sessions(started_at);
CREATE INDEX IF NOT EXISTS idx_dash_sessions_phone ON dashboard_sessions(phone_number);
CREATE INDEX IF NOT EXISTS idx_dash_sessions_status ON dashboard_sessions(status);
-- Telemetria: uso de tokens
CREATE TABLE IF NOT EXISTS dashboard_token_usage (
id SERIAL PRIMARY KEY,
session_id TEXT NOT NULL DEFAULT '',
provider TEXT NOT NULL,
model TEXT NOT NULL,
prompt_tokens INTEGER DEFAULT 0,
completion_tokens INTEGER DEFAULT 0,
total_tokens INTEGER DEFAULT 0,
cost_usd NUMERIC(10, 6) DEFAULT 0,
cost_brl NUMERIC(10, 4) DEFAULT 0,
created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_dash_tokens_created ON dashboard_token_usage(created_at);
CREATE INDEX IF NOT EXISTS idx_dash_tokens_session ON dashboard_token_usage(session_id);
CREATE INDEX IF NOT EXISTS idx_dash_tokens_provider ON dashboard_token_usage(provider);
-- Telemetria: erros
CREATE TABLE IF NOT EXISTS dashboard_errors (
id SERIAL PRIMARY KEY,
session_id TEXT DEFAULT '',
phone_number TEXT DEFAULT '',
error_type TEXT NOT NULL,
error_message TEXT NOT NULL,
stack_trace TEXT DEFAULT '',
created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_dash_errors_created ON dashboard_errors(created_at);
CREATE INDEX IF NOT EXISTS idx_dash_errors_type ON dashboard_errors(error_type);
-- Telemetria: tags
CREATE TABLE IF NOT EXISTS dashboard_tags (
id SERIAL PRIMARY KEY,
session_id TEXT NOT NULL DEFAULT '',
phone_number TEXT NOT NULL,
tag_name TEXT NOT NULL,
created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_dash_tags_created ON dashboard_tags(created_at);
CREATE INDEX IF NOT EXISTS idx_dash_tags_name ON dashboard_tags(tag_name);
Tabelas do LangGraph¶
PostgresSaver.setup() cria:
checkpointscheckpoint_blobscheckpoint_writescheckpoint_migrations
Não mexa nessas. São internas da biblioteca. Usam o mesmo DATABASE_URL.
Nomenclatura por projeto¶
Tabelas de fila e histórico usam sufixo do projeto:
| Projeto | Fila | Chat history |
|---|---|---|
| MEDCENTER | n8n_fila_mensagens_medcenter |
dados_cliente_medcenter |
| CLINFETO | n8n_fila_mensagens_clinfeto |
dados_cliente_clinfeto |
Controlado via env var:
Por que o prefixo n8n_
Histórico — era o nome usado no pipeline n8n original. Mantido por compatibilidade com ferramentas admin que esperam esse nome. Pode renomear em projetos novos.
Operações principais¶
Enfileirar mensagem (debounce)¶
# db.py
def enqueue_message(phone: str, message_id: str, message: str) -> None:
with _conn.cursor() as cur:
cur.execute(
f"INSERT INTO {MESSAGE_QUEUE_TABLE} (id_mensagem, telefone, mensagem) VALUES (%s, %s, %s)",
(message_id, phone, message),
)
Ler e limpar fila de um telefone¶
def fetch_and_clear_queue(phone: str) -> list[str]:
with _conn.cursor() as cur:
cur.execute(
f"DELETE FROM {MESSAGE_QUEUE_TABLE} WHERE telefone = %s RETURNING mensagem ORDER BY id",
(phone,),
)
rows = cur.fetchall()
return [r[0] for r in rows]
Conexão¶
Uma conexão global ao nível do módulo, com autocommit:
psycopg3 (psycopg, sem -binary) é o padrão. Aceita reconexão em caso
de drop.
Backup¶
- Sessões e telemetria (
dashboard_*): backup diário é suficiente. - Fila (
n8n_fila_*): pode perder — é transiente. - Chat history (
checkpoint*,dados_cliente_*): opcionalmente arquivar mensal — compliance/LGPD pode exigir apagar após X meses.
Limpeza de chat history¶
O PostgresSaver não expira automaticamente. Rotina típica:
-- Apagar sessões inativas há mais de 60 dias
DELETE FROM checkpoints
WHERE thread_id NOT IN (
SELECT session_id FROM dashboard_sessions
WHERE started_at > NOW() - INTERVAL '60 days'
);
Rodar em cron job separado (não dentro da aplicação).