DEV Community

Cover image for Realtime PostgreSQL - Escutando o seu banco de dados com Supabase
Vitor Braggion
Vitor Braggion

Posted on

Realtime PostgreSQL - Escutando o seu banco de dados com Supabase

Fala dev, tudo bem?

Talvez você já deve ter se perguntado como ficar 'escutando' o seu banco de dados para fazer alguma ação na sua aplicação. Seja listando dados em tempo real para o seu usuário, seja escutando alguma alteração em algum dado para disparar  algum evento/ação de forma automática (ex: notificação, etc).  E qual seria a melhor forma para conseguir isso? Ficar todo momento consultando o banco e vendo se foi realizado alguma alteração?  Isso certamente é uma dor de cabeça grande e também com certeza irá gerar uma sobrecarga desnecessária na sua aplicação e no seu banco de dados (já vi na prática empresas fazendo isso e depois de um tempo tempo que refazer boa parte do sistema rsrs)  Lembrando que quanto menos IO sua aplicação fizer é melhor, pois o custo de processamento é muito alto. Então é bem importante saber gerenciar bem operações IO.

Há soluções bem simples que não precisamos fazer muitas ações, pois já entregam praticamente pronto. Um exemplo disso é o banco de dados realtime do Firebase, ele consegue abstrair muitas coisas e ficar escutando as mudanças do banco e já entregar para o seu client de forma 'automática'. É muito recomendado o uso dele, muitas pessoas usam para mobiles. Porém pensando em um cenário em que você já tenha o seu banco de dados e realizar essa migração para o Firebase ou essa gestão de duplicação de dados para lá não compense o esforço, talvez simplesmente por algum outro motivo vocês queiram manter o seu banco atual ou queira utilizar o Postgres. E sim, há sim outras alternativas mantendo o seu banco.

Eu irei mostrar para vocês como escutar o seu banco de dados PostgreSQL usando uma biblioteca muito interessante chamada de Supabase, que segundo eles é uma alternativa open-source para o Firebase. Esse projeto já possui vários modulos interessantes, como: Fire Storage, Database Realtime, Authentication/Authorization, Functions, Auto-generated APIs, etc. Nele iremos usar o seu modulorealtime.

Mas antes de simplesmente colocar a mão no código, implementar a lib e fingir que tudo acontece como uma mágica, vamos primeiro entender um pouco melhor sobre esse projeto e como ele funciona e quais são os conceitos utilizados por trás dele. Esse projeto é um servidor construido com a linguagem Elixir usando o framework Phoenix, que permite você escutar e replicar mudanças no seu banco de dados PostgreSQL através de logical replication.

Mas afinal, o que é esse tal de logical replication?

Resumidamente logical replication é um método de replicar dados, seja o dado completo ou suas mudanças, para um outro lugar. Geralmente é utilizado um publish e subscribe node, onde o publisher é responsável por "coletar" um conjunto de mudanças de uma ou mais tabelas (publication) e o subscriber que é responsável por 'assinar' ou 'colecar' as publications.

E pensando no cenário de um banco de dados, vai se referir em replicar dados entre dois ou mais banco de dados. **Ele vai capturar as mudanças feitas no banco (ex: INSERTS, UPDATES, DELETES, etc) em uma camada do banco de dados e enviar essas mudanças para os banco de dados que ele queira (targets).

Geralmente isso é feito através de WAL, Write-ahead logging, que é basicamente para escrever logs antes de excutar uma tarefa. Essa técnica garante integridade, atomicidade, durabilidade, entre vários outros benefícios (Muito utilizado em sistemas baseados em eventos). Ou seja, em um cenário de banco dados, quaisquer mudança no arquivo do banco que se refere aos dados, primeiramente terá que ser feito o log, assim que logado, irá fazer a alteração no arquivo do banco de dados.

Mas voltando ao foco principal, vamos começar implementar um pequeno projeto para testarmos o Supabase e como 'escutar' nosso banco de dados. Com isso, vamos criar um projeto em que iremos listar todos os serviços de acordo com seu status (pendente, em andamento e concluído), e obviamente sem ficar consultando várias vezes o banco de dados para ver se houve alguma modificação, queremos que nossa aplicação seja realtime, qualquer alteração que houver, ele precisa ser atualizado imediatamente. *Em um projeto maior, isso pode tirar uma carga enorme do banco de dados.

Architecture

Então basicamente, iremos utilizar o Supabase realtime para escutar todas as modificações do nosso banco de dados e enviá-las para a nossa aplicação NodeJS, em que essa aplicação irá gerenciar esses dados modificados e enviar para os clients via WebSocket.

project working

Vou adicionar as principais partes do projeto e explicar elas. E no final disponibilizarei o link do repositório, em que poderá ver o código completo do projeto.

ESTRUTURA DO PROJETO

Project Architecture

Essa será nossa estrutura do projeto, abaixo há detalhes das principais partes:

/docker-compose.yml: Esse arquivo é a configuração do Docker Compose em que teremos as configurações de dois containers. Um para utilizar o Supabase Realtime, ele que será responsável por ficar escutando qualquer alteração no banco de dados. E um outro container que será nosso banco de dados Postgres.

version: '3'

services:
  realtime:
    image: supabase/realtime:v1.0.0
    ports:
      - "4000:4000"
    environment:
      DB_HOST: postgres-realtime_db_1
      DB_NAME: postgres
      DB_USER: postgres
      DB_PASSWORD: postgres
      DB_PORT: 5432
      PORT: 4000
      # JWT_SECRET: SOMETHING_SUPER_SECRET
      SECURE_CHANNELS: 'false'
    networks:
      - realtime-network
    depends_on:
      - db

  db:
    build: database/.
    restart: always
    environment: 
      POSTGRES_PASSWORD: postgres
    ports:
      - "5432:5432"
    networks:
      - realtime-network

networks:
  realtime-network:
    driver: bridge
Enter fullscreen mode Exit fullscreen mode

/database: Essa pasta terá as configurações do Docker para o nosso banco de dados e configurações do banco para conseguirmos escutá-lo em realtime.

Essa será nossa estrutura do banco de dados:

Database

/database/Dockerfile: Esse arquivo do docker irá ser chamado pelo docker-compose.yml e irá utilizar uma imagem do Postgres e rodar um arquivo (/database/scripts/00-init.sql) com os scripts para criar nossas tabelas no banco de dados e realizar algumas configurações necessárias para que o Supabase consiga escutar as mudanças em nosso banco de dados.

/database/scripts/00-init.sql: Esse arquivo possui o código para criamos as tabelas do nosso banco de dados e as configurações necessárias para que o Supabase consiga escutar as mudanças em nosso banco de dados.

Script para criar as tabelas e inserir dados:


