Spark Core Operators


Contents

Category: transform

• OneToOneDependency: map, mapValues, filter, filterByRange, flatMap, flatMapValues, sample, sampleByKey, glom, zipWithIndex, zipWithUniqueId, mapPartitions, mapPartitionsWithIndex
• RangeDependency: union (general case)
• ManyToOneDependency: union (special case), coalesce(shuffle=false), zip, zipPartitions
• ManyToManyDependency: cartesian
• Single ShuffleDependency: partitionBy, groupByKey, reduceByKey, aggregateByKey, combineByKey, foldByKey, sortByKey, coalesce(shuffle=true), repartition, repartitionAndSortWithinPartitions, sortBy, distinct
• Multiple ShuffleDependencies: cogroup, groupWith, join, intersection, subtract, subtractByKey
• Listed without a dependency type: mapWith, withColumn, makeRDD

Category: action

• reduce(func), collect(), count(), take(n), first(), saveAsTextFile(path), saveAsSequenceFile(path), foreach(func), countByKey()
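
The dependency type of any of these operators can be checked directly in the shell: every RDD exposes a dependencies field, and toDebugString prints the whole lineage with shuffle boundaries. A minimal sketch (the variable names are mine):

val pairs = sc.parallelize(Array((1, 'a'), (2, 'b'), (3, 'c')), 3)

// Narrow dependency: map keeps a one-to-one relationship between parent and child partitions
val mapped = pairs.map(r => (r._1, r._2.toUpper))
println(mapped.dependencies)   // e.g. List(org.apache.spark.OneToOneDependency@...)

// Wide dependency: reduceByKey needs a shuffle
val reduced = pairs.reduceByKey((a, b) => a)
println(reduced.dependencies)  // e.g. List(org.apache.spark.ShuffleDependency@...)

// The full lineage, with shuffle stages marked by indentation
println(reduced.toDebugString)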

 


1. map
  a. Description: apply func to every record in rdd1 and emit a new record for each one.
  b. Usage: rdd2 = rdd1.map(func)
  c. Example:

scala> // The data source is a (K, V) array split into 3 partitions
scala> val inputRDD = sc.parallelize(Array[(Int,Char)]((1,'a'),(2,'b'),(3,'c'),(4,'d'),(2,'e'),(3,'f'),(2,'g'),(1,'h')),3)
inputRDD: org.apache.spark.rdd.RDD[(Int, Char)] = ParallelCollectionRDD[0] at parallelize at :24
scala> // For each record, e.g. r = (1,'a'), take the key with r._1, append an underscore, then append the value with r._2
scala> val resultRDD = inputRDD.map(r => r._1 + "_" + r._2)
resultRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at map at :26
scala> // Print the records contained in the RDD
scala> resultRDD.foreach(println)
3_f
2_g
1_h
3_c
4_d
2_e
1_a
2_b

  d. Data flow diagram: (figure omitted)

 

2. filter
  a. Description: apply func to every record in rdd1; a record is kept if the result is true. All kept records form the new rdd2.
  b. Usage: rdd2 = rdd1.filter(func)
  c. Example:

scala> // Keep the records in rdd1 whose key is even
scala> val inputRDD = sc.parallelize(Array[(Int,Char)]((1,'a'),(2,'b'),(3,'c'),(4,'d'),(2,'e'),(3,'f'),(2,'g'),(1,'h')),3)
inputRDD: org.apache.spark.rdd.RDD[(Int, Char)] = ParallelCollectionRDD[5] at parallelize at :24
scala> val resultRDD = inputRDD.filter(r => r._1 % 2 == 0)
resultRDD: org.apache.spark.rdd.RDD[(Int, Char)] = MapPartitionsRDD[6] at filter at :26
scala> resultRDD.foreach(println)
(2,b)
(4,d)
(2,e)
(2,g)

  d. Data flow diagram: (figure omitted)

 

3. filterByRange

  a. Description: filter rdd1, keeping only the records whose key falls within [lower, upper].
  b. Usage: rdd2 = rdd1.filterByRange(lower, upper)
  c. Example:

scala> // Keep the records in rdd1 whose key is in [2,4]
scala> val inputRDD = sc.parallelize(Array[(Int,Char)]((1,'a'),(2,'b'),(3,'c'),(4,'d'),(2,'e'),(3,'f'),(2,'g'),(1,'h')),3)
inputRDD: org.apache.spark.rdd.RDD[(Int, Char)] = ParallelCollectionRDD[7] at parallelize at :24
scala> val resultRDD = inputRDD.filterByRange(2,4)
resultRDD: org.apache.spark.rdd.RDD[(Int, Char)] = MapPartitionsRDD[8] at filterByRange at :26
scala> resultRDD.foreach(println)
(3,c)
(2,b)
(3,f)
(4,d)
(2,g)
(2,e)

  d. Data flow diagram: (figure omitted)

 

4. flatMap
  a. Description: apply func to every element of rdd1 (for example a list) to produce new elements, then flatten all of them into rdd2. For instance, if a partition of rdd1 contains the two elements List(1,2) and List(3,4), and func adds 1 to every item of a list, the corresponding partition of rdd2 contains (2,3,4,5).
  b. Usage: rdd2 = rdd1.flatMap(func)
  c. Example:

scala> // The data source is 5 strings in 3 partitions
scala> val inputRDD = sc.parallelize(Array[String]("how do you do","are you ok","thanks","bye bye","I'm ok"),3)
inputRDD: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[9] at parallelize at :24
scala> // Use flatMap() to split each string into words
scala> val resultRDD = inputRDD.flatMap(x => x.split(" "))
resultRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[10] at flatMap at :26
scala> resultRDD.foreach(println)
how
do
you
do
bye
bye
are
you
ok
thanks
I'm
ok

  d. Data flow diagram: (figure omitted)

 

5. flatMapValues
  a. Description: the same as flatMap(), except that func is applied only to the Value of each (K, V) record in rdd1.
  b. Usage: rdd2 = rdd1.flatMapValues(func)
  c. Example:

scala> val inputRDD = sc.parallelize(Array[(Int,String)]((1,"how do you do"),(2,"are you ok"),(4,"thanks"),(5,"bye bye"),(2,"I'm ok")),3)
inputRDD: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[11] at parallelize at :24
scala> val resultRDD = inputRDD.flatMapValues(x => x.split(" "))
resultRDD: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[12] at flatMapValues at :26
scala> resultRDD.foreach(println)
(1,how)
(1,do)
(1,you)
(1,do)
(2,are)
(2,you)
(2,ok)
(4,thanks)
(5,bye)
(5,bye)
(2,I'm)
(2,ok)

  d. Data flow diagram: (figure omitted)
 


6. mapPartitions
  mapPartitions is a variant of map. Where map applies its input function to every element of the RDD, mapPartitions applies it to every partition, i.e. it processes the contents of each partition as a whole. Its signature is:
def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
  f is the input function that processes the contents of one partition. The contents of each partition are passed to f as an Iterator[T], and f returns an Iterator[U]. The final RDD is obtained by merging the results produced by f over all partitions.
Example:

scala> val a = sc.parallelize(1 to 9, 3)
scala> def myfunc[T](iter: Iterator[T]): Iterator[(T, T)] = {
         var res = List[(T, T)]()
         var pre = iter.next
         while (iter.hasNext) {
           val cur = iter.next
           res = (pre, cur) :: res   // prepend the pair (previous, current)
           pre = cur
         }
         res.iterator
       }
scala> a.mapPartitions(myfunc).collect
res0: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8))

  The function myfunc in the example above pairs each element of a partition with its successor. Because the last element of a partition has no successor, (3,4) and (6,7) do not appear in the result. mapPartitions has a few variants, such as mapPartitionsWithContext, which can pass some state of the processing context to the user-supplied input function, and mapPartitionsWithIndex, which passes the partition index to the user-supplied input function.
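
A common practical reason to prefer mapPartitions over map is that per-partition setup work runs once per partition rather than once per element. A small sketch of that pattern, assuming a local sc (the currency formatter just stands in for any expensive-to-create object such as a database connection):

import java.text.NumberFormat
import java.util.Locale

val amounts = sc.parallelize(Seq(1234.5, 6789.01, 42.0), 2)

val formatted = amounts.mapPartitions { iter =>
  // created once per partition and reused for every element in it
  val fmt = NumberFormat.getCurrencyInstance(Locale.US)
  iter.map(x => fmt.format(x))
}

formatted.collect   // e.g. Array($1,234.50, $6,789.01, $42.00)
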
7. mapPartitionsWithIndex

  def mapPartitionsWithIndex[U](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false)(implicit arg0: ClassTag[U]): RDD[U]

  Works the same way as mapPartitions, except that the input function takes two parameters, the first of which is the partition index.

var rdd1 = sc.makeRDD(1 to 5,2)
// rdd1 has two partitions
var rdd2 = rdd1.mapPartitionsWithIndex{
  (x, iter) => {
    var result = List[String]()
    var i = 0
    while (iter.hasNext) {
      i += iter.next()
    }
    result.::(x + "|" + i).iterator
  }
}
// rdd2 sums the numbers in each partition of rdd1 and prefixes each partition's sum with its partition index
scala> rdd2.collect
res13: Array[String] = Array(0|3, 1|12)

8. mapWith
  mapWith is another variant of map. Where map takes a single input function, mapWith takes two. It is defined as:

def mapWith[A: ClassTag, U: ClassTag](constructA: Int => A, preservesPartitioning: Boolean = false)(f: (T, A) => U): RDD[U]

  The first function, constructA, takes the partition index (starting from 0) as input and returns a value of the new type A;
  the second function, f, takes the pair (T, A) as input (where T is an element of the original RDD and A is the output of the first function) and returns a value of type U.
Example: multiply the partition index by 10, add 2, and emit the result together with each element of the RDD.

val x = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 3)
x.mapWith(a => a * 10)((b, a) => (b,a + 2)).collect
Result:
(1,2)
(2,2)
(3,2)
(4,12)
(5,12)
(6,12)
(7,22)
(8,22)
(9,22)
(10,22)

9. flatMapWith
  flatMapWith is very similar to mapWith: it also takes two functions. The first takes the partition index as input and returns a value of a new type A; the second takes the pair (T, A) as input and returns a sequence, and the elements of all these sequences make up the new RDD. It is defined as:

def flatMapWith[A: ClassTag, U: ClassTag](constructA: Int => A, preservesPartitioning: Boolean = false)(f: (T, A) => Seq[U]): RDD[U]

Example:

scala> val a = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 3)
scala> a.flatMapWith(x => x, true)((x, y) => List(y, x)).collect
res58: Array[Int] = Array(0, 1, 0, 2, 0, 3, 1, 4, 1, 5, 1, 6, 2, 7, 2,
8, 2, 9)
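
Note that mapWith and flatMapWith were deprecated in the Spark 1.x line and are no longer available in recent releases; the same results can be obtained with mapPartitionsWithIndex. A sketch of equivalents for the two examples above (the rewrite is mine, not a Spark API):

// equivalent of x.mapWith(a => a * 10)((b, a) => (b, a + 2))
val x = sc.parallelize(1 to 10, 3)
val mapWithEquivalent = x.mapPartitionsWithIndex { (idx, iter) =>
  val a = idx * 10                // what constructA(idx) used to compute
  iter.map(b => (b, a + 2))       // what f(b, a) used to compute
}

// equivalent of a.flatMapWith(x => x, true)((x, y) => List(y, x))
val nums = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 3)
val flatMapWithEquivalent = nums.mapPartitionsWithIndex(
  (idx, iter) => iter.flatMap(elem => List(idx, elem)),
  preservesPartitioning = true
)

mapWithEquivalent.collect
flatMapWithEquivalent.collect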

10. coalesce

def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null): RDD[T]

  This function changes the number of partitions of the RDD.
  The first parameter is the target number of partitions; the second controls whether a shuffle is performed and defaults to false. Consider the following example:

scala> var data = sc.parallelize(1 to 12, 3)
scala> data.collect
scala> data.partitions.size
res1: Int = 3
scala> var rdd1 = data.coalesce(1)
scala> rdd1.partitions.size
res2: Int = 1
scala> var rdd1 = data.coalesce(4)
scala> rdd1.partitions.size
res3: Int = 3 // If the target number of partitions is larger than the current one, shuffle must be set to true; otherwise the number of partitions does not change
scala> var rdd1 = data.coalesce(4,true)
scala> rdd1.partitions.size
res4: Int = 4

11. repartition

  def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]

  This function is simply coalesce with its second parameter (shuffle) set to true.

scala> var data = sc.parallelize(1 to 12, 3)
scala> data.collect
scala> data.partitions.size
scala> var rdd1 = data.repartition(1)
scala> rdd1.partitions.size
scala> var rdd1 = data.repartition(4)
scala> rdd1.partitions.size
res3: Int = 4
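
Because repartition(n) just calls coalesce(n, shuffle = true), the two calls below behave the same way; a quick check using the data RDD from above:

val byRepartition = data.repartition(4)
val byCoalesce = data.coalesce(4, shuffle = true)

println(byRepartition.partitions.size)  // 4
println(byCoalesce.partitions.size)     // 4

// both lineages contain a shuffle stage
println(byRepartition.toDebugString)
println(byCoalesce.toDebugString)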

12. randomSplit

  def randomSplit(weights: Array[Double], seed: Long = Utils.random.nextLong): Array[RDD[T]]

  This function splits one RDD into several RDDs according to the weights array (an Array[Double]). The second parameter is the random seed and can usually be left at its default.

scala> var rdd = sc.makeRDD(1 to 10,10)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[16] at makeRDD at :21

scala> rdd.collect
res6: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
scala> var splitRDD = rdd.randomSplit(Array(0.5, 0.1, 0.2, 0.2))
splitRDD: Array[org.apache.spark.rdd.RDD[Int]] = Array(MapPartitionsRDD[17] at randomSplit at :23,
MapPartitionsRDD[18] at randomSplit at :23,
MapPartitionsRDD[19] at randomSplit at :23,
MapPartitionsRDD[20] at randomSplit at :23)

// Note: randomSplit returns an array of RDDs
scala> splitRDD.size
res8: Int = 4
// Since the weights array has four entries, the RDD is split into four RDDs;
// the elements of the original rdd are randomly assigned to these four RDDs according to the weights 0.5, 0.1, 0.2, 0.2; an RDD with a higher weight is more likely to receive an element.
// Note that the weights are expected to add up to 1, otherwise the split may not behave as intended.
scala> splitRDD(0).collect
res10: Array[Int] = Array(1, 4)

scala> splitRDD(1).collect
res11: Array[Int] = Array(3)

scala> splitRDD(2).collect
res12: Array[Int] = Array(5, 9)

scala> splitRDD(3).collect
res13: Array[Int] = Array(2, 6, 7, 8, 10)
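
A common use of randomSplit is a train/test split; the 0.8/0.2 weights, the seed and the variable names below are just an illustration:

// split a dataset into roughly 80% train and 20% test, with a fixed seed for reproducibility
val samples = sc.parallelize(1 to 1000)
val Array(train, test) = samples.randomSplit(Array(0.8, 0.2), seed = 42L)

println(train.count())  // roughly 800
println(test.count())   // roughly 200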

13. glom

def glom(): RDD[Array[T]]

  This function turns each partition of the RDD, whose elements are of type T, into a single Array[T], so each partition ends up holding exactly one array element.

scala>  var rdd = sc.makeRDD(1 to 10,3)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[32] at makeRDD at :24
scala> rdd.partitions.size
res13: Int = 3
scala> rdd.foreach(println)
4
7
5
8
6
9
10
1
2
3
scala> rdd.glom().collect
res16: Array[Array[Int]] = Array(Array(1, 2, 3), Array(4, 5, 6), Array(7, 8, 9, 10))
// glom puts the elements of each partition into one array, so the result consists of 3 arrays
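
glom is also convenient for quick per-partition inspection or aggregation; for example, a sketch computing the maximum of each partition of the rdd above:

// one value per partition: Array(3, 6, 10) for the rdd above
val perPartitionMax = rdd.glom().map(part => part.max)
perPartitionMax.collect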

14. union (set union)

val rdd1 = sc.parallelize(List(5, 6, 4, 3))
val rdd2 = sc.parallelize(List(1, 2, 3, 4))
// union of the two RDDs
val rdd3 = rdd1.union(rdd2)
rdd3.collect

15. distinct
  Removes duplicate elements.

val rdd1 = sc.parallelize(List(5, 6, 4, 3))
val rdd2 = sc.parallelize(List(1, 2, 3, 4))
// union of the two RDDs
val rdd3 = rdd1.union(rdd2)
// remove duplicates and collect
rdd3.distinct.collect

16. intersection (set intersection)

def intersection(other: RDD[T]): RDD[T]
def intersection(other: RDD[T], numPartitions: Int): RDD[T]

Example:

val rdd1 = sc.parallelize(List(5, 6, 4, 3))
val rdd2 = sc.parallelize(List(1, 2, 3, 4))
// intersection of the two RDDs
val rdd4 = rdd1.intersection(rdd2)
rdd4.collect

17. subtract

def subtract(other: RDD[T]): RDD[T]
def subtract(other: RDD[T], numPartitions: Int): RDD[T]
def subtract(other: RDD[T], partitioner: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]
This function returns the elements that appear in this RDD but not in otherRDD; duplicates are not removed.

val rdd1 = sc.parallelize(List(5, 6, 6, 4, 3))
val rdd2 = sc.parallelize(List(1, 2, 3, 4))
// difference: elements of rdd1 that are not in rdd2
val rdd4 = rdd1.subtract(rdd2)
rdd4.collect

18. subtractByKey

def subtractByKey[W](other: RDD[(K, W)])(implicit arg0: ClassTag[W]): RDD[(K, V)]
def subtractByKey[W](other: RDD[(K, W)], numPartitions: Int)(implicit arg0: ClassTag[W]): RDD[(K, V)]
def subtractByKey[W](other: RDD[(K, W)], p: Partitioner)(implicit arg0: ClassTag[W]): RDD[(K, V)]

subtractByKey is similar to subtract among the basic transformations, except that it works on keys: it returns the records whose key appears in this RDD but not in otherRDD.
The numPartitions parameter sets the number of partitions of the result.
The partitioner parameter sets the partitioning function.

var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)
var rdd2 = sc.makeRDD(Array(("A","a"),("C","c"),("D","d")),2)
scala> rdd1.subtractByKey(rdd2).collect
res13: Array[(String, String)] = Array((B,2))

19. groupByKey
  Description: groupByKey([numTasks]) groups the data of a single RDD. Called on a dataset of (K, V) pairs, it returns a dataset of (K, Seq[V]) pairs.

val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2)))
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))
// union of the two RDDs
val rdd4 = rdd1 union rdd2
// group by key
val rdd5 = rdd4.groupByKey
rdd5.collect
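
When the grouped values are only needed for an aggregation, reduceByKey (next section) is usually the better choice, because it combines values inside each partition before the shuffle instead of shipping every value across the network. A sketch of two equivalent per-key sums over the rdd4 built above:

// sum per key via groupByKey: every value crosses the shuffle, then gets summed
val sumsViaGroup = rdd4.groupByKey.mapValues(vs => vs.sum)

// sum per key via reduceByKey: values are pre-aggregated within each partition first
val sumsViaReduce = rdd4.reduceByKey(_ + _)

sumsViaGroup.collect
sumsViaReduce.collect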

20. reduceByKey

As the name suggests, reduceByKey reduces the values of the records of a (K, V) RDD that share the same key, so the multiple values of each key are reduced to a single value, which is then combined with the original key into a new (K, V) pair.

def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]
Example:
val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2)))
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))
// union of the two RDDs
val rdd4 = rdd1 union rdd2
// reduce by key
val rdd6 = rdd4.reduceByKey(_ + _)
rdd6.collect()

21. sortByKey

Do a word count over List(("tom", 1), ("jerry", 3), ("kitty", 2), ("shuke", 1)) and List(("jerry", 2), ("tom", 3), ("shuke", 2), ("kitty", 5)), and sort the result by name.
val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2), ("shuke", 1)))
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 3), ("shuke", 2), ("kitty", 5)))
val rdd3 = rdd1.union(rdd2)
// aggregate by key
val rdd4 = rdd3.reduceByKey(_ + _)
// false means descending order
val rdd5 = rdd4.sortByKey(false)
rdd5.collect

22. zip

def zip[U](other: RDD[U])(implicit arg0: ClassTag[U]): RDD[(T, U)]

zip combines two RDDs into an RDD of key/value pairs. It assumes that the two RDDs have the same number of partitions and the same number of elements in each partition; otherwise an exception is thrown.

scala> var rdd1 = sc.makeRDD(1 to 5,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at makeRDD at :21

scala> var rdd2 = sc.makeRDD(Seq("A","B","C","D","E"),2)
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[2] at makeRDD at :21

scala> rdd1.zip(rdd2).collect
res0: Array[(Int, String)] = Array((1,A), (2,B), (3,C), (4,D), (5,E))

scala> rdd2.zip(rdd1).collect
res1: Array[(String, Int)] = Array((A,1), (B,2), (C,3), (D,4), (E,5))

scala> var rdd3 = sc.makeRDD(Seq("A","B","C","D","E"),3)
rdd3: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[5] at makeRDD at :21
scala> rdd1.zip(rdd3).collect
java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions
// If the two RDDs have different numbers of partitions, an exception is thrown

23. zipPartitions

zipPartitions combines several RDDs partition by partition into a new RDD. The RDDs being combined must have the same number of partitions, but there is no requirement on the number of elements within each partition.
The function has several overloads, which fall into three groups:

With one other RDD
def zipPartitions[B, V](rdd2: RDD[B])(f: (Iterator[T], Iterator[B]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[V]): RDD[V]

def zipPartitions[B, V](rdd2: RDD[B], preservesPartitioning: Boolean)(f: (Iterator[T], Iterator[B]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[V]): RDD[V]

The only difference between these two is the preservesPartitioning parameter, i.e. whether the partitioner of the parent RDD is preserved.

The mapping function f takes the iterators of the two RDDs as parameters.

scala> var rdd1 = sc.makeRDD(1 to 5,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[22] at makeRDD at :21

scala> var rdd2 = sc.makeRDD(Seq("A","B","C","D","E"),2)
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[23] at makeRDD at :21

// Element distribution across rdd1's two partitions:
scala> rdd1.mapPartitionsWithIndex{
| (x,iter) => {
| var result = List[String]()
| while(iter.hasNext){
| result ::= ("part_" + x + "|" + iter.next())
| }
| result.iterator
|
| }
| }.collect
res17: Array[String] = Array(part_0|2, part_0|1, part_1|5, part_1|4, part_1|3)

// Element distribution across rdd2's two partitions
scala> rdd2.mapPartitionsWithIndex{
| (x,iter) => {
| var result = List[String]()
| while(iter.hasNext){
| result ::= ("part_" + x + "|" + iter.next())
| }
| result.iterator
|
| }
| }.collect
res18: Array[String] = Array(part_0|B, part_0|A, part_1|E, part_1|D, part_1|C)

// zipPartitions over rdd1 and rdd2
scala> rdd1.zipPartitions(rdd2){
| (rdd1Iter,rdd2Iter) => {
| var result = List[String]()
| while(rdd1Iter.hasNext && rdd2Iter.hasNext) {
| result::=(rdd1Iter.next() + "_" + rdd2Iter.next())
| }
| result.iterator
| }
| }.collect
res19: Array[String] = Array(2_B, 1_A, 5_E, 4_D, 3_C)


With two other RDDs
def zipPartitions[B, C, V](rdd2: RDD[B], rdd3: RDD[C])(f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[C], arg2: ClassTag[V]): RDD[V]

def zipPartitions[B, C, V](rdd2: RDD[B], rdd3: RDD[C], preservesPartitioning: Boolean)(f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[C], arg2: ClassTag[V]): RDD[V]

Usage is the same as above, except that the function takes two other RDDs and the mapping function f takes three iterators as input.

scala> var rdd1 = sc.makeRDD(1 to 5,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[27] at makeRDD at :21

scala> var rdd2 = sc.makeRDD(Seq("A","B","C","D","E"),2)
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[28] at makeRDD at :21

scala> var rdd3 = sc.makeRDD(Seq("a","b","c","d","e"),2)
rdd3: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[29] at makeRDD at :21

// Element distribution across rdd3's partitions
scala> rdd3.mapPartitionsWithIndex{
| (x,iter) => {
| var result = List[String]()
| while(iter.hasNext){
| result ::= ("part_" + x + "|" + iter.next())
| }
| result.iterator
|
| }
| }.collect
res21: Array[String] = Array(part_0|b, part_0|a, part_1|e, part_1|d, part_1|c)

// zipPartitions over the three RDDs
scala> var rdd4 = rdd1.zipPartitions(rdd2,rdd3){
| (rdd1Iter,rdd2Iter,rdd3Iter) => {
| var result = List[String]()
| while(rdd1Iter.hasNext && rdd2Iter.hasNext && rdd3Iter.hasNext) {
| result::=(rdd1Iter.next() + "_" + rdd2Iter.next() + "_" + rdd3Iter.next())
| }
| result.iterator
| }
| }
rdd4: org.apache.spark.rdd.RDD[String] = ZippedPartitionsRDD3[33] at zipPartitions at :27

scala> rdd4.collect
res23: Array[String] = Array(2_B_b, 1_A_a, 5_E_e, 4_D_d, 3_C_c)

With three other RDDs
def zipPartitions[B, C, D, V](rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D])(f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[C], arg2: ClassTag[D], arg3: ClassTag[V]): RDD[V]

def zipPartitions[B, C, D, V](rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D], preservesPartitioning: Boolean)(f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[C], arg2: ClassTag[D], arg3: ClassTag[V]): RDD[V]

Usage is the same as above, just with one more RDD.

24. zipWithIndex

Definition

  def zipWithIndex(): RDD[(T, Long)]

Description

  This function pairs each element of the RDD with its index (ID) in the RDD, producing key/value pairs.

Example

scala> var rdd2 = sc.makeRDD(Seq("A","B","R","D","F"),2)
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[36] at makeRDD at <console>:24
scala> val resultRDD=rdd2.zipWithIndex()
resultRDD: org.apache.spark.rdd.RDD[(String, Long)] = ZippedWithIndexRDD[37] at zipWithIndex at <console>:25
scala> resultRDD.partitions.size
res19: Int = 2
scala> resultRDD.foreach(println)
(R,2)
(A,0)
(D,3)
(B,1)
(F,4)

25. zipWithUniqueId

def zipWithUniqueId(): RDD[(T, Long)]
This function pairs each element of the RDD with a unique Long ID. The IDs are generated as follows:
the unique ID of the first element in each partition is the partition index;
the unique ID of the Nth element in a partition is (the unique ID of the previous element) + (the total number of partitions of the RDD).
See the example below:
scala> var rdd1 = sc.makeRDD(Seq("A","B","C","D","E","F"),2)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[44] at makeRDD at :21
// rdd1 has two partitions
scala> rdd1.zipWithUniqueId().collect
res32: Array[(String, Long)] = Array((A,0), (B,2), (C,4), (D,1), (E,3), (F,5))
// the total number of partitions is 2
// the first element of partition 0 gets ID 0, the first element of partition 1 gets ID 1
// the second element of partition 0 gets ID 0+2=2, the third element gets 2+2=4
// the second element of partition 1 gets ID 1+2=3, the third element gets 3+2=5

26. partitionBy (key-value transformation)

def partitionBy(partitioner: Partitioner): RDD[(K, V)]
This function uses the given partitioner to build a new ShuffledRDD, repartitioning the original RDD.
scala> var rdd1 = sc.makeRDD(Array((1,"A"),(2,"B"),(3,"C"),(4,"D")),2)
rdd1: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[23] at makeRDD at :21
scala> rdd1.partitions.size
res20: Int = 2

// Inspect the elements of each partition of rdd1
scala> rdd1.mapPartitionsWithIndex{
| (partIdx,iter) => {
| var part_map = scala.collection.mutable.Map[String,List[(Int,String)]]()
| while(iter.hasNext){
| var part_name = "part_" + partIdx;
| var elem = iter.next()
| if(part_map.contains(part_name)) {
| var elems = part_map(part_name)
| elems ::= elem
| part_map(part_name) = elems
| } else {
| part_map(part_name) = List[(Int,String)]{elem}
| }
| }
| part_map.iterator
|
| }
| }.collect
res22: Array[(String, List[(Int, String)])] = Array((part_0,List((2,B), (1,A))), (part_1,List((4,D), (3,C))))
// (2,B) and (1,A) are in part_0; (4,D) and (3,C) are in part_1

// Repartition with partitionBy
scala> var rdd2 = rdd1.partitionBy(new org.apache.spark.HashPartitioner(2))
rdd2: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[25] at partitionBy at :23

scala> rdd2.partitions.size
res23: Int = 2

// Inspect the elements of each partition of rdd2
scala> rdd2.mapPartitionsWithIndex{
| (partIdx,iter) => {
| var part_map = scala.collection.mutable.Map[String,List[(Int,String)]]()
| while(iter.hasNext){
| var part_name = "part_" + partIdx;
| var elem = iter.next()
| if(part_map.contains(part_name)) {
| var elems = part_map(part_name)
| elems ::= elem
| part_map(part_name) = elems
| } else {
| part_map(part_name) = List[(Int,String)]{elem}
| }
| }
| part_map.iterator
| }
| }.collect
res24: Array[(String, List[(Int, String)])] = Array((part_0,List((4,D), (2,B))), (part_1,List((3,C), (1,A))))
// (4,D) and (2,B) are in part_0; (3,C) and (1,A) are in part_1
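
partitionBy accepts any Partitioner, not just HashPartitioner; a minimal sketch of a custom partitioner that sends even keys to partition 0 and odd keys to partition 1 (the class is my own illustration, not part of Spark):

import org.apache.spark.Partitioner

// hypothetical partitioner: even keys go to partition 0, odd keys to partition 1
class EvenOddPartitioner extends Partitioner {
  override def numPartitions: Int = 2
  override def getPartition(key: Any): Int = key match {
    case k: Int => if (k % 2 == 0) 0 else 1
    case _      => 0
  }
}

val rdd3 = rdd1.partitionBy(new EvenOddPartitioner)
rdd3.partitions.size   // 2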

27. mapValues

  a. Description: for each (K, V) record in rdd1, apply func to the Value to produce a new record.
  b. Usage: rdd2 = rdd1.mapValues(func)
  c. Example:

scala> val inputRDD = sc.parallelize(Array[(Int,Char)]((1,'a'),(2,'b'),(3,'c'),(4,'d'),(2,'e'),(3,'f'),(2,'g'),(1,'h')),3)
inputRDD: org.apache.spark.rdd.RDD[(Int, Char)] = ParallelCollectionRDD[0] at parallelize at :24
scala> // For each record, e.g. r = (1,'a'), append "_1" to its value
scala> val resultRDD = inputRDD.mapValues(x => x + "_1")
resultRDD: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[4] at mapValues at :26
scala> resultRDD.foreach(println)
(3,f_1)
(1,a_1)
(3,c_1)
(4,d_1)
(2,b_1)
(2,g_1)
(2,e_1)
(1,h_1)

  d. Data flow diagram: (figure omitted)