Spark系统部署与应用运行的基本流程

刘超 19天前 ⋅ 276 阅读   编辑

  首先介绍Spark的安装部署与系统架构,然后通过一个简单的Spark应用例子简介Spark应用运行的基本流程,最后讨论Spark的编程模型。Spark应用运行的详细流程将在之后详细讨论。

一、Spark安装部署

  在运行Spark应用之前,我们首先要在集群上安装部署Spark。Spark官网上提供了多个版本,包括Standalone、Mesos、YARN和Kubernetes版本。这几个版本的主要区别在于:Standalone版本的资源管理和任务调度器由Spark系统本身负责,其他版本的资源管理和任务调度器依赖于第三方框架,如YARN可以同时管理Spark任务和Hadoop MapReduce任务。为了方便探讨和理解Spark本身的系统结构和运行原理,我们选择Standalone版本安装。这里选择Spark-2.4.3版本安装部署在9台机器上,1台机器作为Master节点,8台机器作为Worker节点。由于官网和一些博客已经提供了详细的Spark安装过程,这里不再赘述。虽然Spark的版本在不断更新中,但其设计原理变化不大,因此本书的分析具有一定的通用性。

  需要注意的是,在安装时需要配置很多资源信息,包括CPU、内存等,如果想详细了解各种配置参数的含义,那么可以参考Spark配置。另外,如果没有集群环境,但是想运行Spark用户代码,则可以直接下载IntelliJ IDEA集成开发环境,在IDEA中通过Maven包管理工具添加Spark包(Package),然后直接编写Spark用户代码,并通过local(本地)模式运行。与集群版Spark的区别是,所有的Spark任务和main()函数等都运行在本地,没有网络交互等。

二、Spark系统架构
  与Hadoop MapReduce的结构类似,Spark也采用Master-Worker结构。如果一个Spark集群由4个节点组成,即1个Master节点和3个Worker节点,那么在部署Standalone版本后,Spark部署的系统架构图如下图所示。简单来说,Master节点负责管理应用和Work,Worker节点负责执行任务。

 

  我们接下来先介绍Master节点和Worker节点的具体功能,然后介绍一些Spark系统中的基本概念,以及一些实现细节。

  Master节点和Worker节点的职责如下所述。
  1、Master节点上常驻Master进程。该进程负责管理全部的Worker节点,如将Spark任务分配给Worker节点,收集Worker节点上任务的运行信息,监控Worker节点的存活状态等。
  2、Worker节点上常驻Worker进程。该进程除了与Master节点通信,还负责管理Spark任务的执行,如启动Executor来执行具体的Spark任务,监控任务运行状态等。

  启动Spark集群时(使用Spark部署包中start-all.sh脚本),Master节点上会启动Master进程,每个Worker节点上会启动Worker进程。启动Spark集群后,接下来可以提交Spark应用到集群中执行,如用户可以在Master节点上使用

