DEV Community

Kevin Wallimann

How to recover from a deleted _spark_metadata folder in Spark Structured Streaming

Warning: The described procedures have been tested on Spark 2.4.3 and 3.0.1, but not in every possible environment. Be mindful of what you're doing on your system. That said, I'd be grateful for any feedback if you find caveats.

Introduction

Spark Structured Streaming guarantees exactly-once processing for file outputs. One element of maintaining that guarantee is a folder called _spark_metadata, located in the output folder. This folder is also known as the "metadata log" and its files as "metadata log files". It may look like this:

/tmp/destination/_spark_metadata/0
/tmp/destination/_spark_metadata/1
/tmp/destination/_spark_metadata/2
/tmp/destination/_spark_metadata/3

A metadata log file may look like this:

v1
{"path":"file:///tmp/destination/part-00000-5ee05bb5-3c65-4028-9c9e-dbc99f5fdbca.c000.snappy.parquet","size":3919,"isDir":false,"modificationTime":1615462080000,"blockReplication":1,"blockSize":33554432,"action":"add"}

When Spark writes a file to the output folder, it writes the absolute path of the added file to the metadata log file of the current micro-batch.

If a partial write occurs, that filename will not be added to the metadata log, and that's how Spark can maintain exactly-once semantics.

When Spark reads a file from the output folder, it only reads from files that are referenced in the metadata log. At least that's the idea. For more details on that topic, see https://dev.to/kevinwallimann/is-structured-streaming-exactly-once-well-it-depends-noe
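To make the format concrete, here is a small sketch (plain Python, not Spark's actual implementation) that parses such a v1 metadata log file: a `v1` header line followed by one JSON entry per line.

```python
import json

def parse_metadata_log(text):
    """Parse a v1 metadata log file: a 'v1' header followed by one JSON entry per line."""
    lines = text.strip().splitlines()
    if not lines or lines[0] != "v1":
        raise ValueError("not a v1 metadata log file")
    return [json.loads(line) for line in lines[1:]]

# The example log file from above
log = '''v1
{"path":"file:///tmp/destination/part-00000-5ee05bb5-3c65-4028-9c9e-dbc99f5fdbca.c000.snappy.parquet","size":3919,"isDir":false,"modificationTime":1615462080000,"blockReplication":1,"blockSize":33554432,"action":"add"}'''

entries = parse_metadata_log(log)
# Only entries with action "add" count as committed output files
added = [e["path"] for e in entries if e["action"] == "add"]
```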

Deleting the _spark_metadata folder

I hope it's clear by now that this folder should not be deleted. It should not be deleted!

Anyway, let's see what happens if we delete it nonetheless.
For this scenario, let's assume we have a structured streaming query writing to a folder /tmp/destination, with the checkpoint location /tmp/checkpoint-location. After two micro-batches, the folder structure for the checkpoint folder and the _spark_metadata folder looks like this:

/tmp/checkpoint-location/commits
/tmp/checkpoint-location/commits/0
/tmp/checkpoint-location/commits/1
/tmp/checkpoint-location/metadata
/tmp/checkpoint-location/offsets
/tmp/checkpoint-location/offsets/0
/tmp/checkpoint-location/offsets/1
/tmp/checkpoint-location/sources
/tmp/checkpoint-location/sources/0
/tmp/checkpoint-location/sources/0/0

/tmp/destination/_spark_metadata/0
/tmp/destination/_spark_metadata/1

Now, for some reason, the _spark_metadata folder in the destination is deleted or moved, but not the corresponding checkpoint folder.

The following exception will be thrown sooner or later:

Caused by: java.lang.IllegalStateException: /tmp/destination/_spark_metadata/0 doesn't exist when compacting batch 9 (compactInterval: 10)
    at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.$anonfun$compact$3(CompactibleFileStreamLog.scala:187)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.$anonfun$compact$2(CompactibleFileStreamLog.scala:185)
    at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.$anonfun$compact$2$adapted(CompactibleFileStreamLog.scala:183)
    at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:245)
    at scala.collection.immutable.NumericRange.foreach(NumericRange.scala:74)
    at scala.collection.TraversableLike.flatMap(TraversableLike.scala:245)
    at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:242)
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:108)
    at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.$anonfun$compact$1(CompactibleFileStreamLog.scala:183)
    at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:561)
    at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.compact(CompactibleFileStreamLog.scala:181)
    at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.add(CompactibleFileStreamLog.scala:156)
    at org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol.commitJob(ManifestFileCommitProtocol.scala:75)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:215)

Looking at the checkpoint folder, we see the following files:

/tmp/checkpoint-location/commits
/tmp/checkpoint-location/commits/0
/tmp/checkpoint-location/commits/1
/tmp/checkpoint-location/commits/2
/tmp/checkpoint-location/commits/3
/tmp/checkpoint-location/commits/4
/tmp/checkpoint-location/commits/5
/tmp/checkpoint-location/commits/6
/tmp/checkpoint-location/commits/7
/tmp/checkpoint-location/commits/8
/tmp/checkpoint-location/metadata
/tmp/checkpoint-location/offsets
/tmp/checkpoint-location/offsets/0
/tmp/checkpoint-location/offsets/1
/tmp/checkpoint-location/offsets/2
/tmp/checkpoint-location/offsets/3
/tmp/checkpoint-location/offsets/4
/tmp/checkpoint-location/offsets/5
/tmp/checkpoint-location/offsets/6
/tmp/checkpoint-location/offsets/7
/tmp/checkpoint-location/offsets/8
/tmp/checkpoint-location/offsets/9
/tmp/checkpoint-location/sources
/tmp/checkpoint-location/sources/0
/tmp/checkpoint-location/sources/0/0

Meanwhile, the destination folder contains:

/tmp/destination/_spark_metadata/2
/tmp/destination/_spark_metadata/3
/tmp/destination/_spark_metadata/4
/tmp/destination/_spark_metadata/5
/tmp/destination/_spark_metadata/6
/tmp/destination/_spark_metadata/7
/tmp/destination/_spark_metadata/8

As we can see, the _spark_metadata folder is missing the files 0 and 1, which were previously deleted.
Instead of simply writing /tmp/destination/_spark_metadata/9, Spark tries to concatenate the files 0, 1, ..., 8 into a file called 9.compact to improve reading efficiency and avoid the small files problem. This process is called log compaction. That's when the exception is thrown, because the files 0 and 1 unexpectedly don't exist. Log compaction doesn't happen in every micro-batch; its frequency is determined by spark.sql.streaming.fileSink.log.compactInterval, which is 10 by default.
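The compaction schedule can be illustrated with a one-line check. This mirrors the logic in Spark's CompactibleFileStreamLog, but it's a sketch, not the actual API:

```python
def is_compaction_batch(batch_id, compact_interval=10):
    """True if this micro-batch writes a .compact file instead of a plain log file."""
    # Batches 9, 19, 29, ... trigger compaction with the default interval of 10.
    return (batch_id + 1) % compact_interval == 0
```

This is why the exception above appears at batch 9: it is the first compaction batch, and compaction needs all of the files 0 through 8.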

How to fix the problem

1. Restore the files of the removed _spark_metadata folder

If the _spark_metadata folder was only moved and can be recovered, restore it by moving its files back into the new _spark_metadata folder. There should be no overlapping filenames.
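A minimal sketch of this restore step, assuming the old folder was moved to a hypothetical backup location (the function name and paths are mine; adjust them to your setup):

```python
import shutil
from pathlib import Path

def restore_metadata(backup_dir, live_dir):
    """Move metadata log files from backup_dir back into the live _spark_metadata folder."""
    backup = Path(backup_dir)
    live = Path(live_dir)
    live.mkdir(parents=True, exist_ok=True)
    for f in sorted(backup.iterdir()):
        target = live / f.name
        # Overlapping filenames would indicate the folders are out of sync -- stop.
        if target.exists():
            raise RuntimeError(f"refusing to overwrite {target}")
        shutil.move(str(f), str(target))

# Hypothetical paths: the moved-away folder and the current one.
# restore_metadata("/tmp/_spark_metadata_backup", "/tmp/destination/_spark_metadata")
```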

After restoring the files, the _spark_metadata folder should look like this

/tmp/destination/_spark_metadata/0
/tmp/destination/_spark_metadata/1
/tmp/destination/_spark_metadata/2
/tmp/destination/_spark_metadata/3
/tmp/destination/_spark_metadata/4
/tmp/destination/_spark_metadata/5
/tmp/destination/_spark_metadata/6
/tmp/destination/_spark_metadata/7
/tmp/destination/_spark_metadata/8

Now, the query can be restarted and should finish without errors.

2. Create dummy log files

If the metadata log files are irrecoverable, we can create dummy log files for the missing micro-batches.
In our example, this could be done like this:

for i in {0..1}; do echo v1 > "/tmp/destination/_spark_metadata/$i"; done

or on HDFS

for i in {0..1}; do echo v1 > "/tmp/$i"; hdfs dfs -copyFromLocal "/tmp/$i" "/tmp/destination/_spark_metadata/$i"; done

This will create the files

/tmp/destination/_spark_metadata/0
/tmp/destination/_spark_metadata/1

Now, the query can be restarted and should finish without errors.

Note that the information from metadata log files 0 and 1 is irretrievably lost, so the exactly-once guarantee no longer holds for micro-batches 0 and 1. You need to address this problem separately, but at least the query can continue.
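To assess the damage, you can list which output files are not referenced by any remaining metadata log file and would therefore be invisible to downstream readers. A sketch using only the standard library (the name `find_orphans` is mine, not Spark's):

```python
import json
from pathlib import Path

def find_orphans(destination):
    """Return data files in `destination` not referenced by any metadata log file."""
    dest = Path(destination)
    referenced = set()
    for log_file in (dest / "_spark_metadata").iterdir():
        # Skip the "v1" header; each remaining line is one JSON entry.
        for line in log_file.read_text().splitlines()[1:]:
            entry = json.loads(line)
            if entry.get("action") == "add":
                referenced.add(Path(entry["path"]).name)
    data_files = {f.name for f in dest.glob("part-*")}
    return data_files - referenced
```

Any file this reports was written by one of the micro-batches whose log file is now a dummy; you could, for example, reprocess it through a separate batch job.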

3. Defer compaction

If it's the middle of the night and you simply need the query to continue, or you have no write access to the filesystem, you can buy yourself some time by deferring the compaction. However, this does not solve the root cause.

By default, the compactInterval is 10. You can increase it to e.g. 100 by restarting the query with this additional config

spark-submit --conf spark.sql.streaming.fileSink.log.compactInterval=100 

The same exception will be thrown at the next compaction batch (batch 99 with a compactInterval of 100), so this is really just a temporary fix to keep the query running for a few more micro-batches.

Eventually, the missing log files have to be recreated.

Top comments (3)

Ashish • Edited

Great article. Two cents I'd like to add:
Any of the methods mentioned here only removes or defers the error for the Spark producer job (the one writing data to S3). But any consumer job that wants to read the data already written to S3 will still face one of the issues below:
1. If you create the blank 0 file

Error:
Exception in thread "main" java.lang.IllegalStateException: Failed to read log file /Spark-Warehouse/_spark_metadata/0. Incomplete log file

2. If you don't create the blank file:

a. If only 1 batch was present
Error: Exception in thread "main" org.apache.spark.sql.AnalysisException: Unable to infer schema for Parquet at . It must be specified manually
b. If multiple batches were present and you deleted only 1 metadata file
Error: Exception in thread "main" java.lang.IllegalStateException: /Documents/Spark-Warehouse/_spark_metadata/0 doesn't exist (latestId: 1, compactInterval: 10)
Kevin Wallimann

Hi @gupash
Thanks for your comment.
Indeed, if you create a blank 0 file, it will throw the error you posted. However, the dummy log file described in the article contains the string "v1". In that case, no error should be thrown on the reader's side. Maybe I could have pointed this out more clearly.

jmagana2000

I was missing files 0 through 5, so I just copied file 6, renamed the copies to 0 through 5, and that worked.