導航:首頁 > 編程語言 > mapreducehbase編程

mapreducehbase編程

發布時間:2022-08-24 22:30:06

㈠ 如何用MapRece程序操作hbase

先看一個標準的hbase作為數據讀取源和輸出目標的樣例:
Configuration conf = HBaseConfiguration.create();
Job job = new Job(conf, "job name ");
job.setJarByClass(test.class);
Scan scan = new Scan();
TableMapReceUtil.initTableMapperJob(inputTable, scan, mapper.class, Writable.class, Writable.class, job);
TableMapReceUtil.initTableRecerJob(outputTable, recer.class, job);
job.waitForCompletion(true);1234567

和普通的mr程序不同的是,不再用job.setMapperClass()和job.setRecerClass()來設置mapper和recer,而用TableMapReceUtil的initTableMapperJob和initTableRecerJob方法來實現。此處的TableMapReceUtil是hadoop.hbase.maprece包中的,而不是hadoop.hbase.mapred包中的。
數據輸入源是hbase的inputTable表,執行mapper.class進行map過程,輸出的key/value類型是 ImmutableBytesWritable和Put類型,最後一個參數是作業對象。需要指出的是需要聲明一個掃描讀入對象scan,進行表掃描讀取數據用,其中scan可以配置參數。
數據輸出目標是hbase的outputTable表,輸出執行的rece過程是recer.class類,操作的作業目標是job。與map比缺少輸出類型的標注,因為他們不是必要的,看過源代碼就知道maprece的TableRecordWriter中write(key,value) 方法中,key值是沒有用到的,value只能是Put或者Delete兩種類型,write方法會自行判斷並不用用戶指明。
mapper類從hbase讀取數據,所以輸入的
public class mapper extends TableMapper<KEYOUT, VALUEOUT> {
public void map(Writable key, Writable value, Context context)
throws IOException, InterruptedException {
//mapper邏輯
context.write(key, value);
}
}1234567

mapper繼承的是TableMapper類,後邊跟的兩個泛型參數指定mapper輸出的數據類型,該類型必須繼承自Writable類,例如可能用到的put和delete就可以。需要注意的是要和initTableMapperJob 方法指定的數據類型一致。該過程會自動從指定hbase表內一行一行讀取數據進行處理。
recer類將數據寫入hbase,所以輸出的
public class recer extends TableRecer<KEYIN, VALUEIN, KEYOUT> {
public void rece(Text key, Iterable<VALUEIN> values, Context context)
throws IOException, InterruptedException {
//recer邏輯
context.write(null, put or delete);
}
}1234567

recer繼承的是TableRecer類,後邊指定三個泛型參數,前兩個必須對應map過程的輸出key/value類型,第三個是 The type of the output key,write的時候可以把key寫成IntWritable什麼的都行,它是不必要的。這樣recer輸出的數據會自動插入outputTable指定的表內。
TableMapper和TableRecer的本質就是為了簡化一下書寫代碼,因為傳入的4個泛型參數里都會有固定的參數類型,所以是Mapper和Recer的簡化版本,本質他們沒有任何區別。源碼如下:
public abstract class TableMapper<KEYOUT, VALUEOUT>
extends Mapper<ImmutableBytesWritable, Result, KEYOUT, VALUEOUT> {
}

public abstract class TableRecer<KEYIN, VALUEIN, KEYOUT>
extends Recer<KEYIN, VALUEIN, KEYOUT, Writable> {
}1234567

封裝了一層確實方便多了,但也多了很多局限性,就不能在map里寫hbase嗎?
我他么試了一下午,約5個小時,就想在map里讀hdfs寫hbase,莫名其妙的各種問題,邏輯上應該沒有錯,跟著別人的文章做的。最後還是通過IdentityTableRecer這個類實現了,what's a fucking afternoon!
官方對IdentityTableRecer的說明是:Convenience class that simply writes all values (which must be Put or Delete instances) passed to it out to the configured HBase table.
這是一個工具類,將map輸出的value(只能是Put或Delete)pass給HBase。看例子:
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.maprece.Job;
import org.apache.hadoop.maprece.Mapper;
import org.apache.hadoop.maprece.lib.input.FileInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.maprece.TableMapReceUtil;
import org.apache.hadoop.hbase.maprece.IdentityTableRecer;

public class WordCount
{
public static class TokenizerMapper
extends Mapper<Object, Text, Text, Put>
{
private Text word = new Text();
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException
{
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens())
{
word.set(itr.nextToken());
Put putrow = new Put(word.toString().getBytes());
putrow.add("info".getBytes(), "name".getBytes(),"iamvalue".getBytes());
context.write(word, putrow);
}
}
}

public static void main(String[] args) throws Exception
{
Configuration conf = HBaseConfiguration.create();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
Job job = new Job(conf, "hdfs to hbase");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Put.class);//important
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
TableMapReceUtil.initTableRecerJob("test", IdentityTableRecer.class, job);
job.setNumReceTasks(0);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

㈡ 怎樣實現用MapRece讀取HBase中歷史版本的數據並傳到HDFS

WordCountHbaseReaderMapper類繼承了TableMapper< Text,Text>抽象類,TableMapper類專門用於完成MapRece中Map過程與Hbase表之間的操作。此時的map(ImmutableBytesWritable key,Result value,Context context)方法,第一個參數key為Hbase表的rowkey主鍵,第二個參數value為key主鍵對應的記錄集合,此處的map核心實現是遍歷key主鍵對應的記錄集合value,將其組合成一條記錄通過contentx.write(key,value)填充到< key,value>鍵值對中。
詳細源碼請參考:WordCountHbaseReader\src\com\zonesion\hbase\WordCountHbaseReader.java
public static class WordCountHbaseReaderMapper extends
TableMapper<Text,Text>{

@Override
protected void map(ImmutableBytesWritable key,Result value,Context context)
throws IOException, InterruptedException {
StringBuffer sb = new StringBuffer("");
for(Entry<byte[],byte[]> entry:value.getFamilyMap("content".getBytes()).entrySet()){
String str = new String(entry.getValue());
//將位元組數組轉換為String類型
if(str != null){
sb.append(new String(entry.getKey()));
sb.append(":");
sb.append(str);
}
context.write(new Text(key.get()), new Text(new String(sb)));
}
}
}

3、 Recer函數實現
此處的WordCountHbaseReaderRece實現了直接輸出Map輸出的< key,value>鍵值對,沒有對其做任何處理。詳細源碼請參考:WordCountHbaseReader\src\com\zonesion\hbase\WordCountHbaseReader.java
public static class WordCountHbaseReaderRece extends Recer<Text,Text,Text,Text>{
private Text result = new Text();
@Override
protected void rece(Text key, Iterable<Text> values,Context context)
throws IOException, InterruptedException {
for(Text val:values){
result.set(val);
context.write(key, result);
}
}
}

4、 驅動函數實現
與WordCount的驅動類不同,在Job配置的時候沒有配置job.setMapperClass(),而是用以下方法執行Mapper類: TableMapReceUtil.initTableMapperJob(tablename,scan,WordCountHbaseReaderMapper.class, Text.class, Text.class, job);
該方法指明了在執行job的Map過程時,數據輸入源是hbase的tablename表,通過掃描讀入對象scan對表進行全表掃描,為Map過程提供數據源輸入,通過WordCountHbaseReaderMapper.class執行Map過程,Map過程的輸出key/value類型是 Text.class與Text.class,最後一個參數是作業對象。特別注意:這里聲明的是一個最簡單的掃描讀入對象scan,進行表掃描讀取數據,其中scan可以配置參數,這里為了例子簡單不再詳述,用戶可自行嘗試。
詳細源碼請參考:WordCountHbaseReader\src\com\zonesion\hbase\WordCountHbaseReader.java
public static void main(String[] args) throws Exception {
String tablename = "wordcount";
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "Master");
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 1) {
System.err.println("Usage: WordCountHbaseReader <out>");
System.exit(2);
}
Job job = new Job(conf, "WordCountHbaseReader");
job.setJarByClass(WordCountHbaseReader.class);
//設置任務數據的輸出路徑;
FileOutputFormat.setOutputPath(job, new Path(otherArgs[0]));
job.setRecerClass(WordCountHbaseReaderRece.class);
Scan scan = new Scan();
TableMapReceUtil.initTableMapperJob(tablename,scan,WordCountHbaseReaderMapper.class, Text.class, Text.class, job);
//調用job.waitForCompletion(true) 執行任務,執行成功後退出;
System.exit(job.waitForCompletion(true) ? 0 : 1);

}

