导航:首页 > 文件处理 > mapreduce压缩

mapreduce压缩

发布时间:2023-04-12 19:16:05

python的map和rece和Hadoop的MapRece有什么关系

关系就是都是基于Map-Rece的处理思想设计出来的。
从用户角度看功能其实差不多,
Python的Map函数和Hadoop的Map阶段对输入进行逐行处理;
Python的Rece函数和Hadoop的Rece阶段对输入进行累积处理。
但是其实完整的Hadoop MapRece是Map+Shuffle+Sort+Rece过程。
其中Shuffle过程是为了让分布式机群之间将同Key数据进行互相交换,Sort过程是根据Key对所有数据进行排序,从而才能完成类WordCount功能,而这两步在Python里面当然是需要用户自己去编写的。

② 在maprece程序中怎么读取压缩包来处理

maprece支持几种特定的压缩格式,会自行对这些格式的压缩包进行解压缩操作。具体实现在LineRecordReader类中,该类继承自芦亮橘RecordReader,是MapRece中用于陪团读取文件的类,该类在读键竖取文件内容前会根据文件后缀判断是否进行解压缩。

③ 如何分布式运行maprece程序

一、 首先要知道此前提 转载
若在windows的Eclipse工程中直接启动maprec程序,需要先把hadoop集群的配置目录下的xml都拷贝到src目录下,让程序自动读取集群的地址后去进行分布式运行(您也可以自己写java代码去设置job的configuration属性)。
若不拷贝,工程中bin目录没有完整的xml配悔凯置文件,则windows执行的maprece程序全部通过本机的jvm执行,作业名也是带有“local"字眼的作业,如 job_local2062122004_0001。 这不是真正的分布式运行maprece程序。
估计得研拍塌究org.apache.hadoop.conf.Configuration的源码,反正xml配置文件会影响执行maprece使用的文件系统是本机的windows文件系统还是远程的hdfs系统; 还有影响执行maprece的mapper和recer的是本机的jvm还是集群里面机器的jvm
二、 本文的结论

第一点就是: windows上执行maprece,必须打jar包到所有slave节点才能正确分布式运行maprece程序。(我有个需求碧贺唤是要windows上触发一个maprece分布式运行)
第二点就是: linux上,只需拷贝jar文件到集群master上,执行命令hadoop jarPackage.jar MainClassName即可分布式运行maprece程序。
第三点就是: 推荐使用附一,实现了自动打jar包并上传,分布式执行的maprece程序。
附一、 推荐使用此方法:实现了自动打jar包并上传,分布式执行的maprece程序:
请先参考博文五篇:
Hadoop作业提交分析(一)~~(五)
引用博文的附件中EJob.java到你的工程中,然后main中添加如下方法和代码。

public static File createPack() throws IOException {
File jarFile = EJob.createTempJar("bin");
ClassLoader classLoader = EJob.getClassLoader();
Thread.currentThread().setContextClassLoader(classLoader);
return jarFile;
}
在作业启动代码中使用打包:
Job job = Job.getInstance(conf, "testAnaAction");
添加:
String jarPath = createPack().getPath();
job.setJar(jarPath);
即可实现直接run as java application 在windows跑分布式的maprece程序,不用手工上传jar文件。
附二、得出结论的测试过程
(未有空看书,只能通过愚笨的测试方法得出结论了)
一. 直接通过windows上Eclipse右击main程序的java文件,然后"run as application"或选择hadoop插件"run on hadoop"来触发执行MapRece程序的测试。
1,如果不打jar包到进集群任意linux机器上,它报错如下:
[work] 2012-06-25 15:42:47,360 - org.apache.hadoop.maprece.Job -10244 [main] INFO org.apache.hadoop.maprece.Job - map 0% rece 0%
[work] 2012-06-25 15:42:52,223 - org.apache.hadoop.maprece.Job -15107 [main] INFO org.apache.hadoop.maprece.Job - Task Id : attempt_1403517983686_0056_m_000000_0, Status : FAILED
Error: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class bookCount.BookCount$BookCountMapper not found
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:1720)
at org.apache.hadoop.maprece.task.JobContextImpl.getMapperClass(JobContextImpl.java:186)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:721)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:339)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)
Caused by: java.lang.ClassNotFoundException: Class bookCount.BookCount$BookCountMapper not found
at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:1626)
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:1718)
... 8 more
# Error:后重复三次
2012-06-25 15:44:53,234 - org.apache.hadoop.maprece.Job -37813 [main] INFO org.apache.hadoop.maprece.Job - map 100% rece 100%
现象就是:报错,无进度,无运行结果。

2,拷贝jar包到“只是”集群master的$HADOOP_HOME/share/hadoop/maprece/目录上,直接通过windows的eclipse "run as application"和通过hadoop插件"run on hadoop"来触发执行,它报错同上。
现象就是:报错,无进度,无运行结果。
3,拷贝jar包到集群某些slave的$HADOOP_HOME/share/hadoop/maprece/目录上,直接通过windows的eclipse "run as application"和通过hadoop插件"run on hadoop"来触发执行
和报错:
Error: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class bookCount.BookCount$BookCountMapper not found
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:1720)
at org.apache.hadoop.maprece.task.JobContextImpl.getMapperClass(JobContextImpl.java:186)
和报错:
Error: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class bookCount.BookCount$BookCountRecer not found

现象就是:有报错,但仍然有进度,有运行结果。
4,拷贝jar包到集群所有slave的$HADOOP_HOME/share/hadoop/maprece/目录上,直接通过windows的eclipse "run as application"和通过hadoop插件"run on hadoop"来触发执行:
现象就是:无报错,有进度,有运行结果。
第一点结论就是: windows上执行maprece,必须打jar包到所有slave节点才能正确分布式运行maprece程序。
二 在Linux上的通过以下命令触发MapRece程序的测试。
hadoop jar $HADOOP_HOME/share/hadoop/maprece/bookCount.jar bookCount.BookCount

1,只拷贝到master,在master上执行。
现象就是:无报错,有进度,有运行结果。
2,拷贝随便一个slave节点,在slave上执行。
现象就是:无报错,有进度,有运行结果。
但某些节点上运行会报错如下,且运行结果。:
14/06/25 16:44:02 INFO maprece.JobSubmitter: Cleaning up the staging area /tmp/hadoop-yarn/staging/hser/.staging/job_1403517983686_0071
Exception in thread "main" java.lang.NoSuchFieldError: DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH
at org.apache.hadoop.maprece.v2.util.MRApps.setMRFrameworkClasspath(MRApps.java:157)
at org.apache.hadoop.maprece.v2.util.MRApps.setClasspath(MRApps.java:198)
at org.apache.hadoop.mapred.YARNRunner.(YARNRunner.java:443)
at org.apache.hadoop.mapred.YARNRunner.submitJob(YARNRunner.java:283)
at org.apache.hadoop.maprece.JobSubmitter.submitJobInternal(JobSubmitter.java:415)
at org.apache.hadoop.maprece.Job$10.run(Job.java:1268)
at org.apache.hadoop.maprece.Job$10.run(Job.java:1265)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
at org.apache.hadoop.maprece.Job.submit(Job.java:1265)
at org.apache.hadoop.maprece.Job.waitForCompletion(Job.java:1286)
at com.etrans.anaSpeed.AnaActionMr.run(AnaActionMr.java:207)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
at com.etrans.anaSpeed.AnaActionMr.main(AnaActionMr.java:44)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.hadoop.util.RunJar.main(RunJar.java:212)
第二点结论就是: Linux上,只需拷贝jar文件到集群master上,执行命令hadoop jarPackage.jar MainClassName即可分布式运行maprece程序。

④ Hadoop 压缩从理论到实战

在大数据领域,无论上层计算引擎采用的是什么,在存储过程中,压缩都是一个避不开的问题。合适的压缩选择可以降低存储成本、减少网络传输I/O。而错误的压缩选择则可能让 cpu 负荷达到瓶颈、降低并发度等等,所以是否选择压缩、选择什么压缩格式在大数据存储环节中都是一个至关重要的问题。

点评:压缩时间和压缩率之间的取舍本质上是 cpu 资源和存储资源的取舍。是否需册猛要支持分片也不是绝对的,如果单个文件大小均小于 splitSize,则没必要支持分片。

点评:一阶段考虑尽可能支持分片(单个文件大于 splitSize 时)。二阶段考虑尽可能快的压悄姿基缩速度。三阶段根据是作为长期归档(几乎不用)or 作为下一作业输入,考虑尽可能高的压缩能力 or 支持分片。

点评:有两点需要注意,第一点:这里的速度和压缩率没有具体测试数据,而是给出了一个模糊的表达。因为即使具体测试了速度和压缩率,也会因数据不同而结果有很大的差异。后面会给出测试的脚本,大家可以结合自己的表数据自行测试。第二点:有些压缩格式性能参数很相似,为什么 Hadoop 中要搞这么多种?较为直观的一个原因是:不同存储格式支持的压缩是不一样的,比如 orc 存储格式只支持 zlib 和 snappy 两种压缩 [8] ,parquet 虽然支持很多压缩格式,但是不支持 bzip2 [7]

以下摘自《Hadoop The Definitive Guide》

重点阅读文中加粗片段。大致意思是:因为 gzip 压缩格式使用的 DEFLATE 压缩算法没办法做到随机任意读取,必须同步顺序读取。也就意味着没办法为每一个 block 创建一个分片(split),然后为该分片启一个 mapper 去读取数据。所以即使 gzip 文件有很多 block,MR 程序也只会启动一个 Mapper 去读取所有的 block。也即 gzip 这种压缩格式不支持分片。相反的,如果压缩格式使用的算法支持随机任意读取,那么就可以为每一个 block 创建一个分片,同时启动一个 mapper 去读取数据,这样有多少个 block 就有多少个分片,就有多少个 mapper ,这些 mapper 并行读取数据,效率大大提升。上述涉及到几个小概念,接下来分别进行详述。

一句话总结: zlib、gzip 在大数据语境中都是一种 压缩格式 ,他们使用相同的 压缩算法: DEFLATE,DefaultCodec 是 zlib 使用的 编解码器 ,Gzip 使用的编解码器是 GzipCodec

我们知道,Hadoop 在任务切分时,是按照文件的粒度进行的。即一个文件一个文件启谨进行切分。而每一个文件切分成几块,取决于 splitSize 的大小。比如两个文件,第一个文件 300M,第二个文件150M。分片大小是128M,那么对于第一个文件将会切分成3片(128M,128M,44M),第二个文件会切分成2片(128M,22M)。共计5片。所以分片数量除了由文件数决定,另一个决定因素就是 splitSize 即分片大小。

splitSize 如何计算?

几个前提:

影响参数:

接下来进行实际验证:

经过了 2.4.2 中的一系列实验,验证了一个结论:当一个输入格式支持分片时,mapper 数量是无限制的,反之 mapper 数量小于等于文件的数量。所以我们可以通过设置参数来试图调小分片大小来增加 mapper 数量看其上限是否等于文件数量即可。假如输入的文件个数只有一个,那么当 mapper 数量大于1的时候,说明该输入格式是支持分片的。

大家可以根据自己数据集和想测试的压缩和存储格式自行修改脚本。通过以上脚本跑出来的结果如下:

由 2.1 中评价压缩的三项指标可知,压缩率、压缩/解压速度、是否支持分片是衡量压缩最重要的三项指标。3.1.1小节中只对压缩率进行了测试。压缩/解压速度可以通过跑一些查询语句进一步测试。这里就不展开测试了。业界中常用的存储格式一般是 parquet, orc,所以上面测试除了纯文本只测试了这两种存储格式。

我们可以通过 hive> set io.compression.codecs; 来查看当前Hadoop集群支持的压缩,在公司的集群中查询得到的结果是:

可以看到 lzo 有两种编解码器: LzoCodec 和 LzopCodec。他们之间有什么区别呢?

如果你阅读过关于 Hadoop 压缩的文章,应该可以看到,绝大多数文章中对于 snappy 是否支持分片都是直接给出的否定的答案。 CDH 的文档中也指出来 snappy 是不支持分片的。

看文中加粗片段,虽然 snappy 本身是不支持分片的,但是如果 snappy 存储在一些特定的存储格式比如 SequenceFile 或者 Avro 中,那么是可以支持分片的。也就是说 snappy 是否支持分片是分情况讨论的。不能说使用了 snappy 压缩就一定不支持分片。前面提到了,业界中常用的存储格式一般是 parquet 或者 orc,而上面 CDH 的文章中恰恰没有提到 parquet 和 orc 是否支持,接下来以 parquet 为例,进行测试。测试内容即为 parquet + snappy 组合,是否支持分片。
首先准备数据,因为之前做压缩率测试,已经有了 parquet + snappy 文件了,这里直接拿来用。

一共3个输入文件,启了6个mapper,说明输入文件是可以分片的。即 parquet + snappy 的组合是支持分片的。在《Hadoop The Definitive Guide》中也对 parquet 是否支持分片有说明:

以 maprece.output.fileoutputformat.compress.codec 为例,这个参数可以在三个地方配置:

那么当三者都设置时,以哪个为准呢?按照经验来看,一定是粒度小的优先级大于粒度大的优先级。经过测试也验证了这种猜测。即:表级别 > hive > hadoop

