導航:首頁 > 編程語言 > hadoopmapreduce編程

hadoopmapreduce編程

發布時間:2024-11-24 01:15:36

『壹』 hadoop和maprece是一種什麼關系

hadoop是依據maprece的原理,用java語言實現的分布式處理機制。

Hadoop是一個能夠對大量數據進行分布式處理的軟體框架,實現了Google的MapRece編程模型和框架,能夠把應用程序分割成許多的小的工作單元,並把這些單元放到任何集群節點上執行。

MapRece是Hadoop中的一個數據運算核心模塊,MapRece通過JobClient生成任務運行文件,並在JobTracker進行調度指派TaskTracker完成任務。


(1)hadoopmaprece編程擴展閱讀

1、MapRece分布式計算框架原型:

MapRece分布式計算模型是由Google提出,主要用於搜索領域,解決海量數據的計算問題Apache對其做了開源實現,整合在hadoop中實現通用分布式數據計算。

MR由兩個階段組成:Map和Rece,用戶只需要實現map()和rece()兩個函數,即可實現分布式計算,非常簡單。大大簡化了分布式並發處理程序的開發。

Map階段就是進行分段處理。

Rece階段就是進行匯總處理。匯總之後還可以進行數據的一系列美化操作,然後再輸出。

2、MapRece組件介紹:

JobClient:用於把用戶的作業任務生成Job的運行包,並存放到HDFS中。

JobinProgress:把Job運行包分解成MapTask和ReceTask並存放於TaskTracker中。

JobTracker(Master):進行調度管理TaskTracker執行任務。

TaskTracker(Slave):執行分配下來的Map計算或Rece計算任務。

『貳』 如何快速地編寫和運行一個屬於自己的MapRece例子程序

大數據的時代, 到處張嘴閉嘴都是Hadoop, MapRece, 不跟上時代怎麼行? 可是對一個hadoop的新手, 寫一個屬於自己的MapRece程序還是小有點難度的, 需要建立一個maven項目, 還要搞清楚各種庫的依賴, 再加上編譯運行, 基本上頭大兩圈了吧。 這也使得很多隻是想簡單了解一下MapRece的人望而卻步。
本文會教你如何用最快最簡單的方法編寫和運行一個屬於自己的MapRece程序, let's go!
首先有兩個前提:
1. 有一個已經可以運行的hadoop 集群(也可以是偽分布系統), 上面的hdfs和maprece工作正常 (這個真的是最基本的了, 不再累述, 不會的請參考 http://hadoop.apache.org/docs/current/)
2. 集群上安裝了JDK (編譯運行時會用到)
正式開始
1. 首先登入hadoop 集群裡面的一個節點, 創建一個java源文件, 偷懶起見, 基本盜用官方的word count (因為本文的目的是教會你如何快編寫和運行一個MapRece程序, 而不是如何寫好一個功能齊全的MapRece程序)
內容如下:
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.maprece.Job;
import org.apache.hadoop.maprece.Mapper;
import org.apache.hadoop.maprece.Recer;
import org.apache.hadoop.maprece.lib.input.FileInputFormat;
import org.apache.hadoop.maprece.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class myword {

public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{

private final static IntWritable one = new IntWritable(1);
private Text word = new Text();

public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}

public static class IntSumRecer
extends Recer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();

public void rece(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}

public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println('Usage: wordcount <in> <out>');
System.exit(2);
}
Job job = new Job(conf, 'word count');
job.setJarByClass(myword.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumRecer.class);
job.setRecerClass(IntSumRecer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

與官方版本相比, 主要做了兩處修改
1) 為了簡單起見,去掉了開頭的 package org.apache.hadoop.examples;
2) 將類名從 WordCount 改為 myword, 以體現是我們自己的工作成果 :)
2. 拿到hadoop 運行的class path, 主要為編譯所用
運行命令
hadoop classpath

