導航:首頁 > 源碼編譯 > spark源碼導讀

spark源碼導讀

發布時間:2023-05-11 12:46:03

『壹』 《深入理解SPARK核心思想與源碼分析》epub下載在線閱讀,求百度網盤雲資源

《深入理解SPARK》(耿嘉安)電子書網盤下載免費在線閱讀

資源鏈接:

鏈接:

提取碼:oeso

書名:深入理解SPARK

作者:耿嘉安

豆瓣評分:7.2

出版社:機械工業出版社

出版年份:2016-1-1

頁數:469

內容簡介:

《深入理解SPARK:核心思想與源碼分析》結合大量圖和示例,對Spark的架構、部署模式和工作模塊的設計理念、實現源碼與使用技巧進行了深入的剖析與解讀。

《深入理解SPARK:核心思想與源碼分析》一書對Spark1.2.0版本的源代碼進行了全面而深入的分析,旨在為Spark的優化、定製和擴展提供原理性的指導。阿里巴巴集團專家鼎力推薦、阿里巴巴資深Java開發和大數據專家撰寫。

本書分為三篇:

准備篇(第1~2章),介紹了Spark的環境搭建、設計理念與基本架構,幫助讀者了解一些背景知識。

核心設計篇(第3~7章),著重講解SparkContext的初始化、存儲體系、任務提交與執行、計算引擎及部署模式的原理和源碼分析。通過這部分的內容,讀者可以通過源碼剖析更加深入理解Spark的核心設計與實現,以便在實際使用中能夠快速解決線上問題並對性能進行調優。

擴展篇(第8~11章),主要講解基於Spark核心的各種擴展及應用,包括SQL處理引擎、Hive處理、流式計算框架Spark Streaming、圖計算框架GraphX、機器學習庫MLlib等內容。通過閱讀這部分內容,讀者可以擴展實際項目中對Spark的應用場景,讓Spark煥發活力。

作者簡介:

耿嘉安,10年IT行業相關經驗。就職於阿里巴巴商家業務事業部,任資深Java工程師,專注於開源和大數據領域,目前與小夥伴們基於ODPS構建阿里的大數據商業解決方案——御膳房。在大量的工作實踐中,對J2EE、JVM、Tomcat、Spring、Hadoop、Spark、MySQL、Redis都有深入研究,尤其喜歡剖析開源項目的源碼實現。早期從事J2EE企業級應用開發,對Java相關技術有獨到見解。業余時間喜歡研究中國古代歷史,古詩詞,旅遊,足球等。

『貳』 怎麼在eclipse里正確導入spark2.0.0 的源碼

應該說這個和是不是Spark項目沒什麼關系。

建議你使用intellij idea,在spark目錄下執行"sbt/sbt gen-idea",會自動生成.idea項目,導入即可。
idea我不熟,還需要做一些其他的插件配置(python, sbt等)和環境設置。

你也可以使用Eclipse看,Eclipse有scala IDE,把Spark項目當maven工程導入。但是子項目之間的依賴會有點問題,會報錯。

推薦使用前者,向Databricks的開發者看齊;我使用的是後者,我直接依賴了編譯好的包就不會報錯了,純讀源碼的話也勉強可以跟蹤和調試。

另外,我也看有的Committer用vim看spark

『叄』 spark1.6的源碼怎麼導入idea或者eclipseIDE

下載源頌拿坦碼導入 點擊file->open 選野桐擇以及下載好並解敏陪壓過的spark-1.6.0包 點擊ok,idea會自動安裝下載文件 完成後 打開源碼!

『肆』 Spark Sql 源碼剖析(二): TreeNode

使用 object CurrentOrigin 為 TreeNodes 提供一個可以查找上下文的地方,比如當前正在解析哪行 code。

object CurrentOrigin 主燃顫含要包含一個 private val value = new ThreadLocal[Origin]() ,目前 CurrentOrigin 僅在 parser 中使用,在 visit 每個節點的時候都會使用,記錄當前 parse 的節點是哪行哪列

另外,從 value 是 ThreadLocal 類型可以看出,在 Spark SQL 中,parse sql 時都是在單獨的 thread 里進行的(不同的 sql 不同的 thread)

