聚类是机器学习里很重要的一类方法,基本原则是将“性质相似”(这里就有相似的标准问题,比如是基于概率分布模型的相似性又或是基于距离的相似性)的对象尽可能的放在一个Cluster中而不同Cluster中对象尽可能不相似。对聚类算法而言,有三座大山需要爬过去:(1)、alargenumberofclusters,(2)、ahighfeaturedimensionality,(3)、alargenumberofdatapoints。在这三种情况下,尤其是三种情况都存在时,聚类的计算代价是非常高的,有时候聚类都无法进行下去,于是出现一种简单而又有效地方法:CanopyMethod,说简单是因为它不用什么高深的理论或推导就可以理解,说有效是因为它的实际表现确实可圈可点。
Stage1、聚类最耗费计算的地方是计算对象相似性的时候,CanopyMethod在第一阶段选择简单、计算代价较低的方法计算对象相似性,将相似的对象放在一个子集中,这个子集被叫做Canopy,通过一系列计算得到若干Canopy,Canopy之间可以是重叠的,但不会存在某个对象不属于任何Canopy的情况,可以把这一阶段看做数据预处理;
Stage2、在各个Canopy内使用传统的聚类方法(如K-means),不属于同一Canopy的对象之间不进行相似性计算。
从这个方法起码可以看出两点好处:首先,Canopy不要太大且Canopy之间重叠的不要太多的话会大大减少后续需要计算相似性的对象的个数;其次,类似于K-means这样的聚类方法是需要人为指出K的值的,通过Stage1得到的Canopy个数完全可以作为这个K值,一定程度上减少了选择K的盲目性,其中T1,T2的选择可通过交叉验证获取。
whileDisnotemptyselectelementdfromDtoinitializecanopycremovedfromDLoopthroughremainingelementsinDifdistancebetweend_iandc addcanopyctothelistofcanopiesCend 这里有几点要说明的:D指代一组数据,d_i表示D中的各个数据。 是不是还不够明白?下面用中文进行说明: 1:给我一组存放在数组里面的数据D 2:给我两个距离阈值T1,T2,且T1>T2 3:随机取D中的一个数据d作为中心,并将d从D中移除 4:计算D中所有点到d的距离distance 5:将所有distance 6:将所有distance 7:重复步骤4到6,直到D为空,形成多个canopy类 通过上面的描述,能理解T1和T2的作用了否?当与中心的距离大于T1时,这些点就不会被归入到中心所在的这个canopy类中。然当距离小于T1大于T2时,这些点会被归入到该中心所在的canopy中,但是它们并不会从D中被移除,也就是说,它们将会参与到下一轮的聚类过程中,成为新的canopy类的中心或者成员。亦即,两个Canopy类中有些成员是重叠的。这是canopy比较关键和高明的地方了,当然内在的高明之处我也讲不出来,水平不够。而当距离小于T2的时候,这些点就会被归入到该中心的canopy类中,而且会从D中被移除,也就是不会参加下一次的聚类过程了。 对传统聚类来说,例如K-means、Expectation-Maximization、GreedyAgglomerativeClustering,某个对象与Cluster的相似性是该点到Cluster中心的距离,那么聚类精度能够被很好保证的条件是: 对于每个Cluster都存在一个Canopy,它包含所有属于这个Cluster的元素。 如果这种相似性的度量为当前点与某个Cluster中离的最近的点的距离,那么聚类精度能够被很好保证的条件是: 对于每个Cluster都存在若干个Canopy,这些Canopy之间由Cluster中的元素连接(重叠的部分包含Cluster中的元素)。 数据集的Canopy划分完成后,类似于下图: (1)、将数据集向量化得到一个list后放入内存,选择两个距离阈值:T1和T2,其中T1>T2,对应上图,实线圈为T1,虚线圈为T2,T1和T2的值可以用交叉校验来确定; (2)、从list中任取一点P,用低计算成本方法快速计算点P与所有Canopy之间的距离(如果当前不存在Canopy,则把点P作为一个Canopy),如果点P与某个Canopy距离在T1以内,则将点P加入到这个Canopy; (3)、如果点P曾经与某个Canopy的距离在T2以内,则需要把点P从list中删除,这一步是认为点P此时与这个Canopy已经够近了,因此它不可以再做其它Canopy的中心了; (4)、重复步骤2、3,直到list为空结束。 并行点是比较明显的,就是生成Canopy的过程可以并行,第一阶段,各个slave可以依据存储在本地的数据,各自在本地用上述算法生成若干Canopy,最后在master机器将这些Canopy用相同算法汇总后得到最终的Canopy集合,第二阶段聚类操作就利用最终的Canopy集合进行。 用map-reduce描述就是:datanode在map阶段,利用上述算法在本地生成若干Canopy,之后通过reduce操作得到最终的Canopy集合。 正式使用Mahout之前需要做以下准备工作: 5、最后在eclipse的“File”菜单单击“Import...”,选择“ExistingMavenProjects”,Next后选择Mahout源码所在目录,将感兴趣的项目勾上,最后完成步骤即可。mahout-core、mahout-examples和mahout-math是下一步我们需要的。 Mahout聚类算法将对象以Vector的方式表示,它同时支持densevector和sparsevector,一共有三种表示方式(它们拥有共同的基类AbstractVector,里面实现了有关Vector的很多操作): (1)、DenseVector 位于mahout-math文件夹下的src/main/java中的package:org.apache.mahout.clustering.math中,它实现的时候用一个double数组表示Vector(privatedouble[]values),对于densedata可以使用它; (2)、RandomAccessSparseVector 位于mahout-math文件夹下的src/main/java中的package:org.apache.mahout.clustering.math中,它用来表示一个可以随机访问的sparsevector,只存储非零元素,数据的存储采用hash映射:OpenIntDoubleHashMap; (3)、SequentialAccessSparseVector 位于mahout-math文件夹下的src/main/java中的package:org.apache.mahout.clustering.math中,它用来表示一个顺序访问的sparsevector,同样只存储非零元素,数据的存储采用顺序映射:OrderedIntDoubleMapping; 关于OrderedIntDoubleMapping,其key为int类型,value为double类型,存储的方式让我想起了Libsvm数据表示的形式:非零元素索引:非零元素的值,这里用一个int数组存储indices,用double数组存储非零元素,要想读写某个元素,需要在indices中查找offset,由于indices应该是有序的,所以查找操作用的是二分法。 可以从Canopy.java文件及其父类中找到答案,Mahout在实现时候还是很巧妙的,一个Canopy包含的字段信息主要有: 1)、privateintid;#Canopy的id 2)、privatelongnumPoints;#Canopy中包含点的个数,这里的点都是Vector 3)、privateVectorcenter;#Canopy的重心 4)、privateVectorRadius;#Canopy的半径,这个半径是各个点的标准差,反映组内个体间的离散程度,它的计算依赖下面要说的s0、s1和s2。 它并不会真的去用一个list去存储其包含的点,因为将来的计算并不关心这些点是什么,而是与由这些点得到的三个值有关,这里用三个变量来表示: 以下是它的核心操作: 8)、publicvoidcomputeParameters();#根据s0、s1、s2计算numPoints、center和Radius,其中numPoints=(int)s0,center=s1/s0,Radius=sqrt(s2*s0-s1*s1)/s0,简单点来,假设所有点权重都是1,那么: 9)、publicvoidobserve(VectorWritablex,doubleweight);#每当有一个新的点加入当前Canopy时都需要更新s0、s1、s2的值,这个比较简单。 CanopyClustering的实现包含单机版和MR两个版本,单机版就不多说了,MR版用了两个map操作和一个reduce操作,当然是通过两个不同的job实现的,map和reduce阶段执行顺序是:CanopyMapper–>CanopyReducer–>ClusterMapper,我想对照下面这幅图来理解: (1)、首先是InputFormat,这是从HDFS读取文件后第一个要考虑的问题,mahout中提供了三种方式,都继承于FileInputFormat Format Description Key Value TextInputFormat Defaultformat;readslinesoftextfiles(默认格式,按行读取文件且不进行解析操作,基于行的文件比较有效) KeyValueInputFormat Parseslinesintokey,valpairs(同样是按照行读取,但会搜寻第一个tab字符,把行拆分为(Key,Value)pair) Everythinguptothefirsttabcharacter(第一个tab字符前的所有字符) Theremainderoftheline(该行剩下的内容) SequenceFileInputFormat AHadoop-specifichigh-performancebinaryformat(Hadoop定义的高性能二进制格式) user-defined(用户自定义) 在这里,由于使用了很多自定义的类型,如:表示vector的VectorWritable类型,表示canopy的canopy类型,且需要进行高效的数据处理,所以输入输出文件选择SequenceFileInputFormat格式。由job对象的setInputFormatClass方法来设置,如:job.setInputFormatClass(SequenceFileInputFormat.class),一般在执行聚类算法前需要调用一个job专门处理原始文件为合适的格式,比如用InputDriver,这点后面再说。 (2)、Split 一个Split块为一个map任务提供输入数据,它是InputSplit类型的,默认情况下hadoop会把文件以64MB为基数拆分为若干Block,这些Block分散在各个节点上,于是一个文件就可以被多个map并行的处理,也就是说InputSplit定义了文件是被如何切分的。 (3)、RR RecordReader类把由Split传来的数据加载后转换为适合mapper读取的(Key,Value)pair,RecordReader实例是由InputFormat决定,RR被反复调用直到Split数据处理完,RR被调用后接着就会调用Mapper的map()方法。 “RecordReader实例是由InputFormat决定”这句话怎么理解呢?比如,在CanopyClustering中,使用的是SequenceFileInputFormat,它会提供一个SequenceFileRecordReader类型,利用SequenceFile.Reader将Key和Value读取出来,这里Key和Value的类型对应Mapper的map函数的Key和Value的类型,SequenceFile的存储根据不同压缩策略分为:NONE:不压缩、RECORD:仅压缩每一个record中的value值、BLOCK:将一个block中的所有records压缩在一起,有以下存储格式: UncompressedSequenceFileHeaderRecord RecordlengthKeylengthKeyValueAsync-markereveryfew100bytesorso. Record-CompressedSequenceFileHeaderRecord RecordlengthKeylengthKeyCompressedValueAsync-markereveryfew100bytesorso. Block-CompressedSequenceFileFormatHeaderRecordBlock Compressedkey-lengthsblock-sizeCompressedkey-lengthsblockCompressedkeysblock-sizeCompressedkeysblockCompressedvalue-lengthsblock-sizeCompressedvalue-lengthsblockCompressedvaluesblock-sizeCompressedvaluesblockAsync-markereveryfew100bytesorso. (4)、CanopyMapper 1:classCanopyMapperextendsMapper setup方法在map操作执行前进行必要的初始化工作; 它的map操作很直白,就是将传来的(Key,Value)pair(以后就叫“点”吧,少写几个字)按照某种策略加入到某个Canopy中,这个策略在CanopyClusterer类里说明; 在map操作执行完后,调用cleanup操作,将中间结果写入上下文,注意这里的Key是一个固定的字符串“centroid”,将来reduce操作接收到的数据就只有这个Key,写入的value是所有Canopy的中心点(是个Vector哦)。 (5)、Combiner 可以看做是一个local的reduce操作,接受前面map的结果,处理完后发出结果,可以使用reduce类或者自己定义新类,这里的汇总操作有时候是很有意义的,因为它们都是在本地执行,最后发送出得数据量比直接发出map结果的要小,减少网络带宽的占用,对将来shuffle操作也有益。在CanopyClustering中不需要这个操作。 (6)、Partitioner&Shuffle (7)、CanopyReducer 1:publicclassCanopyReducerextendsReducer setup方法在reduce操作执行前进行必要的初始化工作,这里与mapper不同的地方是可以对阈值T1、T2(T1>T2)重新设置(这里用T3、T4表示),也就是说map阶段的阈值可以与reduce阶段的不同; reduce操作用于map操作一样的策略将局部Canopy的中心点做重新划分,最后更新各个全局Canopy的numPoints、center、radius的信息,将(Canopy标示符,Canopy对象)Pair写入上下文中。 (8)、OutputFormat 它与InputFormat类似,Hadoop会利用OutputFormat的实例把文件写在本地磁盘或HDFS上,它们都是继承自FileOutputFormat类。各个reducer会把结果写在HDFS某个目录下的单独的文件内,命名规则是part-r-xxxxx,这个是依据hadoop自动命名的,此外还会在同一目录下生成一个_SUCCESS文件,输出文件夹用FileOutputFormat.setOutputPath()设置。 到此为止构建Canopy的job结束。即CanopyMapper–>CanopyReducer阶段结束。 (9)、ClusterMapper 最后聚类阶段比较简单,只有一个map操作,以上一阶段输出的SequenceFile为输入,setup方法做一些初始化工作并从上一阶段输出目录读取文件,重建Canopy集合信息并存储在一个Canopy集合中,map操作就调用CanopyClusterer的emitPointToClosestCanopy方法实现聚类,将最终结果输出到一个SequenceFile中。 (10)、CanopyClusterer 这个类是实现Canopy算法的核心,其中: 1)、addPointToCanopies方法用来决定当前点应该加入到哪个Canopy中,在CanopyMapper和CanopyReducer中用到,流程如下: 2)、emitPointToClosestCanopy方法查找与当前点距离最近的Canopy,并将(Canopy的标示符,当前点Vector表示)输出,这个方法在聚类阶段ClusterMapper中用到。 3)、createCanopies方法用于单机生成Canopy,算法一样,实现也较简单,就不多说了。 (11)、CanopyDriver 一般都会定义这么一个driver,用来定义和配置job,组织job执行,同时提供单机版和MR版。job执行顺序是:buildClusters–>clusterData。 CanopyMapper的输入需要是(WritableComparable<>,VectorWritable)Pair,因此,一般情况下,需要对数据集进行处理以得到相应的格式,比如,在源码的/mahout-examples目录下的packageorg.apache.mahout.clustering.syntheticcontrol.canopy中有个Job.java文件提供了对CanopyClustering的一个版本: 1:privatestaticvoidrun(Pathinput,Pathoutput,DistanceMeasuremeasure,2:doublet1,doublet2)throwsIOException,InterruptedException,3:ClassNotFoundException,InstantiationException,IllegalAccessException{4:PathdirectoryContainingConvertedInput=newPath(output,5:DIRECTORY_CONTAINING_CONVERTED_INPUT);6:InputDriver.runJob(input,directoryContainingConvertedInput,7:"org.apache.mahout.math.RandomAccessSparseVector");8:CanopyDriver.run(newConfiguration(),directoryContainingConvertedInput,9:output,measure,t1,t2,true,false);10://runClusterDumper11:ClusterDumperclusterDumper=newClusterDumper(newPath(output,12:"clusters-0"),newPath(output,"clusteredPoints"));13:clusterDumper.printClusters(null);14:}利用InputDriver对数据集进行处理,将(Text,VectorWritable)Pair以sequencefile形式存储,供CanopyDriver使用。InputDriver中的作业配置如下: 1:publicstaticvoidrunJob(Pathinput,Pathoutput,StringvectorClassName)2:throwsIOException,InterruptedException,ClassNotFoundException{3:Configurationconf=newConfiguration();4:conf.set("vector.implementation.class.name",vectorClassName);5:Jobjob=newJob(conf,"InputDriverrunningoverinput:"+input);6:7:job.setOutputKeyClass(Text.class);8:job.setOutputValueClass(VectorWritable.class);9:job.setOutputFormatClass(SequenceFileOutputFormat.class);10:job.setMapperClass(InputMapper.class);11:job.setNumReduceTasks(0);12:job.setJarByClass(InputDriver.class);13:14:FileInputFormat.addInputPath(job,input);15:FileOutputFormat.setOutputPath(job,output);16:17:job.waitForCompletion(true);18:} (1)、准备若干数据集data,要求不同feature之间用空格隔开; (2)、在master的终端敲入命令:hadoopnamenode–format;start-all.sh;用于初始化namenode和启动hadoop; (3)、在HDFS上建立testdata文件夹,聚类算法会去这个文件夹加载数据集,在终端输入:hadoopdfs–mkdirtestdata; (4)、然后将各个datanode上的数据集data上传到HDFS,在终端输入hadoopdfs–putdatatestdata/ (5)、进入mahout的那些Jar文件所在路径,在终端敲入:hadoopjarmahout-examples-0.5-job.jarorg.apache.mahout.clustering.syntheticcontrol.canopy.Job; (6)、在localhost:50030查看作业执行情况,例如: 可以看到,第一个作业由InputDriver发起,输入目录是testdata,一共做了一个map操作但没有做reduce操作,第二个作业由CanopyDriver发起,做了一对mapreduce操作,这里对应Canopy生成过程,最后一个作业也由CanopyDriver发起,做了一个map操作,对应CanopyClustering过程。 (7)、将执行结果抓到本地文件夹,在终端执行:hadoopdfs–getoutputoutput,得到目录如下: Mahout中对CanopyClustering的实现是比较巧妙的,整个聚类过程用2个map操作和1个reduce操作就完成了,Canopy构建的过程可以概括为:遍历给定的点集S,设置两个阈值:T1、T2且T1>T2,选择一个点,用低成本算法计算它与其它Canpoy中心的距离,如果距离小于T1则将该点加入那个Canopy,如果距离小于T2则该点不会成为某个Canopy的中心,重复整个过程,直到S为空。