导航:首页 > 编程语言 > mapreducehbase编程

mapreducehbase编程

发布时间:2022-08-24 22:30:06

㈠ 如何用MapRece程序操作hbase

先看一个标准的hbase作为数据读取源和输出目标的样例:
Configuration conf = HBaseConfiguration.create();
Job job = new Job(conf, "job name ");
job.setJarByClass(test.class);
Scan scan = new Scan();
TableMapReceUtil.initTableMapperJob(inputTable, scan, mapper.class, Writable.class, Writable.class, job);
TableMapReceUtil.initTableRecerJob(outputTable, recer.class, job);
job.waitForCompletion(true);1234567

和普通的mr程序不同的是,不再用job.setMapperClass()和job.setRecerClass()来设置mapper和recer,而用TableMapReceUtil的initTableMapperJob和initTableRecerJob方法来实现。此处的TableMapReceUtil是hadoop.hbase.maprece包中的,而不是hadoop.hbase.mapred包中的。
数据输入源是hbase的inputTable表,执行mapper.class进行map过程,输出的key/value类型是 ImmutableBytesWritable和Put类型,最后一个参数是作业对象。需要指出的是需要声明一个扫描读入对象scan,进行表扫描读取数据用,其中scan可以配置参数。
数据输出目标是hbase的outputTable表,输出执行的rece过程是recer.class类,操作的作业目标是job。与map比缺少输出类型的标注,因为他们不是必要的,看过源代码就知道maprece的TableRecordWriter中write(key,value) 方法中,key值是没有用到的,value只能是Put或者Delete两种类型,write方法会自行判断并不用用户指明。
mapper类从hbase读取数据,所以输入的
public class mapper extends TableMapper<KEYOUT, VALUEOUT> {
public void map(Writable key, Writable value, Context context)
throws IOException, InterruptedException {
//mapper逻辑
context.write(key, value);
}
}1234567

mapper继承的是TableMapper类,后边跟的两个泛型参数指定mapper输出的数据类型,该类型必须继承自Writable类,例如可能用到的put和delete就可以。需要注意的是要和initTableMapperJob 方法指定的数据类型一致。该过程会自动从指定hbase表内一行一行读取数据进行处理。
recer类将数据写入hbase,所以输出的
public class recer extends TableRecer<KEYIN, VALUEIN, KEYOUT> {
public void rece(Text key, Iterable<VALUEIN> values, Context context)
throws IOException, InterruptedException {
//recer逻辑
context.write(null, put or delete);
}
}1234567

recer继承的是TableRecer类,后边指定三个泛型参数,前两个必须对应map过程的输出key/value类型,第三个是 The type of the output key,write的时候可以把key写成IntWritable什么的都行,它是不必要的。这样recer输出的数据会自动插入outputTable指定的表内。
TableMapper和TableRecer的本质就是为了简化一下书写代码,因为传入的4个泛型参数里都会有固定的参数类型,所以是Mapper和Recer的简化版本,本质他们没有任何区别。源码如下:
public abstract class TableMapper<KEYOUT, VALUEOUT>
extends Mapper<ImmutableBytesWritable, Result, KEYOUT, VALUEOUT> {
}

public abstract class TableRecer<KEYIN, VALUEIN, KEYOUT>
extends Recer<KEYIN, VALUEIN, KEYOUT, Writable> {
}1234567

封装了一层确实方便多了,但也多了很多局限性,就不能在map里写hbase吗?
我他么试了一下午,约5个小时,就想在map里读hdfs写hbase,莫名其妙的各种问题,逻辑上应该没有错,跟着别人的文章做的。最后还是通过IdentityTableRecer这个类实现了,what's a fucking afternoon!
官方对IdentityTableRecer的说明是:Convenience class that simply writes all values (which must be Put or Delete instances) passed to it out to the configured HBase table.
这是一个工具类,将map输出的value(只能是Put或Delete)pass给HBase。看例子:
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.maprece.Job;
import org.apache.hadoop.maprece.Mapper;
import org.apache.hadoop.maprece.lib.input.FileInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.maprece.TableMapReceUtil;
import org.apache.hadoop.hbase.maprece.IdentityTableRecer;

