导航:首页 > 编程语言 > hadoopmapreduce编程

hadoopmapreduce编程

发布时间:2024-11-24 01:15:36

‘壹’ hadoop和maprece是一种什么关系

hadoop是依据maprece的原理,用java语言实现的分布式处理机制。

Hadoop是一个能够对大量数据进行分布式处理的软件框架,实现了Google的MapRece编程模型和框架,能够把应用程序分割成许多的小的工作单元,并把这些单元放到任何集群节点上执行。

MapRece是Hadoop中的一个数据运算核心模块,MapRece通过JobClient生成任务运行文件,并在JobTracker进行调度指派TaskTracker完成任务。


(1)hadoopmaprece编程扩展阅读

1、MapRece分布式计算框架原型:

MapRece分布式计算模型是由Google提出,主要用于搜索领域,解决海量数据的计算问题Apache对其做了开源实现,整合在hadoop中实现通用分布式数据计算。

MR由两个阶段组成:Map和Rece,用户只需要实现map()和rece()两个函数,即可实现分布式计算,非常简单。大大简化了分布式并发处理程序的开发。

Map阶段就是进行分段处理。

Rece阶段就是进行汇总处理。汇总之后还可以进行数据的一系列美化操作,然后再输出。

2、MapRece组件介绍:

JobClient:用于把用户的作业任务生成Job的运行包,并存放到HDFS中。

JobinProgress:把Job运行包分解成MapTask和ReceTask并存放于TaskTracker中。

JobTracker(Master):进行调度管理TaskTracker执行任务。

TaskTracker(Slave):执行分配下来的Map计算或Rece计算任务。

‘贰’ 如何快速地编写和运行一个属于自己的MapRece例子程序

大数据的时代, 到处张嘴闭嘴都是Hadoop, MapRece, 不跟上时代怎么行? 可是对一个hadoop的新手, 写一个属于自己的MapRece程序还是小有点难度的, 需要建立一个maven项目, 还要搞清楚各种库的依赖, 再加上编译运行, 基本上头大两圈了吧。 这也使得很多只是想简单了解一下MapRece的人望而却步。
本文会教你如何用最快最简单的方法编写和运行一个属于自己的MapRece程序, let's go!
首先有两个前提:
1. 有一个已经可以运行的hadoop 集群(也可以是伪分布系统), 上面的hdfs和maprece工作正常 (这个真的是最基本的了, 不再累述, 不会的请参考 http://hadoop.apache.org/docs/current/)
2. 集群上安装了JDK (编译运行时会用到)
正式开始
1. 首先登入hadoop 集群里面的一个节点, 创建一个java源文件, 偷懒起见, 基本盗用官方的word count (因为本文的目的是教会你如何快编写和运行一个MapRece程序, 而不是如何写好一个功能齐全的MapRece程序)
内容如下:
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.maprece.Job;
import org.apache.hadoop.maprece.Mapper;
import org.apache.hadoop.maprece.Recer;
import org.apache.hadoop.maprece.lib.input.FileInputFormat;
import org.apache.hadoop.maprece.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class myword {

public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{

private final static IntWritable one = new IntWritable(1);
private Text word = new Text();

public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}

public static class IntSumRecer
extends Recer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();

public void rece(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}

public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println('Usage: wordcount <in> <out>');
System.exit(2);
}
Job job = new Job(conf, 'word count');
job.setJarByClass(myword.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumRecer.class);
job.setRecerClass(IntSumRecer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

与官方版本相比, 主要做了两处修改
1) 为了简单起见,去掉了开头的 package org.apache.hadoop.examples;
2) 将类名从 WordCount 改为 myword, 以体现是我们自己的工作成果 :)
2. 拿到hadoop 运行的class path, 主要为编译所用
运行命令
hadoop classpath

保存打出的结果,本文用的hadoop 版本是Pivotal 公司的Pivotal hadoop, 例子:
/etc/gphd/hadoop/conf:/usr/lib/gphd/hadoop/lib/*:/usr/lib/gphd/hadoop/.//*:/usr/lib/gphd/hadoop-hdfs/./:/usr/lib/gphd/hadoop-hdfs/lib/*:/usr/lib/gphd/hadoop-hdfs/.//*:/usr/lib/gphd/hadoop-yarn/lib/*:/usr/lib/gphd/hadoop-yarn/.//*:/usr/lib/gphd/hadoop-maprece/lib/*:/usr/lib/gphd/hadoop-maprece/.//*::/etc/gphd/pxf/conf::/usr/lib/gphd/pxf/pxf-core.jar:/usr/lib/gphd/pxf/pxf-api.jar:/usr/lib/gphd/publicstage:/usr/lib/gphd/gfxd/lib/gemfirexd.jar::/usr/lib/gphd/zookeeper/zookeeper.jar:/usr/lib/gphd/hbase/lib/hbase-common.jar:/usr/lib/gphd/hbase/lib/hbase-protocol.jar:/usr/lib/gphd/hbase/lib/hbase-client.jar:/usr/lib/gphd/hbase/lib/hbase-thrift.jar:/usr/lib/gphd/hbase/lib/htrace-core-2.01.jar:/etc/gphd/hbase/conf::/usr/lib/gphd/hive/lib/hive-service.jar:/usr/lib/gphd/hive/lib/libthrift-0.9.0.jar:/usr/lib/gphd/hive/lib/hive-metastore.jar:/usr/lib/gphd/hive/lib/libfb303-0.9.0.jar:/usr/lib/gphd/hive/lib/hive-common.jar:/usr/lib/gphd/hive/lib/hive-exec.jar:/usr/lib/gphd/hive/lib/postgresql-jdbc.jar:/etc/gphd/hive/conf::/usr/lib/gphd/sm-plugins/*:

3. 编译
运行命令
javac -classpath xxx ./myword.java

xxx部分就是上一步里面取到的class path
运行完此命令后, 当前目录下会生成一些.class 文件, 例如:
myword.class myword$IntSumRecer.class myword$TokenizerMapper.class
4. 将class文件打包成.jar文件
运行命令
jar -cvf myword.jar ./*.class

至此, 目标jar 文件成功生成
5. 准备一些文本文件, 上传到hdfs, 以做word count的input
例子:
随意创建一些文本文件, 保存到mapred_test 文件夹
运行命令
hadoop fs -put ./mapred_test/

确保此文件夹成功上传到hdfs 当前用户根目录下
6. 运行我们的程序
运行命令
hadoop jar ./myword.jar myword mapred_test output

顺利的话, 此命令会正常进行, 一个MapRece job 会开始工作, 输出的结果会保存在 hdfs 当前用户根目录下的output 文件夹里面。
至此大功告成!
如果还需要更多的功能, 我们可以修改前面的源文件以达到一个真正有用的MapRece job。
但是原理大同小异, 练手的话, 基本够了。
一个抛砖引玉的简单例子, 欢迎板砖。
转载

‘叁’ 如何使用python为Hadoop编写一个简单的MapRece程序

MichaelG.Noll在他的Blog中提到如何在Hadoop中用Python编写MapRece程序,韩国的gogamza在其Bolg中也提到如何用C编汪瞎写MapRece程序(我稍微修改了一下原程序,因为他的Map对单词切分使用tab键)。我合并他们两人的文章,也让国内的Hadoop用户能够使用别的语言来编写MapRece程序。首先您得配好您的Hadoop集群,这方面的介绍网上比较多,这儿给个链接(Hadoop学习笔记二安装部署)。HadoopStreaming帮助返锋我们用非Java的编程语言使用MapRece,Streaming用STDIN(标准输入)和STDOUT(标准输出)来和我们编写的Map和Rece进行数据的交换数据。任何能够使用STDIN和STDOUT都可以用来编写MapRece程序,比如我们用Python的sys.stdin和sys.stdout,或者是C中的stdin和stdout。我们还是使用Hadoop的例子WordCount来做示范如何编写MapRece,在WordCount的例子中漏陵晌我们要解决计算在一批文档中每一个单词的出现频率。首先我们在Map程序中会接受到这批文档每一行的数据,然后我们编写的Map程序把这一行按空格切开成一个数组。并对这个数组遍历按"1"用标准的输出输出来,代表这个单词出现了一次。在Rece中我们来统计单词的出现频率。PythonCodeMap:mapper.py#!/usr/bin/envpythonimportsys#={}#inputcomesfromSTDIN(standardinput)forlineinsys.stdin:#=line.strip()#=filter(lambdaword:word,line.split())#:#writetheresultstoSTDOUT(standardoutput);##Recestep,i.e.theinputforrecer.py##tab-delimited;thetrivialwordcountis1print'%s\t%s'%(word,1)Rece:recer.py#!/usr/bin/#={}#.stdin:#=line.strip()#parsetheinputwegotfrommapper.pyword,count=line.split()#convertcount(currentlyastring)tointtry:count=int(count)word2count[word]=word2count.get(word,0)+countexceptValueError:#countwasnotanumber,sosilently#ignore/discardthislinepass#sortthewordslexigraphically;##thisstepisNOTrequired,wejustdoitsothatour##wordcountexamplessorted_word2count=sorted(word2count.items(),key=itemgetter(0))#writetheresultstoSTDOUT(standardoutput)forword,countinsorted_word2count:print'%s\t%s'%(word,count)CCodeMap:Mapper.c#include#include#include#include#defineBUF_SIZE2048#defineDELIM"\n"intmain(intargc,char*argv[]){charbuffer[BUF_SIZE];while(fgets(buffer,BUF_SIZE-1,stdin)){intlen=strlen(buffer);if(buffer[len-1]=='\n')buffer[len-1]=0;char*querys=index(buffer,'');char*query=NULL;if(querys==NULL)continue;querys+=1;/*nottoinclude'\t'*/query=strtok(buffer,"");while(query){printf("%s\t1\n",query);query=strtok(NULL,"");}}return0;}h>h>h>h>Rece:Recer.c#include#include#include#include#defineBUFFER_SIZE1024#defineDELIM"\t"intmain(intargc,char*argv[]){charstrLastKey[BUFFER_SIZE];charstrLine[BUFFER_SIZE];intcount=0;*strLastKey='\0';*strLine='\0';while(fgets(strLine,BUFFER_SIZE-1,stdin)){char*strCurrKey=NULL;char*strCurrNum=NULL;strCurrKey=strtok(strLine,DELIM);strCurrNum=strtok(NULL,DELIM);/*necessarytocheckerrorbut.*/if(strLastKey[0]=='\0'){strcpy(strLastKey,strCurrKey);}if(strcmp(strCurrKey,strLastKey)){printf("%s\t%d\n",strLastKey,count);count=atoi(strCurrNum);}else{count+=atoi(strCurrNum);}strcpy(strLastKey,strCurrKey);}printf("%s\t%d\n",strLastKey,count);/*flushthecount*/return0;}h>h>h>h>首先我们调试一下源码:chmod+xmapper.pychmod+xrecer.pyecho"foofooquuxlabsfoobarquux"|./mapper.py|./recer.pybar1foo3labs1quux2g++Mapper.c-oMapperg++Recer.c-oRecerchmod+xMapperchmod+xRecerecho"foofooquuxlabsfoobarquux"|./Mapper|./你可能看到C的输出和Python的不一样,因为Python是把他放在词典里了.我们在Hadoop时,会对这进行排序,然后相同的单词会连续在标准输出中输出.在Hadoop中运行程序首先我们要下载我们的测试文档wget页面中摘下的用php编写的MapRece程序,供php程序员参考:Map:mapper.php#!/usr/bin/php$word2count=array();//inputcomesfromSTDIN(standardinput)while(($line=fgets(STDIN))!==false){//$line=strtolower(trim($line));//$words=preg_split('/\W/',$line,0,PREG_SPLIT_NO_EMPTY);//increasecountersforeach($wordsas$word){$word2count[$word]+=1;}}//writetheresultstoSTDOUT(standardoutput)////Recestep,i.e.theinputforrecer.pyforeach($word2countas$word=>$count){//tab-delimitedecho$word,chr(9),$count,PHP_EOL;}?>Rece:mapper.php#!/usr/bin/php$word2count=array();//inputcomesfromSTDINwhile(($line=fgets(STDIN))!==false){//$line=trim($line);//parsetheinputwegotfrommapper.phplist($word,$count)=explode(chr(9),$line);//convertcount(currentlyastring)toint$count=intval($count);//sumcountsif($count>0)$word2count[$word]+=$count;}//sortthewordslexigraphically////thissetisNOTrequired,wejustdoitsothatour////wordcountexamplesksort($word2count);//writetheresultstoSTDOUT(standardoutput)foreach($word2countas$word=>$count){echo$word,chr(9),$count,PHP_EOL;}?>作者:马士华发表于:2008-03-05

