㈠ 可能是全網最詳細的 Spark Sql Aggregate 源碼剖析
縱觀 Spark Sql 源碼,聚合的實現是其中較為復雜的部分,本文希望能以例子結合流程圖的方式來說清楚整個過程。這里僅關注 Aggregate 在物理執行計劃相關的內容,之前的 parse、analyze 及 optimize 階段暫不做分析。在 Spark Sql 中,有一個專門的 Aggregation strategy 用來處理聚合,我們先來看看這個策略。
本文暫不討論 distinct Aggregate 的實現(有興趣的可以看看另一篇博文 https://www.jianshu.com/p/77e0a70db8cd ),我們來看看 AggUtils#planAggregateWithoutDistinct 是如何生成聚合的物理執行計劃的
創建聚合分為兩個階段:
AggregateExpression 共有以下幾種 mode:
Q:是否支持使用 hash based agg 是如何判斷的?
摘自我另一篇文章: https://www.jianshu.com/p/77e0a70db8cd
為了說明最常用也是最復雜的的 hash based agg,本小節暫時將示例 sql 改為
這樣就能進入 HashAggregateExec 的分支
構造函數主要工作就是對 groupingExpressions、aggregateExpressions、aggregateAttributes、resultExpressions 進行了初始化
在 enable code gen 的情況下,會調用 HashAggregateExec#inputRDDs 來生成 RDD,為了分析 HashAggregateExec 是如何生成 RDD 的,我們設置 spark.sql.codegen.wholeStage 為 false 來 disable code gen,這樣就會調用 HashAggregateExec#doExecute 來生成 RDD,如下:
可以看到,關鍵的部分就是根據 child.execute() 生成的 RDD 的每一個 partition 的迭代器轉化生成一個新的 TungstenAggregationIterator ,即 HashAggregateExec 生成的 RDD 的各個 partition。由於 TungstenAggregationIterator 涉及內容非常多,我們單開一大節來進行介紹。
此迭代器:
註:UnsafeKVExternalSorter 的實現可以參考:
UnsafeRow 是 InternalRow(表示一行記錄) 的 unsafe 實現,由原始內存(byte array)而不是 Java 對象支持,由三個區域組成:
使用 UnsafeRow 的收益:
構造函數的主要流程已在上圖中說明,需要注意的是:當內存不液宴足時(畢竟每銷慎個 grouping 對應的 agg buffer 直接佔用內存,如果 grouping 非常多,或者 agg buffer 較大,容易出現內存用盡)會從 hash based aggregate 切換為 sort based aggregate(會 spill 數據到磁碟),後文會進行詳述。先來鬧斗銀看看最關鍵的 processInputs 方法的實現
上圖中,需要注意的是:hashMap 中 get 一個 groupingKey 對應的 agg buffer 時,若已經存在該 buffer 則直接返回;若不存在,嘗試申請內存新建一個:
上圖中,用於真正處理一條 row 的 AggregationIterator#processRow 還需進一步展開分析。在此之前,我們先來看看 AggregateFunction 的分類
AggregateFunction 可以分為 DeclarativeAggregate 和 ImperativeAggregate 兩大類,具體的聚合函數均為這兩類的子類。
DeclarativeAggregate 是一類直接由 Catalyst 中的 Expressions 構成的聚合函數,主要邏輯通過調用 4 個表達式完成,分別是:
我們再次以容易理解的 Count 來舉例說明:
通常來講,實現一個基於 Expressions 的 DeclarativeAggregate 函數包含以下幾個重要的組成部分:
再來看看 AggregationIterator#processRow
AggregationIterator#processRow 會調用
生成用於處理一行數據(row)的函數
說白了 processRow 生成了函數才是直接用來接受一條 input row 來更新對應的 agg buffer,具體是根據 mode 及 aggExpression 中的 aggFunction 的類型調用其 updateExpressions 或 mergeExpressions 方法:
比如,對於 aggFunction 為 DeclarativeAggregate 類型的 Partial 下的 Count 來說就是調用其 updateExpressions 方法,即:
對於 Final 的 Count 來說就是調用其 mergeExpressions 方法,即:
對於 aggFunction 為 ImperativeAggregate 類型的 Partial 下的 Collect 來說就是調用其 update 方法,即:
對於 Final 的 Collect 來說就是調用其 merge 方法,即:
我們都知道,讀取一個迭代器的數據,是要不斷調用 hasNext 方法進行 check 是否還有數據,當該方法返回 true 的時候再調用 next 方法取得下一條數據。所以要知道如何讀取 TungstenAggregationIterator 的數據,就得分析其這兩個方法。
分為兩種情況,分別是:
Agg 的實現確實復雜,本文雖然篇幅已經很長,但還有很多方面沒有 cover 到,但基本最核心、最復雜的點都詳細介紹了,如果對於未 cover 的部分有興趣,請自行閱讀源碼進行分析~
㈡ 開源代碼是什麼意思
一句話來說,開源指的是那些源代碼或源設計可以被大眾使用、修改發行的軟體或設計體。
大眾最熟悉的開源軟體就是安卓,相信用非蘋果的智能手機用戶,現在每天用的肯定都是安卓,它也是現在影響力最大的開源軟體之一,如果沒有安卓的開源開放,相信今天沒有那麼多手機廠商和移動互聯網的興起。
如果只是從生態的角度來說,蘋果的生態也很開放,現在他們也推出了開源Swift。從這方面來說,其實兩者都是一樣的,只是開源、開放的方式方法不太一樣。
Linux 無疑是開源軟體里最最成功的一個,不管是從它目前的生態建設角度,還是從業界評價來看,包括今天雲計算的基礎也都倚賴Linux的貢獻和基石。當然,像OpenStack、Hadoop 、Spark等也非常成功,這些開源項目都屬於底層技術,在支撐今天整個大數據、雲計算的發展。
開源並不意味著免費,開源只是說我們做了一個好東西,把它開放給大家使用,目的是希望大家更多地使用它,並反饋使用過程中的問題或者改進方式,使得整個開源項目進步得更快,能夠更好地共享給更多有需要的人,目前像 Linux、Hadoop、Spark等等,都是這么做的。但很多時候開源背後還是帶有很濃厚的商業背景。
做得比較大的開源項目背後都有商業公司在支撐,如果一個成功的開源項目背後沒有商業公司,這是不健康的,我們需要開源和商業之間的互補對稱來促進整個社區和技術的不斷前進答。
㈢ 《深入理解SPARK核心思想與源碼分析》epub下載在線閱讀,求百度網盤雲資源
《深入理解SPARK》(耿嘉安)電子書網盤下載免費在線閱讀
資源鏈接:
鏈接:
書名:深入理解SPARK
作者:耿嘉安
豆瓣評分:7.2
出版社:機械工業出版社
出版年份:2016-1-1
頁數:469
內容簡介:
《深入理解SPARK:核心思想與源碼分析》結合大量圖和示例,對Spark的架構、部署模式和工作模塊的設計理念、實現源碼與使用技巧進行了深入的剖析與解讀。
《深入理解SPARK:核心思想與源碼分析》一書對Spark1.2.0版本的源代碼進行了全面而深入的分析,旨在為Spark的優化、定製和擴展提供原理性的指導。阿里巴巴集團專家鼎力推薦、阿里巴巴資深Java開發和大數據專家撰寫。
本書分為三篇:
准備篇(第1~2章),介紹了Spark的環境搭建、設計理念與基本架構,幫助讀者了解一些背景知識。
核心設計篇(第3~7章),著重講解SparkContext的初始化、存儲體系、任務提交與執行、計算引擎及部署模式的原理和源碼分析。通過這部分的內容,讀者可以通過源碼剖析更加深入理解Spark的核心設計與實現,以便在實際使用中能夠快速解決線上問題並對性能進行調優。
擴展篇(第8~11章),主要講解基於Spark核心的各種擴展及應用,包括SQL處理引擎、Hive處理、流式計算框架Spark Streaming、圖計算框架GraphX、機器學習庫MLlib等內容。通過閱讀這部分內容,讀者可以擴展實際項目中對Spark的應用場景,讓Spark煥發活力。
作者簡介:
耿嘉安,10年IT行業相關經驗。就職於阿里巴巴商家業務事業部,任資深Java工程師,專注於開源和大數據領域,目前與小夥伴們基於ODPS構建阿里的大數據商業解決方案——御膳房。在大量的工作實踐中,對J2EE、JVM、Tomcat、Spring、Hadoop、Spark、MySQL、Redis都有深入研究,尤其喜歡剖析開源項目的源碼實現。早期從事J2EE企業級應用開發,對Java相關技術有獨到見解。業余時間喜歡研究中國古代歷史,古詩詞,旅遊,足球等。
㈣ spark源碼二次開發難嗎
spark源碼二次開發不難。掌握了源碼編譯,就具備了對Spark進行二次開發的基本條件了,要修改Spark源碼,進行二次開發,那麼就得從官網下載指定版本的源碼,導入ide開發環境,進行源碼的修改。接著修改完了。
㈤ 《深入理解spark核心思想及源碼分析》pdf下載在線閱讀全文,求百度網盤雲資源
《深入理解spark核心思想及源碼分析》網路網盤pdf最新全集下載:
鏈接:https://pan..com/s/1iOq9-MrepVdWcIrbALPMPg
㈥ Spark源碼分析之SparkSubmit的流程
本文主要對SparkSubmit的任務提交流程源碼進行分析。 Spark源碼版本為2.3.1。
首先閱讀一下啟動腳本,看看首先載入的是哪個類,我們看一下 spark-submit 啟動腳本中的具體內容。
可以看到這里載入的類是org.apache.spark.deploy.SparkSubmit,並且把啟動相關的參數也帶過去了。下面我們跟一下源碼看看整個流程是如何運作的...
SparkSubmit的main方法如下
這里我們由於我們是提交作業,所有會走上面的submit(appArgs, uninitLog)方法
可以看到submit方法首先會准備任務提交的環境,調用了prepareSubmitEnvironment,該方法會返回四元組,該方法中會調用doPrepareSubmitEnvironment,這里我們重點注意 childMainClass類具體是什麼 ,因為這里涉及到後面啟動我們主類的過程。
以下是doPrepareSubmitEnvironment方法的源碼...
可以看到該方法首先是解析相關的參數,如jar包,mainClass的全限定名,系統配置,校驗一些參數,等等,之後的關鍵點就是根據我們 deploy-mode 參數來判斷是如何運行我們的mainClass,這里主要是通過childMainClass這個參數來決定下一步首先啟動哪個類。
childMainClass根據部署模型有不同的值:
之後該方法會把准備好的四元組返回,我們接著看之前的submit方法
可以看到這里最終會調用doRunMain()方法去進行下一步。
doRunMain的實現如下...
doRunMain方法中會判斷是否需要一個代理用戶,然後無論需不需要都會執行runMain方法,我們接下來看看runMain方法是如何實現的。
這里我們只假設以集群模式啟動,首先會載入類,將我們的childMainClass載入為位元組碼對象mainClass ,然後將mainClass 映射成SparkApplication對象,因為我們以集群模式啟動,那麼上一步返回四元組中的childMainClass的參數為ClientApp的全限定名,而這里會調用app實例的start方法因此,這里最終調用的是ClientApp的start方法。
ClientApp的start方法如下...
可以看到這里和之前我們的master啟動流程有些相似。
可以參考我上一篇文章 Spark源碼分析之Master的啟動流程 對這一流程加深理解。
首先是准備rpcEnv環境,之後通過master的地址獲取masterEndpoints端點相關信息,因為這里運行start方法時會將之前配置的相關參數都傳進來,之後就會通過rpcEnv注冊相關clientEndPoint端點信息,同時需要注意,這里會把masterEndpoints端點信息也作為構造ClientEndpoint端點的參數,也就是說這個ClientEndpoint會和masterEndpoints通信。
而在我上一篇文章中說過,只要是setupEndpoint方法被調用,一定會調用相關端點的的onStart方法,而這會調用clientEndPoint的onStart方法。
ClientEndPoint類中的onStart方法會匹配launch事件。源碼如下
onStart中匹配我們的launch的過程,這個過程是啟動driverWrapper的過程,可以看到上面源碼中封裝了mainClass ,該參數對應DriverWrapper類的全限定名,之後將mainClass封裝到command中,然後封裝到driverDescription中,向Master申請啟動Driver。
這個過程會向Mster發送消息,是通過rpcEnv來實現發射消息的,而這里就涉及到outbox信箱,會調用postToOutbox方法,向outbox信箱中添加消息,然後通過TransportClient的send或sendRpc方法發送消息。發件箱以及發送過程是在同一個線程中進行。
而細心的同學會注意到這里調用的方法名為SendToMasterAndForwardReply,見名之意,發送消息到master並且期待回應。
下面是rpcEnv來實現向遠端發送消息的一個調用流程,最終會通過netty中的TransportClient來寫出。
之後,Master端會觸發receiveAndReply函數,匹配RequestSubmitDriver樣例類,完成模式匹配執行後續流程。
可以看到這里首先將Driver信息封裝成DriverInfo,然後添加待調度列表waitingDrivers中,然後調用通用的schele函數。
由於waitingDrivers不為空,則會走LaunchDriver的流程,當前的application申請資源,這時會向worker發送消息,觸發Worker的receive方法。
Worker的receive方法中,當Worker遇到LaunchDriver指令時,創建並啟動一個DriverRunner,DriverRunner啟動一個線程,非同步的處理Driver啟動工作。這里說啟動的Driver就是剛才說的org.apache.spark.deploy.worker.DriverWrapper
可以看到上面在DriverRunner中是開辟線程非同步的處理Driver啟動工作,不會阻塞主進程的執行,而prepareAndRunDriver方法中最終調用 runDriver..
runDriver中主要先做了一些初始化工作,接著就開始啟動driver了。
上述Driver啟動工作主要分為以下幾步:
下面我們直接看DriverWrapper的實現
DriverWrapper,會創建了一個RpcEndpoint與RpcEnv,RpcEndpoint為WorkerWatcher,主要目的為監控Worker節點是否正常,如果出現異常就直接退出,然後當前的ClassLoader載入userJar,同時執行userMainClass,在執行用戶的main方法後關閉workerWatcher。
以上就是SparkSubmit的流程,下一篇我會對SparkContext的源碼進行解析。
歡迎關注...