保存打出的結果,本文用的hadoop 版本是Pivotal 公司的Pivotal hadoop, 例子:
/etc/gphd/hadoop/conf:/usr/lib/gphd/hadoop/lib/*:/usr/lib/gphd/hadoop/.//*:/usr/lib/gphd/hadoop-hdfs/./:/usr/lib/gphd/hadoop-hdfs/lib/*:/usr/lib/gphd/hadoop-hdfs/.//*:/usr/lib/gphd/hadoop-yarn/lib/*:/usr/lib/gphd/hadoop-yarn/.//*:/usr/lib/gphd/hadoop-maprece/lib/*:/usr/lib/gphd/hadoop-maprece/.//*::/etc/gphd/pxf/conf::/usr/lib/gphd/pxf/pxf-core.jar:/usr/lib/gphd/pxf/pxf-api.jar:/usr/lib/gphd/publicstage:/usr/lib/gphd/gfxd/lib/gemfirexd.jar::/usr/lib/gphd/zookeeper/zookeeper.jar:/usr/lib/gphd/hbase/lib/hbase-common.jar:/usr/lib/gphd/hbase/lib/hbase-protocol.jar:/usr/lib/gphd/hbase/lib/hbase-client.jar:/usr/lib/gphd/hbase/lib/hbase-thrift.jar:/usr/lib/gphd/hbase/lib/htrace-core-2.01.jar:/etc/gphd/hbase/conf::/usr/lib/gphd/hive/lib/hive-service.jar:/usr/lib/gphd/hive/lib/libthrift-0.9.0.jar:/usr/lib/gphd/hive/lib/hive-metastore.jar:/usr/lib/gphd/hive/lib/libfb303-0.9.0.jar:/usr/lib/gphd/hive/lib/hive-common.jar:/usr/lib/gphd/hive/lib/hive-exec.jar:/usr/lib/gphd/hive/lib/postgresql-jdbc.jar:/etc/gphd/hive/conf::/usr/lib/gphd/sm-plugins/*:

3. 編譯
運行命令
javac -classpath xxx ./myword.java

xxx部分就是上一步裡面取到的class path
運行完此命令後, 當前目錄下會生成一些.class 文件, 例如:
myword.class myword$IntSumRecer.class myword$TokenizerMapper.class
4. 將class文件打包成.jar文件
運行命令
jar -cvf myword.jar ./*.class

至此, 目標jar 文件成功生成
5. 准備一些文本文件, 上傳到hdfs, 以做word count的input
例子:
隨意創建一些文本文件, 保存到mapred_test 文件夾
運行命令
hadoop fs -put ./mapred_test/

確保此文件夾成功上傳到hdfs 當前用戶根目錄下
6. 運行我們的程序
運行命令
hadoop jar ./myword.jar myword mapred_test output

順利的話, 此命令會正常進行, 一個MapRece job 會開始工作, 輸出的結果會保存在 hdfs 當前用戶根目錄下的output 文件夾裡面。
至此大功告成!
如果還需要更多的功能, 我們可以修改前面的源文件以達到一個真正有用的MapRece job。
但是原理大同小異, 練手的話, 基本夠了。
一個拋磚引玉的簡單例子, 歡迎板磚。
轉載

『叄』 如何使用python為Hadoop編寫一個簡單的MapRece程序

MichaelG.Noll在他的Blog中提到如何在Hadoop中用Python編寫MapRece程序,韓國的gogamza在其Bolg中也提到如何用C編汪瞎寫MapRece程序(我稍微修改了一下原程序,因為他的Map對單詞切分使用tab鍵)。我合並他們兩人的文章,也讓國內的Hadoop用戶能夠使用別的語言來編寫MapRece程序。首先您得配好您的Hadoop集群,這方面的介紹網上比較多,這兒給個鏈接(Hadoop學習筆記二安裝部署)。HadoopStreaming幫助返鋒我們用非Java的編程語言使用MapRece,Streaming用STDIN(標准輸入)和STDOUT(標准輸出)來和我們編寫的Map和Rece進行數據的交換數據。任何能夠使用STDIN和STDOUT都可以用來編寫MapRece程序,比如我們用Python的sys.stdin和sys.stdout,或者是C中的stdin和stdout。我們還是使用Hadoop的例子WordCount來做示範如何編寫MapRece,在WordCount的例子中漏陵晌我們要解決計算在一批文檔中每一個單詞的出現頻率。首先我們在Map程序中會接受到這批文檔每一行的數據,然後我們編寫的Map程序把這一行按空格切開成一個數組。並對這個數組遍歷按"1"用標準的輸出輸出來,代表這個單詞出現了一次。在Rece中我們來統計單詞的出現頻率。PythonCodeMap:mapper.py#!/usr/bin/envpythonimportsys#={}#inputcomesfromSTDIN(standardinput)forlineinsys.stdin:#=line.strip()#=filter(lambdaword:word,line.split())#:#writetheresultstoSTDOUT(standardoutput);##Recestep,i.e.theinputforrecer.py##tab-delimited;thetrivialwordcountis1print'%s\t%s'%(word,1)Rece:recer.py#!/usr/bin/#={}#.stdin:#=line.strip()#parsetheinputwegotfrommapper.pyword,count=line.split()#convertcount(currentlyastring)tointtry:count=int(count)word2count[word]=word2count.get(word,0)+countexceptValueError:#countwasnotanumber,sosilently#ignore/discardthislinepass#sortthewordslexigraphically;##thisstepisNOTrequired,wejustdoitsothatour##wordcountexamplessorted_word2count=sorted(word2count.items(),key=itemgetter(0))#writetheresultstoSTDOUT(standardoutput)forword,countinsorted_word2count:print'%s\t%s'%(word,count)CCodeMap:Mapper.c#include#include#include#include#defineBUF_SIZE2048#defineDELIM"\n"intmain(intargc,char*argv[]){charbuffer[BUF_SIZE];while(fgets(buffer,BUF_SIZE-1,stdin)){intlen=strlen(buffer);if(buffer[len-1]=='\n')buffer[len-1]=0;char*querys=index(buffer,'');char*query=NULL;if(querys==NULL)continue;querys+=1;/*nottoinclude'\t'*/query=strtok(buffer,"");while(query){printf("%s\t1\n",query);query=strtok(NULL,"");}}return0;}h>h>h>h>Rece:Recer.c#include#include#include#include#defineBUFFER_SIZE1024#defineDELIM"\t"intmain(intargc,char*argv[]){charstrLastKey[BUFFER_SIZE];charstrLine[BUFFER_SIZE];intcount=0;*strLastKey='\0';*strLine='\0';while(fgets(strLine,BUFFER_SIZE-1,stdin)){char*strCurrKey=NULL;char*strCurrNum=NULL;strCurrKey=strtok(strLine,DELIM);strCurrNum=strtok(NULL,DELIM);/*necessarytocheckerrorbut.*/if(strLastKey[0]=='\0'){strcpy(strLastKey,strCurrKey);}if(strcmp(strCurrKey,strLastKey)){printf("%s\t%d\n",strLastKey,count);count=atoi(strCurrNum);}else{count+=atoi(strCurrNum);}strcpy(strLastKey,strCurrKey);}printf("%s\t%d\n",strLastKey,count);/*flushthecount*/return0;}h>h>h>h>首先我們調試一下源碼:chmod+xmapper.pychmod+xrecer.pyecho"foofooquuxlabsfoobarquux"|./mapper.py|./recer.pybar1foo3labs1quux2g++Mapper.c-oMapperg++Recer.c-oRecerchmod+xMapperchmod+xRecerecho"foofooquuxlabsfoobarquux"|./Mapper|./你可能看到C的輸出和Python的不一樣,因為Python是把他放在詞典里了.我們在Hadoop時,會對這進行排序,然後相同的單詞會連續在標准輸出中輸出.在Hadoop中運行程序首先我們要下載我們的測試文檔wget頁面中摘下的用php編寫的MapRece程序,供php程序員參考:Map:mapper.php#!/usr/bin/php$word2count=array();//inputcomesfromSTDIN(standardinput)while(($line=fgets(STDIN))!==false){//$line=strtolower(trim($line));//$words=preg_split('/\W/',$line,0,PREG_SPLIT_NO_EMPTY);//increasecountersforeach($wordsas$word){$word2count[$word]+=1;}}//writetheresultstoSTDOUT(standardoutput)////Recestep,i.e.theinputforrecer.pyforeach($word2countas$word=>$count){//tab-delimitedecho$word,chr(9),$count,PHP_EOL;}?>Rece:mapper.php#!/usr/bin/php$word2count=array();//inputcomesfromSTDINwhile(($line=fgets(STDIN))!==false){//$line=trim($line);//parsetheinputwegotfrommapper.phplist($word,$count)=explode(chr(9),$line);//convertcount(currentlyastring)toint$count=intval($count);//sumcountsif($count>0)$word2count[$word]+=$count;}//sortthewordslexigraphically////thissetisNOTrequired,wejustdoitsothatour////wordcountexamplesksort($word2count);//writetheresultstoSTDOUT(standardoutput)foreach($word2countas$word=>$count){echo$word,chr(9),$count,PHP_EOL;}?>作者:馬士華發表於:2008-03-05

