解决flink问题小技巧: 一般使用flink中出现了问题, 可以在Apache Flink 中文用户邮件列表:
http://apache-flink.147419.n8.nabble.com/
找一下, 是不是有人也遇到过这样的问题
两种方案:
1: 通过state先缓存收到的数据;然后等到广播流到达后再进行处理
2: 先在正常数据处理流的open方法中初始化访问一次配置,后续配置变更时再去使用广播中的数据对配置进行更新
这里我采用的是第二种方案:
比如说flink 中的 connect
streamData.keyBy(d => {
}).connect(testConfigBroadcastStream)
.process(new testProcessFunction())
.name("connect")
我会在process 中的testProcessFunction类的open方法中初始化访问一次配置
class testProcessFunction extends
KeyedBroadcastProcessFunction[String, (String, String, String), mutable.Map[String, (String, String)], (String, String)] {
override def open(parameters: Configuration): Unit = {
super.open(parameters)
// 初始化访问一下配置
主要实现方案:
历史的配置信息是从oracle读取, 增量的配置信息从kafka获取; 为了保证广播流比数据流先到,是在testProcessFunction类的open初始化的时候就去读取oracle配置信息。成功后配置缓存到concurrent.TrieMap中。
testProcessFunction
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction
import org.apache.flink.util.Collector
import org.slf4j.{Logger, LoggerFactory}
import scala.collection.concurrent.TrieMap
import scala.collection.mutable.ListBuffer
import scala.collection.{concurrent, mutable}
import scala.util.control.Breaks._
class testProcessFunctionextends KeyedBroadcastProcessFunction[String, FdcData[IndicatorResult],
FdcConfig[IndicatorConfig], ListBuffer[(IndicatorResult, Boolean, Boolean, Boolean)]] with IndicatorCommFunction {
private val logger: Logger = LoggerFactory.getLogger(classOf[FdcDriftConfigBroadcastProcessFunction])
// 配置 {"indicatorid": {"controlPlanId": {"version": { "X" : "IndicatorConfig"}}}}
var driftIndicatorByAll: TrieMap[String, TrieMap[String, TrieMap[String, TrieMap[String, IndicatorConfig]]]] = IndicatorByAll
//数据缓存:
var driftValue = new mutable.HashMap[String, mutable.HashMap[String, List[Double]]]()
override def open(parameters: Configuration): Unit = {
super.open(parameters)
// 获取全局变量
val p = getRuntimeContext.getExecutionConfig.getGlobalJobParameters.asInstanceOf[ParameterTool]
ProjectConfig.getConfig(p)
val initConfig:ListBuffer[FdcConfig[IndicatorConfig]] = InitIndicatorOracle.OracleConfigList
initConfig.foreach(kafkaDatas=> {
kafkaDatas.`dataType` match {
case "indicatorconfig" =>
addIndicatorConfigByAll(kafkaDatas)
case _ => logger.warn(s"DriftIndicator job open no mach type: " + kafkaDatas.`dataType`)
override def processElement(record: FdcData[IndicatorResult], ctx: KeyedBroadcastProcessFunction[String, FdcData[IndicatorResult],
FdcConfig[IndicatorConfig], ListBuffer[(IndicatorResult, Boolean, Boolean, Boolean)]]#ReadOnlyContext,
out: Collector[ListBuffer[(IndicatorResult, Boolean, Boolean, Boolean)]]): Unit = {
try {
record.`dataType` match {
case "indicator" =>
val myDatas = record.datas
val indicatorResult = myDatas.toJson.fromJson[IndicatorResult]
val version = indicatorResult.controlPlanVersion.toString
val indicatorKey = indicatorResult.indicatorId.toString
val indicatorValueDouble = indicatorResult.indicatorValue.toDouble
if (!this.driftIndicatorByAll.contains(indicatorKey)) {
return
val controlPlanIdMap = driftIndicatorByAll(indicatorKey)
for(versionMap <- controlPlanIdMap.values) {
breakable {
if (!versionMap.contains(version)) {
break
out.collect(IndicatorList)
case _ => logger.warn(s"job processElement no mach type")
} catch {
case ex: Exception => logger.warn("")
override def processBroadcastElement(value: FdcConfig[IndicatorConfig], ctx: KeyedBroadcastProcessFunction[String, FdcData[IndicatorResult],
FdcConfig[IndicatorConfig], ListBuffer[(IndicatorResult, Boolean, Boolean, Boolean)]]#Context,
out: Collector[ListBuffer[(IndicatorResult, Boolean, Boolean, Boolean)]]): Unit = {
value.`dataType` match {
case "indicatorconfig" => addIndicatorConfigByAll(value)
case _ => logger.warn(s"job processBroadcastElement no mach type: " + value.`dataType`)
InitIndicatorOracle
import com.hzw.fdc.util.ProjectConfig
import org.slf4j.{Logger, LoggerFactory}
import java.sql.{Connection, DriverManager}
import scala.collection.mutable.ListBuffer
object InitIndicatorOracle {
private val logger: Logger = LoggerFactory.getLogger(classOf[FdcIndicatorConfigBroadcastProcessFunction])
Class.forName("oracle.jdbc.driver.OracleDriver")
lazy val Conn: Connection = DriverManager.getConnection(ProjectConfig.MAIN_FAB_CORE_ORACLE_URL,
ProjectConfig.MAIN_FAB_CORE_ORACLE_USER, ProjectConfig.MAIN_FAB_CORE_ORACLE_PASSWORD)
logger.warn("byPassConn start!!!")
lazy val OracleConfigList: ListBuffer[FdcConfig[IndicatorConfig]] = initOracleConfig()
// 初始化Oracle配置
def initOracleConfig(): ListBuffer[FdcConfig[IndicatorConfig]] = {
val indicatorConfigDataByAll = ListBuffer[FdcConfig[IndicatorConfig]]()
logger.warn("Oracle indicatorConfig start!!!")
try {
val byPass = Conn.prepareStatement(
|select OBC.*
| , FCSOB.ID AS SINGLE_OCAP_ID
| , FCSOB.ALARM_BYPASS_CONDITION_ID
| , FCABC.CODE
| , FCABC.NAME
| , FCABC.HAS_PARAM
| , FCOBP.SINGLE_OCAP_BYPASS_ID
| , FCOBP.PARAM_INDEX
| , FCOBP.PARAM_VALUE
| FROM FDC_CONF_OCAP_BYPASS_CONDITION OBC
| LEFT JOIN FDC_CONF_SINGLE_OCAP_BYPASS FCSOB
| ON OBC.ID = FCSOB.BYPASS_CONDITION_ID
| AND FCSOB.STATUS = 'ENABLED'
| LEFT JOIN FDC_CONF_ALARM_BYPASS_CONDITION FCABC
| ON FCSOB.ALARM_BYPASS_CONDITION_ID = FCABC.ID
| LEFT JOIN FDC_CONF_OCAP_BYPASS_PARAMS FCOBP
| ON FCSOB.ID = FCOBP.SINGLE_OCAP_BYPASS_ID
| WHERE OBC.STATUS = 'ENABLED'
|""".stripMargin
val byPassResult = byPass.executeQuery()
val poList = new ListBuffer[ByPassConditionPO]()
while (byPassResult.next()) {
poList.append(ByPassConditionPO(
byPassResult.getLong("ID"),
byPassResult.getString("BYPASS_NAME"),
byPassResult.getBoolean("NOTIFICATION_SWITCH"),
byPassResult.getBoolean("ACTION_SWITCH"),
byPassResult.getLong("SINGLE_OCAP_ID"),
byPassResult.getLong("ALARM_BYPASS_CONDITION_ID"),
byPassResult.getString("CODE"),
byPassResult.getString("NAME"),
byPassResult.getBoolean("HAS_PARAM"),
byPassResult.getLong("SINGLE_OCAP_BYPASS_ID"),
byPassResult.getInt("PARAM_INDEX"),
byPassResult.getString("PARAM_VALUE")
logger.warn("Oracle indicatorConfig SIZE :" + count)
} catch {
case e: Exception => logger.warn("indicator load error" + e)
} finally {
Conn.close()
indicatorConfigDataByAll
事务型处理
企业在日常业务运营过程中会用到各类应用,例如:客户管理管理软件、基于Web的应用等,这些应用系统通常都会设置独立的数据处理层(应用程序本身)和数据存储层(事务型数据库系统)。
这些应用通常会连接外部服务或实际用户,并持续处理诸如订单、邮件、网站点击等传入的数据。期间每处理一条事件,应用都会通过执行远程数据库系统的事务来读取或者更新状态,多个应用会共享同一个数据库系统,有时候还
一、Flink 专栏Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。1、Flink 部署系列
本部分介绍Flink的部署、配置相关基础内容。2、Flink基础系列
本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。3、Flik Table API和SQL基础系列
本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。4、Flik
在flink原理解析中https://blog.csdn.net/aA518189/article/details/82908993,我们把flink的概念以及原理整体介绍了一遍,接下来这篇文章会具体的解析每个原理的具体实现和使用方式。
1.流对齐(align)
首先谈到流对齐,那一定是多流处理。那对齐的是什么呢?其实对齐的是检查点中的Barri...
Flink基础原理及应用场景Flink简介Flink特点数据来源数据存储架构图基本组件介绍流处理和批处理简介流处理批处理Flink同时处理流处理和批处理的原理Flink应用场景分析小结
Flink简介
Apache Flink是一个开源的分布式,高性能,高可用,准确的流处理框架
主要由Java代码实现
支持实时流处理和批处理(流处理的一种),批数据只是流数据的一个极限特例
Flink原生支持迭代...
apache flink
编者注:这是Apache Flink PMC成员 Fabian Hueske和Kostas Tzoumas的帖子 。 Fabian和Kostas也是数据工匠的联合创始人。
当今数据处理的很大一部分是对连续产生的数据进行的,例如,来自用户活动日志,Web日志,机器,传感器和数据库事务的数据。 到目前为止, 数据流技术在性能,正确性和可操作性等几个方面都缺乏,迫使用户...
Haisen大王:
G1的mixed gc详细过程
Haisen大王: