Spark大数据分析实战 第8章 Structured Streaming结构化流处理引擎  
   
所属分类 spark
浏览量 1300
什么是Structured Streaming
Structured Streaming单词计数
Structured Streaming编程模型
Structured Streaming查询输出
Structured Streaming窗口操作
什么是Structured Streaming
Spark2.0  新的流处理框架Structured Streaming(结构化流)
可伸缩的、容错的流处理引擎,构建在Spark SQL引擎之上
使用Structured Streaming可以在静态数据(Dataset/DataFrame)上像批处理计算一样进行流式计算
随着数据的不断到达,Spark SQL引擎会增量地、连续地对其进行处理,并更新最终结果
可以使用Scala、Java、Python或R中的Dataset/DataFrame API来执行数据流的聚合、滑动窗口计算、流式数据与离线数据的join()操作等
这些操作与Spark SQL使用同一套引擎来执行
通过使用检查点和预写日志来确保端到端的只执行一次
Exactly Once,指每个记录将被精确处理一次,数据不会丢失,并且不会多次处理 
默认情况下,Structured Streaming使用微批处理引擎将数据流作为一系列小批次作业进行处理
实现端到端的延迟低至100毫秒
Spark 2.3  引入了一种新的低延迟处理模式,称为连续处理,
将端到端的延迟进一步降低至1毫秒
流批一体 以同样的方式编写代码
Structured Streaming在底层自动实现快速、可伸缩、容错等处理
Structured Streaming单词计数
Spark 2.0开始,Dataset和DataFrame可以表示静态有界数据,也可以表示流式无界数据
与静态Dataset/DataFrame一样,可以使用公共入口点SparkSession 从数据源创建流式Dataset/DataFrame
并对它们应用与静态Dataset/DataFrame相同的操作
Structured Streaming单词计数
 def main(args: Array[String]): Unit = {
      //创建本地SparkSession 
      val spark = SparkSession
        .builder
        .appName("StructuredNetworkWordCount")
        .master("local[*]")
        .getOrCreate()
      spark.sparkContext.setLogLevel("WARN") 
      // 导入SparkSession对象中的隐式转换
      import spark.implicits._
      //从Socket连接中获取输入流数据创建DataFrame
      val lines: DataFrame = spark.readStream
        .format("socket")
        .option("host", "centos01")
        .option("port", 9999)
        .load()
      //分割每行数据为单词
      val words: Dataset[String] = lines.as[String].flatMap(_.split(" "))
      //计算单词数量(value为默认的列名)
      val wordCounts: DataFrame = words.groupBy("value").count()
      // 输出计算结果,三种模式 complete append update
      val query = wordCounts.writeStream 
        .outputMode("complete")
        .format("console")
        .start()
      //等待查询终止
      query.awaitTermination()
   }
Structured Streaming编程模型
核心思想
将实时数据流视为一张不断追加的表,基于这张表进行处理
这是一个新的流处理模型,非常类似于批处理模型,就像在静态表上执行标准的批处理式查询一样
将输入的数据流视为一张 输入表 ,到达流的每个数据项都像一个新行被追加到输入表 
对输入数据流的查询将生成一张 结果表
每一个触发间隔(例如每1秒)都会向输入表添加新行,最终更新结果表
无论何时更新结果表,都建议将更改后的结果行写入外部存储
单词计数中,Spark将不断检查Socket连接中的新数据
如果有新数据,将运行一个“增量”查询
该查询将以前的计数结果与新数据的计数结果组合起来计算更新后的计数结果
Structured Streaming查询输出
结果输出有三种模式 
完全模式(Complete Mode)更新后的整个结果表写入外部存储 
追加模式(Append Mode)  默认模式  自上次触发后,只将结果表中追加的新行写入外部存储 
更新模式(Update Mode)  只有自上次触发后在结果表中更新(包括增加)的行才会写入外部存储(Spark 2.1.1起可用) 
val query = wordCounts.writeStream
  // 等同于.outputMode(OutputMode.Update)
  .outputMode("update")
  .format("console")
  .start()
支持将计算结果输出到多种外部存储 
(1)文件 将计算结果以文件的形式输出到指定目录中
   默认文件格式为parquet 也支持orc json csv等 
writeStream
    .format("parquet")
    .option("path", "path/to/destination/dir")
    .start()
(2)Kafka  将计算结果输出到一个或多个主题
writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("topic", "myTopic")
    .start()
(3)控制台 
writeStream
    .format("console")
    .start()
(4)内存 用于小量数据测试
writeStream
    .format("memory")
    .queryName("tableName")
    .start()
Structured Streaming窗口操作
窗口聚合与分组类似  可以使用groupBy()和window()操作来表示窗口聚合 
import spark.implicits._
//流数据DataFrame的schema:{ timestamp: Timestamp, word: String }
val words = ... 
//将数据按窗口和单词分组,并计算每组的数量
val windowedCounts = words.groupBy(
   //窗口
   window($"timestamp", "10 minutes", "5 minutes"), 
   //单词
   $"word" 
).count()
如果某个事件延迟到达 如何处理
例如12:03 生成的单词在 12:11 被应用接收
应用应该使用12:03 这个时间去更新窗口12:00-12:10中的单词计数,而不是12:11
Structured Streaming可以在很长一段时间内维护部分聚合的中间状态
以便延迟数据可以正确更新旧窗口的聚合
Spark 2.1中引入了水印(watermarking)
它允许引擎自动跟踪数据中的当前事件时间,并尝试相应地清理旧状态
水印表示某个时刻(事件时间)以前的数据将不再更新,因此水印指的是一个时间点
每次触发窗口计算的同时会进行水印的计算
首先统计本次聚合操作的窗口数据中的最大事件时间
然后使用最大事件时间减去所能容忍的延迟时间即是水印
当新接收的数据事件时间小于水印时,该数据不会进行计算,在内存中也不会维护该数据的状态
每隔5分钟计算最近10分钟的数据,延迟阈值为10分钟 
 上一篇  
   
 下一篇  
 Spark大数据分析实战 第6章 Kafka分布式消息系统 
 Spark大数据分析实战 第7章 Spark Streaming实时流处理引擎 
 Clickhouse 监控运维常用SQL 
 大数据架构发展趋势之计算和存储分离 
 shell类型查看 
 MAC 重启终端后 配置环境变量不生效