DEV Community

Michael Martin

Build an Open Source Lakehouse with minimal code effort (Spark + Hudi + dbt + Hive Metastore + Trino)

Post from https://medium.com/@michael.mt66/open-source-lakehouse-18d23f5276ba

In this post, I will talk about the technologies I used to build an open-source lakehouse for a project I worked on.
The main goal of this project was to build a lakehouse with as little code as possible. To get there, I used Hudi DeltaStreamer to extract the data from the sources, dbt with Spark to transform it, and Trino to query the lakehouse. We use S3 as the data store, but you can use MinIO to be "fully" open source.
All the code for this project is at the following link:

What is a lakehouse?
The lakehouse architecture seeks to combine the benefits of data lakes and data warehouses by bringing both concepts together into a unified platform. It gives organizations a single, cohesive system that provides unified storage, schema enforcement, data processing frameworks, time travel, and versioning.

Hive Metastore
The Hive Metastore stores and manages metadata information about Hive tables and their schemas, including the table names, column names, data types, partitioning information, and storage location. It acts as a catalog or directory for Hive, allowing users to query and analyze data without having to worry about the underlying storage details.

Apache Spark Thrift Server
The Thrift Server allows external applications to connect to a Spark cluster and submit SQL queries or execute Spark SQL statements programmatically. It acts as an intermediary layer between client applications and the Spark cluster, providing a standardized interface for query execution and result retrieval.

Apache Hudi
It's an open-source data management framework that provides efficient and scalable data storage and processing capabilities. It has key features like upserts and incremental inserts, data atomization, ACID transactions, schema evolution and query optimizations.

dbt (data build tool)
It is an open-source analytics engineering tool that helps analysts and data engineers transform, organize, and manage data in their data pipelines. It is designed to work with modern data warehouses and covers the transform step of ELT: the data is loaded first, and dbt then runs the transformations as SQL inside the warehouse or query engine before the results are used for reporting or analytics.

Trino
It is an open-source distributed SQL query engine designed for high-performance interactive analytics. It started as Presto, which was initially developed at Facebook to address their need for a fast and scalable query engine capable of processing large volumes of data; Trino is the fork of that project, previously known as PrestoSQL.

Architecture

  1. Transactional data is stored in PostgreSQL and pulled by the DeltaStreamer application (a built-in, configurable Hudi application). DeltaStreamer writes the data, together with the Hudi config files, to S3 and syncs the table metadata to the Hive Metastore, which uses PostgreSQL as its backend database.
  2. Then, the pulled data is transformed using dbt, which contains the business logic that generates the analytics data. This logic is sent to Spark, which applies the transformations to the data.
  3. Finally, Trino connects to S3 and the Hive Metastore to query the analytics data.

Infrastructure

The infra folder contains the following folders:

  • Hive
      – run.sh: configures and runs the Hive Metastore and connects it to PostgreSQL as its backend.
      – Dockerfile: used to build the image.
  • Spark
      – Jars: contains all the dependencies needed to work with Hudi, PostgreSQL, and S3.
      – start-spark.sh: runs Spark depending on the workload (master, worker, or thrift).
      – Dockerfile: used to build the image.
  • Trino
      – hive.properties: used by Trino to connect to Hive and S3.

To start the services, run:

cd infra
docker-compose up -d

Data

First of all, we need to create a database and a user that we'll use throughout this project:

psql -h localhost -p 5432 -U postgresuser -d metastore
# password when prompted: passwd
CREATE ROLE deuser SUPERUSER LOGIN PASSWORD 'depasswd';
CREATE DATABASE DEV_DATABASE;
GRANT ALL PRIVILEGES ON DATABASE DEV_DATABASE TO deuser;

In this project we'll use the Northwind sample database. To load it into PostgreSQL, run:

cd northwind_data
psql -h localhost -p 5432 -U deuser -d dev_database -a -f full_data.sql
# password when prompted: depasswd

Next, connect to the Spark Thrift Server at jdbc:hive2://localhost:1000 with any SQL client, such as DBeaver, and create the two databases:

CREATE DATABASE IF NOT EXISTS transactional;
CREATE DATABASE IF NOT EXISTS analytics;

Extract

To extract the data, we'll use the HoodieDeltaStreamer app contained in the hudi-utilities-slim jar. We have to run this jar using spark-submit in our Spark standalone cluster.

HoodieDeltaStreamer is easy to use, but the problem I had was the lack of documentation, and it took me some time to configure the right version to use with Spark, S3, PostgreSQL, and Hive Metastore.

Before explaining HoodieDeltaStreamer, there are three important concepts you need to know when you write data in the Hudi format:

  • hoodie.datasource.write.recordkey.field - The primary key, which must be unique.
  • hoodie.datasource.write.partitionpath.field - The partition key, which determines how the data is partitioned.
  • hoodie.datasource.write.precombine.field - The field that decides which record wins when several records share the same primary key (hoodie.datasource.write.recordkey.field). Most of the time, you should use a datetime/timestamp field that tells you which record was written most recently.
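To make the role of the precombine field concrete, here is a minimal Python sketch of the idea. It is not Hudi's implementation, only an illustration of the rule "one record per key, the one with the largest precombine value wins". The column names (order_id, status, updated_at) are made up for the example.

```python
from datetime import datetime


def precombine(records, record_key, precombine_field):
    """Keep one record per key: the one with the largest precombine value.

    Conceptual illustration only; Hudi applies a rule like this internally
    when a batch contains several records with the same record key.
    """
    latest = {}
    for rec in records:
        key = rec[record_key]
        current = latest.get(key)
        if current is None or rec[precombine_field] >= current[precombine_field]:
            latest[key] = rec
    return list(latest.values())


# Hypothetical batch: order 1 appears twice, with different timestamps.
batch = [
    {"order_id": 1, "status": "new", "updated_at": datetime(2023, 1, 1, 10, 0)},
    {"order_id": 1, "status": "shipped", "updated_at": datetime(2023, 1, 2, 9, 30)},
    {"order_id": 2, "status": "new", "updated_at": datetime(2023, 1, 1, 12, 0)},
]

deduped = precombine(batch, record_key="order_id", precombine_field="updated_at")
for rec in deduped:
    print(rec["order_id"], rec["status"])
```

Here order_id plays the role of the record key and updated_at the role of the precombine field, so only the "shipped" version of order 1 survives.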

HoodieDeltaStreamer has many parameters, and you need to know the main ones to use it properly. For this project, we use the following configuration:

  • --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer hudi-utilities-slim-bundle_2.12-0.12.2.jar - We use the slim bundle since it's recommended by the Hudi docs.
  • --source-class org.apache.hudi.utilities.sources.JdbcSource - Used because we connect to a database through JDBC.
  • --target-base-path - Path where the data extracted from the database is written. It's a mandatory field.
  • --source-ordering-field - Indicates the precombine field (hoodie.datasource.write.precombine.field).
  • --table-type - COPY_ON_WRITE (COW) or MERGE_ON_READ (MOR).
  • --hoodie-conf hoodie.datasource.hive_sync.* - Configs to sync with the Hive Metastore. The connection between Spark and the Hive Metastore is set with --conf spark.hive.metastore.uris=thrift://hive-metastore:9083
  • --hoodie-conf hoodie.deltastreamer.jdbc.* - Configurations like user, password, driver, host, database, and table to connect to the database.
  • --hoodie-conf hoodie.deltastreamer.jdbc.incr.pull=TRUE - Avoids pulling large amounts of data from the database by pulling only the rows that match a condition. The first run with this configuration writes a checkpoint to the YYYYMMDDHHMMSS.commit file under the name "deltastreamer.checkpoint.key".
  • --hoodie-conf hoodie.deltastreamer.jdbc.table.incr.column.name - The column used in the condition that selects new data, much like hoodie.datasource.write.precombine.field. This column must have the format YYYY-MM-DD HH:mm:ss.SSS, similar to deltastreamer.checkpoint.key.
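To see how these options fit together, here is a small Python sketch that assembles a spark-submit argument list. It is only an illustration, not the content of the project's load_sources.sh, and a real run needs more settings (JDBC URL, credentials, Hive sync options, Spark confs). The bucket, table, and column names are hypothetical, and --target-table is not in the list above: it is my assumption that you pass it to name the Hudi table, so check the Hudi docs for your version.

```python
def delta_streamer_command(jar, base_path, table, ordering_field, hoodie_conf):
    """Build the spark-submit argument list for HoodieDeltaStreamer.

    Sketch only: the options mirror the ones described in the post, plus
    --target-table (an assumption, see the text above).
    """
    cmd = [
        "spark-submit",
        "--class", "org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer",
        jar,
        "--source-class", "org.apache.hudi.utilities.sources.JdbcSource",
        "--target-base-path", base_path,
        "--target-table", table,
        "--source-ordering-field", ordering_field,
        "--table-type", "COPY_ON_WRITE",
    ]
    # Every Hudi property is passed as its own --hoodie-conf key=value pair.
    for key, value in hoodie_conf.items():
        cmd += ["--hoodie-conf", f"{key}={value}"]
    return cmd


# Hypothetical values for a Northwind "orders" table.
cmd = delta_streamer_command(
    jar="hudi-utilities-slim-bundle_2.12-0.12.2.jar",
    base_path="s3a://my-bucket/data/transactional/orders",
    table="orders",
    ordering_field="updated_at",
    hoodie_conf={
        "hoodie.datasource.write.recordkey.field": "order_id",
        "hoodie.deltastreamer.jdbc.table.name": "orders",
        "hoodie.deltastreamer.jdbc.incr.pull": "TRUE",
        "hoodie.deltastreamer.jdbc.table.incr.column.name": "updated_at",
    },
)
print(" ".join(cmd))
```

Building the command as a list keeps each option and its value together, which makes it easy to generate one command per source table.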

On the next execution, behind the scenes, HoodieDeltaStreamer will execute this query to pull the new data:

SELECT * FROM TABLE WHERE (hoodie.deltastreamer.jdbc.table.incr.column.name) > ( deltastreamer.checkpoint.key)

HoodieDeltaStreamer also has a parameter (--checkpoint) used to pull data from any point in time. Behind the scenes, it executes the query SELECT * FROM TABLE WHERE (hoodie.deltastreamer.jdbc.table.incr.column.name) > (--checkpoint)
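The incremental pull logic can be sketched in a few lines of Python. This is a simplified model of the behavior described here, not the SQL that Hudi actually generates (the real statement may be wrapped or quoted differently). The table and column names are hypothetical.

```python
def build_incremental_query(table, incr_column, checkpoint=None):
    """Return the query used to pull data from the source table.

    Simplified model: without a checkpoint everything is pulled,
    with a checkpoint only the rows newer than it are pulled.
    """
    if checkpoint is None:
        return f"SELECT * FROM {table}"
    return f"SELECT * FROM {table} WHERE {incr_column} > '{checkpoint}'"


def next_checkpoint(rows, incr_column):
    """The new checkpoint is the largest value of the incremental column."""
    return max(row[incr_column] for row in rows)


first_run = build_incremental_query("orders", "updated_at")
pulled = [
    {"order_id": 1, "updated_at": "2023-01-01 10:00:00.000"},
    {"order_id": 2, "updated_at": "2023-01-02 09:30:00.000"},
]
checkpoint = next_checkpoint(pulled, "updated_at")
second_run = build_incremental_query("orders", "updated_at", checkpoint)

print(first_run)
print(second_run)
```

Because the timestamps share the same fixed-width format, comparing them as strings gives the same result as comparing them as dates, which is why the format of the incremental column matters.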

To run the HoodieDeltaStreamer app, we connect to a Spark container and execute the load script:

docker exec -ti spark-worker-a /bin/bash
bash /opt/spark-apps/load_sources.sh

Transform

First of all, we need dbt installed on the machine where dbt will run; in this case, you can use your computer. I ran the following commands:

virtualenv -p /usr/bin/python3 venv

sudo apt install python3.10-dev build-essential gcc
sudo apt install libblas-dev libatlas-base-dev
sudo apt install libssl-dev libffi-dev
sudo apt install libxml2-dev libxslt1-dev
sudo apt install libpq-dev  libldap2-dev libsasl2-dev

. venv/bin/activate

pip install --upgrade pip
# the PyHive extra pulls in dbt-spark itself; quotes keep the brackets safe in shells like zsh
pip install "dbt-spark[PyHive]"

Then create a new project with the following command, and copy the files from lakehouse/dbt_project/northwind/models into the project folder it creates.

dbt init northwind

We need to configure the profiles.yml file to connect dbt to Spark. I set analytics as the schema, so all tables created by dbt are stored inside this database, which uses the warehouse location set in …  --conf spark.sql.warehouse.dir=s3a://$HUDI_S3_BUCKET/data/dbt

Databases and layers

In this project, we use soft deletes in the source database, so every table in the PostgreSQL database has three columns to deal with deletions and updates: created_at, updated_at, and deleted_at.

  • When a record is inserted, created_at and updated_at are filled with the insertion timestamp.
  • When any field of the record is updated, updated_at is filled with the update timestamp.
  • When a record is deleted, deleted_at and updated_at are filled with the deletion timestamp.
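As a quick illustration of these three rules, here is a minimal Python sketch of how the columns evolve over the life of one record. The helper names and the sample customer are made up; in the real project these columns are maintained in PostgreSQL.

```python
def insert_row(row_id, data, now):
    """Insert: created_at and updated_at both get the insertion timestamp."""
    return {"id": row_id, **data, "created_at": now, "updated_at": now, "deleted_at": None}


def update_row(row, changes, now):
    """Update: only updated_at moves forward."""
    return {**row, **changes, "updated_at": now}


def soft_delete_row(row, now):
    """Soft delete: the row stays, deleted_at and updated_at get the deletion timestamp."""
    return {**row, "updated_at": now, "deleted_at": now}


row = insert_row(1, {"name": "Alice"}, now="2023-01-01 10:00:00")
row = update_row(row, {"name": "Alice B."}, now="2023-01-05 08:00:00")
row = soft_delete_row(row, now="2023-02-01 12:00:00")
print(row)
```

Since updated_at changes on every insert, update, and delete, it works both as the incremental column for the extract and as the precombine field.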

Transactional database - It stores the tables replicated from PostgreSQL, but in Hudi format. The tables keep the created_at, updated_at, and deleted_at fields.

Analytics database

  • Staging tables: the stg_* tables, which take the tables from the transactional database as input. Soft deletes are resolved in this layer: records marked as deleted are removed.
  • Core tables: the dim_* and fact_* tables, which take the staging tables as input.
  • Analytic views: the views used for reporting.

STG Tables

First of all, we need to deal with deleted records, so the pre-hook takes only the deleted records from the transactional table, extracts their ids, and deletes them from the stg_* table. On the first run, this pre-hook won't be executed since the table doesn't exist yet.

Then, to update or insert new records, we extract only the new records from the transactional table using WHERE updated_at > (SELECT max(updated_at) FROM {{ this }}), since our dbt model is incremental.

Finally, behind the scenes, Hudi takes primaryKey and precombineKey from the configuration part to insert or update the records using Spark/Hudi integration.
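The whole staging flow can be modeled with a small Python sketch. It is not the dbt model itself, only a simulation of the three steps (pre-hook delete, incremental filter, upsert by key). One assumption I add: the model also filters out rows where deleted_at is set, otherwise the rows removed by the pre-hook would be inserted again. The row contents are made up.

```python
def refresh_staging(staging, source, key="id"):
    """Simulate one run of an incremental stg_* model.

    staging: dict mapping primary key -> row (the current stg table)
    source:  list of rows from the transactional table
    """
    # Step 1 (pre-hook): drop soft-deleted ids; skipped on the first run,
    # when the staging table does not exist yet (empty here).
    if staging:
        for row in source:
            if row["deleted_at"] is not None:
                staging.pop(row[key], None)

    # Step 2: incremental filter, updated_at > max(updated_at) in staging.
    high_water = max((r["updated_at"] for r in staging.values()), default=None)

    # Step 3: upsert by primary key; updated_at is the precombine field.
    for row in source:
        if row["deleted_at"] is not None:  # assumption: deleted rows are filtered out
            continue
        if high_water is None or row["updated_at"] > high_water:
            current = staging.get(row[key])
            if current is None or row["updated_at"] >= current["updated_at"]:
                staging[row[key]] = row
    return staging


# First run: two live customers.
source_v1 = [
    {"id": 1, "name": "Alice", "updated_at": "2023-01-01 10:00:00", "deleted_at": None},
    {"id": 2, "name": "Bob", "updated_at": "2023-01-02 10:00:00", "deleted_at": None},
]
stg = refresh_staging({}, source_v1)

# Second run: customer 1 soft-deleted, customer 2 renamed, customer 3 added.
source_v2 = [
    {"id": 1, "name": "Alice", "updated_at": "2023-01-03 10:00:00", "deleted_at": "2023-01-03 10:00:00"},
    {"id": 2, "name": "Robert", "updated_at": "2023-01-04 10:00:00", "deleted_at": None},
    {"id": 3, "name": "Carol", "updated_at": "2023-01-04 11:00:00", "deleted_at": None},
]
stg = refresh_staging(stg, source_v2)
print(sorted(stg))
```

After the second run the staging table holds only customers 2 and 3, with customer 2 in its latest version.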

Fact and Dim tables

Depending on the business logic, you can use Hudi tables or Hive tables in Parquet format. For example, in fact_sales, we need to partition the data by date for query performance.

We use Parquet and insert_overwrite by partition, since at this stage we don't need to deal with deletes or updates. We just need the latest image of the records, which the stg tables already have.
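To picture what insert_overwrite by partition does, here is a minimal Python sketch: every partition that appears in the new data is replaced as a whole, and partitions that do not appear are left untouched. This is a simulation of the behavior, not dbt or Spark code, and the fact_sales rows are made up.

```python
from collections import defaultdict


def insert_overwrite(table, new_rows, partition_col):
    """Replace whole partitions present in new_rows; keep all other partitions.

    table: dict mapping partition value -> list of rows
    """
    incoming = defaultdict(list)
    for row in new_rows:
        incoming[row[partition_col]].append(row)

    result = {part: list(rows) for part, rows in table.items()}
    result.update(incoming)  # overwrite only the partitions we received
    return result


fact_sales = {
    "2023-01-01": [{"order_id": 1, "amount": 10.0, "order_date": "2023-01-01"}],
    "2023-01-02": [{"order_id": 2, "amount": 20.0, "order_date": "2023-01-02"}],
}
new_rows = [
    {"order_id": 2, "amount": 25.0, "order_date": "2023-01-02"},
    {"order_id": 3, "amount": 5.0, "order_date": "2023-01-02"},
]

refreshed = insert_overwrite(fact_sales, new_rows, partition_col="order_date")
for part, rows in refreshed.items():
    print(part, [r["order_id"] for r in rows])
```

This is why the strategy fits here: the staging tables already hold the latest image of each record, so rewriting a date partition from them always gives a correct result.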

Views

Sometimes you need an additional layer, or just a query that you run frequently. In this case, you should use a Hive view. To create it, omit the config part in the dbt model. Behind the scenes, dbt-spark creates a Hive view.

Querying

We will use Trino to query Hudi tables, Hive tables, and views, so Trino has to be configured accordingly.

First, create a Trino catalog for Hive to connect with the Hive metastore. By default, Trino can query Hive tables, but we need to add hive.views-execution.enabled=true to enable querying Hive views.

Then, to query Hudi tables, we need to add hive.hudi-catalog-name=hudi, which redirects queries to Hudi tables to another Trino catalog.

Finally, we create another Trino catalog that holds the configuration to query Hudi tables.

Next steps:

That's all! This project didn't need near-real-time synchronization between the database servers and the lakehouse. If yours does, one solution is to add Debezium and Kafka, then configure Hudi DeltaStreamer to consume from them.