public class WordCount
{
public static class TokenizerMapper
extends Mapper<Object, Text, Text, Put>
{
private Text word = new Text();
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException
{
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens())
{
word.set(itr.nextToken());
Put putrow = new Put(word.toString().getBytes());
putrow.add("info".getBytes(), "name".getBytes(),"iamvalue".getBytes());
context.write(word, putrow);
}
}
}

public static void main(String[] args) throws Exception
{
Configuration conf = HBaseConfiguration.create();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
Job job = new Job(conf, "hdfs to hbase");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Put.class);//important
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
TableMapReceUtil.initTableRecerJob("test", IdentityTableRecer.class, job);
job.setNumReceTasks(0);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

㈡ 怎样实现用MapRece读取HBase中历史版本的数据并传到HDFS

WordCountHbaseReaderMapper类继承了TableMapper< Text,Text>抽象类,TableMapper类专门用于完成MapRece中Map过程与Hbase表之间的操作。此时的map(ImmutableBytesWritable key,Result value,Context context)方法,第一个参数key为Hbase表的rowkey主键,第二个参数value为key主键对应的记录集合,此处的map核心实现是遍历key主键对应的记录集合value,将其组合成一条记录通过contentx.write(key,value)填充到< key,value>键值对中。
详细源码请参考:WordCountHbaseReader\src\com\zonesion\hbase\WordCountHbaseReader.java
public static class WordCountHbaseReaderMapper extends
TableMapper<Text,Text>{

@Override
protected void map(ImmutableBytesWritable key,Result value,Context context)
throws IOException, InterruptedException {
StringBuffer sb = new StringBuffer("");
for(Entry<byte[],byte[]> entry:value.getFamilyMap("content".getBytes()).entrySet()){
String str = new String(entry.getValue());
//将字节数组转换为String类型
if(str != null){
sb.append(new String(entry.getKey()));
sb.append(":");
sb.append(str);
}
context.write(new Text(key.get()), new Text(new String(sb)));
}
}
}

3、 Recer函数实现
此处的WordCountHbaseReaderRece实现了直接输出Map输出的< key,value>键值对,没有对其做任何处理。详细源码请参考:WordCountHbaseReader\src\com\zonesion\hbase\WordCountHbaseReader.java
public static class WordCountHbaseReaderRece extends Recer<Text,Text,Text,Text>{
private Text result = new Text();
@Override
protected void rece(Text key, Iterable<Text> values,Context context)
throws IOException, InterruptedException {
for(Text val:values){
result.set(val);
context.write(key, result);
}
}
}

4、 驱动函数实现
与WordCount的驱动类不同,在Job配置的时候没有配置job.setMapperClass(),而是用以下方法执行Mapper类: TableMapReceUtil.initTableMapperJob(tablename,scan,WordCountHbaseReaderMapper.class, Text.class, Text.class, job);
该方法指明了在执行job的Map过程时,数据输入源是hbase的tablename表,通过扫描读入对象scan对表进行全表扫描,为Map过程提供数据源输入,通过WordCountHbaseReaderMapper.class执行Map过程,Map过程的输出key/value类型是 Text.class与Text.class,最后一个参数是作业对象。特别注意:这里声明的是一个最简单的扫描读入对象scan,进行表扫描读取数据,其中scan可以配置参数,这里为了例子简单不再详述,用户可自行尝试。
详细源码请参考:WordCountHbaseReader\src\com\zonesion\hbase\WordCountHbaseReader.java
public static void main(String[] args) throws Exception {
String tablename = "wordcount";
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "Master");
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 1) {
System.err.println("Usage: WordCountHbaseReader <out>");
System.exit(2);
}
Job job = new Job(conf, "WordCountHbaseReader");
job.setJarByClass(WordCountHbaseReader.class);
//设置任务数据的输出路径;
FileOutputFormat.setOutputPath(job, new Path(otherArgs[0]));
job.setRecerClass(WordCountHbaseReaderRece.class);
Scan scan = new Scan();
TableMapReceUtil.initTableMapperJob(tablename,scan,WordCountHbaseReaderMapper.class, Text.class, Text.class, job);
//调用job.waitForCompletion(true) 执行任务,执行成功后退出;
System.exit(job.waitForCompletion(true) ? 0 : 1);

}

