Spark(9) -- SparkCore(3) -- RDD基础练习

    科技2022-07-10  123

    1. 准备工作

    先启动hadoop启动spark 集群模式启动 启动Spark集群 /export/servers/spark/sbin/start-all.sh 启动spark-shell

    /export/server/spark/bin/spark-shell / –master spark://node01:7077 / –executor-memory 1g / –total-executor-cores 2

    或本地模式启动 /export/servers/spark/bin/spark-shell --master local[8]

    2. wordcount

    先在hadoop上创建对应文件夹和文件

    val res = sc.textFile("hdfs://node01:8020/wordcount/input/words.txt") .flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_) # 上面的代码不会立即执行,因为都是Transformation转换操作 # 下面的代码才会真正的提交并执行,因为是Action动作/行动操作 res.collect

    3. 创建RDD

    val rdd1 = sc.parallelize(List(5,6,4,7,3,8,2,9,1,10)) val rdd2 = sc.makeRDD(List(5,6,4,7,3,8,2,9,1,10))

    4. 查看该RDD的分区数量

    sc.parallelize(List(5,6,4,7,3,8,2,9,1,10)).partitions.length #没有指定分区数,默认值是2 sc.parallelize(List(5,6,4,7,3,8,2,9,1,10),3).partitions.length#指定了分区数为3 sc.textFile("hdfs://node01:8020/wordcount/input/words.txt").partitions.length #2 RDD分区的数据取决于哪些因素?   RDD分区的原则是使得分区的个数尽量等于集群中的CPU核心(core)数目,这样可以充分利 用CPU的计算资源 但是在实际中为了更加充分的压榨CPU的计算资源,会把平行度设置为cpu核数的2~3倍 RDD分区数和启动时指定的核数、调用方法时指定的分区数、如文件本身分区数有关系。

    5. 分区实验

    scala> sc.parallelize(1 to 100).count res0: Long = 100

     之所以会有 8 个 Tasks, 是因为在启动的时候指定的命令是 spark-shell --master local[8], 这样会生成 1 个 Executors, 这个 Executors 有 8 个 Cores, 所以默认会有 8 个 Tasks, 每个 Cores 对应一个分区, 每个分区对应一个 Tasks, 可以通过 rdd.partitions.size 来查看分区数量  同时也可以通过 spark-shell 的 WebUI 来查看 Executors 的情况  默认的分区数量是和 Cores 的数量有关的, 也可以通过如下三种方式修改或者重 新指定分区数量。

    创建 RDD 时指定分区数:

    scala> val rdd1 = sc.parallelize(1 to 100, 6) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:24 scala> rdd1.partitions.size res1: Int = 6 scala> val rdd2 = sc.textFile("hdfs:///dataset/wordcount.txt", 6) rdd2: org.apache.spark.rdd.RDD[String] = hdfs:///dataset/wordcount.txt MapPartitionsRDD[3] at textFile at <console>:24 scala> rdd2.partitions.size res2: Int = 7 扩展:

    1.启动的时候指定的CPU核数确定了一个参数值:  spark.default.parallelism=指定的CPU核数(集群模式最小2) 2.对于Scala集合调用parallelize(集合,分区数)方法,  如果没有指定分区数,就使用spark.default.parallelism,  如果指定了就使用指定的分区数(不要指定大于spark.default.parallelism) 3.对于textFile(文件,分区数) defaultMinPartitions  如果没有指定分区数sc.defaultMinPartitions=min(defaultParallelism,2)  如果指定了就使用指定的分区数sc.defaultMinPartitions=指定的分区数 rdd的分区数  对于本地文件:   rdd的分区数 = max(本地file的分片数, sc.defaultMinPartitions)  对于HDFS文件:   rdd的分区数 = max(hdfs文件的block数目, sc.defaultMinPartitions)

     所以如果分配的核数为多个,且从文件中读取数据创建RDD,即使hdfs文件只有1个切片,最后的Spark的RDD的partition数也有可能是2

    RDD的具体算子实现

    RDD的Transformation算子—算子的转换 单Value类型算子双值类型算子Key-Value类型算子统计性算子 RDD的Action算子–将结果通过executor传回driver显示结果、 countfirstcollectsaveAtextFile

    6. Transformation

    Spark Core - transformations  RDD中的所有转换都是延迟加载的,也就是说,它们并不会直接计算结果。相反的,它们只是记住这些应用到基础数据集(例如一个文件)上的转换动作。只有当发生一个要求返回结果给Driver的动作时,这些转换才会真正运行。这种设计让Spark更加有效率地运行。

    6.1 常用的transformation操作示例(27个)

    6.2 值类型valueType

    6.2.1 map(func)

    作用: map(func)返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成 使用说明 案例 代码 需求:创建一个1-10数组的RDD,将所有元素*2形成新的RDD scala> var source = sc.parallelize(1 to 10) source: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at <console>:24 scala> source.collect() res7: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) scala> val mapadd = source.map(_ * 2) mapadd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[9] at map at <console>:26 scala> mapadd.collect() res8: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)

    6.2.2 filter(func)

    解析 filter(func)返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成 使用说明 案例 需求:创建一个RDD(由字符串组成),过滤出一个新RDD(包含”xiao”子串) scala> var sourceFilter = sc.parallelize(Array("xiaoming","xiaojiang","xiaohe","dazhi")) sourceFilter: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[10] at parallelize at <console>:24 scala> val filter = sourceFilter.filter(_.contains("xiao")) filter: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[11] at filter at <console>:26 scala> sourceFilter.collect() res9: Array[String] = Array(xiaoming, xiaojiang, xiaohe, dazhi) scala> filter.collect() res10: Array[String] = Array(xiaoming, xiaojiang, xiaohe)

    6.2.3 flatMap(func)

    解析: flatMap(func)类似于map,但是每一个输入元素可以被映射为 0或多个输出元素(所以func应该返回一个序列, 而不是单一元素) 使用说明 需求:创建一个元素为1-5的RDD,运用flatMap创建一个新的RDD,新的RDD为原 RDD的每个元素的2倍(2,4,6,8,10) scala> val sourceFlat = sc.parallelize(1 to 5) sourceFlat: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[12] at parallelize at <console>:24 scala> sourceFlat.collect() res11: Array[Int] = Array(1, 2, 3, 4, 5) scala> val flatMap = sourceFlat.flatMap(1 to _) flatMap: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[13] at flatMap at <console>:26 scala> flatMap.collect() res12: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5)

    6.2.4 mapPartitions(func)

    多余N个元素M个分区的形式map是按照N个元素执行N次mapPartitons根据M个分区执行几次mapmapParitionWithIndex 分区的基础上增加索引的index 解析: mapPartitions(func)类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U]。假设有N个元素,有M个分区,那么map的函数的将被调用N次,而mapPartitions被调用M次,一个函数一次处理所有分区

    6.2.5 mapPartitionsWithIndex(func)

    解析

    6.2.6 sample(withReplacement, fraction, seed)

    使用说明 案例 需求 创建一个RDD(1-10),从中选择放回和不放回抽样 scala> val rdd = sc.parallelize(1 to 10) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[20] at parallelize at <console>:24 scala> rdd.collect() res15: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) scala> var sample1 = rdd.sample(true,0.4,2) sample1: org.apache.spark.rdd.RDD[Int] = PartitionwiseSampledRDD[21] at sample at <console>:26 scala> sample1.collect() res16: Array[Int] = Array(1, 2, 2, 7, 7, 8, 9) scala> var sample2 = rdd.sample(false,0.2,3) sample2: org.apache.spark.rdd.RDD[Int] = PartitionwiseSampledRDD[22] at sample at <console>:26 scala> sample2.collect() res17: Array[Int] = Array(1, 9)

    6.2.7 Glom

    解析 案例 scala> val rdd = sc.parallelize(1 to 16,4) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[65] at parallelize at <console>:24 scala> rdd.glom().collect() res25: Array[Array[Int]] = Array(Array(1, 2, 3, 4), Array(5, 6, 7, 8), Array(9, 10, 11, 12), Array(13, 14, 15, 16))

    6.2.8 sortBy(func,[ascending], [numTasks])

    解析根据key或value均可进行排序 案例 代码 scala> val rdd = sc.parallelize(List(1,2,3,4)) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[21] at parallelize at <console>:24 scala> rdd.sortBy(x => x).collect() res11: Array[Int] = Array(1, 2, 3, 4) scala> rdd.sortBy(x => x%3).collect() res12: Array[Int] = Array(3, 4, 1, 2)

    6.2.9 coalesce(numPartitions)

    解析 案例 代码 rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[54] at parallelize at <console>:24 scala> rdd.partitions.size res20: Int = 4 scala> val coalesceRDD = rdd.coalesce(3) coalesceRDD: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[55] at coalesce at <console>:26 scala> coalesceRDD.partitions.size res21: Int = 3

    6.2.10 repartition(numPartitions)

    解析

    案例

    代码

    scala> val rdd = sc.parallelize(1 to 16,4) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[56] at parallelize at <console>:24 scala> rdd.partitions.size res22: Int = 4 scala> val rerdd = rdd.repartition(2) rerdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[60] at repartition at <console>:26 scala> rerdd.partitions.size res23: Int = 2 scala> val rerdd = rdd.repartition(4) rerdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[64] at repartition at <console>:26 scala> rerdd.partitions.size res24: Int = 4 重分区-缩减分区-增加分区 原来有2个分区,现在想增加到3个分区,是否会发生shuffle原来有3个分区,现在想减少到2个分区,是否会发生shfuule repartition无论是增加还是缩小都会发生shuffle对于calesce在缩小分区的情况下不会发生shuffle

    6.3 双值类型DoubleValueType

    6.3.1 union(otherDataset)

    解析 案例 需求:创建两个RDD,求并集 scala> val rdd1 = sc.parallelize(1 to 5) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[23] at parallelize at <console>:24 scala> val rdd2 = sc.parallelize(5 to 10) rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[24] at parallelize at <console>:24 scala> val rdd3 = rdd1.union(rdd2) rdd3: org.apache.spark.rdd.RDD[Int] = UnionRDD[25] at union at <console>:28 scala> rdd3.collect() res18: Array[Int] = Array(1, 2, 3, 4, 5, 5, 6, 7, 8, 9, 10)

    6.3.2 intersection(otherDataset)

    解析 案例:创建两个RDD,求两个RDD的交集

    6.3.3 distinct([numTasks]))

    解析

    案例

    6.3.4 Subtract

    解析

    案例

    6.3.5 zip(otherDataset)案例

    解析 拉链操作代码 #需求:创建两个RDD,并将两个RDD组合到一起形成一个(k,v)RDD #(1)创建第一个RDD scala> val rdd1 = sc.parallelize(Array(1,2,3),3) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:24 #(2)创建第二个RDD(与1分区数相同) scala> val rdd2 = sc.parallelize(Array("a","b","c"),3) rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[2] at parallelize at <console>:24 #(3)第一个RDD组合第二个RDD并打印 scala> rdd1.zip(rdd2).collect res1: Array[(Int, String)] = Array((1,a), (2,b), (3,c)) #(4)第二个RDD组合第一个RDD并打印 scala> rdd2.zip(rdd1).collect res2: Array[(String, Int)] = Array((a,1), (b,2), (c,3)) #(5)创建第三个RDD(与1,2分区数不同) scala> val rdd3 = sc.parallelize(Array("a","b","c"),2) rdd3: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[5] at parallelize at <console>:24 #(6)第一个RDD组合第三个RDD并打印 scala> rdd1.zip(rdd3).collect java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions: List(3, 2) at org.apache.spark.rdd.ZippedPartitionsBaseRDD.getPartitions(ZippedPartitionsRDD.scala:57) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250) at scala.Option.getOrElse(Option.scala:121) at org.apache.spark.rdd.RDD.partitions(RDD.scala:250) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1965) at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) at org.apache.spark.rdd.RDD.withScope(RDD.scala:362) at org.apache.spark.rdd.RDD.collect(RDD.scala:935) ... 48 elided

    6.4 Key-Value值类型

    6.4.1 partitionBy

    解析:根据分区函数进行分区

    说明

    案例

    6.4.2 reduceByKey(func, [numTasks])

    解析:根据相同key的value进行聚合(groupBy+对value进行sum求和) 说明 案例

    6.4.3 groupByKey

    解析:根据key进行分组

    6.4.4 reduceByKey和groupByKey的区别

    reduceByKey:按照key进行聚合,在shuffle之前有combine(预聚合)操作,返回结果是RDD[k,v].groupByKey:按照key进行分组,直接进行shuffle。开发指导:reduceByKey比groupByKey,建议使用。但是需要注意是否会影响业务逻辑。

    如何通过groupByKey达到和reduceByKey的功能

    groupByKey得到的结果Array(male,compactBuffer(5,2))groupByKey().map(t=>(t._1,t._2.sum))

    6.4.5 combineByKey[C]

    解析: combineByKey是Spark中一个比较核心的高级且底层函数,其他一些高阶键值对函数底层 都是用它实现的。诸如 groupByKey,reduceByKey等等

    如下解释下3个重要的函数参数:

    createCombiner:V=>C ,这个函数把当前的值作为参数,此时我们可以对其做些附加 操作(类型转换)并把它返回 (这一步类似于初始化操作)mergeValue:(C,V)=>C,该函数把元素V合并到之前的元素C(createCombiner)上 (这个 操作在每个分区内进行)mergeCombiners: (C, C) => C,该函数把2个元素C合并 (这个操作在不同分区间进行)。

    案例 解释:(v)=>(v,1),得到的是(88,1),因为这是combineByKey是按照key处理value操作,acc:(Int,Int) 代表的是(88,1),其中acc._1代表的是88,acc._2代表1值,v代表是同为Fred名称的95的数值, 所以acc._1+v=88+95,即相同Key的Value相加结果,第三个参数是分区间的相同key的value 进行累加,得到Fred的88+95+91,Wilma累加和为93+95+98。

    6.4.6 aggregateByKey(zeroValue:U,[partitioner: Partitioner]) (seqOp: (U, V) => U,combOp: (U, U) => U)

    解析:初始值zeroValue只用于分区内函数,分区间不操作 案例 解析:  例如:分一个分区,以key为1的分区为例,0先和3比较得3,3在和2比较得3,3在和4比较 得4,所以整个key为1的组最终结果为(1,4),同理,key为2的最终结果为(2,3),key 为3的为(3,8).  如果分三个分区,前两个是一个分区,中间两个是一个分区,最后两个是一个分区,第一个 分区的最终结果为(1,3),第二个分区为(1,4)(2,3),最后一个分区为(3,8), combine后为 (3,8), (1,7), (2,3) 案例1案例2

    6.4.8 sortByKey([ascending], [numTasks])

    解析:根据Key对Value进行排序 案例

    6.4.9 join(otherDataset, [numTasks])

    解析 案例

    6.4.10 cogroup(otherDataset, [numTasks])

    解析

    案例

    6.4.11 cartesian(otherDataset)

    解析 案例

    6.4.12 mapValues

    解析 案例

    7. Action

    7.1 常用的action操作示例(13个)

    7.1.1 reduce(func)-使用相应fun进行运算

    7.1.2 collect()

    7.1.3 count()

    解析 案例

    7.1.4 first()

    7.1.5 take(n)

    7.1.6 takeSample(withReplacement,num, [seed])

    解析:随机数采样点的Action算子

    7.1.7 takeOrdered(n)

    7.1.8 aggregate (zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U)

    解析:初始值作用于分区内和分区间 案例

    7.1.9 fold(num)(func)

    7.1.10 saveAsTextFile(path)

    7.1.11 saveAsSequenceFile(path)

    7.1.12 saveAsObjectFile(path)

    7.1.13 countByKey()

    7.1.14 foreach(func)

    7.2 数值RDD统计操作

    Spark 对包含数值数据的 RDD 提供了一些描述性的统计操作。Spark 的数值操作是通过流式算法实现的,允许以每次一个元素的方式构建出模型。这些 统计数据都会在调用 stats() 时通过一次遍历数据计算出来,并以 StatsCounter 对象返回。

    scala> var rdd1 = sc.makeRDD(1 to 100) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[42] at makeRDD at <console>:32 scala> rdd1.sum() res34: Double = 5050.0 scala> rdd1.max() res35: Int = 100
    Processed: 0.008, SQL: 8