Spark

    科技2022-08-16  112

    文章目录

    Spark概述Spark生态系统Spark基本概念Spark运行流程RDD运行原理 使用PySparkRDD的创建和读取文件RDD操作转换操作filtermapflatMapgroupByKeyreduceByKeyunionintersectionmapValues(f)flatMapValues(f)sortByKeyKeys(),Values()join(rdd)leftOuterJoin(rdd), rightOuterJoin(rdd) 行动操作持久化持久化操作 分区分区原则设置分区分区实例综合实例 注意:

    Spark概述

    Spark来源于AMP实验室,在2009年被开发,Spark是对MapReduce的缺陷进行改进的

    特点:

    运行速度快

    内存计算,循环数据流 基于DAG的执行引擎,可以进行流水线优化

    2.支持多种语言

    Scala Java Python R

    通用性

    SQL查询:Spark SQL 流式计算:Spark Streaming 机器学习:Spark MLlib 图算法组件:Spark的GraphX

    运行模式多样

    可访问HDFS Cassandra HBase Hive等

    Spark生态系统

    在企业中用不同的技术来满足不同的应用场景可能会出现一下问题: 无法无缝共享数据,使用成本高,资源利用不充分

    而Spark满足一个软件栈满足多种应用需求

    我们常说的Spark指的是Spark Core;而不是一整个Spark生态系统,Spark Core是用来帮你完成批处理的,上面的其他组件是用来满足不同需求的

    Spark基本概念

    Spark运行流程

    添加链接描述

    RDD运行原理

    RDD(Resilient Distributed Dataset)叫做分布式数据集,是Spark中最基本的数据抽象。代码中是一个抽象类,它代表一个不可变、可分区、里面的元素可并行计算的集合。

    使用PySpark

    RDD的创建和读取文件

    首先要导入pyspark模块 Spark中本地运行模式有3种,如下

    (1)local 模式:本地单线程运行; (2)local[k]模式:本地K个线程运行; (3)local[*]模式:用本地尽可能多的线程运行。

    from pyspark import SparkContext # 导入pyspark sc = SparkContext("设置模式","自定义程序名") # 设置模式 rdd1 = sc.textFile("file:///本地文件") # 导入本地文件 rdd2.1 = sc.textFile("hdfs://localhost:9000/xxxxxxxx") # 导入HDFS文件 rdd2.2 = sc.textFile("/xxxxxxxx") # 导入HDFS文件 rdd2.3 = sc.textFile("xxxx.txt") # 导入HDFS文件

    文本文件中的每一行代表一个RDD元素

    RDD操作

    转换操作

    转换操作是对RDD的类型进行逻辑上的转换而已,并没有实际进行运算; 实际的运算要用行动操作

    操作含义filter(func)筛选出满足函数func的元素,并返回一个新的数据集map(func)将每个元素传递到函数func中,并将结果返回一个新的数据集flatMap(func)与map相似,但每个输入元素都可以映射到0或者多个输出结果groupByKey()应用于(K,V)键值对的数据集,返回一个新的(K,Iterable)形式的数据集reduceByKey(func)应用于(K,V)键值对的数据集,返回一个新的(K,V)形式的数据集,其中每个值是将每个Key传递到函数func中进行聚合操作的结果;func是对值进行处理的!reduceByKey相当于是先做了groupByKey,再对值进行处理union(rdd)对一个RDD和参数RDD进行并集计算,然后返回新的RDDintersection(rdd)对RDD和另一个RDD进行交集计算,然后返回新的RDDsortBy(func)通过func来对RDD内部元素进行排序,默认为升序,False为降序distinct()去重,返回新的RDD #!/usr/bin/python from pyspark import SparkContext sc = SparkContext("local", "testSparkApi") rdd = sc.textFile("file:///home/hadoop/sparktest.txt") ''' Hadoop is good Spark is fast Spark is better '''

    filter

    filtered = rdd.filter(lambda x:"Spark" in x) result = filtered.collect() print("RDD filterApi:%s"%result) "RDD filterApi:['Spark is fast', 'Spark is better']"

    map

    maped = rdd.map(lambda x:"hahahaha"+x) # 在每个元素前面加上hahahaha result = maped.collect() print("RDD mapApi:%s"%result) "RDD mapApi:['hahahahaHadoop is good', 'hahahahaSpark is fast', 'hahahahaSpark is better']"

    flatMap

    将每个元素进行split操作(map),得到的内容再提取出来,形成一个新的元素(flat)

    fm = rdd.flatMap(lambda x:x.split()) result = fm.collect() print("RDD flatMapApi:%s"%result) "RDD flatMapApi:['Hadoop', 'is', 'good', 'Spark', 'is', 'fast', 'Spark', 'is', 'better']"

    groupByKey

    将相同键的值封装成Iterable数据类型

    fm = rdd.flatMap(lambda x: x.split()) mapped = fm.map(lambda x:(x,1)) # 将每个元素变成(K,1)的形式 print(mapped.foreach(print)) ''' ('Hadoop', 1) ('is', 1) ('good', 1) ('Spark', 1) ('is', 1) ('fast', 1) ('Spark', 1) ('is', 1) ('better', 1) ''' gbk = mapped.groupByKey() result = gbk.collect() print("RDD groupByKeyApi:%s"%result) '''RDD groupByKeyApi:[('Hadoop', <pyspark.resultiterable.ResultIterable object at 0x7f6ad27d63c8>), ('is', <pyspark.resultiterable.ResultIterable object at 0x7f6ad27d62b0>), ('good', <pyspark.resultiterable.ResultIterable object at 0x7f6ad27d6518>), ('Spark', <pyspark.resultiterable.ResultIterable object at 0x7f6ad27d6438>), ('fast', <pyspark.resultiterable.ResultIterable object at 0x7f6ad27d65f8>), ('better', <pyspark.resultiterable.ResultIterable object at 0x7f6ad3419c50>)] ‘’‘ # ------------------------------------ from pyspark import SparkContext,SparkConf conf = SparkConf().setMaster("local").setAppName("case") sc = SparkContext(conf=conf) rdd = sc.parallelize([("a",1),("b",1),("c",1),("a",2),("b",2),("c",2),("a",3),("d",4),("b",5)]) result = rdd.groupByKey().mapValues(list).collect() print(result) # [('a', [1, 2, 3]), ('b', [1, 2, 5]), ('c', [1, 2]), ('d', [4])]

    reduceByKey

    将所有元素相同键的值进行聚合操作,func是对值进行操作的,结果返回新的(K,V)值是聚合操作后的结果

    fm = rdd.flatMap(lambda x: x.split()) mapped = fm.map(lambda x:(x,1)) rbk = mapped.reduceByKey(lambda a,b:a+b) result = rbk.collect() print("RDD reduceByKeyApi:%s"%result) ''' RDD reduceByKeyApi:[('Hadoop', 1), ('is', 3), ('good', 1), ('Spark', 2), ('fast', 1), ('better', 1)] '''

    union

    from pyspark import SparkContext,SparkConf conf = SparkConf().setMaster("local").setAppName("case") sc = SparkContext(conf=conf) rdd1 = sc.parallelize([1,2,3,4,5]) rdd2 = sc.parallelize([5,6,7,8,1]) tr = rdd1.union(rdd2) result = tr.collect() print(result) # [1, 2, 3, 4, 5, 5, 6, 7, 8, 1]

    intersection

    from pyspark import SparkContext,SparkConf conf = SparkConf().setMaster("local").setAppName("case") sc = SparkContext(conf=conf) rdd1 = sc.parallelize([1,2,3,4,5]) rdd2 = sc.parallelize([5,6,7,8,1]) tr = rdd1.intersection(rdd2) result = tr.collect() print(result) # [1, 5]

    mapValues(f)

    在不改变原有Key的基础上,对Key-value结构的RDD的Value值进行一个map操作

    from pyspark import SparkContext,SparkConf conf = SparkConf().setMaster("local").setAppName("case") sc = SparkContext(conf=conf) rdd = sc.parallelize([("a",["apple","banana","lemon"]),("b",["grapes"])]) result = rdd.mapValues(lambda x:len(x)).collect() print(result) # [('a', 3), ('b', 1)]

    flatMapValues(f)

    对Key-value结构的RDD先执行mapValue操作(对值进行操作),在执行扁平化操作

    from pyspark import SparkContext,SparkConf conf = SparkConf().setMaster("local").setAppName("case") sc = SparkContext(conf=conf) rdd = sc.parallelize([("a",["apple","banana","lemon"]),("b",["grapes"])]) result = rdd.flatMapValues(lambda x:x).collect() print(result) # [('a', 'apple'), ('a', 'banana'), ('a', 'lemon'), ('b', 'grapes')]

    sortByKey

    根据键来排序

    from pyspark import SparkContext,SparkConf conf = SparkConf().setMaster("local").setAppName("case") sc = SparkContext(conf=conf) rdd = sc.parallelize([("a",1),("b",1),("c",1),("a",2),("b",2),("c",2),("a",3),("d",4),("b",5),("3",2),("1",4)]) result = rdd.groupByKey().mapValues(list).sortByKey().collect() print(result) # [('1', [4]), ('3', [2]), ('a', [1, 2, 3]), ('b', [1, 2, 5]), ('c', [1, 2]), ('d', [4])]

    Keys(),Values()

    返回RDD内部的键,值

    join(rdd)

    from pyspark import SparkContext,SparkConf conf = SparkConf().setMaster("local").setAppName("case") sc = SparkContext(conf=conf) x=sc.parallelize([("a",1),("b",2),("c",3)]) y=sc.parallelize([("a",7),("a",8),("a",0)]) result = y.join(x).collect() print(result) # [('a', (7, 1)), ('a', (8, 1)), ('a', (0, 1))] result2 = x.join(y).collect() print(result2) # [('a', (1, 7)), ('a', (1, 8)), ('a', (1, 0))]

    leftOuterJoin(rdd), rightOuterJoin(rdd)

    行动操作

    对RDD之前的所有转换进行计算,真正执行!

    操作描述count()返回数据集中的元素个数collect()以数组的形式返回数据集中的所有元素fitst()返回数据集中的第一个元素take(n)以数组的形式返回数据集中的前n个元素reduce(func)通过函数func(输入两个参数并返回一个值)聚合数据集中的元素[数值求和]foreach(func)将数据集中的每个元素传递到函数func中运行top(n)返回RDD内部元素的前n个最大值saveAsTextFile()将RDD中的元素以字符串的格式存储在文件系统中collectAsTextFile()将Key-Value结构的RDD以字典的形式返回countByKey()对每个Key键的元素进行统计

    持久化

    有时候需要多次访问一些值,但是每次访问这些值都需要再进行计算,这就很消耗时间;持久化可以把你经常访问的值保存起来; 持久化后的RDD将会被保留在计算节点的内存中,被后面的行动操作重复使用

    list = ["hadoop", "spark", "hive"] rdd = sc.parallelize(list) print(rdd.count()) #行动操作,触发一次真正的从头到尾的计算 print(",".join(rdd.collect()))#行动操作,再触发一次真正的从头到尾的计算

    持久化操作

    持久化操作能降低读写开销 persist()对一个RDD标记为持久化,但是不是立即持久化的,要通过行动操作,计算出来之后才能被持久化 .persist(MEMORY_ONLY)方法:只存储再内存中 .persist(MEMORY_AND_DISK)内存不足就存放再磁盘中 .cache()方法:会调用persist(MEMORY_ONLY) 手动把持久化的RDD从缓存中移除:.unpersist()

    list = ["hadoop", "spark", "hive"] rdd = sc.parallelize(list) rdd.cache() # 会调用persist(MEMORY_ONLY),但是,语句执行到这里,并不会缓存rdd,因为这时rdd还没有被计算生成 print(rdd.count()) # 第一次行动操作,触发一次真正的从头到尾的计算,这时上面的rdd.cache()才会被执行,把这个rdd放到缓存中 print(",".join(rdd.collect())) # 第二次行动操作,不需要出发从头到尾的计算,只需要重复使用上面缓存的rdd

    分区

    RDD 内部的数据集合在逻辑上(以及物理上)被划分成多个小集合,这样的每一个小集合被称为分区。   分区的优点

    增加并行度减小通信开销(连表) 对于两个大表在没分区前,可以使用中间

    分区原则

    分区个数=集群中CPU核心数目 Local模式:默认为本地机器的CPU核心数目,若有设置则按设置的来 Mesos模式:默认分区数为8 Standalong和Yarn模式:Max(集群中所有CPU核心数目总和,2)

    设置分区

    在调用textFile()和parallelize()方法时设置分区个数 sc.textFile(path, partitionNum) sc.parallelize(data, partitionNum)

      也可以在通过转换操作得到新RDD时,直接调用repartition方法

    data = sc.parallelize([1,2,3,4,5],2) print(len(data.glom().collect()) # 显示data这个RDD的分区数量 2 rdd = data.reparition(1) # 对data这个RDD进行重新分区 print(len(rdd.glom().collect()) # 显示rdd这个RDD的分区数量 1

    分区实例

    将不同的元素放到不同的分区[0-9]分别写入到part-000,part-001,part-002以此类推

    #!/usr/bin/python from pyspark import SparkContext,SparkConf def MyPartitioner(key): print("begin MyPartition......") print("Key:%d"%key) return key%10 def main(): print("in main ......") conf = SparkConf().setMaster("local").setAppName("partition") sc = SparkContext(conf=conf) data1 = sc.parallelize(range(10),5) # print("当前分区数:%d"%len(data.glom().collect())) print("-----------------") print(data1.collect()) # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] map1 = data1.map(lambda x:(x,1)) repart = map1.partitionBy(10,MyPartitioner) # 进行重分区(分区个数,分区方法) print("新分区:%d"%len(repart.glom().collect())) map2 = repart.map(lambda x:x[0]) savefile = map2.saveAsTextFile("file:home/hadoop/testPartition/") print(map1.collect()) # [(0, 1), (1, 1), (2, 1), (3, 1), (4, 1), (5, 1), (6, 1), (7, 1), (8, 1), (9, 1)] print(repart.collect()) # [(0, 1), (1, 1), (2, 1), (3, 1), (4, 1), (5, 1), (6, 1), (7, 1), (8, 1), (9, 1)] print(map2.collect()) # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] if __name__ == '__main__': main()

    综合实例

    from pyspark import SparkContext,SparkConf conf = SparkConf().setMaster("local").setAppName("case") sc = SparkContext(conf=conf) rdd = sc.textFile("file:home/hadoop/sparktest.txt") wordcount = rdd.flatMap(lambda x : x.strip().split(" ")).map(lambda x:(x,1)).reduceByKey(lambda a,b:a+b) result = dealed.collect() print(result) from pyspark import SparkContext,SparkConf char = ''',.?!_<>[]{}/:#"-\'''' def ffilter(s): s = s.strip() for i in s: if i in char: s = s.replace(i,' ') return s.split() conf = SparkConf().setMaster("local").setAppName("wordcountSpark") sc = SparkContext(conf=conf) rdd = sc.textFile("file:opt/module/spark2.1.1/README.md") # wordcount = rdd.flatMap(lambda x:x.strip().strip(",").strip(".").split()).map(lambda x:(x,1)).reduceByKey(lambda a,b:a+b) wordcount = rdd.flatMap(ffilter).map(lambda x:(x,1)).reduceByKey(lambda a,b:a+b) wordcount.saveAsTextFile("file:home/hadoop/wordcountSpark6/")

    注意:

    写入:(saveAsTextFile)保存的时候应该指定目录而不是文件,因为有分区的概念,一旦有多个分区,就会生成多个文件,所以要指定目录

    加载:只要指定文件目录就会自动加载

    Processed: 0.019, SQL: 9