1. RDD Partitioner Overview
The parent class of all Spark partitioners is the abstract class Partitioner.

- The partitioner directly determines the number of partitions of an RDD and which partition each record enters after a Shuffle, and therefore determines the number of Reduce tasks.
- Only Key-Value RDDs have a partitioner; for a non-Key-Value RDD the partitioner value is None.
- The partition index of every RDD ranges from 0 to (numPartitions - 1).
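To see these rules in action, here is a minimal sketch (the object name, app name, and sample data are made up for illustration) that inspects the partitioner field of a plain RDD and of a Key-Value RDD before and after partitionBy:

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object PartitionerInspection {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("inspect"))

    // A plain (non-Key-Value) RDD never carries a partitioner.
    val nums = sc.makeRDD(1 to 10)
    println(nums.partitioner)               // None

    // A Key-Value RDD only gets a partitioner once one is installed.
    val pairs = sc.makeRDD(Seq(("a", 1), ("b", 2)))
    println(pairs.partitioner)              // None (no shuffle yet)
    val repartitioned = pairs.partitionBy(new HashPartitioner(4))
    println(repartitioned.partitioner)      // Some(HashPartitioner)
    println(repartitioned.getNumPartitions) // 4

    sc.stop()
  }
}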
2. HashPartitioner

The default partitioner. For a given key, it computes the key's hashCode and takes the remainder after dividing by the number of partitions to obtain the partition index for the record:

class HashPartitioner(partitions: Int) extends Partitioner {
  require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")

  def numPartitions: Int = partitions

  def getPartition(key: Any): Int = key match {
    case null => 0
    case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
  }

  override def equals(other: Any): Boolean = other match {
    case h: HashPartitioner => h.numPartitions == numPartitions
    case _ => false
  }

  override def hashCode: Int = numPartitions
}
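The Utils.nonNegativeMod helper is Spark-internal, so here is a small standalone sketch that re-implements its behavior (the object name and test keys are made up; the assumption is only that the helper corrects the sign of the % operator, which can return a negative value for a negative hashCode):

object HashPartitionDemo {
  // Re-implementation of the nonNegativeMod idea: plain % can be negative
  // for a negative hashCode, so shift the result back into [0, mod).
  def nonNegativeMod(x: Int, mod: Int): Int = {
    val rawMod = x % mod
    rawMod + (if (rawMod < 0) mod else 0)
  }

  def main(args: Array[String]): Unit = {
    val numPartitions = 3
    for (key <- Seq("nba", "cba", "ncaa", "cuba")) {
      println(s"$key -> partition ${nonNegativeMod(key.hashCode, numPartitions)}")
    }
  }
}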
3. RangePartitioner

Maps data whose keys fall within a certain range to the same partition, trying to keep the data in each partition evenly balanced while keeping the partitions ordered relative to each other:

class RangePartitioner[K : Ordering : ClassTag, V](
    partitions: Int,
    rdd: RDD[_ <: Product2[K, V]],
    private var ascending: Boolean = true)
  extends Partitioner {

  // We allow partitions = 0, which happens when sorting an empty RDD under the default settings.
  require(partitions >= 0, s"Number of partitions cannot be negative but found $partitions.")

  private var ordering = implicitly[Ordering[K]]

  // An array of upper bounds for the first (partitions - 1) partitions
  private var rangeBounds: Array[K] = {...}

  def numPartitions: Int = rangeBounds.length + 1

  private var binarySearch: ((Array[K], K) => Int) = CollectionsUtils.makeBinarySearch[K]

  def getPartition(key: Any): Int = {
    val k = key.asInstanceOf[K]
    var partition = 0
    if (rangeBounds.length <= 128) {
      // If we have less than 128 partitions naive search
      while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) {
        partition += 1
      }
    } else {
      // Determine which binary search method to use only once.
      partition = binarySearch(rangeBounds, k)
      // binarySearch either returns the match location or -[insertion point]-1
      if (partition < 0) {
        partition = -partition - 1
      }
      if (partition > rangeBounds.length) {
        partition = rangeBounds.length
      }
    }
    if (ascending) {
      partition
    } else {
      rangeBounds.length - partition
    }
  }

  override def equals(other: Any): Boolean = other match {...}

  override def hashCode(): Int = {...}

  @throws(classOf[IOException])
  private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {...}

  @throws(classOf[IOException])
  private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {...}
}
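In practice you rarely construct a RangePartitioner by hand; sortByKey installs one internally, which is why its output partitions are both internally sorted and ordered relative to each other. A minimal sketch (app name, sample data, and partition count are made up) showing both the explicit and the implicit route:

import org.apache.spark.{RangePartitioner, SparkConf, SparkContext}

object RangePartitionerDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("range"))
    val pairs = sc.makeRDD(for (i <- 1 to 100) yield (i, i.toString))

    // Explicit use: sample the keys, compute range bounds, then repartition.
    val byRange = pairs.partitionBy(new RangePartitioner(4, pairs))
    println(byRange.partitioner)  // Some(RangePartitioner)

    // Implicit use: sortByKey installs a RangePartitioner under the hood.
    val sorted = pairs.sortByKey()
    println(sorted.partitioner)

    sc.stop()
  }
}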
4. Custom Partitioner
/**
 * 1. Extend the Partitioner abstract class.
 * 2. Override the numPartitions: Int and getPartition(key: Any): Int methods.
 */
import org.apache.spark.{Partitioner, SparkConf, SparkContext}

object TestRDDPartitioner {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("partition")
    val sc = new SparkContext(conf)

    val rdd = sc.makeRDD(List(
      ("nba", "xxxxxxxxxxx"),
      ("cba", "xxxxxxxxxxx"),
      ("nba", "xxxxxxxxxxx"),
      ("ncaa", "xxxxxxxxxxx"),
      ("cuba", "xxxxxxxxxxx")
    ))

    // Repartition with the custom partitioner and write one file per partition.
    val partRdd = rdd.partitionBy(new MyPartitioner)
    partRdd.saveAsTextFile("output")
  }
}
class MyPartitioner extends Partitioner {
  // Override: return the number of partitions.
  override def numPartitions: Int = 3

  // Override: return the partition index for a given record's key.
  override def getPartition(key: Any): Int = {
    key match {
      case "nba" => 0
      case "cba" => 1
      case _ => 2
    }
  }
}
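To confirm where each record actually landed without reading the output files, a small fragment like the following could be appended inside main after the partitionBy call (a sketch, not part of the original example; it reuses the partRdd value defined above):

val withIdx = partRdd.mapPartitionsWithIndex { (idx, iter) =>
  iter.map { case (k, v) => s"partition $idx: ($k, $v)" }
}
withIdx.collect().foreach(println)
// Expected: "nba" records in partition 0, "cba" in partition 1,
// everything else ("ncaa", "cuba") in partition 2.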