package com.mllib
import org.apache.spark.mllib.recommendation.{ALS, Rating}
import org.apache.spark.sql.SparkSession
object MoviesDemo {
  /*
   * Recommendation demo.
   * Two classic collaborative-filtering approaches exist:
   *   - user-based
   *   - item-based
   *
   * ALS (Alternating Least Squares) factorizes the user-item rating matrix
   * into two low-rank matrices (user factors x item factors) and predicts
   * unseen ratings from their product.
   */
  def main(args: Array[String]): Unit = {
    // Build a local SparkSession for this demo run.
    val spark = SparkSession.builder()
      .appName("happiness")
      .master("local")
      .getOrCreate()
    spark.sparkContext.setLogLevel("ERROR") // reduce log noise

    // Load the movie metadata, keeping only id/title, renamed so the
    // join key matches the ratings dataset ("movieId").
    val moviesData = spark.read.option("header", "true")
      .csv("data/movies/movies_metadata.csv")
      .select("id", "title")
      .toDF("movieId", "title")
    val ratingData = spark.read.option("header", "true")
      .csv("data/movies/ratings_small.csv")
      .select("userId", "movieId", "rating")

    // Combine the two datasets: an equi-join on the shared "movieId" column.
    // Resulting columns: movieId(0), title(1), userId(2), rating(3).
    val movieRatingData = moviesData.join(ratingData, "movieId")

    // Convert rows into MLlib Rating(user, product, rating).
    // NOTE(review): .toInt/.toDouble assume the CSV columns are clean numeric
    // strings — malformed ids in movies_metadata.csv would throw here; confirm.
    import spark.implicits._
    val movieRating = movieRatingData.map(
      line => {
        Rating(line.getString(2).toInt,   // userId
          line.getString(0).toInt,        // movieId
          line.getString(3).toDouble)     // rating
      }
    ).rdd

    // 1. Split into training (80%) and test (20%) sets.
    val Array(trainData, testData) = movieRating.randomSplit(Array(0.8, 0.2))
    // Train ALS with rank = 10 latent factors, 10 iterations.
    val model = ALS.train(trainData, 10, 10)

    // Recommend 10 movies for the user with ID 1.
    val recommendMovies = model.recommendProducts(1, 10)
    // Build a movieId -> title lookup so we can print readable titles.
    val movie = moviesData.map(x => (x.getString(0), x.getString(1)))
      .rdd.collectAsMap()
    recommendMovies.foreach(
      x => println(movie(x.product.toString))
    )

    // Batch prediction: score every (user, product) pair in the test set.
    val moviePredict = model.predict(testData.map(x => (x.user, x.product)))
    //moviePredict.take(10).foreach(println)

    // Evaluate the model via mean squared error:
    // join predicted ratings with the actual held-out ratings on (user, product).
    val testRating = testData.map {
      case Rating(user, product, rating) => ((user, product), rating)
    }
    val result = moviePredict.map {
      case Rating(user, product, rating) => ((user, product), rating)
    }.join(testRating)
    // Each element: ((user, product), (predictedRating, actualRating))
    // e.g. ((434,780),(3.034357645253258,1.0))
    result.take(10).foreach(println)

    // Mean of the squared differences between prediction and truth.
    val mse = result.map {
      case ((user, product), (r1, r2)) =>
        val err = r1 - r2
        err * err // squared error (not a square root)
    }.mean()
    println(s"MSE= ${mse}")

    // Release Spark resources before exiting.
    spark.stop()
  }
}
// Original source (please credit when reposting): https://blackberry.8miu.com/read-42597.html