Spark Structured Streaming guarantees exactly-once processing for file outputs. One element that maintains this guarantee is a folder called _spark_metadata, which is located in the output folder. The _spark_metadata folder is also known as the "metadata log", and its files as "metadata log files". It may look like this:
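Each metadata log file is plain text. Assuming the format written by Spark's FileStreamSinkLog (Spark 3.x), the first line is the version header `v1` and each following line is a JSON object describing one output file, with fields such as `path` and `action`. The hypothetical Bash helper `list_logged_files` below prints the recorded paths. It is a rough grep-based sketch for local inspection, not a JSON parser.

```shell
# Hypothetical helper (not part of Spark): print the data file paths recorded
# in one metadata log file. Assumes a "v1" header line followed by one JSON
# object per line, each containing a "path" field.
# Usage: list_logged_files /tmp/destination/_spark_metadata/1
list_logged_files() {
  local log_file="$1"
  local header
  header=$(head -n 1 "$log_file")
  if [ "$header" != "v1" ]; then
    # An empty file, a missing file or an unknown version header ends up here.
    echo "unexpected header in $log_file: '$header'" >&2
    return 1
  fi
  # Skip the header, then extract the value of every "path" field.
  tail -n +2 "$log_file" | grep -o '"path":"[^"]*"' | sed 's/^"path":"//; s/"$//'
}
```

A file that contains only the `v1` header prints nothing, which is how an empty (but valid) log looks to this helper.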
I hope it's clear by now that this folder should not be deleted. It should not be deleted!
Anyway, let's see what happens if we delete it nonetheless.
For this scenario, let's assume we have a structured streaming query that writes to a folder called /tmp/destination and uses a checkpoint folder called /tmp/checkpoint-location. After two micro-batches, the folder structure of the checkpoint folder and the _spark_metadata folder looks like this:
Now, for some reason, the _spark_metadata folder in the destination is deleted or moved, but the corresponding checkpoint folder is not.
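A gap like this can be spotted before compaction fails by comparing the checkpoint with the metadata log. The Bash sketch below is hypothetical and assumes that the checkpoint's `commits` folder contains one file per committed batch id (e.g. /tmp/checkpoint-location/commits/0), and that every batch id up to the newest `<id>.compact` file is covered by that compact file, so older plain log files may legitimately have been cleaned up. It only checks the commit files that are still retained in the checkpoint.

```shell
# Hypothetical helper (not part of Spark): print batch ids that have a commit
# file in the checkpoint but no metadata log file in the sink.
# Usage: find_missing_metadata <checkpoint-dir> <sink>/_spark_metadata
find_missing_metadata() {
  local checkpoint="$1" metadata="$2"
  local last_compact=-1 f id

  # Find the newest compaction batch; everything up to it lives in "<id>.compact".
  for f in "$metadata"/*.compact; do
    [ -e "$f" ] || continue               # no match: the glob stays unexpanded
    id=$(basename "$f" .compact)
    [[ "$id" =~ ^[0-9]+$ ]] || continue
    if (( id > last_compact )); then
      last_compact=$id
    fi
  done

  # Report every committed batch id without a matching plain log file.
  for f in "$checkpoint"/commits/*; do
    id=$(basename "$f")
    [[ "$id" =~ ^[0-9]+$ ]] || continue   # ignore anything that is not a batch id
    (( id <= last_compact )) && continue  # covered by the newest compact file
    [ -e "$metadata/$id" ] || echo "$id"
  done | sort -n
}
```

In the scenario above, `find_missing_metadata /tmp/checkpoint-location /tmp/destination/_spark_metadata` would be expected to list 0 and 1 once the query has written later batches.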
The following exception will be thrown sooner or later:
Caused by: java.lang.IllegalStateException: /tmp/destination/_spark_metadata/0 doesn't exist when compacting batch 9 (compactInterval: 10)
at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.$anonfun$compact$3(CompactibleFileStreamLog.scala:187)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.$anonfun$compact$2(CompactibleFileStreamLog.scala:185)
at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.$anonfun$compact$2$adapted(CompactibleFileStreamLog.scala:183)
at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:245)
at scala.collection.immutable.NumericRange.foreach(NumericRange.scala:74)
at scala.collection.TraversableLike.flatMap(TraversableLike.scala:245)
at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:242)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:108)
at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.$anonfun$compact$1(CompactibleFileStreamLog.scala:183)
at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:561)
at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.compact(CompactibleFileStreamLog.scala:181)
at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.add(CompactibleFileStreamLog.scala:156)
at org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol.commitJob(ManifestFileCommitProtocol.scala:75)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:215)
As we can see, the _spark_metadata folder is missing files 0 and 1, which were deleted earlier.
Instead of simply writing /tmp/destination/_spark_metadata/9, Spark tries to merge the entries of files 0, 1, ..., 8, together with the new entries of batch 9, into a file called 9.compact. This improves reading efficiency and avoids the small files problem; the process is called log compaction. The exception is thrown at this point because files 0 and 1 unexpectedly don't exist. Log compaction doesn't happen in every micro-batch; its frequency is determined by compactInterval, which is 10 by default.
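The schedule described above can be sketched in a few lines of Bash. This assumes the rule used by Spark's CompactibleFileStreamLog in current versions: batch N is a compaction batch when N + 1 is a multiple of compactInterval, and its compact file merges the logs of batches max(0, N - compactInterval) through N - 1 with batch N's own entries (batch N - compactInterval, if it exists, is itself a .compact file). The function names are hypothetical.

```shell
# Hypothetical helpers illustrating the compaction schedule (not Spark APIs).
# Assumption: batch N is compacted when (N + 1) % compactInterval == 0.
is_compaction_batch() {
  local batch_id="$1" interval="$2"
  (( (batch_id + 1) % interval == 0 ))
}

# Print the batch ids whose log files are read when compacting batch N.
# Assumption: they are max(0, N - interval) .. N - 1.
batches_read_by_compaction() {
  local batch_id="$1" interval="$2"
  local start=$(( batch_id - interval ))
  if (( start < 0 )); then
    start=0
  fi
  seq "$start" $(( batch_id - 1 ))
}
```

With the default interval, `batches_read_by_compaction 9 10` prints 0 through 8, which are exactly the files the compaction of batch 9 needs. For batch 19 it prints 9 through 18, where 9 is read from 9.compact.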
How to fix the problem
1. Restore the files of the removed _spark_metadata folder
If the deleted _spark_metadata folder has only been moved and can be restored, its files should be restored.
Move the files of the old _spark_metadata folder into the new _spark_metadata folder that Spark has created for the subsequent micro-batches. There should be no overlapping filenames.
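On a local or mounted filesystem, the move can be sketched in Bash as follows. The helper `restore_metadata` and the backup location are hypothetical; it refuses to move anything if a filename exists in both folders, and it ignores hidden files such as Hadoop's `.0.crc` checksum files. On HDFS or object stores, the equivalent filesystem commands would be needed instead.

```shell
# Hypothetical helper (not part of Spark): move the log files of a backed-up
# _spark_metadata folder into the current one.
# Usage: restore_metadata <backup-dir> <sink>/_spark_metadata
restore_metadata() {
  local backup="$1" current="$2"
  local f name clash=0

  # First pass: refuse to continue if any filename exists in both folders.
  for f in "$backup"/*; do
    [ -e "$f" ] || continue
    name=$(basename "$f")
    if [ -e "$current/$name" ]; then
      echo "overlapping file: $name" >&2
      clash=1
    fi
  done
  if (( clash )); then
    return 1
  fi

  # Second pass: move the files; -n never overwrites an existing file.
  mkdir -p "$current"
  for f in "$backup"/*; do
    [ -e "$f" ] || continue
    mv -n "$f" "$current/"
  done
}
```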
After restoring the files, the _spark_metadata folder should look like this:
Now, the query can be restarted and should finish without errors.
2. Create dummy log files
If the metadata log files are irrecoverable, we could create dummy log files for the missing micro-batches.
In our example, this could be done like this:
for i in {0..1}; do echo v1 > "/tmp/destination/_spark_metadata/$i"; done
Now, the query can be restarted and should finish without errors.
Note that the information from metadata log files 0 and 1 is permanently lost: readers that rely on the metadata log will no longer see the data files written by micro-batches 0 and 1. Hence, the exactly-once guarantee is lost for those micro-batches, and you need to address this problem separately, but at least the query can continue.
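A slightly more defensive variant of the loop above only creates files that are missing, so an existing log file is never overwritten by mistake. It writes `v1` followed by a newline, i.e. a version header with no file entries; a completely empty file would be rejected as an incomplete log. The helper name is hypothetical, and the sketch assumes the sink is on a local or mounted filesystem.

```shell
# Hypothetical helper (not part of Spark): write an empty metadata log
# (just the "v1" header, no entries) for each given batch id.
# Usage: create_dummy_logs <sink>/_spark_metadata <batch-id>...
create_dummy_logs() {
  local metadata="$1"
  shift
  local id
  for id in "$@"; do
    if [ -e "$metadata/$id" ]; then
      echo "skipping $id: $metadata/$id already exists" >&2
    else
      printf 'v1\n' > "$metadata/$id"
    fi
  done
}
```

For the example, this would be called as `create_dummy_logs /tmp/destination/_spark_metadata 0 1`.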
3. Deferring compaction
If it's the middle of the night and you simply need that query to continue, or you have no write access to the filesystem, you can buy yourself some time by deferring the compaction. However, this solution does not solve the root cause.
By default, compactInterval is 10. You can increase it to, e.g., 100 by restarting the query with this additional config:
spark-submit --conf spark.sql.streaming.fileSink.log.compactInterval=100
The same exception will be thrown again at the next compaction, i.e. at batch 99 with compactInterval=100, so this is really just a temporary fix that keeps the query running for roughly 90 more micro-batches.
Eventually, the missing log files have to be recreated.
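To estimate how much time a larger interval buys, the next compaction batch can be computed from the current batch id. This Bash sketch assumes the rule that batch N is compacted when N + 1 is a multiple of compactInterval, and that no `.compact` file exists yet in _spark_metadata: if one does, Spark derives an interval that is compatible with the existing compact files, so the configured value may not be used as-is. The function name is hypothetical.

```shell
# Hypothetical helper (not part of Spark): print the first batch id >= current
# that would trigger compaction, assuming (N + 1) % interval == 0.
# Usage: next_compaction_batch <current-batch-id> <compactInterval>
next_compaction_batch() {
  local current="$1" interval="$2"
  # Round up to the next multiple of interval, then step back by one.
  echo $(( (current / interval + 1) * interval - 1 ))
}
```

For the example, `next_compaction_batch 9 10` prints 9 (the failing batch), while `next_compaction_batch 9 100` prints 99.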
Great article. Two cents I would like to add:
With any of the methods mentioned here, the error is only removed or deferred for the Spark producer job (the one writing data to S3). Any consumer job that wants to read the data already written to S3 will still face one of the issues below:
1. If you create a blank 0 file
Error:
Exception in thread "main" java.lang.IllegalStateException: Failed to read log file /Spark-Warehouse/_spark_metadata/0. Incomplete log file
a. If only 1 batch was present
Error: Exception in thread "main" org.apache.spark.sql.AnalysisException: Unable to infer schema for Parquet at . It must be specified manually
b. If multiple batches were present and you deleted only 1 metadata file
Error: Exception in thread "main" java.lang.IllegalStateException: /Documents/Spark-Warehouse/_spark_metadata/0 doesn't exist (latestId: 1, compactInterval: 10)