A. 如何使用Python為Hadoop編寫一個簡單的MapRece程序
我們將編寫一個簡單的 MapRece 程序,使用的是C-Python,而不是Jython編寫後打包成jar包的程序。
我們的這個例子將模仿 WordCount 並使用Python來實現,例子通過讀取文本文件來統計出單詞的出現次數。結果也以文本形式輸出,每一行包含一個單詞和單詞出現的次數,兩者中間使用製表符來想間隔。
先決條件
編寫這個程序之前,你學要架設好Hadoop 集群,這樣才能不會在後期工作抓瞎。如果你沒有架設好,那麼在後面有個簡明教程來教你在Ubuntu linux 上搭建(同樣適用於其他發行版linux、unix)
如何使用Hadoop Distributed File System (HDFS)在Ubuntu Linux 建立單節點的 Hadoop 集群
如何使用Hadoop Distributed File System (HDFS)在Ubuntu Linux 建立多節點的 Hadoop 集群
Python的MapRece代碼
使用Python編寫MapRece代碼的技巧就在於我們使用了 HadoopStreaming 來幫助我們在Map 和 Rece間傳遞數據通過STDIN (標准輸入)和STDOUT (標准輸出).我們僅僅使用Python的sys.stdin來輸入數據,使用sys.stdout輸出數據,這樣做是因為HadoopStreaming會幫我們辦好其他事。這是真的,別不相信!
B. hadoop中mapper用時間間隔做為key怎麼寫
從Map到Rece
MapRece其實是分治演算法的一種實現,其處理過程亦和用管道命令來處理十分相似,一些簡單的文本字元的處理甚至也可以使用Unix的管道命令來替代,從處理流程的角度來看大概如下:
cat input | grep | sort | uniq -c | cat > output # Input -> Map -> Shuffle & Sort -> Rece -> Output
簡單的流程圖如下:
對於Shuffle,簡單地說就是將Map的輸出通過一定的演算法劃分到合適的Recer中進行處理。Sort當然就是對中間的結果進行按key排序,因為Recer的輸入是嚴格要求按key排序的。
Input->Map->Shuffle&Sort->Rece->Output只是從宏觀的角度對MapRece的簡單描述,實際在MapRece的框架中,即從編程的角度來看,其處理流程是Input->Map->Sort->Combine->Partition->Rece->Output。用之前的對溫度進行統計的例子來講述這些過程。
Input Phase
輸入的數據需要以一定的格式傳遞給Mapper的,格式有多種,如TextInputFormat、DBInputFormat、SequenceFileInput等等,可以使用JobConf.setInputFormat來設置,這個過程還應該包括對輸入的數據進行任務粒度劃分(split)然後再傳遞給Mapper。在溫度的例子中,由於處理的都是文本數據,輸入的格式使用默認的TextInputFormat即可。
Map Phase
對輸入的key、value對進行處理,輸出的是key、value的集合,即map (k1, v1) -> list(k2, v2),使用JobConf.setMapperClass設置自己的Mapper。在例子中,將(行號、溫度的文本數據)作為key/value輸入,經過處理後,從溫度的文件數據中提取出日期中的年份和該日的溫度數據,形成新的key/value對,最後以list(年, 溫度)的結果輸出,如[(1950, 10), (1960, 40), (1960, 5)]。
Sort Phase
對Mapper輸出的數據進行排序,可以通過JobConf.setOutputKeyComparatorClass來設置自己的排序規則。在例子中,經過排序之後,輸出的list集合是按年份進行排序的list(年, 溫度),如[(1950, 10), (1950, 5), (1960, 40)]。
Combine Phase
這個階段是將中間結果中有相同的key的<key, value>對合並成一對,Combine的過程與Rece很相似,使用的甚至是Rece的介面。通過Combine能夠減少<key, value>的集合數量,從而減少網路流量。Combine只是一個可選的優化過程,並且無論Combine執行多少次(>=0),都會使Recer產生相同的輸出,使用JobConf.setCombinerClass來設置自定義的Combine Class。在例子中,假如map1產生出的結果為[(1950, 0), (1950, 20), (1950, 10)],在map2產生出的結果為[(1950, 15), (1950, 25)],這兩組數據作為Recer的輸入並經過Recer處理後的年最高溫度結果為(1950, 25),然而當在Mapper之後加了Combine(Combine先過濾出最高溫度),則map1的輸出是[(1950, 20)]和map2的輸出是[(1950, 25)],雖然其他的三組數據被拋棄了,但是對於Recer的輸出而言,處理後的年最高溫度依然是(1950, 25)。
Partition Phase
把Mapper任務輸出的中間結果按key的范圍劃分成R份(R是預先定義的Rece任務的個數),默認的劃分演算法是」(key.hashCode() & Integer.MAX_VALUE) % numPartitions」,這樣保證了某一范圍的key一定是由某個Recer來處理,簡化了Recer的處理流程,使用JobConf.setPartitionClass來設置自定義的Partition Class。在例子中,默認就自然是對年份進行取模了。
Rece Phase
Recer獲取Mapper輸出的中間結果,作為輸入對某一key范圍區間進行處理,使用JobConf.setRecerClass來設置。在例子中,與Combine Phase中的處理是一樣的,把各個Mapper傳遞過來的數據計算年最高溫度。
Output Phase
Recer的輸出格式和Mapper的輸入格式是相對應的,當然Recer的輸出還可以作為另一個Mapper的輸入繼續進行處理。
C. 請簡要描述Hadoop計算框架MapRece的工作原理
分為2個步驟,map和rece,map專門負責對每個數據獨立地同時地打標簽,框架會對相同標簽的數據分成一組,rece對分好的那些組數據做累計計算。我們只要分別實現map和rece就可以了