当前位置: 首页 > news >正文

辽阳哪里做网站网页与网站设计说明

辽阳哪里做网站,网页与网站设计说明,企业建设电商网站,seo公司怎么推广宣传背景 Spark 3.5.0 目前 Spark中的实现中#xff0c;对于多分区的写入默认会先排序#xff0c;这是没必要的。可以设置spark.sql.maxConcurrentOutputFileWriters 为大于0来避免排序。 分析 这部分主要分为三个部分: 一个是V1Writes规则的重改; 另一个是FileFormatWriter中…背景 Spark 3.5.0 目前 Spark中的实现中对于多分区的写入默认会先排序这是没必要的。可以设置spark.sql.maxConcurrentOutputFileWriters 为大于0来避免排序。 分析 这部分主要分为三个部分: 一个是V1Writes规则的重改; 另一个是FileFormatWriter中的dataWriter的选择; 还有一个是Spark中为什么会加上Sort 这三部分是需要结合在一起分析讨论的 V1Writes规则的重改 直接转到代码部分: object V1Writes extends Rule[LogicalPlan] with SQLConfHelper {import V1WritesUtils._override def apply(plan: LogicalPlan): LogicalPlan {if (conf.plannedWriteEnabled) {plan.transformUp {case write: V1WriteCommand if !write.child.isInstanceOf[WriteFiles] val newQuery prepareQuery(write, write.query)val attrMap AttributeMap(write.query.output.zip(newQuery.output))val writeFiles WriteFiles(newQuery, write.fileFormat, write.partitionColumns,write.bucketSpec, write.options, write.staticPartitions)val newChild writeFiles.transformExpressions {case a: Attribute if attrMap.contains(a) a.withExprId(attrMap(a).exprId)}val newWrite write.withNewChildren(newChild :: Nil).transformExpressions {case a: Attribute if attrMap.contains(a) a.withExprId(attrMap(a).exprId)}newWrite}} else {plan}}其中 prepareQuery是对满足条件的计划前加上Sort逻辑排序其中prepareQuery关键的代码如下: val requiredOrdering write.requiredOrdering.map(_.transform {case a: Attribute attrMap.getOrElse(a, a)}.asInstanceOf[SortOrder])val outputOrdering empty2NullPlan.outputOrderingval orderingMatched isOrderingMatched(requiredOrdering.map(_.child), outputOrdering)if (orderingMatched) {empty2NullPlan} else {Sort(requiredOrdering, global false, empty2NullPlan)}write.requiredOrdering中涉及到的类为InsertIntoHadoopFsRelationCommand和InsertIntoHiveTable,且这两个物理计划中的requiredOrdering实现都是 V1WritesUtils.getSortOrder(outputColumns, partitionColumns, bucketSpec, options)getSortOrder方法关键代码如下 val sortColumns V1WritesUtils.getBucketSortColumns(bucketSpec, dataColumns)if (SQLConf.get.maxConcurrentOutputFileWriters 0 sortColumns.isEmpty) {// Do not insert logical sort when concurrent output writers are enabled.Seq.empty} else {// We should first sort by dynamic partition columns, then bucket id, and finally sorting// columns.(dynamicPartitionColumns writerBucketSpec.map(_.bucketIdExpression) sortColumns).map(SortOrder(_, Ascending))}所以说 如果 spark.sql.maxConcurrentOutputFileWriters为0默认值为0则会加上Sort逻辑计划具体的实现可以参考SPARK-37287 如果spark.sql.maxConcurrentOutputFileWriters为0默认值为0且 sortColumns为空(大部分情况下为空除非建表是partition加上bucket),则不会加上Sort逻辑计划 FileFormatWriter 中的dataWriter的选择 在 InsertIntoHadoopFsRelationCommand和InsertIntoHiveTable 这两个物理计划中最终写入文件/数据的时候会调用到FileFormatWriter.write方法这里有个concurrentOutputWriterSpecFunc函数变量的设置 val concurrentOutputWriterSpecFunc (plan: SparkPlan) {val sortPlan createSortPlan(plan, requiredOrdering, outputSpec)createConcurrentOutputWriterSpec(sparkSession, sortPlan, sortColumns)}val writeSpec WriteFilesSpec(description description,committer committer,concurrentOutputWriterSpecFunc concurrentOutputWriterSpecFunc)executeWrite(sparkSession, plan, writeSpec, job)设置concurrentOutputWriterSpecFunc的代码如下 private def createConcurrentOutputWriterSpec(sparkSession: SparkSession,sortPlan: SortExec,sortColumns: Seq[Attribute]): Option[ConcurrentOutputWriterSpec] {val maxWriters sparkSession.sessionState.conf.maxConcurrentOutputFileWritersval concurrentWritersEnabled maxWriters 0 sortColumns.isEmptyif (concurrentWritersEnabled) {Some(ConcurrentOutputWriterSpec(maxWriters, () sortPlan.createSorter()))} else {None}}如果 spark.sql.maxConcurrentOutputFileWriters为0默认值为0,则ConcurrentOutputWriterSpec为None 如果 spark.sql.maxConcurrentOutputFileWriters大于0且 sortColumns为空(大部分情况下为空除非建表是partition加上bucket)则为Some(ConcurrentOutputWriterSpec(maxWriters, () sortPlan.createSorter()) 其中executeWrite会调用WriteFilesExec.doExecuteWrite方法,从而调用FileFormatWriter.executeTask,这里就涉及到dataWriter选择 val dataWriter if (sparkPartitionId ! 0 !iterator.hasNext) {// In case of empty job, leave first partition to save meta for file format like parquet.new EmptyDirectoryDataWriter(description, taskAttemptContext, committer)} else if (description.partitionColumns.isEmpty description.bucketSpec.isEmpty) {new SingleDirectoryDataWriter(description, taskAttemptContext, committer)} else {concurrentOutputWriterSpec match {case Some(spec) new DynamicPartitionDataConcurrentWriter(description, taskAttemptContext, committer, spec)case _ new DynamicPartitionDataSingleWriter(description, taskAttemptContext, committer)}}这里其实会根据 concurrentOutputWriterSpec来选择不同的dataWriter,默认情况下为DynamicPartitionDataSingleWriter 否则就会为DynamicPartitionDataConcurrentWriter 这两者的区别见下文 Spark中为什么会加上Sort 至于Spark在写入文件的时候会加上Sort这个是跟写入的实现有关的也就是DynamicPartitionDataSingleWriter和DynamicPartitionDataConcurrentWriter的区别: DynamicPartitionDataSingleWriter 在任何时刻只有一个writer在写文件这能保证写入的稳定性不会在写入文件的时候消耗大量的内存但是速度会慢DynamicPartitionDataConcurrentWriter 会有多个 writer 同时写文件能加快写入文件的速度但是因为多个文件的同时写入可能会导致OOM 对于DynamicPartitionDataSingleWriter 会根据partition或者bucket作为最细粒度来作为writer的标准如果相邻的两条记录所属不同的partition或者bucket则会切换writer所以说如果不根据partition或者bucket排序的话会导致writer频繁的切换这会大大降低文件的写入速度。所以说需要根据partition或者bucket进行排序。 参考 [SPARK-37287][SQL] Pull out dynamic partition and bucket sort from FileFormatWriter[SQL] Allow FileFormatWriter to write multiple partitions/buckets without sort
http://www.ho-use.cn/article/10823545.html

