DEV Community

பாலாஜி


How to use pg_cron in postgres to do historic aggregation?

What is Postgres?

Postgres is a popular open-source SQL database. It stores data in tables and lets you insert and query that data with standard SQL.

Problem Statement

Recently, I needed to keep a day-by-day history of an aggregate over a numeric column.

To explain the problem better, let's say we have a schema for a banking application. The customers table in that schema has a balance column. Now, we want to know the sum of all customer balances for each day.

For example: what was the total balance of all customers yesterday, what is it today, and what will it be on each of the coming days?
[Figure: Sum of deposited cash]
This historic aggregation helps us measure the growth of the deposited money over time.
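To make the problem concrete, here is a minimal sketch of the table involved. Only the customers table and its balance column come from this post; the id column and the column types are assumptions for illustration.

```sql
-- Hypothetical minimal shape of the customers table.
-- Only the table name and the balance column come from the post;
-- the id column and the types are assumed.
CREATE TABLE IF NOT EXISTS customers (
    id      BIGINT PRIMARY KEY,
    balance NUMERIC NOT NULL DEFAULT 0
);

-- This returns the total balance as of right now.
-- It cannot tell us what the total was yesterday, because
-- the old balances are overwritten as customers deposit and withdraw.
SELECT sum(balance) AS total_balance
FROM   customers;
```

Since the query only ever sees the current balances, the daily totals have to be captured and stored somewhere as they occur.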

The obvious way to solve the problem is a periodic job that calculates the sum and inserts it into a new table. But setting up a cron job and maintaining a separate data pipeline just for this is too much of a headache.

Hack

Luckily, Postgres has an extension called pg_cron that schedules cron jobs from inside the database.

With pg_cron, the work shrinks from writing code and deploying it as a service to writing a few SQL queries.

Now that we have simplified the job, let's create a new table with the following columns to store our historic aggregations:

  • calculated_at - timestamp of the calculated metrics
  • metric_name - name of the metric. I've added this column so that, if I want to add more metrics in the future, I can reuse this table
  • metric_value - aggregated value.
CREATE TABLE historic_numeric_aggregation
  (
     calculated_at TIMESTAMP WITH TIME ZONE, -- when the metric was calculated
     metric_name   TEXT,                     -- which metric this row holds
     metric_value  NUMERIC                   -- the aggregated value
  );

The next step after creating the table is to populate it with the calculated metric value every day. To do that, I'm creating a Postgres function that inserts the sum of all customer balances into the historic_numeric_aggregation table.

CREATE OR REPLACE FUNCTION aggregate_customer_balance_sum()
RETURNS void AS
$$
BEGIN
    -- Store one snapshot row: the current time, the metric name,
    -- and the total balance across all customers.
    INSERT INTO historic_numeric_aggregation
                (calculated_at, metric_name, metric_value)
    VALUES      (CURRENT_TIMESTAMP,
                 'customer_sum_balance',
                 (SELECT sum(balance) FROM customers));
END;
$$
LANGUAGE plpgsql;

So far we have created all the building blocks to calculate the metrics.
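Before scheduling anything, it can be worth running the function once by hand to confirm that it inserts a row. This is only a quick check using the names defined above; the LIMIT of 5 is an arbitrary choice.

```sql
-- Run the aggregation once manually.
SELECT aggregate_customer_balance_sum();

-- Look at the most recent snapshot rows for this metric.
SELECT calculated_at, metric_name, metric_value
FROM   historic_numeric_aggregation
WHERE  metric_name = 'customer_sum_balance'
ORDER  BY calculated_at DESC
LIMIT  5;
```

Note that sum() returns NULL when the customers table is empty, so metric_value will be NULL in that case.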

Let's schedule the aggregate_customer_balance_sum function to run every morning at 10 o'clock. Keep in mind that pg_cron interprets the schedule in GMT by default.

-- Arguments: job name, cron schedule (minute hour day month weekday), command to run.
SELECT cron.schedule('customer_sum_aggregation', '0 10 * * *',
              'SELECT aggregate_customer_balance_sum()');
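Once the job is scheduled, pg_cron's own tables can be used to check on it. The sketch below assumes a reasonably recent pg_cron version (1.3 or later, as far as I know), where named jobs and the cron.job_run_details table are available; older versions may not have them.

```sql
-- List the scheduled jobs and confirm ours is registered and active.
SELECT jobid, jobname, schedule, command, active
FROM   cron.job;

-- Inspect recent runs: status, any error message, and timing.
SELECT jobid, status, return_message, start_time, end_time
FROM   cron.job_run_details
ORDER  BY start_time DESC
LIMIT  10;

-- To remove the job later, unschedule it by name:
-- SELECT cron.unschedule('customer_sum_aggregation');
```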

Voila, now we have a cron job to do the historic metrics aggregation.
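After the job has run for a few days, the growth of the deposited money can be read straight from the table. The query below is one possible way to do it, using the lag() window function to compare each snapshot with the previous one; the output column names are made up for illustration.

```sql
-- One row per snapshot, with the change since the previous snapshot.
-- The first row has no previous snapshot, so its change is NULL.
SELECT calculated_at::date                        AS day,
       metric_value                               AS total_balance,
       metric_value
         - lag(metric_value) OVER (ORDER BY calculated_at)
                                                  AS change_from_previous
FROM   historic_numeric_aggregation
WHERE  metric_name = 'customer_sum_balance'
ORDER  BY calculated_at;
```

The cast to date uses the session's time zone, so the day shown may differ from the GMT day on which the job ran.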

Note: In this post, I've not explained how to enable the pg_cron extension, because every flavour of Postgres has its own way of installing and enabling it. I followed the AWS tutorial to enable pg_cron on our RDS cluster.
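For orientation only, here is a rough sketch of what enabling pg_cron usually involves on a self-managed server. It is not the AWS procedure mentioned above, and the database name is an assumption; check the documentation for your own Postgres flavour.

```sql
-- Assumed setup for a self-managed server, set in postgresql.conf
-- (changing shared_preload_libraries requires a server restart):
--
--   shared_preload_libraries = 'pg_cron'
--   cron.database_name = 'postgres'   -- database that holds the pg_cron metadata
--
-- On managed services such as RDS, the equivalent setting lives in the
-- parameter group rather than in postgresql.conf.

-- Then, connected to that database as a sufficiently privileged user:
CREATE EXTENSION IF NOT EXISTS pg_cron;
```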

By the way, I'm building a centralized access control solution for Postgres to protect customer privacy. If you are curious, follow this link to learn more: https://github.com/poonai/inspektor