5、部署運行
1)啟動Hadoop集群和Hbase服務
[hadoop@K-Master ~]$ start-dfs.sh #啟動hadoop HDFS文件管理系統
[hadoop@K-Master ~]$ start-mapred.sh #啟動hadoop MapRece分布式計算服務
[hadoop@K-Master ~]$ start-hbase.sh #啟動Hbase
[hadoop@K-Master ~]$ jps #查看進程
22003 HMaster
10611 SecondaryNameNode
22226 Jps
21938 HQuorumPeer
10709 JobTracker
22154 HRegionServer
20277 Main
10432 NameNode

㈢ maprece和hbase的關系,哪些是正確的

MapRece與HBase沒有關系:

MapRece:

MapRece是一種編程模型,用於大規模數據集的並行運算。概念"Map"和"Rece",是它們的主要思想,都是從函數式編程語言里借來的,還有從矢量編程語言里借來的特性。它極大地方便了編程人員在不會分布式並行編程的情況下,將自己的程序運行在分布式系統上。

(3)maprecehbase編程擴展閱讀:

MapRece集群中使用了大量的低端伺服器。因此,節點硬體故障和軟體錯誤是常態。因此,設計良好、容錯性高的並行計算系統不會因為節點故障而影響計算服務的質量。

任何節點的故障不應導致結果的不一致或不確定性;當任何節點失敗時,其他節點應該能夠無縫地接管失敗節點的計算任務。當故障節點被恢復時,它應該能夠自動無縫地加入集群,而不需要管理員手動配置系統。

MapRece並行計算軟體框架使用了多種有效的錯誤檢測和恢復機制,如節點自動重啟技術,使集群和計算框架對節點失效具有魯棒性,能夠有效地處理失效節點的檢測和恢復。

㈣ 在調試MapRece與HBase集成的程序時出現如下錯誤,各位前輩能否賜教,幫忙解答一下

看報錯應該是缺少zookeeper依賴jar包,看一下hadoop使用的zookeeper版本和hbase使用的是否版本一致。或者hbase使用的是不是內置的zookeeper

㈤ 如何快速地編寫和運行一個屬於自己的MapRece例子程序

大數據的時代, 到處張嘴閉嘴都是Hadoop, MapRece, 不跟上時代怎麼行? 可是對一個hadoop的新手, 寫一個屬於自己的MapRece程序還是小有點難度的, 需要建立一個maven項目, 還要搞清楚各種庫的依賴, 再加上編譯運行, 基本上頭大兩圈了吧。 這也使得很多隻是想簡單了解一下MapRece的人望而卻步。
本文會教你如何用最快最簡單的方法編寫和運行一個屬於自己的MapRece程序, let's go!
首先有兩個前提:
1. 有一個已經可以運行的hadoop 集群(也可以是偽分布系統), 上面的hdfs和maprece工作正常 (這個真的是最基本的了, 不再累述, 不會的請參考 http://hadoop.apache.org/docs/current/)
2. 集群上安裝了JDK (編譯運行時會用到)
正式開始
1. 首先登入hadoop 集群裡面的一個節點, 創建一個java源文件, 偷懶起見, 基本盜用官方的word count (因為本文的目的是教會你如何快編寫和運行一個MapRece程序, 而不是如何寫好一個功能齊全的MapRece程序)
內容如下:
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.maprece.Job;
import org.apache.hadoop.maprece.Mapper;
import org.apache.hadoop.maprece.Recer;
import org.apache.hadoop.maprece.lib.input.FileInputFormat;
import org.apache.hadoop.maprece.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class myword {

public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{

private final static IntWritable one = new IntWritable(1);
private Text word = new Text();

public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}

public static class IntSumRecer
extends Recer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();

public void rece(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}

public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println('Usage: wordcount <in> <out>');
System.exit(2);
}
Job job = new Job(conf, 'word count');
job.setJarByClass(myword.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumRecer.class);
job.setRecerClass(IntSumRecer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

與官方版本相比, 主要做了兩處修改
1) 為了簡單起見,去掉了開頭的 package org.apache.hadoop.examples;
2) 將類名從 WordCount 改為 myword, 以體現是我們自己的工作成果 :)
2. 拿到hadoop 運行的class path, 主要為編譯所用
運行命令
hadoop classpath

