Baidu Stream Computing (BSC)

Flink Custom JAR Jobs

Background

BSC lets users submit custom Flink JAR jobs. Using a job that reads from Kafka and writes to BOS as an example, the steps are as follows:

1. Develop the job

Development is done in IntelliJ IDEA, with the project managed by Maven. The relevant versions are declared in the POM below:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>flink-demo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <!-- Pin the versions of the relevant dependencies -->
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.11.2</flink.version>
        <scala.binary.version>2.11</scala.binary.version>
        <scope.setting>provided</scope.setting>
    </properties>
    <dependencies>
        <!-- 1. The BSC runtime already contains the Flink core dependencies, so the Flink core
             dependencies below do not need to be packed into the project jar; set their scope
             to provided when packaging. -->
        <!-- Flink core dependency: flink-table-runtime-blink -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${scope.setting}</scope>
        </dependency>
        <!-- 2. The BSC runtime also contains the commonly used Flink dependencies, so the other
             Flink dependencies below do not need to be packed into the project jar either; set
             their scope to provided when packaging. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
            <scope>${scope.setting}</scope>
        </dependency>
        <!-- 3. The BSC runtime does NOT contain the Flink connector dependencies, so the connector
             dependencies below must be packed into the project jar.
               - No extra dependency needs to be added for BOS.
               - Kafka must use the 0.10 connector and client. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- 4. Dependencies outside of Flink can be added below. -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.24</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <!-- Scala compiler plugin -->
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                        <configuration>
                            <includes>
                                <include>**/*.scala</include>
                            </includes>
                        </configuration>
                    </execution>
                    <execution>
                        <id>scala-test-compile</id>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <!-- Following the logic above, a fat jar must be built so that all required dependencies are submitted to BSC together, hence this packaging plugin. -->
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <archive>
                        <manifest>
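                            <!-- Specify the fully qualified entry class of the job here; the demo's main class is Kafka2Bos. -->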
                            <mainClass></mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Demo code

The code for reading from Kafka and writing to BOS is as follows:

import lombok.Getter;
import lombok.Setter;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.json.JsonRowDeserializationSchema;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.filesystem.stream.StreamingFileCommitter;
import org.apache.flink.table.filesystem.stream.StreamingFileCommitter.CommitMessage;
import org.apache.flink.table.filesystem.stream.StreamingFileWriter;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.Row;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.slf4j.Logger;

import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URL;
import java.net.URLConnection;
import java.util.Arrays;
import java.util.Base64;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;

/**
 * Demo JAR job that uses the Flink DataStream API to read from Kafka and write to BOS.
 * In this example:
 *  - Kafka is read over SSL. Users must supply their own Kafka client settings and use the
 *    file download and unzip helpers provided in this demo to load the config file and the
 *    certificate zip package stored on BOS.
 *  - The data read from Kafka is JSON, so users must define the schema themselves.
 *  - Writing to BOS uses time-based bucketing and emits commit messages.
 */
public class Kafka2Bos {

    // Logger for the job
    public static Logger logger = org.slf4j.LoggerFactory.getLogger(Kafka2Bos.class);
    // Used to serialize/deserialize JSON
    public static ObjectMapper objectMapper = new ObjectMapper();

    public static void main(String[] args) throws Exception {
        // 1. Read the arguments
        /**
         * args[0] is the checkpoint directory assigned by BSC and cannot be changed.
         * If you want to use Flink's checkpoint feature, you can only use the provided args[0]
         * as the checkpoint directory.
         */
        String checkpointLocation = args[0];

        /**
         * args[1] contains the job runtime parameters provided by BSC, stored as a Base64-encoded
         * string. After decoding, it must be parsed into a Map before use. Parameter format:
         *   key1=value1
         *   key2=value2
         * Examples used in this code:
         *   bootStrapServer=kafka.bj.baidubce.com:9092
         *   topic=test
         *   bosEndpoint=https://bj.bcebos.com
         *   ...
         */
        Map<String, String> variables = new LinkedHashMap<>();
        try {
            String kvStr = new String(Base64.getDecoder().decode(args[1]));
            Arrays.stream(kvStr.split("\n")).forEach(kv -> {
                String[] variable = kv.split("=");
                variables.put(variable[0], variable[1]);
            });
        } catch (Exception e) {
            logger.error("decode job variables failed", e);
            throw new Exception(e);
        }

        /** Assign the configuration parameters */
        String sourceBootstrapServer = variables.get("bootStrapServer");
        String sourceTopic = variables.get("topic");
        String sourceGroupId = variables.get("groupId");
        String certBosUrl = variables.get("certBosUrl");
        String bosSink = variables.get("bosSink");
        String bosDatatimePattern = variables.get("bosDatatimePattern");

        // 2. Create the StreamExecutionEnvironment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        /** Configure runtime options such as checkpointing */
        env.setStateBackend(new FsStateBackend(checkpointLocation));
        env.enableCheckpointing(1000);
        env.getCheckpointConfig().setCheckpointInterval(1000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // 3. Create the Kafka streaming source
        // Set the Kafka consumer options and load the settings from client.properties
        Properties sourceProp = new Properties();
        sourceProp.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, sourceBootstrapServer);
        sourceProp.setProperty(ConsumerConfig.GROUP_ID_CONFIG, sourceGroupId);
        ClassLoader classLoader = Kafka2Bos.class.getClassLoader();
        sourceProp.load(classLoader.getResourceAsStream("client.properties"));

        /**
         * Extract the data fields from the Kafka value and configure a user-defined schema,
         * expressed as a RowTypeInfo that lists all field types.
         * The raw value format is defined by the user's upstream Kafka producer; it only needs
         * to be parsable by the logic below. Taking JSON as an example:
         *   "stringtype": "lRAhSQgShKn77uD",
         *   "longtype": 1199158871,
         *   "floattype": 0.038981155578358462,
         *   "binarytype": "null",
         *   "integertype": 1,
         *   "bytetype": -58,
         *   "booleantype": true,
         *   "doubletype": 439147658,
         *   "shorttype": 13538
         */
        List<DataType> dataTypes = Arrays.asList(DataTypes.STRING(), DataTypes.BIGINT(), DataTypes.FLOAT(),
                DataTypes.BYTES(), DataTypes.INT(), DataTypes.TINYINT(), DataTypes.BOOLEAN(),
                DataTypes.DOUBLE(), DataTypes.SMALLINT());
        TypeInformation[] types = dataTypes.stream()
                .map(type -> TypeConversions.fromDataTypeToLegacyInfo(type))
                .collect(Collectors.toList())
                .toArray(new TypeInformation[dataTypes.size()]);
        String[] fieldNames = Arrays.asList("stringtype", "longtype", "floattype", "binarytype", "integertype",
                "bytetype", "booleantype", "doubletype", "shorttype").toArray(new String[dataTypes.size()]);
        RowTypeInfo typeInfo = new RowTypeInfo(types, fieldNames);
        JsonRowDeserializationSchema schema = new JsonRowDeserializationSchema.Builder(typeInfo).build();

        /** Create the FlinkKafkaConsumer, using the 0.10 connector */
        FlinkKafkaConsumer010<Row> flinkKafkaConsumer =
                new FlinkKafkaConsumer010<Row>(sourceTopic, schema, sourceProp) {
                    @Override
                    public void open(Configuration configuration) throws Exception {
                        this.properties.putAll(copySslFileAndGetLocalProperties(sourceProp, certBosUrl));
                        super.open(configuration);
                    }
                };
        flinkKafkaConsumer.setStartFromEarliest();
        DataStream<Row> source = env.addSource(flinkKafkaConsumer);

        // 4. Build the data-processing pipeline
        // In this example we only extract the fields we need and convert the JSON to a string.
        /**
         * All fields are carried in the input, but only the following three are written out:
         *   "stringtype": "lRAhSQgShKn77uD-1",
         *   "longtype": 1199158871,
         *   "floattype": 0.038981155578358462
         */
        DataStream<String> operatorDataStream = source.map(new MapFunction<Row, KafkaValue>() {
            @Override
            public KafkaValue map(Row row) throws Exception {
                KafkaValue kafkaValue = new KafkaValue();
                kafkaValue.setStringtype(row.getField(0).toString() + "-" + row.getField(4).toString());
                kafkaValue.setLongtype((long) row.getField(1));
                kafkaValue.setFloattype((float) row.getField(2));
                return kafkaValue;
            }
        }).map(new MapFunction<KafkaValue, String>() {
            @Override
            public String map(KafkaValue kafkaValue) throws Exception {
                return objectMapper.writeValueAsString(kafkaValue);
            }
        });

        // 5. Create the BOS streaming sink
        // A time-based bucket assigner is used here, together with a rolling policy and a few other options.
        StreamingFileSink.RowFormatBuilder bosStreamingFileSinkBuilder = StreamingFileSink
                .forRowFormat(new Path(bosSink), new SimpleStringEncoder<>())
                .withBucketAssigner(new DateTimeBucketAssigner(bosDatatimePattern))
                .withRollingPolicy(DefaultRollingPolicy.builder()
                        .withRolloverInterval(TimeUnit.MINUTES.toMillis(1))
                        .withInactivityInterval(TimeUnit.MINUTES.toMillis(1))
                        .withMaxPartSize(128L * 1024L * 1024L)
                        .build())
                .withBucketCheckInterval(10000);
        StreamingFileWriter fileWriter = new StreamingFileWriter(
                bosStreamingFileSinkBuilder.getBucketCheckInterval(), bosStreamingFileSinkBuilder);
        // A commit operator marks that the data for a given time bucket has been fully written.
        DataStream<StreamingFileCommitter.CommitMessage> writerStream = operatorDataStream
                .transform(StreamingFileWriter.class.getSimpleName(),
                        TypeExtractor.createTypeInfo(CommitMessage.class),
                        (OneInputStreamOperator) fileWriter)
                .setParallelism(1);
        writerStream.addSink(new DiscardingSink()).setParallelism(1);

        // 6. Execute the job
        env.execute("flink-kafka-to-bos-jar-demo");
    }

    /** Object that the Kafka value is mapped to */
    @Getter
    @Setter
    public static class KafkaValue {
        private String stringtype;
        private Long longtype;
        private Float floattype;
        private String binarytype;
        private Integer integertype;
        private Byte bytetype;
        private Boolean booleantype;
        private Double doubletype;
        private Short shorttype;
    }

    /**
     * Downloads the SSL certificate zip package from BOS, unzips it, and adds the local paths to the configuration.
     * Note: the corresponding directory must already exist on BOS with public-read permission, and the URL passed
     * in must be a plain access link rather than a CDN-accelerated link.
     * @param properties
     * @param certBosUrl
     * @return
     */
    private static Properties copySslFileAndGetLocalProperties(Properties properties, String certBosUrl) {
        String userDir = System.getProperty("user.dir");
        /** Try 3 times; throw an exception if all attempts fail */
        int i = 0;
        for (; i < 3; i++) {
            try {
                FileUtil.downloadBosFileAndUnzip(userDir, certBosUrl);
                break;
            } catch (IOException e) {
                logger.error("download bos file fail when try: {}", i, e);
                // give up after the last (third) attempt
                if (i >= 2) {
                    throw new RuntimeException("download bos file fail");
                }
            }
        }
        properties.setProperty("ssl.truststore.location",
                userDir + "/" + properties.getProperty("ssl.truststore.location"));
        properties.setProperty("ssl.keystore.location",
                userDir + "/" + properties.getProperty("ssl.keystore.location"));
        logger.info("ssl.truststore.location: " + properties.getProperty("ssl.truststore.location"));
        logger.info("ssl.keystore.location: " + properties.getProperty("ssl.keystore.location"));
        return properties;
    }

    /** Utility class for downloading files from BOS */
    public static class FileUtil {

        private static final String PATH_SEPARATOR = "/";

        /**
         * Downloads the zip package from BOS via its URL (the object must be public-read)
         * and unzips it into the given directory.
         * @param outputDir
         * @param url
         */
        public static void downloadBosFileAndUnzip(String outputDir, String url) throws IOException {
            URLConnection connection = new URL(url).openConnection();
            ZipInputStream zipIn = null;
            try {
                zipIn = new ZipInputStream(new BufferedInputStream(connection.getInputStream()));
                ZipEntry entry;
                while ((entry = zipIn.getNextEntry()) != null) {
                    if (entry.isDirectory()) {
                        createDirectory(outputDir, entry.getName());
                    } else {
                        File tmpFile = new File(outputDir + PATH_SEPARATOR + entry.getName());
                        OutputStream out = null;
                        try {
                            out = new FileOutputStream(tmpFile);
                            int length = 0;
                            byte[] b = new byte[2048];
                            while ((length = zipIn.read(b)) != -1) {
                                out.write(b, 0, length);
                            }
                        } catch (IOException ex) {
                            logger.error("write to {} fail", tmpFile, ex);
                        } finally {
                            if (out != null) {
                                out.close();
                            }
                        }
                    }
                }
            } catch (IOException ex) {
                throw new IOException("failed to unzip the archive", ex);
            } finally {
                try {
                    if (zipIn != null) {
                        zipIn.close();
                    }
                } catch (IOException ex) {
                    throw new IOException("failed to close the zip stream", ex);
                }
            }
        }

        /**
         * Creates a directory for the certificates.
         * @param outputDir
         * @param subDir
         */
        public static void createDirectory(String outputDir, String subDir) {
            File file = new File(outputDir + PATH_SEPARATOR + subDir);
            if (!file.exists()) {
                file.mkdir();
            }
        }
    }
}
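
The consumer above loads a client.properties file from the classpath (typically src/main/resources) and, after downloading and unzipping the certificate package from certBosUrl, prefixes ssl.truststore.location and ssl.keystore.location with the working directory. As a minimal sketch, such a file might look like the following; security.protocol and the password keys are standard Kafka SSL client settings, and the file names and passwords are placeholders that depend on your own cluster and certificate package:

# src/main/resources/client.properties (illustrative values only)
security.protocol=SSL
# Paths relative to the job's working directory; the demo prepends user.dir after unzipping the certificate package
ssl.truststore.location=client.truststore.jks
ssl.truststore.password=<truststore-password>
ssl.keystore.location=client.keystore.jks
ssl.keystore.password=<keystore-password>
ssl.key.password=<key-password>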

Running mvn clean package builds the JAR used by BSC: flink-demo-1.0-SNAPSHOT-jar-with-dependencies.jar.
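
As an optional sanity check on the provided-scope setup (not part of the BSC workflow), you can list the fat jar's entries and confirm that the Kafka connector classes were packed in while explicitly provided dependencies such as flink-json stayed out. A minimal Java sketch, assuming the jar sits in the default Maven target/ directory; the class name FatJarCheck is just an illustrative helper:

import java.util.jar.JarEntry;
import java.util.jar.JarFile;

public class FatJarCheck {
    public static void main(String[] args) throws Exception {
        // Jar produced by `mvn clean package`; adjust the path if your build directory differs.
        try (JarFile jar = new JarFile("target/flink-demo-1.0-SNAPSHOT-jar-with-dependencies.jar")) {
            // Compile-scope connector classes should be inside the fat jar.
            boolean kafkaConnectorPacked = jar.stream()
                    .map(JarEntry::getName)
                    .anyMatch(name -> name.startsWith("org/apache/flink/streaming/connectors/kafka/"));
            // Provided-scope dependencies such as flink-json should not be packed.
            boolean flinkJsonPacked = jar.stream()
                    .map(JarEntry::getName)
                    .anyMatch(name -> name.startsWith("org/apache/flink/formats/json/"));
            System.out.println("kafka connector packed: " + kafkaConnectorPacked); // expected: true
            System.out.println("flink-json packed:      " + flinkJsonPacked);      // expected: false
        }
    }
}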

2. Add a resource

Go to the BSC console, open Resource Management, and click the Add Resource button. The resource type must be JOB_FILE/JAR. The upload method depends on the size of the JAR; you can upload from BOS or from your local machine.

After the upload finishes, the resource looks like this:

3. Create a job

In the BSC console, click the Create Job button and create a FLINK_STREAM/JAR job as shown in the example below.

Edit the job parameters

In the rich-text editor on the Job Development page, configure the Flink JAR parameters as in the example:

main.jar = flink-demo-1.0-SNAPSHOT-jar-with-dependencies.jar;
-- Job arguments: every argument key must start with "main.args."
main.args.bootStrapServer = kafka.gz.baidubce.com:9092;
main.args.topic = topictopictopic;
main.args.groupId = groupgroupgroup;
main.args.certBosUrl = https://bucket.bj.bcebos.com/kafka-key.zip;
main.args.bosSink = bos://bucket/object;
main.args.bosDatatimePattern = yyyy-MM-dd--HH;
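
For reference, these main.args.* values reach the program through args[1] as Base64-encoded key=value lines, which is the format the demo's main() decodes (see the comment in the code above). A minimal sketch of reproducing that input for a local run, assuming Kafka and BOS are reachable from your machine; LocalArgsDemo and the local checkpoint path are illustrative only:

import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class LocalArgsDemo {
    public static void main(String[] args) throws Exception {
        // Mirror the main.args.* parameters as the key=value lines the job expects in args[1].
        String params = String.join("\n",
                "bootStrapServer=kafka.gz.baidubce.com:9092",
                "topic=topictopictopic",
                "groupId=groupgroupgroup",
                "certBosUrl=https://bucket.bj.bcebos.com/kafka-key.zip",
                "bosSink=bos://bucket/object",
                "bosDatatimePattern=yyyy-MM-dd--HH");
        String encoded = Base64.getEncoder().encodeToString(params.getBytes(StandardCharsets.UTF_8));

        // args[0] is the checkpoint directory; on BSC it is assigned for you, here a local path stands in.
        Kafka2Bos.main(new String[]{"file:///tmp/bsc-checkpoints", encoded});
    }
}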

Reference the JAR resource

In the Resource Reference panel, select the JAR uploaded above and click Reference; then use the Original Resource Name shown in the resource details as the value of the main.jar job parameter.

Save and publish the job

Click Save and then Publish to publish the job to the Job Operations list.

4. Run the job

Switch to the job's details page under Job Operations, click Start, choose the appropriate network settings, request the startup resources, and click Confirm. If checkpoints are enabled, you can choose to start from the last stop time of the job.

Once started, you can view the job's run logs, but JAR jobs do not support viewing real-time monitoring.

5. Update the job

To update the job's JAR package, follow these steps:

  • Stop the running job.
  • In the resource management list, perform an "Add Version" operation on the corresponding JAR.
  • On the job development page, perform an "Unbind - Reference" operation on the job's resource reference.
  • On the job development page, modify the job parameters and perform a "Save - Publish" operation on the job.
  • Restart the job.
  • If you only need to modify the job parameters, the steps can be simplified to: