I recently encountered the following intimidating exception while hacking on Spectrum’s ML pipeline:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
textSocket
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
// ...snip a bunch of superfluous lines...
at org.apache.spark.sql.Dataset.head(Dataset.scala:2153)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2160)
at org.apache.spark.sql.Dataset.first(Dataset.scala:2167)
at org.apache.spark.ml.feature.VectorAssembler.first$lzycompute$1(VectorAssembler.scala:57)
at org.apache.spark.ml.feature.VectorAssembler.org$apache$spark$ml$feature$VectorAssembler$$first$1(VectorAssembler.scala:57)
at org.apache.spark.ml.feature.VectorAssembler$$anonfun$2$$anonfun$1.apply$mcI$sp(VectorAssembler.scala:88)
at org.apache.spark.ml.feature.VectorAssembler$$anonfun$2$$anonfun$1.apply(VectorAssembler.scala:88)
at org.apache.spark.ml.feature.VectorAssembler$$anonfun$2$$anonfun$1.apply(VectorAssembler.scala:88)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.ml.feature.VectorAssembler$$anonfun$2.apply(VectorAssembler.scala:88)
at org.apache.spark.ml.feature.VectorAssembler$$anonfun$2.apply(VectorAssembler.scala:58)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:186)
at org.apache.spark.ml.feature.VectorAssembler.transform(VectorAssembler.scala:58)
at org.apache.spark.ml.PipelineModel$$anonfun$transform$1.apply(Pipeline.scala:305)
at org.apache.spark.ml.PipelineModel$$anonfun$transform$1.apply(Pipeline.scala:305)
at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
at scala.collection.mutable.ArrayOps$ofRef.foldLeft(ArrayOps.scala:186)
at org.apache.spark.ml.PipelineModel.transform(Pipeline.scala:305)
at com.spectrumio.data.models.toxicity.VectorAssemblerBug$.main(VectorAssemblerBug.scala:34)
at com.spectrumio.data.models.toxicity.VectorAssemblerBug.main(VectorAssemblerBug.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:755)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Note that the above stack trace is taken from a job created solely to reproduce the issue, not our production streaming job. See the appendix for its source.
Numerous people on Stack Overflow have posted similar exceptions but, surprisingly, none of the accepted solutions solved my particular problem. So I fired up my debugger and began painstakingly stepping through the Spark codebase to find the cause of the exception.
Big Endian Data has a great tutorial on configuring Spark for remote debugging in IntelliJ.
As the stack trace implies, the VectorAssembler stage of the pipeline was the culprit. VectorAssemblers, for the unfamiliar, perform column-wise concatenation of feature vectors before passing them to a classification algorithm. Interestingly, Spark’s VectorAssembler may call first() on its input dataset, which is unsupported in streaming contexts. The following snippet shows why:
override def transform(dataset: Dataset[_]): DataFrame = {
  // Schema transformation.
  val schema = dataset.schema
  lazy val first = dataset.toDF.first()
  val attrs = $(inputCols).flatMap { c =>
    val field = schema(c)
    val index = schema.fieldIndex(c)
    field.dataType match {
      // ...unrelated cases elided
      case _: VectorUDT =>
        val group = AttributeGroup.fromStructField(field)
        if (group.attributes.isDefined) {
          // If attributes are defined, copy them with updated names.
          group.attributes.get.zipWithIndex.map { case (attr, i) =>
            if (attr.name.isDefined) {
              // TODO: Define a rigorous naming scheme.
              attr.withName(c + "_" + attr.name.get)
            } else {
              attr.withName(c + "_" + i)
            }
          }
        } else {
          // Otherwise, treat all attributes as numeric. If we cannot get the number of attributes
          // from metadata, check the first row.
          val numAttrs = group.numAttributes.getOrElse(first.getAs[Vector](index).size)
          Array.tabulate(numAttrs)(i => NumericAttribute.defaultAttr.withName(c + "_" + i))
        }
      // ...unrelated cases elided
    }
  }
  // ...rest of the implementation
}
Here, we see Spark is trying to rename the vector’s ML attributes based on their indices. If there are no attributes defined, Spark assigns each vector element a NumericAttribute named after its index in the vector. However, those attributes are built with Scala’s Array.tabulate method, which requires an array length. When the column’s metadata does not record the number of attributes, that length is taken from the size of the vector in the first row of the dataset, so the lazy val first = dataset.toDF.first() expression is evaluated and the exception is thrown.
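The role of lazy val is easy to miss: the first row is only fetched when getOrElse actually needs its fallback. Here is a minimal, Spark-free sketch of that evaluation order. The names LazyFirstDemo, fetchFirstRowSize and numAttrs are made up for illustration, with fetchFirstRowSize standing in for dataset.toDF.first():

```scala
object LazyFirstDemo {
  // Counts how often the expensive (and, on a stream, illegal) lookup runs.
  var fetches = 0

  // Hypothetical stand-in for dataset.toDF.first(); not a Spark API.
  def fetchFirstRowSize(): Int = {
    fetches += 1
    3
  }

  // Mirrors the shape of group.numAttributes.getOrElse(first.getAs[Vector](index).size).
  def numAttrs(fromMetadata: Option[Int]): Int = {
    lazy val first = fetchFirstRowSize()
    // Option.getOrElse takes its argument by name, so `first` is only
    // forced when fromMetadata is None.
    fromMetadata.getOrElse(first)
  }
}
```

Calling LazyFirstDemo.numAttrs(Some(5)) never touches the first row, while LazyFirstDemo.numAttrs(None) does. By the same logic, a vector column whose ML metadata already records its size should pass through this branch of transform without triggering first().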
This issue can be worked around. If you’re concatenating VectorUDT columns - which you likely are if you’re using any of Spark’s default feature extraction classes - you can copy the underlying UDF logic that VectorAssembler uses and apply it yourself:
val assemblerUdf = udf((vv: Any*) => {
  val indices = ArrayBuilder.make[Int]
  val values = ArrayBuilder.make[Double]
  var cur = 0 // offset of the current input vector within the assembled vector
  vv.foreach {
    case vec: Vector =>
      // Copy only the non-zero entries, shifted by the running offset.
      vec.foreachActive { case (i, v) =>
        if (v != 0.0) {
          indices += cur + i
          values += v
        }
      }
      cur += vec.size
    case null =>
      throw new SparkException("Values to assemble cannot be null.")
    case o =>
      throw new SparkException(s"$o of type ${o.getClass.getName} is not supported.")
  }
  Vectors.sparse(cur, indices.result(), values.result()).compressed
})
You shouldn’t be affected by this issue if you’re concatenating non-VectorUDT columns, so the above UDF does not include the original logic that handles DoubleType columns. It should be easy enough to add back in if you need it.
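For reference, here is a minimal sketch of one way to wire this logic into a streaming DataFrame. It follows the pattern VectorAssembler uses internally: the input columns are packed into a single struct and the UDF unpacks the resulting Row. The object name StreamingAssembler, the helper assembleColumns and its parameters are hypothetical, and the sketch assumes the Spark ML and SQL artifacts are on the classpath:

```scala
import org.apache.spark.SparkException
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.functions.{col, struct, udf}
import scala.collection.mutable.ArrayBuilder

// Hypothetical helper object; not part of Spark.
object StreamingAssembler {

  // Same concatenation logic as the UDF above, written as a plain function
  // so it can be exercised without a SparkSession.
  def assemble(vv: Seq[Any]): Vector = {
    val indices = ArrayBuilder.make[Int]
    val values = ArrayBuilder.make[Double]
    var cur = 0
    vv.foreach {
      case vec: Vector =>
        vec.foreachActive { case (i, v) =>
          if (v != 0.0) {
            indices += cur + i
            values += v
          }
        }
        cur += vec.size
      case null =>
        throw new SparkException("Values to assemble cannot be null.")
      case o =>
        throw new SparkException(s"$o of type ${o.getClass.getName} is not supported.")
    }
    Vectors.sparse(cur, indices.result(), values.result()).compressed
  }

  // The input columns arrive as a single struct, i.e. one Row per record.
  private val assembleUdf = udf { r: Row => assemble(r.toSeq) }

  // Appends `outputCol` without ever calling first() on the DataFrame.
  def assembleColumns(df: DataFrame, inputCols: Seq[String], outputCol: String): DataFrame =
    df.withColumn(outputCol, assembleUdf(struct(inputCols.map(col): _*)))
}
```

In the appendix job, this would take the place of the pipeline.transform call, along the lines of StreamingAssembler.assembleColumns(lines.withColumn("count", counter(col("value"))), Seq("count"), "assembled"). Unlike VectorAssembler, the sketch attaches no ML attribute metadata to the output column, which may matter to downstream stages that read it.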
The Spark team is aware of this issue. It took me several hours of searching, but eventually I found SPARK-21926, which tracks this issue as well as some others involving Transformers and streaming dataframes. When I have the chance, I’ll see if I can contribute a fix to Spark core.
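In the meantime, the transform code shown earlier suggests a second workaround: if the vector column already carries its size in ML metadata, group.numAttributes is defined and the first row is never consulted. Below is a rough sketch of attaching that metadata by hand. The object VectorSizeMetadata and the helper withVectorSize are hypothetical names, and the sketch assumes every vector in the column really has the declared size:

```scala
import org.apache.spark.ml.attribute.AttributeGroup
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Hypothetical helper object; not part of Spark.
object VectorSizeMetadata {

  // Re-selects `vectorCol` with ML metadata declaring that it holds `size` elements,
  // so VectorAssembler can read numAttributes instead of inspecting the first row.
  // Note: the vector column is moved to the end of the column list.
  def withVectorSize(df: DataFrame, vectorCol: String, size: Int): DataFrame = {
    val metadata = new AttributeGroup(vectorCol, size).toMetadata()
    val others = df.columns.filter(_ != vectorCol).map(col)
    val tagged = col(vectorCol).as(vectorCol, metadata)
    df.select(others :+ tagged: _*)
  }
}
```

For the appendix job, whose "count" vectors always have one element, this would mean calling VectorSizeMetadata.withVectorSize(df, "count", 1) on the streaming DataFrame before handing it to the pipeline. Spark 2.3 and later also ship a VectorSizeHint transformer that adds the same kind of size metadata as a pipeline stage, which is probably the more convenient route if you can upgrade.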
As always, I’d love to hear your feedback on this post. Shoot me an e-mail and I’ll be sure to get back to you.
Appendix
If you’re interested, here’s a quick-and-dirty Spark job that reproduces the issue consistently:
import org.apache.spark.SparkException
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature._
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}
import scala.collection.mutable.ArrayBuilder

object VectorAssemblerBug {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("VectorAssemblerBug")
      .getOrCreate
    import spark.implicits._

    // Wraps the word count of each line in a one-element vector.
    val counter = udf((s: String) => Vectors.dense(s.split(" ").length))

    val assembler = new VectorAssembler()
      .setInputCols(Array("count"))
      .setOutputCol("assembled")

    // fit pipeline to empty data to get a PipelineModel - VectorAssembler doesn't need training
    val pipeline = new Pipeline()
      .setStages(Array(assembler))
      .fit(Seq.empty[Int].toDF("count"))

    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()

    // Throws the AnalysisException: VectorAssembler calls first() on the streaming DataFrame.
    val wordCounts = pipeline.transform(lines.withColumn("count", counter(col("value"))))

    val query = wordCounts.writeStream
      .outputMode("append")
      .format("console")
      .start()
    query.awaitTermination()
  }
}