保存打出的結果,本文用的hadoop 版本是Pivotal 公司的Pivotal hadoop, 例子:
/etc/gphd/hadoop/conf:/usr/lib/gphd/hadoop/lib/*:/usr/lib/gphd/hadoop/.//*:/usr/lib/gphd/hadoop-hdfs/./:/usr/lib/gphd/hadoop-hdfs/lib/*:/usr/lib/gphd/hadoop-hdfs/.//*:/usr/lib/gphd/hadoop-yarn/lib/*:/usr/lib/gphd/hadoop-yarn/.//*:/usr/lib/gphd/hadoop-maprece/lib/*:/usr/lib/gphd/hadoop-maprece/.//*::/etc/gphd/pxf/conf::/usr/lib/gphd/pxf/pxf-core.jar:/usr/lib/gphd/pxf/pxf-api.jar:/usr/lib/gphd/publicstage:/usr/lib/gphd/gfxd/lib/gemfirexd.jar::/usr/lib/gphd/zookeeper/zookeeper.jar:/usr/lib/gphd/hbase/lib/hbase-common.jar:/usr/lib/gphd/hbase/lib/hbase-protocol.jar:/usr/lib/gphd/hbase/lib/hbase-client.jar:/usr/lib/gphd/hbase/lib/hbase-thrift.jar:/usr/lib/gphd/hbase/lib/htrace-core-2.01.jar:/etc/gphd/hbase/conf::/usr/lib/gphd/hive/lib/hive-service.jar:/usr/lib/gphd/hive/lib/libthrift-0.9.0.jar:/usr/lib/gphd/hive/lib/hive-metastore.jar:/usr/lib/gphd/hive/lib/libfb303-0.9.0.jar:/usr/lib/gphd/hive/lib/hive-common.jar:/usr/lib/gphd/hive/lib/hive-exec.jar:/usr/lib/gphd/hive/lib/postgresql-jdbc.jar:/etc/gphd/hive/conf::/usr/lib/gphd/sm-plugins/*:

3. 編譯
運行命令
javac -classpath xxx ./myword.java

xxx部分就是上一步裡面取到的class path
運行完此命令後, 當前目錄下會生成一些.class 文件, 例如:
myword.class myword$IntSumRecer.class myword$TokenizerMapper.class
4. 將class文件打包成.jar文件
運行命令
jar -cvf myword.jar ./*.class

至此, 目標jar 文件成功生成
5. 准備一些文本文件, 上傳到hdfs, 以做word count的input
例子:
隨意創建一些文本文件, 保存到mapred_test 文件夾
運行命令
hadoop fs -put ./mapred_test/

確保此文件夾成功上傳到hdfs 當前用戶根目錄下
6. 運行我們的程序
運行命令
hadoop jar ./myword.jar myword mapred_test output

順利的話, 此命令會正常進行, 一個MapRece job 會開始工作, 輸出的結果會保存在 hdfs 當前用戶根目錄下的output 文件夾裡面。
至此大功告成!
如果還需要更多的功能, 我們可以修改前面的源文件以達到一個真正有用的MapRece job。
但是原理大同小異, 練手的話, 基本夠了。
一個拋磚引玉的簡單例子, 歡迎板磚。
轉載

㈥ 如何執行hbase 的maprece job

package test;
import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;

public class Htable {

/**
* @param args
*/
public static void main(String[] args) throws IOException {
// TODO Auto-generated method stub
Configuration hbaseConf = HBaseConfiguration.create();
HBaseAdmin admin = new HBaseAdmin(hbaseConf);
HTableDescriptor htableDescriptor = new HTableDescriptor("table"
.getBytes()); //set the name of table
htableDescriptor.addFamily(new HColumnDescriptor("fam1")); //set the name of column clusters
admin.createTable(htableDescriptor); //create a table
HTable table = new HTable(hbaseConf, "table"); //get instance of table.
for (int i = 0; i < 3; i++) { //for is number of rows
Put putRow = new Put(("row" + i).getBytes()); //the ith row
putRow.add("fam1".getBytes(), "col1".getBytes(), "vaule1"
.getBytes()); //set the name of column and value.
putRow.add("fam1".getBytes(), "col2".getBytes(), "vaule2"
.getBytes());
putRow.add("fam1".getBytes(), "col3".getBytes(), "vaule3"
.getBytes());
table.put(putRow);
}
for(Result result: table.getScanner("fam1".getBytes())){//get data of column clusters
for(Map.Entry<byte[], byte[]> entry : result.getFamilyMap("fam1".getBytes()).entrySet()){//get collection of result
String column = new String(entry.getKey());
String value = new String(entry.getValue());
System.out.println(column+","+value);
}
}
admin.disableTable("table".getBytes()); //disable the table
admin.deleteTable("table".getBytes()); //drop the tbale
}
}

