##### Maven dependencies
The demo relies on the following Flink artifacts; `${flink.version}` is expected to be defined as a property in the pom.
```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-filesystem_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
```
##### Code example
A simple demo of Flink consuming data from Kafka and writing it to HDFS:
```java
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import java.time.ZoneId;
import java.util.Properties;
public class Kafka2Hdfs {
    public static void main(String[] args) throws Exception {
        // Kafka connection settings
        String bootstrap_servers = "kafka1:9092,kafka2:9092,kafka3:9092";
        String groupId = "test_group";
        String offset = "latest";
        String topic = "test-topic";
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", bootstrap_servers);
        properties.setProperty("group.id", groupId);
        properties.setProperty("auto.offset.reset", offset);
        // Create the Flink execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setMaxParallelism(256);
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.getCheckpointConfig().setCheckpointInterval(10 * 60 * 1000);    // checkpoint interval: 10 minutes
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // Create the Kafka source
        FlinkKafkaConsumer011<String> kafkaSource = new FlinkKafkaConsumer011<>(
                topic,
                new SimpleStringSchema(),
                properties);
        // Create the HDFS sink
        String filePath = "/user/bdms";
        // Rolling policy 1: roll part files according to explicit size/time rules
        DefaultRollingPolicy rollingPolicy = DefaultRollingPolicy
                .builder()
                .withMaxPartSize(1024 * 1024 * 256)      // max size of each part file (default 128 MB); 256 MB here
                .withRolloverInterval(Long.MAX_VALUE)   // time-based rollover interval (default 60 s); effectively disabled here
                .withInactivityInterval(60 * 1000)      // roll to a new file after 60 s of inactivity
                .build();
        // Rolling policy 2: roll part files on every checkpoint
        OnCheckpointRollingPolicy rollingPolicy2 = OnCheckpointRollingPolicy.build();
        StreamingFileSink hdfsSink = StreamingFileSink
                .forRowFormat(new Path(filePath), new SimpleStringEncoder<String>("UTF-8"))  // output files are row-encoded
//                .forBulkFormat(new Path(filePath))     // forBulkFormat can instead write compressed columnar formats such as Parquet (see the sketch after this code block)
                .withBucketAssigner(new DateTimeBucketAssigner("yyyy-MM-dd", ZoneId.of("Asia/Shanghai")))   // bucket assigner: default pattern is "yyyy-MM-dd--HH"; bucket per day here
//                .withBucketAssigner(new BasePathBucketAssigner())   // alternative: no bucketing, all files are written to the base path
                .withRollingPolicy(rollingPolicy)
                .withBucketCheckInterval(1000L) // bucket check interval, set to 1 s here
                .build();
        // Wire up the source and sink
        DataStreamSource<String> sourceStream = env.addSource(kafkaSource);
        sourceStream.addSink(hdfsSink);
        // Execute the job
        env.execute("Kafka2Hdfs");
    }
}
```

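The commented-out `forBulkFormat` line above only hints at columnar output. Below is a minimal sketch of the Parquet variant, assuming the `flink-parquet_2.11` dependency has been added and using a hypothetical `Event` POJO; neither is part of the original demo. Note that bulk formats always roll on checkpoint, so the `DefaultRollingPolicy` from the row-format example does not apply here.

```java
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;

import java.time.ZoneId;

public class ParquetSinkSketch {

    // Hypothetical event POJO; the Avro schema is derived from its public fields via reflection
    public static class Event {
        public String key;
        public long timestamp;
    }

    public static StreamingFileSink<Event> buildParquetSink(String filePath) {
        return StreamingFileSink
                // Bulk (columnar) format: each part file is a self-contained Parquet file
                .forBulkFormat(new Path(filePath), ParquetAvroWriters.forReflectRecord(Event.class))
                // Same daily bucketing as the row-format sink above
                .withBucketAssigner(new DateTimeBucketAssigner<Event>("yyyy-MM-dd", ZoneId.of("Asia/Shanghai")))
                .build();
    }
}
```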
Author: wangsong