设为首页 收藏本站
查看: 1272|回复: 0

[经验分享] 7.spark core之数据分区

[复制链接]

尚未签到

发表于 2019-1-30 11:21:11 | 显示全部楼层 |阅读模式
简介
    spark一个最重要的特性就是对数据集在各个节点的分区进行控制。控制数据分布可以减少网络开销,极大地提升整体性能。
    只有Pair RDD才有分区,非Pair RDD分区的值是None。如果RDD只被扫描一次,没必要预先分区处理;如果RDD多次在诸如连接这种基于键的操作中使用时,分区才有作用。

分区器
    分区器决定了RDD的分区个数及每条数据最终属于哪个分区。
    spark提供了两个分区器:HashPartitioner和RangePartitioner,它们都继承于org.apache.spark.Partitioner类并实现三个方法。


  • numPartitions: Int: 指定分区数
  • getPartition(key: Any): Int: 分区编号(0~numPartitions-1)
  • equals(): 检查分区器对象是否和其他分区器实例相同,判断两个RDD分区方式是否一样。

HashPartitioner分区
    HashPartitioner分区执行原理:对于给定的key,计算其hashCode,再除以分区数取余,最后的值就是这个key所属的分区ID。实现如下:

class HashPartitioner(partitions: Int) extends Partitioner {
require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")
def numPartitions: Int = partitions
def getPartition(key: Any): Int = key match {
case null => 0
case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
}
override def equals(other: Any): Boolean = other match {
case h: HashPartitioner =>
h.numPartitions == numPartitions
case _ =>
false
}
override def hashCode: Int = numPartitions
}
RangePartitioner分区
    HashPartitioner分区可能导致每个分区中数据量的不均匀。而RangePartitioner分区则尽量保证每个分区中数据量的均匀,将一定范围内的数映射到某一个分区内。分区与分区之间数据是有序的,但分区内的元素是不能保证顺序的。
    RangePartitioner分区执行原理:


  • 计算总体的数据抽样大小sampleSize,计算规则是:至少每个分区抽取20个数据或者最多1M的数据量。
  • 根据sampleSize和分区数量计算每个分区的数据抽样样本数量sampleSizePrePartition
  • 调用RangePartitioner的sketch函数进行数据抽样,计算出每个分区的样本。
  • 计算样本的整体占比以及数据量过多的数据分区,防止数据倾斜。
  • 对于数据量比较多的RDD分区调用RDD的sample函数API重新进行数据抽取。
  • 将最终的样本数据通过RangePartitoner的determineBounds函数进行数据排序分配,计算出rangeBounds。

class RangePartitioner[K: Ordering : ClassTag, V](
partitions: Int,
rdd: RDD[_ = 0, s"Number of partitions cannot be negative but found $partitions.")
// 获取RDD中K类型数据的排序器
private var ordering = implicitly[Ordering[K]]
// An array of upper bounds for the first (partitions - 1) partitions
private var rangeBounds: Array[K] = {
if (partitions
if (fraction * n > sampleSizePerPartition) {
// 如果fraction乘以当前分区中的数据量大于之前计算的每个分区的抽象数据大小,那么表示当前分区抽取的数据太少了,该分区数据分布不均衡,需要重新抽取
imbalancedPartitions += idx
} else {
// 当前分区不属于数据分布不均衡的分区,计算占比权重,并添加到candidates集合中
// The weight is 1 over the sampling probability.
val weight = (n.toDouble / sample.size).toFloat
for (key  (x, weight))
}
// 将最终的抽样数据计算出rangeBounds出来
RangePartitioner.determineBounds(candidates, partitions)
}
}
}
// 下一个RDD的分区数量是rangeBounds数组中元素数量+ 1个
def numPartitions: Int = rangeBounds.length + 1
// 二分查找器,内部使用java中的Arrays类提供的二分查找方法
private var binarySearch: ((Array[K], K) => Int) = CollectionsUtils.makeBinarySearch[K]
// 根据RDD的key值返回对应的分区id。从0开始
def getPartition(key: Any): Int = {
// 强制转换key类型为RDD中原本的数据类型
val k = key.asInstanceOf[K]
var partition = 0
if (rangeBounds.length  rangeBounds.length) {
partition = rangeBounds.length
}
}
// 根据数据排序是升序还是降序进行数据的排列,默认为升序
if (ascending) {
partition
} else {
rangeBounds.length - partition
}
}
影响分区的算子操作
    影响分区的算子操作有:cogroup()、groupWith()、join()、leftOuterJoin()、rightOuterJoin()、groupByKey()、reduceByKey()、combineByKey()、partitionBy()、repartition()、coalesce()、sort()、mapValues()(如果父RDD有分区方式)、flatMapValues()(如果父RDD有分区方式)。
    对于执行两个RDD的算子操作,输出数据的分区方式取决于父RDD的分区方式。默认情况下,结果会采用哈希分区,分区的数量和操作的并行度一样。不过,如果其中一个父RDD设置过分区方式,结果就采用那种分区方式;如果两个父RDD都设置过分区方式,结果RDD采用第一个父RDD的分区方式。