㈦ 如何在Maprece載入多個Hbase表

如果你自己用「調」api,來讀寫hbase的話,我覺得具體考慮的話是任務能否最終實現的問題了,畢竟maprece所做的工作很多,它自己的master,zookeeper,hbase的master之間的通信,計算任務的rece和mapping,細節太多,考慮到maprece通常處理的數

㈧ Hadoop和MapRece究竟分別是做什麼用的

Hadoop是用來開發分布式程序的架構,是一個由Apache基金會所開發的分布式系統基礎架構。用戶可以在不了解分布式底層細節的情況下,開發分布式程序。

MapRece是用來做大規模並行數據處理的數據模型。方便了編程人員在不會分布式並行編程的情況下,將自己的程序運行在分布式系統上。


(8)maprecehbase編程擴展閱讀

Hadoop是一個能夠讓用戶輕松架構和使用的分布式計算平台。用戶可以輕松地在Hadoop上開發和運行處理海量數據的應用程序。主要有以下幾個優點 :

1、高可靠性。Hadoop按位存儲和處理數據的能力值得人們信賴 。

2、高擴展性。Hadoop是在可用的計算機集簇間分配數據並完成計算任務的,這些集簇可以方便地擴展到數以千計的節點中 。

3、高效性。Hadoop能夠在節點之間動態地移動數據,並保證各個節點的動態平衡,因此處理速度非常快 。

4、高容錯性。Hadoop能夠自動保存數據的多個副本,並且能夠自動將失敗的任務重新分配。

5、低成本。與一體機、商用數據倉庫以及QlikView、Yonghong Z-Suite等數據集市相比,hadoop是開源的,項目的軟體成本因此會大大降低。


㈨ maprece怎麼導入hbace

1、先看一個標準的hbase作為數據讀取源和輸出源的樣例:

[java] view plain在CODE上查看代碼片派生到我的代碼片
Configuration conf = HBaseConfiguration.create();
Job job = new Job(conf, "job name ");
job.setJarByClass(test.class);
Scan scan = new Scan();
TableMapReceUtil.initTableMapperJob(inputTable, scan, mapper.class,
Writable.class, Writable.class, job);
TableMapReceUtil.initTableRecerJob(outputTable, recer.class, job);
job.waitForCompletion(true);

首先創建配置信息和作業對象,設置作業的類。這些和正常的maprece一樣,

