[离线计算-Spark|Hive] HDFS小文件处理
背景
HDFS 小文件过多会对hadoop 扩展性以及稳定性造成影响, 因为要在namenode 上存储维护大量元信息.
大量的小文件也会导致很差的查询分析性能,因为查询引擎执行查询时需要进行太多次文件的打开/读取/关闭.
小文件解决思路
通常能想到的方案就是通过Spark API 对文件目录下的小文件进行读取,然后通过Spark的算子repartition操作进行合并小文件,repartition 分区数通过输入文件的总大小和期望输出文件的大小通过预计算而得。
总体流程如下:
该方案适合针对已发现有小文件问题,然后对其进行处理. 下面介绍下hudi是如何实现在写入时实现对小文件的智能处理.
Hudi小文件处理
Hudi会自管理文件大小,避免向查询引擎暴露小文件,其中自动处理文件大小起很大作用
在进行insert/upsert操作时,Hudi可以将文件大小维护在一个指定文件大小
hudi 小文件处理流程:
每次写入都会遵循此过程,以确保Hudi表中没有小文件。
核心代码:
写入文件分配:
org.apache.hudi.table.action.commit.UpsertPartitioner#assignInserts
//获取分区路径
Set<String> partitionPaths = profile.getPartitionPaths();
//根据先前提交期间写入的记录获取平均记录大小。用于估计有多少记录打包到一个文件中。
long averageRecordSize = averageBytesPerRecord(table.getMetaClient().getActiveTimeline().getCommitTimeline().filterCompletedInstants(),config);
LOG.info("AvgRecordSize => " + averageRecordSize);
//获取每个分区文件路径下小文件
Map<String, List<SmallFile>> partitionSmallFilesMap =
getSmallFilesForPartitions(new ArrayList<String>(partitionPaths), jsc);
for (String partitionPath : partitionPaths) {
List<SmallFile> smallFiles = partitionSmallFilesMap.get(partitionPath);
//未分配的写入记录
long totalUnassignedInserts = pStat.getNumInserts();
for (SmallFile smallFile : smallFiles) {
//hoodie.parquet.max.file.size 数据文件最大大小,Hudi将试着维护文件大小到该指定值
//算出数据文件大小 - 小文件 就是剩余可以写入文件大小, 除以平均记录大小就是插入的记录行数
long recordsToAppend = Math.min((config.getParquetMaxFileSize() - smallFile.sizeBytes) / averageRecordSize, totalUnassignedInserts);
//分配记录到小文件中
if (recordsToAppend > 0 && totalUnassignedInserts > 0) {
// create a new bucket or re-use an existing bucket
int bucket;
if (updateLocationToBucket.containsKey(smallFile.location.getFileId())) {
bucket = updateLocationToBucket.get(smallFile.location.getFileId());
LOG.info("Assigning " + recordsToAppend + " inserts to existing update bucket " + bucket);
} else {
bucket = addUpdateBucket(partitionPath, smallFile.location.getFileId());
LOG.info("Assigning " + recordsToAppend + " inserts to new update bucket " + bucket);
bucketNumbers.add(bucket);
recordsPerBucket.add(recordsToAppend);
//减去已经分配的记录数
totalUnassignedInserts -= recordsToAppend;
//如果记录没有分配完
if (totalUnassignedInserts > 0) {
//hoodie.copyonwrite.insert.split.size 每个分区条数
long insertRecordsPerBucket = config.getCopyOnWriteInsertSplitSize();
//是否自动计算每个分区条数
if (config.shouldAutoTuneInsertSplits()) {
insertRecordsPerBucket = config.getParquetMaxFileSize() / averageRecordSize;
//计算要创建的bucket
int insertBuckets = (int) Math.ceil((1.0 * totalUnassignedInserts) / insertRecordsPerBucket);
for (int b = 0; b < insertBuckets; b++) {
bucketNumbers.add(totalBuckets);
if (b == insertBuckets - 1) {
//针对最后一个buket处理,就是写完剩下的记录
recordsPerBucket.add(totalUnassignedInserts - (insertBuckets - 1) * insertRecordsPerBucket);
} else {
recordsPerBucket.add(insertRecordsPerBucket);
BucketInfo bucketInfo = new BucketInfo();
bucketInfo.bucketType = BucketType.INSERT;
bucketInfo.partitionPath = partitionPath;
bucketInfo.fileIdPrefix = FSUtils.createNewFileIdPfx();
bucketInfoMap.put(totalBuckets, bucketInfo);
totalBuckets++;
}
获取每个分区路径下小文件: org.apache.hudi.table.action.commit.UpsertPartitioner#getSmallFiles
if (!commitTimeline.empty()) { // if we have some commits
HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
List<HoodieBaseFile> allFiles = table.getBaseFileOnlyView()
.getLatestBaseFilesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp()).collect(Collectors.toList());
for (HoodieBaseFile file : allFiles) {
//获取小于 hoodie.parquet.small.file.limit 参数值就为小文件
if (file.getFileSize() < config.getParquetSmallFileLimit()) {
String filename = file.getFileName();
SmallFile sf = new SmallFile();
sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename));
sf.sizeBytes = file.getFileSize();