Background
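This article is based on Spark 3.5.0. In the current Spark implementation, a write that targets multiple partitions sorts the data first by default, which is not strictly necessary. Setting spark.sql.maxConcurrentOutputFileWriters to a value greater than 0 avoids the sort.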
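As a minimal sketch of how the setting might be applied, the snippet below assumes a local SparkSession with spark-sql on the classpath. The application name, the sample data, the value 4 and the output path /tmp/concurrent_writers_demo are all hypothetical and chosen only for illustration.

```scala
import org.apache.spark.sql.SparkSession

object ConcurrentWritersConfig {
  def main(args: Array[String]): Unit = {
    // Assumption: a local session is enough for a demo; a real job would use its own master.
    val spark = SparkSession.builder()
      .appName("concurrent-writers-demo")
      .master("local[2]")
      .getOrCreate()

    // Any value greater than 0 enables concurrent output writers (the default is 0).
    spark.conf.set("spark.sql.maxConcurrentOutputFileWriters", "4")

    import spark.implicits._
    val df = Seq((1, "a"), (2, "b"), (3, "a")).toDF("id", "part")

    // Dynamic-partition write; the output path is a hypothetical location.
    df.write.mode("overwrite").partitionBy("part").parquet("/tmp/concurrent_writers_demo")

    spark.stop()
  }
}
```

The value is also an upper bound on how many files a task keeps open at once, so it is probably best kept small relative to executor memory.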
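Analysis
The analysis has three parts: the rework of the V1Writes rule, the choice of dataWriter in FileFormatWriter, and the reason Spark adds a Sort in the first place. The three parts need to be read together.
The rework of the V1Writes rule
Let's go straight to the code: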
    object V1Writes extends Rule[LogicalPlan] with SQLConfHelper {

      import V1WritesUtils._

      override def apply(plan: LogicalPlan): LogicalPlan = {
        if (conf.plannedWriteEnabled) {
          plan.transformUp {
            case write: V1WriteCommand if !write.child.isInstanceOf[WriteFiles] =>
              val newQuery = prepareQuery(write, write.query)
              val attrMap = AttributeMap(write.query.output.zip(newQuery.output))
              val writeFiles = WriteFiles(newQuery, write.fileFormat, write.partitionColumns,
                write.bucketSpec, write.options, write.staticPartitions)
              val newChild = writeFiles.transformExpressions {
                case a: Attribute if attrMap.contains(a) =>
                  a.withExprId(attrMap(a).exprId)
              }
              val newWrite = write.withNewChildren(newChild :: Nil).transformExpressions {
                case a: Attribute if attrMap.contains(a) =>
                  a.withExprId(attrMap(a).exprId)
              }
              newWrite
          }
        } else {
          plan
        }
      }
      // ... (remaining members omitted)
    }

Here prepareQuery puts a logical Sort node in front of any plan that meets the condition. Its key code is:

    val requiredOrdering = write.requiredOrdering.map(_.transform {
      case a: Attribute => attrMap.getOrElse(a, a)
    }.asInstanceOf[SortOrder])
    val outputOrdering = empty2NullPlan.outputOrdering
    val orderingMatched = isOrderingMatched(requiredOrdering.map(_.child), outputOrdering)
    if (orderingMatched) {
      empty2NullPlan
    } else {
      Sort(requiredOrdering, global = false, empty2NullPlan)
    }

write.requiredOrdering is provided by two classes, InsertIntoHadoopFsRelationCommand and InsertIntoHiveTable, and both commands implement it as:

    V1WritesUtils.getSortOrder(outputColumns, partitionColumns, bucketSpec, options)

The key code of getSortOrder is:

    val sortColumns = V1WritesUtils.getBucketSortColumns(bucketSpec, dataColumns)
    if (SQLConf.get.maxConcurrentOutputFileWriters > 0 && sortColumns.isEmpty) {
      // Do not insert logical sort when concurrent output writers are enabled.
      Seq.empty
    } else {
      // We should first sort by dynamic partition columns, then bucket id, and finally sorting
      // columns.
      (dynamicPartitionColumns ++ writerBucketSpec.map(_.bucketIdExpression) ++ sortColumns)
        .map(SortOrder(_, Ascending))
    }

So:
If spark.sql.maxConcurrentOutputFileWriters is 0 (the default), a logical Sort is added whenever the query's output ordering does not already satisfy the required ordering. See SPARK-37287 for the implementation.
If spark.sql.maxConcurrentOutputFileWriters is greater than 0 and sortColumns is empty (it is empty in most cases, and non-empty only when the table is bucketed with sort columns), no logical Sort is added.
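The decision made by getSortOrder can be modelled without Spark. The sketch below is a simplified stand-in, not Spark's code: WriteLayout, requiredSortKeys and the column names are hypothetical, and columns are plain strings rather than Catalyst expressions.

```scala
object SortOrderSketch {
  // Hypothetical stand-in for the inputs of getSortOrder; columns are plain names here.
  final case class WriteLayout(
      dynamicPartitionColumns: Seq[String],
      bucketIdExpression: Option[String],
      bucketSortColumns: Seq[String])

  // Returns the keys a logical Sort would be built on; an empty result means no Sort is requested.
  def requiredSortKeys(maxConcurrentWriters: Int, layout: WriteLayout): Seq[String] =
    if (maxConcurrentWriters > 0 && layout.bucketSortColumns.isEmpty) {
      // Concurrent writers are enabled and nothing forces an in-file order: skip the sort.
      Seq.empty
    } else {
      // Dynamic partition columns first, then the bucket id, then the bucket sort columns.
      layout.dynamicPartitionColumns ++ layout.bucketIdExpression.toSeq ++ layout.bucketSortColumns
    }

  def main(args: Array[String]): Unit = {
    val partitionedOnly = WriteLayout(Seq("dt"), None, Seq.empty)
    val bucketedSorted = WriteLayout(Seq("dt"), Some("bucket_id(user_id)"), Seq("ts"))

    println(requiredSortKeys(0, partitionedOnly)) // default setting: sort by the partition column
    println(requiredSortKeys(4, partitionedOnly)) // concurrent writers: no sort keys
    println(requiredSortKeys(4, bucketedSorted))  // bucket sort columns still force a sort
  }
}
```

The third case shows why raising the setting alone is not always enough: a bucketed table with sort columns still requires ordered output within each file, so the sort stays.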
The choice of dataWriter in FileFormatWriter
When InsertIntoHadoopFsRelationCommand and InsertIntoHiveTable finally write the files, both commands call FileFormatWriter.write, which sets up a function value named concurrentOutputWriterSpecFunc:

    val concurrentOutputWriterSpecFunc = (plan: SparkPlan) => {
      val sortPlan = createSortPlan(plan, requiredOrdering, outputSpec)
      createConcurrentOutputWriterSpec(sparkSession, sortPlan, sortColumns)
    }
    val writeSpec = WriteFilesSpec(
      description = description,
      committer = committer,
      concurrentOutputWriterSpecFunc = concurrentOutputWriterSpecFunc
    )
    executeWrite(sparkSession, plan, writeSpec, job)

The spec itself is built by createConcurrentOutputWriterSpec:

    private def createConcurrentOutputWriterSpec(
        sparkSession: SparkSession,
        sortPlan: SortExec,
        sortColumns: Seq[Attribute]): Option[ConcurrentOutputWriterSpec] = {
      val maxWriters = sparkSession.sessionState.conf.maxConcurrentOutputFileWriters
      val concurrentWritersEnabled = maxWriters > 0 && sortColumns.isEmpty
      if (concurrentWritersEnabled) {
        Some(ConcurrentOutputWriterSpec(maxWriters, () => sortPlan.createSorter()))
      } else {
        None
      }
    }

If spark.sql.maxConcurrentOutputFileWriters is 0 (the default), the ConcurrentOutputWriterSpec is None.
If spark.sql.maxConcurrentOutputFileWriters is greater than 0 and sortColumns is empty (it is empty in most cases, and non-empty only when the table is bucketed with sort columns), the result is Some(ConcurrentOutputWriterSpec(maxWriters, () => sortPlan.createSorter())).
executeWrite calls WriteFilesExec.doExecuteWrite, which in turn calls FileFormatWriter.executeTask. This is where the dataWriter is chosen:

    val dataWriter =
      if (sparkPartitionId != 0 && !iterator.hasNext) {
        // In case of empty job, leave first partition to save meta for file format like parquet.
        new EmptyDirectoryDataWriter(description, taskAttemptContext, committer)
      } else if (description.partitionColumns.isEmpty && description.bucketSpec.isEmpty) {
        new SingleDirectoryDataWriter(description, taskAttemptContext, committer)
      } else {
        concurrentOutputWriterSpec match {
          case Some(spec) =>
            new DynamicPartitionDataConcurrentWriter(
              description, taskAttemptContext, committer, spec)
          case _ =>
            new DynamicPartitionDataSingleWriter(description, taskAttemptContext, committer)
        }
      }

The dataWriter therefore depends on concurrentOutputWriterSpec: by default it is a DynamicPartitionDataSingleWriter, otherwise it is a DynamicPartitionDataConcurrentWriter. The difference between the two is covered in the next section.
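The two decisions in this section, building the spec and picking the writer, can be condensed into a small Spark-free model. This is only a sketch under stated assumptions: concurrentSpec and chooseWriter are hypothetical helpers, the spec is reduced to the writer limit, and writers are represented by their class names.

```scala
object WriterSelectionSketch {
  // Models createConcurrentOutputWriterSpec: Some(limit) only when concurrent writers are enabled.
  def concurrentSpec(maxWriters: Int, bucketSortColumnsEmpty: Boolean): Option[Int] =
    if (maxWriters > 0 && bucketSortColumnsEmpty) Some(maxWriters) else None

  // Models the dataWriter choice in executeTask; returns the class name of the chosen writer.
  def chooseWriter(
      sparkPartitionId: Int,
      hasRows: Boolean,
      hasPartitionColumns: Boolean,
      hasBucketSpec: Boolean,
      spec: Option[Int]): String =
    if (sparkPartitionId != 0 && !hasRows) {
      "EmptyDirectoryDataWriter"
    } else if (!hasPartitionColumns && !hasBucketSpec) {
      "SingleDirectoryDataWriter"
    } else {
      spec match {
        case Some(_) => "DynamicPartitionDataConcurrentWriter"
        case None => "DynamicPartitionDataSingleWriter"
      }
    }

  def main(args: Array[String]): Unit = {
    // Partitioned write with the default setting (0).
    println(chooseWriter(0, hasRows = true, hasPartitionColumns = true, hasBucketSpec = false,
      spec = concurrentSpec(0, bucketSortColumnsEmpty = true)))
    // Partitioned write with the setting raised to 4.
    println(chooseWriter(0, hasRows = true, hasPartitionColumns = true, hasBucketSpec = false,
      spec = concurrentSpec(4, bucketSortColumnsEmpty = true)))
  }
}
```

Note that the setting has no effect on a write with neither partition columns nor a bucket spec, since that case is handled by SingleDirectoryDataWriter before the spec is consulted.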
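Why does Spark add a Sort
Spark adds a Sort before writing files because of how the write itself is implemented, that is, because of the difference between DynamicPartitionDataSingleWriter and DynamicPartitionDataConcurrentWriter: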
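DynamicPartitionDataSingleWriter has only one writer writing a file at any moment. This keeps the write stable and avoids heavy memory use during the write, but it is slower.
DynamicPartitionDataConcurrentWriter has several writers writing files at the same time. This speeds up the write, but writing many files at once may cause an OOM.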
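DynamicPartitionDataSingleWriter uses the partition or bucket as the finest granularity for a writer: if two adjacent records belong to different partitions or buckets, it switches writers. Without sorting by partition or bucket, the writer would switch frequently, which slows the write down considerably. That is why the data needs to be sorted by partition or bucket.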
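The cost of unsorted input for a single-writer strategy can be illustrated with a toy count of writer switches. This is a sketch, not Spark's implementation: writerOpens and the partition keys are hypothetical, and it assumes that every change of partition key between adjacent records makes the writer start a new file.

```scala
object WriterSwitchSketch {
  // Counts how often a single-writer strategy has to open a writer: once for the first
  // record, then again every time the partition key differs from the previous record's key.
  def writerOpens(partitionKeys: Seq[String]): Int =
    partitionKeys.foldLeft((Option.empty[String], 0)) {
      case ((current, opens), key) =>
        if (current.contains(key)) (current, opens) // same partition: keep the writer
        else (Some(key), opens + 1)                 // different partition: switch writers
    }._2

  def main(args: Array[String]): Unit = {
    val unsorted = Seq("a", "b", "a", "c", "b", "a")
    println(writerOpens(unsorted))        // prints 6
    println(writerOpens(unsorted.sorted)) // prints 3
  }
}
```

With sorted input the number of opens equals the number of distinct partitions, which is the lower bound; with unsorted input it can grow up to the number of records.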
References
[SPARK-37287][SQL] Pull out dynamic partition and bucket sort from FileFormatWriter
[SQL] Allow FileFormatWriter to write multiple partitions/buckets without sort