最近有需求需要实时监控HDFS指定目录,然后对新增文件的每一行进行处理。
采用了Flink DataStream API的
readFile
方法来实现。
文件都是
.gz
结尾的,也会存在脏文件进来,所以需要对文件名进行过滤,于是就用
FilePathFilter
。
在不过滤之前,可以正常监听目录;但是加了过滤后就不能监听了。
纳闷了好一会,后来点进去仔细看了看源码,才发现是使用
FilePathFilter
的姿势有问题。
这里的
FilePathFilter
居然是用来过滤目录的。。。实现代码如下:
代码摘自
org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction
private Map<Path, FileStatus> listEligibleFiles(FileSystem fileSystem, Path path) throws IOException {
final FileStatus[] statuses;
try {
statuses = fileSystem.listStatus(path);
} catch (IOException e) {
if (statuses == null) {
} else {
Map<Path, FileStatus> files = new HashMap<>();
for (FileStatus status : statuses) {
if (!status.isDir()) {
Path filePath = status.getPath();
long modificationTime = status.getModificationTime();
if (!shouldIgnore(filePath, modificationTime)) {
files.put(filePath, status);
} else if (format.getNestedFileEnumeration() && format.acceptFile(status)){
files.putAll(listEligibleFiles(fileSystem, status.getPath()));
return files;
public boolean acceptFile(FileStatus fileStatus) {
final String name = fileStatus.getPath().getName();
return !name.startsWith("_")
&& !name.startsWith(".")
&& !filesFilter.filterPath(fileStatus.getPath());
源码中FilePathFilter
类的filterPath
中也确实写了是过滤目录的:
// org.apache.flink.api.common.io.FilePathFilter
// 如果在处理目录时,需要忽略给定的filePath,则返回true
public abstract boolean filterPath(Path filePath);
所以是不支持过滤文件名的,只支持过滤目录名。
我最开始是过滤了.gz结尾的,由于没有一个目录是以.gz结尾的,导致一个文件都扫描不到。。。
- Flink实时监控目录下的文件,官方提供的Source不支持过滤文件名,需要我们自己实现
- Flink监控文件这块比Spark鸡肋很多,Spark Streaming这块实现还不错
String fileNameTag=".csv";
TextInputFormat format = new TextInputFormat(new Path(filePath));
Configuration configuration = new Configuration();
configuration.setBoolean("recursive.file.enumeration", true);
// 设置递归获取文件
读写文件1 读取文件-readFile2 写入到文件-StreamingFileSink 2.1 在了解-StreamingFileSink之前你需要了解的知识点 2.1.1 结论 2.2 行编码 2.2.1 行编码自定义-BucketAssigner 2.3 批量编码 2.3.1 批量编码自定义-BucketAssigner
1 读取文件-readFile
Q:什么是文件数据源?
A:Apache Flink提供了一个可重置的数据源连接器,支持将
Flink读取文件目录:
因为目录下的文件可能会不断新增,在新增过程中文件处于传输阶段
会出现比如01.data文件正在上传,在hdfs中显示的是01.data._COPYING_文件,只有真正上传完成后才能读取,而不设置过滤器的话就会报错,会提示._COPYING_文件不存在,所以像这样的临时文件需要我们过滤掉, 目前默认过滤器已经满足了我们的需求:方案如下
* 2.流处理: 监听并读取hdfs文件夹目录下的所有文件
* @throws Exception
看到这些需求,我们可能想到spark都支持的不错,Flink支持的怎么样呢?本篇文章详细介绍一下Flink如何实现,递归,正则表达式等方式读取hdfs的目录。
Flink递归读取hdfs上多路径文件
比如,读取如下dat...
import org.apache.flink.api.common.io.FilePathFilter;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import java.text.ParseException;
import java.text.SimpleDa
1.setStartFromEarliest不起作用
在IDEA中调试,消费Kafka的数据,然后发现setStartFromEarliest不起作用,Consumer显示默认的offset还是latest。通过Con...
Flink基于模型,内置了很多强大功能的算子,可以帮助我们快速开发应用程序。作为Flink开发老手,大多算子的写法和场景想来已是了然于胸,但是使用过程常常会有一些小小的问题:工欲善其事,必先利其器!快速高效的使用合适的算子开发程序,往往可以达到事半功倍的效果。想着这个道理,特此整理一份常见的Flink算子!!也作为自己的工作笔记。欢迎大家收藏~Flink 让用户灵活且高效编写Flink流式程序。...
Flink 中的 DataStream 程序是对数据流(例如过滤、更新状态、定义窗口、聚合)进行转换的常规程序。数据流的起始是从各种源(例如消息队列、套接字流、文件)创建的。结果通过 sink 返回,例如可以将数据写入文件或标准输出(例如命令行终端)。Flink 程序可以在各种上下文中运行,可以独立运行,也可以嵌入到其它程序中。任务执行可以运行在本地 JVM 中,也可以运行在多台机器的集群上。
DataStream API 得名于特殊的 DataStream 类,该类用于表示 Flink 程序中的数据
在flink中针对读取csv文件的输出可以有3种格式,都是通过引用inputFormat来控制的,分别为 PojoCsvInputFormat输出类型为pojo, RowCsvInputFormat输出类型为Row, TupleCsvInputFormat输出类型为Tuple。本例子就用RowCsvInputFormat。
可以进入到RowCsvInputFormat 看看其构造函数都有哪些
public RowCsvInputFormat(
Path filePath,
TypeInforma
import org.apache.flink.api.common.io.FilePathFilter;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.