Spark来源于AMP实验室,在2009年被开发,Spark是对MapReduce的缺陷进行改进的
特点:
运行速度快内存计算,循环数据流 基于DAG的执行引擎,可以进行流水线优化
2.支持多种语言
Scala Java Python R
通用性SQL查询:Spark SQL 流式计算:Spark Streaming 机器学习:Spark MLlib 图算法组件:Spark的GraphX
运行模式多样可访问HDFS Cassandra HBase Hive等
在企业中用不同的技术来满足不同的应用场景可能会出现一下问题: 无法无缝共享数据,使用成本高,资源利用不充分
而Spark满足一个软件栈满足多种应用需求
我们常说的Spark指的是Spark Core;而不是一整个Spark生态系统,Spark Core是用来帮你完成批处理的,上面的其他组件是用来满足不同需求的
添加链接描述
RDD(Resilient Distributed Dataset)叫做分布式数据集,是Spark中最基本的数据抽象。代码中是一个抽象类,它代表一个不可变、可分区、里面的元素可并行计算的集合。
首先要导入pyspark模块 Spark中本地运行模式有3种,如下
(1)local 模式:本地单线程运行; (2)local[k]模式:本地K个线程运行; (3)local[*]模式:用本地尽可能多的线程运行。
from pyspark import SparkContext # 导入pyspark sc = SparkContext("设置模式","自定义程序名") # 设置模式 rdd1 = sc.textFile("file:///本地文件") # 导入本地文件 rdd2.1 = sc.textFile("hdfs://localhost:9000/xxxxxxxx") # 导入HDFS文件 rdd2.2 = sc.textFile("/xxxxxxxx") # 导入HDFS文件 rdd2.3 = sc.textFile("xxxx.txt") # 导入HDFS文件文本文件中的每一行代表一个RDD元素
转换操作是对RDD的类型进行逻辑上的转换而已,并没有实际进行运算; 实际的运算要用行动操作
操作含义filter(func)筛选出满足函数func的元素,并返回一个新的数据集map(func)将每个元素传递到函数func中,并将结果返回一个新的数据集flatMap(func)与map相似,但每个输入元素都可以映射到0或者多个输出结果groupByKey()应用于(K,V)键值对的数据集,返回一个新的(K,Iterable)形式的数据集reduceByKey(func)应用于(K,V)键值对的数据集,返回一个新的(K,V)形式的数据集,其中每个值是将每个Key传递到函数func中进行聚合操作的结果;func是对值进行处理的!reduceByKey相当于是先做了groupByKey,再对值进行处理union(rdd)对一个RDD和参数RDD进行并集计算,然后返回新的RDDintersection(rdd)对RDD和另一个RDD进行交集计算,然后返回新的RDDsortBy(func)通过func来对RDD内部元素进行排序,默认为升序,False为降序distinct()去重,返回新的RDD #!/usr/bin/python from pyspark import SparkContext sc = SparkContext("local", "testSparkApi") rdd = sc.textFile("file:///home/hadoop/sparktest.txt") ''' Hadoop is good Spark is fast Spark is better '''将每个元素进行split操作(map),得到的内容再提取出来,形成一个新的元素(flat)
fm = rdd.flatMap(lambda x:x.split()) result = fm.collect() print("RDD flatMapApi:%s"%result) "RDD flatMapApi:['Hadoop', 'is', 'good', 'Spark', 'is', 'fast', 'Spark', 'is', 'better']"将相同键的值封装成Iterable数据类型
fm = rdd.flatMap(lambda x: x.split()) mapped = fm.map(lambda x:(x,1)) # 将每个元素变成(K,1)的形式 print(mapped.foreach(print)) ''' ('Hadoop', 1) ('is', 1) ('good', 1) ('Spark', 1) ('is', 1) ('fast', 1) ('Spark', 1) ('is', 1) ('better', 1) ''' gbk = mapped.groupByKey() result = gbk.collect() print("RDD groupByKeyApi:%s"%result) '''RDD groupByKeyApi:[('Hadoop', <pyspark.resultiterable.ResultIterable object at 0x7f6ad27d63c8>), ('is', <pyspark.resultiterable.ResultIterable object at 0x7f6ad27d62b0>), ('good', <pyspark.resultiterable.ResultIterable object at 0x7f6ad27d6518>), ('Spark', <pyspark.resultiterable.ResultIterable object at 0x7f6ad27d6438>), ('fast', <pyspark.resultiterable.ResultIterable object at 0x7f6ad27d65f8>), ('better', <pyspark.resultiterable.ResultIterable object at 0x7f6ad3419c50>)] ‘’‘ # ------------------------------------ from pyspark import SparkContext,SparkConf conf = SparkConf().setMaster("local").setAppName("case") sc = SparkContext(conf=conf) rdd = sc.parallelize([("a",1),("b",1),("c",1),("a",2),("b",2),("c",2),("a",3),("d",4),("b",5)]) result = rdd.groupByKey().mapValues(list).collect() print(result) # [('a', [1, 2, 3]), ('b', [1, 2, 5]), ('c', [1, 2]), ('d', [4])]将所有元素相同键的值进行聚合操作,func是对值进行操作的,结果返回新的(K,V)值是聚合操作后的结果
fm = rdd.flatMap(lambda x: x.split()) mapped = fm.map(lambda x:(x,1)) rbk = mapped.reduceByKey(lambda a,b:a+b) result = rbk.collect() print("RDD reduceByKeyApi:%s"%result) ''' RDD reduceByKeyApi:[('Hadoop', 1), ('is', 3), ('good', 1), ('Spark', 2), ('fast', 1), ('better', 1)] '''在不改变原有Key的基础上,对Key-value结构的RDD的Value值进行一个map操作
from pyspark import SparkContext,SparkConf conf = SparkConf().setMaster("local").setAppName("case") sc = SparkContext(conf=conf) rdd = sc.parallelize([("a",["apple","banana","lemon"]),("b",["grapes"])]) result = rdd.mapValues(lambda x:len(x)).collect() print(result) # [('a', 3), ('b', 1)]对Key-value结构的RDD先执行mapValue操作(对值进行操作),在执行扁平化操作
from pyspark import SparkContext,SparkConf conf = SparkConf().setMaster("local").setAppName("case") sc = SparkContext(conf=conf) rdd = sc.parallelize([("a",["apple","banana","lemon"]),("b",["grapes"])]) result = rdd.flatMapValues(lambda x:x).collect() print(result) # [('a', 'apple'), ('a', 'banana'), ('a', 'lemon'), ('b', 'grapes')]根据键来排序
from pyspark import SparkContext,SparkConf conf = SparkConf().setMaster("local").setAppName("case") sc = SparkContext(conf=conf) rdd = sc.parallelize([("a",1),("b",1),("c",1),("a",2),("b",2),("c",2),("a",3),("d",4),("b",5),("3",2),("1",4)]) result = rdd.groupByKey().mapValues(list).sortByKey().collect() print(result) # [('1', [4]), ('3', [2]), ('a', [1, 2, 3]), ('b', [1, 2, 5]), ('c', [1, 2]), ('d', [4])]返回RDD内部的键,值
对RDD之前的所有转换进行计算,真正执行!
操作描述count()返回数据集中的元素个数collect()以数组的形式返回数据集中的所有元素fitst()返回数据集中的第一个元素take(n)以数组的形式返回数据集中的前n个元素reduce(func)通过函数func(输入两个参数并返回一个值)聚合数据集中的元素[数值求和]foreach(func)将数据集中的每个元素传递到函数func中运行top(n)返回RDD内部元素的前n个最大值saveAsTextFile()将RDD中的元素以字符串的格式存储在文件系统中collectAsTextFile()将Key-Value结构的RDD以字典的形式返回countByKey()对每个Key键的元素进行统计有时候需要多次访问一些值,但是每次访问这些值都需要再进行计算,这就很消耗时间;持久化可以把你经常访问的值保存起来; 持久化后的RDD将会被保留在计算节点的内存中,被后面的行动操作重复使用
list = ["hadoop", "spark", "hive"] rdd = sc.parallelize(list) print(rdd.count()) #行动操作,触发一次真正的从头到尾的计算 print(",".join(rdd.collect()))#行动操作,再触发一次真正的从头到尾的计算持久化操作能降低读写开销 persist()对一个RDD标记为持久化,但是不是立即持久化的,要通过行动操作,计算出来之后才能被持久化 .persist(MEMORY_ONLY)方法:只存储再内存中 .persist(MEMORY_AND_DISK)内存不足就存放再磁盘中 .cache()方法:会调用persist(MEMORY_ONLY) 手动把持久化的RDD从缓存中移除:.unpersist()
list = ["hadoop", "spark", "hive"] rdd = sc.parallelize(list) rdd.cache() # 会调用persist(MEMORY_ONLY),但是,语句执行到这里,并不会缓存rdd,因为这时rdd还没有被计算生成 print(rdd.count()) # 第一次行动操作,触发一次真正的从头到尾的计算,这时上面的rdd.cache()才会被执行,把这个rdd放到缓存中 print(",".join(rdd.collect())) # 第二次行动操作,不需要出发从头到尾的计算,只需要重复使用上面缓存的rddRDD 内部的数据集合在逻辑上(以及物理上)被划分成多个小集合,这样的每一个小集合被称为分区。 分区的优点
增加并行度减小通信开销(连表) 对于两个大表在没分区前,可以使用中间分区个数=集群中CPU核心数目 Local模式:默认为本地机器的CPU核心数目,若有设置则按设置的来 Mesos模式:默认分区数为8 Standalong和Yarn模式:Max(集群中所有CPU核心数目总和,2)
在调用textFile()和parallelize()方法时设置分区个数 sc.textFile(path, partitionNum) sc.parallelize(data, partitionNum)
也可以在通过转换操作得到新RDD时,直接调用repartition方法
data = sc.parallelize([1,2,3,4,5],2) print(len(data.glom().collect()) # 显示data这个RDD的分区数量 2 rdd = data.reparition(1) # 对data这个RDD进行重新分区 print(len(rdd.glom().collect()) # 显示rdd这个RDD的分区数量 1将不同的元素放到不同的分区[0-9]分别写入到part-000,part-001,part-002以此类推
#!/usr/bin/python from pyspark import SparkContext,SparkConf def MyPartitioner(key): print("begin MyPartition......") print("Key:%d"%key) return key%10 def main(): print("in main ......") conf = SparkConf().setMaster("local").setAppName("partition") sc = SparkContext(conf=conf) data1 = sc.parallelize(range(10),5) # print("当前分区数:%d"%len(data.glom().collect())) print("-----------------") print(data1.collect()) # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] map1 = data1.map(lambda x:(x,1)) repart = map1.partitionBy(10,MyPartitioner) # 进行重分区(分区个数,分区方法) print("新分区:%d"%len(repart.glom().collect())) map2 = repart.map(lambda x:x[0]) savefile = map2.saveAsTextFile("file:home/hadoop/testPartition/") print(map1.collect()) # [(0, 1), (1, 1), (2, 1), (3, 1), (4, 1), (5, 1), (6, 1), (7, 1), (8, 1), (9, 1)] print(repart.collect()) # [(0, 1), (1, 1), (2, 1), (3, 1), (4, 1), (5, 1), (6, 1), (7, 1), (8, 1), (9, 1)] print(map2.collect()) # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] if __name__ == '__main__': main()写入:(saveAsTextFile)保存的时候应该指定目录而不是文件,因为有分区的概念,一旦有多个分区,就会生成多个文件,所以要指定目录
加载:只要指定文件目录就会自动加载
