打开APP
userphoto
未登录

开通VIP,畅享免费电子书等14项超值服

开通VIP
数据湖治理篇-合并Iceberg小文件

  在建设准实时数据仓库时,由于列式存储具有较高的查询性能,因此,通常都采用ORC、Parquet数据格式,但是这种格式不能追加数据。而HDFS的数据块大小一般都是128MB或者256MB,如果等文件凑够一个HDFS Block大小再写入时,就会导致数据延迟增大。因此,难免产生一个非常常见但是很麻烦的问题,即HDFS小文件问题。过多的小文件会增加NameNode的压力,并且影响查询性能,所以我们在使用流式数据入库的之后,一般会对小文件进行合并处理。

  即使是使用Iceberg这种数据湖解决方案,也难免产生小文件。因此,Iceberg本身也想到了解决小文件的方案。

  如何每次就写入几条数据,Iceberg的每个分区在写文件的时候都会产生新的文件,这就导致底层文件系统里面产生了很多大小才几KB的文件。下面是我们在前面的演示环境中写了3次数据:

  -- 创建Iceberg表

  CREATE TABLE sensordata(

  sensor_id STRING,

  ts BIGINT,

  temperature DOUBLE,

  dt STRING

  ) USING iceberg

  PARTITIONED BY(dt);

  -- Append写入1条数据

  INSERT INTO sensordata VALUES('sensor_01',1635743301,-12.1,'2021-12-01');

  -- OverWrite写入一条数据

  INSERT OVERWRITE sensordata VALUES('sensor_02',1635743301,23.6,'2021-12-01');

  -- Append写入1条数据

  INSERT INTO sensordata VALUES('sensor_02',1638421701,-22.2,'2021-12-02');

  虽然每次仅仅写入1条数据,但是却产生了很多小文件。

  [bigdata@bigdata185 iceberg]$ tree

  .

  └── sensordata

  ├── data

  │ ├── dt=2021-12-01

  │ │ ├── 00000-0-275a936f-4d21-4a82-9346-bceac4381e6c-00001.parquet

  │ │ └── 00000-2-1189ac19-b488-4956-8de8-8dd96cd5920a-00001.parquet

  │ └── dt=2021-12-02

  │ └── 00000-1-cc4f552a-28eb-4ff3-a4fa-6c28ce6e5f79-00001.parquet

  └── metadata

  ├── 0dafa6f3-2dbd-4728-ba9b-af31a3416700-m0.avro

  ├── 2b1fbd5a-6241-4f7d-a4a6-3121019b9afb-m0.avro

  ├── 2b1fbd5a-6241-4f7d-a4a6-3121019b9afb-m1.avro

  ├── ad4cd65e-7351-4ad3-baaf-5e5bd99dc257-m0.avro

  ├── snap-232980156660427676-1-0dafa6f3-2dbd-4728-ba9b-af31a3416700.avro

  ├── snap-4599216928086762873-1-ad4cd65e-7351-4ad3-baaf-5e5bd99dc257.avro

  ├── snap-5874199297651146296-1-2b1fbd5a-6241-4f7d-a4a6-3121019b9afb.avro

  ├── v1tadata.json

  ├── v2tadata.json

  ├── v3tadata.json

  ├── v4tadata.json

  └── version-hint.text

  5 directories, 16 files

  [bigdata@bigdata185 iceberg]$ 

  当我们执行这个INSERT语句时,会发生以下过程:

  首先创建一个Parquet格式数据文件 - sensordata/data/dt=2021-12-01/00000-5-cbf2920c-3823-41a1-a612-04679b50a999-00001.parquet创建一个指向这个数据文件的清单文件 - sensordata/metadata/cd1171e3-d178-42d0-8f0d-634804f97a01-m0.avro创建指向该清单文件的清单列表文件 - sensordata/metadata/snap-2251043931717096659-1-cd1171e3-d178-42d0-8f0d-634804f97a01.avro基于当前的元数据文件(v1tadata.json)创建新的元数据文件,并通过新快照s1跟踪先前的快照s0,指向此清单列表文件 - sensordata/metadata/v2tadata.json最后,当前元数据指针的值version-hint.text在目录中自动更新,现在指向这个新的元数据文件(v2tadata.json)。

  当一个事务commit完成之后,会生成metadata.json和Manifest文件。

  优势就是数据以事务原子性的方式写入Iceberg表,但是不足恰好是,每次提交数据都要产生一次快照,这难免就产生的小文件。

  Iceberg使用v[number]tadata.json文件跟踪表元数据。对表的每次更改操作都会生成一个新的元数据文件以提供原子性。

  默认情况下,旧的元数据文件会保留历史记录。频繁提交的表,特别是在流作业写数据时,需要定期清理元数据文件。

  每张表的writetadata.delete-after-commit.enabled默认值为false,如果不设置为true,历史版本元数据就不会被删除。每张表最大的快照保留数writetadata.previous-versions-max,默认为100(可以看到100以内的每次Snapshot)。

  Iceberg每一次操作都会产生多个数据文件(metadata、data、snapshot),需要自行合并清理。

  通过

  org.apache.iceberg.actions.RewriteDataFiles来实现小文件合并时,如果仅仅对Iceberg表的数据进行小文件合并,但是不开启

  writetadata.delete-after-commit.enabled为true,历史不会被删除,开启后就会实现合并后清除历史文件。因此,建议设置合理的快照保存策略,

  writetadata.previous-versions-max(历史文件保留最大值为2,metadata的里面文件数则始终保持为3个)。

  # 启用提交后写入元数据删除

  writetadata.delete-after-commit.enabled=true

  # 配置保留历史数量(比如配置为2,则元数据和数据都保留2份历史数据和1份最新数据)

  writetadata.previous-versions-max=2

  CREATE TABLE sensordata_01(

  sensor_id STRING,

  ts BIGINT,

  temperature DOUBLE,

  dt STRING

  ) USING iceberg

  PARTITIONED BY(dt)

  TBLPROPERTIES ('writetadata.delete-after-commit.enabled'='true',

  'writetadata.previous-versions-max'='2');

  -- 必须在spark sql下执行,FlinkSQL不支持

  ALTER TABLE sensordata_01 SET TBLPROPERTIES ('writetadata.delete-after-commit.enabled'='true');

  ALTER TABLE sensordata_01 SET TBLPROPERTIES ('writetadata.previous-versions-max'='2');

  Iceberg跟踪表中的每个数据文件。更多的数据文件会导致更多的元数据存储在清单文件中,而小数据文件会导致不必要的元数据量和文件打开成本,从而降低查询数据的效率。

  Iceberg可以使用带有操作的Spark并行压缩数据文件rewriteDataFiles。这会将小文件组合成更大的文件,以减少元数据开销和运行时文件打开成本。

  Actions.forTable(table).rewriteDataFiles()

  .targetSizeInBytes(128 * 1024 * 1024) // 128 MB

  .execute()

  Iceberg可以使用带有操作的Flink并行压缩数据文件rewriteDataFiles。这会将小文件组合成更大的文件,以减少元数据开销和运行时文件打开成本。

  Actions.forTable(table)

  .rewriteDataFiles()

  .maxParallelism(1)

  .targetSizeInBytes(128 * 1024 * 1024) // 128MB

  .execute();

  Iceberg在其清单列表和清单文件中使用元数据来加快查询计划并修剪不必要的数据文件。元数据树用作表数据的索引。

  元数据树中的清单会按照它们添加的顺序自动压缩,当写入模式与读取过滤器对齐时,查询会更快。例如,在数据到达时写入每小时分区的数据与时间范围查询过滤器保持一致。

  当表的写入模式与查询模式不一致时,可以重写元数据以使用rewriteManifests操作将数据文件重新分组到清单中, Spark支持并行重写操作。

  table.rewriteManifests()

  .rewriteIf((file) -> file.length() < 32 * 1024 * 1024) // 32MB

  .clusterBy((file) -> file.partition().get(0, String.class))

  mit();

  table.rewriteManifests

  .rewriteIf((file)=> file.length < 32 * 1024 * 1024) // 32 MB

  .clusterBy((file)=> file.partition.get(0, classOf[String]))

  mit

  在每次向Iceberg表写数据时,都会创建一个新的办统招文凭快照,快照可以用于基于时间旅行查询,或者将表回滚到历史上的某一有效快照上。建议定期删除过期快照,以删除不再需要的数据文件,使表元数据的最小且可用。

  目前我们的应用场景只需要查询当前数据就可以了,不需要查询历史数据,所以我只保留了最新的快照。在每次压缩程序之后,做了处理,使当前快照时间以前的快照过期。程序会自动删除以前的过期数据文件。过期旧快照会将它们从元数据中删除,因此它们不再可用于时间旅行查询。而数据文件只有在不能被基于时间旅行查询之后,才会被删除。

  val snapshot=table.currentSnapshot

  if (snapshot !=null) {

  table.expireSnapshots.expireOlderThan(snapshot.timestampMillis)mit

  }

  过期的旧快照会将它们从元数据中删除,因此它们不再可用于时间旅行查询。

  // 6 清除5分钟前的历史快照

  Snapshot snapshot=table.currentSnapshot();

  long oldSnapshot=snapshot.timestampMillis() - TimeUnit.MINUTES.toMillis(5);

  if (snapshot !=null) {

  table.expireSnapshots().expireOlderThan(oldSnapshot)mit();

  }

  数据文件在不再被可用于时间旅行或回滚的快照引用之前不会被删除。定期过期的快照会删除未使用的数据文件。

  (1)使用Spark编写的合并Iceberg表小文件,是快照过期,删除不用的数据文件。

  package com.yunclass.iceberg.streaming

  import java.util

  import org.apache.hadoop.conf.Configuration

  import org.apache.iceberg.Table

  import org.apache.iceberg.actions.Actions

  import org.apache.iceberg.catalog.{Namespace, TableIdentifier}

  import org.apache.iceberg.expressions.Expressions

  import org.apache.iceberg.hadoop.HadoopCatalog

  import org.apache.iceberg.spark.SparkCatalog

  import org.apache.spark.SparkConf

  import org.apache.spark.sql.SparkSession

  object CombineTableFiles {

  def main(args: Array[String]): Unit={

  // 1 设置执行账号

  System.setProperty("HADOOP_USER_NAME", "bigdata")

  // 2 配置SparkSession

  val sparkConf=new SparkConf()

  .set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")

  .set(s"spark.sql.catalog.hadoop_catalog",classOf[SparkCatalog].getName)

  .set(s"spark.sql.catalog.hadoop_catalog.type","hadoop")

  .set(s"spark.sql.catalog.hadoop_catalog.warehouse","hdfs://bigdata185:9000/dw/iceberg")

  .setMaster("local[*]").setAppName("CombineTableFiles")

  sparkConf.set("spark.executorcessTreeMetrics.enabled","true")

  val sparkSession=SparkSession.builder().config(sparkConf).getOrCreate()

  // 3 获取TableLoader

  val conf=new util.HashMap[String, String]

  conf.put("type", "iceberg")

  conf.put("catalog-type", "hadoop")

  conf.put("warehouse", "hdfs://bigdata185:9000/dw/iceberg")

  val hadoopCatalog=new HadoopCatalog(new Configuration())

  hadoopCatalog.initialize("hadoop_catalog", conf)

  val identifier=TableIdentifier.of(Namespace.of("db"), "sensordata_01")

  val table=hadoopCatalog.loadTable(identifier)

  // 调用合并小文件方法

  // combineFiles(sparkSession, table)

  deleteSnapshot(table)

  }

  // 合并小文件

  def combineFiles(sparkSession: SparkSession, table: Table): Unit={

  Actions.forTable(sparkSession, table).rewriteDataFiles()

  .filter(Expressions.equal("dt", "2022-01-20"))

  .targetSizeInBytes(128 * 1024 * 1024)

  .execute()

  // 重新manifest文件

  table.rewriteManifests()

  .rewriteIf((file)=> file.length() < 28 * 1024 * 1024)

  .clusterBy((file)=> file.partition().get(0, classOf[String]))

  mit()

  }

  // 删除快照信息

  def deleteSnapshot(table: Table): Unit={

  val snapshot=table.currentSnapshot()

  val oldSnapshot=snapshot.timestampMillis()

  if (snapshot !=null) {

  table.expireSnapshots().expireOlderThan(oldSnapshot)mit()

  }

  }

  }

  (2)合并小文件之前的数据状态

  

  (3)合并之后的状态,将小于128MB的文件,合并成大数据文件。

  

  (1)使用Flink编写的合并Iceberg表小文件,是快照过期,删除不用的数据文件。

  package com.yunclass.iceberg.streaming;

  import com.sun.javafx.fxml.expression.Expression;

  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

  import org.apache.flink.table.api.Expressions;

  import org.apache.hadoop.conf.Configuration;

  import org.apache.iceberg.Snapshot;

  import org.apache.iceberg.Table;

  import org.apache.iceberg.catalog.Catalog;

  import org.apache.iceberg.catalog.Namespace;

  import org.apache.iceberg.catalog.TableIdentifier;

  import org.apache.iceberg.flink.CatalogLoader;

  import org.apache.iceberg.flink.actions.Actions;

  import java.util.HashMap;

  import java.util.Map;

  public class CombineTableFileDemo {

  public static void main(String[] args) {

  // 1 设置执行用户

  System.setProperty("HADOOP_USER_NAME", "bigdata");

  // 2 获取Flink执行环境

  StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();

  env.setParallelism(1);

  // 3 使用Hadoop Catalog模式加载Iceberg表

  Map icebergMap=new HashMap<>();

  icebergMap.put("type", "iceberg");

  icebergMap.put("catalog-type", "hadoop");

  icebergMap.put("property-version", "1");

  icebergMap.put("warehouse", "hdfs://bigdata185:9000/dw/iceberg");

  // 获取catalogLoader

  CatalogLoader hadoopCatalog=CatalogLoader.hadoop("hadoop_catalog", new Configuration(), icebergMap);

  Catalog catalog=hadoopCatalog.loadCatalog();

  TableIdentifier tableIdentifier=TableIdentifier.of(Namespace.of("db"), "sensordata_01");

  Table table=catalog.loadTable(tableIdentifier);

  // 调用方法

  // combineFiles(env, table);

  deleteOldSnapshot(table);

  }

  // 合并小文件

  private static void combineFiles(StreamExecutionEnvironment env, Table table) {

  Actions.forTable(env, table)

  .rewriteDataFiles()

  .maxParallelism(1)

  .targetSizeInBytes(128 * 1024 * 1024)

  .execute();

  // 重写Manifest文件

  table.rewriteManifests()

  .rewriteIf((file) -> file.length() < 32 * 1024 * 1024)

  .clusterBy((file) -> file.partition().get(0, String.class))

  mit();

  }

  // 删除过期快照

  private static void deleteOldSnapshot(Table table) {

  Snapshot snapshot=table.currentSnapshot();

  long oldSnapshot=snapshot.timestampMillis();

  if (snapshot !=null) {

  table.expireSnapshots().expireOlderThan(oldSnapshot)mit();

  }

  }

  }

  (2)合并小文件之前的数据状态

  

  (3)合并之后的状态,将小于32MB的文件,合并成大Manifest文件。

  

本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请点击举报
打开APP,阅读全文并永久保存 查看更多类似文章
猜你喜欢
类似文章
【热】打开小程序,算一算2024你的财运
为什么我选择Apache Iceberg
使用 Iceberg on Kubernetes 打造新一代云原生数据湖
Apache Iceberg 表有哪些性能优化方式
Hadoop框架介绍
深度对比Delta、Iceberg和Hudi三大开源数据湖方案
数据系统架构的演变
更多类似文章 >>
生活服务
热点新闻
分享 收藏 导长图 关注 下载文章
绑定账号成功
后续可登录账号畅享VIP特权!
如果VIP功能使用有故障,
可点击这里联系客服!

联系客服