『肆』 hadoop maprece可以完成下圖功能嗎需要怎麼實現

把A作為map輸出的key,value為自己實現的類似vectorwritable的類存 a c
rece時候有sum1,sum2……sumN分別為vectorwritable相同index的和,最後各除values.size
rece的輸出value也為v e c to r w ri ta b le

『伍』 如何在Hadoop上編寫MapRece程序

1. 概述

1970年,IBM的研究員E.F.Codd博士在刊物《Communication of the ACM》上發表了一篇名為「A Relational Model of Data for Large Shared Data Banks」的論文,提出了關系模型的概念,標志著關系資料庫的誕生,隨後幾十年,關系資料庫及其結構化查詢語言SQL成為程序員必須掌握的基本技能之一。
2005年4月,Jeffrey Dean和Sanjay Ghemawat在國際會議OSDI上發表「MapRece: Simplified Data Processing on Large Cluster」,標志著google的大規模數據處理系統MapRece公開。受這篇論文的啟發,當年秋天,Hadoop 由 Apache Software Foundation 公司作為 Lucene 的子項目 Nutch 的一部分正式被引入,2006 年 3 月份,MapRece 和 Nutch Distributed File System (NDFS) 分別被納入稱為 Hadoop 的項目中。如今,Hadoop已經被超過50%的互聯網公司使用,其他很多公司正准備使用Hadoop來處理海量數據,隨著Hadoop越來越受歡迎,也許在將來的某段時間,Hadoop會成為程序員必須掌握的技能之一,如果真是這樣的話,學會如何在Hadoop上編寫MapRece程序便是學習Hadoop的開始。
本文介紹了在Hadoop上編寫MapRece程序的基本方法,包括MapRece程序的構成,不同語言開發MapRece的方法等。
2. Hadoop 作業構成
2.1 Hadoop作業執行流程
用戶配置並將一個Hadoop作業提到Hadoop框架中,Hadoop框架會把這個作業分解成一系列map tasks 和rece tasks。Hadoop框架負責task分發和執行,結果收集和作業進度監控。
下圖給出了一個作業從開始執行到結束所經歷的階段和每個階段被誰控制(用戶 or Hadoop框架)。

