协同过滤是一种基于一组兴趣相同的用户或项目进行的推荐,它根据邻居用户(与目标用户兴趣相似的用户)的偏好信息产生对目标用户的推荐列表。
关于协同过滤的一个经典的例子就是看电影。如果你不知道哪一部电影是自己喜欢的或者评分比较高的,那么通常的做法就是问问周围的朋友,看看最近有什么好的电影推荐。而在问的时候,肯定都习惯于问跟自己口味差不多的朋友,这就是协同过滤的核心思想。因此,协同过滤是在海量数据中挖掘出小部分与你品味类似的用户,在协同过滤中,这些用户成为邻居,然后根据他们喜欢的东西组织成一个排序的目录推荐给你(如下图所示)。
importorg.apache.spark.ml.evaluation.RegressionEvaluatorimportorg.apache.spark.ml.recommendation.ALS2.根据数据结构创建读取规范:
创建一个Rating类型,即[Int,Int,Float,Long];然后建造一个把数据中每一行转化成Rating类的函数。
scala>caseclassRating(userId:Int,movieId:Int,rating:Float,timestamp:Long)definedclassRatingscala>defparseRating(str:String):Rating={|valfields=str.split("::")|assert(fields.size==4)|Rating(fields(0).toInt,fields(1).toInt,fields(2).toFloat,fields(3).toLong)|}parseRating:(str:String)Rating3.读取数据:导入implicits,读取MovieLens数据集,把数据转化成Rating类型;
scala>importspark.implicits._importspark.implicits._scala>valratings=spark.sparkContext.textFile("file:///usr/local/spark/data/mllib/als/sample_movielens_ratings.txt").map(parseRating).toDF()ratings:org.apache.spark.sql.DataFrame=[userId:int,movieId:int...2morefields]然后,我们把数据打印看一下:
scala>ratings.show()+------+-------+------+----------+|userId|movieId|rating|timestamp|+------+-------+------+----------+|0|2|3.0|1424380312||0|3|1.0|1424380312||0|5|2.0|1424380312||0|9|4.0|1424380312||0|11|1.0|1424380312||0|12|2.0|1424380312||0|15|1.0|1424380312||0|17|1.0|1424380312||0|19|1.0|1424380312||0|21|1.0|1424380312||0|23|1.0|1424380312||0|26|3.0|1424380312||0|27|1.0|1424380312||0|28|1.0|1424380312||0|29|1.0|1424380312||0|30|1.0|1424380312||0|31|1.0|1424380312||0|34|1.0|1424380312||0|37|1.0|1424380312||0|41|2.0|1424380312|+------+-------+------+----------+onlyshowingtop20rows3.构建模型把MovieLens数据集划分训练集和测试集
scala>valArray(training,test)=ratings.randomSplit(Array(0.8,0.2))training:org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]=[userId:int,movieId:int...2morefields]test:org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]=[userId:int,movieId:int...2morefields]使用ALS来建立推荐模型,这里我们构建了两个模型,一个是显性反馈,一个是隐性反馈
scala>valalsExplicit=newALS().setMaxIter(5).setRegParam(0.01).setUserCol("userId").setItemCol("movieId").setRatingCol("rating")alsExplicit:org.apache.spark.ml.recommendation.ALS=als_05fe5d65ffc3scala>valalsImplicit=newALS().setMaxIter(5).setRegParam(0.01).setImplicitPrefs(true).setUserCol("userId").setItemCol("movieId").setRatingCol("rating")alsImplicit:org.apache.spark.ml.recommendation.ALS=als_7e9b959fbdae在ML中的实现有如下的参数:
可以调整这些参数,不断优化结果,使均方差变小。比如:imaxIter越大,regParam越小,均方差会越小,推荐结果较优。
接下来,把推荐模型放在训练数据上训练:
scala>valmodelExplicit=alsExplicit.fit(training)modelExplicit:org.apache.spark.ml.recommendation.ALSModel=als_05fe5d65ffc3scala>valmodelImplicit=alsImplicit.fit(training)modelImplicit:org.apache.spark.ml.recommendation.ALSModel=als_7e9b959fbdae4.模型预测使用训练好的推荐模型对测试集中的用户商品进行预测评分,得到预测评分的数据集
scala>valpredictionsExplicit=modelExplicit.transform(test)predictionsExplicit:org.apache.spark.sql.DataFrame=[userId:int,movieId:int...3morefields]scala>valpredictionsImplicit=modelImplicit.transform(test)predictionsImplicit:org.apache.spark.sql.DataFrame=[userId:int,movieId:int...3morefields]我们把结果输出,对比一下真实结果与预测结果:
通过计算模型的均方根误差来对模型进行评估,均方根误差越小,模型越准确:
scala>valevaluator=newRegressionEvaluator().setMetricName("rmse").setLabelCol("rating").setPredictionCol("prediction")evaluator:org.apache.spark.ml.evaluation.RegressionEvaluator=regEval_bc9d91ae7b1ascala>valrmseExplicit=evaluator.evaluate(predictionsExplicit)rmseExplicit:Double=1.6995189118765517scala>valrmseImplicit=evaluator.evaluate(predictionsImplicit)rmseImplicit:Double=1.8011620822359165打印出两个模型的均方根误差:
scala>println(s"Explicit:Root-mean-squareerror=$rmseExplicit")Explicit:Root-mean-squareerror=1.6995189118765517scala>println(s"Implicit:Root-mean-squareerror=$rmseImplicit")Implicit:Root-mean-squareerror=1.8011620822359165可以看到打分的均方差值为1.69和1.80左右。由于本例的数据量很少,预测的结果和实际相比有一定的差距。
厦门大学软件工程系副教授,2009年毕业于中国人民大学计算机系获工学博士学位。主要研究方向是网络数据管理,车载网络,大数据分析和管理。