初学者往往容易混淆存储格式和压缩格式之间的关系,其实二者是完全独立的。如果完整的阅读了该篇文章,应该已经消除了这一块理解对误区。这里总结一下:比如 parquet, orc,他们都是常见的 存储格式 。是否使用压缩,使用何种压缩都是可以设置的。而 zlib、gzip、lzo、lz4、snappy 等等这些都是常见的 压缩格式 ,他们既可以依附于某些 存储格式 ,比如之前提到的 parquet + snappy,orc + zlib 等等。也可以脱离特定的 存储格式 ,比如纯文本文件进行压缩,text + parquet, text + bzip2 等等。

⑤ 如何提升Hadoop MapRece性能

你这个问慎亏培题,问的太大了。目前可能有很多人都在熟悉使用hadoop,当然就会有很多人研究它了。默认的集群环境并不是最优的,所以为了提升集群的性能,人们就开始研究hadoop的优化了。现在,通常从以下几个方面对空数hadoop进行优化:
1、数据放置和数据副本数量的选择。集群默认情况一般有三个副本,并且集群默认每个节点的计算能力是一样的,在分配数据块的时候,均匀分布在每个节点上。实际环境中,更多的是每个节点得各方面能力是不同的,比如计算能力,那么原有的数据分配方式就那么合适了,需要根据节点的能力,合理的放置数据块,从而提升性能。数据副本的数量也会影响hadoop的性能,这里边涉及到数据迁移的问题。
2、参数配置方面。例如数据块的大小当前大多数是128或者64M,相对来说是比较合理的。那么这个值真的适合你的集群环境吗?是否有方法可以计算出集群的最优状态下的数据块大小,这个需要研究。当然还有很多参数,需要优化,比如容器大小,内存分配,map和rece数量等。
3、作业调度问题。给你一堆的job,如何合理的调度使得执行最快。这也是优化的方向,虽然集群有默认的三种调度策略,但并不一宽唯定是最好的。
当然还有很多优化,这里就不一一列举了,如果你感兴趣,可以去网上搜索更多的参考资料!

⑥ 简述Hadoop的MapRece与Googl的MapRecc 之间的关系

江湖传说永流传:谷歌技术有"三宝",GFS、MapRece和大表(BigTable)!

谷歌在03到06年间连续发表了三篇很有影响力的文章,分别是03年SOSP的GFS,04年OSDI的MapRece,和06年OSDI的BigTable。SOSP和OSDI都是操作系统领域的顶级会议,在计算机学会推荐会议里属于A类。SOSP在单数年举办,而OSDI在双数年举办。

那么这篇博客就来介绍一下MapRece。

1. MapRece是干啥的

因为没找到谷歌的示意图,所以我想借用一张Hadoop项目的结构图来说明下MapRece所处的位置,如下图。


这幅图描述了MapRece如何处理词频统计。由于map worker数量不够,首先处理了分片1、3、4,并产生中间键值对;当所有中间值都准备好了,Rece作业就开始读取对应分区,并输出统计结果。

6. 用户的权利