返回該節點的 seq of children,children 是不可變的。有三種情況:

查找第一個符合 f 條件(比如某個類型的)的 TreeNode,先序遍歷。

將函數 f 遞歸應用洞猛於節點及其子節點

與 foreach 不同的是,foreach 先應用於 parent,再應用與 child;而 foreachUp 是先應用於 child 再應用與 parent

調用 foreach,foreach 中應用的函數是 ret += f(_) ,最終返回一個 seq,包含將 f 通過 foreach 方式應用於所有節點並 add 到 ret。其中 f 本身是 BaseType => A 類型

原理與 map 一致,只是 f 變成了 BaseType => TraversableOnce[A]

PartialFunction#lift :將 partial func 轉換為一個返回 Option 結果的函數。將 pf 函數應用於符合 pf 定義的節點(即 pf.lift(node)返回的 Option 不是 None )並都皮笑 add 到 ret = new collection.mutable.ArrayBuffer[B] 以 Seq 形式返回

以 Seq 的形式返回 tree 的所有葉子節點

def collectFirst[B](pf: PartialFunction[BaseType, B]): Option[B] :注意,因為可能沒有符合 pf 定義的節點,所有返回的 Option 可能是 None

相當於 proctIterator.map(f).toArray ,即對於 proctIterator 每個元素執行 f 然後將 ret 組成一個 arr 返回

注意:TreeNode 沒有實現 Proct 相關方法,都由其子類自行實現

使用 new children 替換並返回該節點的拷貝。該方法會對 proctElement 每個元素進行模式匹配,根據節點類型及一定規則進行替換。

調用 transformDown

rule: PartialFunction[BaseType, BaseType]

返回 f 應用於所有子節點(非遞歸,一般將遞歸操作放在調用該函數的地方)後該節點的 。其內部的原理是調用 mapProctIterator,對每一個 proctElement(i) 進行各種模式匹配,若能匹配上某個再根據一定規則進行轉換,核心匹配轉換如下:

以上都是適用於有 children 的 node,如果是 children 為 null 的 node 直接返回

反射生成節點副本

返回該類型 TreeNode 的 name,默認為 class name;注意,會移除物理操作的 Exec$ 前綴

所有應該以該節點內嵌套樹表示的 nodes,比如,可以被用來表示 sub-queries

(children ++ innerChildren).toSet[TreeNode[_]]

主要用於互動式 debug,返回該 tree 指定下標的節點,num 可以在 numberedTreeString 找到。最終調用的

我的博客即將搬運同步至騰訊雲+社區,邀請大家一同入駐: https://cloud.tencent.com/developer/support-plan?invite_code=x2lzoxh4s5hi

『伍』 可能是全網最詳細的 Spark Sql Aggregate 源碼剖析

縱觀 Spark Sql 源碼,聚合的實現是其中較為復雜的部分,本文希望能以例子結合流程圖的方式來說清楚整個過程。這里僅關注 Aggregate 在物理執行計劃相關的內容,之前的 parse、analyze 及 optimize 階段暫不做分析。在 Spark Sql 中,有一個專門的 Aggregation strategy 用來處理聚合,我們先來看看這個策略。

