1. 准备工作
先启动hadoop启动spark
集群模式启动
启动Spark集群 /export/servers/spark/sbin/start-all.sh
启动spark-shell
/export/server/spark/bin/spark-shell / –master spark://node01:7077 / –executor-memory 1g / –total-executor-cores 2
或本地模式启动 /export/servers/spark/bin/spark-shell --master local[8]
2. wordcount
先在hadoop上创建对应文件夹和文件
val res
= sc.textFile
("hdfs://node01:8020/wordcount/input/words.txt")
.flatMap
(_.split
(" ")).map
((_,1)).reduceByKey
(_+_
)
res.collect
3. 创建RDD
val rdd1
= sc.parallelize
(List
(5,6,4,7,3,8,2,9,1,10
))
val rdd2
= sc.makeRDD
(List
(5,6,4,7,3,8,2,9,1,10
))
4. 查看该RDD的分区数量
sc.parallelize
(List
(5,6,4,7,3,8,2,9,1,10
)).partitions.length
sc.parallelize
(List
(5,6,4,7,3,8,2,9,1,10
),3
).partitions.length
sc.textFile
("hdfs://node01:8020/wordcount/input/words.txt").partitions.length
RDD分区的数据取决于哪些因素? RDD分区的原则是使得分区的个数尽量等于集群中的CPU核心(core)数目,这样可以充分利 用CPU的计算资源 但是在实际中为了更加充分的压榨CPU的计算资源,会把平行度设置为cpu核数的2~3倍 RDD分区数和启动时指定的核数、调用方法时指定的分区数、如文件本身分区数有关系。
5. 分区实验
scala
> sc.parallelize
(1 to 100
).count
res0: Long
= 100
之所以会有 8 个 Tasks, 是因为在启动的时候指定的命令是 spark-shell --master local[8], 这样会生成 1 个 Executors, 这个 Executors 有 8 个 Cores, 所以默认会有 8 个 Tasks, 每个 Cores 对应一个分区, 每个分区对应一个 Tasks, 可以通过 rdd.partitions.size 来查看分区数量 同时也可以通过 spark-shell 的 WebUI 来查看 Executors 的情况 默认的分区数量是和 Cores 的数量有关的, 也可以通过如下三种方式修改或者重 新指定分区数量。
创建 RDD 时指定分区数:
scala
> val rdd1
= sc.parallelize
(1 to 100, 6
)
rdd1: org.apache.spark.rdd.RDD
[Int
] = ParallelCollectionRDD
[1
] at parallelize at
<console
>:24
scala
> rdd1.partitions.size res1: Int
= 6
scala
> val rdd2
= sc.textFile
("hdfs:///dataset/wordcount.txt", 6
)
rdd2: org.apache.spark.rdd.RDD
[String
] = hdfs:///dataset/wordcount.txt
MapPartitionsRDD
[3
] at textFile at
<console
>:24
scala
> rdd2.partitions.size res2: Int
= 7
扩展:
1.启动的时候指定的CPU核数确定了一个参数值: spark.default.parallelism=指定的CPU核数(集群模式最小2) 2.对于Scala集合调用parallelize(集合,分区数)方法, 如果没有指定分区数,就使用spark.default.parallelism, 如果指定了就使用指定的分区数(不要指定大于spark.default.parallelism) 3.对于textFile(文件,分区数) defaultMinPartitions 如果没有指定分区数sc.defaultMinPartitions=min(defaultParallelism,2) 如果指定了就使用指定的分区数sc.defaultMinPartitions=指定的分区数 rdd的分区数 对于本地文件: rdd的分区数 = max(本地file的分片数, sc.defaultMinPartitions) 对于HDFS文件: rdd的分区数 = max(hdfs文件的block数目, sc.defaultMinPartitions)
所以如果分配的核数为多个,且从文件中读取数据创建RDD,即使hdfs文件只有1个切片,最后的Spark的RDD的partition数也有可能是2
RDD的具体算子实现
RDD的Transformation算子—算子的转换
单Value类型算子双值类型算子Key-Value类型算子统计性算子 RDD的Action算子–将结果通过executor传回driver显示结果、
countfirstcollectsaveAtextFile
6. Transformation
Spark Core - transformations RDD中的所有转换都是延迟加载的,也就是说,它们并不会直接计算结果。相反的,它们只是记住这些应用到基础数据集(例如一个文件)上的转换动作。只有当发生一个要求返回结果给Driver的动作时,这些转换才会真正运行。这种设计让Spark更加有效率地运行。
6.1 常用的transformation操作示例(27个)
6.2 值类型valueType
6.2.1 map(func)
作用:
map(func)返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成
使用说明 案例 代码 需求:创建一个1-10数组的RDD,将所有元素*2形成新的RDD
scala
> var
source = sc.parallelize
(1 to 10
)
source: org.apache.spark.rdd.RDD
[Int
] = ParallelCollectionRDD
[8
] at parallelize at
<console
>:24
scala
> source.collect
()
res7: Array
[Int
] = Array
(1, 2, 3, 4, 5, 6, 7, 8, 9, 10
)
scala
> val mapadd
= source.map
(_ * 2
)
mapadd: org.apache.spark.rdd.RDD
[Int
] = MapPartitionsRDD
[9
] at map at
<console
>:26
scala
> mapadd.collect
()
res8: Array
[Int
] = Array
(2, 4, 6, 8, 10, 12, 14, 16, 18, 20
)
6.2.2 filter(func)
解析
filter(func)返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成
使用说明 案例 需求:创建一个RDD(由字符串组成),过滤出一个新RDD(包含”xiao”子串)
scala
> var sourceFilter
= sc.parallelize
(Array
("xiaoming",
"xiaojiang",
"xiaohe",
"dazhi"))
sourceFilter: org.apache.spark.rdd.RDD
[String
] = ParallelCollectionRDD
[10
] at parallelize at
<console
>:24
scala
> val filter
= sourceFilter.filter
(_.contains
("xiao"))
filter: org.apache.spark.rdd.RDD
[String
] = MapPartitionsRDD
[11
] at filter at
<console
>:26
scala
> sourceFilter.collect
()
res9: Array
[String
] = Array
(xiaoming, xiaojiang, xiaohe, dazhi
)
scala
> filter.collect
()
res10: Array
[String
] = Array
(xiaoming, xiaojiang, xiaohe
)
6.2.3 flatMap(func)
解析:
flatMap(func)类似于map,但是每一个输入元素可以被映射为 0或多个输出元素(所以func应该返回一个序列, 而不是单一元素)
使用说明 需求:创建一个元素为1-5的RDD,运用flatMap创建一个新的RDD,新的RDD为原 RDD的每个元素的2倍(2,4,6,8,10)
scala
> val sourceFlat
= sc.parallelize
(1 to 5
)
sourceFlat: org.apache.spark.rdd.RDD
[Int
] = ParallelCollectionRDD
[12
] at parallelize at
<console
>:24
scala
> sourceFlat.collect
()
res11: Array
[Int
] = Array
(1, 2, 3, 4, 5
)
scala
> val flatMap
= sourceFlat.flatMap
(1 to _
)
flatMap: org.apache.spark.rdd.RDD
[Int
] = MapPartitionsRDD
[13
] at flatMap at
<console
>:26
scala
> flatMap.collect
()
res12: Array
[Int
] = Array
(1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5
)
6.2.4 mapPartitions(func)
多余N个元素M个分区的形式map是按照N个元素执行N次mapPartitons根据M个分区执行几次mapmapParitionWithIndex 分区的基础上增加索引的index 解析:
mapPartitions(func)类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U]。假设有N个元素,有M个分区,那么map的函数的将被调用N次,而mapPartitions被调用M次,一个函数一次处理所有分区
6.2.5 mapPartitionsWithIndex(func)
解析
6.2.6 sample(withReplacement, fraction, seed)
使用说明 案例 需求 创建一个RDD(1-10),从中选择放回和不放回抽样
scala
> val rdd
= sc.parallelize
(1 to 10
)
rdd: org.apache.spark.rdd.RDD
[Int
] = ParallelCollectionRDD
[20
] at parallelize at
<console
>:24
scala
> rdd.collect
()
res15: Array
[Int
] = Array
(1, 2, 3, 4, 5, 6, 7, 8, 9, 10
)
scala
> var sample1
= rdd.sample
(true,0.4,2
)
sample1: org.apache.spark.rdd.RDD
[Int
] = PartitionwiseSampledRDD
[21
] at sample at
<console
>:26
scala
> sample1.collect
()
res16: Array
[Int
] = Array
(1, 2, 2, 7, 7, 8, 9
)
scala
> var sample2
= rdd.sample
(false,0.2,3
)
sample2: org.apache.spark.rdd.RDD
[Int
] = PartitionwiseSampledRDD
[22
] at sample at
<console
>:26
scala
> sample2.collect
()
res17: Array
[Int
] = Array
(1, 9
)
6.2.7 Glom
解析 案例
scala
> val rdd
= sc.parallelize
(1 to 16,4
)
rdd: org.apache.spark.rdd.RDD
[Int
] = ParallelCollectionRDD
[65
] at parallelize at
<console
>:24
scala
> rdd.glom
().collect
()
res25: Array
[Array
[Int
]] = Array
(Array
(1, 2, 3, 4
), Array
(5, 6, 7, 8
), Array
(9, 10, 11, 12
), Array
(13, 14, 15, 16
))
6.2.8 sortBy(func,[ascending], [numTasks])
解析根据key或value均可进行排序 案例 代码
scala
> val rdd
= sc.parallelize
(List
(1,2,3,4
))
rdd: org.apache.spark.rdd.RDD
[Int
] = ParallelCollectionRDD
[21
] at parallelize at
<console
>:24
scala
> rdd.sortBy
(x
=> x
).collect
()
res11: Array
[Int
] = Array
(1, 2, 3, 4
)
scala
> rdd.sortBy
(x
=> x%3
).collect
()
res12: Array
[Int
] = Array
(3, 4, 1, 2
)
6.2.9 coalesce(numPartitions)
解析 案例 代码
rdd: org.apache.spark.rdd.RDD
[Int
] = ParallelCollectionRDD
[54
] at parallelize at
<console
>:24
scala
> rdd.partitions.size
res20: Int
= 4
scala
> val coalesceRDD
= rdd.coalesce
(3
)
coalesceRDD: org.apache.spark.rdd.RDD
[Int
] = CoalescedRDD
[55
] at coalesce at
<console
>:26
scala
> coalesceRDD.partitions.size
res21: Int
= 3
6.2.10 repartition(numPartitions)
解析
案例
代码
scala
> val rdd
= sc.parallelize
(1 to 16,4
)
rdd: org.apache.spark.rdd.RDD
[Int
] = ParallelCollectionRDD
[56
] at parallelize at
<console
>:24
scala
> rdd.partitions.size
res22: Int
= 4
scala
> val rerdd
= rdd.repartition
(2
)
rerdd: org.apache.spark.rdd.RDD
[Int
] = MapPartitionsRDD
[60
] at repartition at
<console
>:26
scala
> rerdd.partitions.size
res23: Int
= 2
scala
> val rerdd
= rdd.repartition
(4
)
rerdd: org.apache.spark.rdd.RDD
[Int
] = MapPartitionsRDD
[64
] at repartition at
<console
>:26
scala
> rerdd.partitions.size
res24: Int
= 4
重分区-缩减分区-增加分区
原来有2个分区,现在想增加到3个分区,是否会发生shuffle原来有3个分区,现在想减少到2个分区,是否会发生shfuule
repartition无论是增加还是缩小都会发生shuffle对于calesce在缩小分区的情况下不会发生shuffle
6.3 双值类型DoubleValueType
6.3.1 union(otherDataset)
解析 案例 需求:创建两个RDD,求并集
scala
> val rdd1
= sc.parallelize
(1 to 5
)
rdd1: org.apache.spark.rdd.RDD
[Int
] = ParallelCollectionRDD
[23
] at parallelize at
<console
>:24
scala
> val rdd2
= sc.parallelize
(5 to 10
)
rdd2: org.apache.spark.rdd.RDD
[Int
] = ParallelCollectionRDD
[24
] at parallelize at
<console
>:24
scala
> val rdd3
= rdd1.union
(rdd2
)
rdd3: org.apache.spark.rdd.RDD
[Int
] = UnionRDD
[25
] at union at
<console
>:28
scala
> rdd3.collect
()
res18: Array
[Int
] = Array
(1, 2, 3, 4, 5, 5, 6, 7, 8, 9, 10
)
6.3.2 intersection(otherDataset)
解析 案例:创建两个RDD,求两个RDD的交集
6.3.3 distinct([numTasks]))
解析
案例
6.3.4 Subtract
解析
案例
6.3.5 zip(otherDataset)案例
解析 拉链操作代码
scala
> val rdd1
= sc.parallelize
(Array
(1,2,3
),3
)
rdd1: org.apache.spark.rdd.RDD
[Int
] = ParallelCollectionRDD
[1
] at parallelize at
<console
>:24
scala
> val rdd2
= sc.parallelize
(Array
("a",
"b",
"c"),3
)
rdd2: org.apache.spark.rdd.RDD
[String
] = ParallelCollectionRDD
[2
] at parallelize at
<console
>:24
scala
> rdd1.zip
(rdd2
).collect
res1: Array
[(Int, String
)] = Array
((1,a), (2,b), (3,c))
scala
> rdd2.zip
(rdd1
).collect
res2: Array
[(String, Int
)] = Array
((a,1), (b,2), (c,3))
scala
> val rdd3
= sc.parallelize
(Array
("a",
"b",
"c"),2
)
rdd3: org.apache.spark.rdd.RDD
[String
] = ParallelCollectionRDD
[5
] at parallelize at
<console
>:24
scala
> rdd1.zip
(rdd3
).collect
java.lang.IllegalArgumentException: Can't
zip RDDs with unequal numbers of partitions: List
(3, 2
)
at org.apache.spark.rdd.ZippedPartitionsBaseRDD.getPartitions
(ZippedPartitionsRDD.scala:57
)
at org.apache.spark.rdd.RDD$
$anonfun$partitions$2.apply
(RDD.scala:252
)
at org.apache.spark.rdd.RDD$
$anonfun$partitions$2.apply
(RDD.scala:250
)
at scala.Option.getOrElse
(Option.scala:121
)
at org.apache.spark.rdd.RDD.partitions
(RDD.scala:250
)
at org.apache.spark.SparkContext.runJob
(SparkContext.scala:1965
)
at org.apache.spark.rdd.RDD$
$anonfun$collect$1.apply
(RDD.scala:936
)
at org.apache.spark.rdd.RDDOperationScope$.withScope
(RDDOperationScope.scala:151
)
at org.apache.spark.rdd.RDDOperationScope$.withScope
(RDDOperationScope.scala:112
)
at org.apache.spark.rdd.RDD.withScope
(RDD.scala:362
)
at org.apache.spark.rdd.RDD.collect
(RDD.scala:935
)
... 48 elided
6.4 Key-Value值类型
6.4.1 partitionBy
解析:根据分区函数进行分区
说明
案例
6.4.2 reduceByKey(func, [numTasks])
解析:根据相同key的value进行聚合(groupBy+对value进行sum求和) 说明 案例
6.4.3 groupByKey
解析:根据key进行分组
6.4.4 reduceByKey和groupByKey的区别
reduceByKey:按照key进行聚合,在shuffle之前有combine(预聚合)操作,返回结果是RDD[k,v].groupByKey:按照key进行分组,直接进行shuffle。开发指导:reduceByKey比groupByKey,建议使用。但是需要注意是否会影响业务逻辑。
如何通过groupByKey达到和reduceByKey的功能
groupByKey得到的结果Array(male,compactBuffer(5,2))groupByKey().map(t=>(t._1,t._2.sum))
6.4.5 combineByKey[C]
解析: combineByKey是Spark中一个比较核心的高级且底层函数,其他一些高阶键值对函数底层 都是用它实现的。诸如 groupByKey,reduceByKey等等
如下解释下3个重要的函数参数:
createCombiner:V=>C ,这个函数把当前的值作为参数,此时我们可以对其做些附加 操作(类型转换)并把它返回 (这一步类似于初始化操作)mergeValue:(C,V)=>C,该函数把元素V合并到之前的元素C(createCombiner)上 (这个 操作在每个分区内进行)mergeCombiners: (C, C) => C,该函数把2个元素C合并 (这个操作在不同分区间进行)。
案例 解释:(v)=>(v,1),得到的是(88,1),因为这是combineByKey是按照key处理value操作,acc:(Int,Int) 代表的是(88,1),其中acc._1代表的是88,acc._2代表1值,v代表是同为Fred名称的95的数值, 所以acc._1+v=88+95,即相同Key的Value相加结果,第三个参数是分区间的相同key的value 进行累加,得到Fred的88+95+91,Wilma累加和为93+95+98。
6.4.6 aggregateByKey(zeroValue:U,[partitioner: Partitioner]) (seqOp: (U, V) => U,combOp: (U, U) => U)
解析:初始值zeroValue只用于分区内函数,分区间不操作 案例 解析: 例如:分一个分区,以key为1的分区为例,0先和3比较得3,3在和2比较得3,3在和4比较 得4,所以整个key为1的组最终结果为(1,4),同理,key为2的最终结果为(2,3),key 为3的为(3,8). 如果分三个分区,前两个是一个分区,中间两个是一个分区,最后两个是一个分区,第一个 分区的最终结果为(1,3),第二个分区为(1,4)(2,3),最后一个分区为(3,8), combine后为 (3,8), (1,7), (2,3)
案例1案例2
6.4.8 sortByKey([ascending], [numTasks])
解析:根据Key对Value进行排序 案例
6.4.9 join(otherDataset, [numTasks])
解析 案例
6.4.10 cogroup(otherDataset, [numTasks])
解析
案例
6.4.11 cartesian(otherDataset)
解析 案例
6.4.12 mapValues
解析 案例
7. Action
7.1 常用的action操作示例(13个)
7.1.1 reduce(func)-使用相应fun进行运算
7.1.2 collect()
7.1.3 count()
解析 案例
7.1.4 first()
7.1.5 take(n)
7.1.6 takeSample(withReplacement,num, [seed])
解析:随机数采样点的Action算子
7.1.7 takeOrdered(n)
7.1.8 aggregate (zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U)
解析:初始值作用于分区内和分区间 案例
7.1.9 fold(num)(func)
7.1.10 saveAsTextFile(path)
7.1.11 saveAsSequenceFile(path)
7.1.12 saveAsObjectFile(path)
7.1.13 countByKey()
7.1.14 foreach(func)
7.2 数值RDD统计操作
Spark 对包含数值数据的 RDD 提供了一些描述性的统计操作。Spark 的数值操作是通过流式算法实现的,允许以每次一个元素的方式构建出模型。这些 统计数据都会在调用 stats() 时通过一次遍历数据计算出来,并以 StatsCounter 对象返回。
scala
> var rdd1
= sc.makeRDD
(1 to 100
)
rdd1: org.apache.spark.rdd.RDD
[Int
] = ParallelCollectionRDD
[42
] at makeRDD at
<console
>:32
scala
> rdd1.sum
()
res34: Double
= 5050.0
scala
> rdd1.max
()
res35: Int
= 100