Pular para conteúdo

Banco de dados

PostgreSQL é a única persistência local. Nenhum dado clínico fica aqui (tudo vai para o OTIMUS). O DB serve para: fila de mensagens, histórico do agente e telemetria.

Schema completo

-- init.sql — trocar 'projeto' pelo nome canônico (medcenter, clinfeto, ...)

-- Fila de mensagens (debounce de mensagens rápidas do WhatsApp)
CREATE TABLE IF NOT EXISTS n8n_fila_mensagens_projeto (
    id SERIAL PRIMARY KEY,
    id_mensagem TEXT NOT NULL DEFAULT '',
    telefone TEXT NOT NULL,
    mensagem TEXT NOT NULL,
    timestamp TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_fila_projeto_telefone ON n8n_fila_mensagens_projeto(telefone);
CREATE INDEX IF NOT EXISTS idx_fila_projeto_timestamp ON n8n_fila_mensagens_projeto(timestamp);

-- Histórico (LangGraph PostgresSaver cria tabelas próprias: checkpoint*)
-- Esta é uma tabela adicional (opcional) para logar mensagens em formato simples
CREATE TABLE IF NOT EXISTS dados_cliente_projeto (
    id SERIAL PRIMARY KEY,
    session_id TEXT NOT NULL,
    role TEXT NOT NULL,
    content TEXT NOT NULL,
    created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_chat_projeto_session ON dados_cliente_projeto(session_id);

-- Telemetria: sessões
CREATE TABLE IF NOT EXISTS dashboard_sessions (
    id SERIAL PRIMARY KEY,
    session_id TEXT NOT NULL,
    phone_number TEXT NOT NULL,
    contact_name TEXT NOT NULL DEFAULT '',
    started_at TIMESTAMPTZ DEFAULT NOW(),
    status TEXT NOT NULL DEFAULT 'active',
    transferred BOOLEAN DEFAULT FALSE,
    content_type TEXT DEFAULT 'TEXT'
);
CREATE INDEX IF NOT EXISTS idx_dash_sessions_started ON dashboard_sessions(started_at);
CREATE INDEX IF NOT EXISTS idx_dash_sessions_phone ON dashboard_sessions(phone_number);
CREATE INDEX IF NOT EXISTS idx_dash_sessions_status ON dashboard_sessions(status);

-- Telemetria: uso de tokens
CREATE TABLE IF NOT EXISTS dashboard_token_usage (
    id SERIAL PRIMARY KEY,
    session_id TEXT NOT NULL DEFAULT '',
    provider TEXT NOT NULL,
    model TEXT NOT NULL,
    prompt_tokens INTEGER DEFAULT 0,
    completion_tokens INTEGER DEFAULT 0,
    total_tokens INTEGER DEFAULT 0,
    cost_usd NUMERIC(10, 6) DEFAULT 0,
    cost_brl NUMERIC(10, 4) DEFAULT 0,
    created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_dash_tokens_created ON dashboard_token_usage(created_at);
CREATE INDEX IF NOT EXISTS idx_dash_tokens_session ON dashboard_token_usage(session_id);
CREATE INDEX IF NOT EXISTS idx_dash_tokens_provider ON dashboard_token_usage(provider);

-- Telemetria: erros
CREATE TABLE IF NOT EXISTS dashboard_errors (
    id SERIAL PRIMARY KEY,
    session_id TEXT DEFAULT '',
    phone_number TEXT DEFAULT '',
    error_type TEXT NOT NULL,
    error_message TEXT NOT NULL,
    stack_trace TEXT DEFAULT '',
    created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_dash_errors_created ON dashboard_errors(created_at);
CREATE INDEX IF NOT EXISTS idx_dash_errors_type ON dashboard_errors(error_type);

-- Telemetria: tags
CREATE TABLE IF NOT EXISTS dashboard_tags (
    id SERIAL PRIMARY KEY,
    session_id TEXT NOT NULL DEFAULT '',
    phone_number TEXT NOT NULL,
    tag_name TEXT NOT NULL,
    created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_dash_tags_created ON dashboard_tags(created_at);
CREATE INDEX IF NOT EXISTS idx_dash_tags_name ON dashboard_tags(tag_name);

Tabelas do LangGraph

PostgresSaver.setup() cria:

  • checkpoints
  • checkpoint_blobs
  • checkpoint_writes
  • checkpoint_migrations

Não mexa nessas. São internas da biblioteca. Usam o mesmo DATABASE_URL.

Nomenclatura por projeto

Tabelas de fila e histórico usam sufixo do projeto:

Projeto Fila Chat history
MEDCENTER n8n_fila_mensagens_medcenter dados_cliente_medcenter
CLINFETO n8n_fila_mensagens_clinfeto dados_cliente_clinfeto

Controlado via env var:

CHAT_HISTORY_TABLE=dados_cliente_novoprojeto
MESSAGE_QUEUE_TABLE=n8n_fila_mensagens_novoprojeto

Por que o prefixo n8n_

Histórico — era o nome usado no pipeline n8n original. Mantido por compatibilidade com ferramentas admin que esperam esse nome. Pode renomear em projetos novos.

Operações principais

Enfileirar mensagem (debounce)

# db.py
def enqueue_message(phone: str, message_id: str, message: str) -> None:
    with _conn.cursor() as cur:
        cur.execute(
            f"INSERT INTO {MESSAGE_QUEUE_TABLE} (id_mensagem, telefone, mensagem) VALUES (%s, %s, %s)",
            (message_id, phone, message),
        )

Ler e limpar fila de um telefone

def fetch_and_clear_queue(phone: str) -> list[str]:
    with _conn.cursor() as cur:
        cur.execute(
            f"DELETE FROM {MESSAGE_QUEUE_TABLE} WHERE telefone = %s RETURNING mensagem ORDER BY id",
            (phone,),
        )
        rows = cur.fetchall()
    return [r[0] for r in rows]

Conexão

Uma conexão global ao nível do módulo, com autocommit:

import psycopg
_conn = psycopg.connect(DATABASE_URL, autocommit=True)

psycopg3 (psycopg, sem -binary) é o padrão. Aceita reconexão em caso de drop.

Backup

  • Sessões e telemetria (dashboard_*): backup diário é suficiente.
  • Fila (n8n_fila_*): pode perder — é transiente.
  • Chat history (checkpoint*, dados_cliente_*): opcionalmente arquivar mensal — compliance/LGPD pode exigir apagar após X meses.

Limpeza de chat history

O PostgresSaver não expira automaticamente. Rotina típica:

-- Apagar sessões inativas há mais de 60 dias
DELETE FROM checkpoints
WHERE thread_id NOT IN (
    SELECT session_id FROM dashboard_sessions
    WHERE started_at > NOW() - INTERVAL '60 days'
);

Rodar em cron job separado (não dentro da aplicação).