❶ 大家對spark的源碼了解多少,sparkshuffle,調度,sparkstreaming的源碼
流(Streaming),在大數據時代為數據流處理,就像水流一樣,是數據流;既然是數據流處理,就會想到數據的流入、數據的加工、數據的流出。
日常工作、生活中數據來源很多不同的地方。例如:工業時代的汽車製造、監控設備、工業設備會產生很多源數據;信息時代的電商網站、日誌伺服器、社交網路、金融交易系統、黑客攻擊、垃圾郵件、交通監控等;通信時代的手機、平板、智能設備、物聯網等會產生很多實時數據,數據流無處不在。
在大數據時代SparkStreaming能做什麼?
平時用戶都有網上購物的經歷,用戶在網站上進行的各種操作通過Spark Streaming流處理技術可以被監控,用戶的購買愛好、關注度、交易等可以進行行為分析。在金融領域,通過Spark Streaming流處理技術可以對交易量很大的賬號進行監控,防止罪犯洗錢、財產轉移、防欺詐等。在網路安全性方面,黑客攻擊時有發生,通過Spark Streaming流處理技術可以將某類可疑IP進行監控並結合機器學習訓練模型匹配出當前請求是否屬於黑客攻擊。其他方面,如:垃圾郵件監控過濾、交通監控、網路監控、工業設備監控的背後都是Spark Streaming發揮強大流處理的地方。
大數據時代,數據價值一般怎麼定義?
所有沒經過流處理的數據都是無效數據或沒有價值的數據;數據產生之後立即處理產生的價值是最大的,數據放置越久或越滯後其使用價值越低。以前絕大多數電商網站盈利走的是網路流量(即用戶的訪問量),如今,電商網站不僅僅需要關注流量、交易量,更重要的是要通過數據流技術讓電商網站的各種數據流動起來,通過實時流動的數據及時分析、挖掘出各種有價值的數據;比如:對不同交易量的用戶指定用戶畫像,從而提供不同服務質量;准對用戶訪問電商網站板塊愛好及時推薦相關的信息。
SparkStreaming VSHadoopMR:
Spark Streaming是一個准實時流處理框架,而Hadoop MR是一個離線、批處理框架;很顯然,在數據的價值性角度,Spark Streaming完勝於Hadoop MR。
SparkStreaming VS Storm:
Spark Streaming是一個准實時流處理框架,處理響應時間一般以分鍾為單位,也就是說處理實時數據的延遲時間是秒級別的;Storm是一個實時流處理框架,處理響應是毫秒級的。所以在流框架選型方面要看具體業務場景。需要澄清的是現在很多人認為Spark Streaming流處理運行不穩定、數據丟失、事務性支持不好等等,那是因為很多人不會駕馭Spark Streaming及Spark本身。在Spark Streaming流處理的延遲時間方面,Spark定製版本,會將Spark Streaming的延遲從秒級別推進到100毫秒之內甚至更少。
SparkStreaming優點:
1、提供了豐富的API,企業中能快速實現各種復雜的業務邏輯。
2、流入Spark Streaming的數據流通過和機器學習演算法結合,完成機器模擬和圖計算。
3、Spark Streaming基於Spark優秀的血統。
SparkStreaming能不能像Storm一樣,一條一條處理數據?
Storm處理數據的方式是以條為單位來一條一條處理的,而Spark Streaming基於單位時間處理數據的,SparkStreaming能不能像Storm一樣呢?答案是:可以的。
業界一般的做法是Spark Streaming和Kafka搭檔即可達到這種效果,入下圖:
總結:
使用Spark Streaming可以處理各種數據來源類型,如:資料庫、HDFS,伺服器log日誌、網路流,其強大超越了你想像不到的場景,只是很多時候大家不會用,其真正原因是對Spark、spark streaming本身不了解。
❷ spark sql 2.3 源碼解讀 - Execute (7)
終於到了最後一步執行了:
最關鍵的兩個函數便是 doPrepare和 doExecute了。
還是以上一章的sql語句為例,其最終生成的sparkplan為:
看一下SortExec的doPrepare 和 doExecute方法:
下面看child也就是ShuffleExchangeExec:
先看沒有exchangeCoordinator的情況,
首先執行:
上面的方法會返回一個ShuffleDependency,ShuffleDependency中最重要的是rddWithPartitionIds,它決定了每一條InternalRow shuffle後的partition id:
接下來:
返回結果是ShuffledRowRDD:
CoalescedPartitioner的邏輯:
再看有exchangeCoordinator的情況:
同樣返回的是ShuffledRowRDD:
再看doEstimationIfNecessary:
estimatePartitionStartIndices 函數得到了 partitionStartIndices:
有exchangeCoordinator的情況就生成了partitionStartIndices,從而對分區進行了調整。
最後來一個例子:
未開啟exchangeCoordinator的plan:
開啟exchangeCoordinator的plan:
不同之處是 兩個Exchange都帶了coordinator,且都是同一個coordinator。
執行withExchangeCoordinator前:
執行withExchangeCoordinator後:
生成了coordinator,且執行了 doPrepare後,可以看到兩個exchange都向其注冊了。
doExecute後:
原先的numPartitions是200,經過執行後,生成的partitionStartIndices為[1],也就是只有1個partition,顯然在測試數據量很小的情況下,1個partition是更為合理的。這就是ExchangeCoordinator的功勞。
execute 最終的輸出是rdd,剩下的結果便是spark對rdd的運算了。其實 spark sql 最終的目標便也是生成rdd,交給spark core來運算。
spark sql的介紹到這里就結束了。
❸ Spark源碼分析之SparkSubmit的流程
本文主要對SparkSubmit的任務提交流程源碼進行分析。 Spark源碼版本為2.3.1。
首先閱讀一下啟動腳本,看看首先載入的是哪個類,我們看一下 spark-submit 啟動腳本中的具體內容。
可以看到這里載入的類是org.apache.spark.deploy.SparkSubmit,並且把啟動相關的參數也帶過去了。下面我們跟一下源碼看看整個流程是如何運作的...
SparkSubmit的main方法如下
這里我們由於我們是提交作業,所有會走上面的submit(appArgs, uninitLog)方法
可以看到submit方法首先會准備任務提交的環境,調用了prepareSubmitEnvironment,該方法會返回四元組,該方法中會調用doPrepareSubmitEnvironment,這里我們重點注意 childMainClass類具體是什麼 ,因為這里涉及到後面啟動我們主類的過程。
以下是doPrepareSubmitEnvironment方法的源碼...
可以看到該方法首先是解析相關的參數,如jar包,mainClass的全限定名,系統配置,校驗一些參數,等等,之後的關鍵點就是根據我們 deploy-mode 參數來判斷是如何運行我們的mainClass,這里主要是通過childMainClass這個參數來決定下一步首先啟動哪個類。
childMainClass根據部署模型有不同的值:
之後該方法會把准備好的四元組返回,我們接著看之前的submit方法
可以看到這里最終會調用doRunMain()方法去進行下一步。
doRunMain的實現如下...
doRunMain方法中會判斷是否需要一個代理用戶,然後無論需不需要都會執行runMain方法,我們接下來看看runMain方法是如何實現的。
這里我們只假設以集群模式啟動,首先會載入類,將我們的childMainClass載入為位元組碼對象mainClass ,然後將mainClass 映射成SparkApplication對象,因為我們以集群模式啟動,那麼上一步返回四元組中的childMainClass的參數為ClientApp的全限定名,而這里會調用app實例的start方法因此,這里最終調用的是ClientApp的start方法。
ClientApp的start方法如下...
可以看到這里和之前我們的master啟動流程有些相似。
可以參考我上一篇文章 Spark源碼分析之Master的啟動流程 對這一流程加深理解。
首先是准備rpcEnv環境,之後通過master的地址獲取masterEndpoints端點相關信息,因為這里運行start方法時會將之前配置的相關參數都傳進來,之後就會通過rpcEnv注冊相關clientEndPoint端點信息,同時需要注意,這里會把masterEndpoints端點信息也作為構造ClientEndpoint端點的參數,也就是說這個ClientEndpoint會和masterEndpoints通信。
而在我上一篇文章中說過,只要是setupEndpoint方法被調用,一定會調用相關端點的的onStart方法,而這會調用clientEndPoint的onStart方法。
ClientEndPoint類中的onStart方法會匹配launch事件。源碼如下
onStart中匹配我們的launch的過程,這個過程是啟動driverWrapper的過程,可以看到上面源碼中封裝了mainClass ,該參數對應DriverWrapper類的全限定名,之後將mainClass封裝到command中,然後封裝到driverDescription中,向Master申請啟動Driver。
這個過程會向Mster發送消息,是通過rpcEnv來實現發射消息的,而這里就涉及到outbox信箱,會調用postToOutbox方法,向outbox信箱中添加消息,然後通過TransportClient的send或sendRpc方法發送消息。發件箱以及發送過程是在同一個線程中進行。
而細心的同學會注意到這里調用的方法名為SendToMasterAndForwardReply,見名之意,發送消息到master並且期待回應。
下面是rpcEnv來實現向遠端發送消息的一個調用流程,最終會通過netty中的TransportClient來寫出。
之後,Master端會觸發receiveAndReply函數,匹配RequestSubmitDriver樣例類,完成模式匹配執行後續流程。
可以看到這里首先將Driver信息封裝成DriverInfo,然後添加待調度列表waitingDrivers中,然後調用通用的schele函數。
由於waitingDrivers不為空,則會走LaunchDriver的流程,當前的application申請資源,這時會向worker發送消息,觸發Worker的receive方法。
Worker的receive方法中,當Worker遇到LaunchDriver指令時,創建並啟動一個DriverRunner,DriverRunner啟動一個線程,非同步的處理Driver啟動工作。這里說啟動的Driver就是剛才說的org.apache.spark.deploy.worker.DriverWrapper
可以看到上面在DriverRunner中是開辟線程非同步的處理Driver啟動工作,不會阻塞主進程的執行,而prepareAndRunDriver方法中最終調用 runDriver..
runDriver中主要先做了一些初始化工作,接著就開始啟動driver了。
上述Driver啟動工作主要分為以下幾步:
下面我們直接看DriverWrapper的實現
DriverWrapper,會創建了一個RpcEndpoint與RpcEnv,RpcEndpoint為WorkerWatcher,主要目的為監控Worker節點是否正常,如果出現異常就直接退出,然後當前的ClassLoader載入userJar,同時執行userMainClass,在執行用戶的main方法後關閉workerWatcher。
以上就是SparkSubmit的流程,下一篇我會對SparkContext的源碼進行解析。
歡迎關注...