A brief introduction on Hudi
Apache Hudi simplifies insert, update, delete operations at a record level on files stored in distributed systems like HDFS or at the cloud such as S3. Additionally, it provides schema evolution, change stream extraction and file storage optimization functionality. Spark can be used for processing data into Hudi datasets which can be queried afterwards as registered tables. Hudi is included when Spark,Hive or Presto is installed from EMR releases 5.28.0 and later.
Architecture Diagram
Input files are used to perform record level Update/Insert/Delete operations on existing or new Hudi datasets/tables. Due to the integration of Hudi with AWS Glue Data Catalog these Hudi tables become available for consumption in Amazon Athena, Redshift & other EMR processes that may follow in later stages of the pipeline.
Upserts/Inserts
Update and even insert operations on S3 files previously was not a simple task. These operations would require:
Extracting the data: The files/records that are to be operated on would need to be extracted. That could be done by either querying an existing Athena table or by identifying & extracting the affected files containing those records.
Exporting the result: When operations occur in a specific partition then updating/overwriting the partition files will not be an issue but what happens when the updates occur across multiple partitions or parts of the partition? Multiple partitions/files would have to be overwritten/updated with changed and unchanged records (in case of part updates)
Updating the metadata: New partitions would require an update on the AWS Glue Data Catalog in order to be queryable.
Hudi handles all those processes for us by operating on record keys and indexes when working with hudi datasets. Hudi contains three write modes.
i) Upsert: Input records will update the existing table based on index. The table will not contain any duplicates (default mode).
ii) Insert: Input records are inserted into the table with no index lookup. This operation is a lot faster but it may produce duplicates.
iii) Bulk_Insert: Similar operation like insert, to be used with inserting large loads especially for initial loads.
Schema Evolution
A useful feature of hudi is its ability to create tables based on the schema of input files and accommodate changes automatically. Specifically, if a new file comes in with a different schema (compared to the table) depending on the differences (new columns or less columns) will either expand or shrink the table schema. The columns that are removed from the schema are hidden until they re-appear on an incoming file.
There are some cases where we would only want to extend the table schema and not reduce it , that can be resolved by combining the schema of the existing table and the input file before exporting it as a Hudi dataset.
Storage Types
CoW or Copy On Write, a columnar based format, is the default storage type and is preferred for ready-heavy use cases with less frequent updates.
MoR or Merge on Read, a combination of row and columnar based formats and is preferred for write-heavy use cases with frequent updates.
Limitations
Some of the limitations of Hudi are caused by its storage format. Empty arrays can not be handled and they have to be nullified before inserted. Also empty spaces in column names must be replaced as well. On partitioned data, schema changes (specifically new columns) can not be introduced when working on existing partitions but on new ones, afterwards existing partitions can be modified to accommodate the new column(s).
An alternative way to configure an EMR Notebook for Hudi
An alternative way to use Hudi than connecting into the master node and executing the commands specified on the AWS docs is to submit a step containing those commands.
First create a shell file with the following commands & upload it into a S3 Bucket.
Then through the EMR UI add a custom Jar step with the S3 path as an argument. It is advised that the latest EMR Release is used for leveraging the latest Hudi version and Hive & Table metadata are checked as Catalogs. The Jar location s3://{your_region}.elasticmapreduce/libs/script-runner/script-runner.jar must be configured with the region where the EMR cluster is located.
The last step would be to execute the configuration code from the notebook editor.
%%configure
{ "conf": {
"spark.jars":"hdfs:///apps/hudi/lib/hudi-spark-bundle.jar,hdfs:///apps/hudi/lib/spark-avro.jar",
"spark.serializer":"org.apache.spark.serializer.KryoSerializer",
"spark.sql.hive.convertMetastoreParquet":"false"
}}
For using Hudi with Spark Submit or Spark shell the process is simpler, the parameters contained in the official guide will be sufficient.
Hands on example
Table initialization:
The following file will be used to populate and initialize a Hudi Table.
The configurations for the file to be ingested into Hudi are as follows:
table_name = 'hudi_table'
hudiOptions = {
'hoodie.table.name': table_name,
'hoodie.datasource.write.recordkey.field': 'transaction_id',
'hoodie.datasource.write.partitionpath.field': 'batch',
'hoodie.datasource.hive_sync.enable': 'true',
'hoodie.datasource.write.precombine.field': 'update_ts',
'hoodie.datasource.hive_sync.table':table_name,
'hoodie.datasource.hive_sync.partition_fields':'batch',
'hoodie.datasource.hive_sync.database':database_name,
'hoodie.datasource.hive_sync.partition_extractor_class':'org.apache.hudi.hive.MultiPartKeysValueExtractor'
}
inputDF.write\
.format('org.apache.hudi')\
.option('hoodie.datasource.write.operation', 'upsert')\
.options(**hudiOptions)\
.mode('append')\
.save('s3://'+bucket_name+'/'+table_name)
Record key field: Transaction_id will be set as the unique identifier of the table (combined with the partition field).
Partition path field: the field used as a partition, in this example batch.
Precombine field: is set to update_ts this means for duplicate record keys the one with the largest precombine value will be chosen. The dataset we are using contains a duplicate transaction id of 100 with update_ts 20210830123753 and 20210830123757.
The rest of the configurations The table name & database for synchronization with the hive metastore.
A corresponding table has been created in Athena & a folder for the partition in the specified S3 directory.
Updating Records:
After some internal processing the status needs to be changed to โprocessedโ for the transaction_id 101.
Using the same configurations the new dataset is exported to the existing Hudi table.
inputDF.write\
.format('org.apache.hudi')\
.option('hoodie.datasource.write.operation', 'upsert')\
.options(**hudiOptions)\
.mode('append')\
.save('s3://'+bucket_name+'/'+table_name)
The _hoodie_commit_time along with the updated columns has changed as well. This will be used to identify changes in the next step.
Incremental Querying:
Based on the previous update, we could identify changes by their commit time. The latest commit was inserted at 20210830114805 (_hoodie_commit_time). By setting the instant time config by one second earlier that commit will be extracted.
readOptions = {
'hoodie.datasource.query.type': 'incremental',
'hoodie.datasource.read.begin.instanttime': 20210830114804,
}
incrementalQuery = spark.read\
.format('org.apache.hudi')\
.options(**readOptions) \
.load('s3://'+bucket_name+'/'+table_name)
Handling Schema changes:
A new batch of transactions has arrived with an additional column named verified.
The new column will be added to the schema of the Hudi table automatically.
Top comments (1)
Hello. Do you have AWS Glue as catalog enabled?
I have such issue stackoverflow.com/questions/694849...