本文暫不討論 distinct Aggregate 的實現(有興趣的可以看看另一篇博文 https://www.jianshu.com/p/77e0a70db8cd ),我們來看看 AggUtils#planAggregateWithoutDistinct 是如何生成聚合的物理執行計劃的

創建聚合分為兩個階段:

AggregateExpression 共有以下幾種 mode:

Q:是否支持使用 hash based agg 是如何判斷的?

摘自我另一篇文章: https://www.jianshu.com/p/77e0a70db8cd

為了說明最常用也是最復雜的的 hash based agg,本小節暫時將示例 sql 改為

這樣就能進入 HashAggregateExec 的分支

構造函數主要工作就是對 groupingExpressions、aggregateExpressions、aggregateAttributes、resultExpressions 進行了初始化

在 enable code gen 的情況下,會調用 HashAggregateExec#inputRDDs 來生成 RDD,為了分析 HashAggregateExec 是如何生成 RDD 的,我們設置 spark.sql.codegen.wholeStage 為 false 來 disable code gen,這樣就會調用 HashAggregateExec#doExecute 來生成 RDD,如下:

可以看到,關鍵的部分就是根據 child.execute() 生成的 RDD 的每一個 partition 的迭代器轉化生成一個新的 TungstenAggregationIterator ,即 HashAggregateExec 生成的 RDD 的各個 partition。由於 TungstenAggregationIterator 涉及內容非常多,我們單開一大節來進行介紹。

此迭代器:

註:UnsafeKVExternalSorter 的實現可以參考:

UnsafeRow 是 InternalRow(表示一行記錄) 的 unsafe 實現,由原始內存(byte array)而不是 Java 對象支持,由三個區域組成:

使用 UnsafeRow 的收益:

構造函數的主要流程已在上圖中說明,需要注意的是:當內存不液宴足時(畢竟每銷慎個 grouping 對應的 agg buffer 直接佔用內存,如果 grouping 非常多,或者 agg buffer 較大,容易出現內存用盡)會從 hash based aggregate 切換為 sort based aggregate(會 spill 數據到磁碟),後文會進行詳述。先來鬧斗銀看看最關鍵的 processInputs 方法的實現

上圖中,需要注意的是:hashMap 中 get 一個 groupingKey 對應的 agg buffer 時,若已經存在該 buffer 則直接返回;若不存在,嘗試申請內存新建一個:

上圖中,用於真正處理一條 row 的 AggregationIterator#processRow 還需進一步展開分析。在此之前,我們先來看看 AggregateFunction 的分類

AggregateFunction 可以分為 DeclarativeAggregate 和 ImperativeAggregate 兩大類,具體的聚合函數均為這兩類的子類。

DeclarativeAggregate 是一類直接由 Catalyst 中的 Expressions 構成的聚合函數,主要邏輯通過調用 4 個表達式完成,分別是:

我們再次以容易理解的 Count 來舉例說明:

通常來講,實現一個基於 Expressions 的 DeclarativeAggregate 函數包含以下幾個重要的組成部分:

再來看看 AggregationIterator#processRow

AggregationIterator#processRow 會調用

生成用於處理一行數據(row)的函數

說白了 processRow 生成了函數才是直接用來接受一條 input row 來更新對應的 agg buffer,具體是根據 mode 及 aggExpression 中的 aggFunction 的類型調用其 updateExpressions 或 mergeExpressions 方法:

比如,對於 aggFunction 為 DeclarativeAggregate 類型的 Partial 下的 Count 來說就是調用其 updateExpressions 方法,即:

對於 Final 的 Count 來說就是調用其 mergeExpressions 方法,即:

對於 aggFunction 為 ImperativeAggregate 類型的 Partial 下的 Collect 來說就是調用其 update 方法,即:

對於 Final 的 Collect 來說就是調用其 merge 方法,即:

我們都知道,讀取一個迭代器的數據,是要不斷調用 hasNext 方法進行 check 是否還有數據,當該方法返回 true 的時候再調用 next 方法取得下一條數據。所以要知道如何讀取 TungstenAggregationIterator 的數據,就得分析其這兩個方法。

分為兩種情況,分別是:

Agg 的實現確實復雜,本文雖然篇幅已經很長,但還有很多方面沒有 cover 到,但基本最核心、最復雜的點都詳細介紹了,如果對於未 cover 的部分有興趣,請自行閱讀源碼進行分析~

『陸』 源碼級解讀如何解決Spark-sql讀取hive分區表執行效率低問題

問題描述

在開發過程中使用spark去讀取hive分區表的過程中(或者使用hive on spark、nodepad開發工具),部分開發人員未注意添加分區屬性過濾導致在執行過程中載入了全量數據,引起任務執行效率低、首臘磁碟IO大量禪芹鍵損耗等問題。

解決辦法

1、自定義規則CheckPartitionTable類,實現Rule,通過以下方式創建SparkSession。

2、自定義規則CheckPartitionTable類,實現Rule,將規則類追加至Optimizer.batches: Seq[Batch]中,如下。

規則內容實現

1、CheckPartitionTable規則執行類,需要通過引入sparkSession從而獲取到引入conf;需要繼承Rule[LogicalPlan];

2、通過splitPredicates方法,分離分區謂詞,得到分區賀巧謂詞表達式。在sql解析過程中將謂詞解析為TreeNode,此處採用遞歸的方式獲取分區謂詞。

3、判斷是否是分區表,且是否添加分區欄位。

4、實現Rule的apply方法

大數據和雲計算的關系

大數據JUC面試題

大數據之Kafka集群部署

大數據logstsh架構

大數據技術kafka的零拷貝

『柒』 Spark源碼分析之SparkSubmit的流程

本文主要對SparkSubmit的任務提交流程源碼進行分析。 Spark源碼版本為2.3.1。

首先閱讀一下啟動腳本,看看首先載入的是哪個類,我們看一下 spark-submit 啟動腳本中的具體內容。

可以看到這里載入的類是org.apache.spark.deploy.SparkSubmit,並且把啟動相關的參數也帶過去了。下面我們跟一下源碼看看整個流程是如何運作的...

SparkSubmit的main方法如下

這里我們由於我們是提交作業,所有會走上面的submit(appArgs, uninitLog)方法

可以看到submit方法首先會准備任務提交的環境,調用了prepareSubmitEnvironment,該方法會返回四元組,該方法中會調用doPrepareSubmitEnvironment,這里我們重點注意 childMainClass類具體是什麼 ,因為這里涉及到後面啟動我們主類的過程。

以下是doPrepareSubmitEnvironment方法的源碼...

可以看到該方法首先是解析相關的參數,如jar包,mainClass的全限定名,系統配置,校驗一些參數,等等,之後的關鍵點就是根據我們 deploy-mode 參數來判斷是如何運行我們的mainClass,這里主要是通過childMainClass這個參數來決定下一步首先啟動哪個類。

childMainClass根據部署模型有不同的值:

之後該方法會把准備好的四元組返回,我們接著看之前的submit方法

可以看到這里最終會調用doRunMain()方法去進行下一步。

doRunMain的實現如下...

doRunMain方法中會判斷是否需要一個代理用戶,然後無論需不需要都會執行runMain方法,我們接下來看看runMain方法是如何實現的。

這里我們只假設以集群模式啟動,首先會載入類,將我們的childMainClass載入為位元組碼對象mainClass ,然後將mainClass 映射成SparkApplication對象,因為我們以集群模式啟動,那麼上一步返回四元組中的childMainClass的參數為ClientApp的全限定名,而這里會調用app實例的start方法因此,這里最終調用的是ClientApp的start方法。

ClientApp的start方法如下...

可以看到這里和之前我們的master啟動流程有些相似。
可以參考我上一篇文章 Spark源碼分析之Master的啟動流程 對這一流程加深理解。

首先是准備rpcEnv環境,之後通過master的地址獲取masterEndpoints端點相關信息,因為這里運行start方法時會將之前配置的相關參數都傳進來,之後就會通過rpcEnv注冊相關clientEndPoint端點信息,同時需要注意,這里會把masterEndpoints端點信息也作為構造ClientEndpoint端點的參數,也就是說這個ClientEndpoint會和masterEndpoints通信。

而在我上一篇文章中說過,只要是setupEndpoint方法被調用,一定會調用相關端點的的onStart方法,而這會調用clientEndPoint的onStart方法。

ClientEndPoint類中的onStart方法會匹配launch事件。源碼如下

onStart中匹配我們的launch的過程,這個過程是啟動driverWrapper的過程,可以看到上面源碼中封裝了mainClass ,該參數對應DriverWrapper類的全限定名,之後將mainClass封裝到command中,然後封裝到driverDescription中,向Master申請啟動Driver。

這個過程會向Mster發送消息,是通過rpcEnv來實現發射消息的,而這里就涉及到outbox信箱,會調用postToOutbox方法,向outbox信箱中添加消息,然後通過TransportClient的send或sendRpc方法發送消息。發件箱以及發送過程是在同一個線程中進行。

而細心的同學會注意到這里調用的方法名為SendToMasterAndForwardReply,見名之意,發送消息到master並且期待回應。

下面是rpcEnv來實現向遠端發送消息的一個調用流程,最終會通過netty中的TransportClient來寫出。

之後,Master端會觸發receiveAndReply函數,匹配RequestSubmitDriver樣例類,完成模式匹配執行後續流程。

可以看到這里首先將Driver信息封裝成DriverInfo,然後添加待調度列表waitingDrivers中,然後調用通用的schele函數。

由於waitingDrivers不為空,則會走LaunchDriver的流程,當前的application申請資源,這時會向worker發送消息,觸發Worker的receive方法。

Worker的receive方法中,當Worker遇到LaunchDriver指令時,創建並啟動一個DriverRunner,DriverRunner啟動一個線程,非同步的處理Driver啟動工作。這里說啟動的Driver就是剛才說的org.apache.spark.deploy.worker.DriverWrapper

可以看到上面在DriverRunner中是開辟線程非同步的處理Driver啟動工作,不會阻塞主進程的執行,而prepareAndRunDriver方法中最終調用 runDriver..

runDriver中主要先做了一些初始化工作,接著就開始啟動driver了。

上述Driver啟動工作主要分為以下幾步:

下面我們直接看DriverWrapper的實現

DriverWrapper,會創建了一個RpcEndpoint與RpcEnv,RpcEndpoint為WorkerWatcher,主要目的為監控Worker節點是否正常,如果出現異常就直接退出,然後當前的ClassLoader載入userJar,同時執行userMainClass,在執行用戶的main方法後關閉workerWatcher。

以上就是SparkSubmit的流程,下一篇我會對SparkContext的源碼進行解析。

歡迎關注...

『捌』 怎麼用Eclipse搭建Spark源碼閱讀環境

第一部分、軟體安裝

1、 安裝JDK (版本為1.7.0_11)

2、 安裝Scala (版本為2.11.2)

3、 安裝ScalaIDE(版本為3.0.4)

第二部分:加壓縮官網下載的源代碼包或者找到通過Git抽取的Spark源文件:

我用的是spark-1.1.1版本(最新版本),由於idea 13已經原生支持sbt,所以無須為idea安裝sbt插件。

源碼下載(用git工具):

# Masterdevelopment branch

gitclone git://github.com/apache/spark.git

# 1.1 maintenancebranch with stability fixes on top of Spark 1.1.1

gitclone git://github.com/apache/spark.git -b branch-1.1

源碼更新(用git工具同步跟新源碼):

gitclone https://github.com/apache/spark.git

第三部分:通過sbt工具,構建Scala的Eclipse工程,詳細步驟如下所示

1、通過cmd命令進入DOS界面,之後通過cd命令進入源代碼項目中,我下載的Spark.1.1.1版本的源代碼放在(E:\Spark計算框架的研究\spark_1_1_1_eclipse)文件夾中,之後運行sbt命令,如下所示:

2、運行sbt命令之後,解析編譯相關的jar包,並出現sbt命令界面窗口,出現的效果圖如下所示,之後運行eclipse命令,sbt對這個工程進行編譯,構建Eclipse項目,效果圖如下所示:

4、 打開ScalaIDE工具,File à Import à Existing Projects into Workspace à
Next à
選擇剛好用sbt工具編譯好的Eclipse工程(E:\Spark計算框架的研究\spark_1_1_1_eclipse),如下圖所示。

5、 通過上面的操作,就可以將通過sbt工具編譯生成的Eclipse項目導入到EclipseIDE開發環境中,效果圖如下所示:

錯誤提示如下所示:我導入的包為,如下文件夾中所示。

(E:\Spark計算框架的研究\spark_1_1_1_eclipse\lib_managed\bundles)

Description Resource Path Location Type

akka-remote_2.10-2.2.3-shaded-protobuf.jar is cross-compiled

with an incompatible version of Scala (2.10).

In case of errorneous report, this check can be disabled

in the compiler preference page.

spark-core Unknown Scala Classpath Problem

Description Resource Path Location Type

akka-slf4j_2.10-2.2.3-shaded-protobuf.jar is cross-compiled with

an incompatible version of Scala (2.10). In case of errorneous report,

this check can be disabled in the compiler preference page.

spark-core Unknown Scala Classpath Problem

Description Resource Path Location Type

akka-testkit_2.10-2.2.3-shaded-protobuf.jar is cross-compiled

with an incompatible version of Scala (2.10).

In case of errorneous report, this check can be disabled in the compiler preference page.

spark-core Unknown Scala Classpath Problem

Description Resource Path Location Type

akka-zeromq_2.10-2.2.3-shaded-protobuf.jar is cross-compiled

with an incompatible version of Scala (2.10).

In case of errorneous report, this check can be disabled in the compiler preference page.

spark-core Unknown Scala Classpath Problem

上面這些包兼容性問題還沒有解決,修改相應的jar包就可以解決。

『玖』 spark源碼的checkpoint部分在哪

Spark源碼是有Scala語言寫好塵成的,目前,IDEA對Scala的支持要比eclipse要好,大多數人會選在在IDEA上完成Spark平台應用的開發。因此神團,Spark源碼閱讀的IDE理友瞎禪所當然的選擇了IDEA。

『拾』 spark sql 2.3 源碼解讀 - Execute (7)

終於到了最後一步執行了:

最關鍵的兩個函數便是 doPrepare和 doExecute了。

還是以上一章的sql語句為例,其最終生成的sparkplan為:

看一下SortExec的doPrepare 和 doExecute方法:

下面看child也就是ShuffleExchangeExec:

先看沒有exchangeCoordinator的情況,

首先執行:

上面的方法會返回一個ShuffleDependency,ShuffleDependency中最重要的是rddWithPartitionIds,它決定了每一條InternalRow shuffle後的partition id:

接下來:

返回結果是ShuffledRowRDD:

CoalescedPartitioner的邏輯:

再看有exchangeCoordinator的情況:

同樣返回的是ShuffledRowRDD:

再看doEstimationIfNecessary:

estimatePartitionStartIndices 函數得到了 partitionStartIndices:

有exchangeCoordinator的情況就生成了partitionStartIndices,從而對分區進行了調整。

最後來一個例子:

未開啟exchangeCoordinator的plan:

開啟exchangeCoordinator的plan:

不同之處是 兩個Exchange都帶了coordinator,且都是同一個coordinator。

執行withExchangeCoordinator前:

執行withExchangeCoordinator後:

生成了coordinator,且執行了 doPrepare後,可以看到兩個exchange都向其注冊了。

doExecute後:

原先的numPartitions是200,經過執行後,生成的partitionStartIndices為[1],也就是只有1個partition,顯然在測試數據量很小的情況下,1個partition是更為合理的。這就是ExchangeCoordinator的功勞。

execute 最終的輸出是rdd,剩下的結果便是spark對rdd的運算了。其實 spark sql 最終的目標便也是生成rdd,交給spark core來運算。

spark sql的介紹到這里就結束了。

閱讀全文

與spark源碼導讀相關的資料

熱點內容
五菱宏光空調壓縮機 瀏覽:64
為什麼app佔用幾百兆 瀏覽:676
自動解壓失敗叫我聯系客服 瀏覽:482
易語言新手源碼 瀏覽:456
oa伺服器必須有固定ip地址 瀏覽:42
傳奇源碼分析是什麼 瀏覽:267
解放壓縮機支架 瀏覽:255
程序員禿頂搞笑相遇 瀏覽:6
IBM手機app商店叫什麼名字 瀏覽:834
jpeg壓縮質量 瀏覽:774
雲伺服器評測對比 瀏覽:145
java日期轉string 瀏覽:221
openfire源碼編譯 瀏覽:897
在線小工具箱引流網站源碼 瀏覽:337
非科班程序員自學 瀏覽:801
壓縮泡沫鞋底底材 瀏覽:219
程序員職場第一課2正確的溝通 瀏覽:679
遇到不合法app應該怎麼辦 瀏覽:91
匯編程序編譯後的文件 瀏覽:81
大智慧均線源碼 瀏覽:374