DEV Community

Persisting event data to Postgres using GenStage and EventBus

Mustafa Turan
Go, Open Source

One way to consume EventBus events is to implement GenStage consumers. GenStage handles back-pressure by letting consumers ask for events on demand, and its workers are configurable. The event_bus_postgres library uses GenStage to persist event_bus events to a Postgres database with batch inserts.
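The demand-driven idea behind back-pressure can be illustrated outside Elixir. What follows is a minimal Python sketch, not GenStage or event_bus_postgres code: DemandQueue, push, and ask are hypothetical names chosen for illustration, and the sketch leaves out details a real GenStage producer handles, such as remembering unmet demand until new events arrive.

```python
from collections import deque


class DemandQueue:
    """Hypothetical buffer that releases items only in response to demand."""

    def __init__(self):
        self._items = deque()

    def push(self, item):
        # Publisher side: store the item; nothing is sent downstream yet.
        self._items.append(item)

    def ask(self, demand):
        # Consumer side: hand over at most `demand` items, oldest first.
        count = min(demand, len(self._items))
        return [self._items.popleft() for _ in range(count)]

    def __len__(self):
        return len(self._items)


queue = DemandQueue()
for event_id in range(10):
    queue.push(("user_created", event_id))

# The consumer decides how much work it takes on each round.
first = queue.ask(4)
second = queue.ask(4)
third = queue.ask(4)
print(len(first), len(second), len(third))  # prints "4 4 2"
```

Because the consumer sets the demand, a slow consumer simply asks less often and the backlog stays in the queue instead of overwhelming the stage downstream.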

How it works

+-----+
|     |                                         GEN STAGE
|     |        EVENTBUS      +------------------------------------------+
|     |        CONSUMER      |                   +---+                  |
|     |        +-----+       |                   |   |                  |
|     |        |     |       |                   |   |          +---+   |
|     |        |  E  |       |                   |   |          |   |   |
|     |        |  v  |       |                   |   |          |   |   |
|     |        |  e  |       |                   |   |          |   |   |
|  E  |        |  n  |       |                   | E |          |   |   |
|  l  |        |  t  |       |  +-------+        | v |          |   |   |
|  i  | topic  |  B  |  topic   |       |        | e |          |   |
|  x  |   +    |  u  |    +     |   Q   |        | n |          | B |       +--+
|  i  |event_id|  s  | event_id |   u   |   ask  | t |    ask   | u |       |  |
|  r  |------->|  .  |--------->|   e   |<-------|   | <--------| c | BATCH |  |
|     |        |  P  |          |   u   |------->| M | -------->| k |------>|DB|
|  E  |        |  o  |          |   e   |   pull | a |    pull  | e | INSERT|  |
|  v  |        |  s  |          |       |        | p |          | t |       |  |
|  e  |        |  t  |       |  +-------+        | p |          |   |   |   +--+
|  n  |        |  g  |       |  GENSTAGE         | e |          |   |   |
|  t  |        |  r  |       |  PRODUCER         | r |          |   |   |
|  B  |        |  e  |       |                   |   |          |   |   |
|  u  |        |  s  |       |                   |   |          |   |   |
|  s  |        +-----+       |                   |   |          |   |   |
|     |<-----------------------------------------|   |          +---+   |
|     |                      |    fetch_event/1  |   |         CONSUMER |
|     |                      |                   |   |                  |
+-----+                      |                   +---+                  |
                             |                  CONSUMER                |
                             |                  PRODUCER                |
                             +------------------------------------------+

Components

EventBus
Message bus for Elixir; it publishes event_id and topic data to topic subscribers.

EventBus.Postgres
Message bus event consumer; it pushes event_id and topic to the EventBus.Postgres.Queue.

Queue
GenStage producer; it is a simple queue implementation.

EventMapper
GenStage producer-consumer; it pulls (dequeues) from EventBus.Postgres.Queue, fetches the original event from EventBus, and then converts the data into an Ecto model.

Bucket
GenStage consumer; it pulls (dequeues) from EventBus.Postgres.EventMapper and batch-inserts the data into the Postgres DB.
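
The flow through the components above (queue of topic and event_id pairs, lookup of the original event, mapping to a row, batch insert) can be sketched end to end. This is a rough Python illustration under stated assumptions, not the library's implementation: EVENT_STORE, fetch_event, map_event, run_pipeline, the events table, and its columns are all hypothetical, an in-memory SQLite database stands in for Postgres, and the stages run in one loop instead of as separate processes.

```python
import sqlite3
from collections import deque

# Hypothetical in-memory event store standing in for EventBus,
# keyed by (topic, event_id).
EVENT_STORE = {
    ("user_created", i): {"id": i, "topic": "user_created", "data": f"user-{i}"}
    for i in range(5)
}


def fetch_event(shadow):
    # Stand-in for fetching the original event from its (topic, event_id) pair.
    return EVENT_STORE[shadow]


def map_event(event):
    # Convert the event into a row tuple (the EventMapper role).
    return (event["id"], event["topic"], event["data"])


def run_pipeline(queue, conn, batch_size):
    """Pull up to batch_size pairs at a time, map them, and batch-insert the rows."""
    batch_sizes = []
    while queue:
        count = min(batch_size, len(queue))
        shadows = [queue.popleft() for _ in range(count)]
        rows = [map_event(fetch_event(shadow)) for shadow in shadows]
        with conn:  # one transaction per batch (the Bucket role)
            conn.executemany(
                "INSERT INTO events (id, topic, data) VALUES (?, ?, ?)", rows
            )
        batch_sizes.append(len(rows))
    return batch_sizes


conn = sqlite3.connect(":memory:")  # SQLite used here only as a stand-in for Postgres
conn.execute("CREATE TABLE events (id INTEGER PRIMARY KEY, topic TEXT, data TEXT)")

queue = deque(EVENT_STORE.keys())  # only (topic, event_id) pairs travel through the queue
batches = run_pipeline(queue, conn, batch_size=2)
total = conn.execute("SELECT COUNT(*) FROM events").fetchone()[0]
print(batches, total)  # prints "[2, 2, 1] 5"
```

Passing only the topic and event_id through the queue keeps the queued items small; the full event is looked up once, right before it is mapped and written.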

Source code: https://github.com/otobus/event_bus_postgres
