协同过滤是一种基于一组兴趣相同的用户或项目进行的推荐,它根据邻居用户(与目标用户兴趣相似的用户)的偏好信息产生对目标用户的推荐列表。
关于协同过滤的一个经典的例子就是看电影。如果你不知道哪一部电影是自己喜欢的或者评分比较高的,那么通常的做法就是问问周围的朋友,看看最近有什么好的电影推荐。而在问的时候,肯定都习惯于问跟自己口味差不多的朋友,这就是协同过滤的核心思想。因此,协同过滤是在海量数据中挖掘出小部分与你品味类似的用户,在协同过滤中,这些用户成为邻居,然后根据他们喜欢的东西组织成一个排序的目录推荐给你(如下图所示)。
下面代码读取spark的示例文件,文件中每一行包括一个用户id、商品id和评分。我们使用默认的ALS.train()方法来构建推荐模型并评估模型的均方差。
importorg.apache.spark.SparkConfimportorg.apache.spark.SparkContextimportorg.apache.spark.mllib.recommendation.ALSimportorg.apache.spark.mllib.recommendation.MatrixFactorizationModelimportorg.apache.spark.mllib.recommendation.Rating2.读取数据:首先,读取文本文件,把数据转化成rating类型,即[Int,Int,Double]的RDD;
scala>valdata=sc.textFile("../data/mllib/als/test.data")data:org.apache.spark.rdd.RDD[String]=../data/mllib/als/test.dataMapPartitionsRDD[1]attextFileat
scala>ratings.foreach{x=>println(x)}Rating(1,1,5.0)Rating(3,2,5.0)Rating(1,2,1.0)Rating(3,3,1.0)Rating(1,3,5.0)Rating(3,4,5.0)Rating(1,4,1.0)Rating(4,1,1.0)Rating(2,1,5.0)Rating(4,2,5.0)Rating(2,2,1.0)Rating(4,3,1.0)Rating(2,3,5.0)Rating(4,4,5.0)Rating(2,4,1.0)Rating(3,1,1.0)其中Rating中的第一个int是user编号,第二个int是item编号,最后的double是user对item的评分。
划分训练集和测试集,比例分别是0.8和0.2。
scala>valsplits=ratings.randomSplit(Array(0.8,0.2))splits:Array[org.apache.spark.rdd.RDD[org.apache.spark.mllib.recommendation.Rating]]=Array(MapPartitionsRDD[5]atrandomSplitat
scala>valrank=10rank:Int=10scala>valnumIterations=10numIterations:Int=10scala>valmodel=ALS.train(training,rank,numIterations,0.01)model:org.apache.spark.mllib.recommendation.MatrixFactorizationModel=org.apache.spark.mllib.recommendation.MatrixFactorizationModel@1f14d4a在MLlib中的实现有如下的参数:
可以调整这些参数,不断优化结果,使均方差变小。比如:iterations越多,lambda较小,均方差会较小,推荐结果较优。上面的例子中调用了ALS.train(ratings,rank,numIterations,0.01),我们还可以设置其他参数,调用方式如下:
valmodel=newALS().setRank(params.rank).setIterations(params.numIterations).setLambda(params.lambda).setImplicitPrefs(params.implicitPrefs).setUserBlocks(params.numUserBlocks).setProductBlocks(params.numProductBlocks).run(training)4.利用模型进行预测从test训练集中获得只包含用户和商品的数据集:
scala>valtestUsersProducts=test.map{caseRating(user,product,rate)=>|(user,product)|}usersProducts:org.apache.spark.rdd.RDD[(Int,Int)]=MapPartitionsRDD[3868]atmapat
scala>valpredictions=|model.predict(testUsersProducts).map{caseRating(user,product,rate)|=>((user,product),rate)|}predictions:org.apache.spark.rdd.RDD[((Int,Int),Double)]=MapPartitionsRDD[3877]atmapat
scala>valratesAndPreds=test.map{caseRating(user,product,rate)=>|((user,product),rate)|}.join(predictions)ratesAndPreds:org.apache.spark.rdd.RDD[((Int,Int),(Double,Double))]=MapPartitionsRDD[3881]atjoinat
scala>ratesAndPreds.foreach(println)((3,1),(1.0,-0.22756397347958202))((4,2),(5.0,4.388061223429636))((4,1),(1.0,-0.1847678805249373))比如,第一条结果记录((3,1),(1.0,-0.22756397347958202))中,(3,1)分别表示3号用户和1号商品,而1.0是实际的估计分值,-0.22756397347958202是经过推荐的预测分值。
然后计算均方差,这里的r1就是真实结果,r2就是预测结果:
scala>valMSE=ratesAndPreds.map{case((user,product),(r1,r2))=>|valerr=(r1-r2)|err*err|}.mean()MSE:Double=1.0950191019929887打印出均方差值:
scala>println("MeanSquaredError="+MSE)MeanSquaredError=1.0950191019929887我们可以看到打分的均方差值为1.09左右。由于本例的数据量很少,预测的结果和实际相比有一定的差距。上面的例子只是对测试集进行了评分,我们还可以进一步的通过调用model.recommendProducts给特定的用户推荐商品以及model.recommendUsers来给特定商品推荐潜在用户。
厦门大学软件工程系副教授,2009年毕业于中国人民大学计算机系获工学博士学位。主要研究方向是网络数据管理,车载网络,大数据分析和管理。