Once you click Run, you will see the output of wordcount.show() in the Driver Console, as shown below.
The standard output below continues to display until data is received from Kafka.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
16/06/16 21:30:43 INFO SparkContext: Running Spark version 1.6.1
16/06/16 21:30:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/06/16 21:30:45 INFO SecurityManager: Changing view acls to: vgiridatabricks
16/06/16 21:30:45 INFO SecurityManager: Changing modify acls to: vgiridatabricks
16/06/16 21:30:45 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(vgiridatabricks); users with modify permissions: Set(vgiridatabricks)
16/06/16 21:30:45 INFO Utils: Successfully started service 'sparkDriver' on port 49990.
16/06/16 21:30:45 INFO Slf4jLogger: Slf4jLogger started
16/06/16 21:30:45 INFO Remoting: Starting remoting
16/06/16 21:30:45 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://[email protected]:49991]
16/06/16 21:30:45 INFO Utils: Successfully started service 'sparkDriverActorSystem' on port 49991.
16/06/16 21:30:45 INFO SparkEnv: Registering MapOutputTracker
16/06/16 21:30:45 INFO SparkEnv: Registering BlockManagerMaster
16/06/16 21:30:45 INFO DiskBlockManager: Created local directory at /private/var/folders/4q/fjc7vkmn4wn_m4fynrvxk2hc0000gn/T/blockmgr-568204eb-276e-4be7-8b0d-5d451f439eef
16/06/16 21:30:45 INFO MemoryStore: MemoryStore started with capacity 2.4 GB
16/06/16 21:30:45 INFO SparkEnv: Registering OutputCommitCoordinator
16/06/16 21:30:46 INFO Utils: Successfully started service 'SparkUI' on port 4040.
16/06/16 21:30:46 INFO SparkUI: Started SparkUI at http://10.10.10.111:4040
16/06/16 21:30:46 INFO Executor: Starting executor ID driver on host localhost
16/06/16 21:30:46 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 49992.
16/06/16 21:30:46 INFO NettyBlockTransferService: Server created on 49992
16/06/16 21:30:46 INFO BlockManagerMaster: Trying to register BlockManager
16/06/16 21:30:46 INFO BlockManagerMasterEndpoint: Registering block manager localhost:49992 with 2.4 GB RAM, BlockManagerId(driver, localhost, 49992)
16/06/16 21:30:46 INFO BlockManagerMaster: Registered BlockManager
16/06/16 21:30:46 WARN StreamingContext: spark.master should be set as local[n], n > 1 in local mode if you have receivers to get data, otherwise Spark jobs will not get resources to process the received data.
16/06/16 21:30:46 INFO VerifiableProperties: Verifying properties
16/06/16 21:30:46 INFO VerifiableProperties: Property group.id is overridden to
16/06/16 21:30:46 INFO VerifiableProperties: Property zookeeper.connect is overridden to
Creating function called to create new StreamingContext
16/06/16 21:30:47 INFO WriteAheadLogManager for Thread: Recovered 2 write ahead log files from file:/usr/local/Cellar/kafka/0.9.0.1/checkpoint/receivedBlockMetadata
16/06/16 21:30:47 INFO StateDStream: Checkpoint interval automatically set to 10000 ms
16/06/16 21:30:47 INFO ForEachDStream: Duration for remembering RDDs set to 60000 ms for org.apache.spark.streaming.dstream.ForEachDStream@5626d18c
16/06/16 21:30:47 INFO StateDStream: Duration for remembering RDDs set to 60000 ms for org.apache.spark.streaming.dstream.StateDStream@5634a861
16/06/16 21:30:47 INFO MappedDStream: Duration for remembering RDDs set to 60000 ms for org.apache.spark.streaming.dstream.MappedDStream@66236a0a
16/06/16 21:30:47 INFO FlatMappedDStream: Duration for remembering RDDs set to 60000 ms for org.apache.spark.streaming.dstream.FlatMappedDStream@1dd74143
16/06/16 21:30:47 INFO DirectKafkaInputDStream: Duration for remembering RDDs set to 60000 ms for org.apache.spark.streaming.kafka.DirectKafkaInputDStream@7a45d714
16/06/16 21:30:47 INFO ForEachDStream: metadataCleanupDelay = -1
16/06/16 21:30:47 INFO StateDStream: metadataCleanupDelay = -1
16/06/16 21:30:47 INFO MappedDStream: metadataCleanupDelay = -1
16/06/16 21:30:47 INFO FlatMappedDStream: metadataCleanupDelay = -1
16/06/16 21:30:47 INFO DirectKafkaInputDStream: metadataCleanupDelay = -1
16/06/16 21:30:47 INFO DirectKafkaInputDStream: Slide time = 2000 ms
16/06/16 21:30:47 INFO DirectKafkaInputDStream: Storage level = StorageLevel(false, false, false, false, 1)
16/06/16 21:30:47 INFO DirectKafkaInputDStream: Checkpoint interval = null
16/06/16 21:30:47 INFO DirectKafkaInputDStream: Remember duration = 60000 ms
16/06/16 21:30:47 INFO DirectKafkaInputDStream: Initialized and validated org.apache.spark.streaming.kafka.DirectKafkaInputDStream@7a45d714
16/06/16 21:30:47 INFO FlatMappedDStream: Slide time = 2000 ms
16/06/16 21:30:47 INFO FlatMappedDStream: Storage level = StorageLevel(false, false, false, false, 1)
16/06/16 21:30:47 INFO FlatMappedDStream: Checkpoint interval = null
16/06/16 21:30:47 INFO FlatMappedDStream: Remember duration = 60000 ms
16/06/16 21:30:47 INFO FlatMappedDStream: Initialized and validated org.apache.spark.streaming.dstream.FlatMappedDStream@1dd74143
16/06/16 21:30:47 INFO MappedDStream: Slide time = 2000 ms
16/06/16 21:30:47 INFO MappedDStream: Storage level = StorageLevel(false, false, false, false, 1)
16/06/16 21:30:47 INFO MappedDStream: Checkpoint interval = null
16/06/16 21:30:47 INFO MappedDStream: Remember duration = 60000 ms
16/06/16 21:30:47 INFO MappedDStream: Initialized and validated org.apache.spark.streaming.dstream.MappedDStream@66236a0a
16/06/16 21:30:47 INFO StateDStream: Slide time = 2000 ms
16/06/16 21:30:47 INFO StateDStream: Storage level = StorageLevel(false, true, false, false, 1)
16/06/16 21:30:47 INFO StateDStream: Checkpoint interval = 10000 ms
16/06/16 21:30:47 INFO StateDStream: Remember duration = 60000 ms
16/06/16 21:30:47 INFO StateDStream: Initialized and validated org.apache.spark.streaming.dstream.StateDStream@5634a861
16/06/16 21:30:47 INFO ForEachDStream: Slide time = 2000 ms
16/06/16 21:30:47 INFO ForEachDStream: Storage level = StorageLevel(false, false, false, false, 1)
16/06/16 21:30:47 INFO ForEachDStream: Checkpoint interval = null
16/06/16 21:30:47 INFO ForEachDStream: Remember duration = 60000 ms
16/06/16 21:30:47 INFO ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@5626d18c
16/06/16 21:30:47 INFO RecurringTimer: Started timer for JobGenerator at time 1466137848000
16/06/16 21:30:47 INFO JobGenerator: Started JobGenerator at 1466137848000 ms
16/06/16 21:30:47 INFO JobScheduler: Started JobScheduler
16/06/16 21:30:47 INFO StreamingContext: StreamingContext started
16/06/16 21:30:48 INFO StateDStream: Time 1466137846000 ms is invalid as zeroTime is 1466137846000 ms and slideDuration is 2000 ms and difference is 0 ms
16/06/16 21:30:48 INFO VerifiableProperties: Verifying properties
16/06/16 21:30:48 INFO VerifiableProperties: Property group.id is overridden to
16/06/16 21:30:48 INFO VerifiableProperties: Property zookeeper.connect is overridden to
16/06/16 21:30:48 INFO JobScheduler: Added jobs for time 1466137848000 ms
16/06/16 21:30:48 INFO JobGenerator: Checkpointing graph for time 1466137848000 ms
16/06/16 21:30:48 INFO DStreamGraph: Updating checkpoint data for time 1466137848000 ms
16/06/16 21:30:48 INFO JobScheduler: Starting job streaming job 1466137848000 ms.0 from job set of time 1466137848000 ms
16/06/16 21:30:48 INFO DStreamGraph: Updated checkpoint data for time 1466137848000 ms
16/06/16 21:30:48 INFO CheckpointWriter: Submitted checkpoint of time 1466137848000 ms writer queue
16/06/16 21:30:48 INFO CheckpointWriter: Saving checkpoint for time 1466137848000 ms to file 'file:/usr/local/Cellar/kafka/0.9.0.1/checkpoint/checkpoint-1466137848000'
16/06/16 21:30:48 INFO CheckpointWriter: Deleting file:/usr/local/Cellar/kafka/0.9.0.1/checkpoint/checkpoint-1465878822000.bk
16/06/16 21:30:48 INFO CheckpointWriter: Checkpoint for time 1466137848000 ms saved to file 'file:/usr/local/Cellar/kafka/0.9.0.1/checkpoint/checkpoint-1466137848000', took 4209 bytes and 37 ms
16/06/16 21:30:49 INFO SparkContext: Starting job: show at KafkaSSCtoES.scala:71
16/06/16 21:30:49 INFO DAGScheduler: Registering RDD 2 (map at KafkaSSCtoES.scala:63)
16/06/16 21:30:49 INFO DAGScheduler: Got job 0 (show at KafkaSSCtoES.scala:71) with 1 output partitions
16/06/16 21:30:49 INFO DAGScheduler: Final stage: ResultStage 1 (show at KafkaSSCtoES.scala:71)
16/06/16 21:30:49 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 0)
16/06/16 21:30:49 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 0)
16/06/16 21:30:49 INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[2] at map at KafkaSSCtoES.scala:63), which has no missing parents
16/06/16 21:30:49 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 4.8 KB, free 4.8 KB)
16/06/16 21:30:49 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 2.5 KB, free 7.3 KB)
16/06/16 21:30:49 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:49992 (size: 2.5 KB, free: 2.4 GB)
16/06/16 21:30:49 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1006
16/06/16 21:30:49 INFO DAGScheduler: Submitting 3 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[2] at map at KafkaSSCtoES.scala:63)
16/06/16 21:30:49 INFO TaskSchedulerImpl: Adding task set 0.0 with 3 tasks
16/06/16 21:30:49 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, partition 0,ANY, 2009 bytes)
16/06/16 21:30:49 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
16/06/16 21:30:49 INFO KafkaRDD: Beginning offset 4 is the same as ending offset skipping wordcounttopic 0
16/06/16 21:30:49 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 1158 bytes result sent to driver
16/06/16 21:30:49 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, partition 1,ANY, 2009 bytes)
16/06/16 21:30:49 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
16/06/16 21:30:49 INFO KafkaRDD: Beginning offset 5 is the same as ending offset skipping wordcounttopic 1
16/06/16 21:30:49 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 1158 bytes result sent to driver
16/06/16 21:30:49 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 90 ms on localhost (1/3)
16/06/16 21:30:49 INFO TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2, localhost, partition 2,ANY, 2009 bytes)
16/06/16 21:30:49 INFO Executor: Running task 2.0 in stage 0.0 (TID 2)
16/06/16 21:30:49 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 11 ms on localhost (2/3)
16/06/16 21:30:49 INFO KafkaRDD: Beginning offset 4 is the same as ending offset skipping wordcounttopic 2
16/06/16 21:30:49 INFO Executor: Finished task 2.0 in stage 0.0 (TID 2). 1158 bytes result sent to driver
16/06/16 21:30:49 INFO TaskSetManager: Finished task 2.0 in stage 0.0 (TID 2) in 9 ms on localhost (3/3)
16/06/16 21:30:49 INFO DAGScheduler: ShuffleMapStage 0 (map at KafkaSSCtoES.scala:63) finished in 0.114 s
16/06/16 21:30:49 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
16/06/16 21:30:49 INFO DAGScheduler: looking for newly runnable stages
16/06/16 21:30:49 INFO DAGScheduler: running: Set()
16/06/16 21:30:49 INFO DAGScheduler: waiting: Set(ResultStage 1)
16/06/16 21:30:49 INFO DAGScheduler: failed: Set()
16/06/16 21:30:49 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[8] at show at KafkaSSCtoES.scala:71), which has no missing parents
16/06/16 21:30:49 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 9.4 KB, free 16.8 KB)
16/06/16 21:30:49 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 4.7 KB, free 21.4 KB)
16/06/16 21:30:49 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:49992 (size: 4.7 KB, free: 2.4 GB)
16/06/16 21:30:49 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1006
16/06/16 21:30:49 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[8] at show at KafkaSSCtoES.scala:71)
16/06/16 21:30:49 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
16/06/16 21:30:49 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 3, localhost, partition 0,PROCESS_LOCAL, 1894 bytes)
16/06/16 21:30:49 INFO Executor: Running task 0.0 in stage 1.0 (TID 3)
16/06/16 21:30:49 INFO CacheManager: Partition rdd_4_0 not found, computing it
16/06/16 21:30:49 INFO BlockManagerInfo: Removed broadcast_0_piece0 on localhost:49992 in memory (size: 2.5 KB, free: 2.4 GB)
16/06/16 21:30:49 INFO ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 3 blocks
16/06/16 21:30:49 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 7 ms
16/06/16 21:30:49 INFO MemoryStore: Block rdd_4_0 stored as bytes in memory (estimated size 4.0 B, free 14.1 KB)
16/06/16 21:30:49 INFO BlockManagerInfo: Added rdd_4_0 in memory on localhost:49992 (size: 4.0 B, free: 2.4 GB)
16/06/16 21:30:50 INFO GenerateUnsafeProjection: Code generated in 175.318511 ms
16/06/16 21:30:50 INFO JobScheduler: Added jobs for time 1466137850000 ms
16/06/16 21:30:50 INFO JobGenerator: Checkpointing graph for time 1466137850000 ms
16/06/16 21:30:50 INFO DStreamGraph: Updating checkpoint data for time 1466137850000 ms
16/06/16 21:30:50 INFO DStreamGraph: Updated checkpoint data for time 1466137850000 ms
16/06/16 21:30:50 INFO CheckpointWriter: Submitted checkpoint of time 1466137850000 ms writer queue
16/06/16 21:30:50 INFO CheckpointWriter: Saving checkpoint for time 1466137850000 ms to file 'file:/usr/local/Cellar/kafka/0.9.0.1/checkpoint/checkpoint-1466137850000'
16/06/16 21:30:50 INFO GenerateSafeProjection: Code generated in 12.79301 ms
16/06/16 21:30:50 INFO Executor: Finished task 0.0 in stage 1.0 (TID 3). 1915 bytes result sent to driver
16/06/16 21:30:50 INFO CheckpointWriter: Deleting file:/usr/local/Cellar/kafka/0.9.0.1/checkpoint/checkpoint-1465878822000
16/06/16 21:30:50 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 3) in 493 ms on localhost (1/1)
16/06/16 21:30:50 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
16/06/16 21:30:50 INFO CheckpointWriter: Checkpoint for time 1466137850000 ms saved to file 'file:/usr/local/Cellar/kafka/0.9.0.1/checkpoint/checkpoint-1466137850000', took 4259 bytes and 27 ms
16/06/16 21:30:50 INFO DAGScheduler: ResultStage 1 (show at KafkaSSCtoES.scala:71) finished in 0.496 s
16/06/16 21:30:50 INFO DAGScheduler: Job 0 finished: show at KafkaSSCtoES.scala:71, took 0.865830 s
+----+-----+
|word|count|
+----+-----+
+----+-----+
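For orientation, here is a minimal sketch of the kind of driver that produces the log and the empty word/count table above. The class name KafkaSSCtoES, the topic wordcounttopic, the 2-second batch interval, the checkpoint directory, and the wordcount.show() call all come from the log; the broker address, the master setting, and the exact transformations are assumptions, so treat this as a sketch rather than the author's exact code.

import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import kafka.serializer.StringDecoder

object KafkaSSCtoES {

  val checkpointDir = "file:/usr/local/Cellar/kafka/0.9.0.1/checkpoint"

  // "Creating function called to create new StreamingContext" in the log suggests
  // the StreamingContext.getOrCreate(checkpointDir, createContext) pattern.
  def createContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("KafkaSSCtoES").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(2))   // 2000 ms batch interval, as in the log
    ssc.checkpoint(checkpointDir)

    val kafkaParams = Map("metadata.broker.list" -> "localhost:9092") // broker address assumed
    val topics = Set("wordcounttopic")                  // topic name taken from the log

    // Direct (receiver-less) stream, as shown by DirectKafkaInputDStream in the log.
    val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics).map(_._2)

    // Running word count; StateDStream in the log points to updateStateByKey.
    val wordCounts = lines.flatMap(_.split(" "))
      .map(word => (word, 1))
      .updateStateByKey[Int]((values: Seq[Int], state: Option[Int]) =>
        Some(values.sum + state.getOrElse(0)))

    wordCounts.foreachRDD { rdd =>
      val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
      import sqlContext.implicits._
      val wordcount = rdd.toDF("word", "count")
      wordcount.show()                                  // produces the word/count table above
    }
    ssc
  }

  def main(args: Array[String]): Unit = {
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}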
Now hit Enter in the Kafka Producer console and check the driver output in IntelliJ.
Now see the output in the Elasticsearch head plugin.
Start the Elasticsearch service as below:
$ cd /usr/local/Cellar/elastic*/bin
$ ./elasticsearch
As per the above program, we create the index and type kafkawordcount/kwc. The snapshot below shows the head plugin before the Spark Streaming job runs; there is not yet an index named kafkawordcount.
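For reference, the write to that index might look like the following, assuming the elasticsearch-spark (elasticsearch-hadoop) connector; the connector version, the node/port settings, and the helper name writeToEs are assumptions, while the kafkawordcount/kwc resource name comes from the text above.

import org.apache.spark.sql.DataFrame
// Requires the elasticsearch-spark connector on the classpath
// (e.g. "org.elasticsearch" %% "elasticsearch-spark" for Spark 1.6).
import org.elasticsearch.spark.sql._

// Called inside foreachRDD, after building the wordcount DataFrame.
def writeToEs(wordcount: DataFrame): Unit = {
  // "kafkawordcount" is the index and "kwc" the type, as described above.
  // es.nodes / es.port default to localhost:9200, or can be set on the SparkConf.
  wordcount.saveToEs("kafkawordcount/kwc")
}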
See the output in the Elasticsearch head plugin after the first hit of Enter in the Kafka Producer console with the words below.
Hello Kafka from Spark Streaming Learning Kafka is fun. and learning kafka with spark streaming is more fun and even when the destination is elasticsearch it will be more fun
See the full output:
Hope you enjoyed this tutorial.
Hey, thanks for the tutorial. I tried to publish a protobuf-encoded stream to a Kafka broker, but it seems the Spark instance crashes if I am subscribed to that topic, even if I'm not doing anything with the data. Can you (or someone from the community) help me figure out how to keep Spark from crashing when I send byte streams (like protobufs)? It seems non-ASCII data is what causes the crashes.
I am looking (albeit rather unsuccessfully) into streaming protocol buffers over Kafka for IoT data.
Thanks.
That's correct. Spark crashes on any non-ASCII data present in your streams. You might want to apply a UTF-8 conversion, or ignore the invalid bytes, before sending the data to the worker nodes. I think one of my other blog posts explains this; please search for it.
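For example, one way to sidestep the string decoding entirely is to consume the payload as raw bytes and decode (or parse the protobuf) yourself. This is a sketch against the Scala direct-stream API used in this post, not the author's code; the function name rawByteStream is made up for illustration.

import kafka.serializer.DefaultDecoder
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka.KafkaUtils

// Consume raw bytes instead of letting StringDecoder turn the payload into strings.
def rawByteStream(ssc: StreamingContext,
                  kafkaParams: Map[String, String],
                  topics: Set[String]) =
  KafkaUtils.createDirectStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder](
    ssc, kafkaParams, topics)
    .map { case (_, bytes) =>
      // Parse the protobuf here, or decode leniently if the payload should be text;
      // with this constructor, invalid byte sequences become U+FFFD instead of failing.
      new String(bytes, java.nio.charset.StandardCharsets.UTF_8)
    }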
Hey, thanks for the reply. I solved the issue by overriding the default UTF-8 conversion to string in PySpark. Do you know why UTF-8 is the default? It has caused me quite a few headaches. IMO, a byte stream (or equivalent) should be the default.
Hello Giri,
I am not able to resolve the following import errors:
import org.apache.spark.streaming.kafka._
import kafka.serializer.StringDecoder
object kafka is not a member of package org.apache.spark.streaming
not found: object kafka
I am getting these two errors in Scala IDE and also when compiling with SBT. I have included the dependencies but am not sure what I am missing; can you please help me?
Providing some details below; if you need additional information, please let me know.
My build.sbt:
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.0.0" % "provided",
  "org.apache.spark" %% "spark-streaming" % "2.0.0" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.0.0",
  "org.apache.kafka" % "kafka_2.11" % "0.9.0.1"
)
Error while compiling using sbt:
[error] C:\Parthi\Knowledge\kafkaspark\src\main\scala\KafkaExample.scala:36: value createDirectStream is not a member of object org.apache.spark.streaming.kafka.KafkaUtils
[error] val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
[error] ^
[error] one error found
[error] (compile:compileIncremental) Compilation failed
[error] Total time: 2 s, completed Feb 11, 2017 2:41:06 PM
Parthiban,
Please find my GitHub code here: https://github.com/vgiri2015/spark-latest-1.6.1/blob/master/build.sbt. You can remove the org.apache.kafka dependency there.
Also have a look at https://hadoopist.wordpress.com/2016/08/09/creat-a-simple-build-sbt-file-for-a-spark-project/
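For reference, a build.sbt along the lines suggested above might look like this (a sketch; the Scala version is an assumption):

scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"                % "2.0.0" % "provided",
  "org.apache.spark" %% "spark-streaming"           % "2.0.0" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.0.0"
  // No separate org.apache.kafka dependency: spark-streaming-kafka-0-8 already
  // pulls in a compatible Kafka client, and mixing client versions causes conflicts.
)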