5、部署运行
1)启动Hadoop集群和Hbase服务
[hadoop@K-Master ~]$ start-dfs.sh #启动hadoop HDFS文件管理系统
[hadoop@K-Master ~]$ start-mapred.sh #启动hadoop MapRece分布式计算服务
[hadoop@K-Master ~]$ start-hbase.sh #启动Hbase
[hadoop@K-Master ~]$ jps #查看进程
22003 HMaster
10611 SecondaryNameNode
22226 Jps
21938 HQuorumPeer
10709 JobTracker
22154 HRegionServer
20277 Main
10432 NameNode

㈢ maprece和hbase的关系,哪些是正确的

MapRece与HBase没有关系:

MapRece:

MapRece是一种编程模型,用于大规模数据集的并行运算。概念"Map"和"Rece",是它们的主要思想,都是从函数式编程语言里借来的,还有从矢量编程语言里借来的特性。它极大地方便了编程人员在不会分布式并行编程的情况下,将自己的程序运行在分布式系统上。

(3)maprecehbase编程扩展阅读:

MapRece集群中使用了大量的低端服务器。因此,节点硬件故障和软件错误是常态。因此,设计良好、容错性高的并行计算系统不会因为节点故障而影响计算服务的质量。

任何节点的故障不应导致结果的不一致或不确定性;当任何节点失败时,其他节点应该能够无缝地接管失败节点的计算任务。当故障节点被恢复时,它应该能够自动无缝地加入集群,而不需要管理员手动配置系统。

MapRece并行计算软件框架使用了多种有效的错误检测和恢复机制,如节点自动重启技术,使集群和计算框架对节点失效具有鲁棒性,能够有效地处理失效节点的检测和恢复。

㈣ 在调试MapRece与HBase集成的程序时出现如下错误,各位前辈能否赐教,帮忙解答一下

看报错应该是缺少zookeeper依赖jar包,看一下hadoop使用的zookeeper版本和hbase使用的是否版本一致。或者hbase使用的是不是内置的zookeeper

㈤ 如何快速地编写和运行一个属于自己的MapRece例子程序

大数据的时代, 到处张嘴闭嘴都是Hadoop, MapRece, 不跟上时代怎么行? 可是对一个hadoop的新手, 写一个属于自己的MapRece程序还是小有点难度的, 需要建立一个maven项目, 还要搞清楚各种库的依赖, 再加上编译运行, 基本上头大两圈了吧。 这也使得很多只是想简单了解一下MapRece的人望而却步。
本文会教你如何用最快最简单的方法编写和运行一个属于自己的MapRece程序, let's go!
首先有两个前提:
1. 有一个已经可以运行的hadoop 集群(也可以是伪分布系统), 上面的hdfs和maprece工作正常 (这个真的是最基本的了, 不再累述, 不会的请参考 http://hadoop.apache.org/docs/current/)
2. 集群上安装了JDK (编译运行时会用到)
正式开始
1. 首先登入hadoop 集群里面的一个节点, 创建一个java源文件, 偷懒起见, 基本盗用官方的word count (因为本文的目的是教会你如何快编写和运行一个MapRece程序, 而不是如何写好一个功能齐全的MapRece程序)
内容如下:
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.maprece.Job;
import org.apache.hadoop.maprece.Mapper;
import org.apache.hadoop.maprece.Recer;
import org.apache.hadoop.maprece.lib.input.FileInputFormat;
import org.apache.hadoop.maprece.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class myword {

public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{

private final static IntWritable one = new IntWritable(1);
private Text word = new Text();

public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}

public static class IntSumRecer
extends Recer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();

public void rece(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}

public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println('Usage: wordcount <in> <out>');
System.exit(2);
}
Job job = new Job(conf, 'word count');
job.setJarByClass(myword.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumRecer.class);
job.setRecerClass(IntSumRecer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

与官方版本相比, 主要做了两处修改
1) 为了简单起见,去掉了开头的 package org.apache.hadoop.examples;
2) 将类名从 WordCount 改为 myword, 以体现是我们自己的工作成果 :)
2. 拿到hadoop 运行的class path, 主要为编译所用
运行命令
hadoop classpath