./bin/run-example SparkPi 10

  来提交一个名为SparkPi的应用。Master节点接收到应用后首先会通知Worker节点启动Executor,然后分配Spark计算任务(task)到Executor上执行,Executor接收到task后,为每个task启动1个线程来执行。这里有几个概念需要解释一下。
  1、Spark application,即Spark应用,指的是1个可运行的Spark程序,如WordCount.scala,该程序包含main()函数,其数据处理流程一般先从数据源读取数据,再处理数据,最后输出结果。同时,应用程序也包含了一些配置参数,如需要占用的CPU个数,Executor内存大小等。用户可以使用Spark本身提供的数据操作来实现程序,也可以通过其他框架(如Spark SQL)来实现应用,Spark SQL框架可以将SQL语句转化成Spark程序执行。
  2、Spark Driver,也就是Spark驱动程序,指实际在运行Spark应用中main()函数的进程,官方解释是“The process running the main() function of the application and creating the SparkContext”,如运行SparkPi应用main()函数而产生的进程被称为SparkPi Driver。在上图中,运行在Master节点上的Spark应用进程(通常由SparkSubmit脚本产生)就是Spark Driver,Driver独立于Master进程。如果是YARN集群,那么Driver也可能被调度到Worker节点上运行。另外,也可以在自己的PC上运行Driver,通过网络与远程的Master进程连接,但一般不推荐这样做,一个原因是需要本地安装一个与集群一样的Spark版本,另一个原因是自己的PC一般和集群不在同一个网段,Driver和Worker节点之间的通信会很慢。简单来说,我们可以在自己的IntelliJ IDEA中运行Spark应用,IDEA会启动一个进程既运行应用程序的main()函数,又运行具体计算任务task,即Driver和task共用一个进程。
  3、Executor,也称为Spark执行器,是Spark计算资源的一个单位。Spark先以Executor为单位占用集群资源,然后可以将具体的计算任务分配给Executor执行。由于Spark是由Scala语言编写的,Executor在物理上是一个JVM进程,可以运行多个线程(计算任务)。在Standalone版本中,启动Executor实际上是启动了一个名叫CoarseGrainedExectuorBackend的JVM进程。之所以起这么长的名字,是为了不与其他版本中的Executor进程名冲突,如Mesos、YARN等版本会有不同的Executor进程名。Worker进程实际只负责启停和观察Executor的执行情况。
  4、task,即Spark应用的计算任务。Driver在运行Spark应用的main()函数时,会将应用拆分为多个计算任务,然后分配给多个Executor执行。task是Spark中最小的计算单位,不能再拆分。task以线程方式运行在Executor进程中,执行具体的计算任务,如map算子、reduce算子等。由于Executor可以配置多个CPU,而1个task一般使用1个CPU,因此当Executor具有多个CPU时,可以运行多个task。例如,在上图中Worker节点1有8个CPU,启动了2个Executor,每个Executor可以并行运行4个task。Executor的总内存大小由用户配置,而且Executor的内存空间由多个task共享。

  如果上述解释不够清楚,那么我们可以用一个直观例子来理解Master、Worker、Driver、Executor、task的关系。例如,一个农场主(Master)有多片草场(Worker),农场主要把草场租给3个牧民来放马、牛、羊。假设现在有3个项目(application)需要农场主来运作:第1个牧民需要一片牧场来放100匹马,第2个牧民需要一片牧场来放50头牛,第3个牧民需要一片牧场来放80只羊。每个牧民可以看作是一个Driver,而马、牛、羊可以看作是task。为了保持资源合理利用、避免冲突,在放牧前,农场主需要根据项目需求为每个牧民划定可利用的草场范围,而且尽量让每个牧民在每个草场都有一小片可放牧的区域(Executor)。在放牧时,每个牧民(Driver)只负责管理自己的动物(task),而农场主(Master)负责监控草场(Worker)、牧民(Driver)等状况。

  回到Spark技术点讨论,这里有个问题是Spark为什么让task以线程方式运行而不以进程方式运行。在Hadoop MapReduce中,每个map/reduce task以一个Java进程(命名为Child JVM)方式运行。这样的好处是task之间相互独立,每个task独享进程资源,不会相互干扰,而且监控管理比较方便,但坏处是task之间不方便共享数据。例如,当同一个机器上的多个map task需要读取同一份字典来进行数据过滤时,需要将字典加载到每个map task进程中,则会造成重复加载、浪费内存资源的问题。另外,在应用执行过程中,需要不断启停新旧task,进程的启动和停止需要做很多初始化等工作,因此采用进程方式运行task会降低执行效率。为了数据共享和提高执行效率,Spark采用了以线程为最小的执行单位,但缺点是线程间会有资源竞争,而且Executor JVM的日志会包含多个并行task的日志,较为混乱。更多关于内存资源管理和竞争的问题将在以后进行阐述。

  在上图中还有一些实现细节。
  1、每个Worker进程上存在一个或者多个ExecutorRunner对象。每个ExecutorRunner对象管理一个Executor。Executor持有一个线程池,每个线程执行一个task。
  2、Worker进程通过持有ExecutorRunner对象来控制CoarseGrainedExecutorBackend进程的启停。
  3、每个Spark应用启动一个Driver和多个Executor,每个Executor里面运行的task都属于同一个Spark应用。

三、Spark应用例子

  了解了Spark的系统部署之后,我们接下来先给出一个Spark应用的例子,然后通过分析该应用的运行过程来学习Spark框架是如何运行应用的。

  1、用户代码基本逻辑
  我们以Spark自带的example包中的GroupByTest.scala为例,这个应用模拟了SQL中的GroupBy语句,也就是将具有相同Key的《Key,Value》 record(其简化形式为《K,V》 record)聚合在一起。输入数据由GroupByTest程序自动生成,因此需要提前设定需要生成的《K,V》 record个数、Value长度等参数。假设在Master节点上提交运行GroupByTest,具体参数和执行命令如下:

GroupByTest [numMappers] [numKVParis] [valSize] [numReducers]