CREATE TABLE public.status (
  id bigint GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
  name VARCHAR(255
);
INSERT INTO 
    public.status (name) 
VALUES 
    ('PENDDING'),
    ('IN_PROGRESS'),
    ('CONCLUDED')

CREATE TABLE public.service (
  id bigint GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
  name VARCHAR(255) NOT NULL,
  status_id bigint REFERENCES public.status NOT NULL
);

INSERT INTO 
    public.service (name, status_id)
VALUES 
    ('Reboque', 1),
    ('Bateria', 3),
    ('Pane Seca', 1),
    ('Troca de Luz', 2);
Enter fullscreen mode Exit fullscreen mode

O Supabase utliza o WAL para capturar as modifições no nosso banco. Então antes precisamos configurar algumas coisas em nosso banco de dados, para que o Supabase Realtime consiga trabalhar.

-- adds information necessary to support logical decoding.
-- Each level includes the information logged at all lower levels.
ALTER SYSTEM SET wal_level='logical'; 

-- Specifies the maximum number of concurrent connections from standby servers or streaming base backup clients
-- (i.e., the maximum number of simultaneously running WAL sender processes). 
-- 10 already is the default value
-- RECOMMENDATIONS: If you are replicating, you want to set this to the maximum number of standby servers you mighy possibly have
-- Performance impact when set above zero, but no additional penalty for setting it higher.s
ALTER SYSTEM SET max_wal_senders='10'; 

-- Specifies the maximum number of replication slots (see streaming-replication-slots) that the server can support
-- 10 already is the default value
ALTER SYSTEM SET max_replication_slots='10';

-- Create the Replication publication 
CREATE PUBLICATION supabase_realtime FOR ALL TABLES;

-- Send the previus values
ALTER TABLE public.service REPLICA IDENTITY FULL;
ALTER TABLE public.status REPLICA IDENTITY FULL;

Enter fullscreen mode Exit fullscreen mode

*Todas essas configurações são necessárias, e todas elas estão feitas de acordo com o projeto que iremos criar. Por exemplo, mencionando as tabelas do nosso projeto. Em nosso projeto final, adicionei essas configurações e o script de criação das nossas tabelas para serem executadas de forma automática ao inciar nosso docker.

/server : Dentro dessa pasta terá os códigos do nosso projeto que será responsável por receber as modificações que o Supabase Realtime identificou e mandar para o nosso client via websocket.

Dentro desse 'projeto’ teremos tudo muito bem separado de acordo com suas responsabilidades.

  • /server/services/ : Terá todos os nosso serviços que iremos utilizar em nosso projeto.

    • Database: responsável por realizar queries em nosso banco. Iremos utilizá-lo somente quando iniciar o projeto. Pois iremos buscar todos os nossos dados no banco para assim depois só atualizar as mudanças que acontecerem.
    const pg = require('pg');
    
    class Database {
      pgPool;
      pgClient;
    
      constructor() {
        this.pgPool = new pg.Pool({
          host: '0.0.0.0',
          user: 'postgres',
          password: 'postgres',
          database: 'postgres',
          port: 5432,
        });
      }
    
      async connect() {
        try {
          this.pgClient = await this.pgPool.connect();
        } catch (error) {
          console.log('Connection error' , error)
        }
      }
    
      async query({ text, params }) {
        const res = await this.pgClient.query(text, params);
    
        return res;
      }
    
      disconnect() {
        this.pgClient.release();
      }
    
      async disconnectPool() {
        await this.pgPool.end();
      }
    }
    
    module.exports = Database;
    
    • DatabaseListener: responsável por escutar as mudanças que o Supabase identificar do nosso banco dados. Utilizaremos a lib @supabase/realtime-js para nos ajudar.
    const { RealtimeClient } = require('@supabase/realtime-js'); // "@supabase/realtime-js": "^1.3.3",
    
    class DatabaseListener {
    
        // creating the connection with our supabase realtime, it's running on docker at port 4000
      socket = new RealtimeClient(process.env.REALTIME_URL || 'ws://localhost:4000/socket');
    
      constructor() {
          // when start our DatabaseListner, we'll connect and listen our connection
        this.socket.connect();
    
        this.socket.onOpen(() => console.log('Socket opened.'));
        this.socket.onClose(() => console.log('Socket closed.'));
        this.socket.onError((e) => console.log('Socket error', e.message));
      }
    
        // generic method to listen our database by channel
      _on(eventName, callback, channel = '*') {
        const databaseChanges = this.socket.channel(`realtime:${channel}`);
    
        databaseChanges.on(eventName, (e) => {
          console.log('EVENT', e);
          callback(e);
        });
        databaseChanges.subscribe();
      }
    
        // method to listen all modifications from our database (update, insert, delete') by channel
      onAll(callback, channel = '*') {
        this._on('*', callback, channel);
      }
    
        // method to listen updates events in our database by channel
      onUpdate(callback, channel = '*') {
        this._on('UPDATE', callback, channel);
      }
    
        // method to listen inserts events in our database by channel
      onInsert(callback, channel = '*') {
        this._on('INSERT', callback, channel);
      }
    
        // method to listen deletes events in our database by channel
      onDelete(callback, channel = '*') {
        this._on('DELETE', callback, channel);
      }
    }
    
    module.exports = DatabaseListener;
    
    • Socket: responsável por gerenciar o socket que teremos com o nosso client (frontend em nosso caso) para enviar as modificões do banco de dados.
    const socket = require('socket.io'); // "socket.io": "^4.4.0"
    
    class Socket {
    
      io = null;
    
      constructor(server) {
          // starting socket server
        this.io = socket(server);
      }
    
      getIo() {
         // method to get the socket connection
       return this.io; 
      }
    
        // method listen the socket connection 
      onConnection(callback) {
        console.log(`Connected socket ${socket.id}`);
    
        this.io.on('connection', (socket) => {
          callback(socket);
        });
      }
    
        // emit an event with some data to our clients
      sendToAllSubscribers(eventName, data) {
        this.io.emit(eventName, data);
      }
    
    }
    
    module.exports = Socket;
    
  • /server/models/ : Terá todos os modelos para interagirmos com as nossas tabelas do nosso banco dedados. Será separado por tabela, como só iremos interagir com o 'Services’, então só teremos ela.

    • Service: Esse arquivo irá realizar uma conexão com o nosso banco de dados e buscar todos os serviços e seus status. Será chamado somente ao iniciar o código, pois iremos buscar todos os dados e depois ficar só escutando as mudanças, assim evitando de ir no banco de dados várias vezes.
    const Database = require('../../services/Database');
    
    class Service {
    
        // Don't allow instance this class, it'll be possible just use static methods
      constructor() {
        throw new Error('It is not instantiable')
      }
    
        // method to fetch all services in our database. We'll use this method when our server start
      static async findAll() {
        let servicesFound = [];
    
        try {
          const database = new Database();
          await database.connect();
    
          const result = await database.query({
            text: 'SELECT * FROM public.service',
            params: []
          });
    
          servicesFound = result?.rows || [];
    
        } catch (error) {
          console.log(error);
        }
    
        return servicesFound;
      }
    
    }
    
    module.exports = Service;
    
  • /server/controllers : Terá as regras de negócio para fazer o projeto acontecer. Irá utilizar os models e os services do projeto. No nosso caso só teremos um controller.

    • ServiceController: Nesse controller iremos buscar todos os ‘Services’ do nosso banco de dados e enviar para o nosso client via websocket e iremos ficar escutando quaisquer mudança no banco de dados, e caso haja alguma, enviaremos essa mudança para os clients via websocket.
    const DatabaseListener = require('../../services/DatabaseListener');
    const Service = require('../../models/Service');
    
    // variable to help us to map our services by status
    const helperToServiceMapStatus = {
      1: 'servicesPendding',
      2: 'servicesInProgress',
      3: 'servicesConcluded'
    };
    
    class ServiceController {
    
        // instance of ous Database Listener
      static databaseChanges = new DatabaseListener();
      // arrays that our services in database will be saved by status. It'll be saved in memory and it'll send for our clients by socket
      // warn: all services will be saved in memory, so you should be alert about the size of these list. Maybe in a production it's better you save it in a kind of database, like redis
      static servicesDto = {
        servicesPendding: [],
        servicesInProgress: [],
        servicesConcluded: []
      };
    
        // don't allow instance this class, it'll be just allowed use static methods
      constructor() {
        throw new Error('It is not instantiable')
      }
    
        // method to fetch all service from database, separe it by status and send to our clients by socket
      static async fetchServicesFromDatabaseAndSend(socket) {
        const servicesFound = await Service.findAll();
    
        servicesFound.map(service => {
          const serviceKey = helperToServiceMapStatus[service.status_id];
          ServiceController.servicesDto[serviceKey].push(service);
        });
    
        socket.onConnection((_socket) => {
          console.log(`Socket connection `, _socket.id);
          socket.sendToAllSubscribers('receivedMessage', ServiceController.servicesDto);
        })
      }
    
        // listing all service status update in our database, then send it to our clients by socket
      static listenAndSendServicesStatus(socket) {
    
                // listing updates in our table 'service' in the public schema with status_id equals 1
          ServiceController.databaseChanges.onUpdate((e) => {
            ServiceController._managementState(e.old_record, e.record);
            socket.sendToAllSubscribers('receivedMessage', ServiceController.servicesDto);
          }, 'public:service:status_id=eq.1');
    
                // listing updates in our table 'service' in the public schema with status_id equals 2
          ServiceController.databaseChanges.onUpdate((e) => {
            ServiceController._managementState(e.old_record, e.record);
            socket.sendToAllSubscribers('receivedMessage', ServiceController.servicesDto);
          }, 'public:service:status_id=eq.2');
    
                // listing updates in our table 'service' in the public schema with status_id equals 3
          ServiceController.databaseChanges.onUpdate((e) => {
            ServiceController._managementState(e.old_record, e.record);
            socket.sendToAllSubscribers('receivedMessage', ServiceController.servicesDto);
          }, 'public:service:status_id=eq.3');
        }
    
        // method to help us to manage our data that we're sending our client 
      static _managementState(oldRecord, newRecord) {
          const oldService = helperToServiceMapStatus[oldRecord.status_id];
          const newService = helperToServiceMapStatus[newRecord.status_id];
    
          ServiceController.servicesDto[oldService] = ServiceController.servicesDto[oldService].filter(_service => (_service.id != oldRecord.id));
          ServiceController.servicesDto[newService].push(newRecord);
        }
    
    }
    
    module.exports = ServiceController;
    
  • /server/src/server.js : Será onde iremos configurar nosso servidor para rodar o nosso frontend utilizando express e além disso iniciar nosso projeto backend.


const express = require('express'); //"express": "^4.17.1",
const path = require('path');
const app = express();
const server = require('http').createServer(app);
const Socket = require('../services/Socket');
const ServiceController = require('../controllers/ServiceController');

app.use(express.static(path.join(__dirname, '../../public')));
app.set('views', path.join(__dirname, '../../public'));
app.engine('html', require('ejs').renderFile);
app.set('view engine', 'html');

app.use('/public', (req, res) => {
  res.render('index.html');
});

(async () => {
    // starting our socket
  const socket = new Socket(server);

    // fetching our all database data and sending to our clients by socket
  ServiceController.fetchServicesFromDatabaseAndSend(socket);
  // listing our database changes and sending it to our clients by socket
  ServiceController.listenAndSendServicesStatus(socket);
})();

server.listen(8181);
Enter fullscreen mode Exit fullscreen mode
  • /public/index.html : Nosso projeto frontend, em que terá um simples layout em que irá listar nossos serviços no banco de dados de acordo com o seu status. Ele irá abrir uma conexão de socket com o nosso backend para que o backend consiga enviar as mudanças de status que teremos em nossos serviços.
<!DOCTYPE html>
<html lang="en">
<head>
  <meta charset="UTF-8">
  <meta name="viewport" content="width=device-width, initial-scale=1.0">
  <title>Services</title>
</head>
<body>
  <h2>Pendding Services</h2>
  <ul id="pendding"></ul>
  <br><br>

  <h2>In Progress Services</h2>
  <ul id="inProgress"></ul>
  <br><br>

  <h2>Concluded Services</h2>
  <ul id="concluded"></ul>
  <br><br>

</body>
<script type="module">
  import { io } from "https://cdn.socket.io/4.4.0/socket.io.esm.min.js";

  const socket = io('http://localhost:8181');
  const htmlPendding = document.getElementById('pendding');
  const htmlInProgress = document.getElementById('inProgress');
  const htmlConcluded = document.getElementById('concluded');

  socket.emit('sendMessage', {
    text: 'working'
  });
    // listing our socket to receive our data
  socket.on('receivedMessage', (data) => {
    htmlPendding.innerHTML = '';
    htmlInProgress.innerHTML = '';
    htmlConcluded.innerHTML = '';


        // putting our socket data in its specific group
    const { servicesPendding, servicesInProgress, servicesConcluded } = data;

    servicesPendding.map(service => {
      console.log('service', service);
      const item = document.createElement('li');
      item.textContent = service.name;
      htmlPendding.appendChild(item);
      window.scrollTo(0, document.body.scrollHeight);
    });

    servicesInProgress.map(service => {
      const item = document.createElement('li');
      item.textContent = service.name;
      htmlInProgress.appendChild(item);
      window.scrollTo(0, document.body.scrollHeight);
    });

    servicesConcluded.map(service => {
      console.log('service concluded', service);
      const item = document.createElement('li');
      item.textContent = service.name;
      htmlConcluded.appendChild(item);
      window.scrollTo(0, document.body.scrollHeight);
    });

  });
</script>
</html>
Enter fullscreen mode Exit fullscreen mode

/package.json:

{
  "name": "realtime-pg",
  "version": "1.0.0",
  "main": "index.js",
  "license": "MIT",
  "scripts": {
    "start": "forever -w --minUptime 2000 --spinSleepTime 2000 server/src/server.js",
    "start-node": "node server/src/server.js"
  },
  "dependencies": {
    "@supabase/realtime-js": "^1.3.3",
    "ejs": "^3.1.6",
    "express": "^4.17.1",
    "forever": "^4.0.1",
    "pg": "^8.7.1",
    "socket.io": "^4.4.0"
  }
}
Enter fullscreen mode Exit fullscreen mode

Como rodar o projeto:

  1. Garantir que você possui o Docker e o Docker Compose instalados em sua máquina;
  2. Depois rodar na raiz do projeto o seguinte comando docker-compose up —-build . Isso irá buildar o docker e iniciar os containers. Irá subir o container do Supabase e um dos nosso banco de dados Postgres.
  3. instalar as dependencias e rodar o código para inciar o nosso projeto. Você pode rodar o seguinte comando npm install && npm run start-node .
  4. Agora faça alterações na coluna status_id da tabela public.service e você conseguirá ver os logs de modificações com o valor antigo e novo valor alterado pelo terminal do item 3. Caso queira visualizar a nossa página web atualizando o html automaticamente, você pode acessar o link: http://localhost:8181.

Github:

https://github.com/VitorBrangioni/postgres-realtime

Esse projeto é algo bem interessante para pensarmos em nossa aplicação em realtime, podendo disparar vários eventos de acordo com qualquer mudança em nosso banco de dados Postgres. Como por exemplo, enviar um push notification para um client sempre que acontecer determinada mudança no banco. Então poderiamos criar um projeto isolado só para essas ações em realtime, deixando nossa api ou um determinado serviço com regras mais simples, limpas e focadas.

Espero que esse conteúdo tenham ajudado vocês de alguma forma, caso tenham alguma outra soluções que vocês conhecem que possa nos ajudar, por favor, compartilhe e vamos trocar experiência! ;) l

E qualquer dúvida, estou a disposição no meu Linkedin ou no meu Instagram: - - https://www.linkedin.com/in/vitorbrangioni/

https://www.instagram.com/vitorbrangioni.dev/

Lá irei contar mais sobre o dia a dia de um Engenheiro de Software/Empreendedor.

Obrigado pelo seu tempo e bons códigos! 😊

Top comments (0)