spark之MLLIB

    科技2022-07-11  113

    1、MLlib简介

    MLlib是Spark的机器学习(Machine Learning)库,旨在简化机器学习的工程实践工作,并方便扩展到更大规模。MLlib由一些通用的学习算法和工具组成,包括分类、回归、聚类、协同过滤、降维等,同时还包括底层的优化原语和高层的管道API。具体来说,其主要包括以下几方面的内容:

    算法工具:常用的学习算法,如分类、回归、聚类和协同过滤;特征化工具:特征提取、转化、降维,和选择工具;管道(Pipeline):用于构建、评估和调整机器学习管道的工具;持久性:保存和加载算法,模型和管道;实用工具:线性代数,统计,数据处理等工具。

    2、MLlib支持的主要机器学习算法

    Spark在机器学习方面的发展非常快,目前已经支持了主流的统计和机器学习算法。纵观所有基于分布式架构的开源机器学习库,MLlib可以算是计算效率最高的。MLlib目前支持4种常见的机器学习问题: 分类、回归、聚类和协同过滤。下表列出了目前MLlib支持的主要的机器学习算法:

    3、机器学习工作流(PipeLine)

    3.1、工作流的组成

    DataFrame:使用Spark SQL中的DataFrame作为数据集,它可以容纳各种数据类型。 较之 RDD,包含了 schema 信息,更类似传统数据库中的二维表格。它被 ML Pipeline 用来存储源数据。例如,DataFrame中的列可以是存储的文本,特征向量,真实标签和预测的标签等。Transformer:翻译成转换器,是一种可以将一个DataFrame转换为另一个DataFrame的算法。比如一个模型就是一个 Transformer。它可以把 一个不包含预测标签的测试数据集 DataFrame 打上标签,转化成另一个包含预测标签的 DataFrame。技术上,Transformer实现了一个方法transform(),它通过附加一个或多个列将一个DataFrame转换为另一个DataFrame。

     

    Estimator:翻译成估计器或评估器,它是学习算法或在训练数据上的训练方法的概念抽象。在 Pipeline 里通常是被用来操作 DataFrame 数据并生产一个 Transformer。从技术上讲,Estimator实现了一个方法fit(),它接受一个DataFrame并产生一个转换器。如一个随机森林算法就是一个 Estimator,它可以调用fit(),通过训练特征数据而得到一个随机森林模型。

    Parameter:Parameter 被用来设置 Transformer 或者 Estimator 的参数。现在,所有转换器和估计器可共享用于指定参数的公共API。ParamMap是一组(参数,值)对。

    PipeLine:翻译为工作流或者管道。工作流将多个工作流阶段(转换器和估计器)连接在一起,形成机器学习的工作流,并获得结果输出。

    3.2、如何构建一个工作流

    定义 Pipeline 中的各个工作流阶段PipelineStage,(包括转换器和评估器),比如指标提取和转换模型训练等。有了这些处理特定问题的转换器和 评估器,就可以按照具体的处理逻辑有序的组织PipelineStages 并创建一个Pipeline。

    pipeline =Pipeline(stages=[stage1,stage2,stage3])

    在训练阶段,管道如下,以DataFrame存储的行形式的文本(Raw text)经过Tokenizer转化变成了词(Words),词经HashingTF转化变成了特征(Feature vectors),特征经LR得到了回归模型。

    测试过程的管道如下:

    3.3、构建管道实例 

    设置sparkSession

    from pyspark.sql import SparkSession spark = SparkSession.builder.master("local").appName("spark ML").getOrCreate()

    设置机器学习相关包

    from pyspark.ml import Pipeline from pyspark.ml.classification import LogisticRegression from pyspark.ml.feature import HashingTF,Tokenizer

    创建训练集

    #创建DataFrame训练集 #训练集包括字段id,text,label df_train = spark.createDataFrame([ (0, "a b c d e spark", 1.0), (1, "b d", 0.0), (2, "spark f g h", 1.0), (3, "hadoop mapreduce", 0.0) ], ["id", "text", "label"])

    定义工作流,构建训练管道,得到训练模型

    #构建转化器和评估器 #定义分词器,spark自带的Tokenizer以空格分词;inputCol为输入的列名,outputCol为转化输出的列名 tokenizer=Tokenizer(inputCol="text", outputCol="words") hashTf=HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features") lr = LogisticRegression(maxIter=10, regParam=0.001) #创建训练管道 pipeline = Pipeline(stages=[tokenizer, hashingTF, lr]) #训练模型 model=pipeline.fit(df_train)

    构建测试集及预测

    #测试DataFrame构建 df_test=spark.createDataFrame([ (4, "spark i j k"), (5, "l m n"), (6, "spark hadoop spark"), (7, "apache hadoop") ], ["id", "text"]) #测试 predict=model.transform(df_test) #显示预测结果id|text|words|features|rawPrediction|probability|prediction predict.show()

    参考

    http://dblab.xmu.edu.cn/blog/1763-2/

     

    Processed: 0.046, SQL: 8