‘肆’ hadoop maprece可以完成下图功能吗需要怎么实现

把A作为map输出的key,value为自己实现的类似vectorwritable的类存 a c
rece时候有sum1,sum2……sumN分别为vectorwritable相同index的和,最后各除values.size
rece的输出value也为v e c to r w ri ta b le

‘伍’ 如何在Hadoop上编写MapRece程序

1. 概述

1970年,IBM的研究员E.F.Codd博士在刊物《Communication of the ACM》上发表了一篇名为“A Relational Model of Data for Large Shared Data Banks”的论文,提出了关系模型的概念,标志着关系数据库的诞生,随后几十年,关系数据库及其结构化查询语言SQL成为程序员必须掌握的基本技能之一。
2005年4月,Jeffrey Dean和Sanjay Ghemawat在国际会议OSDI上发表“MapRece: Simplified Data Processing on Large Cluster”,标志着google的大规模数据处理系统MapRece公开。受这篇论文的启发,当年秋天,Hadoop 由 Apache Software Foundation 公司作为 Lucene 的子项目 Nutch 的一部分正式被引入,2006 年 3 月份,MapRece 和 Nutch Distributed File System (NDFS) 分别被纳入称为 Hadoop 的项目中。如今,Hadoop已经被超过50%的互联网公司使用,其他很多公司正准备使用Hadoop来处理海量数据,随着Hadoop越来越受欢迎,也许在将来的某段时间,Hadoop会成为程序员必须掌握的技能之一,如果真是这样的话,学会如何在Hadoop上编写MapRece程序便是学习Hadoop的开始。
本文介绍了在Hadoop上编写MapRece程序的基本方法,包括MapRece程序的构成,不同语言开发MapRece的方法等。
2. Hadoop 作业构成
2.1 Hadoop作业执行流程
用户配置并将一个Hadoop作业提到Hadoop框架中,Hadoop框架会把这个作业分解成一系列map tasks 和rece tasks。Hadoop框架负责task分发和执行,结果收集和作业进度监控。
下图给出了一个作业从开始执行到结束所经历的阶段和每个阶段被谁控制(用户 or Hadoop框架)。

