在对RDD数据进行分区时,默认使用的是HashPartitioner
该函数对key进行哈希,然后对分区总数取模,取模结果相同的就会被分到同一个partition中
HashPartitioner分区逻辑: key.hashcode % 分区总数 = 分区号如果嫌HashPartitioner功能单一,可以自定义partitioner
需求
后期要想根据rdd的key的长度进行分区,相同key的长度进入到同一个分区中代码开发
TestPartitionerMain 主类
package cn.linann.partitioner import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} //todo:使用自己实现的自定义分区 object TestPartitionerMain { def main(args: Array[String]): Unit = { //1、构建SparkConf val sparkConf: SparkConf = new SparkConf().setAppName("TestPartitionerMain").setMaster("local[2]") //2、构建SparkContext val sc = new SparkContext(sparkConf) sc.setLogLevel("warn") //3、构建数据源 val data: RDD[String] = sc.parallelize(List("hadoop","hdfs","hive","spark","flume","kafka","flink","azkaban")) //4、获取每一个元素的长度,封装成一个元组 val wordLengthRDD: RDD[(String, Int)] = data.map(x=>(x,x.length)) //5、对应上面的rdd数据进行自定义分区 val result: RDD[(String, Int)] = wordLengthRDD.partitionBy(new MyPartitioner(3)) //6、保存结果数据到文件 result.saveAsTextFile("./data") sc.stop() } }自定义分区MyPartitioner
package cn.linann.partitioner import org.apache.spark.Partitioner //自定义分区 class MyPartitioner(num:Int) extends Partitioner{ //指定rdd的总的分区数 override def numPartitions: Int = { num } //消息按照key的某种规则进入到指定的分区号中 override def getPartition(key: Any): Int ={ //这里的key就是单词 val length: Int = key.toString.length length match { case 4 =>0 case 5 =>1 case 6 =>2 case _ =>0 } } }(1) 通过在driver中调用 ==SparkContext.accumulator(initialValue) ==方法,创建出存有初始值的累加器。返回值为 org.apache.spark.Accumulator[T] 对象,其中 T 是初始值initialValue 的类型。
(2) spark闭包(函数序列化)里的excutor代码可以使用累加器的 add 方法增加累加器的值。
(3) driver程序可以调用累加器的 value 属性来访问累加器的值。
代码
object AccumulatorTest { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local").setAppName("accumulator") val sc = new SparkContext(conf) //创建accumulator并初始化为0 val accumulator = sc.accumulator(0); //读取一个有10条记录的文件 val linesRDD = sc.textFile("/words.txt") val result = linesRDD.map(s => { accumulator.add(1) //有一条数据就增加1 s }) result.collect(); //触发action操作 println("words lines is :" + accumulator.value) sc.stop() } } //输出结果: words lines is : 10spark是分布式执行引擎,其核心抽象是弹性分布式数据集RDD,其代表了分布在不同节点的数据。Spark的计算是在executor上分布式执行的,故用户开发的关于RDD的map,flatMap,reduceByKey等transformation 操作(闭包)有如下执行过程:
(1)代码中对象在driver本地序列化(2)对象序列化后传输到远程executor节点(3)远程executor节点反序列化对象(4)最终远程节点执行故对象在执行中需要序列化通过网络传输,则必须经过序列化过程。
(1) 如果函数中使用了该类对象,该类要实现序列化
类 extends Serializable(2) 如果函数中使用了该类对象的成员变量,该类除了要实现序列化之外,所有的成员变量必须要实现序列化
(3) 对于不能序列化的成员变量使用“@transient”标注,告诉编译器不需要序列化
(4) 也可将依赖的变量独立放到一个小的class中,让这个class支持序列化,这样做可以减少网络传输量,提高效率。
(5) 可以把对象的创建直接在该函数中构建这样避免需要序列化
一个application就是一个应用程序,包含了客户端所有的代码和计算资源
一个action操作对应一个DAG有向无环图,即一个action操作就是一个job
一个job中包含了大量的宽依赖,按照宽依赖进行stage划分,一个job产生了很多个stage
一个stage中有很多分区,一个分区就是一个task,即一个stage中有很多个task
总结
一个application包含了很多个job一个job包含了很多个stage一个stage包含了很多个task可以把spark程序提交到yarn中去运行,此时spark任务所需要的计算资源由yarn中的老大ResourceManager去分配
官网资料地址
http://spark.apache.org/docs/2.3.3/running-on-yarn.html环境准备
1、安装hadoop集群
2、安装spark环境
注意这里不需要安装spark集群
只需要解压spark安装包到任意一台服务器
修改文件 spark-env.sh
export JAVA_HOME=/opt/bigdata/jdk export HADOOP_CONF_DIR=/opt/bigdata/hadoop/etc/hadoop按照Spark应用程序中的driver分布方式不同,Spark on YARN有两种模式: yarn-client模式、yarn-cluster模式。
如果运行出现错误,可能是虚拟内存不足,可以添加参数
vim yarn-site.xml <!--容器是否会执行物理内存限制默认为True--> <property> <name>yarn.nodemanager.pmem-check-enabled</name> <value>false</value> </property> <!--容器是否会执行虚拟内存限制 默认为True--> <property> <name>yarn.nodemanager.vmem-check-enabled</name> <value>false</value> </property>yarn-cluster模式
yarn-client模式
yarn-cluster模式
spark程序的Driver程序在YARN中运行,运行结果不能在客户端显示,并且客户端可以在启动应用程序后消失应用的。最好运行那些将结果最终保存在外部存储介质(如HDFS、Redis、Mysql),客户端的终端显示的仅是作为YARN的job的简单运行状况。yarn-client模式
spark程序的Driver运行在Client上,应用程序运行结果会在客户端显示,所有适合运行结果有输出的应用程序(如spark-shell)总结
最大的区别就是Driver端的位置不一样。 yarn-cluster: Driver端运行在yarn集群中,与ApplicationMaster进程在一起。 yarn-client: Driver端运行在提交任务的客户端,与ApplicationMaster进程没关系,经常 用于进行测试collect算子操作的作用
1、它是一个action操作,会触发任务的运行2、它会把RDD的数据进行收集之后,以数组的形式返回给Driver端总结:
默认Driver端的内存大小为1G,由参数 spark.driver.memory 设置
如果某个rdd的数据量超过了Driver端默认的1G内存,对rdd调用collect操作,这里会出现Driver端的内存溢出,所有这个collect操作存在一定的风险,实际开发代码一般不会使用。
实际企业中一般都会把该参数调大,比如5G/10G等
可以在代码中修改该参数,如下
new SparkConf().set("spark.driver.memory","5G")–executor-memory
表示每一个executor进程需要的内存大小,它决定了后期操作数据的速度–total-executor-cores
表示任务运行需要总的cpu核数,它决定了任务并行运行的粒度总结
后期对于spark程序的优化,可以从这2个参数入手,无论你把哪一个参数调大,对程序运行的效率来说都会达到一定程度的提升 加大计算资源它是最直接、最有效果的优化手段。 在计算资源有限的情况下,可以考虑其他方面,比如说代码层面,JVM层面等