DEV Community

Discussion on: Event Storage in Postgres, Multi-tenant

 
kspeakman profile image
Kasey Speakman • Edited

Here is a quick sketch of what the solution would look like that supported multiple concurrent writers. Using the strategy from that link. This is accomplished by making sequence number generation part of the parent transaction. I even added some rules to make the Event table append-only and the sequence counter update only to prevent maintenance mistakes. Also inspired by that link.

Here, I also renamed SequenceNum to Position.

--DROP TABLE IF EXISTS PositionCounter CASCADE;
--DROP FUNCTION IF EXISTS NextPosition();
--DROP TABLE IF EXISTS Event CASCADE;
--DROP TRIGGER IF EXISTS trg_EventRecorded ON Event;
--DROP FUNCTION IF EXISTS NotifyEvent();

-- transactional sequence number
CREATE TABLE IF NOT EXISTS PositionCounter
(
    Position bigint NOT NULL
);

INSERT INTO PositionCounter VALUES (0);

-- prevent removal / additional rows
CREATE RULE rule_positioncounter_noinsert AS 
ON INSERT TO PositionCounter DO INSTEAD NOTHING;
CREATE RULE rule_positioncounter_nodelete AS 
ON DELETE TO PositionCounter DO INSTEAD NOTHING;

-- function to get next sequence number
CREATE FUNCTION NextPosition() RETURNS bigint AS $$
    DECLARE
        nextPos bigint;
    BEGIN
        UPDATE PositionCounter
           SET Position = Position + 1
        ;
        SELECT INTO nextPos Position FROM PositionCounter;
        RETURN nextPos;
    END;
$$ LANGUAGE plpgsql;

-- event table
CREATE TABLE IF NOT EXISTS Event
(
    Position bigint NOT NULL,
    TenantId uuid NOT NULL,
    StreamId uuid NOT NULL,
    Version int NOT NULL,
    Type text NOT NULL,
    Meta jsonb NOT NULL,
    Data jsonb,
    LogDate timestamptz NOT NULL DEFAULT now(),
    CONSTRAINT pk_event_position PRIMARY KEY (TenantId, Position),
    CONSTRAINT uk_event_streamid_version UNIQUE (TenantId, StreamId, Version)
) PARTITION BY LIST (TenantId);

-- Append only
CREATE RULE rule_event_nodelete AS 
ON DELETE TO Event DO INSTEAD NOTHING;
CREATE RULE rule_event_noupdate AS 
ON UPDATE TO Event DO INSTEAD NOTHING;

-- notification
CREATE FUNCTION NotifyEvent() RETURNS trigger AS $$

    DECLARE
        payload text;

    BEGIN
        -- { position }/{ tenantId }/{ streamId }/{ version }/{ event type }
        SELECT CONCAT_WS( '/'
                        , NEW.Position
                        , NEW.TenantId
                        , REPLACE(CAST(NEW.StreamId AS text), '-', '')
                        , NEW.Version
                        , NEW.Type
                        )
          INTO payload
        ;

        -- using lower case channel name or else LISTEN would require quoted identifier.
        PERFORM pg_notify('eventrecorded', payload);

        RETURN NULL;
    END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER trg_EventRecorded
    AFTER INSERT ON Event
    FOR EACH ROW
    EXECUTE PROCEDURE NotifyEvent()
;
Enter fullscreen mode Exit fullscreen mode

And here's how to append an event with the transactional sequence number.

-- inside a transaction
INSERT
  INTO Event
     ( Position
     , TenantId
     , StreamId
     , Version
     , Type
     , Meta
     , Data
     )
VALUES
     ( NextPosition()
     , '3791B53677C840FC81CA65BFDAF34F1F'
     , 'A88F94DB6E7A439E9861485F63CC8A13'
     , 1
     , 'EmptyEvent'
     , '{}'
     , NULL
     )
;
Enter fullscreen mode Exit fullscreen mode
Thread Thread
 
ronlobo profile image
Ron Gonzalez Lobo

Thanks for putting this together, great read!

Working on porting this to YugaByteDB.

Thread Thread
 
skyjur profile image
Ski

Would it not be simpler instead to use explicit table lock when inserting events instead of PositionCounter? It limits performance but I believe result is same as PositionCounter table because it too needs a row level lock

begin;
lock table event;
insert into event ...
commit
Thread Thread
 
kspeakman profile image
Kasey Speakman

Simpler? It would be less to setup vs a Position table. In usage it's adding a lock statement vs calling NextPosition() in the insert.

I suppose you will just have to test for the performance difference between the two.