下图详细给出了用户编写MapRee作业时需要进行那些工作以及Hadoop框架自动完成的工作:

在编写MapRece程序时,用户分别通过InputFormat和OutputFormat指定输入和输出格式,并定义Mapper和Recer指定map阶段和rece阶段的要做的工作。在Mapper或者Recer中,用户只需指定一对key/value的处理逻辑,Hadoop框架会自动顺序迭代解析所有key/value,并将每对key/value交给Mapper或者Recer处理。表面上看来,Hadoop限定数据格式必须为key/value形式,过于简单,很难解决复杂问题,实际上,可以通过组合的方法使key或者value(比如在key或者value中保存多个字段,每个字段用分隔符分开,或者value是个序列化后的对象,在Mapper中使用时,将其反序列化等)保存多重信息,以解决输入格式较复杂的应用。
2.2 用户的工作
用户编写MapRece需要实现的类或者方法有:
(1) InputFormat接口
用户需要实现该接口以指定输入文件的内容格式。该接口有两个方法

1
2
3
4
5
6
7
8
9
10
11

public interface InputFormat<K, V> {

InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;

RecordReader<K, V> getRecordReader(InputSplit split,

JobConf job,

Reporter reporter) throws IOException;

}


其中getSplits函数将所有输入数据分成numSplits个split,每个split交给一个map task处理。getRecordReader函数提供一个用户解析split的迭代器对象,它将split中的每个record解析成key/value对。
Hadoop本身提供了一些InputFormat:

(2)Mapper接口
用户需继承Mapper接口实现自己的Mapper,Mapper中必须实现的函数是

1
2
3
4
5
6
7
8
9

void map(K1 key,

V1 value,

OutputCollector<K2,V2> output,

Reporter reporter

) throws IOException


其中,<K1 V1>是通过Inputformat中的RecordReader对象解析处理 的,OutputCollector获取map()的输出结果,Reporter保存了当前task处理进度。
Hadoop本身提供了一些Mapper供用户使用:

(3)Partitioner接口
用户需继承该接口实现自己的Partitioner以指定map task产生的key/value对交给哪个rece task处理,好的Partitioner能让每个rece task处理的数据相近,从而达到负载均衡。Partitioner中需实现的函数是
getPartition( K2 key, V2 value, int numPartitions)
该函数返回<K2 V2>对应的rece task ID。
用户如果不提供Partitioner,Hadoop会使用默认的(实际上是个hash函数)。
(4)Combiner
Combiner使得map task与rece task之间的数据传输量大大减小,可明显提高性能。大多数情况下,Combiner与Recer相同。
(5)Recer接口
用户需继承Recer接口实现自己的Recer,Recer中必须实现的函数是

1
2
3
4
5
6
7
8
9

