① 請簡要描述Hadoop計算框架MapRece的工作原理
分為2個步驟,map和rece,map專門負責對每個數據獨立地同時地打標簽,框架會對相同標簽的數據分成一組,rece對分好的那些組數據做累計計算。我們只要分別實現map和rece就可以了
② hadoop的maprece常見演算法案例有幾種
基本MapRece模式
計數與求和
問題陳述:
有許多文檔,每個文檔都有一些欄位組成。需要計算出每個欄位在所有文檔中的出現次數或者這些欄位的其他什麼統計值。例如,給定一個log文件,其中的每條記錄都包含一個響應時間,需要計算出平均響應時間。
解決方案:
讓我們先從簡單的例子入手。在下面的代碼片段里,Mapper每遇到指定詞就把頻次記1,Recer一個個遍歷這些詞的集合然後把他們的頻次加和。
1 class Mapper
2 method Map(docid id, doc d)
3 for all term t in doc d do
4 Emit(term t, count 1)
5
6 class Recer
7 method Rece(term t, counts [c1, c2,...])
8 sum = 0
9 for all count c in [c1, c2,...] do
10 sum = sum + c
11 Emit(term t, count sum)
這種方法的缺點顯而易見,Mapper提交了太多無意義的計數。它完全可以通過先對每個文檔中的詞進行計數從而減少傳遞給Recer的數據量:
1 class Mapper
2 method Map(docid id, doc d)
3 H = new AssociativeArray
4 for all term t in doc d do
5 H{t} = H{t} + 1
6 for all term t in H do
7 Emit(term t, count H{t})
如果要累計計數的的不只是單個文檔中的內容,還包括了一個Mapper節點處理的所有文檔,那就要用到Combiner了:
1 class Mapper
2 method Map(docid id, doc d)
3 for all term t in doc d do
4 Emit(term t, count 1)
5
6 class Combiner
7 method Combine(term t, [c1, c2,...])
8 sum = 0
9 for all count c in [c1, c2,...] do
10 sum = sum + c
11 Emit(term t, count sum)
12
13 class Recer
14 method Rece(term t, counts [c1, c2,...])
15 sum = 0
16 for all count c in [c1, c2,...] do
17 sum = sum + c
18 Emit(term t, count sum)
應用:Log 分析, 數據查詢
整理歸類
問題陳述:
有一系列條目,每個條目都有幾個屬性,要把具有同一屬性值的條目都保存在一個文件里,或者把條目按照屬性值分組。 最典型的應用是倒排索引。
解決方案:
解決方案很簡單。 在 Mapper 中以每個條目的所需屬性值作為 key,其本身作為值傳遞給 Recer。 Recer 取得按照屬性值分組的條目,然後可以處理或者保存。如果是在構建倒排索引,那麼 每個條目相當於一個詞而屬性值就是詞所在的文檔ID。
應用:倒排索引, ETL
過濾 (文本查找),解析和校驗
問題陳述:
假設有很多條記錄,需要從其中找出滿足某個條件的所有記錄,或者將每條記錄傳換成另外一種形式(轉換操作相對於各條記錄獨立,即對一條記錄的操作與其他記錄無關)。像文本解析、特定值抽取、格式轉換等都屬於後一種用例。
解決方案:
非常簡單,在Mapper 里逐條進行操作,輸出需要的值或轉換後的形式。
應用:日誌分析,數據查詢,ETL,數據校驗
分布式任務執行
問題陳述:
大型計算可以分解為多個部分分別進行然後合並各個計算的結果以獲得最終結果。
解決方案: 將數據切分成多份作為每個 Mapper 的輸入,每個Mapper處理一份數據,執行同樣的運算,產生結果,Recer把多個Mapper的結果組合成一個。
案例研究: 數字通信系統模擬
像 WiMAX 這樣的數字通信模擬軟體通過系統模型來傳輸大量的隨機數據,然後計算傳輸中的錯誤幾率。 每個 Mapper 處理樣本 1/N 的數據,計算出這部分數據的錯誤率,然後在 Recer 里計算平均錯誤率。
應用:工程模擬,數字分析,性能測試
排序
問題陳述:
有許多條記錄,需要按照某種規則將所有記錄排序或是按照順序來處理記錄。
解決方案: 簡單排序很好辦 – Mappers 將待排序的屬性值為鍵,整條記錄為值輸出。 不過實際應用中的排序要更加巧妙一點, 這就是它之所以被稱為MapRece 核心的原因(「核心」是說排序?因為證明Hadoop計算能力的實驗是大數據排序?還是說Hadoop的處理過程中對key排序的環節?)。在實踐中,常用組合鍵來實現二次排序和分組。
MapRece 最初只能夠對鍵排序, 但是也有技術利用可以利用Hadoop 的特性來實現按值排序。想了解的話可以看這篇博客。
按照BigTable的概念,使用 MapRece來對最初數據而非中間數據排序,也即保持數據的有序狀態更有好處,必須注意這一點。換句話說,在數據插入時排序一次要比在每次查詢數據的時候排序更高效。
應用:ETL,數據分析
非基本 MapRece 模式
迭代消息傳遞 (圖處理)
問題陳述:
假設一個實體網路,實體之間存在著關系。 需要按照與它比鄰的其他實體的屬性計算出一個狀態。這個狀態可以表現為它和其它節點之間的距離, 存在特定屬性的鄰接點的跡象, 鄰域密度特徵等等。
解決方案:
網路存儲為系列節點的結合,每個節點包含有其所有鄰接點ID的列表。按照這個概念,MapRece 迭代進行,每次迭代中每個節點都發消息給它的鄰接點。鄰接點根據接收到的信息更新自己的狀態。當滿足了某些條件的時候迭代停止,如達到了最大迭代次數(網路半徑)或兩次連續的迭代幾乎沒有狀態改變。從技術上來看,Mapper 以每個鄰接點的ID為鍵發出信息,所有的信息都會按照接受節點分組,recer 就能夠重算各節點的狀態然後更新那些狀態改變了的節點。下面展示了這個演算法:
1 class Mapper
2 method Map(id n, object N)
3 Emit(id n, object N)
4 for all id m in N.OutgoingRelations do
5 Emit(id m, message getMessage(N))
6
7 class Recer
8 method Rece(id m, [s1, s2,...])
9 M = null
10 messages = []
11 for all s in [s1, s2,...] do
12 if IsObject(s) then
13 M = s
14 else // s is a message
15 messages.add(s)
16 M.State = calculateState(messages)
17 Emit(id m, item M)
一個節點的狀態可以迅速的沿著網路傳全網,那些被感染了的節點又去感染它們的鄰居,整個過程就像下面的圖示一樣:
案例研究: 沿分類樹的有效性傳遞
問題陳述:
這個問題來自於真實的電子商務應用。將各種貨物分類,這些類別可以組成一個樹形結構,比較大的分類(像男人、女人、兒童)可以再分出小分類(像男褲或女裝),直到不能再分為止(像男式藍色牛仔褲)。這些不能再分的基層類別可以是有效(這個類別包含有貨品)或者已無效的(沒有屬於這個分類的貨品)。如果一個分類至少含有一個有效的子分類那麼認為這個分類也是有效的。我們需要在已知一些基層分類有效的情況下找出分類樹上所有有效的分類。
解決方案:
這個問題可以用上一節提到的框架來解決。我們咋下面定義了名為 getMessage和 calculateState 的方法:
1 class N
2 State in {True = 2, False = 1, null = 0},
3 initialized 1 or 2 for end-of-line categories, 0 otherwise
4 method getMessage(object N)
5 return N.State
6 method calculateState(state s, data [d1, d2,...])
7 return max( [d1, d2,...] )
案例研究:廣度優先搜索
問題陳述:需要計算出一個圖結構中某一個節點到其它所有節點的距離。
解決方案: Source源節點給所有鄰接點發出值為0的信號,鄰接點把收到的信號再轉發給自己的鄰接點,每轉發一次就對信號值加1:
1 class N
2 State is distance,
3 initialized 0 for source node, INFINITY for all other nodes
4 method getMessage(N)
5 return N.State + 1
6 method calculateState(state s, data [d1, d2,...])
7 min( [d1, d2,...] )
案例研究:網頁排名和 Mapper 端數據聚合
這個演算法由Google提出,使用權威的PageRank演算法,通過連接到一個網頁的其他網頁來計算網頁的相關性。真實演算法是相當復雜的,但是核心思想是權重可以傳播,也即通過一個節點的各聯接節點的權重的均值來計算節點自身的權重。
1 class N
2 State is PageRank
3 method getMessage(object N)
4 return N.State / N.OutgoingRelations.size()
5 method calculateState(state s, data [d1, d2,...])
6 return ( sum([d1, d2,...]) )
要指出的是上面用一個數值來作為評分實際上是一種簡化,在實際情況下,我們需要在Mapper端來進行聚合計算得出這個值。下面的代碼片段展示了這個改變後的邏輯 (針對於 PageRank 演算法):
1 class Mapper
2 method Initialize
3 H = new AssociativeArray
4 method Map(id n, object N)
5 p = N.PageRank / N.OutgoingRelations.size()
6 Emit(id n, object N)
7 for all id m in N.OutgoingRelations do
8 H{m} = H{m} + p
9 method Close
10 for all id n in H do
11 Emit(id n, value H{n})
12
13 class Recer
14 method Rece(id m, [s1, s2,...])
15 M = null
16 p = 0
17 for all s in [s1, s2,...] do
18 if IsObject(s) then
19 M = s
20 else
21 p = p + s
22 M.PageRank = p
23 Emit(id m, item M)
應用:圖分析,網頁索引
值去重 (對唯一項計數)
問題陳述: 記錄包含值域F和值域 G,要分別統計相同G值的記錄中不同的F值的數目 (相當於按照 G分組).
這個問題可以推而廣之應用於分面搜索(某些電子商務網站稱之為Narrow Search)
Record 1: F=1, G={a, b}
Record 2: F=2, G={a, d, e}
Record 3: F=1, G={b}
Record 4: F=3, G={a, b}
Result:
a -> 3 // F=1, F=2, F=3
b -> 2 // F=1, F=3
d -> 1 // F=2
e -> 1 // F=2
解決方案 I:
第一種方法是分兩個階段來解決這個問題。第一階段在Mapper中使用F和G組成一個復合值對,然後在Recer中輸出每個值對,目的是為了保證F值的唯一性。在第二階段,再將值對按照G值來分組計算每組中的條目數。
第一階段:
1 class Mapper
2 method Map(null, record [value f, categories [g1, g2,...]])
3 for all category g in [g1, g2,...]
4 Emit(record [g, f], count 1)
5
6 class Recer
7 method Rece(record [g, f], counts [n1, n2, ...])
8 Emit(record [g, f], null )
第二階段:
1 class Mapper
2 method Map(record [f, g], null)
3 Emit(value g, count 1)
4
5 class Recer
6 method Rece(value g, counts [n1, n2,...])
7 Emit(value g, sum( [n1, n2,...] ) )
解決方案 II:
第二種方法只需要一次MapRece 即可實現,但擴展性不強。演算法很簡單-Mapper 輸出值和分類,在Recer里為每個值對應的分類去重然後給每個所屬的分類計數加1,最後再在Recer結束後將所有計數加和。這種方法適用於只有有限個分類,而且擁有相同F值的記錄不是很多的情況。例如網路日誌處理和用戶分類,用戶的總數很多,但是每個用戶的事件是有限的,以此分類得到的類別也是有限的。值得一提的是在這種模式下可以在數據傳輸到Recer之前使用Combiner來去除分類的重復值。
1 class Mapper
2 method Map(null, record [value f, categories [g1, g2,...] )
3 for all category g in [g1, g2,...]
4 Emit(value f, category g)
5
6 class Recer
7 method Initialize
8 H = new AssociativeArray : category -> count
9 method Rece(value f, categories [g1, g2,...])
10 [g1', g2',..] = ExcludeDuplicates( [g1, g2,..] )
11 for all category g in [g1', g2',...]
12 H{g} = H{g} + 1
13 method Close
14 for all category g in H do
15 Emit(category g, count H{g})
應用:日誌分析,用戶計數
互相關
問題陳述:有多個各由若干項構成的組,計算項兩兩共同出現於一個組中的次數。假如項數是N,那麼應該計算N*N。
這種情況常見於文本分析(條目是單詞而元組是句子),市場分析(購買了此物的客戶還可能購買什麼)。如果N*N小到可以容納於一台機器的內存,實現起來就比較簡單了。
配對法
第一種方法是在Mapper中給所有條目配對,然後在Recer中將同一條目對的計數加和。但這種做法也有缺點:
使用 combiners 帶來的的好處有限,因為很可能所有項對都是唯一的
不能有效利用內存
1 class Mapper
2 method Map(null, items [i1, i2,...] )
3 for all item i in [i1, i2,...]
4 for all item j in [i1, i2,...]
5 Emit(pair [i j], count 1)
6
7 class Recer
8 method Rece(pair [i j], counts [c1, c2,...])
9 s = sum([c1, c2,...])
10 Emit(pair[i j], count s)
Stripes Approach(條方法?不知道這個名字怎麼理解)
第二種方法是將數據按照pair中的第一項來分組,並維護一個關聯數組,數組中存儲的是所有關聯項的計數。The second approach is to group data by the first item in pair and maintain an associative array (「stripe」) where counters for all adjacent items are accumulated. Recer receives all stripes for leading item i, merges them, and emits the same result as in the Pairs approach.
中間結果的鍵數量相對較少,因此減少了排序消耗。
可以有效利用 combiners。
可在內存中執行,不過如果沒有正確執行的話也會帶來問題。
實現起來比較復雜。
一般來說, 「stripes」 比 「pairs」 更快
1 class Mapper
2 method Map(null, items [i1, i2,...] )
3 for all item i in [i1, i2,...]
4 H = new AssociativeArray : item -> counter
5 for all item j in [i1, i2,...]
6 H{j} = H{j} + 1
7 Emit(item i, stripe H)
8
9 class Recer
10 method Rece(item i, stripes [H1, H2,...])
11 H = new AssociativeArray : item -> counter
12 H = merge-sum( [H1, H2,...] )
13 for all item j in H.keys()
14 Emit(pair [i j], H{j})
應用:文本分析,市場分析
參考資料:Lin J. Dyer C. Hirst G. Data Intensive Processing MapRece
用MapRece 表達關系模式
在這部分我們會討論一下怎麼使用MapRece來進行主要的關系操作。
篩選(Selection)
1 class Mapper
2 method Map(rowkey key, tuple t)
3 if t satisfies the predicate
4 Emit(tuple t, null)
投影(Projection)
投影只比篩選稍微復雜一點,在這種情況下我們可以用Recer來消除可能的重復值。
1 class Mapper
2 method Map(rowkey key, tuple t)
3 tuple g = project(t) // extract required fields to tuple g
4 Emit(tuple g, null)
5
6 class Recer
③ 如何實現maprece計算框架以有效實現迭代
MapRece從出現以來,已經成為Apache Hadoop計算範式的扛鼎之作。它對於符合其設計的各項工作堪稱完美:大規模日誌處理,ETL批處理操作等。
隨著Hadoop使用范圍的不斷擴大,人們已經清楚知道MapRece不是所有計算的最佳框架。Hadoop 2將資源管理器YARN作為自己的頂級組件,為其他計算引擎的接入提供了可能性。如Impala等非MapRece架構的引入,使平台具備了支持互動式SQL的能力。
今天,Apache Spark是另一種這樣的替代,並且被稱為是超越MapRece的通用計算範例。也許您會好奇:MapRece一直以來已經這么有用了,怎麼能突然被取代?畢竟,還有很多ETL這樣的工作需要在Hadoop上進行,即使該平台目前也已經擁有其他實時功能。
值得慶幸的是,在Spark上重新實現MapRece一樣的計算是完全可能的。它們可以被更簡單的維護,而且在某些情況下更快速,這要歸功於Spark優化了刷寫數據到磁碟的過程。Spark重新實現MapRece編程範式不過是回歸本源。Spark模仿了Scala的函數式編程風格和API。而MapRece的想法來自於函數式編程語言LISP。
盡管Spark的主要抽象是RDD(彈性分布式數據集),實現了Map,rece等操作,但這些都不是Hadoop的Mapper或Recer API的直接模擬。這些轉變也往往成為開發者從Mapper和Recer類平行遷移到Spark的絆腳石。
與Scala或Spark中經典函數語言實現的map和rece函數相比,原有Hadoop提供的Mapper和Recer API 更靈活也更復雜。這些區別對於習慣了MapRece的開發者而言也許並不明顯,下列行為是針對Hadoop的實現而不是MapRece的抽象概念:
· Mapper和Recer總是使用鍵值對作為輸入輸出。
· 每個Recer按照Key對Value進行rece。
· 每個Mapper和Recer對於每組輸入可能產生0個,1個或多個鍵值對。
· Mapper和Recer可能產生任意的keys和values,而不局限於輸入的子集和變換。
Mapper和Recer對象的生命周期可能橫跨多個map和rece操作。它們支持setup和cleanup方法,在批量記錄處理開始之前和結束之後被調用。
本文將簡要展示怎樣在Spark中重現以上過程,您將發現不需要逐字翻譯Mapper和Recer!
作為元組的鍵值對
假定我們需要計算大文本中每一行的長度,並且報告每個長度的行數。在HadoopMapRece中,我們首先使用一個Mapper,生成為以行的長度作為key,1作為value的鍵值對。
public class LineLengthMapper extends
Mapper<LongWritable, Text, IntWritable, IntWritable> {
@Override
protected void map(LongWritable lineNumber, Text line, Context context)
throws IOException, InterruptedException {
context.write(new IntWritable(line.getLength()), new IntWritable(1));
}
}
值得注意的是Mappers和Recers只對鍵值對進行操作。所以由TextInputFormat提供輸入給LineLengthMapper,實際上也是以文本中位置為key(很少這么用,但是總是需要有東西作為Key),文本行為值的鍵值對。
與之對應的Spark實現:
lines.map(line => (line.length, 1))
Spark中,輸入只是String構成的RDD,而不是key-value鍵值對。Spark中對key-value鍵值對的表示是一個Scala的元組,用(A,B)這樣的語法來創建。上面的map操作的結果是(Int,Int)元組的RDD。當一個RDD包含很多元組,它獲得了多個方法,如receByKey,這對再現MapRece行為將是至關重要的。
Rece
rece()與receBykey()
統計行的長度的鍵值對,需要在Recer中對每種長度作為key,計算其行數的總和作為value。
public class LineLengthRecer extends
Recer<IntWritable, IntWritable, IntWritable, IntWritable> {
@Override
protected void rece(IntWritable length, Iterable<IntWritable> counts,
Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable count : counts) {
sum += count.get();
}
context.write(length, new IntWritable(sum));
}
}
Spark中與上述Mapper,Recer對應的實現只要一行代碼:
val lengthCounts = lines.map(line => (line.length, 1)).receByKey(_ + _)
Spark的RDD API有個rece方法,但是它會將所有key-value鍵值對rece為單個value。這並不是Hadoop MapRece的行為,Spark中與之對應的是ReceByKey。
另外,Recer的Rece方法接收多值流,並產生0,1或多個結果。而receByKey,它接受的是一個將兩個值轉化為一個值的函數,在這里,就是把兩個數字映射到它們的和的簡單加法函數。此關聯函數可以被調用者用來rece多個值到一個值。與Recer方法相比,他是一個根據Key來Rece Value的更簡單而更精確的API。
Mapper
map() 與 flatMap()
現在,考慮一個統計以大寫字母開頭的單詞的個數的演算法。對於每行輸入文本,Mapper可能產生0個,1個或多個鍵值對。
public class CountUppercaseMapper extends
Mapper<LongWritable, Text, Text, IntWritable> {
@Override
protected void map(LongWritable lineNumber, Text line, Context context)
throws IOException, InterruptedException {
for (String word : line.toString().split(" ")) {
if (Character.isUpperCase(word.charAt(0))) {
context.write(new Text(word), new IntWritable(1));
}
}
}
}
Spark對應的寫法:
lines.flatMap(
_.split(" ").filter(word => Character.isUpperCase(word(0))).map(word => (word,1))
)
簡單的Spark map函數不適用於這種場景,因為map對於每個輸入只能產生單個輸出,但這個例子中一行需要產生多個輸出。所以,和MapperAPI支持的相比,Spark的map函數語義更簡單,應用范圍更窄。
Spark的解決方案是首先將每行映射為一組輸出值,這組值可能為空值或多值。隨後會通過flatMap函數被扁平化。數組中的詞會被過濾並被轉化為函數中的元組。這個例子中,真正模仿Mapper行為的是flatMap,而不是map。
groupByKey()
寫一個統計次數的recer是簡單的,在Spark中,receByKey可以被用來統計每個單詞的總數。比如出於某種原因要求輸出文件中每個單詞都要顯示為大寫字母和其數量,在MapRece中,實現如下:
public class CountUppercaseRecer extends
Recer<Text, IntWritable, Text, IntWritable> {
@Override
protected void rece(Text word, Iterable<IntWritable> counts, Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable count : counts) {
sum += count.get();
}
context
.write(new Text(word.toString().toUpperCase()), new IntWritable(sum));
}
}
但是redeceByKey不能單獨在Spark中工作,因為他保留了原來的key。為了在Spark中模擬,我們需要一些更像Recer API的操作。我們知道Recer的rece方法接受一個key和一組值,然後完成一組轉換。groupByKey和一個連續的map操作能夠達到這樣的目標:
groupByKey().map { case (word,ones) => (word.toUpperCase, ones.sum) }
groupByKey只是將某一個key的所有值收集在一起,並且不提供rece功能。以此為基礎,任何轉換都可以作用在key和一系列值上。此處,將key轉變為大寫字母,將values直接求和。
setup()和cleanup()
在MapRece中,Mapper和Recer可以聲明一個setup方法,在處理輸入之前執行,來進行分配資料庫連接等昂貴資源,同時可以用cleanup函數可以釋放資源。
public class SetupCleanupMapper extends
Mapper<LongWritable, Text, Text, IntWritable> {
private Connection dbConnection;
@Override
protected void setup(Context context) {
dbConnection = ...;
}
...
@Override
protected void cleanup(Context context) {
dbConnection.close();
}
}
Spark中的map和flatMap方法每次只能在一個input上操作,而且沒有提供在轉換大批值前後執行代碼的方法,看起來,似乎可以直接將setup和cleanup代碼放在Sparkmap函數調用之前和之後:
val dbConnection = ...
lines.map(... dbConnection.createStatement(...) ...)
dbConnection.close() // Wrong!
然而這種方法卻不可行,原因在於:
· 它將對象dbConnection放在map函數的閉包中,這需要他是可序列化的(比如,通過java.io.Serializable實現)。而資料庫連接這種對象一般不能被序列化。
· map是一種轉換,而不是操作,並且拖延執行。連接對象不能被及時關閉。
· 即便如此,它也只能關閉driver上的連接,而不是釋放被序列化拷貝版本分配的資源連接。
事實上,map和flatMap都不是Spark中Mapper的最接近的對應函數,Spark中Mapper的最接近的對應函數是十分重要的mapPartitions()方法,這個方法能夠不僅完成單值對單值的映射,也能完成一組值對另一組值的映射,很像一個批映射(bulkmap)方法。這意味著mapPartitions()方法能夠在開始時從本地分配資源,並在批映射結束時釋放資源。
添加setup方法是簡單的,添加cleanup會更困難,這是由於檢測轉換完成仍然是困難的。例如,這樣是能工作的:
lines.mapPartitions { valueIterator =>
val dbConnection = ... // OK
val transformedIterator = valueIterator.map(... dbConnection ...)
dbConnection.close() // Still wrong! May not have evaluated iterator
transformedIterator
}
一個完整的範式應該看起來類似於:
lines.mapPartitions { valueIterator =>
if (valueIterator.isEmpty) {
Iterator[...]()
} else {
val dbConnection = ...
valueIterator.map { item =>
val transformedItem = ...
if (!valueIterator.hasNext) {
dbConnection.close()
}
transformedItem
}
}
}
雖然後者代碼翻譯註定不如前者優雅,但它確實能夠完成工作。
flatMapPartitions方法並不存在,然而,可以通過調用mapPartitions,後面跟一個flatMap(a= > a)的調用達到同樣效果。
帶有setup和cleanup的Recer對應只需仿照上述代碼使用groupByKey後面跟一個mapPartition函數。
別急,等一下,還有更多
MapRece的開發者會指出,還有更多的還沒有被提及的API:
· MapRece支持一種特殊類型的Recer,也稱為Combiner,可以從Mapper中減少洗牌(shuffled)數據大小。
· 它還支持同通過Partitioner實現的自定義分區,和通過分組Comparator實現的自定義分組。
· Context對象授予Counter API的訪問許可權以及它的累積統計。
· Recer在其生命周期內一直能看到已排序好的key 。
· MapRece有自己的Writable序列化方案。
· Mapper和Recer可以一次發射多組輸出。
· MapRece有幾十個調優參數。
有很多方法可以在Spark中實現這些方案,使用類似Accumulator的API,類似groupBy和在不同的這些方法中加入partitioner參數的方法,Java或Kryo序列化,緩存和更多。由於篇幅限制,在這篇文章中就不再累贅介紹了。
需要指出的是,MapRece的概念仍然有用。只不過現在有了一個更強大的實現,並利用函數式語言,更好地匹配其功能性。理解Spark RDD API和原來的Mapper和RecerAPI之間的差異,可以幫助開發者更好地理解所有這些函數的工作原理,以及理解如何利用Spark發揮其優勢。