Victor

S3 Committers and EMRFS in AWS EMR’s Big Data Processing using Spark

The landscape of big data and cloud computing is rapidly evolving. As the technical challenges grow, solutions like the Magic Committer and EMRFS emerge, pushing the boundaries of what’s possible.

These technologies prove their worth when it comes to writing data. Writing is a constant concern in big data environments, where processing volume and read/write traffic tend to get out of hand without the correct configuration.

This post explores committers and helps you optimize your data writing to Amazon S3.

Committers

In Hadoop, the FileOutputCommitter is responsible for promoting files created during a task attempt to the final query output. It handles task and job failures and supports speculative execution: each attempt writes to its own temporary directory, and during the commit phase the committer lists those directories and renames their content to the final destination.
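The rename-based commit described above can be sketched on a local file system. This is a simplified illustration, not Hadoop's implementation: the function names and the `_temporary/<attempt>` layout are assumptions chosen for the example.

```python
import tempfile
from pathlib import Path


def run_task_attempt(job_dir, attempt_id, filename, data):
    """Write task output under a private attempt directory, not the final one."""
    attempt_dir = job_dir / "_temporary" / attempt_id
    attempt_dir.mkdir(parents=True, exist_ok=True)
    (attempt_dir / filename).write_text(data)
    return attempt_dir


def commit_task(job_dir, attempt_dir):
    """Promote the files of a successful attempt by renaming them into place."""
    for path in sorted(attempt_dir.iterdir()):
        path.rename(job_dir / path.name)  # cheap and atomic on a POSIX file system
    attempt_dir.rmdir()


def abort_task(attempt_dir):
    """Discard the output of a failed or losing speculative attempt."""
    for path in attempt_dir.iterdir():
        path.unlink()
    attempt_dir.rmdir()


with tempfile.TemporaryDirectory() as tmp:
    job_dir = Path(tmp) / "output"
    job_dir.mkdir()
    # Two speculative attempts of the same task produce the same file name.
    winner = run_task_attempt(job_dir, "attempt_0", "part-00000", "rows from attempt 0")
    loser = run_task_attempt(job_dir, "attempt_1", "part-00000", "rows from attempt 1")
    commit_task(job_dir, winner)
    abort_task(loser)
    committed = sorted(p.name for p in job_dir.iterdir() if p.is_file())
    committed_text = (job_dir / "part-00000").read_text()
```

Only the committed attempt ever appears in the output directory, which is why the approach works well wherever rename is fast and atomic.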

To overcome challenges and optimize the process of committing work to Amazon S3, the hadoop-aws module now includes explicit support for S3 through the S3A filesystem client.

What Are S3A Committers?

The S3A Committers are explicitly designed to ensure safety and high performance when outputting work to S3.

Issues with Spark's Standard Output Committer

The standard FileOutputCommitter in Hadoop can be slow and unreliable when used with certain distributed file systems, including Amazon S3. There are a few reasons for this:

  • Consistency and Atomicity: The FileOutputCommitter relies on file system semantics, namely atomic renames and consistent directory listings. Amazon S3 is an object store with no native rename: a "rename" is a copy followed by a delete, so it is not atomic. Until December 2020, S3 listings were also only eventually consistent, so a commit could miss files that had just been written. S3 now offers strong read-after-write consistency, but the non-atomic rename still leaves room for race conditions and inconsistent output when multiple tasks or jobs write simultaneously.

  • Multi-part Uploads: The FileOutputCommitter writes data to temporary directories and performs a final rename to move the files to the output directory. On S3 that rename copies the data from one key to another, so the commit gets slower and more resource-intensive as the dataset grows. S3 does support multi-part uploads for large objects, but the FileOutputCommitter does not use them to avoid the copy.

  • Retries and Failures: In distributed systems, failures can occur at various levels, such as network issues, task failures, or job failures. Because the commit on S3 is a long series of copies rather than a single atomic step, a failure partway through can leave incomplete or inconsistent output. Recovering may require manual intervention or additional mechanisms to ensure data integrity.
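To make the cost of a rename on an object store concrete, here is a toy in-memory model. `ToyObjectStore` and its methods are hypothetical and only mimic the copy-then-delete behavior described above; they are not an S3 client.

```python
class ToyObjectStore:
    """In-memory stand-in for an object store: flat keys, no real directories."""

    def __init__(self):
        self.objects = {}
        self.bytes_copied = 0

    def put(self, key, data):
        self.objects[key] = data

    def rename_prefix(self, src, dst):
        """Emulate a directory rename: copy every object, then delete the source."""
        for key in [k for k in self.objects if k.startswith(src)]:
            data = self.objects.pop(key)               # delete the source key
            self.objects[dst + key[len(src):]] = data  # copy to the new key
            self.bytes_copied += len(data)             # cost grows with data size


store = ToyObjectStore()
for i in range(3):
    store.put(f"out/_temporary/attempt_0/part-{i:05d}", b"x" * 1000)

store.rename_prefix("out/_temporary/attempt_0/", "out/")
final_keys = sorted(store.objects)
```

In this model, committing 3,000 bytes of output copies 3,000 bytes again. On a real file system the same rename would move no data at all.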

To address these limitations, explicit support for committing work to Amazon S3 was introduced through the S3A committers. These committers are designed around how S3 actually behaves: they use multi-part uploads to make output visible only at commit time, without any rename, which gives more reliable and higher-performance output when S3 is the destination.

Types of S3A Committers

The S3A committers are designed to optimize the process of writing large datasets directly to S3. The one to use is selected with fs.s3a.committer.name.

  • DirectoryCommitter (directory): A staging committer. Each task writes its output to the worker's local file system and, when the task commits, uploads it to the final S3 destination as multi-part uploads that are left incomplete. The uploads are completed only when the job commits, so no rename is needed. Benefits: This committer offers good performance and reliability while handling task failures and speculative execution.

  • MagicS3GuardCommitter (magic): The magic committer. Tasks write directly to S3 under a special "__magic" path; the S3A client turns those writes into multi-part uploads aimed at the final destination and completes them when the job commits. The class name reflects that it originally relied on S3Guard for consistent listings; now that S3 is strongly consistent, S3Guard is no longer required. Benefits: This committer needs no local staging of data and no rename, which suits large outputs.

  • PartitionedCommitter (partitioned): A variant of the directory committer for partitioned data, such as Apache Parquet tables laid out by partition columns. It stages and uploads data in the same way, but resolves conflicts with existing data partition by partition rather than for the whole output directory. Benefits: This committer is well suited to adding or replacing individual partitions in an existing dataset.
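What these committers share is that output becomes visible only when a pending multi-part upload is completed. The toy model below sketches that idea; `ToyMultipartStore` is a hypothetical stand-in, not the S3 API or the S3A implementation.

```python
class ToyMultipartStore:
    """Toy store where uploaded data stays invisible until the upload is completed."""

    def __init__(self):
        self.visible = {}  # completed objects that readers can see
        self.pending = {}  # upload id -> (destination key, data)
        self._next_id = 0

    def upload_pending(self, key, data):
        """Upload the data but do not complete the upload yet."""
        upload_id = f"upload-{self._next_id}"
        self._next_id += 1
        self.pending[upload_id] = (key, data)
        return upload_id

    def complete(self, upload_id):
        """Job commit: the object appears at its final key, no rename involved."""
        key, data = self.pending.pop(upload_id)
        self.visible[key] = data

    def abort(self, upload_id):
        """Drop the data of a failed or losing attempt."""
        self.pending.pop(upload_id, None)


store = ToyMultipartStore()
# Two speculative attempts of the same task target the same final key.
first = store.upload_pending("table/part-00000", b"attempt 0")
second = store.upload_pending("table/part-00000", b"attempt 1")
visible_before_commit = dict(store.visible)

store.complete(first)  # the winning attempt is published at job commit
store.abort(second)    # the losing attempt is discarded
visible_after_commit = dict(store.visible)
```

Nothing is readable at the destination before the commit, and publishing the result costs one small request per file rather than a copy of the data.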

Quick Setup for S3A Committers

Starting from Spark version 2.2.0, you can set the fs configurations, such as spark.hadoop.fs.s3a.committer.name, using the SparkConf object at runtime. This lets you configure the behavior of the S3A committer directly from your Spark application code.

Before Spark 2.2.0, setting the fs configurations dynamically at runtime was not directly supported; you had to specify them in the spark-defaults.conf configuration file or pass them as command-line arguments when launching your Spark application.
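As a sketch of keeping these settings in one place, the helper below builds the configuration as a plain dictionary and renders it as spark-submit arguments. The function names are hypothetical. Apart from spark.hadoop.fs.s3a.committer.name, the property names are not from this post: they follow Spark's cloud integration documentation and assume the spark-hadoop-cloud module is on the classpath, so check them against your Spark and Hadoop versions.

```python
SUPPORTED_COMMITTERS = ("directory", "partitioned", "magic")


def s3a_committer_conf(name):
    """Return Spark settings that select one of the S3A committers."""
    if name not in SUPPORTED_COMMITTERS:
        raise ValueError(f"unknown S3A committer: {name!r}")
    conf = {
        "spark.hadoop.fs.s3a.committer.name": name,
        # Assumption: these two classes ship with the spark-hadoop-cloud module.
        "spark.sql.sources.commitProtocolClass":
            "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol",
        "spark.sql.parquet.output.committer.class":
            "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter",
    }
    if name == "magic":
        conf["spark.hadoop.fs.s3a.committer.magic.enabled"] = "true"
    return conf


def to_submit_args(conf):
    """Render a settings dictionary as a list of spark-submit arguments."""
    args = []
    for key in sorted(conf):
        args += ["--conf", f"{key}={conf[key]}"]
    return args


magic_args = to_submit_args(s3a_committer_conf("magic"))
```

The same dictionary can be applied in application code by looping over its items and calling `.config(key, value)` on the session builder.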

Examples

Here are some examples of how to quickly enable the magic committer.

Spark Example:

bash
spark-submit \
  --conf spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2 \
  --conf spark.hadoop.fs.s3a.committer.name=magic \
  --conf spark.hadoop.fs.s3a.bucket.all.committer.magic.enabled=true \
  --class YourMainClass \
  your_application.jar

PySpark Example:

python
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("YourApplication")
    .config("spark.hadoop.fs.s3a.bucket.all.committer.magic.enabled", "true")
    .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
    .config("spark.hadoop.fs.s3a.committer.name", "magic")
    .getOrCreate()
)

Scala Example:

scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
  .appName("YourApplication")
  .config("spark.hadoop.fs.s3a.bucket.all.committer.magic.enabled", "true")
  .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
  .config("spark.hadoop.fs.s3a.committer.name", "magic")
  .getOrCreate()
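After a job runs, it is worth confirming which committer actually produced the output. According to the Hadoop S3A committer documentation, the classic FileOutputCommitter leaves a zero-byte _SUCCESS file, while the S3A committers write a JSON summary into it. The sketch below inspects the marker's content; the helper name is hypothetical, and the "committer" field name is an assumption to verify against a _SUCCESS file from your own cluster.

```python
import json


def committer_from_success_marker(raw):
    """Guess which committer wrote a job's output from its _SUCCESS file content."""
    if not raw.strip():
        # A zero-byte marker indicates the classic rename-based committer.
        return "file"
    summary = json.loads(raw)
    # Assumption: the JSON summary names the committer in a "committer" field.
    return summary.get("committer", "unknown")


classic = committer_from_success_marker(b"")
magic = committer_from_success_marker(b'{"committer": "magic", "filenames": []}')
```

The bytes passed in would come from reading the _SUCCESS object in the job's output directory with whatever S3 client you already use.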

The S3A committers’ issue with ORC files

While S3A committers are designed to address the challenges of writing data to Amazon S3, there are a few considerations and limitations when using them with the ORC (Optimized Row Columnar) data format:

  • Transactional Support: By default, ORC does not provide built-in transactional support. This means ORC output may not fully benefit from the transaction-like commit that S3A committers offer. Transactional support typically ensures atomic and consistent data commits, which may be important in specific scenarios requiring ACID properties.

  • Consistency Guarantees: S3A committers provide mechanisms for stronger metadata consistency, such as using S3Guard. However, the ORC format itself may not fully exploit these consistency guarantees. This can lead to inconsistencies or issues when reading or accessing ORC data written using S3A committers.

  • Performance Considerations: While S3A committers can improve performance for output operations, the specific performance benefits may vary depending on data size, network conditions, and workload characteristics. As a columnar data format, ORC already offers efficient compression and predicate pushdown capabilities, which can contribute to performance gains. It’s important to carefully evaluate the overall performance impact when combining S3A committers with ORC.

Key Considerations

  • Transactional Support: ORC does not provide built-in transactional support, limiting its compatibility with S3A committers.

  • Consistency Guarantees: ORC may not fully exploit the stronger metadata consistency provided by S3A committers.

  • Performance Considerations: The specific performance benefits may vary and should be carefully evaluated.

Duplicated Data Appended Instead of Overwriting with S3A Committers and ORC Data Format

When using the S3A committers with the ORC (Optimized Row Columnar) data format, we observed that the Spark job appended the processed data instead of overwriting or replacing the existing data. As a result, duplicate records were created, leading to data redundancy and potential issues with data consistency.

The root cause of this issue appears to lie in the interaction between the S3A committers and the ORC data format. The S3A committers are designed to perform efficient and reliable output operations to Amazon S3 by using multi-part uploads instead of renames. However, the ORC format itself does not provide mechanisms for transactional support or overwrite operations. As a result, in this setup the new data was appended to the existing data rather than replacing it.

The presence of duplicate data can have several adverse consequences, including increased storage costs, degraded query performance, and inconsistencies in analytical results. Over time, the accumulation of duplicate records can significantly impact data quality and subsequent data processing tasks.
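The toy model below shows how an append in place of an overwrite produces duplicates. It illustrates one plausible mechanism and is not a confirmed diagnosis of the job in this POC: it assumes each run writes files with a unique name, so nothing is replaced unless the destination is cleared first. The function names and the dictionary standing in for an output directory are hypothetical.

```python
import uuid


def write_job(destination, rows, overwrite):
    """Write one output file with a unique name into a dict-backed 'directory'."""
    if overwrite:
        destination.clear()  # what an overwrite is expected to do first
    file_name = f"part-00000-{uuid.uuid4()}.orc"
    destination[file_name] = list(rows)


def read_all(destination):
    """Read every row from every file, as a query over the directory would."""
    return [row for name in sorted(destination) for row in destination[name]]


rows = [("id-1", 10), ("id-2", 20)]

appended = {}
write_job(appended, rows, overwrite=False)
write_job(appended, rows, overwrite=False)  # second run lands next to the first

replaced = {}
write_job(replaced, rows, overwrite=True)
write_job(replaced, rows, overwrite=True)   # second run replaces the first

duplicate_count = len(read_all(appended)) - len(set(read_all(appended)))
```

Running the same job twice without a real overwrite doubles every record, which matches the symptom described above.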

To work around this issue in the present POC and use case, EMRFS was chosen as the solution.

Note: The same spark job/query was executed with Parquet files, and no significant issue was found.

EMRFS

EMRFS (the EMR File System) is an implementation of the Hadoop file system interface that Amazon EMR clusters use to read and write files directly to Amazon S3.

It enables high-performance, scalable data processing using the Hadoop ecosystem’s wide range of tools, including MapReduce, Apache Spark, and Apache Hive. By leveraging the robustness and scalability of Amazon S3, EMRFS provides benefits like data durability and elasticity.

EMRFS treats Amazon S3 as an extension of the Hadoop File System, making it possible to use Amazon S3 as a data store for Hadoop while providing features like EMRFS consistent view, S3 server-side and client-side encryption, and Amazon S3 authorization.

EMRFS S3-optimized committer

The EMRFS S3-optimized committer is an output committer available on EMR versions 5.19.0 and later. It speeds up the commit phase by avoiding the list and rename operations that the default committer performs against Amazon S3, making it a suitable option for big data jobs where output is written to S3.

Traditional Spark and Hadoop jobs suffered from the “eventual consistency” model that S3 had until December 2020, which could cause issues when files were listed or read immediately after being written. The EMRFS S3-optimized committer reduces that exposure by using a different mechanism for writing and committing output data. S3 now provides strong read-after-write consistency, so the main remaining benefit is the faster commit.

EMRFS direct write

EMRFS direct write, introduced in Amazon EMR 6.1.0, is another enhancement to overcome the limitations of writing to S3 from Spark and Hadoop. Rather than writing data to HDFS and then copying it to S3 as the final step (which can introduce delays and use unnecessary cluster resources), direct write writes data directly to S3 during shuffle and spill operations.

By writing intermediate data directly to S3, direct write can help avoid data locality issues, reduce network traffic, and increase overall job performance. However, it also introduces potential considerations due to S3’s eventual consistency model, so EMRFS consistent view or the EMRFS S3-optimized committer may be beneficial when using this feature.

In summary, EMRFS, the S3-optimized committer, and direct write are all tools provided by Amazon EMR to enable efficient, high-performance processing of big data workloads with Amazon S3 as the data store. Users can choose the most suitable tools and configurations based on the nature and requirements of their specific workloads.

EMRFS direct write is also recommended when the output format is other than Parquet, such as CSV, ORC, etc.

How to enable EMRFS

EMRFS (Elastic MapReduce File System) is integrated by default when you create an Amazon EMR cluster and can be utilized by specifying an Amazon S3 path as a part of your job flow.

You can leverage the features of EMRFS, like Consistent View and EMRFS Metadata, during the cluster creation process via the AWS Management Console, the AWS Command Line Interface (CLI), or the SDKs. These features are not enabled by default, so you must explicitly turn them on.

Here are the steps to enable EMRFS with consistent view in the AWS Management Console:

  1. In the EMR AWS Management Console, choose “Create cluster”.
  2. In the “Create Cluster — Quick Options” pane, choose “Go to advanced options”.
  3. Choose your software and steps as needed in the “Software and Steps” section.
  4. Scroll to the “File System configurations” section and click “Add S3 bucket”.
  5. Specify the S3 bucket’s path.
  6. In the “Consistency” section, check the “Consistent” box.
  7. In the “Retry count” field, enter the number of times the EMRFS should retry any failed S3 requests.
  8. Click on “Create cluster”.

Enabling it through the AWS CLI or SDK involves additional arguments and parameters. With the CLI, pass the --emrfs option with Consistent=True for EMRFS consistent view and RetryCount=n for the retry count.

AWS CLI Example

bash
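# The instance type and count are placeholders; adjust them to your workload.
aws emr create-cluster \
  --release-label emr-6.0.0 \
  --applications Name=Hadoop Name=Hive Name=Spark \
  --use-default-roles \
  --instance-type m5.xlarge --instance-count 3 \
  --emrfs Consistent=True,RetryCount=5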
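If clusters are created from scripts, the --emrfs value can be assembled programmatically so the retry count is validated before the command runs. This is a minimal sketch: the helper names are hypothetical, and only the options shown in the CLI example above are used.

```python
import shlex


def emrfs_option(consistent, retry_count):
    """Build the value passed to --emrfs, e.g. 'Consistent=True,RetryCount=5'."""
    if retry_count < 0:
        raise ValueError("retry_count must be non-negative")
    return f"Consistent={bool(consistent)},RetryCount={retry_count}"


def create_cluster_command(release_label, applications, emrfs):
    """Return the argument list for an 'aws emr create-cluster' call."""
    args = ["aws", "emr", "create-cluster", "--release-label", release_label]
    args.append("--applications")
    args += [f"Name={app}" for app in applications]
    args += ["--emrfs", emrfs]
    return args


command = create_cluster_command(
    "emr-6.0.0", ["Hadoop", "Hive", "Spark"], emrfs_option(True, 5)
)
command_line = shlex.join(command)  # printable form for logs or a dry run
```

The list can be handed to `subprocess.run` once instance and role options for your account have been added to it.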

Remember to replace emr-6.0.0 with your desired EMR version and 5 with your preferred number of retries. Add more applications as per your requirements.

Note on IAM Roles and Permissions

Note: Please ensure you have the necessary IAM roles and permissions for Amazon EMR to access your specific S3 buckets. The default roles, EMR_DefaultRole (the service role) and EMR_EC2_DefaultRole (the instance profile that EMRFS uses when it calls S3), usually provide sufficient permissions. If you require custom roles, you must configure them before setting up the cluster.

Performance Improvements

In the use case below, enabling EMRFS reduced processing time by about 70% on average for a specific Spark job, compared with running the same job without EMRFS or any optimized S3A committer.

Specific Spark job without EMRFS or any optimized S3A committer (the job was still RUNNING when the screenshot was taken; it finished after around 9 hours of elapsed time):

[Image]

Same specific spark job with EMRFS enabled:

[Image]

Note: These are the same Spark job by definition; in this case, Scala code reads a Spark SQL query (hybrid.sql) and executes it, hence the name of the job.

Conclusion

In conclusion, integrating the S3A magic committer or EMRFS substantially improved our data processing operations. The resulting 70% reduction in execution time is a major gain: our teams can accomplish more in less time without sacrificing the quality of their work.

The S3A magic committer and EMRFS are more than tools; they are productivity enhancers. They show what is possible when a Spark job's write path is configured correctly, and since time is money in the cloud, they can also reduce cost.
