def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  coalesce(numPartitions, shuffle = true)
}

def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null)
    : RDD[T] = withScope {
  if (shuffle) {
    /** Distributes elements evenly across output partitions, starting from a random partition. */
    val distributePartition = (index: Int, items: Iterator[T]) => {
      var position = (new Random(index)).nextInt(numPartitions)
      items.map { t =>
        // Note that the hash code of the key will just be the key itself. The HashPartitioner
        // will mod it with the number of total partitions.
        position = position + 1
        (position, t)
      }
    } : Iterator[(Int, T)]

    // include a shuffle step so that our upstream tasks are still distributed
    new CoalescedRDD(
      new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition),
        new HashPartitioner(numPartitions)),
      numPartitions).values
  } else {
    new CoalescedRDD(this, numPartitions)
  }
}
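As the source above shows, repartition is simply coalesce with shuffle = true: each input partition tags its elements with rotating integer keys and a HashPartitioner spreads them evenly over the requested number of partitions. A minimal usage sketch contrasting the two calls (the sample RDD and partition counts here are illustrative, not from the quoted source):

val rdd = sc.parallelize(1 to 1000, 8)

// coalesce without shuffle only merges existing partitions (narrow dependency),
// so it can shrink the partition count but not grow it.
val merged = rdd.coalesce(2)
println(merged.getNumPartitions)      // 2

// repartition == coalesce(n, shuffle = true): always shuffles, so it can
// both increase the partition count and rebalance skewed data.
val rebalanced = rdd.repartition(16)
println(rebalanced.getNumPartitions)  // 16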
val sc = new SparkContext()
val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://...").persist()

def processNewLogs(logFileName: String) {
  val events = sc.sequenceFile[UserID, LinkInfo](logFileName)
  // RDD of (UserID, (UserInfo, LinkInfo)) pairs
  val joined = userData.join(events)
  val offTopicVisits = joined.filter {
    // Expand the tuple into its components
    case (userId, (userInfo, linkInfo)) =>
      !userInfo.topics.contains(linkInfo.topic)
  }.count()
  println("Number of visits to non-subscribed topics: " + offTopicVisits)
}

Drawback
The join operation computes the hash of every key in both datasets, transfers records with the same hash to the same machine over the network, and then joins the records that share a key on that machine. Because the userData table is very large, this hash computation and cross-node data shuffle are very expensive.
Improved implementation
val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://...")
  .partitionBy(new HashPartitioner(100))
  .persist()

Advantage
partitionBy repartitions the userData table so that all records with the same key are placed in the same partition, and persist then caches the partitioned result, so the hash computation and cross-node shuffle no longer have to be repeated on every call. Program execution speed improves significantly.
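To make the benefit concrete, here is a small follow-up sketch (assuming the same userData, UserID, and UserInfo definitions as above; the assertion and the mapValues example are illustrative additions, not from the original post):

// The partitioner set by partitionBy survives persist(), so join() can reuse it
// and only the small per-call events RDD gets shuffled across the network.
assert(userData.partitioner == Some(new HashPartitioner(100)))

// Transformations that may change keys (e.g. map) drop the partitioner;
// prefer mapValues when deriving new RDDs from the cached table to keep it.
val userTopics = userData.mapValues(_.topics)
println(userTopics.partitioner)   // still a Some(...) carrying the 100-partition HashPartitioner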