`
thecloud
  • 浏览: 882980 次
文章分类
社区版块
存档分类
最新评论

Hadoop Combiner 操作

 
阅读更多

近期看了一本书:Data-intensive Text Processing with MapReduce,是讲如何设计MR程序的,看到一个例子是Combiner的设计模式,然后就动手实现了下。具体问题如下:

现有输入数据如下:

one	3.9
one	4.0
one	3.8
two	44
two	44
two	44
three	9898
four	2323
four	2323
five	2323
six	23
six	2323
four	232
five	2323

第一列代表用户,第二列代表用户在一个网站上所停留的时间,现在想求每个用户在这个网站的平均停留时间。如果不用combine操作的话,那么其MR伪代码如下(复制书上的内容):

class Mapper
method Map(string t, integer r)
Emit(string t, integer r)
class Reducer
method Reduce(string t, integers [r1 , r2 , . . .])
sum ← 0
cnt ← 0
for all integer r ∈ integers [r1 , r2 , . . .] do
sum ← sum + r
cnt ← cnt + 1
ravg ← sum/cnt
Emit(string t, integer ravg )
如果要加combine怎么操作呢?Combiner和Reducer一样么(求最大气温的例子或许是一样的,但这里却不是,而且现实中的很多例子都不是一样的),如果一样的话那么就会变成下面的错误操作了:

Mean(1, 2, 3, 4, 5) = Mean(Mean(1, 2), Mean(3, 4, 5))
正确的伪代码如下(书上摘录):

class Mapper
method Map(string t, integer r)
Emit(string t, pair (r, 1))
class Combiner
method Combine(string t, pairs [(s1 , c1 ), (s2 , c2 ) . . .])
sum ← 0
cnt ← 0
for all pair (s, c) ∈ pairs [(s1 , c1 ), (s2 , c2 ) . . .] do
sum ← sum + s
cnt ← cnt + c
Emit(string t, pair (sum, cnt))
class Reducer
method Reduce(string t, pairs [(s1 , c1 ), (s2 , c2 ) . . .])
sum ← 0
cnt ← 0
for all pair (s, c) ∈ pairs [(s1 , c1 ), (s2 , c2 ) . . .] do
sum ← sum + s
cnt ← cnt + c
ravg ← sum/cnt
Emit(string t, integer ravg )
由于Combiner的输入和输出格式要一样,即Combiner的输入要和Mapper的输出格式一样,Combiner的输出要和Reducer的输入格式一样。所以上面有pairs。参考上面的伪代码编写的代码如下:

Driver:

package org.fansy.date922;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class AverageDriver3 {
	public static void main(String[] args) throws Exception{
		// TODO Auto-generated method stub	
		Configuration conf1 = new Configuration();	
	    String[] otherArgs = new GenericOptionsParser(conf1, args).getRemainingArgs();
	    if (otherArgs.length != 2) {
	      System.err.println("Usage: AverageDriver<in> <out>");
	      System.exit(2);
	    }
	    Job job1 = new Job(conf1, "AverageDriver  job ");
	    job1.setInputFormatClass(KeyValueTextInputFormat.class);    
	    job1.setNumReduceTasks(1);
	    job1.setJarByClass(AverageDriver3.class); 
	    job1.setMapperClass(AverageM2.class);
	    job1.setMapOutputKeyClass(Text.class);
		job1.setMapOutputValueClass(TextPair.class);
		job1.setCombinerClass(AverageC3.class);
	    job1.setReducerClass(AverageR2.class);
	    job1.setOutputKeyClass(Text.class);
	    job1.setOutputValueClass(DoubleWritable.class);
	    KeyValueTextInputFormat.addInputPath(job1, new Path(otherArgs[0]));
	    FileOutputFormat.setOutputPath(job1, new Path(otherArgs[1]));    
	    if(!job1.waitForCompletion(true)){
	    	System.exit(1); // run error then exit
	    }  
		System.out.println("************************");
	}
}
Mapper:
package org.fansy.date922;

import java.io.IOException;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class AverageM3 extends Mapper<Text,Text,Text,TextPair>{

//	private Text newkey=new Text();
	private TextPair newvalue=new TextPair();
	private DoubleWritable r=new DoubleWritable();
	private IntWritable number=new IntWritable(1);
	public  void map(Text key,Text value,Context context)throws IOException,InterruptedException {
		// TODO Auto-generated method stub
		System.out.println(key.toString());
		double shuzhi=Double.parseDouble(value.toString());
		r.set(shuzhi);
		newvalue.set(r, number);
		context.write(key, newvalue);
	}
}
Combiner:

package org.fansy.date922;

import java.io.IOException;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class AverageC3 extends Reducer<Text,TextPair,Text,TextPair>{
	private DoubleWritable newvalued=new DoubleWritable();
	private IntWritable newvaluei=new IntWritable();
	private TextPair newvalue=new TextPair();
	public  void reduce(Text key,Iterable<TextPair> values,Context context) throws IOException,InterruptedException{
		// TODO Auto-generated method stub
		double sum= 0.0;
		int num=0;
		for(TextPair val:values){
			sum+=val.getFirst().get();
			num+=val.getSecond().get();
		}
		newvalued.set(sum);
		newvaluei.set(num);
		newvalue.set(newvalued,newvaluei);
		context.write(key, newvalue);
	}
}

Reducer:
package org.fansy.date922;

import java.io.IOException;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class AverageR3 extends Reducer<Text,TextPair,Text,DoubleWritable>{
	private DoubleWritable newvalue=new DoubleWritable();
	public  void reduce(Text key,Iterable<TextPair> values,Context context) throws IOException,InterruptedException{
		// TODO Auto-generated method stub
		double sum= 0.0;
		int num=0;
		for(TextPair val:values){
			sum+=val.getFirst().get();
			num+=val.getSecond().get();
		}
		double aver=sum/num;
		newvalue.set(aver);
		context.write(key, newvalue);
	}
}

TextPair:

package org.fansy.date922;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.WritableComparable;

public class TextPair implements WritableComparable<TextPair> {
	private DoubleWritable first;
	private IntWritable second;
	public TextPair(){
		set(new DoubleWritable(),new IntWritable());
	}
	public  void set(DoubleWritable longWritable, IntWritable intWritable) {
		// TODO Auto-generated method stub
		this.first=longWritable;
		this.second=intWritable;
	}
	public DoubleWritable getFirst(){
		return first;
	}
	public IntWritable getSecond(){
		return second;
	}
	@Override
	public void readFields(DataInput arg0) throws IOException {
		// TODO Auto-generated method stub
		first.readFields(arg0);
		second.readFields(arg0);
	}
	@Override
	public void write(DataOutput arg0) throws IOException {
		// TODO Auto-generated method stub
		first.write(arg0);
		second.write(arg0);
	}
	@Override
	public int compareTo(TextPair o) {
		// TODO Auto-generated method stub
		int cmp=first.compareTo(o.first);
		if(cmp!=0){
			return cmp;
		}
		return second.compareTo(o.second);
	}	
}

查看终端中的显示也可以看出的确是有combine操作的:

12/09/22 15:55:45 INFO mapred.JobClient: Job complete: job_local_0001
12/09/22 15:55:45 INFO mapred.JobClient: Counters: 22
12/09/22 15:55:45 INFO mapred.JobClient:   File Output Format Counters 
12/09/22 15:55:45 INFO mapred.JobClient:     Bytes Written=65
12/09/22 15:55:45 INFO mapred.JobClient:   FileSystemCounters
12/09/22 15:55:45 INFO mapred.JobClient:     FILE_BYTES_READ=466
12/09/22 15:55:45 INFO mapred.JobClient:     HDFS_BYTES_READ=244
12/09/22 15:55:45 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=82758
12/09/22 15:55:45 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=65
12/09/22 15:55:45 INFO mapred.JobClient:   File Input Format Counters 
12/09/22 15:55:45 INFO mapred.JobClient:     Bytes Read=122
12/09/22 15:55:45 INFO mapred.JobClient:   Map-Reduce Framework
12/09/22 15:55:45 INFO mapred.JobClient:     Map output materialized bytes=118
12/09/22 15:55:45 INFO mapred.JobClient:     Map input records=14
12/09/22 15:55:45 INFO mapred.JobClient:     Reduce shuffle bytes=0
12/09/22 15:55:45 INFO mapred.JobClient:     Spilled Records=12
12/09/22 15:55:45 INFO mapred.JobClient:     Map output bytes=231
12/09/22 15:55:45 INFO mapred.JobClient:     Total committed heap usage (bytes)=301727744
12/09/22 15:55:45 INFO mapred.JobClient:     CPU time spent (ms)=0
12/09/22 15:55:45 INFO mapred.JobClient:     SPLIT_RAW_BYTES=108
12/09/22 15:55:45 INFO mapred.JobClient:     Combine input records=14
12/09/22 15:55:45 INFO mapred.JobClient:     Reduce input records=6
12/09/22 15:55:45 INFO mapred.JobClient:     Reduce input groups=6
12/09/22 15:55:45 INFO mapred.JobClient:     Combine output records=6
12/09/22 15:55:45 INFO mapred.JobClient:     Physical memory (bytes) snapshot=0
12/09/22 15:55:45 INFO mapred.JobClient:     Reduce output records=6
12/09/22 15:55:45 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=0
12/09/22 15:55:45 INFO mapred.JobClient:     Map output records=14
************************
那本书上面其实最后还有提到一个 in-Mapper Combining的一个编程,但是看的不是很明白,伪代码如下:

class Mapper
method Initialize
S ← new AssociativeArray
C ← new AssociativeArray
method Map(string t, integer r)
S{t} ← S{t} + r
C{t} ← C{t} + 1
method Close
for all term t ∈ S do
Emit(term t, pair (S{t}, C{t}))

继续学习 MR编程中。。

分享到:
评论

相关推荐

    Hadoop Combiner使用方法详解

    主要介绍了 Hadoop Combiner使用方法详解的相关资料,希望通过本文能帮助到大家让大家理解掌握这部分内容,需要的朋友可以参考下

    Hadoop实战中文版

    3.1 HDFS 文件操作 3.1.1 基本文件命令 3.1.2 编程读写HDFS 3.2 剖析MapReduce 程序 3.2.1 Hadoop数据类型 3.2.2 Mapper 3.2.3 Reducer 3.2.4 Partitioner:重定向Mapper输出 3.2.5 Combiner:本地reduce ...

    hadoop面试题

    面试hadoop可能被问到的问题你能回答出几个 ? 1、hadoop运行的原理? 2、mapreduce的原理? 3、HDFS存储的机制?...4、举一个简单的例子说明mapreduce是怎么来运行的 ?...5、面试的人给你出一些...6、hadoop中Combiner的作用?

    Hadoop实战中文版.PDF

    30第3章 Hadoop组件 313.1 HDFS文件操作 313.1.1 基本文件命令 323.1.2 编程读写HDFS 353.2 剖析MapReduce程序 373.2.1 Hadoop数据类型 393.2.2 Mapper 403.2.3 Reducer 413.2.4 Partitioner:...

    Hadoop 2.2.0 词频统计例子

    基于Hadoop2.2.0的词频统计的例子。包含一个大概十万以上单词的测试数据文件。重写了Partitioner和Combiner,供学习之用。 访问博文 http://blog.csdn.net/zythy/article/details/17852579 以查看详细讲解。

    hadoop 权威指南(第三版)英文版

    Combiner Functions Running a Distributed MapReduce Job Hadoop Streaming Ruby Python iii Hadoop Pipes Compiling and Running 3. The Hadoop Distributed Filesystem . . . . . . . . . . . . . . . . . . . ...

    Hadoop实战(第2版)

     《Hadoop硬实战》包含: ·Hadoop和MapReduce的基本概念 ·85个实战和测试技术 ·真实的场景,实用的解决方案 ·如何整合MapReduce和R前言 致谢关于本书 第1 部分 背景和基本原理1 跳跃中的Hadoop1.1 什么...

    Hadoop实战(陆嘉恒)译

    Hadoop组件3.1 HDFS 文件操作3.1.1 基本文件命令3.1.2 编程读写HDFS3.2 剖析MapReduce 程序3.2.1 Hadoop数据类型3.2.2 Mapper3.2.3 Reducer3.2.4 Partitioner:重定向Mapper输出3.2.5 Combiner:本地reduce3.2.6 ...

    Hadoop实战

    313.1 HDFS文件操作 313.1.1 基本文件命令 323.1.2 编程读写HDFS 353.2 剖析MapReduce程序 373.2.1 Hadoop数据类型 393.2.2 Mapper 403.2.3 Reducer 413.2.4 Partitioner:重定向Mapper输出 413.2.5 Combiner:本地...

    hadoop_the_definitive_guide_3nd_edition.pdf

    Combiner Functions Running a Distributed MapReduce Job Hadoop Streaming Ruby Python import import import import import import org.apache.hadoop.fs.Path; org.apache.hadoop.io.IntWritable; org.apache....

    Hadoop实训求最高温度和最低温度的数据集

    (1)统计全球每年的最高气温和最低气温。 (2)MapReduce输出结果包含年份、最高气温、...(4)结合Combiner和自定义数据类型完成全球每年最高气温和最低气温的统计。 (5)应用ToolRunner的使用和Eclipse提交MapReduce任务。

    提高hadoop的mapreduce job效率笔记

    修改mapper和reducer数量,如何使用combiner,什么时候该选择哪个writeable等。资料里很详细说明了。

    Hadoop实例:二度人脉与好友推荐

    Hadoop实例:二度人脉与好友推荐,供大家一起共同分享学习。

    Hadoop大数据实训,求最高温度最低温度实验报告

    (1)统计全球每年的最高气温和最低气温。 (2)MapReduce输出结果包含年份、最高气温、...(4)结合Combiner和自定义数据类型完成全球每年最高气温和最低气温的统计。 (5)应用ToolRunner的使用和Eclipse提交MapReduce任务。

    hadoop_the_definitive_guide_3nd_edition

    Combiner Functions 34 Running a Distributed MapReduce Job 37 Hadoop Streaming 37 Ruby 37 Python 40 iii www.it-ebooks.info Hadoop Pipes 41 Compiling and Running 42 3. The Hadoop Distributed Filesystem ...

    JAVA使用Apache Hadoop实现大规模数据处理.txt

    最后,在`main`方法中,我们创建了一个Hadoop作业对象,并设置了相关的参数,包括输入输出路径、Mapper、Combiner和Reducer类等。通过运行这个程序,我们可以将大量的文本数据切分成单词并进行计数,从而实现了...

    MapReduce单词统计 hadoop集群

    Reduce阶段 最终处理,进行排序等自定义操作 每个阶段都会打印对应的数据处理情况,在Map阶段打印每一次读取切割之后的每个单词内容;在Combiner阶段打印单个分片里的单词次数统计结果;在Reduce阶段打印单词出现...

    Hadoop硬实战 [(美)霍姆斯著][电子工业出版社][2015.01]_PDF电子书下载 带书签目录 高清完整版.rar )

    技术点48 使用 combiner 技术点49 超炫的使用比较器的快速排序 6.4.4 减轻倾斜 技术点50 收集倾斜数据 技术点51 减轻reducer 阶段倾斜 6.4.5 在MapReduce 中优化用户的Java 代码 6.4.6 数据序列化 6.5...

    Hadoop 培训课程(4)MapReduce_2

    Hadoop 培训课程(4)MapReduce_2 标准和自定义计数器* Combiner和Partitioner编程** 自定义排序和分组编程** 常见的MapReduce算法** ---------------------------加深拓展---------------------- 常见大数据处理方法*

Global site tag (gtag.js) - Google Analytics