保存打出的结果,本文用的hadoop 版本是Pivotal 公司的Pivotal hadoop, 例子:
/etc/gphd/hadoop/conf:/usr/lib/gphd/hadoop/lib/*:/usr/lib/gphd/hadoop/.//*:/usr/lib/gphd/hadoop-hdfs/./:/usr/lib/gphd/hadoop-hdfs/lib/*:/usr/lib/gphd/hadoop-hdfs/.//*:/usr/lib/gphd/hadoop-yarn/lib/*:/usr/lib/gphd/hadoop-yarn/.//*:/usr/lib/gphd/hadoop-maprece/lib/*:/usr/lib/gphd/hadoop-maprece/.//*::/etc/gphd/pxf/conf::/usr/lib/gphd/pxf/pxf-core.jar:/usr/lib/gphd/pxf/pxf-api.jar:/usr/lib/gphd/publicstage:/usr/lib/gphd/gfxd/lib/gemfirexd.jar::/usr/lib/gphd/zookeeper/zookeeper.jar:/usr/lib/gphd/hbase/lib/hbase-common.jar:/usr/lib/gphd/hbase/lib/hbase-protocol.jar:/usr/lib/gphd/hbase/lib/hbase-client.jar:/usr/lib/gphd/hbase/lib/hbase-thrift.jar:/usr/lib/gphd/hbase/lib/htrace-core-2.01.jar:/etc/gphd/hbase/conf::/usr/lib/gphd/hive/lib/hive-service.jar:/usr/lib/gphd/hive/lib/libthrift-0.9.0.jar:/usr/lib/gphd/hive/lib/hive-metastore.jar:/usr/lib/gphd/hive/lib/libfb303-0.9.0.jar:/usr/lib/gphd/hive/lib/hive-common.jar:/usr/lib/gphd/hive/lib/hive-exec.jar:/usr/lib/gphd/hive/lib/postgresql-jdbc.jar:/etc/gphd/hive/conf::/usr/lib/gphd/sm-plugins/*:

3. 编译
运行命令
javac -classpath xxx ./myword.java

xxx部分就是上一步里面取到的class path
运行完此命令后, 当前目录下会生成一些.class 文件, 例如:
myword.class myword$IntSumRecer.class myword$TokenizerMapper.class
4. 将class文件打包成.jar文件
运行命令
jar -cvf myword.jar ./*.class

至此, 目标jar 文件成功生成
5. 准备一些文本文件, 上传到hdfs, 以做word count的input
例子:
随意创建一些文本文件, 保存到mapred_test 文件夹
运行命令
hadoop fs -put ./mapred_test/

确保此文件夹成功上传到hdfs 当前用户根目录下
6. 运行我们的程序
运行命令
hadoop jar ./myword.jar myword mapred_test output

顺利的话, 此命令会正常进行, 一个MapRece job 会开始工作, 输出的结果会保存在 hdfs 当前用户根目录下的output 文件夹里面。
至此大功告成!
如果还需要更多的功能, 我们可以修改前面的源文件以达到一个真正有用的MapRece job。
但是原理大同小异, 练手的话, 基本够了。
一个抛砖引玉的简单例子, 欢迎板砖。
转载

㈥ 如何执行hbase 的maprece job

package test;
import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;

public class Htable {

/**
* @param args
*/
public static void main(String[] args) throws IOException {
// TODO Auto-generated method stub
Configuration hbaseConf = HBaseConfiguration.create();
HBaseAdmin admin = new HBaseAdmin(hbaseConf);
HTableDescriptor htableDescriptor = new HTableDescriptor("table"
.getBytes()); //set the name of table
htableDescriptor.addFamily(new HColumnDescriptor("fam1")); //set the name of column clusters
admin.createTable(htableDescriptor); //create a table
HTable table = new HTable(hbaseConf, "table"); //get instance of table.
for (int i = 0; i < 3; i++) { //for is number of rows
Put putRow = new Put(("row" + i).getBytes()); //the ith row
putRow.add("fam1".getBytes(), "col1".getBytes(), "vaule1"
.getBytes()); //set the name of column and value.
putRow.add("fam1".getBytes(), "col2".getBytes(), "vaule2"
.getBytes());
putRow.add("fam1".getBytes(), "col3".getBytes(), "vaule3"
.getBytes());
table.put(putRow);
}
for(Result result: table.getScanner("fam1".getBytes())){//get data of column clusters
for(Map.Entry<byte[], byte[]> entry : result.getFamilyMap("fam1".getBytes()).entrySet()){//get collection of result
String column = new String(entry.getKey());
String value = new String(entry.getValue());
System.out.println(column+","+value);
}
}
admin.disableTable("table".getBytes()); //disable the table
admin.deleteTable("table".getBytes()); //drop the tbale
}
}