下圖詳細給出了用戶編寫MapRee作業時需要進行那些工作以及Hadoop框架自動完成的工作:

在編寫MapRece程序時,用戶分別通過InputFormat和OutputFormat指定輸入和輸出格式,並定義Mapper和Recer指定map階段和rece階段的要做的工作。在Mapper或者Recer中,用戶只需指定一對key/value的處理邏輯,Hadoop框架會自動順序迭代解析所有key/value,並將每對key/value交給Mapper或者Recer處理。表面上看來,Hadoop限定數據格式必須為key/value形式,過於簡單,很難解決復雜問題,實際上,可以通過組合的方法使key或者value(比如在key或者value中保存多個欄位,每個欄位用分隔符分開,或者value是個序列化後的對象,在Mapper中使用時,將其反序列化等)保存多重信息,以解決輸入格式較復雜的應用。
2.2 用戶的工作
用戶編寫MapRece需要實現的類或者方法有:
(1) InputFormat介面
用戶需要實現該介面以指定輸入文件的內容格式。該介面有兩個方法

1
2
3
4
5
6
7
8
9
10
11

public interface InputFormat<K, V> {

InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;

RecordReader<K, V> getRecordReader(InputSplit split,

JobConf job,

Reporter reporter) throws IOException;

}


其中getSplits函數將所有輸入數據分成numSplits個split,每個split交給一個map task處理。getRecordReader函數提供一個用戶解析split的迭代器對象,它將split中的每個record解析成key/value對。
Hadoop本身提供了一些InputFormat:

(2)Mapper介面
用戶需繼承Mapper介面實現自己的Mapper,Mapper中必須實現的函數是

1
2
3
4
5
6
7
8
9

void map(K1 key,

V1 value,

OutputCollector<K2,V2> output,

Reporter reporter

) throws IOException


其中,<K1 V1>是通過Inputformat中的RecordReader對象解析處理 的,OutputCollector獲取map()的輸出結果,Reporter保存了當前task處理進度。
Hadoop本身提供了一些Mapper供用戶使用:

(3)Partitioner介面
用戶需繼承該介面實現自己的Partitioner以指定map task產生的key/value對交給哪個rece task處理,好的Partitioner能讓每個rece task處理的數據相近,從而達到負載均衡。Partitioner中需實現的函數是
getPartition( K2 key, V2 value, int numPartitions)
該函數返回<K2 V2>對應的rece task ID。
用戶如果不提供Partitioner,Hadoop會使用默認的(實際上是個hash函數)。
(4)Combiner
Combiner使得map task與rece task之間的數據傳輸量大大減小,可明顯提高性能。大多數情況下,Combiner與Recer相同。
(5)Recer介面
用戶需繼承Recer介面實現自己的Recer,Recer中必須實現的函數是

1
2
3
4
5
6
7
8
9

void rece(K2 key,

Iterator<V2> values,

OutputCollector<K3,V3> output,

Reporter reporter

) throws IOException


Hadoop本身提供了一些Recer供用戶使用:

(6)OutputFormat
用戶通過OutputFormat指定輸出文件的內容格式,不過它沒有split。每個rece task將其數據寫入自己的文件,文件名為part-nnnnn,其中nnnnn為rece task的ID。
Hadoop本身提供了幾個OutputFormat:
3. 分布式緩存
Haoop中自帶了一個分布式緩存,即DistributedCache對象,方便map task之間或者rece task之間共享一些信息,比如某些實際應用中,所有map task要讀取同一個配置文件或者字典,則可將該配置文件或者字典放到分布式緩存中。
4. 多語言編寫MapRece作業
Hadoop採用java編寫,因而Hadoop天生支持java語言編寫作業,但在實際應用中,有時候,因要用到非java的第三方庫或者其他原因,要採用C/C++或者其他語言編寫MapRece作業,這時候可能要用到Hadoop提供的一些工具。
如果你要用C/C++編寫MpaRece作業,可使用的工具有Hadoop Streaming或者Hadoop Pipes。
如果你要用Python編寫MapRece作業,可以使用Hadoop Streaming或者Pydoop。
如果你要使用其他語言,如shell,php,ruby等,可使用Hadoop Streaming。
關於Hadoop Streaming編程,可參見我的這篇博文:《Hadoop Streaming編程》(http://dongxicheng.org/maprece/hadoop-streaming-programming/ )
關於Pydoop編程,可參見其官方網站:http://sourceforge.net/projects/pydoop/
關於Hadoop pipes編程,可參見《Hadoop Tutorial 2.2 — Running C++ Programs on Hadoop》。
5. 編程方式比較
(1)java。 Hadoop支持的最好最全面的語言,而且提供了很多工具方便程序員開發。
(2)Hadoop Streaming。 它最大的優點是支持多種語言,但效率較低,rece task需等到map 階段完成後才能啟動;它不支持用戶自定義InputFormat,如果用戶想指定輸入文件格式,可使用java語言編寫或者在命令行中指定分隔符;它採用標准輸入輸出讓C/C++與java通信,因而只支持text數據格式。
(3)Hadoop Pipes。 專門為C/C++語言設計,由於其採用了socket方式讓C/C++與java通信,因而其效率較低(其優勢在於,但作業需要大量,速度很快)。它支持用戶(用C/C++)編寫RecordReader。
(4)Pydoop。它是專門方便python程序員編寫MapRece作業設計的,其底層使用了Hadoop Streaming介面和libhdfs庫。
6. 總結
Hadoop使得分布式程序的編寫變得異常簡單,很多情況下,用戶只需寫map()和rece()兩個函數即可(InputFormat,Outputformat可用系統預設的)。正是由於Hadoop編程的簡單性,越來越多的公司或者研究單位開始使用Hadoop。

『陸』 有沒有關於maprece編程的書籍推薦

maprece編程書籍推薦一:《MapRece設計模式》


將各種有價值的MapRece設計模式匯集在一起,形成一本獨特的合集,可以幫讀者節省大量的時間和精力,無論讀者身處哪個領域,使用哪種編程語言,使用什麼開發框架。
書中對每一種模式都會詳細解釋其使用的上下文、可能存在的陷阱及使用的注意事項,以幫助讀者在對大數據問題架構建模時避免常見的設計錯誤。本書還提供了MapRece的一個完整綜述,解釋其起源和實現,並說明設計模式如此重要的原因。書中的所有示例代碼都是基於Hadoop平台編寫的。
maprece編程書籍推薦二:《Hadoop MapRece實戰手冊》

閱讀全文

與hadoopmapreduce編程相關的資料

熱點內容
mac白色文件夾問號 瀏覽:718
怎麼申請郵箱的伺服器 瀏覽:13
c項目兩個工程怎麼編譯 瀏覽:645
知乎app有什麼作用 瀏覽:451
單片機帶的比較器 瀏覽:391
程序員都是精英 瀏覽:19
10種編程語言 瀏覽:749
綿陽學駕駛手機上下什麼app 瀏覽:129
python如何模擬網頁操作 瀏覽:40
單片機多文件編譯方法 瀏覽:839
不動產壓縮時間 瀏覽:571
租房管理平台源碼 瀏覽:65
復樂園pdf 瀏覽:457
程序員找到公交車 瀏覽:698
嬰兒寶寶操有什麼APP推薦 瀏覽:73
如何將資料庫附加到伺服器上 瀏覽:391
php退出循環 瀏覽:479
夢幻西遊怎麼修改伺服器人數上限 瀏覽:332
自動開啟命令 瀏覽:847
查詢雲伺服器訪問的ip 瀏覽:838