快速入门Flink (6) —— Flink的广播变量、累加器与分布式缓存
写在前面: 博主是一名大数据的初学者,昵称来源于《爱丽丝梦游仙境》中的Alice和自己的昵称。作为一名互联网小白,
写博客一方面是为了记录自己的学习历程,一方面是希望能够帮助到很多和自己一样处于起步阶段的萌新
。由于水平有限,博客中难免会有一些错误,有纰漏之处恳请各位大佬不吝赐教!个人小站: http://alices.ibilibili.xyz/ , 博客主页: https://alice.blog.csdn.net/ 尽管当前水平可能不及各位大佬,但我还是希望自己能够做得更好,因为一天的生活就是一生的缩影
。我希望在最美的年华,做最好的自己
!
通过快速入门Flink系列的(1-5)篇博客,博主已经为大家介绍了一些Flink中常见的概念与一些基础的操作,感兴趣的朋友们可以收藏一下菌哥的Flink专栏哟(? 快速入门Flink )。本篇博客,博主为大家介绍的是 Flink的广播变量、累加器与分布式缓存 。
码字不易,先赞后看
1.5 Flink的广播变量
Flink支持 广播变量 ,就是将数据广播到具体的 taskmanager 上,数据存储在内存中, 这样可以减缓大量的 shuffle 操作; 比如在数据 join 阶段,不可避免的就是大量的 shuffle 操作,我们可以把其中一个 dataSet 广播出去,一直加载到 taskManager 的内存 中,可以直接在内存中拿数据,避免了大量的 shuffle, 导致集群性能下降; 广播变量创建后,它可以运行在集群中的任何 function 上,而不需要多次传递给集群节点。另外需要 记住,不应该修改广播变量,这样才能确保每个节点获取到的值都是一致的。
一句话解释,可以理解为是一个 公共的共享变量 ,我们可以把一个 dataset 数据集广播出去, 然后不同的 task 在节点上都能够获取到,这个数据在每个节点上只会存在一份。 如果不使用 broadcast,则在每个节点中的每个 task 中都需要拷贝一份 dataset 数据集, 比较浪费内存(也就是一个节点中可能会存在多份 dataset 数据)。
注意: 因为广播变量是要把 dataset 广播到内存中,所以广播的数据量不能太大,否则会出现OOM这样的问题。
- Broadcast:Broadcast 是通过 withBroadcastSet(dataset,string)来注册的
- Access:通过 getRuntimeContext().getBroadcastVariable(String)访问广播变量
让我们来通过一张图来感受下,使用广播变量和不使用广播变量,到底差在哪里。
小结一下:
■ 可以理解广播就是一个公共的共享变量 ■ 将一个数据集广播后,不同的Task 都可以在节点上获取到 ■ 每个节点只存一份 ■ 如果不使用广播,每一个 Task 都会拷贝一份数据集,造成内存资源浪费
用法:
在需要使用广播的操作后,使用withBroadcastSet 创建广播
在操作中,使用 getRuntimeContext.getBroadcastVariable [广播数据类型] ( 广播名 ) 获取广播变量
示例:
创建一个学生数据集,包含以下数据:
|学生 ID | 姓名 | |------|------| List((1, “张三”), (2, “李四”), (3, “王五”))
将该数据,发布到广播。再创建一个 成绩 数据集。
|学生 ID | 学科 | 成绩 | |------|------|-----| List( (1, “语文”, 50),(2, “数学”, 70), (3, “英文”, 86))
请通过广播获取到学生姓名,将数据转换为
List( (“张三”, “语文”, 50),(“李四”, “数学”, 70), (“王五”, “英文”, 86))
步骤
1) 获取批处理运行环境
2) 分别创建两个数据集
3) 使用 RichMapFunction 对 成绩 数据集进行 map 转换
4) 在数据集调用 map 方法后,调用 withBroadcastSet 将 学生 数据集创建广播
5) 实现 RichMapFunction
a. 将成绩数据(学生 ID,学科,成绩) -> (学生姓名,学科,成绩) b. 重写 open 方法中,获取广播数据 c. 导入 scala.collection.JavaConverters._ 隐式转换 d. 将广播数据使用 asScala 转换为 Scala 集合,再使用 toList 转换为 scala List 集合 e. 在 map 方法中使用广播进行转换
6) 打印测试
参考代码
import java.util
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration
import org.apache.flink.api.scala._
* @Author: Alice菌
* @Date: 2020/8/1 20:30
* @Description:
object BatchBroadcastDemo {
def main(args: Array[String]): Unit = {
// 1、创建批处理运行环境
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
// 2、分别创建两个数据集
// 创建学生数据集
val stuDataSet: DataSet[(Int, String)] = env.fromCollection(List((1, "张三"), (2, "李四"), (3, "王五")))
// 创建成绩数据集
val scoreDataSet: DataSet[(Int, String, Int)] = env.fromCollection(List((1, "语文", 50), (2, "数学", 70), (3, "英文", 86)))
// 3、使用RichMapFunction 对成绩数据集进行map转换
// 返回值类型(学生名字,学科称名,成绩)
val result: DataSet[(String, String, Int)] = scoreDataSet.map(
new RichMapFunction[(Int, String, Int), (String, String, Int)] {
// 定义获取学生数据集的集合
var studentMap: Map[Int, String] = _
// 初始化的时候被执行一次,在对象的生命周期中只被执行一次
override def open(parameters: Configuration): Unit = {
// 因为获取到的广播变量中的数据类型是java的集合类型,但是我们的代码是scala,因此需要将java的集合转换成scala的集合
// 我们这里将list转换成了map对象,之所以能够转换是因为list中的元素是对偶元组,因此可以转换成 kv 键值对类型
// 之所以要转换,是因为后面好用,传递一个学生id,可以直接获取到学生的名字
import scala.collection.JavaConversions._
// 获取到广播变量的内容
val studentList: util.List[(Int, String)] = getRuntimeContext.getBroadcastVariable[(Int, String)]("student")
studentMap = studentList.toMap
// 要对集合中的每个元素执行map操作,也就是说集合中有多少元素,就被执行多少次
override def map(value: (Int, String, Int)): (String, String, Int) = {
//(Int, String, Int)=》(学生id,学科名字,学生成绩)
//返回值类型(学生名字,学科名,成绩)
val stuId: Int = value._1
val stuName: String = studentMap.getOrElse(stuId, "")
//(学生名字,学科名,成绩)
(stuName, value._2, value._3)
}).withBroadcastSet(stuDataSet,"student")
result.print()
//(张三,语文,50)
//(李四,数学,70)
//(王五,英文,86)
}
1.6 Flink的分布式缓存
Flink 提供了一个类似于 Hadoop 的分布式缓存,让并行运行实例的函数可以在本地访 问。这 个功能可以被使用来分享外部静态的数据,例如:机器学习的逻辑回归模型等! 缓存的使用流程:
使用 ExecutionEnvironment实例对本地的或者远程的文件(例如:HDFS 上的文件),为缓存文件指定一个名字注册该缓存文件!当程序执行时候,Flink 会自动将复制文件或者目录到所有 worker 节点的本地文件系统中,函数可以根据名字去该节点的本地文件系统中检索该文件!
【注意】广播是将变量分发到各个 worker 节点的内存上,分布式缓存是将文件缓存到各个 worker 节点上 ;
用法
- 使用 Flink 运行时环境的 registerCachedFile
- 在操作中,使用 getRuntimeContext.getDistributedCache.getFile ( 文件名 )获取分布式缓存
示例
创建一个成绩数据集
List( (1, “语文”, 50),(2, “数学”, 70), (3, “英文”, 86))
请通过分布式缓存获取到学生姓名,将数据转换为
List( (“张三”, “语文”, 50),(“李四”, “数学”, 70), (“王五”, “英文”, 86))
注: distribute_cache_student 测试文件保存了学生 ID 以及学生姓名
1,张三
操作步骤
- 将 distribute_cache_student 文件上传到 HDFS / 目录下
- 获取批处理运行环境
- 创建成绩数据集
- 对成绩数据集进行 map 转换,将(学生 ID, 学科, 分数)转换为(学生姓名,学科, 分数)
a. RichMapFunction 的 open 方法中,获取分布式缓存数据 b. 在 map 方法中进行转换
- 实现 open 方法
a. 使用 getRuntimeContext.getDistributedCache.getFile 获取分布式缓存文件 b. 使用 Scala.fromFile 读取文件,并获取行 c. 将文本转换为元组(学生 ID,学生姓名),再转换为 List
- 实现 map 方法
a. 从分布式缓存中根据学生 ID 过滤出来学生 b. 获取学生姓名 c. 构建最终结果元组
参考代码
import java.io.File
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration
import org.apache.flink.api.scala._
import scala.io.Source
* @Author: Alice菌
* @Date: 2020/8/1 22:40
* @Description:
object BatchDisCachedFile {
def main(args: Array[String]): Unit = {
// 获取批处理运行环境
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
// 注册一个分布式缓存
env.registerCachedFile("hdfs://node01:8020/test/input/distribute_cache_student","student")
// 创建成绩数据集
val scoreDataSet: DataSet[(Int, String, Int)] = env.fromCollection(List((1, "语文", 50), (2, "数学", 70), (3, "英文", 86)))
val resultDataSet: DataSet[(String, String, Int)] = scoreDataSet.map(
new RichMapFunction[(Int, String, Int), (String, String, Int)] {
var studentMap: Map[Int, String] = _
// 初始化的时候被调用一次
override def open(parameters: Configuration): Unit = {
// 获取分布式缓存的文件
val studentFile: File = getRuntimeContext.getDistributedCache.getFile("student")
val linesIter: Iterator[String] = Source.fromFile(studentFile).getLines()
studentMap = linesIter.map(lines => {
val words: Array[String] = lines.split(",")
(words(0).toInt, words(1))
}).toMap
override def map(value: (Int, String, Int)): (String, String, Int) = {
val stuName: String = studentMap.getOrElse(value._1, "")
(stuName, value._2, value._3)
// 输出结果
resultDataSet.print()
//(张三,语文,50)
//(李四,数学,70)
//(王五,英文,86)
}
1.7 Flink Accumulators & Counters
Accumulator 即 累加器 ,与 Mapreduce counter 的应用场景差不多,都能很好地观察 task 在运行期间的数据变化 可以在 Flink job 任务中的算子函数中操作累加器,但是只能在任务执行结束之后才能获得累加器的最终结果。
Counter 是一个具体的累加器 (Accumulator) ,我们可以实 现 IntCounter, LongCounter 和 DoubleCounter。
步骤
1) 创建累加器
private IntCounter numLines = new IntCounter();
2) 注册累加器
getRuntimeContext().addAccumulator("num-lines", this.numLines);
3) 使用累加器
this.numLines.add(1);
4) 获取累加器的结果
myJobExecutionResult.getAccumulatorResult("num-lines")
示例:
需求: 给定一个数据源 “a”,“b”,“c”,“d” 通过累加器打印出多少个元素
参考代码
import org.apache.flink.api.common.JobExecutionResult
import org.apache.flink.api.common.accumulators.IntCounter
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.core.fs.FileSystem.WriteMode
* @Author: Alice菌
* @Date: 2020/8/1 23:26
* @Description:
object BatchCounterDemo {
def main(args: Array[String]): Unit = {
//1、创建执行环境
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
//2、创建执行环境
val sourceDataSet: DataSet[String] = env.fromElements("a","b","c","d")
//3、对sourceDataSet 进行map操作
val resultDataSet: DataSet[String] = sourceDataSet.map(new RichMapFunction[String, String] {
// 创建累加器
val counter: IntCounter = new IntCounter
// 初始化的时候执行一次
override def open(parameters: Configuration): Unit = {
// 注册累加器
getRuntimeContext.addAccumulator("MyAccumulator", this.counter)
// 初始化的时候被执行一次
override def map(value: String): String = {
counter.add(1)
value
resultDataSet.writeAsText("data/output/Accumulators",WriteMode.OVERWRITE)
val result: JobExecutionResult = env.execute("BatchCounterDemo")
val MyAccumlatorValue: Int = result.getAccumulatorResult[Int]("MyAccumulator")