佛山建设工程交易中心网站,阿里 wordpress,小程序招商加盟,昆明cms建站模板Spark与Iceberg集成落地实践#xff08;一#xff09; 文章目录 Spark与Iceberg集成落地实践#xff08;一#xff09;清理快照与元数据配置表维度自动清理元数据文件属性手动清理 清理孤岛文件合并数据文件 清理快照与元数据
配置表维度自动清理元数据文件属性
每一次写…Spark与Iceberg集成落地实践一 文章目录 Spark与Iceberg集成落地实践一清理快照与元数据配置表维度自动清理元数据文件属性手动清理 清理孤岛文件合并数据文件 清理快照与元数据
配置表维度自动清理元数据文件属性
每一次写入数据和表变更都会进行一次元数据的版本迭代默认保存所有。
PropertyDescriptionwrite.metadata.delete-after-commit.enabled每次表提交后是否删除旧的跟踪的元数据文件write.metadata.previous-versions-max要保留的旧元数据文件的数量
SPARK DDL语句
建表时确认metadata生命周期 sparkSession.sql(CREATE TABLE local.iceberg_db.table2( id bigint, data string, ts timestamp) USING iceberg PARTITIONED BY (day(ts)) TBLPROPERTIES(write.metadata.delete-after-commit.enabledtrue,write.metadata.previous-versions-max3));
更改表的metadata生命周期 sparkSession.sql(ALTER TABLE local.iceberg_db.table2 SET TBLPROPERTIES( write.metadata.delete-after-commit.enabledtrue, write.metadata.previous-versions-max3 ));作用
这只会删除元数据日志中跟踪的元数据文件而不会删除孤立的元数据文件。
清理从metadata.json链路开始的至data的所有文件如下图 #mermaid-svg-cAHGuo6n6e32TzLQ {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-cAHGuo6n6e32TzLQ .error-icon{fill:#552222;}#mermaid-svg-cAHGuo6n6e32TzLQ .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-cAHGuo6n6e32TzLQ .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-cAHGuo6n6e32TzLQ .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-cAHGuo6n6e32TzLQ .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-cAHGuo6n6e32TzLQ .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-cAHGuo6n6e32TzLQ .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-cAHGuo6n6e32TzLQ .marker{fill:#333333;stroke:#333333;}#mermaid-svg-cAHGuo6n6e32TzLQ .marker.cross{stroke:#333333;}#mermaid-svg-cAHGuo6n6e32TzLQ svg{font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;}#mermaid-svg-cAHGuo6n6e32TzLQ .label{font-family:"trebuchet ms",verdana,arial,sans-serif;color:#333;}#mermaid-svg-cAHGuo6n6e32TzLQ .cluster-label text{fill:#333;}#mermaid-svg-cAHGuo6n6e32TzLQ .cluster-label span{color:#333;}#mermaid-svg-cAHGuo6n6e32TzLQ .label text,#mermaid-svg-cAHGuo6n6e32TzLQ span{fill:#333;color:#333;}#mermaid-svg-cAHGuo6n6e32TzLQ .node rect,#mermaid-svg-cAHGuo6n6e32TzLQ .node circle,#mermaid-svg-cAHGuo6n6e32TzLQ .node ellipse,#mermaid-svg-cAHGuo6n6e32TzLQ .node polygon,#mermaid-svg-cAHGuo6n6e32TzLQ .node path{fill:#ECECFF;stroke:#9370DB;stroke-width:1px;}#mermaid-svg-cAHGuo6n6e32TzLQ .node .label{text-align:center;}#mermaid-svg-cAHGuo6n6e32TzLQ .node.clickable{cursor:pointer;}#mermaid-svg-cAHGuo6n6e32TzLQ .arrowheadPath{fill:#333333;}#mermaid-svg-cAHGuo6n6e32TzLQ .edgePath .path{stroke:#333333;stroke-width:2.0px;}#mermaid-svg-cAHGuo6n6e32TzLQ .flowchart-link{stroke:#333333;fill:none;}#mermaid-svg-cAHGuo6n6e32TzLQ .edgeLabel{background-color:#e8e8e8;text-align:center;}#mermaid-svg-cAHGuo6n6e32TzLQ .edgeLabel rect{opacity:0.5;background-color:#e8e8e8;fill:#e8e8e8;}#mermaid-svg-cAHGuo6n6e32TzLQ .cluster rect{fill:#ffffde;stroke:#aaaa33;stroke-width:1px;}#mermaid-svg-cAHGuo6n6e32TzLQ .cluster text{fill:#333;}#mermaid-svg-cAHGuo6n6e32TzLQ .cluster span{color:#333;}#mermaid-svg-cAHGuo6n6e32TzLQ div.mermaidTooltip{position:absolute;text-align:center;max-width:200px;padding:2px;font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:12px;background:hsl(80, 100%, 96.2745098039%);border:1px solid #aaaa33;border-radius:2px;pointer-events:none;z-index:100;}#mermaid-svg-cAHGuo6n6e32TzLQ :root{--mermaid-font-family:"trebuchet ms",verdana,arial,sans-serif;} 数据层 元数据层 data file1 data file2 data file3 data file4 v2.metadata.json Manifest list1 Manifest file1 Manifest file2 手动清理 org.apache.iceberg.Table table org.apache.iceberg.spark.Spark3Util.loadIcebergTable(spark, local.iceberg_db.table2);long tsToExpire System.currentTimeMillis() - (1000 * 60 * 60 * 24); // 保留一天org.apache.iceberg.spark.actions.SparkActions.get().expireSnapshots(table).expireOlderThan(tsToExpire).execute();清理孤岛文件
孤岛文件的产生
在 Spark 和其他分布式处理引擎中任务或作业失败可能会留下未被表元数据引用的文件在某些情况下正常快照过期可能无法确定文件不再需要并将其删除。任务失败之后最好进行一次清理表孤岛文件若表相关任务成功则不需要进行清理孤岛文件操作。 org.apache.iceberg.Table table org.apache.iceberg.spark.Spark3Util.loadIcebergTable(spark, local.iceberg_db.table2);org.apache.iceberg.spark.actions.SparkActions.get().deleteOrphanFiles(table).execute();合并数据文件
目前发现需要分区类有标记删除的记录才会进行合并why? org.apache.iceberg.Table table org.apache.iceberg.spark.Spark3Util.loadIcebergTable(spark, local.iceberg_db.table2);org.apache.iceberg.spark.actions.SparkActions.get().rewriteDataFiles(table).filter(Expressions.equal(ts, 2024-09-29)).option(target-file-size-bytes, Long.toString(500 * 1024 * 1024)) // 目标大小500 MB.execute();