repartition和partitionBy的区别
    repartition 和 partitionBy 都是对数据进行重新分区,默认都是使用 HashPartitioner。但是二者之间的区别有:


  • partitionBy只能用于Pair RDD
  • 都作用于Pair RDD时,结果也不一样
    DSC0000.jpg
    其实partitionBy的结果才是我们所预期的。repartition 其实使用了一个随机生成的数来当作 key,而不是使用原来的key。

def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
coalesce(numPartitions, shuffle = true)
}
def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null)
: RDD[T] = withScope {
if (shuffle) {
/** Distributes elements evenly across output partitions, starting from a random partition. */
val distributePartition = (index: Int, items: Iterator[T]) => {
var position = (new Random(index)).nextInt(numPartitions)
items.map { t =>
// Note that the hash code of the key will just be the key itself. The HashPartitioner
// will mod it with the number of total partitions.
position = position + 1
(position, t)
}
} : Iterator[(Int, T)]
// include a shuffle step so that our upstream tasks are still distributed
new CoalescedRDD(
new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition),
new HashPartitioner(numPartitions)),
numPartitions).values
} else {
new CoalescedRDD(this, numPartitions)
}
}

repartition和coalesce的区别
    两个算子都是对RDD的分区进行重新划分,repartition只是coalesce接口中shuffle为true的简易实现,(假设RDD有N个分区,需要重新划分成M个分区)


  • NM并且N和M相差不多(假如N是1000,M是100),这时可以将shuffle设置为false,不进行shuffle过程,父RDD和子RDD之间是窄依赖关系。在shuffle为false的情况下,如果NM并且两者相差悬殊,这时如果将shuffle设置为false,父子RDD是窄依赖关系,同处在一个Stage中,就可能造成spark程序的并行度不够,从而影响性能。如果在M为1的时候,为了使coalesce之前的操作有更好的并行度,可以将shuffle设置为true。

实例分析

需求
    统计用户访问其未订阅主题页面的情况。


  • 用户信息表:由(UserID,UserInfo)组成的RDD,UserInfo包含该用户所订阅的主题列表。
  • 事件表:由(UserID,LinkInfo)组成的RDD,存放着每五分钟内网站各用户访问情况。

代码实现

val sc = new SparkContext()
val userData = sc.sequenceFile[UserID,LinkInfo]("hdfs://...").persist
def processNewLogs(logFileName:String){
val events = sc.sequenceFile[UserID, LinkInfo](logFileName)
//RDD of (UserID,(UserInfo,LinkInfo)) pairs
val joined = usersData.join(events)
val offTopicVisits = joined.filter {
// Expand the tuple into its components
case (userId, (userInfo, linkInfo)) =>
!userInfo.topics.contains(linkInfo.topic)
}.count()
println("Number of visits to non-subscribed opics: " + offTopicVisits)
}
缺点
    连接操作会将两个数据集中的所有键的哈希值都求出来,将哈希值相同的记录通过网络传到同一台机器上,然后再对所有键相同的记录进行连接操作。userData表数据量很大,所以这样进行哈希计算和跨节点数据混洗非常耗时。

改进代码实现

val userData = sc.sequenceFile[UserID,LinkInfo]("hdfs://...")
.partionBy(new HashPartiotioner(100))
.persist()
优点
    userData表进行了重新分区,将键相同的数据都放在一个分区中。然后调用persist持久化结果数据,不用每次都计算哈希和跨节点混洗。程序运行速度显著提升。
  忠于技术,热爱分享。欢迎关注公众号:java大数据编程,了解更多技术内容。





运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.iyunv.com/thread-669556-1-1.html 上篇帖子: 3.spark运行模式 下篇帖子: spark内核
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表