添加链接
link管理
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接

主要内容:介绍Flink中的RichFunction函数。

上一篇文章: Flink——实战之MySQL Sink 在介绍MySQL Sink时,强调了这里要选择继承RichSinkFunction接口,而非SinkFunction接口:

streaming.addSink(new MyJdbcSink).setParallelism(1)
class MyJdbcSink extends RichSinkFunction[Person] {...}

从而实现将数据从Flink写入MySQL。其实,streaming.addSink()函数明明需要传入一个SinkFunction,那为何传入RichSinkFunction也可实现自定义Sink呢?

在IDEA中展示RichSinkFunction的继承关系层次结构图:

蓝色实线:extends继承一个抽象类

绿色实线:extends继承一个类

绿色虚线:implements实现一个接口

Function接口是所有用户自定义函数的base interface,RichFunction和SinkFunction都是继承Function的接口。可以看到,SinkFunction和RichFunction接口中有各有不同的方法,而后者的方法更丰富一些,功能也就越多,所以称为“富函数”。

RichFunction接口中各函数介绍

An base interface for all rich user-defined functions. This class defines methods for the life cycle of the functions, as well as methods to access the context in which the functions are executed.

源码给出的解释如上。大意:RichFunction是所有用户自定义富函数的base interface。RichFunction与常规函数的不同在于:它拥有一些具有生命周期的方法,并可以获取函数运行时的上下文。下面解释一下各个方法的作用:

class MyRichFunction extends RichFunction[String] {
    override def open(parameters: Configuration): Unit = ???
    override def close(): Unit = ???
    override def getRuntimeContext: RuntimeContext = ???
    override def getIterationRuntimeContext: IterationRuntimeContext = ???
    override def setRuntimeContext(t: RuntimeContext): Unit = ???

RichFunction:

  • open():在函数调用前,open()函数先被调用,用于初始化操作;
  • close():生命周期中最后一个被调用的方法,做一些清理工作;
  • setRuntimeContext(RuntimeContext):设置运行时上下文。每个并行的算子子任务都有一个运行时上下文,上下文记录了这个算子运行过程中的一些信息,包括算子当前的并行度、算子子任务序号、广播数据、累加器、监控数据。最重要的是,我们可以从上下文里获取状态数据;
  • getRuntimeContext():获取运行时上下文;
  • getIterationRuntimeContext():获取迭代运行时上下文。
  • SinkFunction:

  • invoke():将传入的数据写入Sink。需要强调的是,每传入一个数据就要调用一次该函数。若Sink是MySQl,每次都要创建一个连接。
  • 由此可见,RichFunction可以实现更复杂的功能,并能提高性能,节省资源。上面说到的RichSinkFunction类之所以有上述方法,是因为从RichFunction接口继承而来。其实,在Flink当中,所有函数类都有继承或者间接继承RichFunction,都有其Rich版本。

    RichFunction实现map算子

    下面举一个简单例子:Flink从文件读取数据,经过map算子处理后,取出每行中的name字段,程序结束时输出所统计到的人数。

    文本数据:

    小明,20,man,180.2
    小红,22,woman,178.4
    小黑,18,man,192.9
    小兰,19,woman,188.0
    小爱,30,woman,177.3
    

    main函数:

    def main(args: Array[String]): Unit = {
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        val inputStream: DataStream[String] = env.readTextFile("D:\\Work\\Code\\flinkdev\\src\\main\\resources\\textfile\\customdata.txt")
        env.setParallelism(1)
        inputStream.map(new MyRichMapFunction).print("name")
        env.execute("RichMapFunction test")
    

    自定义RichMapFunction函数:

    class MyRichMapFunction extends RichMapFunction[String, String] {
        var count: Int = 0
        var startTime: Long = _
        val timeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSSS")
        override def open(parameters: Configuration): Unit = {
          super.open(parameters)
          startTime = System.currentTimeMillis()
          println("open函数调用时间:" + timeFormat.format(startTime))
          println("--------------------------------------------")
        override def map(value: String): String = {
          println("map函数调用时间:" + timeFormat.format(System.currentTimeMillis()))
          count += 1
          value.split(",")(0)
        override def close(): Unit = {
          println("--------------------------------------------")
          println("close函数调用时间:" + timeFormat.format(System.currentTimeMillis()))
          super.close()
          println("共统计个数:" + count)
    
    open函数调用时间:2020-04-19 19:00:19.0331
    --------------------------------------------
    map函数调用时间:2020-04-19 19:00:19.0616
    name> 小爱
    map函数调用时间:2020-04-19 19:00:19.0616
    name> 小黑
    map函数调用时间:2020-04-19 19:00:19.0616
    name> 小红
    map函数调用时间:2020-04-19 19:00:19.0616
    name> 小明
    map函数调用时间:2020-04-19 19:00:19.0616
    name> 小兰
    --------------------------------------------
    close函数调用时间:2020-04-19 19:00:19.0616
    共统计个数:5
    

    验证了RichFunction中open()、map()、close()函数执行的顺序以及在函数生命周期内可获取上下文变量count。需要注意:程序中添加了env.setParallelism(1),即指定了只使用一个线程。若不设定,默认是取机器的CPU核数。且每调用一次MyRichMapFunction,就会调用一次open()map()close()函数。