[java] view plain在CODE上查看代碼片派生到我的代碼片
Configuration conf = HBaseConfiguration.create();
Job job = new Job(conf, "job name ");
job.setJarByClass(test.class);
Scan scan = new Scan();
TableMapReceUtil.initTableMapperJob(inputTable, scan, mapper.class,
Writable.class, Writable.class, job);
TableMapReceUtil.initTableRecerJob(outputTable, recer.class, job);
job.waitForCompletion(true);
唯一不一樣的就是數據源的說明部分,Tabl<span style="color: rgb(64, 64, 64); font-family: Verdana, Geneva, sans-serif; font-size: 17px; background-color: rgb(245, 253, 255);"><strong>eMapReceUtil的initTableMapperJob和initTableRecerJob方法來實現。</strong></span>

用如上代碼:
數據輸入源是hbase的inputTable表,執行mapper.class進行map過程,輸出的key/value類型是ImmutableBytesWritable和Put類型,最後一個參數是作業對象。需要指出的是需要聲明一個掃描讀入對象scan,進行表掃描讀取數據用,其中scan可以配置參數,這里為了例子簡單不再詳述。
數據輸出目標是hbase的outputTable表,輸出執行的rece過程是recer.class類,操作的作業目標是job。與map比缺少輸出類型的標注,因為他們不是必要的,看過源代碼就知道maprece的TableRecordWriter中write(key,value)方法中,key值是沒有用到的。value只能是Put或者Delete兩種類型,write方法會自行判斷並不用用戶指明。

接下來就是mapper類:

[java] view plain在CODE上查看代碼片派生到我的代碼片
public class mapper extends
TableMapper<KEYOUT, VALUEOUT> {

public void map(Writable key, Writable value, Context context)
throws IOException, InterruptedException {
//mapper邏輯
context.write(key, value);
}

}
}
繼承的是hbase中提供的TableMapper類,其實這個類也是繼承的MapRece類。後邊跟的兩個泛型參數指定類型是mapper輸出的數據類型,該類型必須繼承自Writable類,例如可能用到的put和delete就可以。需要注意的是要和initTableMapperJob方法指定的數據類型一直。該過程會自動從指定hbase表內一行一行讀取數據進行處理。

然後recer類:

[java] view plain在CODE上查看代碼片派生到我的代碼片
public class countUniteRedcuer extends
TableRecer<KEYIN, VALUEIN, KEYOUT> {
public void rece(Text key, Iterable<VALUEIN> values, Context context)
throws IOException, InterruptedException {
//recer邏輯
context.write(null, put or delete);
}
}

recer繼承的是TableRecer類。後邊指定三個泛型參數,前兩個必須對應map過程的輸出key/value類型,第三個必須是put或者delete。write的時候可以把key寫null,它是不必要的。這樣recer輸出的數據會自動插入outputTable指定的表內。

2、有時候我們需要數據源是hdfs的文本,輸出對象是hbase。這時候變化也很簡單:

[java] view plain
<span style="background-color:transparent; line-height:1.3em; color:rgb(64,64,64); font-family:Verdana,Geneva,sans-serif; font-size:1.18em">你會</span><span style="background-color:transparent; font-family:Verdana,Geneva,sans-serif; font-size:1.18em; line-height:1.3em">發現只需要像平常的maprece的作業聲明過程一樣,指定mapper的執行類和輸出key/value類型,指定FileInputFormat.setInputPaths的數據源路徑,輸出聲明不變。便完成了從hdfs文本讀取數據輸出到hbase的命令聲明過程。 mapper和recer如下:</span>
[java] view plain在CODE上查看代碼片派生到我的代碼片
Configuration conf = HBaseConfiguration.create();
Job job = new Job(conf, "job name ");
job.setJarByClass(test.class);