㈦ 如何在Maprece加载多个Hbase表

如果你自己用“调”api,来读写hbase的话,我觉得具体考虑的话是任务能否最终实现的问题了,毕竟maprece所做的工作很多,它自己的master,zookeeper,hbase的master之间的通信,计算任务的rece和mapping,细节太多,考虑到maprece通常处理的数

㈧ Hadoop和MapRece究竟分别是做什么用的

Hadoop是用来开发分布式程序的架构,是一个由Apache基金会所开发的分布式系统基础架构。用户可以在不了解分布式底层细节的情况下,开发分布式程序。

MapRece是用来做大规模并行数据处理的数据模型。方便了编程人员在不会分布式并行编程的情况下,将自己的程序运行在分布式系统上。


(8)maprecehbase编程扩展阅读

Hadoop是一个能够让用户轻松架构和使用的分布式计算平台。用户可以轻松地在Hadoop上开发和运行处理海量数据的应用程序。主要有以下几个优点 :

1、高可靠性。Hadoop按位存储和处理数据的能力值得人们信赖 。

2、高扩展性。Hadoop是在可用的计算机集簇间分配数据并完成计算任务的,这些集簇可以方便地扩展到数以千计的节点中 。

3、高效性。Hadoop能够在节点之间动态地移动数据,并保证各个节点的动态平衡,因此处理速度非常快 。

4、高容错性。Hadoop能够自动保存数据的多个副本,并且能够自动将失败的任务重新分配。

5、低成本。与一体机、商用数据仓库以及QlikView、Yonghong Z-Suite等数据集市相比,hadoop是开源的,项目的软件成本因此会大大降低。


㈨ maprece怎么导入hbace

1、先看一个标准的hbase作为数据读取源和输出源的样例:

[java] view plain在CODE上查看代码片派生到我的代码片
Configuration conf = HBaseConfiguration.create();
Job job = new Job(conf, "job name ");
job.setJarByClass(test.class);
Scan scan = new Scan();
TableMapReceUtil.initTableMapperJob(inputTable, scan, mapper.class,
Writable.class, Writable.class, job);
TableMapReceUtil.initTableRecerJob(outputTable, recer.class, job);
job.waitForCompletion(true);

首先创建配置信息和作业对象,设置作业的类。这些和正常的maprece一样,

