① python的map和rece和Hadoop的MapRece有什麼關系
關系就是都是基於Map-Rece的處理思想設計出來的。
從用戶角度看功能其實差不多,
Python的Map函數和Hadoop的Map階段對輸入進行逐行處理;
Python的Rece函數和Hadoop的Rece階段對輸入進行累積處理。
但是其實完整的Hadoop MapRece是Map+Shuffle+Sort+Rece過程。
其中Shuffle過程是為了讓分布式機群之間將同Key數據進行互相交換,Sort過程是根據Key對所有數據進行排序,從而才能完成類WordCount功能,而這兩步在Python裡面當然是需要用戶自己去編寫的。
② 在maprece程序中怎麼讀取壓縮包來處理
maprece支持幾種特定的壓縮格式,會自行對這些格式的壓縮包進行解壓縮操作。具體實現在LineRecordReader類中,該類繼承自蘆亮橘RecordReader,是MapRece中用於陪團讀取文件的類,該類在讀鍵豎取文件內容前會根據文件後綴判斷是否進行解壓縮。
③ 如何分布式運行maprece程序
一、 首先要知道此前提 轉載
若在windows的Eclipse工程中直接啟動maprec程序,需要先把hadoop集群的配置目錄下的xml都拷貝到src目錄下,讓程序自動讀取集群的地址後去進行分布式運行(您也可以自己寫java代碼去設置job的configuration屬性)。
若不拷貝,工程中bin目錄沒有完整的xml配悔凱置文件,則windows執行的maprece程序全部通過本機的jvm執行,作業名也是帶有「local"字眼的作業,如 job_local2062122004_0001。 這不是真正的分布式運行maprece程序。
估計得研拍塌究org.apache.hadoop.conf.Configuration的源碼,反正xml配置文件會影響執行maprece使用的文件系統是本機的windows文件系統還是遠程的hdfs系統; 還有影響執行maprece的mapper和recer的是本機的jvm還是集群裡面機器的jvm
二、 本文的結論
第一點就是: windows上執行maprece,必須打jar包到所有slave節點才能正確分布式運行maprece程序。(我有個需求碧賀喚是要windows上觸發一個maprece分布式運行)
第二點就是: linux上,只需拷貝jar文件到集群master上,執行命令hadoop jarPackage.jar MainClassName即可分布式運行maprece程序。
第三點就是: 推薦使用附一,實現了自動打jar包並上傳,分布式執行的maprece程序。
附一、 推薦使用此方法:實現了自動打jar包並上傳,分布式執行的maprece程序:
請先參考博文五篇:
Hadoop作業提交分析(一)~~(五)
引用博文的附件中EJob.java到你的工程中,然後main中添加如下方法和代碼。
public static File createPack() throws IOException {
File jarFile = EJob.createTempJar("bin");
ClassLoader classLoader = EJob.getClassLoader();
Thread.currentThread().setContextClassLoader(classLoader);
return jarFile;
}
在作業啟動代碼中使用打包:
Job job = Job.getInstance(conf, "testAnaAction");
添加:
String jarPath = createPack().getPath();
job.setJar(jarPath);
即可實現直接run as java application 在windows跑分布式的maprece程序,不用手工上傳jar文件。
附二、得出結論的測試過程
(未有空看書,只能通過愚笨的測試方法得出結論了)
一. 直接通過windows上Eclipse右擊main程序的java文件,然後"run as application"或選擇hadoop插件"run on hadoop"來觸發執行MapRece程序的測試。
1,如果不打jar包到進集群任意linux機器上,它報錯如下:
[work] 2012-06-25 15:42:47,360 - org.apache.hadoop.maprece.Job -10244 [main] INFO org.apache.hadoop.maprece.Job - map 0% rece 0%
[work] 2012-06-25 15:42:52,223 - org.apache.hadoop.maprece.Job -15107 [main] INFO org.apache.hadoop.maprece.Job - Task Id : attempt_1403517983686_0056_m_000000_0, Status : FAILED
Error: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class bookCount.BookCount$BookCountMapper not found
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:1720)
at org.apache.hadoop.maprece.task.JobContextImpl.getMapperClass(JobContextImpl.java:186)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:721)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:339)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)
Caused by: java.lang.ClassNotFoundException: Class bookCount.BookCount$BookCountMapper not found
at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:1626)
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:1718)
... 8 more
# Error:後重復三次
2012-06-25 15:44:53,234 - org.apache.hadoop.maprece.Job -37813 [main] INFO org.apache.hadoop.maprece.Job - map 100% rece 100%
現象就是:報錯,無進度,無運行結果。
2,拷貝jar包到「只是」集群master的$HADOOP_HOME/share/hadoop/maprece/目錄上,直接通過windows的eclipse "run as application"和通過hadoop插件"run on hadoop"來觸發執行,它報錯同上。
現象就是:報錯,無進度,無運行結果。
3,拷貝jar包到集群某些slave的$HADOOP_HOME/share/hadoop/maprece/目錄上,直接通過windows的eclipse "run as application"和通過hadoop插件"run on hadoop"來觸發執行
和報錯:
Error: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class bookCount.BookCount$BookCountMapper not found
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:1720)
at org.apache.hadoop.maprece.task.JobContextImpl.getMapperClass(JobContextImpl.java:186)
和報錯:
Error: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class bookCount.BookCount$BookCountRecer not found
現象就是:有報錯,但仍然有進度,有運行結果。
4,拷貝jar包到集群所有slave的$HADOOP_HOME/share/hadoop/maprece/目錄上,直接通過windows的eclipse "run as application"和通過hadoop插件"run on hadoop"來觸發執行:
現象就是:無報錯,有進度,有運行結果。
第一點結論就是: windows上執行maprece,必須打jar包到所有slave節點才能正確分布式運行maprece程序。
二 在Linux上的通過以下命令觸發MapRece程序的測試。
hadoop jar $HADOOP_HOME/share/hadoop/maprece/bookCount.jar bookCount.BookCount
1,只拷貝到master,在master上執行。
現象就是:無報錯,有進度,有運行結果。
2,拷貝隨便一個slave節點,在slave上執行。
現象就是:無報錯,有進度,有運行結果。
但某些節點上運行會報錯如下,且運行結果。:
14/06/25 16:44:02 INFO maprece.JobSubmitter: Cleaning up the staging area /tmp/hadoop-yarn/staging/hser/.staging/job_1403517983686_0071
Exception in thread "main" java.lang.NoSuchFieldError: DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH
at org.apache.hadoop.maprece.v2.util.MRApps.setMRFrameworkClasspath(MRApps.java:157)
at org.apache.hadoop.maprece.v2.util.MRApps.setClasspath(MRApps.java:198)
at org.apache.hadoop.mapred.YARNRunner.(YARNRunner.java:443)
at org.apache.hadoop.mapred.YARNRunner.submitJob(YARNRunner.java:283)
at org.apache.hadoop.maprece.JobSubmitter.submitJobInternal(JobSubmitter.java:415)
at org.apache.hadoop.maprece.Job$10.run(Job.java:1268)
at org.apache.hadoop.maprece.Job$10.run(Job.java:1265)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
at org.apache.hadoop.maprece.Job.submit(Job.java:1265)
at org.apache.hadoop.maprece.Job.waitForCompletion(Job.java:1286)
at com.etrans.anaSpeed.AnaActionMr.run(AnaActionMr.java:207)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
at com.etrans.anaSpeed.AnaActionMr.main(AnaActionMr.java:44)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.hadoop.util.RunJar.main(RunJar.java:212)
第二點結論就是: Linux上,只需拷貝jar文件到集群master上,執行命令hadoop jarPackage.jar MainClassName即可分布式運行maprece程序。
④ Hadoop 壓縮從理論到實戰
在大數據領域,無論上層計算引擎採用的是什麼,在存儲過程中,壓縮都是一個避不開的問題。合適的壓縮選擇可以降低存儲成本、減少網路傳輸I/O。而錯誤的壓縮選擇則可能讓 cpu 負荷達到瓶頸、降低並發度等等,所以是否選擇壓縮、選擇什麼壓縮格式在大數據存儲環節中都是一個至關重要的問題。
點評:壓縮時間和壓縮率之間的取捨本質上是 cpu 資源和存儲資源的取捨。是否需冊猛要支持分片也不是絕對的,如果單個文件大小均小於 splitSize,則沒必要支持分片。
點評:一階段考慮盡可能支持分片(單個文件大於 splitSize 時)。二階段考慮盡可能快的壓悄姿基縮速度。三階段根據是作為長期歸檔(幾乎不用)or 作為下一作業輸入,考慮盡可能高的壓縮能力 or 支持分片。
點評:有兩點需要注意,第一點:這里的速度和壓縮率沒有具體測試數據,而是給出了一個模糊的表達。因為即使具體測試了速度和壓縮率,也會因數據不同而結果有很大的差異。後面會給出測試的腳本,大家可以結合自己的表數據自行測試。第二點:有些壓縮格式性能參數很相似,為什麼 Hadoop 中要搞這么多種?較為直觀的一個原因是:不同存儲格式支持的壓縮是不一樣的,比如 orc 存儲格式只支持 zlib 和 snappy 兩種壓縮 [8] ,parquet 雖然支持很多壓縮格式,但是不支持 bzip2 [7]
以下摘自《Hadoop The Definitive Guide》
重點閱讀文中加粗片段。大致意思是:因為 gzip 壓縮格式使用的 DEFLATE 壓縮演算法沒辦法做到隨機任意讀取,必須同步順序讀取。也就意味著沒辦法為每一個 block 創建一個分片(split),然後為該分片啟一個 mapper 去讀取數據。所以即使 gzip 文件有很多 block,MR 程序也只會啟動一個 Mapper 去讀取所有的 block。也即 gzip 這種壓縮格式不支持分片。相反的,如果壓縮格式使用的演算法支持隨機任意讀取,那麼就可以為每一個 block 創建一個分片,同時啟動一個 mapper 去讀取數據,這樣有多少個 block 就有多少個分片,就有多少個 mapper ,這些 mapper 並行讀取數據,效率大大提升。上述涉及到幾個小概念,接下來分別進行詳述。
一句話總結: zlib、gzip 在大數據語境中都是一種 壓縮格式 ,他們使用相同的 壓縮演算法: DEFLATE,DefaultCodec 是 zlib 使用的 編解碼器 ,Gzip 使用的編解碼器是 GzipCodec
我們知道,Hadoop 在任務切分時,是按照文件的粒度進行的。即一個文件一個文件啟謹進行切分。而每一個文件切分成幾塊,取決於 splitSize 的大小。比如兩個文件,第一個文件 300M,第二個文件150M。分片大小是128M,那麼對於第一個文件將會切分成3片(128M,128M,44M),第二個文件會切分成2片(128M,22M)。共計5片。所以分片數量除了由文件數決定,另一個決定因素就是 splitSize 即分片大小。
splitSize 如何計算?
幾個前提:
影響參數:
接下來進行實際驗證:
經過了 2.4.2 中的一系列實驗,驗證了一個結論:當一個輸入格式支持分片時,mapper 數量是無限制的,反之 mapper 數量小於等於文件的數量。所以我們可以通過設置參數來試圖調小分片大小來增加 mapper 數量看其上限是否等於文件數量即可。假如輸入的文件個數只有一個,那麼當 mapper 數量大於1的時候,說明該輸入格式是支持分片的。
大家可以根據自己數據集和想測試的壓縮和存儲格式自行修改腳本。通過以上腳本跑出來的結果如下:
由 2.1 中評價壓縮的三項指標可知,壓縮率、壓縮/解壓速度、是否支持分片是衡量壓縮最重要的三項指標。3.1.1小節中只對壓縮率進行了測試。壓縮/解壓速度可以通過跑一些查詢語句進一步測試。這里就不展開測試了。業界中常用的存儲格式一般是 parquet, orc,所以上面測試除了純文本只測試了這兩種存儲格式。
我們可以通過 hive> set io.compression.codecs; 來查看當前Hadoop集群支持的壓縮,在公司的集群中查詢得到的結果是:
可以看到 lzo 有兩種編解碼器: LzoCodec 和 LzopCodec。他們之間有什麼區別呢?
如果你閱讀過關於 Hadoop 壓縮的文章,應該可以看到,絕大多數文章中對於 snappy 是否支持分片都是直接給出的否定的答案。 CDH 的文檔中也指出來 snappy 是不支持分片的。
看文中加粗片段,雖然 snappy 本身是不支持分片的,但是如果 snappy 存儲在一些特定的存儲格式比如 SequenceFile 或者 Avro 中,那麼是可以支持分片的。也就是說 snappy 是否支持分片是分情況討論的。不能說使用了 snappy 壓縮就一定不支持分片。前面提到了,業界中常用的存儲格式一般是 parquet 或者 orc,而上面 CDH 的文章中恰恰沒有提到 parquet 和 orc 是否支持,接下來以 parquet 為例,進行測試。測試內容即為 parquet + snappy 組合,是否支持分片。
首先准備數據,因為之前做壓縮率測試,已經有了 parquet + snappy 文件了,這里直接拿來用。
一共3個輸入文件,啟了6個mapper,說明輸入文件是可以分片的。即 parquet + snappy 的組合是支持分片的。在《Hadoop The Definitive Guide》中也對 parquet 是否支持分片有說明:
以 maprece.output.fileoutputformat.compress.codec 為例,這個參數可以在三個地方配置:
那麼當三者都設置時,以哪個為准呢?按照經驗來看,一定是粒度小的優先順序大於粒度大的優先順序。經過測試也驗證了這種猜測。即:表級別 > hive > hadoop
初學者往往容易混淆存儲格式和壓縮格式之間的關系,其實二者是完全獨立的。如果完整的閱讀了該篇文章,應該已經消除了這一塊理解對誤區。這里總結一下:比如 parquet, orc,他們都是常見的 存儲格式 。是否使用壓縮,使用何種壓縮都是可以設置的。而 zlib、gzip、lzo、lz4、snappy 等等這些都是常見的 壓縮格式 ,他們既可以依附於某些 存儲格式 ,比如之前提到的 parquet + snappy,orc + zlib 等等。也可以脫離特定的 存儲格式 ,比如純文本文件進行壓縮,text + parquet, text + bzip2 等等。
⑤ 如何提升Hadoop MapRece性能
你這個問慎虧培題,問的太大了。目前可能有很多人都在熟悉使用hadoop,當然就會有很多人研究它了。默認的集群環境並不是最優的,所以為了提升集群的性能,人們就開始研究hadoop的優化了。現在,通常從以下幾個方面對空數hadoop進行優化:
1、數據放置和數據副本數量的選擇。集群默認情況一般有三個副本,並且集群默認每個節點的計算能力是一樣的,在分配數據塊的時候,均勻分布在每個節點上。實際環境中,更多的是每個節點得各方面能力是不同的,比如計算能力,那麼原有的數據分配方式就那麼合適了,需要根據節點的能力,合理的放置數據塊,從而提升性能。數據副本的數量也會影響hadoop的性能,這里邊涉及到數據遷移的問題。
2、參數配置方面。例如數據塊的大小當前大多數是128或者64M,相對來說是比較合理的。那麼這個值真的適合你的集群環境嗎?是否有方法可以計算出集群的最優狀態下的數據塊大小,這個需要研究。當然還有很多參數,需要優化,比如容器大小,內存分配,map和rece數量等。
3、作業調度問題。給你一堆的job,如何合理的調度使得執行最快。這也是優化的方向,雖然集群有默認的三種調度策略,但並不一寬唯定是最好的。
當然還有很多優化,這里就不一一列舉了,如果你感興趣,可以去網上搜索更多的參考資料!
⑥ 簡述Hadoop的MapRece與Googl的MapRecc 之間的關系
江湖傳說永流傳:谷歌技術有"三寶",GFS、MapRece和大表(BigTable)!
谷歌在03到06年間連續發表了三篇很有影響力的文章,分別是03年SOSP的GFS,04年OSDI的MapRece,和06年OSDI的BigTable。SOSP和OSDI都是操作系統領域的頂級會議,在計算機學會推薦會議里屬於A類。SOSP在單數年舉辦,而OSDI在雙數年舉辦。
那麼這篇博客就來介紹一下MapRece。
1. MapRece是幹啥的
因為沒找到谷歌的示意圖,所以我想借用一張Hadoop項目的結構圖來說明下MapRece所處的位置,如下圖。
這幅圖描述了MapRece如何處理詞頻統計。由於map worker數量不夠,首先處理了分片1、3、4,並產生中間鍵值對;當所有中間值都准備好了,Rece作業就開始讀取對應分區,並輸出統計結果。
6. 用戶的權利
用戶最主要的任務是實現map和rece介面,但還有一些有用的介面是向用戶開放的。
an input reader。這個函數會將輸入分為M個部分,並且定義了如何從數據中抽取最初的鍵值對,比如詞頻的例子中定義文件名和文件內容是鍵值對。
a partition function。這個函數用於將map函數產生的中間鍵值對映射到一個分區里去,最簡單的實現就是將鍵求哈希再對R取模。
a compare function。這個函數用於Rece作業排序,這個函數定義了鍵的大小關系。
an output writer。負責將結果寫入底層分布式文件系統。
a combiner function。實際就是rece函數,這是用於前面提到的優化的,比如統計詞頻時,如果每個<w, "1">要讀一次,因為rece和map通常不在一台機器,非常浪費時間,所以可以在map執行的地方先運行一次combiner,這樣rece只需要讀一次<w, "n">了。
map和rece函數就不多說了。
7. MapRece的實現
目前MapRece已經有多種實現,除了谷歌自己的實現外,還有著名的hadoop,區別是谷歌是c++,而hadoop是用java。另外斯坦福大學實現了一個在多核/多處理器、共享內存環境內運行的MapRece,稱為Phoenix(介紹),相關的論文發表在07年的HPCA,是當年的最佳論文哦!
⑦ 大數據中的壓縮
一、壓縮分類
1、Lossless conpression(無損壓縮)
壓縮和解壓縮過程中沒有任何數據的丟失
2、Lossy conpression(有損壓縮)
JPEG,MP3,MPEG 壓縮和解壓縮過程中有任何數據的丟失
二、扮擾壓縮場景
輸入
中間
輸出
三、壓縮注意事項
CPU是否夠用,壓縮加壓縮都耗費資源
四、壓縮格式
五、壓縮比
原始文件大小1.4G
Snappy的壓縮比:50%
LZ4的壓縮比:49%
LZO的壓縮比:48%
GZIP的壓縮比:32%
BZIP2的壓縮比:28%
六、總結
1、壓縮陵陵比越高壓縮時間就越長,不同壓縮場景需要選用不同的壓縮。
2、選用壓縮就是空間與時間的選擇
3、如果是老數據/冷數廳汪旦據就採用BZIP2壓縮,如果老數據偶爾還會用的到還需考慮是否分片
4、
七、Hadoop配置Maprece的壓縮
core-site.xml
mapred-site.xml
上面配置的是Bzip2的壓縮,進入
j進行wordcount的測試
查看結果文件是帶有bz2後綴的
八、Hive的壓縮測試
1、創建表
2、載入數據
查看原表的大小
3、
新建表
查看新表的大小
4、
新建表
查看大小
5、
新建表
查看大小
以上為臨時生效壓縮,真正使用需要將具體使用哪種壓縮配置到Hive配置文件
⑧ Hadoop集群執行MapRece時報錯:
原來maprece.jobhistory.address 和maprece.jobhistory.webapp.addres 這兩個address的地址使用的是CDH默認的配置值,這里需要改成hostname,這樣可能就是原來的位置不對造成的
[html]view plain
<?xmlversion="1.0"?>
<!--
(ASF)under運鋒凱oneormore
contributorlicenseagreements.
.
,Version2.0
(the"License");
theLicense.YoumayobtainaoftheLicenseat
,software
"ASIS"BASIS,
,eitherexpressorimplied.
limitationsundertheLicense.
-->
<?xml-stylesheettype="text/xsl"href="configuration.xsl"?>
<configuration>
<property>
<name>maprece.framework.name</name>
<value>yarn</value>
</property>
<property>旁喚
<name>maprece.jobhistory.address</name>
<value>fireslate.cis.umac.mo:10020</value>
</property>
<property>
<name>maprece.jobhistory.webapp.address</name>
<value>fireslate.cis.umac.mo:19888</value>
</property>
<property>
<name>基孝yarn.app.maprece.am.staging-dir</name>
<value>/user</value>
</property>
</configuration>
⑨ hadoop的maprece常見演算法案例有幾種
基本MapRece模式
計數與求和
問題陳述:
有許多文檔,每個文檔都有一些欄位組成。需要計算出每個欄位在所有文檔中的出現次數或者這些欄位的其他什麼統計值。例如,給定一個log文件,其中的每條記錄都包含一個響應時間,需要計算出平均響應時間。
解決方案:
讓我們先從簡單的例子入手。在下面的代碼片段里,Mapper每遇到指定詞就把頻次記1,Recer一個個遍歷這些詞的集合然後把他們的頻次加和。
1 class Mapper
2 method Map(docid id, doc d)
3 for all term t in doc d do
4 Emit(term t, count 1)
5
6 class Recer
7 method Rece(term t, counts [c1, c2,...])
8 sum = 0
9 for all count c in [c1, c2,...] do
10 sum = sum + c
11 Emit(term t, count sum)
這種方法的缺點顯而易見,Mapper提交了太多無意義的計數。它完全可以通過先對每個文檔中的詞進行計數從而減少傳遞給Recer的數據量:
1 class Mapper
2 method Map(docid id, doc d)
3 H = new AssociativeArray
4 for all term t in doc d do
5 H{t} = H{t} + 1
6 for all term t in H do
7 Emit(term t, count H{t})
如果要累計計數的的不只是單個文檔中的內容,還包括了一個Mapper節點處理的所有文檔,那就要用到Combiner了:
1 class Mapper
2 method Map(docid id, doc d)
3 for all term t in doc d do
4 Emit(term t, count 1)
5
6 class Combiner
7 method Combine(term t, [c1, c2,...])
8 sum = 0
9 for all count c in [c1, c2,...] do
10 sum = sum + c
11 Emit(term t, count sum)
12
13 class Recer
14 method Rece(term t, counts [c1, c2,...])
15 sum = 0
16 for all count c in [c1, c2,...] do
17 sum = sum + c
18 Emit(term t, count sum)
應用:Log 分析, 數據查詢
整理歸類
問題陳述:
有一系列條目,每個條目都有幾個屬性,要把具有同一屬性值的條目都保存在一個文件里,或者把條目按照屬性值分組。 最典型的應用是倒排索引。
解決方案:
解決方案很簡單。 在 Mapper 中以每個條目的所需屬性值作為 key,其本身作為值傳遞給 Recer。 Recer 取得按照屬性值分組的條目,然後可以處理或者保存。如果是在構建倒排索引,那麼 每個條目相當於一個詞而屬性值就是詞所在的文檔ID。
應用:倒排索引, ETL
過濾 (文本查找),解析和校驗
問題陳述:
假設有很多條記錄,需要從其中找出滿足某個條件的所有記錄,或者將每條記錄傳換成另外一種形式(轉換操作相對於各條記錄獨立,即對一條記錄的操作與其他記錄無關)。像文本解析、特定值抽取、格式轉換等都屬於後一種用例。
解決方案:
非常簡單,在Mapper 里逐條進行操作,輸出需要的值或轉換後的形式。
應用:日誌分析,數據查詢,ETL,數據校驗
分布式任務執行
問題陳述:
大型計算可以分解為多個部分分別進行然後合並各個計算的結果以獲得最終結果。
解決方案: 將數據切分成多份作為每個 Mapper 的輸入,每個Mapper處理一份數據,執行同樣的運算,產生結果,Recer把多個Mapper的結果組合成一個。
案例研究: 數字通信系統模擬
像 WiMAX 這樣的數字通信模擬軟體通過系統模型來傳輸大量的隨機數據,然後計算傳輸中的錯誤幾率。 每個 Mapper 處理樣本 1/N 的數據,計算出這部分數據的錯誤率,然後在 Recer 里計算平均錯誤率。
應用:工程模擬,數字分析,性能測試
排序
問題陳述:
有許多條記錄,需要按照某種規則將所有記錄排序或是按照順序來處理記錄。
解決方案: 簡單排序很好辦 – Mappers 將待排序的屬性值為鍵,整條記錄為值輸出。 不過實際應用中的排序要更加巧妙一點, 這就是它之所以被稱為MapRece 核心的原因(「核心」是說排序?因為證明Hadoop計算能力的實驗是大數據排序?還是說Hadoop的處理過程中對key排序的環節?)。在實踐中,常用組合鍵來實現二次排序和分組。
MapRece 最初只能夠對鍵排序, 但是也有技術利用可以利用Hadoop 的特性來實現按值排序。想了解的話可以看這篇博客。
按照BigTable的概念,使用 MapRece來對最初數據而非中間數據排序,也即保持數據的有序狀態更有好處,必須注意這一點。換句話說,在數據插入時排序一次要比在每次查詢數據的時候排序更高效。
應用:ETL,數據分析
非基本 MapRece 模式
迭代消息傳遞 (圖處理)
問題陳述:
假設一個實體網路,實體之間存在著關系。 需要按照與它比鄰的其他實體的屬性計算出一個狀態。這個狀態可以表現為它和其它節點之間的距離, 存在特定屬性的鄰接點的跡象, 鄰域密度特徵等等。
解決方案:
網路存儲為系列節點的結合,每個節點包含有其所有鄰接點ID的列表。按照這個概念,MapRece 迭代進行,每次迭代中每個節點都發消息給它的鄰接點。鄰接點根據接收到的信息更新自己的狀態。當滿足了某些條件的時候迭代停止,如達到了最大迭代次數(網路半徑)或兩次連續的迭代幾乎沒有狀態改變。從技術上來看,Mapper 以每個鄰接點的ID為鍵發出信息,所有的信息都會按照接受節點分組,recer 就能夠重算各節點的狀態然後更新那些狀態改變了的節點。下面展示了這個演算法:
1 class Mapper
2 method Map(id n, object N)
3 Emit(id n, object N)
4 for all id m in N.OutgoingRelations do
5 Emit(id m, message getMessage(N))
6
7 class Recer
8 method Rece(id m, [s1, s2,...])
9 M = null
10 messages = []
11 for all s in [s1, s2,...] do
12 if IsObject(s) then
13 M = s
14 else // s is a message
15 messages.add(s)
16 M.State = calculateState(messages)
17 Emit(id m, item M)
一個節點的狀態可以迅速的沿著網路傳全網,那些被感染了的節點又去感染它們的鄰居,整個過程就像下面的圖示一樣:
案例研究: 沿分類樹的有效性傳遞
問題陳述:
這個問題來自於真實的電子商務應用。將各種貨物分類,這些類別可以組成一個樹形結構,比較大的分類(像男人、女人、兒童)可以再分出小分類(像男褲或女裝),直到不能再分為止(像男式藍色牛仔褲)。這些不能再分的基層類別可以是有效(這個類別包含有貨品)或者已無效的(沒有屬於這個分類的貨品)。如果一個分類至少含有一個有效的子分類那麼認為這個分類也是有效的。我們需要在已知一些基層分類有效的情況下找出分類樹上所有有效的分類。
解決方案:
這個問題可以用上一節提到的框架來解決。我們咋下面定義了名為 getMessage和 calculateState 的方法:
1 class N
2 State in {True = 2, False = 1, null = 0},
3 initialized 1 or 2 for end-of-line categories, 0 otherwise
4 method getMessage(object N)
5 return N.State
6 method calculateState(state s, data [d1, d2,...])
7 return max( [d1, d2,...] )
案例研究:廣度優先搜索
問題陳述:需要計算出一個圖結構中某一個節點到其它所有節點的距離。
解決方案: Source源節點給所有鄰接點發出值為0的信號,鄰接點把收到的信號再轉發給自己的鄰接點,每轉發一次就對信號值加1:
1 class N
2 State is distance,
3 initialized 0 for source node, INFINITY for all other nodes
4 method getMessage(N)
5 return N.State + 1
6 method calculateState(state s, data [d1, d2,...])
7 min( [d1, d2,...] )
案例研究:網頁排名和 Mapper 端數據聚合
這個演算法由Google提出,使用權威的PageRank演算法,通過連接到一個網頁的其他網頁來計算網頁的相關性。真實演算法是相當復雜的,但是核心思想是權重可以傳播,也即通過一個節點的各聯接節點的權重的均值來計算節點自身的權重。
1 class N
2 State is PageRank
3 method getMessage(object N)
4 return N.State / N.OutgoingRelations.size()
5 method calculateState(state s, data [d1, d2,...])
6 return ( sum([d1, d2,...]) )
要指出的是上面用一個數值來作為評分實際上是一種簡化,在實際情況下,我們需要在Mapper端來進行聚合計算得出這個值。下面的代碼片段展示了這個改變後的邏輯 (針對於 PageRank 演算法):
1 class Mapper
2 method Initialize
3 H = new AssociativeArray
4 method Map(id n, object N)
5 p = N.PageRank / N.OutgoingRelations.size()
6 Emit(id n, object N)
7 for all id m in N.OutgoingRelations do
8 H{m} = H{m} + p
9 method Close
10 for all id n in H do
11 Emit(id n, value H{n})
12
13 class Recer
14 method Rece(id m, [s1, s2,...])
15 M = null
16 p = 0
17 for all s in [s1, s2,...] do
18 if IsObject(s) then
19 M = s
20 else
21 p = p + s
22 M.PageRank = p
23 Emit(id m, item M)
應用:圖分析,網頁索引
值去重 (對唯一項計數)
問題陳述: 記錄包含值域F和值域 G,要分別統計相同G值的記錄中不同的F值的數目 (相當於按照 G分組).
這個問題可以推而廣之應用於分面搜索(某些電子商務網站稱之為Narrow Search)
Record 1: F=1, G={a, b}
Record 2: F=2, G={a, d, e}
Record 3: F=1, G={b}
Record 4: F=3, G={a, b}
Result:
a -> 3 // F=1, F=2, F=3
b -> 2 // F=1, F=3
d -> 1 // F=2
e -> 1 // F=2
解決方案 I:
第一種方法是分兩個階段來解決這個問題。第一階段在Mapper中使用F和G組成一個復合值對,然後在Recer中輸出每個值對,目的是為了保證F值的唯一性。在第二階段,再將值對按照G值來分組計算每組中的條目數。
第一階段:
1 class Mapper
2 method Map(null, record [value f, categories [g1, g2,...]])
3 for all category g in [g1, g2,...]
4 Emit(record [g, f], count 1)
5
6 class Recer
7 method Rece(record [g, f], counts [n1, n2, ...])
8 Emit(record [g, f], null )
第二階段:
1 class Mapper
2 method Map(record [f, g], null)
3 Emit(value g, count 1)
4
5 class Recer
6 method Rece(value g, counts [n1, n2,...])
7 Emit(value g, sum( [n1, n2,...] ) )
解決方案 II:
第二種方法只需要一次MapRece 即可實現,但擴展性不強。演算法很簡單-Mapper 輸出值和分類,在Recer里為每個值對應的分類去重然後給每個所屬的分類計數加1,最後再在Recer結束後將所有計數加和。這種方法適用於只有有限個分類,而且擁有相同F值的記錄不是很多的情況。例如網路日誌處理和用戶分類,用戶的總數很多,但是每個用戶的事件是有限的,以此分類得到的類別也是有限的。值得一提的是在這種模式下可以在數據傳輸到Recer之前使用Combiner來去除分類的重復值。
1 class Mapper
2 method Map(null, record [value f, categories [g1, g2,...] )
3 for all category g in [g1, g2,...]
4 Emit(value f, category g)
5
6 class Recer
7 method Initialize
8 H = new AssociativeArray : category -> count
9 method Rece(value f, categories [g1, g2,...])
10 [g1', g2',..] = ExcludeDuplicates( [g1, g2,..] )
11 for all category g in [g1', g2',...]
12 H{g} = H{g} + 1
13 method Close
14 for all category g in H do
15 Emit(category g, count H{g})
應用:日誌分析,用戶計數
互相關
問題陳述:有多個各由若干項構成的組,計算項兩兩共同出現於一個組中的次數。假如項數是N,那麼應該計算N*N。
這種情況常見於文本分析(條目是單詞而元組是句子),市場分析(購買了此物的客戶還可能購買什麼)。如果N*N小到可以容納於一台機器的內存,實現起來就比較簡單了。
配對法
第一種方法是在Mapper中給所有條目配對,然後在Recer中將同一條目對的計數加和。但這種做法也有缺點:
使用 combiners 帶來的的好處有限,因為很可能所有項對都是唯一的
不能有效利用內存
1 class Mapper
2 method Map(null, items [i1, i2,...] )
3 for all item i in [i1, i2,...]
4 for all item j in [i1, i2,...]
5 Emit(pair [i j], count 1)
6
7 class Recer
8 method Rece(pair [i j], counts [c1, c2,...])
9 s = sum([c1, c2,...])
10 Emit(pair[i j], count s)
Stripes Approach(條方法?不知道這個名字怎麼理解)
第二種方法是將數據按照pair中的第一項來分組,並維護一個關聯數組,數組中存儲的是所有關聯項的計數。The second approach is to group data by the first item in pair and maintain an associative array (「stripe」) where counters for all adjacent items are accumulated. Recer receives all stripes for leading item i, merges them, and emits the same result as in the Pairs approach.
中間結果的鍵數量相對較少,因此減少了排序消耗。
可以有效利用 combiners。
可在內存中執行,不過如果沒有正確執行的話也會帶來問題。
實現起來比較復雜。
一般來說, 「stripes」 比 「pairs」 更快
1 class Mapper
2 method Map(null, items [i1, i2,...] )
3 for all item i in [i1, i2,...]
4 H = new AssociativeArray : item -> counter
5 for all item j in [i1, i2,...]
6 H{j} = H{j} + 1
7 Emit(item i, stripe H)
8
9 class Recer
10 method Rece(item i, stripes [H1, H2,...])
11 H = new AssociativeArray : item -> counter
12 H = merge-sum( [H1, H2,...] )
13 for all item j in H.keys()
14 Emit(pair [i j], H{j})
應用:文本分析,市場分析
參考資料:Lin J. Dyer C. Hirst G. Data Intensive Processing MapRece
用MapRece 表達關系模式
在這部分我們會討論一下怎麼使用MapRece來進行主要的關系操作。
篩選(Selection)
1 class Mapper
2 method Map(rowkey key, tuple t)
3 if t satisfies the predicate
4 Emit(tuple t, null)
投影(Projection)
投影只比篩選稍微復雜一點,在這種情況下我們可以用Recer來消除可能的重復值。
1 class Mapper
2 method Map(rowkey key, tuple t)
3 tuple g = project(t) // extract required fields to tuple g
4 Emit(tuple g, null)
5
6 class Recer
⑩ 請簡要描述Hadoop計算框架MapRece的工作原理
分為2個步驟,map和rece,map專門負責對每個數據獨立地同時地打標簽,框架會對相同標簽的數據分成一組,rece對分好的那些組數據做累計計算。我們只要分別實現map和rece就可以了