❶ 怎麼編譯spark的源代碼 windows
1.maven的安裝
到maven官網下載maven,maven依賴於jdk,所以胡爛先確保以安裝jdk( http://maven.apache.org/download.cgi )
解壓到一個目褲肢漏錄下,我選擇的是D:\learning_soft\maven
然後配置環境變數
%M2_HOME%=D:\learning_soft\maven\apache-maven-3.3.9
添加到path路徑 %M2_HOME%bin
驗證安裝成功
修改maven的內存配置
在mvn.cmd或者mvn.bat中找到:
@REM set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000 11
在其後添加
-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m 11
2.spark編譯
到spark官網下載spark的源碼飢仔,這里選擇的spark-1.3.1
,解壓文件後,修改pom.xml文件
<java.version>1.7</java.version>
<hadoop.version>2.4.1</hadoop.version>
<protobuf.version>2.5.0</protobuf.version>
<hbase.version>0.98.9-hadoop2</hbase.version>
<zookeeper.version>3.4.6</zookeeper.version>
<derby.version>10.11.1.1</derby.version>123456123456
主要是指定hadoop,hbase的版本
然後在將目錄切換到spark的目錄下
輸入
mvn -Pyarn -Dhadoop.version=2.4.1 -Dyarn.version=2.4.1 -DskipTests clean package
❷ Spark Sql 源碼剖析(二): TreeNode
使用 object CurrentOrigin 為 TreeNodes 提供一個可以查找上下文的地方,比如當前正在解析哪行 code。
object CurrentOrigin 主燃顫含要包含一個 private val value = new ThreadLocal[Origin]() ,目前 CurrentOrigin 僅在 parser 中使用,在 visit 每個節點的時候都會使用,記錄當前 parse 的節點是哪行哪列
另外,從 value 是 ThreadLocal 類型可以看出,在 Spark SQL 中,parse sql 時都是在單獨的 thread 里進行的(不同的 sql 不同的 thread)
返回該節點的 seq of children,children 是不可變的。有三種情況:
查找第一個符合 f 條件(比如某個類型的)的 TreeNode,先序遍歷。
將函數 f 遞歸應用洞猛於節點及其子節點
與 foreach 不同的是,foreach 先應用於 parent,再應用與 child;而 foreachUp 是先應用於 child 再應用與 parent
調用 foreach,foreach 中應用的函數是 ret += f(_) ,最終返回一個 seq,包含將 f 通過 foreach 方式應用於所有節點並 add 到 ret。其中 f 本身是 BaseType => A 類型
原理與 map 一致,只是 f 變成了 BaseType => TraversableOnce[A]
PartialFunction#lift :將 partial func 轉換為一個返回 Option 結果的函數。將 pf 函數應用於符合 pf 定義的節點(即 pf.lift(node)返回的 Option 不是 None )並都皮笑 add 到 ret = new collection.mutable.ArrayBuffer[B] 以 Seq 形式返回
以 Seq 的形式返回 tree 的所有葉子節點
def collectFirst[B](pf: PartialFunction[BaseType, B]): Option[B] :注意,因為可能沒有符合 pf 定義的節點,所有返回的 Option 可能是 None
相當於 proctIterator.map(f).toArray ,即對於 proctIterator 每個元素執行 f 然後將 ret 組成一個 arr 返回
注意:TreeNode 沒有實現 Proct 相關方法,都由其子類自行實現
使用 new children 替換並返回該節點的拷貝。該方法會對 proctElement 每個元素進行模式匹配,根據節點類型及一定規則進行替換。
調用 transformDown
rule: PartialFunction[BaseType, BaseType]
返回 f 應用於所有子節點(非遞歸,一般將遞歸操作放在調用該函數的地方)後該節點的 。其內部的原理是調用 mapProctIterator,對每一個 proctElement(i) 進行各種模式匹配,若能匹配上某個再根據一定規則進行轉換,核心匹配轉換如下:
以上都是適用於有 children 的 node,如果是 children 為 null 的 node 直接返回
反射生成節點副本
返回該類型 TreeNode 的 name,默認為 class name;注意,會移除物理操作的 Exec$ 前綴
所有應該以該節點內嵌套樹表示的 nodes,比如,可以被用來表示 sub-queries
(children ++ innerChildren).toSet[TreeNode[_]]
主要用於互動式 debug,返回該 tree 指定下標的節點,num 可以在 numberedTreeString 找到。最終調用的
我的博客即將搬運同步至騰訊雲+社區,邀請大家一同入駐: https://cloud.tencent.com/developer/support-plan?invite_code=x2lzoxh4s5hi
❸ spark和java的關系
通常大家只瞎胡是說Spark是基於內存計算的,速度比MapRece要快。或者說內存中迭代計算。其實我們要抓住問題的本質。總結有以下幾點:
1、Spark vs MapRece ≠ 內存 vs 磁碟
其實Spark和MapRece的計算都發生在內存中,區別在於:
MapRece通常需要將計算的中間結果寫入磁碟,然後還要讀取磁碟,從而導致了頻繁的磁碟IO。
Spark則不需要將計算的中間結果寫入磁碟,這得益於Spark的RDD(彈性局高分布式數據集,很強大)和DAG(有向無環圖),其中DAG記錄了job的stage以及在job執行過程中父RDD和子RDD之間的依賴關系。中間結果能夠以RDD的形式存放在內存中,且能夠從DAG中恢復,大大減少了磁碟IO。
2、Spark vs MapRece Shuffle的不同
Spark和MapRece在計算過程中通常都不可避免的會進行Shuffle,兩者至少有一點不同:
MapRece在Shuffle時需要花費大量時間進行排序,排序在MapRece的Shuffle中似乎是不可避免的;
Spark在Shuffle時則只有部分場景才需要排序,支持基於Hash的分布式聚合,更加省時;
3、多進程模型 vs 多線程模型的區別
MapRece採用了多進程模型,而Spark採用了多磨臘攔線程模型。多進程模型的好處是便於細粒度控制每個任務佔用的資源,但每次任務的啟動都會消耗一定的啟動時間。就是說MapRece的Map Task和Rece Task是進程級別的,而Spark Task則是基於線程模型的,就是說maprece 中的 map 和 rece 都是 jvm 進程,每次啟動都需要重新申請資源,消耗了不必要的時間(假設容器啟動時間大概1s,如果有1200個block,那麼單獨啟動map進程事件就需要20分鍾)
Spark則是通過復用線程池中的線程來減少啟動、關閉task所需要的開銷。(多線程模型也有缺點,由於同節點上所有任務運行在一個進程中,因此,會出現嚴重的資源爭用,難以細粒度控制每個任務佔用資源)
總結:關於Spark為什麼比MapRece快,或者Spark速度快於MapRece的原因,總結至少有這幾點不同之處吧。
❹ Spark 中用 Scala 和 java 開發有什麼區別
Scala相對於Java的優勢是巨大的。熟悉Scala之後再看Java代碼,有種讀匯編的感覺…… 如果僅僅是寫Spark應用,並非一定要學Scala,可以直接用Spark的Java API或Python API。但因為語言上的差異,用Java開發Spark應用要羅嗦許多。好在帶lambda的Java 8出來之後有所改善。 在Spark應用開發上,學Scala主要好處有二: 開發效率更高,代碼更精簡; 使用Spark過程中出現異常情況,在排查時如果對Spark源碼比較熟悉,可以事半功倍
❺ Spark源碼分析之SparkSubmit的流程
本文主要對SparkSubmit的任務提交流程源碼進行分析。 Spark源碼版本為2.3.1。
首先閱讀一下啟動腳本,看看首先載入的是哪個類,我們看一下 spark-submit 啟動腳本中的具體內容。
可以看到這里載入的類是org.apache.spark.deploy.SparkSubmit,並且把啟動相關的參數也帶過去了。下面我們跟一下源碼看看整個流程是如何運作的...
SparkSubmit的main方法如下
這里我們由於我們是提交作業,所有會走上面的submit(appArgs, uninitLog)方法
可以看到submit方法首先會准備任務提交的環境,調用了prepareSubmitEnvironment,該方法會返回四元組,該方法中會調用doPrepareSubmitEnvironment,這里我們重點注意 childMainClass類具體是什麼 ,因為這里涉及到後面啟動我們主類的過程。
以下是doPrepareSubmitEnvironment方法的源碼...
可以看到該方法首先是解析相關的參數,如jar包,mainClass的全限定名,系統配置,校驗一些參數,等等,之後的關鍵點就是根據我們 deploy-mode 參數來判斷是如何運行我們的mainClass,這里主要是通過childMainClass這個參數來決定下一步首先啟動哪個類。
childMainClass根據部署模型有不同的值:
之後該方法會把准備好的四元組返回,我們接著看之前的submit方法
可以看到這里最終會調用doRunMain()方法去進行下一步。
doRunMain的實現如下...
doRunMain方法中會判斷是否需要一個代理用戶,然後無論需不需要都會執行runMain方法,我們接下來看看runMain方法是如何實現的。
這里我們只假設以集群模式啟動,首先會載入類,將我們的childMainClass載入為位元組碼對象mainClass ,然後將mainClass 映射成SparkApplication對象,因為我們以集群模式啟動,那麼上一步返回四元組中的childMainClass的參數為ClientApp的全限定名,而這里會調用app實例的start方法因此,這里最終調用的是ClientApp的start方法。
ClientApp的start方法如下...
可以看到這里和之前我們的master啟動流程有些相似。
可以參考我上一篇文章 Spark源碼分析之Master的啟動流程 對這一流程加深理解。
首先是准備rpcEnv環境,之後通過master的地址獲取masterEndpoints端點相關信息,因為這里運行start方法時會將之前配置的相關參數都傳進來,之後就會通過rpcEnv注冊相關clientEndPoint端點信息,同時需要注意,這里會把masterEndpoints端點信息也作為構造ClientEndpoint端點的參數,也就是說這個ClientEndpoint會和masterEndpoints通信。
而在我上一篇文章中說過,只要是setupEndpoint方法被調用,一定會調用相關端點的的onStart方法,而這會調用clientEndPoint的onStart方法。
ClientEndPoint類中的onStart方法會匹配launch事件。源碼如下
onStart中匹配我們的launch的過程,這個過程是啟動driverWrapper的過程,可以看到上面源碼中封裝了mainClass ,該參數對應DriverWrapper類的全限定名,之後將mainClass封裝到command中,然後封裝到driverDescription中,向Master申請啟動Driver。
這個過程會向Mster發送消息,是通過rpcEnv來實現發射消息的,而這里就涉及到outbox信箱,會調用postToOutbox方法,向outbox信箱中添加消息,然後通過TransportClient的send或sendRpc方法發送消息。發件箱以及發送過程是在同一個線程中進行。
而細心的同學會注意到這里調用的方法名為SendToMasterAndForwardReply,見名之意,發送消息到master並且期待回應。
下面是rpcEnv來實現向遠端發送消息的一個調用流程,最終會通過netty中的TransportClient來寫出。
之後,Master端會觸發receiveAndReply函數,匹配RequestSubmitDriver樣例類,完成模式匹配執行後續流程。
可以看到這里首先將Driver信息封裝成DriverInfo,然後添加待調度列表waitingDrivers中,然後調用通用的schele函數。
由於waitingDrivers不為空,則會走LaunchDriver的流程,當前的application申請資源,這時會向worker發送消息,觸發Worker的receive方法。
Worker的receive方法中,當Worker遇到LaunchDriver指令時,創建並啟動一個DriverRunner,DriverRunner啟動一個線程,非同步的處理Driver啟動工作。這里說啟動的Driver就是剛才說的org.apache.spark.deploy.worker.DriverWrapper
可以看到上面在DriverRunner中是開辟線程非同步的處理Driver啟動工作,不會阻塞主進程的執行,而prepareAndRunDriver方法中最終調用 runDriver..
runDriver中主要先做了一些初始化工作,接著就開始啟動driver了。
上述Driver啟動工作主要分為以下幾步:
下面我們直接看DriverWrapper的實現
DriverWrapper,會創建了一個RpcEndpoint與RpcEnv,RpcEndpoint為WorkerWatcher,主要目的為監控Worker節點是否正常,如果出現異常就直接退出,然後當前的ClassLoader載入userJar,同時執行userMainClass,在執行用戶的main方法後關閉workerWatcher。
以上就是SparkSubmit的流程,下一篇我會對SparkContext的源碼進行解析。
歡迎關注...
❻ spark源碼二次開發難嗎
spark源碼二次開發不難。掌握了源碼編譯,就具備了對Spark進行二次開發的基本條件了,要修改Spark源碼,進行二次開發,那麼就得從官網下載指定版本的源碼,導入ide開發環境,進行源碼的修改。接著修改完了。
❼ Linux裡面spark作用是什麼
Spark是通用數據處理引擎,適用於多種情況。 應用程序開發人員和數據科學家將Spark集成到他們的應用程序中,以快速地大規模查詢,分析和轉換數據。 與Spark最頻繁相關的任務包括跨大型數據集的互動式查詢,來自感測器或金融系統的流數據處理以及機器學習任務。
Spark於2009年開始運作,最初是加州大學伯克利分校AMPLab內部的一個項目。 更具體地說,它是出於證明Mesos概念的需要而誕生的,Mesos概念也是在AMPLab中創建的。 在Mesos白皮書《 Mesos:數據中心中的細粒度資源共享平台》中首次討論了Spark,其中最著名的作者是Benjamin Hindman和Matei Zaharia。
2013年,Spark成為Apache Software Foundation的孵化項目,並於2014年初被提升為該基金會的頂級項目之一。 Spark是基金會管理的最活躍的項目之一,圍繞該項目成長的社區包括多產的個人貢獻者和資金雄厚的企業支持者,例如Databricks,IBM和中國的華為。
從一開始,Spark就被優化為在內存中運行。 它比Hadoop的MapRece等替代方法更快地處理數據,後者傾向於在處理的每個階段之間向計算機硬碟寫入數據或從計算機硬碟寫入數據。 Spark的支持者聲稱,Spark在內存中的運行速度可以比Hadoop MapRece快100倍,並且在以類似於Hadoop MapRece本身的方式處理基於磁碟的數據時也可以快10倍。 這種比較並不完全公平,這不僅是因為原始速度對Spark的典型用例而言比對批處理更為重要,在這種情況下,類似於MapRece的解決方案仍然很出色。
❽ spark thrift server 與 網易 kyuubi thrift server
thrift server可以實現通過jdbc, beeline等工具,實現連接到spark集群,並提交sql查詢的機制。
默認情況下,cdh安裝的spark沒有包含thrift server模塊,因此我們需要重新編譯spark。
另外,為了不影響cdh自帶的spark,而且spark目前都是基於yarn運行的,本身也沒有什麼獨立的服務部署(除了history sever)。
所以,在一個集群中,可以部署安裝多個版本的spark。
我們使用源碼編譯的spark 2.4.0(其中hive的版本是1.2.1)
cdh集成的spark版本和Hive版本如下:
使用jdk1.8
修改spark提供的mvn,使用自行安裝的maven 3.8.1
使用make-distribution.sh可以幫助與我們編譯之後打包成tgz文件
修改pom.xml文件的配置如下。
最後,執行編譯命令如下:
這樣打出的包,就含有thrift server的jar包了。
最終打包文件,根目錄下。
之後就是解壓到其他目錄下後即可。
將hive-site.xml的文件連接過來,這樣spark就可以讀取hive的表了。
為了確保spark提交到yarn上運行,需要配置
cp spark-defaults.conf.template spar-defaults.conf
另外,可以在spark-env.sh中設置環境變數。
HADOOP_CONF_DIR
環境變數,也可以在/etc/profile中設置
啟動日誌可以查看,注意下埠佔用問題,如下。
啟動時候,使用beeline工具連接上,主要這里不用使用cdh默認安裝hive提供的beeline工具,應為版本太高。
使用編譯後spark生成beeline工具
參考beeline使用教程。
https://github.com/apache/incubator-kyuubi
kyuubi是基於thrift sever二次開發,在系能和安全上優於thrift server。
鑒於目前hive的版本是2.1,而最新的kyuubi的hive是2.3,所以採用前天版本的kyuubi,採用0.7版本,保證hive的版本小於當前集群中的hive版本。
使用build目錄下的dist腳本進行編譯和打包。
編譯成功後,會在更目錄下出現tar.gz的壓縮文件,如上圖。
之後解壓到目錄下。
配置bin/kyuubi-env.sh腳本,設置spark路徑
執行bin/start-kyuubi.sh命令即可。
訪問的方式同樣採用beelin,注意使用上面章節的beeline工具。
訪問後,可以通過beeline訪問到hive的表(在spark中已經配置了hive-site.xml)
!connect jdbc: hive2://xxxx:10009 即可。