Spark Structured Streaming guarantees exactly-once processing for file outputs. One element that maintains this guarantee is a folder called _spark_metadata, which is located in the output folder. The _spark_metadata folder is also known as the "metadata log", and its files as "metadata log files". It may look like this:

/tmp/destination/_spark_metadata/0
/tmp/destination/_spark_metadata/1
/tmp/destination/_spark_metadata/2
/tmp/destination/_spark_metadata/3

When Spark writes a file to the output folder, it writes the absolute path of the added file to the metadata log file of the current micro-batch.

If a partial write occurs, the file name is not added to the metadata log. That's how Spark can maintain exactly-once semantics.

When Spark reads from the output folder, it only reads the files that are referenced in the metadata log. At least that's the idea. For more details on that topic, see https://dev.to/kevinwallimann/is-structured-streaming-exactly-once-well-it-depends-noe
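To see which data files a metadata log actually references, a small shell helper is enough. This is a minimal sketch that assumes the log file layout we have seen with recent Spark versions: a version line (v1) followed by one JSON object per line, each with a "path" field. The function name list_logged_paths is hypothetical, and the grep/sed parsing is deliberately naive rather than a real JSON parser.

```shell
# Print the data-file paths referenced by one or more metadata log files.
# ASSUMPTION: each file starts with a version line ("v1"), followed by one
# JSON object per line that contains a "path" field.
list_logged_paths() {
  local log
  for log in "$@"; do
    # Skip the version line, extract each "path":"..." pair, keep only the value.
    tail -n +2 "$log" | grep -o '"path":"[^"]*"' | sed 's/^"path":"//; s/"$//'
  done
}
```

For example, list_logged_paths /tmp/destination/_spark_metadata/* prints every path the metadata log knows about. Data files in /tmp/destination that are missing from this list are the ones a reader going through the metadata log should skip.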

I hope it's clear by now that this folder should not be deleted. It should not be deleted!

Anyway, let's see what happens if we delete it nonetheless.
For this scenario, let's assume we have a Structured Streaming query that writes to a folder called /tmp/destination and uses a checkpoint folder called /tmp/checkpoint-location. After two micro-batches, the checkpoint folder and the _spark_metadata folder look like this:

/tmp/checkpoint-location/commits
/tmp/checkpoint-location/commits/0
/tmp/checkpoint-location/commits/1
/tmp/checkpoint-location/metadata
/tmp/checkpoint-location/offsets
/tmp/checkpoint-location/offsets/0
/tmp/checkpoint-location/offsets/1
/tmp/checkpoint-location/sources
/tmp/checkpoint-location/sources/0
/tmp/checkpoint-location/sources/0/0
/tmp/destination/_spark_metadata/0
/tmp/destination/_spark_metadata/1

Now, for some reason, the _spark_metadata folder in the destination is deleted or moved, but the corresponding checkpoint folder is not.

The following exception will be thrown sooner or later:

Caused by: java.lang.IllegalStateException: /tmp/destination/_spark_metadata/0 doesn't exist when compacting batch 9 (compactInterval: 10)
    at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.$anonfun$compact$3(CompactibleFileStreamLog.scala:187)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.$anonfun$compact$2(CompactibleFileStreamLog.scala:185)
    at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.$anonfun$compact$2$adapted(CompactibleFileStreamLog.scala:183)
    at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:245)
    at scala.collection.immutable.NumericRange.foreach(NumericRange.scala:74)
    at scala.collection.TraversableLike.flatMap(TraversableLike.scala:245)
    at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:242)
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:108)
    at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.$anonfun$compact$1(CompactibleFileStreamLog.scala:183)
    at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:561)
    at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.compact(CompactibleFileStreamLog.scala:181)
    at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.add(CompactibleFileStreamLog.scala:156)
    at org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol.commitJob(ManifestFileCommitProtocol.scala:75)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:215)

At this point, the checkpoint folder and the _spark_metadata folder look like this:

/tmp/checkpoint-location/commits
/tmp/checkpoint-location/commits/0
/tmp/checkpoint-location/commits/1
/tmp/checkpoint-location/commits/2
/tmp/checkpoint-location/commits/3
/tmp/checkpoint-location/commits/4
/tmp/checkpoint-location/commits/5
/tmp/checkpoint-location/commits/6
/tmp/checkpoint-location/commits/7
/tmp/checkpoint-location/commits/8
/tmp/checkpoint-location/metadata
/tmp/checkpoint-location/offsets
/tmp/checkpoint-location/offsets/0
/tmp/checkpoint-location/offsets/1
/tmp/checkpoint-location/offsets/2
/tmp/checkpoint-location/offsets/3
/tmp/checkpoint-location/offsets/4
/tmp/checkpoint-location/offsets/5
/tmp/checkpoint-location/offsets/6
/tmp/checkpoint-location/offsets/7
/tmp/checkpoint-location/offsets/8
/tmp/checkpoint-location/offsets/9
/tmp/checkpoint-location/sources
/tmp/checkpoint-location/sources/0
/tmp/checkpoint-location/sources/0/0
/tmp/destination/_spark_metadata/2
/tmp/destination/_spark_metadata/3
/tmp/destination/_spark_metadata/4
/tmp/destination/_spark_metadata/5
/tmp/destination/_spark_metadata/6
/tmp/destination/_spark_metadata/7
/tmp/destination/_spark_metadata/8

As we can see, the _spark_metadata folder is missing the files 0 and 1, which were previously deleted.
Instead of simply writing /tmp/destination/_spark_metadata/9, Spark tries to concatenate the files 0, 1, ..., 8 into a file called 9.compact, to improve reading efficiency and to avoid the small files problem. This process is called log compaction. That's when the exception is thrown, because the files 0 and 1 unexpectedly don't exist. Log compaction doesn't happen in every micro-batch; its frequency is determined by compactInterval, which is 10 by default. With the default, compaction happens at batches 9, 19, 29, and so on.
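The compaction schedule can be sketched in a few lines of shell. This reflects our reading of CompactibleFileStreamLog in recent Spark versions, not an official tool: a batch is a compaction batch when (batchId + 1) is a multiple of compactInterval, and compaction reads the previous "<id>.compact" file plus the plain log files written after it. Both function names are hypothetical. Running the check before a restart shows which files would trigger the exception above.

```shell
# ASSUMPTION: mirrors Spark's rule (batchId + 1) % compactInterval == 0.
is_compaction_batch() {
  [ $(( ($1 + 1) % $2 )) -eq 0 ]
}

# Print the metadata log files that compacting batch $2 (compactInterval $3)
# would read from folder $1, but that do not exist.
missing_for_compaction() {
  local dir="$1" batch="$2" interval="$3" id name
  # Compaction reads the batches since the previous compaction batch.
  id=$(( batch - interval ))
  if [ "$id" -lt 0 ]; then id=0; fi
  while [ "$id" -lt "$batch" ]; do
    # A previous compaction batch is stored as "<id>.compact", others as "<id>".
    if is_compaction_batch "$id" "$interval"; then
      name="$id.compact"
    else
      name="$id"
    fi
    [ -e "$dir/$name" ] || echo "$name"
    id=$(( id + 1 ))
  done
}
```

With the folder from our example, missing_for_compaction /tmp/destination/_spark_metadata 9 10 prints 0 and 1, the two files whose absence causes the exception.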

How to fix the problem

1. Restore the files of the removed _spark_metadata folder

If the _spark_metadata folder was only moved and can still be recovered, its files should be restored by moving them into the new _spark_metadata folder. There should be no overlapping filenames.
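With the query stopped, the restore could look like the sketch below. It assumes the recovered folder is reachable at a local path and that the destination is on a locally mounted filesystem; on HDFS or S3 the same logic would need that filesystem's own tools. The function name and the backup path in the usage line are hypothetical. The function first checks for overlapping file names and only moves files if there are none.

```shell
# Move metadata log files from a recovered copy ($1) into the live
# _spark_metadata folder ($2). Refuses to move anything on a name conflict.
restore_metadata_log() {
  local src="$1" dst="$2" f name conflict=0
  # First pass: detect overlapping file names without touching anything.
  for f in "$src"/*; do
    [ -e "$f" ] || continue   # glob did not match: source folder is empty
    name=$(basename "$f")
    if [ -e "$dst/$name" ]; then
      echo "conflict: $name exists in both folders" >&2
      conflict=1
    fi
  done
  if [ "$conflict" -ne 0 ]; then return 1; fi
  # Second pass: move the files; -n is an extra guard against overwriting.
  for f in "$src"/*; do
    [ -e "$f" ] || continue
    mv -n "$f" "$dst/"
  done
}
```

Usage: restore_metadata_log /tmp/spark_metadata_backup /tmp/destination/_spark_metadata. If it reports a conflict, nothing has been moved and the overlapping files need to be inspected by hand.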

After restoring the files, the _spark_metadata folder should look like this:

/tmp/destination/_spark_metadata/0
/tmp/destination/_spark_metadata/1
/tmp/destination/_spark_metadata/2
/tmp/destination/_spark_metadata/3
/tmp/destination/_spark_metadata/4
/tmp/destination/_spark_metadata/5
/tmp/destination/_spark_metadata/6
/tmp/destination/_spark_metadata/7
/tmp/destination/_spark_metadata/8

Now, the query can be restarted and should finish without errors.

2. Create dummy log files

If the metadata log files are irrecoverable, we can create dummy log files for the missing micro-batches.
In our example, this could be done like this:

for i in {0..1}; do echo v1 > "/tmp/destination/_spark_metadata/$i"; done
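If more than a couple of files are missing, the loop can be generalized by letting the checkpoint decide which batch ids need a log file. This is a minimal sketch, assuming (as in our example) that every batch with a file in /tmp/checkpoint-location/commits should have a metadata log file, and that no later .compact file already covers the missing batches. The function name is hypothetical; like the loop above, it writes only the version line v1, and it never overwrites existing files.

```shell
# For every batch committed in checkpoint $1, create a dummy log file in
# _spark_metadata folder $2 if neither "<id>" nor "<id>.compact" exists.
create_missing_dummy_logs() {
  local checkpoint="$1" metadata="$2" f id
  for f in "$checkpoint"/commits/*; do
    id=$(basename "$f")
    # Only plain numeric batch ids (also skips an unmatched glob).
    case $id in *[!0-9]*|'') continue ;; esac
    if [ ! -e "$metadata/$id" ] && [ ! -e "$metadata/$id.compact" ]; then
      echo v1 > "$metadata/$id"
      echo "created dummy log file for batch $id"
    fi
  done
}
```

In our example, create_missing_dummy_logs /tmp/checkpoint-location /tmp/destination/_spark_metadata would create exactly the files 0 and 1.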

Now, the query can be restarted and should finish without errors.

Note that the information in the metadata log files 0 and 1 is definitely lost. Hence, the exactly-once guarantee is lost for micro-batches 0 and 1, and you need to address this problem separately. At least the query can continue, though.

3. Defer compaction

If it's the middle of the night and you simply need that query to continue, or you have no write access to the filesystem, you can buy yourself some time by deferring the compaction. However, this solution does not solve the root cause.

By default, compactInterval is 10. You can increase it, e.g. to 100, by restarting the query with this additional config:

spark-submit --conf spark.sql.streaming.fileSink.log.compactInterval=100 

The same exception will be thrown again at the next compaction (at batch 99 in our example), so this is really just a temporary fix that keeps the query running for some more micro-batches.
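To estimate how many micro-batches a larger interval buys, the next compaction batch can be computed from the rule (batchId + 1) % compactInterval == 0. This sketch assumes that no .compact file exists yet; as far as we understand the Spark source, when .compact files already exist, Spark derives a compatible interval from them, so the configured value may not take effect as is. The function name is hypothetical.

```shell
# Print the first batch id >= $1 at which compaction runs with compactInterval $2,
# i.e. the smallest such id where (id + 1) is a multiple of the interval.
next_compaction_batch() {
  local batch="$1" interval="$2"
  # ceil((batch + 1) / interval) * interval - 1, using integer arithmetic
  echo $(( ((batch + interval) / interval) * interval - 1 ))
}
```

In our example, the query fails at batch 9 with the default interval, and next_compaction_batch 9 100 prints 99, so with compactInterval=100 the exception would come back at batch 99.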

Eventually, the missing log files have to be recreated.

Great article. Two cents I would like to add:
Each of the methods mentioned here only removes or defers the error for the Spark producer job (the one writing data to S3). Any consumer job that wants to read the data already written to S3 will still face one of the issues mentioned below:
1. If you create a blank 0 file

Error:
Exception in thread "main" java.lang.IllegalStateException: Failed to read log file /Spark-Warehouse/_spark_metadata/0. Incomplete log file
a. If only 1 batch was present
Error: Exception in thread "main" org.apache.spark.sql.AnalysisException: Unable to infer schema for Parquet at . It must be specified manually
b. If multiple batches were present and you deleted only 1 metadata file
Error: Exception in thread "main" java.lang.IllegalStateException: /Documents/Spark-Warehouse/_spark_metadata/0 doesn't exist (latestId: 1, compactInterval: 10)
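The blank-file case in the first item can be caught before restarting anything. This is a minimal sketch, assuming every metadata log file must begin with the version line v1 (which is what the dummy-file loop in the article writes); the function name is hypothetical. It only checks the first line, so it does not detect the other cases listed above.

```shell
# Report files in _spark_metadata folder $1 that are empty or whose first
# line is not "v1". Returns non-zero if any such file is found.
check_log_headers() {
  local dir="$1" f first bad=0
  for f in "$dir"/*; do
    [ -f "$f" ] || continue
    first=$(head -n 1 "$f")
    if [ "$first" != "v1" ]; then
      echo "invalid header: $f" >&2
      bad=1
    fi
  done
  return "$bad"
}
```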