bin/run-example GroupByTest 3 4 1000 2

  该命令启动GroupByTest应用,该应用包括3个map task,每个task随机生成4个《K,V》 record,record中的Key从[0,1,2,3]中随机抽取一个产生,每个Value大小为1000 byte。由于Key是随机产生的,具有重复性,所以可以通过GroupBy将具有相同Key的record聚合在一起,这个聚合过程最终使用2个reduce task并行执行。这里虽然指定生成3个map task,但需要注意的是我们一般不需要在编写应用时指定map task的个数,因为map task的个数可以通过“输入数据大小/每个分片大小”来确定。例如,HDFS上的默认文件block大小为128MB,假设我们有1GB的文件需要处理,那么系统会自动算出需要启动1GB/128MB=8个map task。reduce task的个数一般在使用算子时通过设置partition number来间接设置。更多的例子会在以后介绍,我们这里主要关注应用的基本运行流程。

  GroupByTest具体代码如下,为了方便阅读和调试进行了一些简化。

import scala.util.Random

val spark = SparkSession.builder.appName("Group By Test").getOrCreate()
val numMappers = 3
val numKVPairs = 4
val valSize = 1000
val numReducers = 2
val input = 0 until numMappers // [0,1,2]

val pairs1 = spark.sparkContext.parallelize(input,numMappers).flatMap{
  p =>
    val ranGen = new Random
    val arr1 = new Array[(Int,Array[Byte])](numKVPairs)
    for(i <- 0 until numKVPairs){
      val byteArr = new Array[Byte](valSize)
      ranGen.nextBytes(byteArr)
      arr1(i) = (ranGen.nextInt(numKVPairs),byteArr)
    }
    arr1
}.cache()

// Enforce that everything has been calculated and in cache
println(pairs1.count())
println(pairs1.toDebugString) // 打印出形成pairs1的逻辑流程图
val results = pairs1.groupByKey(numReducers)
println(results.count())
println(results.toDebugString) // 打印出形成results的逻辑流程图

  阅读代码后,对照GroupByTest代码和下图,我们分析一下代码的具体执行流程。

 

  1、初始化SparkSession,这一步主要是初始化Spark的一些环境变量,得到Spark的一些上下文信息sparkContext,使得后面的一些操作函数(如flatMap()等)能够被编译器识别和使用,这一步同时创建GroupByTest Driver,并初始化Driver所需要的各种对象。
  2、设置参数numMappers=3,numKVPairs=4,valSize=1000,numReducers=2。
  3、使用sparkContext.parallelize(0 until numMappers,numMappers)将[0,1,2]划分为3份,也就是每一份包含一个数字p={p=0, p=1, p=2}。接下来flatMap()的计算逻辑是对于每一个数字p(如p=0),生成一个数组arr1:Array[(Int,Byte[])],数组长度为numKVPairs=4。数组中的每个元素是一个(Int,Byte[])对,其中Int为[0,3]上随机生成的整数,Byte[]是一个长度为1000的数组。因为p只有3个值,所以该程序总共生成3个arr1数组,被命名为pairs1,pairs1被声明为需要缓存到内存中。
  4、接下来执行一个action()操作pairs1.count(),来统计pairs1中所有arr1中的元素个数,执行结果应该是numMappers*numKVPairs=3×4=12。这一步除了计算count结果,还将pairs1中的3个arr1 数组缓存到内存中,便于下一步计算。需要注意的是,缓存操作在这一步才执行,因为pairs1实际在执行action()操作后才会被生成,这种延迟(lazy)计算的方式与普通Java程序有所区别。action()操作的含义是触发Spark执行数据处理流程、进行计算的操作,即需要输出结果,更详细的含义以后介绍。
  5、执行完pair1.count()后,在已经被缓存的pairs1上执行groupByKey()操作,groupByKey()操作将具有相同Key的《Int,Byte[]》 record聚合在一起,得到《Int,list (Byte[1000],Byte[1000],…,Byte[1000])》 ,总的结果被命名为results。Spark实际在执行这一步时,由多个reduce task来完成,reduce task的个数等于numReducers。
  6、最后执行results.count(),count()将results中所有record个数进行加和,得到结果4,这个结果也是pairs1中不同Key的总个数。

  在探讨GroupByTest应用如何在Spark中执行前,我们先思考一下使用Spark编程与使用普通语言(如C++/Java/Python)编写数据处理程序的不同。使用普通语言编程时,处理的数据在本地,程序也在本地进程中运行,我们可以随意定义变量、函数、控制流(分支、循环)等,编程灵活、受限较少,且程序按照既定顺序执行、输出结果。在Spark程序中,首先要声明SparkSession的环境变量才能够使用Spark提供的数据操作,然后使用Spark操作来定义数据处理流程,如flatMap(func).groupByKey()。此时,我们只是定义了数据处理流程,而并没有让Spark真正开始计算,就像在一个画布上画出了数据处理流程,包括哪些数据处理步骤,这些步骤如何连接,每步的输入和输出是什么(如flatMap()中的p=>... )。至于这些步骤和操作如何在系统中并行执行,用户并不需要关心。这有点像SQL语言,只需要声明想要得到的数据(select,where),以及如何对这些数据进行操作(GroupBy,join),至于这些操作如何实现,如何被系统执行,用户并不需要关心。在Spark中,唯一需要注意声明的数据处理流程在使用action()操作时,Spark才真正执行处理流程,如果整个程序没有action()操作,那么Spark并不会执行数据处理流程。在普通程序中,程序一步步按照顺序执行,并没有这个限制。Spark这样做与其需要分布式运行有关,更详细的内容在以后介绍。

  2、逻辑处理流程

  了解了Spark应用的计算逻辑后,我们接下来研究Spark应用如何执行的问题。Spark的实际执行流程比用户想象的要复杂,需要先建立DAG型的逻辑处理流程(Logical plan),然后根据逻辑处理流程生成物理执行计划(Physical plan),物理执行计划中包含具体的计算任务(task),最后Spark将task分配到多台机器上执行。

  为了获得GroupByTest的逻辑处理流程,我们可以使用toDebugString()方法来打印出pairs1和results的产生过程,进而分析GroupByTest的整个逻辑处理流程。在这之前,我们先分析GroupByTest产生的job个数。由于GroupByTest进行了两次action()操作:pairs1.count()和results.count(),所以会生成两个Spark作业(job),如下图所示。接下来,我们分析pairs1和results的产生过程,即这两个job是如何产生的。

 

  a、pairs1.toDebugString()的执行结果

