面试题-Spark

    科技2022-07-15  119

    Spark01

    Spark02

    1、glom算子的作用 把每个分区的元素都放到一个数组里面去

    scala> sc.parallelize(1 to 10, 4).glom().collect() res0: Array[Array[Int]] = Array(Array(1, 2), Array(3, 4, 5), Array(6, 7), Array(8, 9, 10))

    2、join 用RDD实现 3、left.cogroup(right).collect join底层的实现是用的cogroup,join是cogroup的简单实现,53min scala> left.cogroup(right).collect res36: Array[(String, (Iterable[String], Iterable[Any]))] = Array((xingxing,(CompactBuffer(),CompactBuffer(80))), (ruoze,(CompactBuffer(bj),CompactBuffer(30))), (pk,(CompactBuffer(sz),CompactBuffer())), (j,(CompactBuffer(sh),CompactBuffer(18)))) 4、union

    Spark03

    1、不允许使用distinct,实现去重操作【reduceByKey做】 2、foreach& foreachPartition

    sc.makeRDD(List(1,2,3,4,5)).foreach(println) sc.makeRDD(List(1,2,3,4,5),2).foreachPartition(partition =>{ println("这是一个分区") for(ele <- partition) { println(ele) } })

    3、在spark处理中遇到哪些异常 java.io.com.ruozedata. NotSerializableException with Serializable 4、【排序】Spark04-C好好看看

    4、一个普通类,实现一个排序【隐式转换实现】case class封装太low了 5、排序的终极解决方案,背都背下来 implicit val ord = Ordering[(Double,Int)].on【(String,Double,Int)】(x=>(-x._2,x._3)) (Double,Int):就是排序规则的返回值,也就是(Double,Int)要和(-x._2,x._3)对应 on里面是x对应的

    Spark04

    1、【一】变形的wc input: a,1,3 a,2,4 b,1,1 ==> output: a,3,7 b,1,1

    --------------------------思路------------------------

    a b分组 逗号进行分割 (a,(1,3)) (a,(2,4)) (b,(1,1))分组内相同index的value相加 reduceByKey((x,y)=>x+y) val sparkConf = new SparkConf().setMaster("local").setAppName(this.getClass.getSimpleName) val sc = new SparkContext(sparkConf) sc.parallelize(List(List("a",1,3),List("a",2,4),List("b",1,1))) .map(x => { val key = x(0) val v1 = x(1).toString.toInt val v2 = x(2).toString.toInt (key, (v1,v2)) }).reduceByKey((x,y) => { (x._1 + y._1, x._2+y._2) }).foreach(println)

    2、【二】变形的wc 需求: input: “100000,一起看|电视剧|军旅|士兵突击,1,1”, “100000,一起看|电视剧|军旅|士兵突击,1,0”, “100001,一起看|电视剧|军旅|我的团长我的团,1,1” ==> output: ((100000,一起看),(2,1)) ((100001,电视剧),(1,1)) ((100001,军旅),(1,1)) ((100001,一起看),(1,1)) ((100001,我的团长我的团),(1,1)) ((100000,军旅),(2,1)) ((100000,士兵突击),(2,1)) ((100000,电视剧),(2,1))

    sc.parallelize(List( "100000,一起看|电视剧|军旅|士兵突击,1,1", "100000,一起看|电视剧|军旅|士兵突击,1,0", "100001,一起看|电视剧|军旅|我的团长我的团,1,1" )).flatMap(x => { val splits = x.split(",") var id = splits(0) val nav = splits(1) val imp = splits(2).toInt val click = splits(3).toInt val navs = nav.split("\\|") navs.map(x => ((id,x),(imp, click))) }).reduceByKey((x,y) =>(x._1+y._1,x._2+y._2)).foreach(println) sc.stop()

    Spark05

    1、Support for running on YARN (Hadoop NextGen) was added to Spark in version 0.6.0, and improved in subsequent releases.

    Spark07

    1、dependency & 宽依赖 & 窄依赖 1-1、Join是宽依赖还是窄依赖 Join是预先处理过的(co-operatitioned)—> 窄依赖 Join是未预先处理过的—> 宽依赖 2、RDD的map()、flatmap() 返回MapPartitionsRDD reduceByKey()返回ShuffledRDD 3、map vs foreach map有返回值 foreach有返回值 4、aggregate算子

    def aggregate[U: ClassTag] (zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U seqOp:func作用到每一个分区 分区内聚合 combOp:全局聚合

    def fun3(x:Int,y:List[Int]) = x.max(y.max) def fun4(x:Int,y:Int) = x + y // 求分区内最大值,分区间求和 val rdd4: RDD[List[Int]] = sc.parallelize(List(List(1,3),List(2,4),List(3,5)),3) println(rdd4.aggregate(10)(fun3, fun4)) //40

    5、reduceByKey和groupByKey的区别 reduceByKey在map端聚合,groupByKey没有 本质原因是,两者调用combineByKeyWithClassTag时,传参不同

    def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope { // groupByKey shouldn’t use map side combine because map side combine does not // reduce the amount of data shuffled and requires all map side data be inserted // into a hash table, leading to more objects in the old gen. val createCombiner = (v: V) => CompactBuffer(v) val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2 val bufs = combineByKeyWithClassTag[CompactBuffer[V]]( createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false) bufs.asInstanceOf[RDD[(K, Iterable[V])]] }

    def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope { combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner) }

    combineByKeyWithClassTag默认map端聚合

    def combineByKeyWithClassTag[C]( createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope { require(mergeCombiners != null, “mergeCombiners must be defined”) // required as of Spark 0.9.0 if (keyClass.isArray) { if (mapSideCombine) { throw new SparkException(“Cannot use map-side combining with array keys.”) } if (partitioner.isInstanceOf[HashPartitioner]) { throw new SparkException(“HashPartitioner cannot partition array keys.”) } }

    Spark08

    1、广播变量,没有shuffle 2、action是eagle立马执行的,transformation是lazy的操作,persist()、cache()是lazy操作 3、rdd的cache是容错的,一旦分区里的数据丢了,它会从源头重新计算 4、yarn默认有两个executor 5、collect仅用于数据量较小的测试场景。因为它会把数据都加载到driver的内存中。

    Processed: 0.016, SQL: 8