主要内容:介绍Flink中的RichFunction函数。
上一篇文章:
Flink——实战之MySQL Sink
在介绍MySQL Sink时,强调了这里要选择继承RichSinkFunction接口,而非SinkFunction接口:
streaming.addSink(new MyJdbcSink).setParallelism(1)
class MyJdbcSink extends RichSinkFunction[Person] {...}
从而实现将数据从Flink写入MySQL。其实,streaming.addSink()
函数明明需要传入一个SinkFunction,那为何传入RichSinkFunction也可实现自定义Sink呢?
在IDEA中展示RichSinkFunction的继承关系层次结构图:
蓝色实线:extends继承一个抽象类
绿色实线:extends继承一个类
绿色虚线:implements实现一个接口
Function接口是所有用户自定义函数的base interface,RichFunction和SinkFunction都是继承Function的接口。可以看到,SinkFunction和RichFunction接口中有各有不同的方法,而后者的方法更丰富一些,功能也就越多,所以称为“富函数”。
RichFunction接口中各函数介绍
An base interface for all rich user-defined functions. This class defines methods for the life cycle of the functions, as well as methods to access the context in which the functions are executed.
源码给出的解释如上。大意:RichFunction是所有用户自定义富函数的base interface。RichFunction与常规函数的不同在于:它拥有一些具有生命周期的方法,并可以获取函数运行时的上下文。下面解释一下各个方法的作用:
class MyRichFunction extends RichFunction[String] {
override def open(parameters: Configuration): Unit = ???
override def close(): Unit = ???
override def getRuntimeContext: RuntimeContext = ???
override def getIterationRuntimeContext: IterationRuntimeContext = ???
override def setRuntimeContext(t: RuntimeContext): Unit = ???
RichFunction:
open()
:在函数调用前,open()函数先被调用,用于初始化操作;
close()
:生命周期中最后一个被调用的方法,做一些清理工作;
setRuntimeContext(RuntimeContext)
:设置运行时上下文。每个并行的算子子任务都有一个运行时上下文,上下文记录了这个算子运行过程中的一些信息,包括算子当前的并行度、算子子任务序号、广播数据、累加器、监控数据。最重要的是,我们可以从上下文里获取状态数据;
getRuntimeContext()
:获取运行时上下文;
getIterationRuntimeContext()
:获取迭代运行时上下文。
SinkFunction:
invoke()
:将传入的数据写入Sink。需要强调的是,每传入一个数据就要调用一次该函数。若Sink是MySQl,每次都要创建一个连接。
由此可见,RichFunction可以实现更复杂的功能,并能提高性能,节省资源。上面说到的RichSinkFunction类之所以有上述方法,是因为从RichFunction接口继承而来。其实,在Flink当中,所有函数类都有继承或者间接继承RichFunction,都有其Rich版本。
RichFunction实现map算子
下面举一个简单例子:Flink从文件读取数据,经过map算子处理后,取出每行中的name字段,程序结束时输出所统计到的人数。
文本数据:
小明,20,man,180.2
小红,22,woman,178.4
小黑,18,man,192.9
小兰,19,woman,188.0
小爱,30,woman,177.3
main函数:
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val inputStream: DataStream[String] = env.readTextFile("D:\\Work\\Code\\flinkdev\\src\\main\\resources\\textfile\\customdata.txt")
env.setParallelism(1)
inputStream.map(new MyRichMapFunction).print("name")
env.execute("RichMapFunction test")
自定义RichMapFunction函数:
class MyRichMapFunction extends RichMapFunction[String, String] {
var count: Int = 0
var startTime: Long = _
val timeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSSS")
override def open(parameters: Configuration): Unit = {
super.open(parameters)
startTime = System.currentTimeMillis()
println("open函数调用时间:" + timeFormat.format(startTime))
println("--------------------------------------------")
override def map(value: String): String = {
println("map函数调用时间:" + timeFormat.format(System.currentTimeMillis()))
count += 1
value.split(",")(0)
override def close(): Unit = {
println("--------------------------------------------")
println("close函数调用时间:" + timeFormat.format(System.currentTimeMillis()))
super.close()
println("共统计个数:" + count)
open函数调用时间:2020-04-19 19:00:19.0331
--------------------------------------------
map函数调用时间:2020-04-19 19:00:19.0616
name> 小爱
map函数调用时间:2020-04-19 19:00:19.0616
name> 小黑
map函数调用时间:2020-04-19 19:00:19.0616
name> 小红
map函数调用时间:2020-04-19 19:00:19.0616
name> 小明
map函数调用时间:2020-04-19 19:00:19.0616
name> 小兰
--------------------------------------------
close函数调用时间:2020-04-19 19:00:19.0616
共统计个数:5
验证了RichFunction中open()、map()
、close()函数执行的顺序以及在函数生命周期内可获取上下文变量count。需要注意:程序中添加了env.setParallelism(1)
,即指定了只使用一个线程。若不设定,默认是取机器的CPU核数。且每调用一次MyRichMapFunction
,就会调用一次open()
、map()
、close()
函数。