The first section shows the case producing this error. The second section explains why the exception occurs. Finally, the third section gives the solution.
AnalysisException: Queries with streaming sources must be executed with writeStream.start()
Producing this exception is quite easy. Let's create a stream with Structured Streaming and consume it with a batch-style action (e.g. the collect or foreach methods):
val sparkSession = SparkSession.builder().appName("Failing source")
.master("local[*]")
.getOrCreate()
val rows = sparkSession.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "test")
.load()
// collect() is a batch action - calling it on a streaming Dataset triggers the analysis check
.collect()
Note that the same pattern is perfectly valid in batch Spark SQL semantics:
val ordersReader = sparkSession.read.format("jdbc")
.option("url", InMemoryDatabase.DbConnection)
.option("driver", InMemoryDatabase.DbDriver)
.option("dbtable", "orders")
.option("user", InMemoryDatabase.DbUser)
.option("password", InMemoryDatabase.DbPassword)
.load()
import sparkSession.implicits._
val ordersIds: Array[Int] = ordersReader.select("id")
.map(row => row.getInt(0))
.collect()
// comes from the post: http://www.waitingforcode.com/apache-spark-sql/loading-data-rdbms/read
However, the first snippet, executed in the context of Structured Streaming, fails with the following exception:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
kafka
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:36)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:34)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:34)
at org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:63)
at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:74)
at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:72)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:78)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:78)
at org.apache.spark.sql.execution.QueryExecution.completeString(QueryExecution.scala:219)
at org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:202)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:62)
at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2823)
at org.apache.spark.sql.Dataset.foreach(Dataset.scala:2324)
AnalysisException: Queries with streaming sources must be executed with writeStream.start() explained
To understand the problem, we can do several things. In our case, we'll walk through the lines of the exception's stack trace and discover what each method is doing.
First of all, the check producing the exception starts in org.apache.spark.sql.execution.QueryExecution#assertSupported(). This method is triggered only when the configuration entry spark.sql.streaming.unsupportedOperationCheck is set to true, which is the default and the case for the executed code.
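For reference, the guarded call looks roughly like this in the Spark 2.x sources (a paraphrased sketch, not a verbatim quote):
// Paraphrased sketch of QueryExecution#assertSupported (Spark 2.x): the
// batch-compatibility check only runs when
// spark.sql.streaming.unsupportedOperationCheck is enabled.
def assertSupported(): Unit = {
  if (sparkSession.sessionState.conf.isUnsupportedOperationCheckEnabled) {
    UnsupportedOperationChecker.checkForBatch(analyzed)
  }
}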
assertSupported() calls the utility method UnsupportedOperationChecker#checkForBatch(plan: LogicalPlan) that executes the following partial function on every node of the logical plan (children first, then parent):
case p if p.isStreaming =>
throwError("Queries with streaming sources must be executed with writeStream.start()")(p)
case _ =>
As you can see, if one of the plan's parts is streaming, the exception we've met is thrown. The key to understanding why is hidden in this famous isStreaming part. isStreaming is in fact a method checking whether any part of the logical plan has a streaming data source.
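The default implementation is a simple recursive check over the plan's children; in the Spark 2.x sources it looks roughly like this:
// Sketch of the default LogicalPlan#isStreaming in Spark 2.x:
// a plan is streaming if any of its children reads from a streaming source.
def isStreaming: Boolean = children.exists(_.isStreaming)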
But what is the difference between streaming and batch data sources? Quite simple: a streaming data source, unlike a batch one, returns data in a continuous manner. And how can Spark distinguish them? In fact, the isStreaming method is defined in the LogicalPlan abstract class and the implementation quoted above is only the default one. LogicalPlan is implemented by every operation node (algebraic or language construct) in Spark SQL, such as DISTINCT (org.apache.spark.sql.catalyst.plans.logical.Distinct), JOIN (org.apache.spark.sql.catalyst.plans.logical.Join) or WHERE (org.apache.spark.sql.catalyst.plans.logical.Filter). But it's also implemented by Structured Streaming parts, such as org.apache.spark.sql.execution.streaming.StreamingRelation, used to make a link between the data source (Kafka in our case) and the logical plan.
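Its definition in the Spark 2.x sources is short (quoted here trimmed to the essential parts, so the field list may differ slightly between versions):
// StreamingRelation links a streaming data source (Kafka in our case) to the logical plan.
// The important part for our error: isStreaming is hard-coded to true.
case class StreamingRelation(dataSource: DataSource, sourceName: String, output: Seq[Attribute])
  extends LeafNode {
  override def isStreaming: Boolean = true
  override def toString: String = sourceName
}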
As you can see, StreamingRelation overrides the isStreaming method and returns true. And because this logical plan implementation is constructed when DataStreamReader.load() is called, code written with batch logic simply can't fit the streaming requirements.
Solving AnalysisException: Queries with streaming sources must be executed with writeStream.start()
Since the problem is our batch-oriented code, nothing is simpler than converting it to streaming-oriented code. In Structured Streaming, Spark programmers introduced the concepts of sources and sinks. The latter define how the data is consumed. There are several basic sinks, such as foreach (data consumed row by row in a ForeachWriter), console (data printed to the console) or file (data persisted to files).
The sinks are created directly from the read stream by calling the writeStream method on the loaded DataFrame. It creates an instance of the DataStreamWriter class that can be used to consume the streamed data once its start() method is called.
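As a minimal sketch before the foreach version, the streaming DataFrame can be sent to the console sink; here df is assumed to be the result of load() from the first snippet, without the .collect() call:
// Minimal sketch (assumed setup): consume the streaming DataFrame through the
// console sink instead of calling a batch action on it. `df` stands for the
// result of sparkSession.readStream.format("kafka")...load() without .collect().
val consoleQuery = df.writeStream
  .format("console")
  .option("truncate", "false")
  .start()

// Block the main thread until the query stops or fails.
consoleQuery.awaitTermination()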
The fixed code consuming the data with a foreach sink looks like this:
import org.apache.spark.sql.{ForeachWriter, Row}

// It's only a dummy implementation showing how to
// use .foreach() in structured streaming
// (df is again the streaming DataFrame returned by load(), without .collect())
df.writeStream
  .foreach(new ForeachWriter[Row] {
    override def open(partitionId: Long, version: Long): Boolean = true

    override def process(row: Row): Unit = {
      println(s"Processing ${row}")
    }

    override def close(errorOrNull: Throwable): Unit = {}
  })
  .start()
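Note that open() must return true for the rows of a partition to be passed to process(), and that in a real application you would typically keep the returned StreamingQuery alive, for instance with its awaitTermination() method.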
At first glance, the use of Structured Streaming is not obvious. It differs a little from DStream-based streaming. The most meaningful difference is that we can't consume the read data by directly calling actions on the source stream. Instead, we must define a sink to which the retrieved data will be put.