Spark物理执行计划

刘超 2天前 ⋅ 254 阅读   编辑

  我们首先以一个典型的Spark应用为例,概览该应用对应的物理执行计划,然后讨论Spark物理执行计划生成方法,最后讨论常用数据操作生成的物理执行计划。

一、Spark物理执行计划概览

  我们详细讨论了Spark应用生成的逻辑处理流程,现在我们讨论如何将应用的逻辑处理流程转化为物理执行计划,使得应用程序可以被分布执行。

  Spark应用生成的逻辑处理流程多种多样,为了方便讨论,我们首先构建一个较为复杂但比较典型的逻辑处理流程,然后以该流程为例讨论如何将其转化为物理执行计划。如下图所示,我们构建了一个ComplexApplication应用,该应用包含map()、partitionBy()、union()和join()等多种数据操作。

 

  ComplexApplication应用的示例代码:

import org.apache.spark.HashPartitioner

// 构建一个《K,V》类型的rdd1
val data1 = Array[(Int,Char)]((1,'a'),(2,'b'),(3,'c'),(4,'d'),(5,'e'),(3,'f'),(2,'g'),(1,'h'))
val rdd1 = sc.parallelize(data1,3)
// 使用HashPartitioner对rdd1进行重新划分
val partitionedRDD = rdd1.partitionBy(new HashPartitioner(3))
// 构建一个《K,V》类型的rdd2,并对rdd2中record的value进行复杂
val data2 = Array[(Int,String)]((1,"A"),(2,"B"),(3,"C"),(4,"D"))
val rdd2 = sc.parallelize(data2,2).map(x => (x._1,x._2 + "" + x._2))
// 构建一个《K,V》类型的rdd3
val data3 = Array[(Int,String)]((3,"X"),(5,"Y"),(3,"Z"),(4,"Y"))
val rdd3 = sc.parallelize(data3,2)
// 将rdd2和rdd3进行union()操作
val unionedRDD = rdd2.union(rdd3)
// 将重新划分过的rdd1与unionedRDD进行join()操作
val resultRDD = partitionedRDD.join(unionedRDD)
// 输出join()操作后的结果,包括每个record及其index
resultRDD.foreach(println)

  这篇文章的核心问题是如何将逻辑处理流程转化为物理执行计划。MapReduce、Spark等大数据处理框架的核心思想是将大的应用拆分为小的执行任务,那么面对这么复杂的数据处理流程,Spark如何将其拆分为小的执行任务呢?

  想法1:一个直观想法是将每个具体的数据操作作为一个stage,也就是将前后关联的RDD组成一个stage,上图中的每个箭头都生成一个执行任务。对于2个RDD聚合成1个RDD的情况(见上图中的ShuffledRDD、UnionRDD、CoGroupedRDD),将这3个RDD组成一个stage。这样虽然可以解决任务划分问题,但存在多个性能问题。第1性能问题是会产生很多个任务,如上图中有36个箭头,会生成36个任务,当然我们可以对ShuffleDependency进行优化,将指向child RDD中同一个分区的箭头合并为一个task,使得一个task从parent RDD中的多个分区中获取数据,但是仍然会有多达21个任务。过多的任务不仅会增加调度压力,而且会产生第2个严重的性能问题,即需要存储大量的中间数据。一般来说,每个任务需要将执行结果存到磁盘或者内存中,这样方便下一个任务读取数据、进行计算。如果每个箭头都是计算任务的话,那么存储这些中间计算结果(RDD中的数据)需要大量的内存和磁盘空间,效率较低。

  想法2:既然想法1中生成的任务过多会造成中间数据量过大,那么第2个想法是减少任务数量。仔细观察一下逻辑处理流程图会发现中间数据只是暂时有用的,中间数据(RDD)产生以后,只用于下一步计算操作(上图中的箭头),而下一步计算操作完成后,中间数据可以被删除。那么,一个大胆的想法是将这些计算操作串联起来,只用一个stage来执行这些串联的多个操作,使得上一步操作在内存中生成的数据被下一步操作处理完后能够及时回收,减少内存消耗。

  基于这个串联思想,接下来需要解决的两个问题分别是:

  第一,每个RDD包含多个分区,那么需要生成多少个任务计算?如上图所示,我们观察到RDD中每个分区的计算逻辑相同,可以独立计算,因此我们可以将每个分区上的操作串联为一个task,也就是为最后的MapPartitionsRDD的每个分区分配一个task。

  第二,如何串联操作?遇到复杂依赖关系(如ShuffleDependency)如何处理?因为某些操作,如cogroup()、join()的输入数据(RDD)可以有多个,而输出数据(RDD)一般只有一个,所以我们将串联的顺序调整为从后往前。如上图中黑色粗箭头所示,从图中最后的MapPartitionsRDD开始向前串联,当遇到ShuffleDependency时,我们采用的处理方法是将该分区所依赖的上游数据(parent RDD)及操作都纳入一个task中。然而,这个方案仍然存在性能问题,当遇到ShuffleDependency时,task包含很多数据依赖和操作,导致划分出的task可能太大,而且会出现重复计算。例如,在下图中,从rdd2到UnionRDD所有的数据和操作都被纳入task0中,造成task0的计算量过大,而且其他task会重复处理这些数据,如使用虚线表示的task1仍然需要计算rdd2=>UnionRDD中的数据。当然我们可以在task0计算完后缓存这些需要重复计算的数据,以便后续task的计算,但这样缓存会占用存储空间,而且会使得task0与其他task不能同时并行计算,降低了并行度。