job.setMapperClass(mapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
FileInputFormat.setInputPaths(job, path);

TableMapReceUtil.initTableRecerJob(tableName,
recer.class, job);

[java] view plain在CODE上查看代碼片派生到我的代碼片
public class mapper extends Mapper<LongWritable,Writable,Writable,Writable> {
public void map(LongWritable key, Text line, Context context) {
//mapper邏輯
context.write(k, one);
}
}
public class redcuer extends
TableRecer<KEYIN, VALUEIN, KEYOUT> {
public void rece(Writable key, Iterable<Writable> values, Context context)
throws IOException, InterruptedException {
//recer邏輯
context.write(null, put or delete);
}
}
[java] view plain
<span style="background-color:transparent; line-height:1.3em; color:rgb(64,64,64); font-family:Verdana,Geneva,sans-serif; font-size:1.18em">mapper還依舊繼承原來的MapRece類中的Mapper即可。同樣注意這前後數據類型的key/value一直性。</span>

[java] view plain
<span style="background-color:transparent; line-height:1.3em; color:rgb(64,64,64); font-family:Verdana,Geneva,sans-serif; font-size:1.18em">3、最後就是從hbase中的表作為數據源讀取,hdfs作為數據輸出,簡單的如下:</span>

[java] view plain在CODE上查看代碼片派生到我的代碼片
Configuration conf = HBaseConfiguration.create();
Job job = new Job(conf, "job name ");
job.setJarByClass(test.class);
Scan scan = new Scan();
TableMapReceUtil.initTableMapperJob(inputTable, scan, mapper.class,
Writable.class, Writable.class, job);
job.setOutputKeyClass(Writable.class);
job.setOutputValueClass(Writable.class);
FileOutputFormat.setOutputPath(job, Path);
job.waitForCompletion(true);

mapper和recer簡單如下:

[java] view plain在CODE上查看代碼片派生到我的代碼片Configuration conf = HBaseConfiguration.create();
Job job = new Job(conf, "job name ");
job.setJarByCl www.jdjdzj.com ass(test.class);
Scan scan = new Scan();
TableMapReceUtil.initTableMapperJob(inputTable, scan, mapper.class,
Writable.class, Writable.class, job);
TableMapReceUtil.initTableRecerJob(outputTable, recer.class, job);
job.waitForCompletion(true); Configuration conf = HBaseConfiguration.create();
Job job = new Job(conf, "job name ");
job.setJarByClass(test.class);
Scan scan = new Scan();
TableMapReceUtil.initTableMapperJob(inputTable, scan, mapper.class,
Writable.class, Writable.class, job);
TableMapReceUtil.initTableRecerJob(outputTable, recer.class, job);
job.waitForCompletion(true);
唯一不一樣的就是數據源的說明部分,Tabl<span style="color: rgb(64, 64, 64); font-family: Verdana, Geneva, sans-serif; font-size: 17px; background-color: rgb(245, 253, 255);"><strong>eMapReceUtil的initTableMapperJob和initTableRecerJob方法來實現。</strong></span> public class mapper extends
TableMapper<KEYOUT, VALUEOUT> {

public void map(Writable key, Writable value, Context context)
throws IOException, InterruptedException {
//mapper邏輯
context.write(key, value);
}

}
} public class countUniteRedcuer extends
TableRecer<KEYIN, VALUEIN, KEYOUT> {
public void rece(Text key, Iterable<VALUEIN> values, Context context)
throws IOException, InterruptedException {
//recer邏輯
context.write(null, put or delete);
}
}

閱讀全文

與mapreducehbase編程相關的資料

熱點內容
條件加密 瀏覽:628
androidstudio設置中文 瀏覽:641
汽車換壓縮機能提升製冷 瀏覽:628
安卓開發配什麼電腦 瀏覽:607
linux下php模塊 瀏覽:78
阿里雲伺服器終端在哪裡 瀏覽:146
app紙有什麼用 瀏覽:223
cuteftp命令 瀏覽:506
最開始的編程語言是什麼 瀏覽:759
at遠程命令 瀏覽:492
雲伺服器哪家好點 瀏覽:213
android系統源碼閱讀 瀏覽:931
dumpjava分析工具 瀏覽:680
怎麼下載cpu源碼 瀏覽:156
代碼加密怎麼取消 瀏覽:890
編譯原理代碼在哪裡運行 瀏覽:586
解密攝影pdf 瀏覽:76
演算法編程中級題目 瀏覽:253
c語言編譯器畢業設計 瀏覽:719
醫保卡申請app哪個好 瀏覽:950