void rece(K2 key,

Iterator<V2> values,

OutputCollector<K3,V3> output,

Reporter reporter

) throws IOException


Hadoop本身提供了一些Recer供用户使用:

(6)OutputFormat
用户通过OutputFormat指定输出文件的内容格式,不过它没有split。每个rece task将其数据写入自己的文件,文件名为part-nnnnn,其中nnnnn为rece task的ID。
Hadoop本身提供了几个OutputFormat:
3. 分布式缓存
Haoop中自带了一个分布式缓存,即DistributedCache对象,方便map task之间或者rece task之间共享一些信息,比如某些实际应用中,所有map task要读取同一个配置文件或者字典,则可将该配置文件或者字典放到分布式缓存中。
4. 多语言编写MapRece作业
Hadoop采用java编写,因而Hadoop天生支持java语言编写作业,但在实际应用中,有时候,因要用到非java的第三方库或者其他原因,要采用C/C++或者其他语言编写MapRece作业,这时候可能要用到Hadoop提供的一些工具。
如果你要用C/C++编写MpaRece作业,可使用的工具有Hadoop Streaming或者Hadoop Pipes。
如果你要用Python编写MapRece作业,可以使用Hadoop Streaming或者Pydoop。
如果你要使用其他语言,如shell,php,ruby等,可使用Hadoop Streaming。
关于Hadoop Streaming编程,可参见我的这篇博文:《Hadoop Streaming编程》(http://dongxicheng.org/maprece/hadoop-streaming-programming/ )
关于Pydoop编程,可参见其官方网站:http://sourceforge.net/projects/pydoop/
关于Hadoop pipes编程,可参见《Hadoop Tutorial 2.2 — Running C++ Programs on Hadoop》。
5. 编程方式比较
(1)java。 Hadoop支持的最好最全面的语言,而且提供了很多工具方便程序员开发。
(2)Hadoop Streaming。 它最大的优点是支持多种语言,但效率较低,rece task需等到map 阶段完成后才能启动;它不支持用户自定义InputFormat,如果用户想指定输入文件格式,可使用java语言编写或者在命令行中指定分隔符;它采用标准输入输出让C/C++与java通信,因而只支持text数据格式。
(3)Hadoop Pipes。 专门为C/C++语言设计,由于其采用了socket方式让C/C++与java通信,因而其效率较低(其优势在于,但作业需要大量,速度很快)。它支持用户(用C/C++)编写RecordReader。
(4)Pydoop。它是专门方便python程序员编写MapRece作业设计的,其底层使用了Hadoop Streaming接口和libhdfs库。
6. 总结
Hadoop使得分布式程序的编写变得异常简单,很多情况下,用户只需写map()和rece()两个函数即可(InputFormat,Outputformat可用系统缺省的)。正是由于Hadoop编程的简单性,越来越多的公司或者研究单位开始使用Hadoop。

‘陆’ 有没有关于maprece编程的书籍推荐

maprece编程书籍推荐一:《MapRece设计模式》


将各种有价值的MapRece设计模式汇集在一起,形成一本独特的合集,可以帮读者节省大量的时间和精力,无论读者身处哪个领域,使用哪种编程语言,使用什么开发框架。
书中对每一种模式都会详细解释其使用的上下文、可能存在的陷阱及使用的注意事项,以帮助读者在对大数据问题架构建模时避免常见的设计错误。本书还提供了MapRece的一个完整综述,解释其起源和实现,并说明设计模式如此重要的原因。书中的所有示例代码都是基于Hadoop平台编写的。
maprece编程书籍推荐二:《Hadoop MapRece实战手册》

阅读全文

与hadoopmapreduce编程相关的资料

热点内容
易语言编译改名 浏览:721
阿里服务器都提供什么 浏览:754
cf打开服务器接不上怎么办 浏览:901
linux下more命令 浏览:402
des算法运算位数 浏览:375
珠海建行贷款解压 浏览:635
布谷源码iOS 浏览:66
云存储节点服务器是啥 浏览:784
压缩文件可以用pad解压么 浏览:609
我的世界服务器如何换 浏览:64
程序员要拒绝吗 浏览:124
下期视频怎么解压 浏览:383
方法命令函数指令 浏览:130
视频已加密请输入密码确认 浏览:362
香港中产程序员 浏览:917
python适合什么编译器 浏览:844
双强力夹文件夹使用方法 浏览:330
程序员瑜伽教学 浏览:809
python网页分析工具 浏览:689
服务器如何手动关机 浏览:47