[java] view plain在CODE上查看代码片派生到我的代码片
Configuration conf = HBaseConfiguration.create();
Job job = new Job(conf, "job name ");
job.setJarByClass(test.class);
Scan scan = new Scan();
TableMapReceUtil.initTableMapperJob(inputTable, scan, mapper.class,
Writable.class, Writable.class, job);
TableMapReceUtil.initTableRecerJob(outputTable, recer.class, job);
job.waitForCompletion(true);
唯一不一样的就是数据源的说明部分,Tabl<span style="color: rgb(64, 64, 64); font-family: Verdana, Geneva, sans-serif; font-size: 17px; background-color: rgb(245, 253, 255);"><strong>eMapReceUtil的initTableMapperJob和initTableRecerJob方法来实现。</strong></span>

用如上代码:
数据输入源是hbase的inputTable表,执行mapper.class进行map过程,输出的key/value类型是ImmutableBytesWritable和Put类型,最后一个参数是作业对象。需要指出的是需要声明一个扫描读入对象scan,进行表扫描读取数据用,其中scan可以配置参数,这里为了例子简单不再详述。
数据输出目标是hbase的outputTable表,输出执行的rece过程是recer.class类,操作的作业目标是job。与map比缺少输出类型的标注,因为他们不是必要的,看过源代码就知道maprece的TableRecordWriter中write(key,value)方法中,key值是没有用到的。value只能是Put或者Delete两种类型,write方法会自行判断并不用用户指明。

接下来就是mapper类:

[java] view plain在CODE上查看代码片派生到我的代码片
public class mapper extends
TableMapper<KEYOUT, VALUEOUT> {

public void map(Writable key, Writable value, Context context)
throws IOException, InterruptedException {
//mapper逻辑
context.write(key, value);
}

}
}
继承的是hbase中提供的TableMapper类,其实这个类也是继承的MapRece类。后边跟的两个泛型参数指定类型是mapper输出的数据类型,该类型必须继承自Writable类,例如可能用到的put和delete就可以。需要注意的是要和initTableMapperJob方法指定的数据类型一直。该过程会自动从指定hbase表内一行一行读取数据进行处理。

然后recer类:

[java] view plain在CODE上查看代码片派生到我的代码片
public class countUniteRedcuer extends
TableRecer<KEYIN, VALUEIN, KEYOUT> {
public void rece(Text key, Iterable<VALUEIN> values, Context context)
throws IOException, InterruptedException {
//recer逻辑
context.write(null, put or delete);
}
}

recer继承的是TableRecer类。后边指定三个泛型参数,前两个必须对应map过程的输出key/value类型,第三个必须是put或者delete。write的时候可以把key写null,它是不必要的。这样recer输出的数据会自动插入outputTable指定的表内。

2、有时候我们需要数据源是hdfs的文本,输出对象是hbase。这时候变化也很简单:

[java] view plain
<span style="background-color:transparent; line-height:1.3em; color:rgb(64,64,64); font-family:Verdana,Geneva,sans-serif; font-size:1.18em">你会</span><span style="background-color:transparent; font-family:Verdana,Geneva,sans-serif; font-size:1.18em; line-height:1.3em">发现只需要像平常的maprece的作业声明过程一样,指定mapper的执行类和输出key/value类型,指定FileInputFormat.setInputPaths的数据源路径,输出声明不变。便完成了从hdfs文本读取数据输出到hbase的命令声明过程。 mapper和recer如下:</span>
[java] view plain在CODE上查看代码片派生到我的代码片
Configuration conf = HBaseConfiguration.create();
Job job = new Job(conf, "job name ");
job.setJarByClass(test.class);