相关文章:

  • 优秀网站设计推荐怎么把wordpress后台设置成中文
  • wordpress php最大输出变量长沙企业网站排名优化
  • 高端品牌建站wordpress预约插件
  • 中国建设银行春季招聘网站中国建设网官方网站平台
  • 做音乐网站的条件网站开发经理岗位职责
  • 郴州网站建设服务网站开发认证考试
  • 浙江省城乡建设厅网站首页wordpress用户登录显示请求失败
  • 单页网站定义做网站工程案例图片
  • 自己的电脑建网站怎么查网站注册时间
  • 网站建设没付尾款企业建站公司推荐
  • 网站管理工具微信网站怎么收款
  • 汕头企业网站模板建站住房与建设部网站首页
  • 手机网站 jquery 特效做网站需要懂代码么
  • 怎样注册网站帐号申请做网站公司天津
  • 公司网站建设30元江苏省招标投标信息网
  • 网站如何导入织梦cms公司网站asp后台维护
  • 网站结构有哪几种修改wordpress主题
  • 大连网站建设方法烟台哪儿有可以做淘宝网站的
  • 光伏电站建设的国家网站wordpress 代码行号
  • 南京网站建设培训温州网站制作要多少钱
  • 新浪云建设自己的网站360免费建站官网入口
  • 网站域名自动跳转开发商是不是建设单位
  • 重庆荣昌网站建设公司影视会员网站怎么建设
  • 做服装网站服务电商网站设计流程
  • 邢台网站制作怎么样深圳通公司网站
  • 高平网站优化公司wordpress标签拼音
  • 电商网站怎样优化网站开发 项目介绍
  • 哈尔滨专业官网建站企业中国半导体设备
  • pathon能做网站开发吗学it需要什么学历基础
  • 免费视频网站怎么赚钱wordpress移动版主题