用户最主要的任务是实现map和rece接口,但还有一些有用的接口是向用户开放的。

  • an input reader。这个函数会将输入分为M个部分,并且定义了如何从数据中抽取最初的键值对,比如词频的例子中定义文件名和文件内容是键值对。

  • a partition function。这个函数用于将map函数产生的中间键值对映射到一个分区里去,最简单的实现就是将键求哈希再对R取模。

  • a compare function。这个函数用于Rece作业排序,这个函数定义了键的大小关系。

  • an output writer。负责将结果写入底层分布式文件系统。

  • a combiner function。实际就是rece函数,这是用于前面提到的优化的,比如统计词频时,如果每个<w, "1">要读一次,因为rece和map通常不在一台机器,非常浪费时间,所以可以在map执行的地方先运行一次combiner,这样rece只需要读一次<w, "n">了。

  • map和rece函数就不多说了。

  • 7. MapRece的实现

    目前MapRece已经有多种实现,除了谷歌自己的实现外,还有着名的hadoop,区别是谷歌是c++,而hadoop是用java。另外斯坦福大学实现了一个在多核/多处理器、共享内存环境内运行的MapRece,称为Phoenix(介绍),相关的论文发表在07年的HPCA,是当年的最佳论文哦!

    ⑦ 大数据中的压缩

    一、压缩分类
    1、Lossless conpression(无损压缩)
    压缩和解压缩过程中没有任何数据的丢失

    2、Lossy conpression(有损压缩)
    JPEG,MP3,MPEG 压缩和解压缩过程中有任何数据的丢失

    二、扮扰压缩场景
    输入

    中间

    输出

    三、压缩注意事项
    CPU是否够用,压缩加压缩都耗费资源

    四、压缩格式

    五、压缩比

    原始文件大小1.4G
    Snappy的压缩比:50%
    LZ4的压缩比:49%
    LZO的压缩比:48%
    GZIP的压缩比:32%
    BZIP2的压缩比:28%

    六、总结
    1、压缩陵陵比越高压缩时间就越长,不同压缩场景需要选用不同的压缩。
    2、选用压缩就是空间与时间的选择
    3、如果是老数据/冷数厅汪旦据就采用BZIP2压缩,如果老数据偶尔还会用的到还需考虑是否分片
    4、

    七、Hadoop配置Maprece的压缩
    core-site.xml

    mapred-site.xml

    上面配置的是Bzip2的压缩,进入

    j进行wordcount的测试

    查看结果文件是带有bz2后缀的

    八、Hive的压缩测试
    1、创建表

    2、加载数据

    查看原表的大小

    3、

    新建表

    查看新表的大小

    4、

    新建表

    查看大小

    5、

    新建表

    查看大小

    以上为临时生效压缩,真正使用需要将具体使用哪种压缩配置到Hive配置文件

    ⑧ Hadoop集群执行MapRece时报错:

    原来maprece.jobhistory.address 和maprece.jobhistory.webapp.addres 这两个address的地址使用的是CDH默认的配置值,这里需要改成hostname,这样可能就是原来的位置不对造成的

    [html]view plain

    ⑨ hadoop的maprece常见算法案例有几种

    基本MapRece模式

    计数与求和
    问题陈述:
    有许多文档,每个文档都有一些字段组成。需要计算出每个字段在所有文档中的出现次数或者这些字段的其他什么统计值。例如,给定一个log文件,其中的每条记录都包含一个响应时间,需要计算出平均响应时间。
    解决方案:
    让我们先从简单的例子入手。在下面的代码片段里,Mapper每遇到指定词就把频次记1,Recer一个个遍历这些词的集合然后把他们的频次加和。

    1 class Mapper
    2 method Map(docid id, doc d)
    3 for all term t in doc d do
    4 Emit(term t, count 1)
    5
    6 class Recer
    7 method Rece(term t, counts [c1, c2,...])
    8 sum = 0
    9 for all count c in [c1, c2,...] do
    10 sum = sum + c
    11 Emit(term t, count sum)

    这种方法的缺点显而易见,Mapper提交了太多无意义的计数。它完全可以通过先对每个文档中的词进行计数从而减少传递给Recer的数据量:

    1 class Mapper
    2 method Map(docid id, doc d)
    3 H = new AssociativeArray
    4 for all term t in doc d do
    5 H{t} = H{t} + 1
    6 for all term t in H do
    7 Emit(term t, count H{t})

    如果要累计计数的的不只是单个文档中的内容,还包括了一个Mapper节点处理的所有文档,那就要用到Combiner了:

    1 class Mapper
    2 method Map(docid id, doc d)
    3 for all term t in doc d do
    4 Emit(term t, count 1)
    5
    6 class Combiner
    7 method Combine(term t, [c1, c2,...])
    8 sum = 0
    9 for all count c in [c1, c2,...] do
    10 sum = sum + c
    11 Emit(term t, count sum)
    12
    13 class Recer
    14 method Rece(term t, counts [c1, c2,...])
    15 sum = 0
    16 for all count c in [c1, c2,...] do
    17 sum = sum + c
    18 Emit(term t, count sum)

    应用:Log 分析, 数据查询

    整理归类

    问题陈述:
    有一系列条目,每个条目都有几个属性,要把具有同一属性值的条目都保存在一个文件里,或者把条目按照属性值分组。 最典型的应用是倒排索引。
    解决方案:
    解决方案很简单。 在 Mapper 中以每个条目的所需属性值作为 key,其本身作为值传递给 Recer。 Recer 取得按照属性值分组的条目,然后可以处理或者保存。如果是在构建倒排索引,那么 每个条目相当于一个词而属性值就是词所在的文档ID。
    应用:倒排索引, ETL
    过滤 (文本查找),解析和校验
    问题陈述:
    假设有很多条记录,需要从其中找出满足某个条件的所有记录,或者将每条记录传换成另外一种形式(转换操作相对于各条记录独立,即对一条记录的操作与其他记录无关)。像文本解析、特定值抽取、格式转换等都属于后一种用例。
    解决方案:
    非常简单,在Mapper 里逐条进行操作,输出需要的值或转换后的形式。
    应用:日志分析,数据查询,ETL,数据校验

    分布式任务执行

    问题陈述:
    大型计算可以分解为多个部分分别进行然后合并各个计算的结果以获得最终结果。
    解决方案: 将数据切分成多份作为每个 Mapper 的输入,每个Mapper处理一份数据,执行同样的运算,产生结果,Recer把多个Mapper的结果组合成一个。
    案例研究: 数字通信系统模拟
    像 WiMAX 这样的数字通信模拟软件通过系统模型来传输大量的随机数据,然后计算传输中的错误几率。 每个 Mapper 处理样本 1/N 的数据,计算出这部分数据的错误率,然后在 Recer 里计算平均错误率。
    应用:工程模拟,数字分析,性能测试
    排序
    问题陈述:
    有许多条记录,需要按照某种规则将所有记录排序或是按照顺序来处理记录。
    解决方案: 简单排序很好办 – Mappers 将待排序的属性值为键,整条记录为值输出。 不过实际应用中的排序要更加巧妙一点, 这就是它之所以被称为MapRece 核心的原因(“核心”是说排序?因为证明Hadoop计算能力的实验是大数据排序?还是说Hadoop的处理过程中对key排序的环节?)。在实践中,常用组合键来实现二次排序和分组。
    MapRece 最初只能够对键排序, 但是也有技术利用可以利用Hadoop 的特性来实现按值排序。想了解的话可以看这篇博客。
    按照BigTable的概念,使用 MapRece来对最初数据而非中间数据排序,也即保持数据的有序状态更有好处,必须注意这一点。换句话说,在数据插入时排序一次要比在每次查询数据的时候排序更高效。
    应用:ETL,数据分析

    非基本 MapRece 模式

    迭代消息传递 (图处理)

    问题陈述:
    假设一个实体网络,实体之间存在着关系。 需要按照与它比邻的其他实体的属性计算出一个状态。这个状态可以表现为它和其它节点之间的距离, 存在特定属性的邻接点的迹象, 邻域密度特征等等。
    解决方案:
    网络存储为系列节点的结合,每个节点包含有其所有邻接点ID的列表。按照这个概念,MapRece 迭代进行,每次迭代中每个节点都发消息给它的邻接点。邻接点根据接收到的信息更新自己的状态。当满足了某些条件的时候迭代停止,如达到了最大迭代次数(网络半径)或两次连续的迭代几乎没有状态改变。从技术上来看,Mapper 以每个邻接点的ID为键发出信息,所有的信息都会按照接受节点分组,recer 就能够重算各节点的状态然后更新那些状态改变了的节点。下面展示了这个算法:

    1 class Mapper
    2 method Map(id n, object N)
    3 Emit(id n, object N)
    4 for all id m in N.OutgoingRelations do
    5 Emit(id m, message getMessage(N))
    6
    7 class Recer
    8 method Rece(id m, [s1, s2,...])
    9 M = null
    10 messages = []
    11 for all s in [s1, s2,...] do
    12 if IsObject(s) then
    13 M = s
    14 else // s is a message
    15 messages.add(s)
    16 M.State = calculateState(messages)
    17 Emit(id m, item M)

    一个节点的状态可以迅速的沿着网络传全网,那些被感染了的节点又去感染它们的邻居,整个过程就像下面的图示一样:

    案例研究: 沿分类树的有效性传递
    问题陈述:
    这个问题来自于真实的电子商务应用。将各种货物分类,这些类别可以组成一个树形结构,比较大的分类(像男人、女人、儿童)可以再分出小分类(像男裤或女装),直到不能再分为止(像男式蓝色牛仔裤)。这些不能再分的基层类别可以是有效(这个类别包含有货品)或者已无效的(没有属于这个分类的货品)。如果一个分类至少含有一个有效的子分类那么认为这个分类也是有效的。我们需要在已知一些基层分类有效的情况下找出分类树上所有有效的分类。
    解决方案:
    这个问题可以用上一节提到的框架来解决。我们咋下面定义了名为 getMessage和 calculateState 的方法:

    1 class N
    2 State in {True = 2, False = 1, null = 0},
    3 initialized 1 or 2 for end-of-line categories, 0 otherwise
    4 method getMessage(object N)
    5 return N.State
    6 method calculateState(state s, data [d1, d2,...])
    7 return max( [d1, d2,...] )

    案例研究:广度优先搜索
    问题陈述:需要计算出一个图结构中某一个节点到其它所有节点的距离。
    解决方案: Source源节点给所有邻接点发出值为0的信号,邻接点把收到的信号再转发给自己的邻接点,每转发一次就对信号值加1:

    1 class N
    2 State is distance,
    3 initialized 0 for source node, INFINITY for all other nodes
    4 method getMessage(N)
    5 return N.State + 1
    6 method calculateState(state s, data [d1, d2,...])
    7 min( [d1, d2,...] )

    案例研究:网页排名和 Mapper 端数据聚合
    这个算法由Google提出,使用权威的PageRank算法,通过连接到一个网页的其他网页来计算网页的相关性。真实算法是相当复杂的,但是核心思想是权重可以传播,也即通过一个节点的各联接节点的权重的均值来计算节点自身的权重。

    1 class N
    2 State is PageRank
    3 method getMessage(object N)
    4 return N.State / N.OutgoingRelations.size()
    5 method calculateState(state s, data [d1, d2,...])
    6 return ( sum([d1, d2,...]) )

    要指出的是上面用一个数值来作为评分实际上是一种简化,在实际情况下,我们需要在Mapper端来进行聚合计算得出这个值。下面的代码片段展示了这个改变后的逻辑 (针对于 PageRank 算法):

    1 class Mapper
    2 method Initialize
    3 H = new AssociativeArray
    4 method Map(id n, object N)
    5 p = N.PageRank / N.OutgoingRelations.size()
    6 Emit(id n, object N)
    7 for all id m in N.OutgoingRelations do
    8 H{m} = H{m} + p
    9 method Close
    10 for all id n in H do
    11 Emit(id n, value H{n})
    12
    13 class Recer
    14 method Rece(id m, [s1, s2,...])
    15 M = null
    16 p = 0
    17 for all s in [s1, s2,...] do
    18 if IsObject(s) then
    19 M = s
    20 else
    21 p = p + s
    22 M.PageRank = p
    23 Emit(id m, item M)

    应用:图分析,网页索引

    值去重 (对唯一项计数)
    问题陈述: 记录包含值域F和值域 G,要分别统计相同G值的记录中不同的F值的数目 (相当于按照 G分组).
    这个问题可以推而广之应用于分面搜索(某些电子商务网站称之为Narrow Search)
    Record 1: F=1, G={a, b}
    Record 2: F=2, G={a, d, e}
    Record 3: F=1, G={b}
    Record 4: F=3, G={a, b}

    Result:
    a -> 3 // F=1, F=2, F=3
    b -> 2 // F=1, F=3
    d -> 1 // F=2
    e -> 1 // F=2

    解决方案 I:
    第一种方法是分两个阶段来解决这个问题。第一阶段在Mapper中使用F和G组成一个复合值对,然后在Recer中输出每个值对,目的是为了保证F值的唯一性。在第二阶段,再将值对按照G值来分组计算每组中的条目数。
    第一阶段:

    1 class Mapper
    2 method Map(null, record [value f, categories [g1, g2,...]])
    3 for all category g in [g1, g2,...]
    4 Emit(record [g, f], count 1)
    5
    6 class Recer
    7 method Rece(record [g, f], counts [n1, n2, ...])
    8 Emit(record [g, f], null )

    第二阶段:

    1 class Mapper
    2 method Map(record [f, g], null)
    3 Emit(value g, count 1)
    4
    5 class Recer
    6 method Rece(value g, counts [n1, n2,...])
    7 Emit(value g, sum( [n1, n2,...] ) )

    解决方案 II:
    第二种方法只需要一次MapRece 即可实现,但扩展性不强。算法很简单-Mapper 输出值和分类,在Recer里为每个值对应的分类去重然后给每个所属的分类计数加1,最后再在Recer结束后将所有计数加和。这种方法适用于只有有限个分类,而且拥有相同F值的记录不是很多的情况。例如网络日志处理和用户分类,用户的总数很多,但是每个用户的事件是有限的,以此分类得到的类别也是有限的。值得一提的是在这种模式下可以在数据传输到Recer之前使用Combiner来去除分类的重复值。

    1 class Mapper
    2 method Map(null, record [value f, categories [g1, g2,...] )
    3 for all category g in [g1, g2,...]
    4 Emit(value f, category g)
    5
    6 class Recer
    7 method Initialize
    8 H = new AssociativeArray : category -> count
    9 method Rece(value f, categories [g1, g2,...])
    10 [g1', g2',..] = ExcludeDuplicates( [g1, g2,..] )
    11 for all category g in [g1', g2',...]
    12 H{g} = H{g} + 1
    13 method Close
    14 for all category g in H do
    15 Emit(category g, count H{g})

    应用:日志分析,用户计数
    互相关
    问题陈述:有多个各由若干项构成的组,计算项两两共同出现于一个组中的次数。假如项数是N,那么应该计算N*N。
    这种情况常见于文本分析(条目是单词而元组是句子),市场分析(购买了此物的客户还可能购买什么)。如果N*N小到可以容纳于一台机器的内存,实现起来就比较简单了。
    配对法
    第一种方法是在Mapper中给所有条目配对,然后在Recer中将同一条目对的计数加和。但这种做法也有缺点:
    使用 combiners 带来的的好处有限,因为很可能所有项对都是唯一的
    不能有效利用内存

    1 class Mapper
    2 method Map(null, items [i1, i2,...] )
    3 for all item i in [i1, i2,...]
    4 for all item j in [i1, i2,...]
    5 Emit(pair [i j], count 1)
    6
    7 class Recer
    8 method Rece(pair [i j], counts [c1, c2,...])
    9 s = sum([c1, c2,...])
    10 Emit(pair[i j], count s)

    Stripes Approach(条方法?不知道这个名字怎么理解)
    第二种方法是将数据按照pair中的第一项来分组,并维护一个关联数组,数组中存储的是所有关联项的计数。The second approach is to group data by the first item in pair and maintain an associative array (“stripe”) where counters for all adjacent items are accumulated. Recer receives all stripes for leading item i, merges them, and emits the same result as in the Pairs approach.
    中间结果的键数量相对较少,因此减少了排序消耗。
    可以有效利用 combiners。
    可在内存中执行,不过如果没有正确执行的话也会带来问题。
    实现起来比较复杂。
    一般来说, “stripes” 比 “pairs” 更快

    1 class Mapper
    2 method Map(null, items [i1, i2,...] )
    3 for all item i in [i1, i2,...]
    4 H = new AssociativeArray : item -> counter
    5 for all item j in [i1, i2,...]
    6 H{j} = H{j} + 1
    7 Emit(item i, stripe H)
    8
    9 class Recer
    10 method Rece(item i, stripes [H1, H2,...])
    11 H = new AssociativeArray : item -> counter
    12 H = merge-sum( [H1, H2,...] )
    13 for all item j in H.keys()
    14 Emit(pair [i j], H{j})

    应用:文本分析,市场分析
    参考资料:Lin J. Dyer C. Hirst G. Data Intensive Processing MapRece
    用MapRece 表达关系模式
    在这部分我们会讨论一下怎么使用MapRece来进行主要的关系操作。
    筛选(Selection)

    1 class Mapper
    2 method Map(rowkey key, tuple t)
    3 if t satisfies the predicate
    4 Emit(tuple t, null)

    投影(Projection)
    投影只比筛选稍微复杂一点,在这种情况下我们可以用Recer来消除可能的重复值。

    1 class Mapper
    2 method Map(rowkey key, tuple t)
    3 tuple g = project(t) // extract required fields to tuple g
    4 Emit(tuple g, null)
    5
    6 class Recer

    ⑩ 请简要描述Hadoop计算框架MapRece的工作原理

    分为2个步骤,map和rece,map专门负责对每个数据独立地同时地打标签,框架会对相同标签的数据分成一组,rece对分好的那些组数据做累计计算。我们只要分别实现map和rece就可以了

    阅读全文

    与mapreduce压缩相关的资料

    热点内容
    pr怎么压缩文件大小 浏览:859
    查看oracle字符集命令 浏览:175
    锂电池增加密度 浏览:657
    linux用户密码忘记 浏览:240
    gb压缩天然气 浏览:633
    图片拼接不压缩app 浏览:668
    我的世界如何编程 浏览:84
    vue反编译代码有问题 浏览:948
    linuxshell字符串连接字符串 浏览:51
    androidviewpager刷新 浏览:438
    python编程计算平均分 浏览:678
    加密数字货币市值查询 浏览:692
    时尚商圈app怎么样 浏览:584
    stacklesspython教程 浏览:138
    用命令行禁用135端口 浏览:212
    linux防火墙编程 浏览:627
    pdf阅读器删除 浏览:979
    考研人如何缓解压力 浏览:822
    买电暖壶哪个app便宜 浏览:505
    洛克王国忘记服务器了怎么办 浏览:782