job.setMapperClass(mapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
FileInputFormat.setInputPaths(job, path);

TableMapReceUtil.initTableRecerJob(tableName,
recer.class, job);

[java] view plain在CODE上查看代码片派生到我的代码片
public class mapper extends Mapper<LongWritable,Writable,Writable,Writable> {
public void map(LongWritable key, Text line, Context context) {
//mapper逻辑
context.write(k, one);
}
}
public class redcuer extends
TableRecer<KEYIN, VALUEIN, KEYOUT> {
public void rece(Writable key, Iterable<Writable> values, Context context)
throws IOException, InterruptedException {
//recer逻辑
context.write(null, put or delete);
}
}
[java] view plain
<span style="background-color:transparent; line-height:1.3em; color:rgb(64,64,64); font-family:Verdana,Geneva,sans-serif; font-size:1.18em">mapper还依旧继承原来的MapRece类中的Mapper即可。同样注意这前后数据类型的key/value一直性。</span>

[java] view plain
<span style="background-color:transparent; line-height:1.3em; color:rgb(64,64,64); font-family:Verdana,Geneva,sans-serif; font-size:1.18em">3、最后就是从hbase中的表作为数据源读取,hdfs作为数据输出,简单的如下:</span>

[java] view plain在CODE上查看代码片派生到我的代码片
Configuration conf = HBaseConfiguration.create();
Job job = new Job(conf, "job name ");
job.setJarByClass(test.class);
Scan scan = new Scan();
TableMapReceUtil.initTableMapperJob(inputTable, scan, mapper.class,
Writable.class, Writable.class, job);
job.setOutputKeyClass(Writable.class);
job.setOutputValueClass(Writable.class);
FileOutputFormat.setOutputPath(job, Path);
job.waitForCompletion(true);

mapper和recer简单如下:

[java] view plain在CODE上查看代码片派生到我的代码片Configuration conf = HBaseConfiguration.create();
Job job = new Job(conf, "job name ");
job.setJarByCl www.jdjdzj.com ass(test.class);
Scan scan = new Scan();
TableMapReceUtil.initTableMapperJob(inputTable, scan, mapper.class,
Writable.class, Writable.class, job);
TableMapReceUtil.initTableRecerJob(outputTable, recer.class, job);
job.waitForCompletion(true); Configuration conf = HBaseConfiguration.create();
Job job = new Job(conf, "job name ");
job.setJarByClass(test.class);
Scan scan = new Scan();
TableMapReceUtil.initTableMapperJob(inputTable, scan, mapper.class,
Writable.class, Writable.class, job);
TableMapReceUtil.initTableRecerJob(outputTable, recer.class, job);
job.waitForCompletion(true);
唯一不一样的就是数据源的说明部分,Tabl<span style="color: rgb(64, 64, 64); font-family: Verdana, Geneva, sans-serif; font-size: 17px; background-color: rgb(245, 253, 255);"><strong>eMapReceUtil的initTableMapperJob和initTableRecerJob方法来实现。</strong></span> public class mapper extends
TableMapper<KEYOUT, VALUEOUT> {

public void map(Writable key, Writable value, Context context)
throws IOException, InterruptedException {
//mapper逻辑
context.write(key, value);
}

}
} public class countUniteRedcuer extends
TableRecer<KEYIN, VALUEIN, KEYOUT> {
public void rece(Text key, Iterable<VALUEIN> values, Context context)
throws IOException, InterruptedException {
//recer逻辑
context.write(null, put or delete);
}
}

阅读全文

与mapreducehbase编程相关的资料

热点内容
解密摄影pdf 浏览:72
算法编程中级题目 浏览:249
c语言编译器毕业设计 浏览:715
医保卡申请app哪个好 浏览:944
阿里云服务器上传源码 浏览:602
营销管理科特勒pdf 浏览:696
愿望清单app哪个好 浏览:459
安卓外放声音怎么解决 浏览:195
脉脉app干什么用的 浏览:360
拽姐是哪个app 浏览:860
云服务器删除了还有吗 浏览:234
macbook可以用单片机嘛 浏览:309
南阳php招聘 浏览:816
去哪里找按摩师很漂亮的app 浏览:821
86x99用简便算法计算 浏览:832
php截图flash 浏览:276
卸载联想app哪个好 浏览:721
php文字转图片 浏览:332
豆客后台怎么加密码 浏览:576
jpg转换pdf破解版 浏览:980