添加链接
link管理
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接

解决flink问题小技巧: 一般使用flink中出现了问题, 可以在Apache Flink 中文用户邮件列表: http://apache-flink.147419.n8.nabble.com/ 找一下, 是不是有人也遇到过这样的问题

两种方案:

1:  通过state先缓存收到的数据;然后等到广播流到达后再进行处理

2:  先在正常数据处理流的open方法中初始化访问一次配置,后续配置变更时再去使用广播中的数据对配置进行更新

这里我采用的是第二种方案:
比如说flink 中的 connect

streamData.keyBy(d => {
}).connect(testConfigBroadcastStream)
  .process(new testProcessFunction())
  .name("connect")

我会在process 中的testProcessFunction类的open方法中初始化访问一次配置

class testProcessFunction extends
  KeyedBroadcastProcessFunction[String, (String, String, String), mutable.Map[String, (String, String)], (String, String)] {
  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    // 初始化访问一下配置

     主要实现方案:

      历史的配置信息是从oracle读取, 增量的配置信息从kafka获取; 为了保证广播流比数据流先到,是在testProcessFunction类的open初始化的时候就去读取oracle配置信息。成功后配置缓存到concurrent.TrieMap中。

testProcessFunction
import org.apache.flink.api.java.utils.ParameterTool import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction import org.apache.flink.util.Collector import org.slf4j.{Logger, LoggerFactory} import scala.collection.concurrent.TrieMap import scala.collection.mutable.ListBuffer import scala.collection.{concurrent, mutable} import scala.util.control.Breaks._ class testProcessFunctionextends KeyedBroadcastProcessFunction[String, FdcData[IndicatorResult], FdcConfig[IndicatorConfig], ListBuffer[(IndicatorResult, Boolean, Boolean, Boolean)]] with IndicatorCommFunction { private val logger: Logger = LoggerFactory.getLogger(classOf[FdcDriftConfigBroadcastProcessFunction]) // 配置 {"indicatorid": {"controlPlanId": {"version": { "X" : "IndicatorConfig"}}}} var driftIndicatorByAll: TrieMap[String, TrieMap[String, TrieMap[String, TrieMap[String, IndicatorConfig]]]] = IndicatorByAll //数据缓存: var driftValue = new mutable.HashMap[String, mutable.HashMap[String, List[Double]]]() override def open(parameters: Configuration): Unit = { super.open(parameters) // 获取全局变量 val p = getRuntimeContext.getExecutionConfig.getGlobalJobParameters.asInstanceOf[ParameterTool] ProjectConfig.getConfig(p) val initConfig:ListBuffer[FdcConfig[IndicatorConfig]] = InitIndicatorOracle.OracleConfigList initConfig.foreach(kafkaDatas=> { kafkaDatas.`dataType` match { case "indicatorconfig" => addIndicatorConfigByAll(kafkaDatas) case _ => logger.warn(s"DriftIndicator job open no mach type: " + kafkaDatas.`dataType`) override def processElement(record: FdcData[IndicatorResult], ctx: KeyedBroadcastProcessFunction[String, FdcData[IndicatorResult], FdcConfig[IndicatorConfig], ListBuffer[(IndicatorResult, Boolean, Boolean, Boolean)]]#ReadOnlyContext, out: Collector[ListBuffer[(IndicatorResult, Boolean, Boolean, Boolean)]]): Unit = { try { record.`dataType` match { case "indicator" => val myDatas = record.datas val indicatorResult = myDatas.toJson.fromJson[IndicatorResult] val version = indicatorResult.controlPlanVersion.toString val indicatorKey = indicatorResult.indicatorId.toString val indicatorValueDouble = indicatorResult.indicatorValue.toDouble if (!this.driftIndicatorByAll.contains(indicatorKey)) { return val controlPlanIdMap = driftIndicatorByAll(indicatorKey) for(versionMap <- controlPlanIdMap.values) { breakable { if (!versionMap.contains(version)) { break out.collect(IndicatorList) case _ => logger.warn(s"job processElement no mach type") } catch { case ex: Exception => logger.warn("") override def processBroadcastElement(value: FdcConfig[IndicatorConfig], ctx: KeyedBroadcastProcessFunction[String, FdcData[IndicatorResult], FdcConfig[IndicatorConfig], ListBuffer[(IndicatorResult, Boolean, Boolean, Boolean)]]#Context, out: Collector[ListBuffer[(IndicatorResult, Boolean, Boolean, Boolean)]]): Unit = { value.`dataType` match { case "indicatorconfig" => addIndicatorConfigByAll(value) case _ => logger.warn(s"job processBroadcastElement no mach type: " + value.`dataType`)
InitIndicatorOracle
import com.hzw.fdc.util.ProjectConfig
import org.slf4j.{Logger, LoggerFactory}
import java.sql.{Connection, DriverManager}
import scala.collection.mutable.ListBuffer
object InitIndicatorOracle {
  private val logger: Logger = LoggerFactory.getLogger(classOf[FdcIndicatorConfigBroadcastProcessFunction])
  Class.forName("oracle.jdbc.driver.OracleDriver")
  lazy val Conn: Connection = DriverManager.getConnection(ProjectConfig.MAIN_FAB_CORE_ORACLE_URL,
    ProjectConfig.MAIN_FAB_CORE_ORACLE_USER, ProjectConfig.MAIN_FAB_CORE_ORACLE_PASSWORD)
  logger.warn("byPassConn start!!!")
  lazy val OracleConfigList: ListBuffer[FdcConfig[IndicatorConfig]] = initOracleConfig()
  // 初始化Oracle配置
  def initOracleConfig(): ListBuffer[FdcConfig[IndicatorConfig]] = {
    val indicatorConfigDataByAll =  ListBuffer[FdcConfig[IndicatorConfig]]()
    logger.warn("Oracle indicatorConfig start!!!")
    try {
      val byPass = Conn.prepareStatement(
          |select      OBC.*
          |                    , FCSOB.ID AS SINGLE_OCAP_ID
          |                    , FCSOB.ALARM_BYPASS_CONDITION_ID
          |                    , FCABC.CODE
          |                    , FCABC.NAME
          |                    , FCABC.HAS_PARAM
          |                    , FCOBP.SINGLE_OCAP_BYPASS_ID
          |                    , FCOBP.PARAM_INDEX
          |                    , FCOBP.PARAM_VALUE
          |        FROM        FDC_CONF_OCAP_BYPASS_CONDITION OBC
          |        LEFT JOIN   FDC_CONF_SINGLE_OCAP_BYPASS FCSOB
          |        ON          OBC.ID = FCSOB.BYPASS_CONDITION_ID
          |        AND         FCSOB.STATUS = 'ENABLED'
          |        LEFT JOIN   FDC_CONF_ALARM_BYPASS_CONDITION FCABC
          |        ON          FCSOB.ALARM_BYPASS_CONDITION_ID = FCABC.ID
          |        LEFT JOIN   FDC_CONF_OCAP_BYPASS_PARAMS FCOBP
          |        ON          FCSOB.ID = FCOBP.SINGLE_OCAP_BYPASS_ID
          |        WHERE       OBC.STATUS = 'ENABLED'
          |""".stripMargin
      val byPassResult = byPass.executeQuery()
      val poList = new ListBuffer[ByPassConditionPO]()
      while (byPassResult.next()) {
        poList.append(ByPassConditionPO(
          byPassResult.getLong("ID"),
          byPassResult.getString("BYPASS_NAME"),
          byPassResult.getBoolean("NOTIFICATION_SWITCH"),
          byPassResult.getBoolean("ACTION_SWITCH"),
          byPassResult.getLong("SINGLE_OCAP_ID"),
          byPassResult.getLong("ALARM_BYPASS_CONDITION_ID"),
          byPassResult.getString("CODE"),
          byPassResult.getString("NAME"),
          byPassResult.getBoolean("HAS_PARAM"),
          byPassResult.getLong("SINGLE_OCAP_BYPASS_ID"),
          byPassResult.getInt("PARAM_INDEX"),
          byPassResult.getString("PARAM_VALUE")
      logger.warn("Oracle indicatorConfig SIZE :" + count)
    } catch {
      case e: Exception => logger.warn("indicator load error" + e)
    } finally {
      Conn.close()
    indicatorConfigDataByAll
事务型处理
企业在日常业务运营过程中会用到各类应用,例如:客户管理管理软件、基于Web的应用等,这些应用系统通常都会设置独立的数据处理层(应用程序本身)和数据存储层(事务型数据库系统)。
这些应用通常会连接外部服务或实际用户,并持续处理诸如订单、邮件、网站点击等传入的数据。期间每处理一条事件,应用都会通过执行远程数据库系统的事务来读取或者更新状态,多个应用会共享同一个数据库系统,有时候还
				
一、Flink 专栏Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。3、Flik Table API和SQL基础系列 本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。4、Flik
flink原理解析中https://blog.csdn.net/aA518189/article/details/82908993,我们把flink的概念以及原理整体介绍了一遍,接下来这篇文章会具体的解析每个原理的具体实现和使用方式。 1.对齐(align) 首先谈到对齐,那一定是多处理。那对齐的是什么呢?其实对齐的是检查点中的Barri...
Flink基础原理及应用场景Flink简介Flink特点数据来源数据存储架构图基本组件介绍处理和批处理简介处理批处理Flink同时处理处理和批处理的原理Flink应用场景分析小结 Flink简介 Apache Flink是一个开源的分布式,高性能,高可用,准确的处理框架 主要由Java代码实现 支持实时处理和批处理(处理的一种),批数据只是数据的一个极限特例 Flink原生支持迭代...
apache flink 编者注:这是Apache Flink PMC成员 Fabian Hueske和Kostas Tzoumas的帖子 。 Fabian和Kostas也是数据工匠的联合创始人。 当今数据处理的很大一部分是对连续产生的数据进行的,例如,来自用户活动日志,Web日志,机器,传感器和数据库事务的数据。 到目前为止, 数据流技术在性能,正确性和可操作性等几个方面都缺乏,迫使用户...
Haisen大王: 整个并发标记周期将由初始标记(Initial Mark)、根分区扫描(Root Region Scanning)、并发标记(Concurrent Marking)、重新标记(Remark)、清除(Cleanup)几个阶段组成。 这几个阶段,算是 Mixed GC 的范围内,对吗 G1的mixed gc详细过程 Haisen大王: CASE 1、Young GC 可以单独执行; CASE 2、如果需要回收老年代,也是以 Young GC 开始,执行 Mixed GC 一系列步骤:初始标记、并发标记、...,最终标记,清理 是这样的吗