In this post, I will talk about some technologies that I used to build an open-source data lake for a project I worked on.
The main goal of this project was to build a lakehouse with minimum coding effort. To achieve it, I used Hudi DeltaStreamer to extract data from the sources, dbt/Spark to transform the data, and Trino to query the lakehouse. As a data store we use S3, but you can use MinIO to be "fully" open source.
All the code for this project is at the following link:
What is a lakehouse?
The lakehouse architecture seeks to combine the benefits of data lakes and data warehouses by bringing both concepts together into a unified platform. It enables organizations to have a single, cohesive system that provides unified storage, schema enforcement, data processing frameworks, and time travel and versioning.
Hive Metastore
The Hive Metastore stores and manages metadata about Hive tables and their schemas, including table names, column names, data types, partitioning information, and storage location. It acts as a catalog or directory for Hive, allowing users to query and analyze data without having to worry about the underlying storage details.
Apache Spark Thrift Server
The Thrift Server allows external applications to connect to a Spark cluster and submit SQL queries or execute Spark SQL statements programmatically. It acts as an intermediary layer between client applications and the Spark cluster, providing a standardized interface for query execution and result retrieval.
Apache Hudi
Apache Hudi is an open-source data management framework that provides efficient and scalable data storage and processing capabilities. Its key features include upserts and incremental inserts, atomic writes, ACID transactions, schema evolution, and query optimizations.
DBT (data build tool)
It is an open-source analytics engineering tool that helps analysts and data engineers transform, organize, and manage data in their data pipelines. It is designed to work with modern data warehouses and follows the ELT approach: raw data is loaded first, and the transformations are then performed inside the warehouse with SQL, producing the tables and views used for reporting and analytics.
Trino
Trino is an open-source distributed SQL query engine designed for high-performance interactive analytics. It originated from Presto, which was initially developed at Facebook to address their need for a fast and scalable query engine capable of processing large volumes of data.
Architecture
The data flows through the lakehouse as follows:
- Transactional data is stored in PostgreSQL and pulled by DeltaStreamer (a built-in, configurable Hudi application), which writes it to S3 in Hudi format, along with the Hudi config files, and syncs the table metadata to the Hive Metastore, which uses PostgreSQL as its backend database.
- Then, the pulled data is transformed using dbt, which holds the business logic that generates the analytics data. dbt sends this logic to Spark, which applies the required transformations to the data.
- Finally, Trino connects to S3 and the Hive Metastore to query the analytics data.
Under the infra folder there are several subfolders:
- Hive
  - run.sh: configures and runs the Hive Metastore and connects it to PostgreSQL as its backend.
  - Dockerfile: used to build the image.
- Spark
  - Jars: contains all the dependencies needed to work with Hudi, PostgreSQL, and S3.
  - start-spark.sh: runs Spark depending on the workload (master, worker, or Thrift Server).
  - Dockerfile: used to build the image.
- Trino
  - hive.properties: used by Trino to connect to Hive/S3.
To start the services, run:
cd infra
docker-compose up -d
First, we need to create a database and a user that will be used throughout this project. Connect to PostgreSQL with psql (when prompted, the password is passwd):
psql -h localhost -p 5432 -U postgresuser -d metastore
Then run:
CREATE ROLE deuser SUPERUSER LOGIN PASSWORD 'depasswd';
CREATE DATABASE DEV_DATABASE;
GRANT ALL PRIVILEGES ON DATABASE DEV_DATABASE TO deuser;
In this project we'll use the Northwind sample database. To load it into PostgreSQL, run the following (when prompted, the password is depasswd):
cd northwind_data
psql -h localhost -p 5432 -U deuser -d dev_database -a -f full_data.sql
Then connect to the Spark Thrift Server at jdbc:hive2://localhost:10000 (the Thrift Server's default port) using any SQL client, such as DBeaver, and create the two databases used in this project:
CREATE DATABASE IF NOT EXISTS transactional;
CREATE DATABASE IF NOT EXISTS analytics;
To extract the data, we'll use the HoodieDeltaStreamer app contained in the hudi-utilities-slim jar. We have to run this jar using spark-submit in our Spark standalone cluster.
HoodieDeltaStreamer is easy to use, but the problem I had was the lack of documentation: it took me some time to find the right versions to use with Spark, S3, PostgreSQL, and the Hive Metastore.
Before explaining HoodieDeltaStreamer, there are three important configurations (all prefixed with hoodie.) you need to know when you write data in the Hudi format:
- datasource.write.recordkey.field - Indicates the record key (primary key), which must be unique.
- datasource.write.partitionpath.field - Indicates the partition path field, which determines how the data will be partitioned.
- datasource.write.precombine.field - Indicates the field used to decide which record to keep when several records have the same record key (datasource.write.recordkey.field). Most of the time, you should use a datetime/timestamp field, so that the most recently written record is kept.
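To make the precombine rule concrete, here is a small, self-contained Python sketch. It does not use Hudi; it only mimics how, with Hudi's default behavior, the record with the largest precombine value wins among records that share a record key. The order_id, status, and updated_at names and the sample rows are hypothetical.

```python
from typing import Any, Dict, Iterable, List


def precombine(records: Iterable[Dict[str, Any]], record_key: str,
               precombine_field: str) -> List[Dict[str, Any]]:
    """Keep one record per record key: the one with the largest precombine value."""
    latest: Dict[Any, Dict[str, Any]] = {}
    for rec in records:
        key = rec[record_key]
        current = latest.get(key)
        # On ties, this sketch lets the later record in the batch win.
        if current is None or rec[precombine_field] >= current[precombine_field]:
            latest[key] = rec
    return list(latest.values())


# Hypothetical batch: order 1 appears twice, and the "shipped" version is newer.
# Timestamps in YYYY-MM-DD HH:mm:ss.SSS format sort correctly as strings.
batch = [
    {"order_id": 1, "status": "created", "updated_at": "2023-01-01 10:00:00.000"},
    {"order_id": 1, "status": "shipped", "updated_at": "2023-01-02 09:30:00.000"},
    {"order_id": 2, "status": "created", "updated_at": "2023-01-01 11:00:00.000"},
]

for row in precombine(batch, record_key="order_id", precombine_field="updated_at"):
    print(row["order_id"], row["status"])
# 1 shipped
# 2 created
```

This is also why a timestamp such as updated_at is the natural precombine field: the latest change to a record always carries the largest value.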
HoodieDeltaStreamer has many parameters you need to understand to use it correctly. For our needs, we'll use the following configuration:
- --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer hudi-utilities-slim-bundle_2.12-0.12.2.jar - We use the slim bundle since it's the one recommended by the Hudi docs.
- --source-class org.apache.hudi.utilities.sources.JdbcSource - Used because we connect to the database over JDBC.
- --target-base-path - The path where the data extracted from the database is written; this parameter is mandatory.
- --source-ordering-field - Indicates the precombine field (datasource.write.precombine.field).
- --table-type - COPY_ON_WRITE (COW) or MERGE_ON_READ (MOR).
- --hoodie-conf hoodie.datasource.hive_sync.* - Configurations to sync with the Hive Metastore. The connection between Spark and the Hive Metastore is set up with --conf spark.hive.metastore.uris=thrift://hive-metastore:9083.
- --hoodie-conf hoodie.deltastreamer.jdbc.* - Settings such as user, password, driver, host, database, and table used to connect to the database.
- --hoodie-conf hoodie.deltastreamer.jdbc.incr.pull=TRUE - Avoids pulling large amounts of data from the database by pulling only the rows that match a condition. The first run with this configuration writes a checkpoint, under the key "deltastreamer.checkpoint.key", to the YYYYMMDDHHMMSS.commit file.
- --hoodie-conf hoodie.deltastreamer.jdbc.table.incr.column.name - The column used in the condition that selects new data, usually the same column as datasource.write.precombine.field. Its values must have the format YYYY-MM-DD HH:mm:ss.SSS, like deltastreamer.checkpoint.key.
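The flags above fit together into a single spark-submit call. The Python sketch below only builds the argument list; it does not run anything. The master URL, jar path, S3 layout, table names, and credentials are hypothetical, and the repository's load_sources.sh may differ. It also adds --target-table and --enable-hive-sync, HoodieDeltaStreamer flags that are not in the list above; a real run may need more settings, such as a key generator and partition fields.

```python
import shlex
from typing import Dict, List

# Hypothetical locations: adjust to your own cluster, jar path and bucket.
SPARK_MASTER = "spark://spark-master:7077"
HUDI_JAR = "/opt/spark-apps/jars/hudi-utilities-slim-bundle_2.12-0.12.2.jar"


def deltastreamer_command(table: str, record_key: str, ordering_field: str,
                          jdbc_url: str, user: str, password: str,
                          bucket: str) -> List[str]:
    """Build (but do not run) a spark-submit call for a JDBC -> Hudi load."""
    hoodie_conf: Dict[str, str] = {
        # Record key of the Hudi table.
        "hoodie.datasource.write.recordkey.field": record_key,
        # JdbcSource connection settings.
        "hoodie.deltastreamer.jdbc.url": jdbc_url,
        "hoodie.deltastreamer.jdbc.user": user,
        "hoodie.deltastreamer.jdbc.password": password,
        "hoodie.deltastreamer.jdbc.driver.class": "org.postgresql.Driver",
        "hoodie.deltastreamer.jdbc.table.name": table,
        # Incremental pull on the same column used for ordering/precombine.
        "hoodie.deltastreamer.jdbc.incr.pull": "true",
        "hoodie.deltastreamer.jdbc.table.incr.column.name": ordering_field,
        # Hive Metastore sync target.
        "hoodie.datasource.hive_sync.database": "transactional",
        "hoodie.datasource.hive_sync.table": table,
    }
    cmd = [
        "spark-submit",
        "--master", SPARK_MASTER,
        "--conf", "spark.hive.metastore.uris=thrift://hive-metastore:9083",
        "--class", "org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer",
        HUDI_JAR,
        "--source-class", "org.apache.hudi.utilities.sources.JdbcSource",
        "--target-base-path", f"s3a://{bucket}/data/transactional/{table}",
        "--target-table", table,
        "--source-ordering-field", ordering_field,
        "--table-type", "COPY_ON_WRITE",
        "--enable-hive-sync",
    ]
    # Each Hudi property is passed as its own --hoodie-conf key=value pair.
    for key, value in hoodie_conf.items():
        cmd += ["--hoodie-conf", f"{key}={value}"]
    return cmd


print(shlex.join(deltastreamer_command(
    table="orders", record_key="order_id", ordering_field="updated_at",
    jdbc_url="jdbc:postgresql://postgres:5432/dev_database",
    user="deuser", password="depasswd", bucket="my-hudi-bucket",
)))
```

Everything before the jar path is read by spark-submit; everything after it is passed to HoodieDeltaStreamer itself.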
On the next execution, HoodieDeltaStreamer runs the following query behind the scenes to pull only the new data:
SELECT * FROM TABLE WHERE (hoodie.deltastreamer.jdbc.table.incr.column.name) > (deltastreamer.checkpoint.key)
HoodieDeltaStreamer also has a --checkpoint parameter to pull data from any point in time. Behind the scenes, it executes the query:
SELECT * FROM TABLE WHERE (hoodie.deltastreamer.jdbc.table.incr.column.name) > (--checkpoint)
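The checkpoint mechanism can be sketched without Spark or Hudi. This Python example uses an in-memory SQLite database as a stand-in for PostgreSQL and a hypothetical orders table. It mimics the query above and returns the new checkpoint to the caller, instead of storing it in a .commit file as Hudi does, so it illustrates the logic only and is not Hudi's implementation.

```python
import sqlite3
from typing import List, Optional, Tuple


def incremental_pull(conn: sqlite3.Connection, table: str, incr_column: str,
                     checkpoint: Optional[str]) -> Tuple[List[tuple], Optional[str]]:
    """Return rows newer than `checkpoint` and the checkpoint for the next run.

    Table and column names are interpolated directly, so they must be trusted values.
    """
    if checkpoint is None:
        # No checkpoint yet (first run): fall back to a full pull.
        cur = conn.execute(f"SELECT * FROM {table}")
    else:
        # Same shape as: SELECT * FROM TABLE WHERE incr_column > checkpoint
        cur = conn.execute(f"SELECT * FROM {table} WHERE {incr_column} > ?", (checkpoint,))
    rows = cur.fetchall()
    if rows:
        # The next checkpoint is the largest incremental-column value seen.
        col = [d[0] for d in cur.description].index(incr_column)
        checkpoint = max(row[col] for row in rows)
    return rows, checkpoint


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (order_id INTEGER, updated_at TEXT)")
conn.executemany("INSERT INTO orders VALUES (?, ?)",
                 [(1, "2023-01-01 10:00:00.000"), (2, "2023-01-01 11:00:00.000")])

rows, ckpt = incremental_pull(conn, "orders", "updated_at", None)
print(len(rows), ckpt)  # 2 2023-01-01 11:00:00.000

conn.execute("INSERT INTO orders VALUES (3, '2023-01-02 08:00:00.000')")
rows, ckpt = incremental_pull(conn, "orders", "updated_at", ckpt)
print(rows, ckpt)  # [(3, '2023-01-02 08:00:00.000')] 2023-01-02 08:00:00.000
```

Because the timestamps use the fixed YYYY-MM-DD HH:mm:ss.SSS format, the string comparison in the WHERE clause orders them correctly, which is one reason that format matters.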
To execute the HoodieDeltaStreamer app, connect to a Spark worker container and run the load script:
docker exec -ti spark-worker-a /bin/bash
bash /opt/spark-apps/load_sources.sh
First of all, we need dbt installed on the machine where dbt will run; in this case, you can use your computer. I ran the following commands:
virtualenv -p /usr/bin/python3 venv
sudo apt install python3.10-dev build-essential gcc
sudo apt install libblas-dev libatlas-base-dev
sudo apt install libssl-dev libffi-dev
sudo apt install libxml2-dev libxslt1-dev
sudo apt install libpq-dev libldap2-dev libsasl2-dev
. venv/bin/activate
pip install --upgrade pip
pip install dbt-spark
pip install "dbt-spark[PyHive]"
Then create a new project by running the following command, and place the files from lakehouse/dbt_project/northwind/models in the project folder it creates:
dbt init northwind
We need to configure the profiles.yml file to connect dbt to Spark. I set analytics as the schema, so all tables created by dbt are stored inside this database, whose warehouse location is set in … with --conf spark.sql.warehouse.dir=s3a://$HUDI_S3_BUCKET/data/dbt
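For reference, a minimal profiles.yml for a dbt-spark Thrift connection looks roughly like the output of this small Python helper. The host, port 10000 (the Thrift Server's default), the target name dev, and the thread count are assumptions; only the standard dbt-spark profile fields type, method, host, port, schema, and threads are used.

```python
def render_spark_profile(profile: str, host: str, port: int, schema: str,
                         threads: int = 1) -> str:
    """Return the text of a minimal dbt profiles.yml for dbt-spark over Thrift."""
    lines = [
        f"{profile}:",
        "  target: dev",
        "  outputs:",
        "    dev:",
        "      type: spark",
        "      method: thrift",
        f"      host: {host}",
        f"      port: {port}",
        f"      schema: {schema}",  # every model lands in this database
        f"      threads: {threads}",
    ]
    return "\n".join(lines) + "\n"


# Save the output as ~/.dbt/profiles.yml. The top-level name must match the
# "profile" entry in dbt_project.yml, which dbt init sets to the project name.
print(render_spark_profile("northwind", "localhost", 10000, "analytics"))
```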
In this case, the source database uses soft deletes, so every PostgreSQL table has three columns to track updates and deletions: created_at, updated_at, and deleted_at.
- created_at and updated_at are both filled with the insertion timestamp when the record is inserted.
- updated_at is set to the update timestamp whenever any field of the record is updated.
- When a record is deleted, deleted_at and updated_at are both set to the deletion timestamp.
The lakehouse is organized into two databases:
- Transactional database: stores the tables replicated from PostgreSQL in Hudi format. These tables contain the created_at, updated_at, and deleted_at fields.
- Analytics database: stores the tables and views built by dbt, organized in three layers:
  - Staging tables (stg_*), which take tables from the transactional database as input. Soft-deleted records are excluded in this layer.
  - Core tables (dim_* and fact_*), which take the staging tables as input.
  - Analytic views, used for reporting.
For the staging models, we first need to deal with deleted records: the pre-hook takes only the deleted records from the transactional table, extracts their ids, and deletes them from the stg_* table. On the first run, this pre-hook isn't executed since the table doesn't exist yet.
Then, to insert or update records, we extract only the new ones from the transactional table using WHERE updated_at > (SELECT max(updated_at) FROM {{ this }}), since our dbt model is incremental.
Finally, behind the scenes, Hudi takes primaryKey and precombineKey from the model's config block to insert or update the records through the Spark/Hudi integration.
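The three steps above can be simulated in plain Python to make the order of pre-hook, incremental filter, and upsert explicit. This is not dbt or Hudi code: dictionaries stand in for the stg_* and transactional tables, and the sample rows are hypothetical. I also assume the model's SELECT skips rows whose deleted_at is set; otherwise a record deleted after the last run could be re-inserted by the incremental filter.

```python
from typing import Any, Dict, List, Optional

Row = Dict[str, Any]


def run_stg_model(stg: Optional[Dict[int, Row]], source: List[Row]) -> Dict[int, Row]:
    """Simulate one run of an incremental stg_* model over a soft-delete source.

    `stg` is None when the table does not exist yet (first run); `source` is the
    transactional table, holding one latest row per id.
    """
    if stg is None:
        # First run: the table does not exist, so the pre-hook is skipped.
        result: Dict[int, Row] = {}
    else:
        result = dict(stg)  # work on a copy
        # Pre-hook: delete every id that is soft-deleted in the source.
        for row in source:
            if row["deleted_at"] is not None:
                result.pop(row["id"], None)
    # Incremental filter: updated_at > (SELECT max(updated_at) FROM {{ this }}).
    # An empty table is treated like a first run in this sketch.
    watermark = max((r["updated_at"] for r in result.values()), default=None)
    for row in source:
        if row["deleted_at"] is not None:
            continue  # assumption: the model's SELECT also skips soft-deleted rows
        if watermark is None or row["updated_at"] > watermark:
            # Upsert by id (the Hudi primaryKey); updated_at acts as precombine field.
            current = result.get(row["id"])
            if current is None or row["updated_at"] >= current["updated_at"]:
                result[row["id"]] = dict(row)
    return result


source = [
    {"id": 1, "name": "Alfreds", "updated_at": "2023-01-01 10:00:00.000", "deleted_at": None},
    {"id": 2, "name": "Ana", "updated_at": "2023-01-01 11:00:00.000", "deleted_at": None},
]
stg = run_stg_model(None, source)

# Next day: id 1 is updated and id 2 is soft-deleted in PostgreSQL.
source = [
    {"id": 1, "name": "Alfreds Futterkiste", "updated_at": "2023-01-02 09:00:00.000", "deleted_at": None},
    {"id": 2, "name": "Ana", "updated_at": "2023-01-02 10:00:00.000", "deleted_at": "2023-01-02 10:00:00.000"},
]
stg = run_stg_model(stg, source)
print(sorted(stg))     # [1]
print(stg[1]["name"])  # Alfreds Futterkiste
```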
Depending on the business logic, you can use Hudi tables or Hive tables in Parquet format. For example, fact_sales needs to be stored partitioned by date for query performance.
We use Parquet and insert_overwrite by partition since at this stage we don't need to deal with deletes or updates: we only need the latest image of the records, which the stg tables already hold.
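To see why insert_overwrite fits fact_sales, here is a minimal Python sketch of partition-level overwrite, assuming the table is partitioned by a hypothetical order_date column. Every partition present in the new batch is replaced wholesale, while all other partitions are left untouched. It models the semantics only, not how Spark or dbt-spark implement them.

```python
from typing import Any, Dict, List

Table = Dict[str, List[Dict[str, Any]]]  # partition value -> rows in that partition


def insert_overwrite(table: Table, batch: List[Dict[str, Any]], partition_col: str) -> Table:
    """Replace only the partitions present in `batch`; keep all others as they are."""
    new_partitions: Table = {}
    for row in batch:
        new_partitions.setdefault(row[partition_col], []).append(row)
    result = dict(table)           # untouched partitions are carried over as-is
    result.update(new_partitions)  # touched partitions are fully replaced
    return result


fact_sales: Table = {
    "2023-01-01": [{"order_id": 1, "order_date": "2023-01-01", "amount": 10.0}],
    "2023-01-02": [{"order_id": 2, "order_date": "2023-01-02", "amount": 20.0}],
}
batch = [
    {"order_id": 2, "order_date": "2023-01-02", "amount": 25.0},
    {"order_id": 3, "order_date": "2023-01-02", "amount": 5.0},
]
fact_sales = insert_overwrite(fact_sales, batch, "order_date")
print({k: len(v) for k, v in fact_sales.items()})  # {'2023-01-01': 1, '2023-01-02': 2}
```

Since the stg tables already hold the latest image of each record, rebuilding a whole partition from them is enough; no row-level merge is needed.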
Sometimes you need an additional layer, or just a query that is executed every time it is read; in this case, use a Hive view. To create it, omit the config block in the dbt model: since view is dbt's default materialization, dbt-spark creates a Hive view behind the scenes.
We use Trino to query Hudi tables, Hive tables, and views, so Trino needs the right configuration for each of them.
First, create a Trino catalog for Hive to connect with the Hive metastore. By default, Trino can query Hive tables, but we need to add hive.views-execution.enabled=true to enable querying Hive views.
Then, to query Hudi tables, we add hive.hudi-catalog-name=hudi, which redirects queries on Hudi tables to a separate Trino catalog named hudi.
Finally, we create that hudi catalog, configured with the settings needed to query Hudi tables.
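The three steps above boil down to two catalog files, which Trino reads from etc/catalog/<name>.properties. This Python sketch renders them. It includes only the properties named in the text plus connector.name and hive.metastore.uri; the metastore host is reused from the spark.hive.metastore.uris value shown earlier, and the S3 credential and endpoint settings a real deployment also needs are omitted.

```python
from typing import Dict

METASTORE_URI = "thrift://hive-metastore:9083"

# One entry per Trino catalog; each would be saved as etc/catalog/<name>.properties.
CATALOGS: Dict[str, Dict[str, str]] = {
    "hive": {
        "connector.name": "hive",
        "hive.metastore.uri": METASTORE_URI,
        # Allow Trino to query Hive views, such as the views dbt creates.
        "hive.views-execution.enabled": "true",
        # Redirect queries on Hudi tables to the "hudi" catalog below.
        "hive.hudi-catalog-name": "hudi",
    },
    "hudi": {
        "connector.name": "hudi",
        "hive.metastore.uri": METASTORE_URI,
    },
}


def render_properties(props: Dict[str, str]) -> str:
    """Render a .properties file body with one key=value pair per line."""
    return "".join(f"{key}={value}\n" for key, value in props.items())


for name, props in CATALOGS.items():
    print(f"# etc/catalog/{name}.properties")
    print(render_properties(props))
```

With this split, users can keep querying everything through the hive catalog, and Trino transparently hands Hudi tables to the Hudi connector.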
That's all! This project didn't need near-real-time synchronization between the database servers and the lakehouse. If you do need it, one solution is to add Debezium and Kafka, and then configure Hudi DeltaStreamer to consume from them.