scala> println(pairs1.toDebugString) // 打印出形成pairs1的逻辑流程图
(3) MapPartitionsRDD[1] at flatMap at :32 [Memory Deserialized 1x Replicated]
 |       CachedPartitions: 3; MemorySize: 24.8 KB; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
 |  ParallelCollectionRDD[0] at parallelize at :32 [Memory Deserialized 1x Replicated]

  第一行的“(3) MapPartitionsRDD[1]”表示的是pairs1,即pairs1的类型是MapPartitions-RDD,编号为[1],共有3个分区(partition),这是因为pairs1中包含了3个数组。由于设置了pairs1.cache,所以pairs1中的3个分区在计算时会被缓存,其类型是CachedPartitions。那么pairs1是怎么生成的呢?我们看到描述“MapPartitionsRDD[1] at flatMap at :32”,即pairs1是由flatMap()函数生成的,对照程序代码,可以发现确实是input.parallelize().flatMap()生成的。接着出现了“ParallelCollectionRDD[0]”,根据描述是由input.parallelize()函数生成的,编号为[0],因此,我们可以得到结论:input.parallelize()得到一个ParallelCollectionRDD,然后经过flatMap()得到pairs1:MapPartitionsRDD。

  备注:去掉cache时,不会打印分区类型,如下

scala> :paste
// Entering paste mode (ctrl-D to finish)

val pairs1 = spark.sparkContext.parallelize(input,numMappers).flatMap{
  p =>
    val ranGen = new Random
    val arr1 = new Array[(Int,Array[Byte])](numKVPairs)
    for(i <- 0 until numKVPairs){
      val byteArr = new Array[Byte](valSize)
      ranGen.nextBytes(byteArr)
      arr1(i) = (ranGen.nextInt(numKVPairs),byteArr)
    }
    arr1
}
// Exiting paste mode, now interpreting.

pairs1: org.apache.spark.rdd.RDD[(Int, Array[Byte])] = MapPartitionsRDD[6] at flatMap at :32

scala> println(pairs1.count())
12

scala> println(pairs1.toDebugString) // 打印出形成pairs1的逻辑流程图
(3) MapPartitionsRDD[6] at flatMap at :32 []
 |  ParallelCollectionRDD[5] at parallelize at :32 []

  b、results.toDebugString()的执行结果

scala> println(results.toDebugString) // 打印出形成results的逻辑流程图
(2) ShuffledRDD[2] at groupByKey at :28 []
 +-(3) MapPartitionsRDD[1] at flatMap at :32 []
    |      CachedPartitions: 3; MemorySize: 24.8 KB; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
    |  ParallelCollectionRDD[0] at parallelize at :32 []

  同样,第1行的“(2) ShuffledRDD[2]”表示的是results,即results的类型是ShuffledRDD,由groupByKey()产生,共有两个分区(partition),这是因为在groupByKey()中,设置了partition number=numReducers=2。接着出现了“MapPartitionsRDD [1]”,这个就是之前生成的pairs1。接下来的ParallelCollectionRDD由input.parallelize()生成。

  我们可以将上述过程画成逻辑处理流程图,如下图所示。