Hadoop Implementation of Collaborative Filtering (example in <Mahout in action> chapter 6) Part 2

 
This part continues the implementation from Hadoop Implementation of Collaborative Filtering (example in <Mahout in action> chapter 6) Part 1.


The fourth MR: the MR4 map does nothing (it is an identity mapper); the MR4 reduce simply merges the records from MR(3-1) and MR(3-2) that share the same itemID (note that this job therefore takes two input paths). Each output line carries the itemID, its co-occurrence vector, the IDs of the users who rated that item, and their preference values, as shown below (a small sketch of the two record flavors follows the sample):

101  {107:1.0,106:2.0,105:2.0,104:4.0,103:4.0,102:3.0,101:5.0} [5 1 4 2 3] [4.0 5.0 5.0 2.0 2.5]
...
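
To see why the reduce can tell the two inputs apart, here is a minimal standalone sketch (hypothetical class name, not part of the jobs) of the two flavors of VectorOrPrefWritable that arrive under a single itemID key. The two constructors used here are assumptions based on the Mahout version in use; the getters are the ones WiKiReducer4 below relies on.

import org.apache.mahout.cf.taste.hadoop.item.VectorOrPrefWritable;
import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.Vector;

// Hypothetical demo class: shows the two flavors of record that MR4 merges per itemID.
public class VectorOrPrefDemo {
	public static void main(String[] args) {
		// a co-occurrence row, as produced by the MR3 branch that outputs vectors
		Vector cooccurrence = new RandomAccessSparseVector(Integer.MAX_VALUE);
		cooccurrence.set(101, 5.0);
		cooccurrence.set(102, 3.0);

		// assumed constructors: one wraps a vector, the other a (userID, preference) pair
		VectorOrPrefWritable vectorRecord = new VectorOrPrefWritable(cooccurrence);
		VectorOrPrefWritable prefRecord = new VectorOrPrefWritable(3L, 2.5f);

		// WiKiReducer4 tells them apart exactly like this:
		System.out.println(vectorRecord.getVector() != null); // true  -> keep as the item's vector
		System.out.println(prefRecord.getVector() != null);   // false -> collect userID and pref value
	}
}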
WiKiDriver4.java:

package org.fansy.date1012.mahoutinaction.chapter6.sourcecode;

import static org.fansy.date1012.mahoutinaction.chapter6.sourcecode.WiKiUtils.PATH;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.mahout.cf.taste.hadoop.item.VectorAndPrefsWritable;
import org.apache.mahout.cf.taste.hadoop.item.VectorOrPrefWritable;

public class WiKiDriver4 {

	/**
	 * @param args
	 * @throws IOException 
	 * @throws InterruptedException 
	 * @throws ClassNotFoundException 
	 */
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		Configuration conf1 = new Configuration();
		String[] otherArgs = new GenericOptionsParser(conf1, args).getRemainingArgs();
		if (otherArgs.length != 3) {
			System.err.println("Usage: WiKiDriver4 <in1> <in2> <out>");
			System.exit(2);
		}
		Job job1 = new Job(conf1, "wiki job four");
		job1.setNumReduceTasks(1);
		job1.setJarByClass(WiKiDriver4.class);
		job1.setInputFormatClass(SequenceFileInputFormat.class);
		job1.setMapperClass(WikiMapper4.class);
		job1.setMapOutputKeyClass(IntWritable.class);
		job1.setMapOutputValueClass(VectorOrPrefWritable.class);
		job1.setReducerClass(WiKiReducer4.class);
		job1.setOutputKeyClass(IntWritable.class);
		job1.setOutputValueClass(VectorAndPrefsWritable.class);
		job1.setOutputFormatClass(SequenceFileOutputFormat.class);
		// two input paths: the outputs of MR(3-1) and MR(3-2)
		SequenceFileInputFormat.addInputPath(job1, new Path(PATH + otherArgs[0]));
		SequenceFileInputFormat.addInputPath(job1, new Path(PATH + otherArgs[1]));
		SequenceFileOutputFormat.setOutputPath(job1, new Path(PATH + otherArgs[2]));
		if (!job1.waitForCompletion(true)) {
			System.exit(1); // exit on job failure
		}
	}
}
WikiMapper4.java:

package org.fansy.date1012.mahoutinaction.chapter6.sourcecode;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.mahout.cf.taste.hadoop.item.VectorOrPrefWritable;

public class WikiMapper4 extends Mapper<IntWritable ,VectorOrPrefWritable,IntWritable,VectorOrPrefWritable> {

	public void map(IntWritable key,VectorOrPrefWritable value,Context context) throws IOException, InterruptedException{
		context.write(key, value);
	}
}
WiKiReducer4.java:

package org.fansy.date1012.mahoutinaction.chapter6.sourcecode;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.mahout.cf.taste.hadoop.item.VectorAndPrefsWritable;
import org.apache.mahout.cf.taste.hadoop.item.VectorOrPrefWritable;
import org.apache.mahout.math.Vector;

public class WiKiReducer4 extends Reducer<IntWritable,VectorOrPrefWritable,IntWritable,VectorAndPrefsWritable> {
		public void reduce(IntWritable key, Iterable<VectorOrPrefWritable> values,Context context) throws IOException, InterruptedException{
			List<Long> userfs=new ArrayList<Long>();
			List<Float> prefs=new ArrayList<Float>();
			Vector v=null;
			for(VectorOrPrefWritable value:values){
				if(value.getVector()!=null){
					v=value.getVector();
				}else{
					userfs.add(value.getUserID());
					prefs.add(value.getValue());
				 }
			}
			context.write(key, new VectorAndPrefsWritable(v,userfs,prefs));
	//		System.out.println("key ,itemid:"+key.toString()+", information:"+v+","+userfs+","+prefs);
		} 
}
The fifth MR:


map: for each user on each line of MR4's output, multiply the item-to-item similarity (co-occurrence) vector by that user's preference value. For user 3 in the first record, for example:
Vectorforuser3 = [1.0 2.0 2.0 4.0 4.0 3.0 5.0] * 2.5
and the map emits key: 3, value: Vectorforuser3 (a standalone check of this arithmetic follows the sample).
The map output should look like this:

alluserids:[5, 1, 4, 2, 3]
,userid:5,vector:{107:4.0,106:8.0,105:8.0,104:16.0,103:16.0,102:12.0,101:20.0}
,userid:1,vector:{107:5.0,106:10.0,105:10.0,104:20.0,103:20.0,102:15.0,101:25.0}
,userid:4,vector:{107:5.0,106:10.0,105:10.0,104:20.0,103:20.0,102:15.0,101:25.0}
,userid:2,vector:{107:2.0,106:4.0,105:4.0,104:8.0,103:8.0,102:6.0,101:10.0}
,userid:3,vector:{107:2.5,106:5.0,105:5.0,104:10.0,103:10.0,102:7.5,101:12.5}
。。。
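
As a quick sanity check of the map-side arithmetic, this small standalone snippet (hypothetical class name, not part of the job) rebuilds the co-occurrence row of item 101 from the MR4 sample and scales it by user 3's preference of 2.5 with Mahout's Vector.times; the printed vector should match the user-3 line above.

import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.Vector;

// Standalone check of what WikiMapper5 emits for user 3 of item 101's record.
public class Mr5MapArithmeticDemo {
	public static void main(String[] args) {
		// co-occurrence row for item 101, taken from the MR4 output sample above
		Vector coOccurrence = new RandomAccessSparseVector(Integer.MAX_VALUE);
		coOccurrence.set(101, 5.0);
		coOccurrence.set(102, 3.0);
		coOccurrence.set(103, 4.0);
		coOccurrence.set(104, 4.0);
		coOccurrence.set(105, 2.0);
		coOccurrence.set(106, 2.0);
		coOccurrence.set(107, 1.0);

		// user 3 rated item 101 with 2.5, so the mapper emits the row scaled by 2.5
		Vector partialForUser3 = coOccurrence.times(2.5);

		// expected: {107:2.5, 106:5.0, 105:5.0, 104:10.0, 103:10.0, 102:7.5, 101:12.5}
		System.out.println(partialForUser3);
	}
}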

Combine: for the map output, add up the vectors that share the same key (userID); the resulting vector sum is that user's predicted score for each item (a tiny sketch of this accumulation follows the sample).
The combine output should look like this:

userid:1,vector:{107:5.0,106:18.0,105:15.5,104:33.5,103:39.0,102:31.5,101:44.0}
userid:2,vector:{107:4.0,106:20.5,105:15.5,104:36.0,103:41.5,102:32.5,101:45.5}
。。。
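
The accumulation itself is plain sparse-vector addition. Here is a minimal sketch with two clearly hypothetical partial vectors for one user (the real partials come from several different item records), using the same start-at-null-then-plus pattern as WiKiCombiner5:

import java.util.Arrays;
import java.util.List;

import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.Vector;

// Illustrates the per-user accumulation done in WiKiCombiner5 (and again in WiKiReducer5).
public class CombineAccumulationDemo {
	public static void main(String[] args) {
		// two hypothetical partial vectors emitted for the same userID
		Vector p1 = new RandomAccessSparseVector(Integer.MAX_VALUE);
		p1.set(101, 12.5);
		p1.set(102, 7.5);

		Vector p2 = new RandomAccessSparseVector(Integer.MAX_VALUE);
		p2.set(101, 9.0);
		p2.set(103, 4.5);

		List<Vector> partials = Arrays.asList(p1, p2);

		// same pattern as the combiner: start at null, then plus() each partial
		Vector sum = null;
		for (Vector v : partials) {
			sum = (sum == null) ? v : sum.plus(v);
		}
		System.out.println(sum); // 101 -> 21.5, 102 -> 7.5, 103 -> 4.5
	}
}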

Reduce: from the combine output, filter out the items the user has already rated, then sort what remains by score in descending order; those items are the recommendations for that user (a simplified sketch of this selection follows the output below).
The final output is:

1	[104:33.5,106:18.0,105:15.5,107:5.0]
2	[106:20.5,105:15.5,107:4.0]
3	[103:26.5,102:20.0,106:17.5]
4	[102:37.0,105:26.0,107:9.5]
5	[107:11.5]
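
The selection done in the reduce step is an ordinary bounded top-N pass. Below is a simplified, Mahout-free sketch of that logic (hypothetical class and method names), fed with the already-filtered scores of user 1 from the output above:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// Simplified illustration of WiKiReducer5's selection: keep the N highest-scored items
// in a small min-heap, then emit them in descending order of score.
public class TopNDemo {

	public static List<Map.Entry<Integer, Double>> topN(Map<Integer, Double> scores, int n) {
		// min-heap by score: the head is the weakest of the current top N
		PriorityQueue<Map.Entry<Integer, Double>> heap =
				new PriorityQueue<>(n + 1, (a, b) -> Double.compare(a.getValue(), b.getValue()));
		for (Map.Entry<Integer, Double> e : scores.entrySet()) {
			if (heap.size() < n) {
				heap.add(e);
			} else if (e.getValue() > heap.peek().getValue()) {
				heap.add(e);
				heap.poll(); // drop the weakest item
			}
		}
		List<Map.Entry<Integer, Double>> result = new ArrayList<>(heap);
		result.sort((a, b) -> Double.compare(b.getValue(), a.getValue())); // descending by score
		return result;
	}

	public static void main(String[] args) {
		// user 1's candidate scores, already filtered of the items user 1 has rated
		Map<Integer, Double> scores = new HashMap<>();
		scores.put(104, 33.5);
		scores.put(106, 18.0);
		scores.put(105, 15.5);
		scores.put(107, 5.0);
		System.out.println(topN(scores, 3)); // [104=33.5, 106=18.0, 105=15.5]
	}
}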
WiKiDriver5.java:

package org.fansy.date1012.mahoutinaction.chapter6.sourcecode;

import static org.fansy.date1012.mahoutinaction.chapter6.sourcecode.WiKiUtils.PATH;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.mahout.cf.taste.hadoop.RecommendedItemsWritable;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.VectorWritable;

public class WiKiDriver5 {

	/**
	 * @param args
	 * @throws IOException 
	 * @throws InterruptedException 
	 * @throws ClassNotFoundException 
	 */
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		Configuration conf1 = new Configuration();
		String[] otherArgs = new GenericOptionsParser(conf1, args).getRemainingArgs();
		if (otherArgs.length != 2) {
			System.err.println("Usage: WiKiDriver5 <in> <out>");
			System.exit(2);
		}
		Job job1 = new Job(conf1, "wiki job five");
		job1.setNumReduceTasks(1);
		job1.setJarByClass(WiKiDriver5.class);
		job1.setInputFormatClass(SequenceFileInputFormat.class);
		job1.setMapperClass(WikiMapper5.class);
		job1.setMapOutputKeyClass(VarLongWritable.class);
		job1.setMapOutputValueClass(VectorWritable.class);
		job1.setCombinerClass(WiKiCombiner5.class);
		job1.setReducerClass(WiKiReducer5.class);
		job1.setOutputKeyClass(VarLongWritable.class);
		job1.setOutputValueClass(RecommendedItemsWritable.class);
		// default text output this time, so the recommendations are directly readable
		SequenceFileInputFormat.addInputPath(job1, new Path(PATH + otherArgs[0]));
		FileOutputFormat.setOutputPath(job1, new Path(PATH + otherArgs[1]));
		if (!job1.waitForCompletion(true)) {
			System.exit(1); // exit on job failure
		}
	}
}
WikiMapper5.java:

package org.fansy.date1012.mahoutinaction.chapter6.sourcecode;

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.mahout.cf.taste.hadoop.item.VectorAndPrefsWritable;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;

public class WikiMapper5 extends Mapper<IntWritable ,VectorAndPrefsWritable,VarLongWritable,VectorWritable>{
	
	public void map(IntWritable key,VectorAndPrefsWritable vectorAndPref,Context context) throws IOException, InterruptedException{
		Vector coo=vectorAndPref.getVector();
		List<Long> userIds=vectorAndPref.getUserIDs();
		List<Float> prefValues=vectorAndPref.getValues();
		//System.out.println("alluserids:"+userIds);
		for(int i=0;i<userIds.size();i++){
			long userID=userIds.get(i);
			float prefValue=prefValues.get(i);
			Vector par=coo.times(prefValue);
			context.write(new VarLongWritable(userID), new VectorWritable(par));
			//System.out.println(",userid:"+userID+",vector:"+par);  //  if the user id = 3 is the same as my paper then is right
		}
	//	System.out.println();	
	}
}
WiKiCombiner5.java:

package org.fansy.date1012.mahoutinaction.chapter6.sourcecode;

import java.io.IOException;

import org.apache.hadoop.mapreduce.Reducer;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;

public class WiKiCombiner5 extends Reducer<VarLongWritable,VectorWritable,VarLongWritable,VectorWritable> {
		public void reduce(VarLongWritable key, Iterable<VectorWritable> values,Context context) throws IOException, InterruptedException{
			Vector partial=null;
			for(VectorWritable v:values){
				partial=partial==null?v.get():partial.plus(v.get());
			}
			context.write(key, new VectorWritable(partial));
			System.out.println("userid:"+key.toString()+",vecotr:"+partial);//   here also should be the same as my paper's result
		}
}
WiKiReducer5.java:

package org.fansy.date1012.mahoutinaction.chapter6.sourcecode;

import static org.fansy.date1012.mahoutinaction.chapter6.sourcecode.WiKiUtils.*;

import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.mahout.cf.taste.hadoop.RecommendedItemsWritable;
import org.apache.mahout.cf.taste.impl.common.FastMap;
import org.apache.mahout.cf.taste.impl.recommender.ByValueRecommendedItemComparator;
import org.apache.mahout.cf.taste.impl.recommender.GenericRecommendedItem;
import org.apache.mahout.cf.taste.recommender.RecommendedItem;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;

public class WiKiReducer5 extends Reducer<VarLongWritable,VectorWritable,VarLongWritable,RecommendedItemsWritable> {
	
	private int recommendationsPerUser=RECOMMENDATIONSPERUSER;
	private String path=JOB1OUTPATH;
	
	private static FastMap<Integer,String> map=new FastMap<Integer,String>();
	public void setup(Context context) throws IOException{
		Configuration conf=new Configuration();
		FileSystem fs=FileSystem.get(URI.create(path), conf);
		Path tempPath=new Path(path);
		SequenceFile.Reader reader=null;
		try {
			reader=new SequenceFile.Reader(fs, tempPath, conf);
			Writable key=(Writable)ReflectionUtils.newInstance(reader.getKeyClass(),conf);
			Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf); 
		//	long position = reader.getPosition();  
			while (reader.next(key, value)) {  
				map.put(Integer.parseInt(key.toString()), value.toString());
		//	    System.out.println(key.toString()+","+value.toString());
			//    position = reader.getPosition(); // beginning of next record  
			}
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			if (reader != null) {
				reader.close();
			}
		}
	}
	
	public void reduce(VarLongWritable key, Iterable<VectorWritable> values,Context context) throws IOException, InterruptedException{
		
			int userID=(int)key.get();
			Vector rev=null;
			for(VectorWritable vec:values){
				rev=rev==null? vec.get():rev.plus(vec.get());
			}
			Queue<RecommendedItem>topItems=new PriorityQueue<RecommendedItem>(
					recommendationsPerUser+1,
					Collections.reverseOrder(ByValueRecommendedItemComparator.getInstance())
					);
			Iterator<Vector.Element>recommendationVectorIterator=
					rev.iterateNonZero();
			while(recommendationVectorIterator.hasNext()){
				Vector.Element e=recommendationVectorIterator.next();
				int index=e.index();
				System.out.println("Vecotr.element.indxe:"+index);  //  test here  find the index is item id or not  ** test result : index is item
				if(!hasItem(userID,String.valueOf(index))){
					float value=(float) e.get();
					if(topItems.size()<recommendationsPerUser){
						//  here only set index
						topItems.add(new GenericRecommendedItem(index,value));
					}else if(value>topItems.peek().getValue()){
						topItems.add(new GenericRecommendedItem(index,value));
						topItems.poll();
					}
				}
			}
			List<RecommendedItem>recom=new ArrayList<RecommendedItem>(topItems.size());
			recom.addAll(topItems);
			Collections.sort(recom,ByValueRecommendedItemComparator.getInstance());
			context.write(key, new RecommendedItemsWritable(recom));		
		}
	
	public static boolean hasItem(int user, String item) { // check whether the user has already rated the item
		String items = map.get(user);
		if (items == null) {
			return false; // user not present in the MR1 output
		}
		return items.contains(item); // substring match; fine for the 101-107 item ids used here
	}
}

The last reducer also took a fair amount of effort to write. The basic idea: in the Reducer's setup method, read the SequenceFile produced by MR1; that data is used to exclude the items a user has already rated.
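
One thing to watch in hasItem above: items.contains(item) is a raw substring test against the vector's toString form, so an item id that is a prefix of another (say "10" vs "101") could match by accident; with the 101-107 ids used here it is harmless. A slightly stricter variant, assuming the {item:pref,...} string form shown in the samples, would parse the ids into a set first (hypothetical helper, not part of the jobs):

import java.util.HashSet;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Stricter variant of the hasItem check: parse the item ids out of the vector's toString
// form (e.g. "{107:1.0,106:2.0,...}") instead of doing a raw substring match.
public class RatedItemsParser {
	private static final Pattern ITEM_ID = Pattern.compile("(\\d+):");

	public static Set<String> parseItemIds(String vectorString) {
		Set<String> ids = new HashSet<String>();
		Matcher m = ITEM_ID.matcher(vectorString);
		while (m.find()) {
			ids.add(m.group(1));
		}
		return ids;
	}

	public static void main(String[] args) {
		Set<String> rated = parseItemIds("{107:1.0,106:2.0,105:2.0,104:4.0,103:4.0,102:3.0,101:5.0}");
		System.out.println(rated.contains("101")); // true
		System.out.println(rated.contains("10"));  // false, unlike a substring check
	}
}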

While writing this code I had to look up quite a lot of the Mahout API, since many of the classes come from Mahout and you need to understand how they are used. In the last Reducer I also used FastMap, which is another Mahout class; using the classes Mahout provides should presumably make things run faster.

Finally, a few words about the algorithm.

The algorithm as first presented in <Mahout in action> is: multiply the co-occurrence (similarity) matrix by the user's preference vector to obtain the user's predicted score for each item (in the book's worked example, U3's preference for item 101 should be 2.5; I suspect it is a misprint).

When it comes to the code, however, the book recommends not forming the full matrix-vector product at once, but instead scaling each rated item's co-occurrence column by the user's preference for that item and summing the partial vectors, which is exactly what MR5's map and combine do above. Doing it this way is more efficient.
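
As a small in-memory illustration of why the two formulations agree, the sketch below (hypothetical 3-item data, Mahout math classes only) computes the full matrix-vector product row by row and then the column-scaling-and-summing form that MR5's map and combine implement; both print the same values.

import org.apache.mahout.math.DenseMatrix;
import org.apache.mahout.math.DenseVector;
import org.apache.mahout.math.Matrix;
import org.apache.mahout.math.Vector;

// Hypothetical 3-item example: scaling columns by the preferences and summing
// gives the same result as multiplying the co-occurrence matrix by the preference vector.
public class RecommendationFormsDemo {
	public static void main(String[] args) {
		Matrix cooccurrence = new DenseMatrix(new double[][] {
				{3, 1, 2},
				{1, 2, 0},
				{2, 0, 4}
		});
		Vector prefs = new DenseVector(new double[] {2.0, 0.0, 4.5});

		// (1) full matrix-vector product, row by row
		Vector full = new DenseVector(3);
		for (int row = 0; row < 3; row++) {
			full.set(row, cooccurrence.viewRow(row).dot(prefs));
		}

		// (2) what MR5 does: scale each rated item's column by the preference, then sum
		Vector summed = new DenseVector(3);
		for (int col = 0; col < 3; col++) {
			double pref = prefs.get(col);
			if (pref != 0.0) {
				summed = summed.plus(cooccurrence.viewColumn(col).times(pref));
			}
		}

		System.out.println(full);   // values 15.0, 2.0, 22.0
		System.out.println(summed); // same values
	}
}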




