class DummySource extends org.apache.spark.streaming.receiver.Receiver[(Int, Long)](org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_2) {
/** Start the thread that simulates receiving data */
def onStart() {
new Thread("Dummy Source") { override def run() { receive() } }.start()
def onStop() { }
/** Periodically generate a random number from 0 to 9, and the timestamp */
private def receive() {
var counter = 0
while(!isStopped()) {
store(Iterator((counter, System.currentTimeMillis)))
counter += 1
Thread.sleep(5000)
// A batch is created every 30 seconds
val ssc = new org.apache.spark.streaming.StreamingContext(spark.sparkContext, org.apache.spark.streaming.Seconds(30))
// Set the active SQLContext so that we can access it statically within the foreachRDD
org.apache.spark.sql.SQLContext.setActive(spark.sqlContext)
// Create the stream
val stream = ssc.receiverStream(new DummySource())
// Process RDDs in the batch
stream.foreachRDD { rdd =>
// Access the SQLContext and create a table called demo_numbers we can query
val _sqlContext = org.apache.spark.sql.SQLContext.getOrCreate(rdd.sparkContext)
_sqlContext.createDataFrame(rdd).toDF("value", "time")
.registerTempTable("demo_numbers")
// Start the stream processing
ssc.start()
class DummySource extends org.apache.spark.streaming.receiver.Receiver[(Int, Long)](org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_2) {
/** Start the thread that simulates receiving data */
def onStart() {
new Thread("Dummy Source") { override def run() { receive() } }.start()
def onStop() { }
/** Periodically generate a random number from 0 to 9, and the timestamp */
private def receive() {
var counter = 0
while(!isStopped()) {
store(Iterator((counter, System.currentTimeMillis)))
counter += 1
Thread.sleep(5000)
// A batch is created every 30 seconds
val ssc = new org.apache.spark.streaming.StreamingContext(spark.sparkContext, org.apache.spark.streaming.Seconds(30))
// Set the active SQLContext so that we can access it statically within the foreachRDD
org.apache.spark.sql.SQLContext.setActive(spark.sqlContext)
// Create the stream
val stream = ssc.receiverStream(new DummySource())
// Process batches in 1 minute windows
stream.window(org.apache.spark.streaming.Minutes(1)).foreachRDD { rdd =>
// Access the SQLContext and create a table called demo_numbers we can query
val _sqlContext = org.apache.spark.sql.SQLContext.getOrCreate(rdd.sparkContext)
_sqlContext.createDataFrame(rdd).toDF("value", "time")
.registerTempTable("demo_numbers")
// Start the stream processing
ssc.start()
第一分钟后,会产生 12 个条目 - 窗口中收集到的两个批处理中各有 6 个条目。
value
Spark 流式传输 API 中可用的滑动窗口函数包括 window、countByWindow、reduceByWindow 和 countByValueAndWindow。 有关这些函数的详细信息,请参阅 Transformations on DStreams(DStreams 的转换)。
为提供复原和容错功能,Spark 流式处理依赖检查点,确保即使出现节点故障,流处理仍可以不间断地继续。 Spark 创建持久存储(Azure 存储或 Data Lake Storage)的检查点。 这些检查点存储流式处理应用程序元数据,例如应用程序定义的配置和操作。 以及已排队但尚未处理的所有批处理。 有时,检查点还包括将数据保存在 RDD 中以便更快速地基于由 Spark 托管的 RDD 中存在的内容重新生成数据状态。
部署 Spark 流式处理应用程序
通常将 Spark 流式处理应用程序本地生成到 JAR 文件中。 然后通过将 JAR 文件复制到默认的附加存储将其部署到 HDInsight 上的 Spark。 可以使用 POST 操作,通过群集上可用的 LIVY REST API 启动该应用程序。 POST 的正文包括提供 JAR 路径的 JSON 文档。 和其 main 方法定义并运行流式处理应用程序的类的名称以及可选的作业资源要求(例如执行器、内存和核心的数量)。 以及应用程序代码所需的任何配置设置。
此外,可以使用 GET 请求针对 LIVY 终结点检查所有应用程序的状态。 最后,可以通过针对 LIVY 终结点发出 DELETE 请求终止正在运行的应用程序。 有关 LIVY API 的详细信息,请参阅使用 Apache LIVY 执行远程作业