Lorenzo Lou

Exploration of Spark Executor Memory

Overview

Hello everyone, today I would like to introduce a problem we often encounter in our work: "Container killed by YARN for exceeding memory limits". What causes this problem? How does it differ from an OOM? And what is its relationship with the memory structure of the Spark Executor? Let's explore these three questions.

First of all, a Spark cluster will start two types of JVM processes, the Driver and the Executor. The former is the main control process, responsible for creating the Spark context, submitting Spark jobs, and transforming jobs into compute tasks. It also coordinates task scheduling among the various Executor processes. The latter is responsible for executing specific compute tasks on worker nodes, returning results to the Driver, and providing storage functionality for RDDs that need to be persisted. Since the memory management of the Driver is relatively simple, this article mainly analyzes the memory management of the Executor. Hereafter, the term "Spark memory" refers specifically to the memory of the Executor. [Note that this article targets Spark version 2.x, deployed in YARN mode.]

The memory layout of the entire Executor side is shown in the following figure.

[Figure: Executor containers running inside YARN NodeManagers, capped by yarn.scheduler.maximum-allocation-mb]

We can see that in YARN cluster management mode, Spark Executors run as containers inside the NodeManager, and the maximum memory each container can request is capped by yarn.scheduler.maximum-allocation-mb, which we will call MonitorMemory.

Executor Memory Structure on YARN: Overview

The Executor memory area on YARN is divided into two parts:

  1. JVM off-heap memory: its size is specified by the spark.yarn.executor.memoryOverhead parameter and defaults to executorMemory * 0.10, with a minimum of 384 MB. This memory is mainly used for the JVM's own overhead, interned strings, NIO buffers (Direct Buffers), and similar costs. It is memory that neither user code nor Spark operates on directly, and it can be increased by tuning this parameter when it proves insufficient.
  2. On-heap memory (Spark Executor Memory): its size is configured by the --executor-memory or spark.executor.memory parameter when the Spark application starts, and it becomes the maximum heap size of the JVM (-Xmx). To use this memory more efficiently, Spark partitions and manages it logically. We will explain unified memory management in detail below.

For YARN clusters, there is a constraint: ExecutorMemory + MemoryOverhead <= MonitorMemory. If the sum of the ExecutorMemory and MemoryOverhead specified when the application is submitted is greater than MonitorMemory, the Executor will fail to be allocated. If actual memory usage exceeds this upper limit at runtime, YARN will terminate (kill) the Executor process, as illustrated by the sketch below.
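As a concrete illustration, here is a minimal Scala sketch of that sizing arithmetic. The yarn.scheduler.maximum-allocation-mb value and executor size below are assumed example numbers, not defaults; the overhead follows the max(executorMemory * 0.10, 384 MB) rule described above.

```scala
// Hypothetical sizing check mirroring the YARN constraint:
// ExecutorMemory + MemoryOverhead <= MonitorMemory.
object YarnSizingCheck {
  def main(args: Array[String]): Unit = {
    val monitorMemoryMb  = 20480L  // yarn.scheduler.maximum-allocation-mb (assumed example)
    val executorMemoryMb = 18432L  // --executor-memory 18g
    // Default overhead rule for Spark 2.x on YARN: 10% of executor memory, at least 384 MB.
    val overheadMb  = math.max((executorMemoryMb * 0.10).toLong, 384L)
    val requestedMb = executorMemoryMb + overheadMb

    if (requestedMb > monitorMemoryMb)
      println(s"Executor cannot be allocated: $requestedMb MB > $monitorMemoryMb MB")
    else
      println(s"OK: container request of $requestedMb MB fits within $monitorMemoryMb MB")
  }
}
```

With these example numbers the request is 18432 + 1843 = 20275 MB, which fits under the assumed 20480 MB cap; raising --executor-memory to 19g would push the total over the cap and the Executor would never be allocated.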
After Spark 1.6, unified memory management was introduced, covering two areas: on-heap memory and off-heap memory. Both are explained in detail below.

By default, Spark uses only on-heap memory, which it manages through logical "planning". The on-heap memory area on the Executor side is logically divided into the following four regions:

Spark Executor Memory

  • Execution Memory: mainly used to store temporary data produced during computations such as Shuffle, Join, Sort, and Aggregation.
  • Storage Memory: mainly used to store Spark cache data such as RDD caches, unroll data, and broadcast data.
  • User Memory: mainly used to store data needed for RDD transformation operations, such as RDD dependency information.
  • Reserved Memory: system-reserved memory used to store Spark internal objects (300 MB).

Reserved Memory

The system reserves this memory for storing Spark internal objects. Its size is hard-coded and equals 300 MB, which cannot be modified (in a testing environment it can be changed via the spark.testing.reservedMemory parameter). If the memory allocated to an Executor is less than 1.5 * 300 MB = 450 MB, the Executor will not be able to start.
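The 450 MB floor comes from a startup check. Below is a simplified Scala sketch of that kind of validation, paraphrased from the documented Spark 2.x behaviour rather than copied from its source:

```scala
// Simplified sketch: reject heaps that cannot even hold 1.5x the reserved memory.
object ReservedMemoryCheck {
  val ReservedSystemMemoryBytes: Long = 300L * 1024 * 1024 // hard-coded 300 MB

  def validate(systemMemoryBytes: Long): Unit = {
    val minSystemMemory = (ReservedSystemMemoryBytes * 1.5).toLong // 450 MB floor
    require(systemMemoryBytes >= minSystemMemory,
      s"System memory $systemMemoryBytes must be at least $minSystemMemory bytes. " +
        "Increase the heap size with the --executor-memory option.")
  }

  def main(args: Array[String]): Unit = {
    validate(512L * 1024 * 1024) // passes: 512 MB >= 450 MB
    validate(400L * 1024 * 1024) // throws IllegalArgumentException: below the 450 MB floor
  }
}
```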

Storage Memory

Mainly used to store Spark cache data, such as RDD caches, broadcast data, and unroll data. Its size is UsableMemory * spark.memory.fraction * spark.memory.storageFraction. In Spark 2+, by default, Storage Memory and Execution Memory each account for approximately 30% of usable memory (1 * 0.6 * 0.5 = 0.3). Under unified memory management, these two regions can borrow from each other; the borrowing mechanism is discussed in the next section.

Execution Memory

Mainly used to store temporary data produced during computations such as Shuffle, Join, Sort, and Aggregation. Its size is UsableMemory * spark.memory.fraction * (1 - spark.memory.storageFraction). In Spark 2+, by default, this is approximately 30% of usable memory (1 * 0.6 * (1 - 0.5) = 0.3). Under unified memory management, Storage Memory and Execution Memory can borrow from each other.

Other/User Memory

Mainly used to store data needed for RDD transformation operations, such as RDD dependency information. Its size is UsableMemory * (1 - spark.memory.fraction); in Spark 2+, it defaults to 40% of usable memory (1 * (1 - 0.6) = 0.4).
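Putting the three formulas together, here is a minimal Scala sketch that computes the logical on-heap regions from an assumed heap size, using the Spark 2.x defaults spark.memory.fraction = 0.6 and spark.memory.storageFraction = 0.5:

```scala
// Compute the logical on-heap regions for an assumed 4 GB heap (example value).
object OnHeapRegions {
  def main(args: Array[String]): Unit = {
    val systemMemory   = 4L * 1024 * 1024 * 1024 // assumed heap size in bytes
    val reservedMemory = 300L * 1024 * 1024      // fixed 300 MB
    val usableMemory   = systemMemory - reservedMemory

    val memoryFraction  = 0.6 // spark.memory.fraction (default)
    val storageFraction = 0.5 // spark.memory.storageFraction (default)

    val storageMemory   = usableMemory * memoryFraction * storageFraction
    val executionMemory = usableMemory * memoryFraction * (1 - storageFraction)
    val userMemory      = usableMemory * (1 - memoryFraction)

    def gb(bytes: Double): String = f"${bytes / math.pow(1024, 3)}%.2f GB"
    println(s"Storage Memory:   ${gb(storageMemory)}")   // ~1.11 GB
    println(s"Execution Memory: ${gb(executionMemory)}") // ~1.11 GB
    println(s"User Memory:      ${gb(userMemory)}")      // ~1.48 GB
  }
}
```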

There are a few interesting points here that we can elaborate on:

  1. Why reserve 300 MB? In the initial version of unified memory management, the "Other" part of memory did not have a fixed 300 MB floor; instead, like static memory management, it used a percentage, initially set to 25%. In practice, however, configuring a small heap (e.g. 1 GB) led to OOM errors. The issue is discussed in detail in "Make unified memory management work with small heaps". As a result, the "Other" part was changed to reserve 300 MB of memory upfront.
  2. Why did spark.memory.fraction drop from 0.75 to 0.6? The initial value of spark.memory.fraction was 0.75, and many write-ups on unified memory management still describe it that way. In practice, however, this value proved too high and led to long GC pauses, so Spark 2.0 lowered the default to 0.6. For a more detailed discussion, see "Reduce spark.memory.fraction default to avoid overrunning old gen in JVM default config".
  3. Off-heap memory. Spark 1.6 introduced off-heap memory (see SPARK-11389). In this mode, memory is not allocated inside the JVM; instead, Spark uses Java's Unsafe API to request memory directly from the operating system, much like C's malloc(). This lets Spark access off-heap memory directly, reducing unnecessary memory overhead as well as frequent GC scans and collections, which improves processing performance. In addition, off-heap memory can be allocated and released precisely, and the space occupied by serialized data can be calculated exactly, so it is easier to manage and less error-prone than on-heap memory. The downside is that the allocation and release logic must be written by hand (a rough illustration follows after this list).
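For intuition only, here is a tiny Scala sketch of what manual off-heap allocation through sun.misc.Unsafe looks like. This is not Spark's code (Spark wraps this logic in its own allocator); it only illustrates the malloc-style allocate/free responsibility mentioned in point 3:

```scala
// Illustrative only: allocate, use, and free a 1 MB block outside the JVM heap.
object OffHeapDemo {
  def main(args: Array[String]): Unit = {
    // sun.misc.Unsafe is not public API; obtain it via reflection (works on JDK 8).
    val field = classOf[sun.misc.Unsafe].getDeclaredField("theUnsafe")
    field.setAccessible(true)
    val unsafe = field.get(null).asInstanceOf[sun.misc.Unsafe]

    val bytes   = 1024L * 1024L                // 1 MB, invisible to the GC
    val address = unsafe.allocateMemory(bytes) // like C's malloc()
    try {
      unsafe.setMemory(address, bytes, 0.toByte) // zero the region
      unsafe.putLong(address, 42L)               // raw write
      println(unsafe.getLong(address))           // raw read -> 42
    } finally {
      unsafe.freeMemory(address)                 // the caller must free it explicitly
    }
  }
}
```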

Task Memory Manager

Tasks in the Executor are executed as threads and share the JVM's resources (i.e., Execution memory). There is no strong isolation between the memory resources of tasks (i.e., tasks do not have a dedicated heap area). Therefore, it is possible for an earlier arriving task to occupy a large amount of memory, causing a later arriving task to be suspended due to insufficient memory.

In Spark's task memory management, a HashMap stores the mapping between each task and its memory consumption. The memory each task can occupy ranges from 1/(2n) to 1/n of the potentially available execution memory (the initial execution memory plus any storage memory that can be reclaimed). When the remaining memory is less than 1/(2n) of the pool, the task is suspended until other tasks release execution memory and the 1/(2n) lower bound can be satisfied, at which point the task is woken up. Here, n is the number of active tasks in the current Executor.

For example, if the Execution memory size is 10 GB and there are 5 running tasks in the current Executor, the range of memory each task can request is from 10 / (2 * 5) to 10 / 5, i.e., 1 GB to 2 GB, as the sketch below shows.
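A minimal Scala sketch of those bounds using the example numbers above; this mirrors the 1/(2n) to 1/n rule, not Spark's actual implementation:

```scala
// Per-task execution-memory bounds for a 10 GB pool shared by 5 active tasks.
object TaskMemoryBounds {
  def main(args: Array[String]): Unit = {
    val executionPoolGb = 10.0 // assumed size of the execution memory pool
    val activeTasks     = 5    // n: active tasks in this Executor

    val lowerBoundGb = executionPoolGb / (2 * activeTasks) // 1/(2n) of the pool = 1 GB
    val upperBoundGb = executionPoolGb / activeTasks       // 1/n of the pool    = 2 GB

    println(s"Each task may use between $lowerBoundGb GB and $upperBoundGb GB")
  }
}
```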

During task execution, additional memory is requested as needed. If memory is available, the request succeeds automatically; otherwise, an OutOfMemoryError is thrown. The maximum number of tasks that can run simultaneously in each Executor is determined by the number of CPU cores N allocated to the Executor and the number of CPU cores C required by each task. Specifically:

N = spark.executor.cores
C = spark.task.cpus

Therefore, the maximum task parallelism of each Executor can be represented as TP = N / C. The value of C depends on the application type, and most applications can use the default value of 1. Thus, the main factor affecting the maximum task parallelism (i.e., the maximum number of active tasks) in the Executor is N.
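A quick sketch of that relationship, with assumed example values for the two settings:

```scala
// Maximum task parallelism per Executor: TP = N / C.
object MaxTaskParallelism {
  def main(args: Array[String]): Unit = {
    val executorCores = 4 // N = spark.executor.cores (assumed example value)
    val taskCpus      = 1 // C = spark.task.cpus (default)
    println(s"Max concurrent tasks per Executor: ${executorCores / taskCpus}") // 4
  }
}
```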

Based on the memory usage characteristics of tasks, the Executor memory model described above can be simply abstracted as the following diagram:

[Figure: simplified abstraction of the Executor memory model from the tasks' point of view]

Example

To better understand the use of on-heap and off-heap memory described above, here are two simple examples.

Only On-Heap Memory Used

We submitted a Spark job with the following memory configuration:

```
--executor-memory 18g
```

As we have not set the spark.memory.fraction and spark.memory.storageFraction parameters, we can see from the Spark UI that the available Storage Memory is displayed as follows:


![Image description](https://dev-to-uploads.s3.amazonaws.com/uploads/articles/xvszf7yqelvosdt6nvbg.png)



From the image above, we can see that the available Storage Memory is 10.1GB. How was this number obtained? According to the previous rule, we can perform the following calculations:



```
systemMemory = spark.executor.memory
reservedMemory = 300MB
usableMemory = systemMemory - reservedMemory
StorageMemory = usableMemory * spark.memory.fraction * spark.memory.storageFraction
```




If we substitute the values, we get the following:


```
systemMemory = 18 GB = 19327352832 bytes
reservedMemory = 300MB = 300 * 1024 * 1024 = 314572800
usableMemory = systemMemory - reservedMemory = 19327352832 - 314572800 = 19012780032
StorageMemory = usableMemory * spark.memory.fraction * spark.memory.storageFraction = 19012780032 * 0.6 * 0.5 = 5703834009.6 = 5.312109375 GB
```



The value of 10.1GB displayed on the Spark UI does not match the value we obtained. Why is that? This is because the available Storage Memory displayed on the Spark UI is actually the sum of the Execution Memory and Storage Memory, i.e., usableMemory * spark.memory.fraction:


```
StorageMemory = usableMemory * spark.memory.fraction = 19012780032 * 0.6 = 11407668019.2 = 10.62421 GB
```



This value is also incorrect because even though we have set --executor-memory 18g, the memory available to Spark's Executor is not as large as this. It is only 17179869184 bytes, which is obtained from Runtime.getRuntime.maxMemory. Therefore, we can perform the following calculations:


```
systemMemory = 17179869184 bytes
reservedMemory = 300MB = 300 * 1024 * 1024 = 314572800
usableMemory = systemMemory - reservedMemory = 17179869184 - 314572800 = 16865296384
StorageMemory = usableMemory * spark.memory.fraction = 16865296384 * 0.6 = 9.42421875 GB
```



When we convert 16865296384 * 0.6 bytes to GB by dividing it by 1024 * 1024 * 1024, we get 9.42421875 GB, which is still different from the value displayed on the UI. This is because the Spark UI converts bytes to GB by dividing them by 1000 * 1000 * 1000, as shown below:


```
systemMemory = 17179869184 bytes
reservedMemory = 300MB = 300 * 1024 * 1024 = 314572800
usableMemory = systemMemory - reservedMemory = 17179869184 - 314572800 = 16865296384
StorageMemory = usableMemory * spark.memory.fraction = 16865296384 * 0.6 / (1000 * 1000 * 1000) = 10.1 GB
```



Now, the value matches. 

We set --executor-memory to 18g, yet the memory the Spark Executor can actually obtain, as reported by Runtime.getRuntime.maxMemory, is smaller: only 17179869184 bytes. How is this value calculated?

Runtime.getRuntime.maxMemory is the maximum memory the program can use, and its value is smaller than the configured executor memory. This is because the heap is divided into three regions: Eden, Survivor, and Tenured. There are two Survivor spaces, but only one of them is usable at any given time. Therefore, we can describe the relationship with the following formulas:


```
ExecutorMemory = Eden + 2 * Survivor + Tenured
Runtime.getRuntime.maxMemory = Eden + Survivor + Tenured
```

The value of 17179869184 bytes above may differ depending on your GC configuration, but the calculation above still holds. You can verify the numbers on your own JVM, as in the sketch below.
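The following small Scala sketch, run inside any JVM with the same heap settings, prints maxMemory together with the per-region limits reported by the JVM; the exact pool names depend on the garbage collector in use:

```scala
import java.lang.management.{ManagementFactory, MemoryType}
import scala.collection.JavaConverters._

// Print the usable heap (Eden + one Survivor + Tenured) and the per-pool limits.
object HeapInspect {
  def main(args: Array[String]): Unit = {
    val max = Runtime.getRuntime.maxMemory // excludes one Survivor space
    println(f"Runtime.getRuntime.maxMemory = $max bytes (${max / math.pow(1024, 3)}%.2f GB)")

    ManagementFactory.getMemoryPoolMXBeans.asScala
      .filter(_.getType == MemoryType.HEAP)
      .foreach(pool => println(s"${pool.getName}: max = ${pool.getUsage.getMax} bytes"))
  }
}
```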

Using Heap and Off-Heap Memory

Now, what if we enable off-heap memory? Our memory configurations are as follows:

```
spark.executor.memory 18g
spark.memory.offHeap.enabled true
spark.memory.offHeap.size 10737418240
```

From the above, we can see that the off-heap memory is 10 GB. Now, the available Storage Memory shown on the Spark UI is 20.9 GB, as follows:

[Figure: Spark UI showing 20.9 GB of available Storage Memory]

In fact, the available Storage Memory shown on the Spark UI is the sum of the on-heap Storage Memory and the off-heap memory size. The calculation is as follows:

  • Heap

```
systemMemory = 17179869184 bytes
reservedMemory = 300MB = 300 * 1024 * 1024 = 314572800
usableMemory = systemMemory - reservedMemory = 17179869184 - 314572800 = 16865296384
totalOnHeapStorageMemory = usableMemory * spark.memory.fraction = 16865296384 * 0.6 = 10119177830
```

  • Off-Heap

```
totalOffHeapStorageMemory = spark.memory.offHeap.size = 10737418240
```

  • Total Storage Memory

```
StorageMemory = totalOnHeapStorageMemory + totalOffHeapStorageMemory = (10119177830 + 10737418240) bytes = (20856596070 / (1000 * 1000 * 1000)) GB = 20.9 GB
```

By now, we have presented a clear picture of the memory architecture and management logic of the Spark Executor. Going back to the questions raised at the beginning: when the Executor handles serialized data in the overhead (off-heap) area, that memory is not managed by the JVM, so continued allocation there can push the YARN container past its memory limit and get it killed by YARN ("Container killed by YARN for exceeding memory limits"). By contrast, when Spark runs out of memory because of data skew, the OOM occurs inside the JVM heap. Based on the different error messages of these two failures, we can determine whether the problem stems from the overhead memory area or from insufficient on-heap JVM memory, and therefore choose the right direction for tuning.
