以前在学习HBase的时候,也曾想过是否可以在一个MR中同时写入两个表,但以前在网上找的时候都找不到比较相关的答案,这两天又重新找了下,居然有类似的实现,然后就自己参考着写了下,基本可以运行(本文参考:http://www.wildnove.com/2011/07/19/tutorial-hadoop-and-hbase-multitableoutputformat/),下面就详细说下思路:
原始数据如下:
fansy,22,blog.csdu.net/fansy1990
tom,25,blog.csdu.net/tom1987
kate,23,blog.csdu.net/kate1989
jake,20,blog.csdu.net/jake1992
john,35,blog.csdu.net/john1977
ben,30,blog.csdu.net/ben1982
第一列代表name,dierlie代表age,disanlie代表webPage;要做的事情是把name和age存入表1,name和webPage存入表2;下面贴代码:
ImportToHB.java:
package org.fansy.multipletables;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.MultiTableOutputFormat;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
* write to multiple tables
* @author fansy
*
*/
public class ImportToHB extends Configured implements Tool{
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new ImportToHB(), args);
System.exit(exitCode);
}
@Override
public int run(String[] args) throws Exception {
if(args.length!=7){
System.err.println("wrong args length:"+args.length);
// System.out.println();
System.out.println("Usage: <input> <table1> <table1-fam> <table1-qua> "+
"<table2> <table2-fam> <table2-qua>");
System.exit(-1);
}
Configuration conf=new Configuration();
conf.set("TABLE1", args[1]);
conf.set("T1-FAM", args[2]);
conf.set("T1-QUA", args[3]);
conf.set("TABLE2", args[4]);
conf.set("T2-FAM", args[5]);
conf.set("T2-QUA", args[6]);
Job job = new Job(conf);
job.setJarByClass(ImportToHB.class);
job.setMapperClass(MapperHB.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(Writable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
job.setOutputFormatClass(MultiTableOutputFormat.class);
job.setNumReduceTasks(0);
if(job.waitForCompletion(true)){
return 0;
}
return -1;
}
}
MapperHB.java:
package org.fansy.multipletables;
import java.io.IOException;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Mapper;
public class MapperHB extends Mapper<LongWritable,Text,ImmutableBytesWritable,Writable>{
private byte[] table1;
private byte[] table2;
private byte[] t1_fam;
private byte[] t1_qua;
private byte[] t2_fam;
private byte[] t2_qua;
public void setup(Context context){
table1=Bytes.toBytes(context.getConfiguration().get("TABLE1"));
table2=Bytes.toBytes(context.getConfiguration().get("TABLE2"));
t1_fam=Bytes.toBytes(context.getConfiguration().get("T1-FAM"));
t1_qua=Bytes.toBytes(context.getConfiguration().get("T1-QUA"));
t2_fam=Bytes.toBytes(context.getConfiguration().get("T2-FAM"));
t2_qua=Bytes.toBytes(context.getConfiguration().get("T2-QUA"));
}
public void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException{
String[] info=value.toString().split(",");
if(info.length!=3){
return;
}
String name=info[0];
String age=info[1];
String webPage=info[2];
// write to the first table row = name+age, value=age;
ImmutableBytesWritable putTable = new ImmutableBytesWritable(table1);
Put put = new Put(Bytes.toBytes(name+","+age));
put.add(t1_fam,t1_qua,Bytes.toBytes(age));
context.write(putTable, put);
// write to the second table row=name+webPage,value=webPage
putTable = new ImmutableBytesWritable(table2);
put = new Put(Bytes.toBytes(name+","+webPage));
put.add(t2_fam,t2_qua,Bytes.toBytes(webPage));
context.write(putTable, put);
}
}
上面的代码只用了一个Mapper,同时写入两个HBase表中。这里的要点是设置Mapper的输出key和value的类型,按照上面的代码类型为:ImmutableBytesWritable和Writable,而且在job的声明处要设置输出类型:job.setOutputFormatClass(MultiTableOutputFormat.class);
如何运行上面的程序?
(1)在HBase中创建两张表:
create 'table1','info'
create 'table2','info'
(2)ImportToHB的输入参数如下:
hdfs://master:9000/user/fansy/input/info.dat table1 info age table2 info webPage
(3)直接在eclipse中运行
运行后在HBase中察看输出的数据如下:
分享,快乐,成长
转载请注明出处:http://blog.csdn.net/fansy1990
分享到:
相关推荐
从HDFS中读文件,用groupby进行sort,然后写入Hbase中
博客配套文件,详细演示了如何通过MR程序的方式bulkload数据到hbase,代码可直接用于生产环境。
创建一个要写入的hbase表:a)启动hbase shell $ hbase shell b)创建表create'/ user / chanumolu / sensor',{NAME =>'data'},{NAME =>'alert'},{ NAME =>'stats'} #执行: 第1步:MVN全新安装 步骤2:启动流...
java操作Hbase之从Hbase中读取数据写入hdfs中源码,附带全部所需jar包,欢迎下载学习。
1. 请用java集合的代码描述HBase的表结构 2. 请简述HBase中数据写入最后导致Region分裂的全过程 3. 如果设计一个笔记的表,表中要求有笔记的属性和笔记的内容,怎么做 4. HBase部署时如何指定多个zookeeper 5. HBase...
hbase表结构设计,新建表,查询表语句,删除表数据,删除表的例子。
HBase 是一个开源的、分布式的、版本化的 NoSQL 数据库(也即非关系型数据库),它利用 Hadoop 分布式文件系统(Hadoop Distributed File System,HDFS)提供分布式数据存储。与传统的关系型数据库类似,HBase 也以...
HBase 是一个开源的、分布式的、版本化的 NoSQL 数据库(也即非关系型数据库),它利用 Hadoop 分布式文件系统(Hadoop Distributed File System,HDFS)提供分布式数据存储。与传统的关系型数据库类似,HBase 也以...
Hbase笔记 —— 利用JavaAPI的方式操作Hbase数据库(往hbase的表中批量插入数据)
最近看了hbase的源码根据源码写了一些scala调动hbase表的API,话不多说直接上代码!...并且在scala,maven项目中,还有创建一个resources包(这个网上多的是)主要是放core-site,xml和hdfs-site.xml以及hbase-site.xml
一款强大的HBase表管理系统,目前系统集成的功能有,命名空间管理,表管理,列簇管理,标签机制,快照管理,以及一些常见的统计指标展示等,另外,系统还内置了HBaseSQL的功能,欢迎大家下载。 一款强大的HBase表...
java 利用 sping-data-hadoop HbaseTemplate 操作hbase find get execute 等方法 可以直接运行
批量执行hbase shell 命令 #!/bin/bash source /etc/profile ...以上这篇hbase-shell批量命令执行脚本的方法就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持软件开发网。
HBase开启审计日志
删除Hbase中某个表的一列值 命令 java -jar deleteOneColumn.jar(这个文件的路径) '表名' '列簇名' '列名'
简单的介绍了habse存储数据的样子和简单的hbase shell 使用
HBase – Hadoop Database,是一个高可靠性、高性能、面向列、可伸缩的分布式存储系统,利用HBase技术可在廉价PC Server上搭建起大规模结构化存储集群。 HBase是Google Bigtable的开源实现,类似Google Bigtable...
使用spark读取hbase中的数据,并插入到mysql中
hbase从HBase表中导入数据到Hbase表中将fruit表中的一部分数据,通过MR迁入到fruit_mr表中从HDFS中导入数据到Hbase表中根据HDFS中的数据导入到fruit_hdfs表中
python3 使用 thrift 操作hbase 安装hbase-thirft后有一个Hbase报错 使用这个修改完成的Hbase类替换掉原来的Hbase类问题全部解决 主要是因为python版本兼容性带来的问题