When doing ETL, we usually write data into the database one batch at a time, and each batch holds a snapshot of the source data. These snapshots are essentially historical data and may contain many duplicates, so how do we find the data we need across these batches?
This article uses a simple scenario to describe some common ways to handle this, mainly with window functions.
User Scenario
Suppose a company uses a BPM system to manage employee and salary changes. A simple BPM system will usually have two tables, one with each employee's details and one with each employee's salary, as follows.
Employee table:

empid | ename | birthday |
---|---|---|
1 | Ravi | 1990-1-1 |
2 | Raj | 1980-2-3 |
3 | Ram | 1970-3-5 |

Salary table:

empid | sal |
---|---|
1 | 3000 |
2 | 4000 |
3 | 5000 |
When working with relational databases, we usually normalize the data. We split it into tables by subject, keep only the fields each table needs, and relate the tables to each other through foreign keys.
In data analysis, however, we often need a fact table that joins the relevant columns into a single table. A fact table for salary analysis might look like the following.
empid | ename | sal |
---|---|---|
1 | Ravi | 3000 |
2 | Raj | 4000 |
3 | Ram | 5000 |
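The following sketch builds the fact table above by joining the two normalized tables. It uses Python's built-in sqlite3 module with an in-memory database as a stand-in for the real database. The table names employee and salary are assumptions, because the article does not name them.

```python
import sqlite3

# In-memory SQLite database standing in for the BPM system's normalized tables.
# The table names "employee" and "salary" are hypothetical.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE employee (empid INTEGER PRIMARY KEY, ename TEXT NOT NULL, birthday TEXT);
    CREATE TABLE salary (
        empid INTEGER PRIMARY KEY REFERENCES employee(empid),  -- foreign key to employee
        sal INTEGER NOT NULL
    );
    INSERT INTO employee VALUES (1, 'Ravi', '1990-1-1'), (2, 'Raj', '1980-2-3'), (3, 'Ram', '1970-3-5');
    INSERT INTO salary VALUES (1, 3000), (2, 4000), (3, 5000);
""")

# Denormalize: join on the foreign key and keep only the columns the analysis needs.
fact_rows = conn.execute("""
    SELECT e.empid, e.ename, s.sal
    FROM employee AS e
    JOIN salary AS s ON s.empid = e.empid
    ORDER BY e.empid
""").fetchall()

for row in fact_rows:
    print(row)
```

The birthday column is dropped because the salary analysis does not need it. This is the trade-off a fact table makes: fewer, wider rows that suit the analysis.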
A common way to build such a table is batch ETL. Each run fetches the required columns from the employee table and the salary table and saves them for later analysis, such as salary-increase trends.
In this example the ETL runs once a day and joins the two tables each time. As a result, every row carries a job_process_id that identifies its batch and a create_time that records when the row was created.
Here is the SQL that creates and fills the resulting fact table used in the example.
```sql
CREATE TABLE "public"."cdc_test" (
    "id" serial NOT NULL,
    "create_time" DATE NOT NULL,
    "job_process_id" INT NOT NULL,
    "empid" INT NOT NULL,
    "ename" VARCHAR(100) NOT NULL,
    "sal" INT NOT NULL,
    PRIMARY KEY ("id")
);

INSERT INTO "public"."cdc_test" ("id", "create_time", "job_process_id", "empid", "ename", "sal") VALUES
    (1, '2018-12-10', 111, 1, 'Ravi', 3000),
    (2, '2018-12-10', 111, 2, 'Raj', 4000),
    (3, '2018-12-10', 111, 3, 'Ram', 5000),
    (4, '2018-12-11', 112, 1, 'Ravi', 3000),
    (5, '2018-12-11', 112, 2, 'Raj', 4000),
    (6, '2018-12-11', 112, 3, 'Ram', 5000),
    (7, '2018-12-11', 112, 4, 'Srini', 6500),
    (8, '2018-12-12', 113, 1, 'Ravi', 7000),
    (9, '2018-12-12', 113, 2, 'Raj', 4000),
    (10, '2018-12-12', 113, 3, 'Ram', 5000),
    (11, '2018-12-12', 113, 4, 'Srini', 6500);
```
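The duplicates appear because each daily run copies the full joined state again. The sketch below replays the three batches in SQLite and reproduces the 11 rows above. It assumes hypothetical employee and salary source tables and a hypothetical run_batch() helper. SQLite's INTEGER PRIMARY KEY AUTOINCREMENT stands in for PostgreSQL's serial, and dates are stored as ISO-8601 text.

```python
import sqlite3

# In-memory SQLite database standing in for the real source and target databases.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE employee (empid INTEGER PRIMARY KEY, ename TEXT NOT NULL);
    CREATE TABLE salary (empid INTEGER PRIMARY KEY, sal INTEGER NOT NULL);
    CREATE TABLE cdc_test (
        id INTEGER PRIMARY KEY AUTOINCREMENT,  -- plays the role of PostgreSQL's serial
        create_time TEXT NOT NULL,             -- ISO-8601 date string
        job_process_id INTEGER NOT NULL,
        empid INTEGER NOT NULL,
        ename TEXT NOT NULL,
        sal INTEGER NOT NULL
    );
    INSERT INTO employee VALUES (1, 'Ravi'), (2, 'Raj'), (3, 'Ram');
    INSERT INTO salary VALUES (1, 3000), (2, 4000), (3, 5000);
""")


def run_batch(conn, job_process_id, create_time):
    """Hypothetical daily ETL job: append a full snapshot of the joined source tables."""
    conn.execute(
        """
        INSERT INTO cdc_test (create_time, job_process_id, empid, ename, sal)
        SELECT ?, ?, e.empid, e.ename, s.sal
        FROM employee AS e
        JOIN salary AS s ON s.empid = e.empid
        ORDER BY e.empid
        """,
        (create_time, job_process_id),
    )


run_batch(conn, 111, "2018-12-10")

# Srini joins before the second run.
conn.execute("INSERT INTO employee VALUES (4, 'Srini')")
conn.execute("INSERT INTO salary VALUES (4, 6500)")
run_batch(conn, 112, "2018-12-11")

# Ravi gets a raise before the third run.
conn.execute("UPDATE salary SET sal = 7000 WHERE empid = 1")
run_batch(conn, 113, "2018-12-12")

rows = conn.execute("SELECT * FROM cdc_test ORDER BY job_process_id, empid").fetchall()
for row in rows:
    print(row)
```

Only two source rows change across the three runs, yet the fact table grows by a full snapshot each day. That is why the queries that follow have to pick rows out of repeated data.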
In this example there are two state changes:
- Srini joined the company on 2018-12-11.
- Ravi got a pay raise on 2018-12-12, from 3000 to 7000.
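One way to check these two changes is to compare each batch with the one before it. This is not the window-function approach the article is about, only a cross-check. The sketch loads the sample rows into SQLite and uses EXCEPT to list the rows of a batch that do not appear unchanged in the previous batch. The batch_changes() helper is hypothetical.

```python
import sqlite3

# The sample rows from the INSERT statement above.
ROWS = [
    (1, "2018-12-10", 111, 1, "Ravi", 3000), (2, "2018-12-10", 111, 2, "Raj", 4000),
    (3, "2018-12-10", 111, 3, "Ram", 5000), (4, "2018-12-11", 112, 1, "Ravi", 3000),
    (5, "2018-12-11", 112, 2, "Raj", 4000), (6, "2018-12-11", 112, 3, "Ram", 5000),
    (7, "2018-12-11", 112, 4, "Srini", 6500), (8, "2018-12-12", 113, 1, "Ravi", 7000),
    (9, "2018-12-12", 113, 2, "Raj", 4000), (10, "2018-12-12", 113, 3, "Ram", 5000),
    (11, "2018-12-12", 113, 4, "Srini", 6500),
]

conn = sqlite3.connect(":memory:")
conn.execute("""CREATE TABLE cdc_test (
    id INTEGER PRIMARY KEY, create_time TEXT NOT NULL, job_process_id INTEGER NOT NULL,
    empid INTEGER NOT NULL, ename TEXT NOT NULL, sal INTEGER NOT NULL)""")
conn.executemany("INSERT INTO cdc_test VALUES (?, ?, ?, ?, ?, ?)", ROWS)


def batch_changes(conn, prev_job, curr_job):
    """Hypothetical helper: rows of batch curr_job that do not appear unchanged in prev_job."""
    return conn.execute(
        """
        SELECT empid, ename, sal FROM cdc_test WHERE job_process_id = ?
        EXCEPT
        SELECT empid, ename, sal FROM cdc_test WHERE job_process_id = ?
        ORDER BY empid
        """,
        (curr_job, prev_job),
    ).fetchall()


new_in_112 = batch_changes(conn, 111, 112)  # Srini's first appearance
new_in_113 = batch_changes(conn, 112, 113)  # Ravi's new salary
print(new_in_112, new_in_113)
```

This only reports new or changed rows. Detecting an employee who disappears from a batch would need the reverse EXCEPT as well.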
Get the latest state
As the sample data shows, the fact table generated by the ETL contains a lot of duplicate data. If we need the current state without the duplicates, this is where window functions come into play.
```sql
SELECT
    t.*
FROM (
    SELECT
        cdc_test.*,
        ROW_NUMBER() OVER (PARTITION BY empid, ename ORDER BY create_time DESC) AS seqnum
    FROM
        cdc_test
) t
WHERE
    t.seqnum = 1;
```
PARTITION BY empid, ename groups the rows that belong to each employee. ORDER BY create_time DESC then sorts each group from newest to oldest, so the row numbered 1 (seqnum = 1) is that employee's latest state.
The result is as follows.
id | create_time | job_process_id | empid | ename | sal |
---|---|---|---|---|---|
8 | 2018-12-12 | 113 | 1 | Ravi | 7000 |
9 | 2018-12-12 | 113 | 2 | Raj | 4000 |
10 | 2018-12-12 | 113 | 3 | Ram | 5000 |
11 | 2018-12-12 | 113 | 4 | Srini | 6500 |
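To try the latest-state query without a PostgreSQL server, the sketch below runs the same ROW_NUMBER() query in SQLite against the sample rows. It assumes the SQLite library linked into Python is version 3.25 or newer, which is the first release with window functions. An outer ORDER BY empid is added only to make the output order deterministic.

```python
import sqlite3

# The sample rows from the INSERT statement above.
ROWS = [
    (1, "2018-12-10", 111, 1, "Ravi", 3000), (2, "2018-12-10", 111, 2, "Raj", 4000),
    (3, "2018-12-10", 111, 3, "Ram", 5000), (4, "2018-12-11", 112, 1, "Ravi", 3000),
    (5, "2018-12-11", 112, 2, "Raj", 4000), (6, "2018-12-11", 112, 3, "Ram", 5000),
    (7, "2018-12-11", 112, 4, "Srini", 6500), (8, "2018-12-12", 113, 1, "Ravi", 7000),
    (9, "2018-12-12", 113, 2, "Raj", 4000), (10, "2018-12-12", 113, 3, "Ram", 5000),
    (11, "2018-12-12", 113, 4, "Srini", 6500),
]

conn = sqlite3.connect(":memory:")
conn.execute("""CREATE TABLE cdc_test (
    id INTEGER PRIMARY KEY, create_time TEXT NOT NULL, job_process_id INTEGER NOT NULL,
    empid INTEGER NOT NULL, ename TEXT NOT NULL, sal INTEGER NOT NULL)""")
conn.executemany("INSERT INTO cdc_test VALUES (?, ?, ?, ?, ?, ?)", ROWS)

# Number each employee's rows from newest to oldest and keep the newest one.
latest = conn.execute("""
    SELECT id, create_time, job_process_id, empid, ename, sal
    FROM (
        SELECT cdc_test.*,
               ROW_NUMBER() OVER (PARTITION BY empid, ename ORDER BY create_time DESC) AS seqnum
        FROM cdc_test
    ) AS t
    WHERE t.seqnum = 1
    ORDER BY empid
""").fetchall()

for row in latest:
    print(row)
```

Every employee appears in batch 113, so each employee's newest row comes from that batch. Create_time is unique within each partition here. If two rows of one employee could share a timestamp, a tie-breaker such as id would be needed to make the pick deterministic.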
Get every salary change
Besides the latest state, we usually also want to know about every salary change, and a window function can handle this too.
```sql
SELECT
    t.*
FROM (
    SELECT
        cdc_test.*,
        ROW_NUMBER() OVER (PARTITION BY empid, ename, sal ORDER BY create_time) AS seqnum
    FROM
        cdc_test
) t
WHERE
    t.seqnum = 1;
```
This time we PARTITION BY empid, ename, sal, because we want rows with the same values in all three columns to form one group. Whenever sal changes, a new group starts. Ordering by create_time in ascending order makes seqnum = 1 the batch in which each salary first appeared.
The query returns five rows, one for the first appearance of each (empid, ename, sal) combination. Ravi appears twice, which shows his one raise, from 3000 to 7000.
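The sketch below runs the salary-change query in SQLite, assuming SQLite 3.25 or newer for window functions. It also exposes a limitation of partitioning by sal. The extra batch 114, where Ravi's salary returns to 3000, is hypothetical and not part of the article's data. That row falls into the existing (1, Ravi, 3000) group, gets seqnum 3, and is silently skipped.

```python
import sqlite3

# The sample rows from the INSERT statement above.
ROWS = [
    (1, "2018-12-10", 111, 1, "Ravi", 3000), (2, "2018-12-10", 111, 2, "Raj", 4000),
    (3, "2018-12-10", 111, 3, "Ram", 5000), (4, "2018-12-11", 112, 1, "Ravi", 3000),
    (5, "2018-12-11", 112, 2, "Raj", 4000), (6, "2018-12-11", 112, 3, "Ram", 5000),
    (7, "2018-12-11", 112, 4, "Srini", 6500), (8, "2018-12-12", 113, 1, "Ravi", 7000),
    (9, "2018-12-12", 113, 2, "Raj", 4000), (10, "2018-12-12", 113, 3, "Ram", 5000),
    (11, "2018-12-12", 113, 4, "Srini", 6500),
]

conn = sqlite3.connect(":memory:")
conn.execute("""CREATE TABLE cdc_test (
    id INTEGER PRIMARY KEY, create_time TEXT NOT NULL, job_process_id INTEGER NOT NULL,
    empid INTEGER NOT NULL, ename TEXT NOT NULL, sal INTEGER NOT NULL)""")
conn.executemany("INSERT INTO cdc_test VALUES (?, ?, ?, ?, ?, ?)", ROWS)

# Keep the first batch in which each (empid, ename, sal) combination appeared.
FIRST_APPEARANCE_SQL = """
    SELECT id, empid, ename, sal
    FROM (
        SELECT cdc_test.*,
               ROW_NUMBER() OVER (PARTITION BY empid, ename, sal ORDER BY create_time) AS seqnum
        FROM cdc_test
    ) AS t
    WHERE t.seqnum = 1
    ORDER BY id
"""
changes = conn.execute(FIRST_APPEARANCE_SQL).fetchall()
print(changes)

# Hypothetical extra batch (not in the article): Ravi's salary returns to 3000.
conn.execute("INSERT INTO cdc_test VALUES (12, '2018-12-13', 114, 1, 'Ravi', 3000)")
changes_after_revert = conn.execute(FIRST_APPEARANCE_SQL).fetchall()
print(changes_after_revert)  # the return to 3000 is not reported
```

Partitioning by the value works as long as a salary never returns to an earlier amount. Comparing each row with its predecessor, as LAG() does, does not have this blind spot.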
Get salary changes, including the salary before each increase
The two examples above use only ROW_NUMBER(), but many other window functions are available.
This time we want the query result to include not only each salary change but also the salary before the increase. For that we can use LAG(), which returns a value from the previous row of the same partition in the window's ORDER BY order.
```sql
SELECT
    t.*
FROM (
    SELECT
        cdc_test.*,
        LAG(sal) OVER (PARTITION BY empid, ename ORDER BY create_time) AS prev_sal
    FROM
        cdc_test
) t
WHERE
    prev_sal IS NULL
    OR prev_sal <> sal;
```
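The query has the same shape as before, with two differences: it uses LAG() instead of ROW_NUMBER(), and the WHERE clause is different. The WHERE clause keeps a row only if it is the employee's first row (prev_sal IS NULL) or if its salary differs from the previous batch (prev_sal <> sal).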
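The result now has a new column, prev_sal. It is NULL on each employee's first row and 3000 on Ravi's 2018-12-12 row, next to his new salary of 7000.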
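The sketch below runs the LAG() query in SQLite against the sample rows, again assuming SQLite 3.25 or newer. It lists only the columns relevant to the comparison, and an outer ORDER BY id makes the output order deterministic.

```python
import sqlite3

# The sample rows from the INSERT statement above.
ROWS = [
    (1, "2018-12-10", 111, 1, "Ravi", 3000), (2, "2018-12-10", 111, 2, "Raj", 4000),
    (3, "2018-12-10", 111, 3, "Ram", 5000), (4, "2018-12-11", 112, 1, "Ravi", 3000),
    (5, "2018-12-11", 112, 2, "Raj", 4000), (6, "2018-12-11", 112, 3, "Ram", 5000),
    (7, "2018-12-11", 112, 4, "Srini", 6500), (8, "2018-12-12", 113, 1, "Ravi", 7000),
    (9, "2018-12-12", 113, 2, "Raj", 4000), (10, "2018-12-12", 113, 3, "Ram", 5000),
    (11, "2018-12-12", 113, 4, "Srini", 6500),
]

conn = sqlite3.connect(":memory:")
conn.execute("""CREATE TABLE cdc_test (
    id INTEGER PRIMARY KEY, create_time TEXT NOT NULL, job_process_id INTEGER NOT NULL,
    empid INTEGER NOT NULL, ename TEXT NOT NULL, sal INTEGER NOT NULL)""")
conn.executemany("INSERT INTO cdc_test VALUES (?, ?, ?, ?, ?, ?)", ROWS)

# LAG(sal) looks one row back within each employee's rows, ordered by create_time.
# Keep first rows (no predecessor) and rows whose salary differs from the predecessor.
with_prev = conn.execute("""
    SELECT id, create_time, empid, ename, sal, prev_sal
    FROM (
        SELECT cdc_test.*,
               LAG(sal) OVER (PARTITION BY empid, ename ORDER BY create_time) AS prev_sal
        FROM cdc_test
    ) AS t
    WHERE prev_sal IS NULL OR prev_sal <> sal
    ORDER BY id
""").fetchall()

for row in with_prev:
    print(row)  # prev_sal comes back as None where SQL returns NULL
```

Because each row is compared only with its immediate predecessor, this version would also report a salary that later returns to an earlier amount.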
Reference
https://stackoverflow.com/questions/53729837/how-to-select-only-incremental-records-in-big-query