`
thecloud
  • 浏览: 880863 次
文章分类
社区版块
存档分类
最新评论

HDFS导出数据到HBase的ROWVALUE设置tricks

 
阅读更多

在做Hadoop的编程时,有时会用到HBase,常常涉及到把HDFS上面的数据导入到HBase中,在这个操作中,row的设置比较重要,如果几条记录的row值一样,同时列簇也一样的话,那么后面的数据就会覆盖前面的数据,比如这样:

比如现在有这样的一个操作:

我要把下面的数据导入HBase:

1,101,5.0
1,102,3.0
1,103,2.5
2,101,2.0
2,102,2.5
2,103,5.0
2,104,2.0
3,101,2.5
3,104,4.0
3,105,4.5
3,107,5.0
4,101,5.0
4,103,3.0
4,104,4.5
4,106,4.0
5,101,4.0
5,102,3.0
5,103,2.0
5,104,4.0
5,105,3.5
5,106,4.0
如果拿第一列作为row值的话,那么最终导入的数据只有5条记录的信息了,如何才能把这21条记录全部导入HBase呢

第一个想法,把文件偏移值作为一条记录的row,这样每条记录的row就都不同了,这样可以,但是当文件偏大的时候,这样存储就会存在问题,而且row值之间相差很大,如果可以做到row是按照1递增的话,那么就可以减少很多数据了。

第二个想法,再看回原始数据,可以发现如果我们是做用户推荐的话,那么数据很可能每条记录的长度都相差不大,如果这样的话,那么是否可以用记录的偏移值除以这条记录的长度,最后再四舍五入,那么得到的row值就基本按照1递增了。

具体代码如下:

package org.fansy.date1023;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class HDFSToHBase {
	
	/**
	 * import data from hdfs to hbase
	 */

	public static class MapperClass extends Mapper<LongWritable,Text,ImmutableBytesWritable,Put>{  
        private Put put;
        private byte[] family,qulifier1,qulifier2,qulifier3;
		
		public void setup(Context context){
			family=Bytes.toBytes(context.getConfiguration().get("FAMILY"));
			qulifier1=Bytes.toBytes(context.getConfiguration().get("QULIFIER1"));
			qulifier2=Bytes.toBytes(context.getConfiguration().get("QULIFIER2"));
			qulifier3=Bytes.toBytes(context.getConfiguration().get("QULIFIER3"));
		}
		
		public void map(LongWritable key,Text line,Context context)throws IOException,InterruptedException{  
            String[] values=line.toString().split(",");  
            if(values.length!=3){ // if there are not three args,then return  
                return ;  
            }  
            //  set the row value
            int tempRow=Math.round(Float.parseFloat(key.toString())/(line.toString().length()));
            byte [] row=Bytes.toBytes(tempRow);
              byte [] qulifier1_value = Bytes.toBytes(values[0]);  
              byte [] qulifier2_value = Bytes.toBytes(values[1]); 
              byte [] qulifier3_value = Bytes.toBytes(values[2]); 
             put=new Put(row);  
             put.add(family,qulifier1,qulifier1_value);  //  user
             context.write(new ImmutableBytesWritable(row),put);  
             put=new Put(row);  
             put.add(family,qulifier2,qulifier2_value);  //  item
             context.write(new ImmutableBytesWritable(row),put);  
             put=new Put(row);  
             put.add(family,qulifier3,qulifier3_value);  //  item
             context.write(new ImmutableBytesWritable(row),put); 
        } 
	}
	public static void main(String[] args) throws IOException {
		// TODO Auto-generated method stub
		
		//check if there exist the table then delete the data and alter the family
		if(args.length<3){
			System.out.println("Wrong number of arguments "+args.length);
			 System.out.println("Usage: <input> <tablename> <family> <qulifier1> <qulifier2> <qulifier3>");  
		     System.exit(-1);  
		}
		String tableName=args[1];
		String family=args[2];
		if(existTable(tableName)){
			alterTable(tableName);
		}
		createTable(tableName,family);
		// import data to hbase
		HDFSToHBase.runMyJob(args);
	}
	public static void runMyJob(String[] args) {
		Configuration conf = HBaseConfiguration.create();  
		try{
		    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();  
		    if(otherArgs.length != 6) {  
		      System.out.println("Wrong number of arguments: " + otherArgs.length);  
		      System.out.println("Usage: <input> <tablename> <family> <qulifier1> <qulifier2> <qulifier3>");  
		      System.exit(-1);  
		    }
		    
		    String family=otherArgs[2]; 
		    String qulifier1=otherArgs[3];
		    String qulifier2=otherArgs[4];
		    String qulifier3=otherArgs[5];
		    conf.set("FAMILY", family);
		    conf.set("QULIFIER1", qulifier1);
		    conf.set("QULIFIER2", qulifier2);
		    conf.set("QULIFIER3", qulifier3);
		    
		    Job job=new Job(conf,"import data to hbase");  
		    job.setJarByClass(HDFSToHBase.class);  
		    job.setMapperClass(MapperClass.class);  
		    job.setMapOutputKeyClass(ImmutableBytesWritable.class);  
		    job.setMapOutputValueClass(Put.class);  
		    TableMapReduceUtil.initTableReducerJob(otherArgs[1], null, job);  
		    job.setNumReduceTasks(0);  
		    FileInputFormat.setInputPaths(job, otherArgs[0]);  
		    System.exit(job.waitForCompletion(true) ? 0 : 1);
		 }catch(Exception e){
		   e.printStackTrace();
		 }
	}
	
	//  check whether the given table exists if and delete the data and create a new one else create a new one
	public static boolean existTable(String tableName) throws IOException{
		boolean flag=false;

		  Configuration conf = HBaseConfiguration.create();  

		 HBaseAdmin admin=new HBaseAdmin(conf);		
		 flag=admin.isTableAvailable(Bytes.toBytes(tableName));
		 admin.close();
		 return flag;
	}
	
	//  if the table exists then delete the data and alter the family
	public static void alterTable(String tableName ) throws IOException{
		System.out.println("table "+tableName+" exists ,delete it  ...");
		Configuration conf = HBaseConfiguration.create();  
		 HBaseAdmin admin=new HBaseAdmin(conf);
		admin.disableTable(tableName);
		admin.deleteTable(tableName);
		admin.close();
	}
	
	//  if the table does not exist then create the table
	public static void createTable( String tableName,String family) throws IOException{
		 System.out.println("create the new table :"+tableName+" ...");
		 Configuration conf = HBaseConfiguration.create();  
		 HBaseAdmin admin=new HBaseAdmin(conf);
		HTableDescriptor desc=new HTableDescriptor(Bytes.toBytes(tableName));
		HColumnDescriptor coldef=new HColumnDescriptor(Bytes.toBytes(family));
		desc.addFamily(coldef);
		admin.createTable(desc);
		admin.close();
	}

}
上面的代码同时包含了先判断输入的参数(即表名)是否已存在HBase中,如果存在则删除,否则新建一个表,最后把数据通过MR导入HBase.其中的44行就是第二种想法的实现。
导入到HBase中的数据如下:


疑问:(1)如果数据文件是多个的话,那么第二种想法就会导致有些数据被覆盖。多个数据的话是否可以用一个MR 把所有的数据先放入一个文件然后在调用上面的代码?

(2)是否有其他的比较好的做法呢?




分享,成长,快乐





分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics