導航:首頁 > 源碼編譯 > flink源碼分析

flink源碼分析

發布時間:2025-03-22 19:57:22

Ⅰ 十一、flink源碼解析-創建和啟動TaskManager【一】

在深入探討Flink源碼以解析創建和啟動TaskManager的過程之前,我們先簡要回顧上一節的內容。上一節講解了Job的執行和調度流程,其中,當所有slot准備就緒後,JobMaster中的slotPool會向ResourceManager(RM)請求這些slot。RM在內部的slotManager中查找可用slot,若無,RM會啟動一個容器並在此容器中啟動TaskExecutor(即TaskManager進程),以承載任務執行。本節將專注於TaskManager進程的創建和啟動流程,具體從YarnResourceManager的startTaskExecutorInContainer方法出發。



在開始之前,有必要了解一些預備知識,包括java動態代理、AKKA通信原理、部分Scala語言、以及CompletableFuture的非同步編程



TaskManager(亦稱worker)主要負責執行作業流中的任務,同時負責緩存和交換數據流。一個TaskManager始終至少需要存在,並且任務槽(task slot)是資源調度的最小單位,表示並發處理任務的數量。一個task slot中可以執行多個運算元(請參考Tasks和運算元鏈)。



對於分布式執行,Flink會將運算元的subtasks鏈接成任務(tasks)。每個任務由一個線程執行。鏈接運算元成任務是一個優化策略,能減少線程間切換、緩沖開銷,同時在不增加延遲的情況下提升整體吞吐量。鏈接行為是可以配置的;請參考鏈接文檔獲取詳細信息。



本節的目標在於,了解如何在容器中啟動TaskExecutor。



在容器中啟動TaskExecutor



內部生成了TaskExecutor的啟動命令,執行如下內容:在JAVA_HOME/bin目錄下,通過java命令運行包含配置參數的類org.apache.flink.yarn.YarnTaskExecutorRunner。這些參數包括內存配置、日誌路徑、RPC地址等。



入口類為org.apache.flink.yarn.YarnTaskExecutorRunner。



主方法解析



執行該類的main方法,這是啟動TaskExecutor容器的核心步驟。



創建TaskManagerRunner



在這一階段,創建了多線程環境,確保線程數量與CPU核心數相匹配,為高可用服務的構建打下基礎。



創建高可用服務

構建服務,確保在集群中提供穩定、可靠的訪問。



創建RpcService

構建RpcService,與JobManager中所使用的機制相呼應。



創建心跳服務

實現心跳機制,保證TaskManager與ResourceManager之間的通信。



創建blobCacheService

提供數據緩存服務,用於數據的高效存儲和訪問。



創建外部資源信息提供者

負責管理並提供外部資源信息,包括集群資源、網路配置等。



創建TaskExecutor

通過一系列初始化步驟,構建TaskExecutor,包括:



創建taskManagerServices服務

確保臨時目錄可用,初始化TaskEventDispatcher、ioManager、shuffleEnvironment、kvStateService等關鍵組件。



啟動shuffleEnvironment

啟動shuffleEnvironment,提供交換shuffle數據的埠。



啟動kvStateService

用於注冊kvState,支持鍵值狀態的管理。



創建UnresolvedTaskManagerLocation

創建並啟動服務,用於處理未解構的任務管理器位置信息。



創建BroadcastVariableManager

管理廣播變數,支持高效數據廣播。



創建taskSlotTable

管理task slot的分配和使用。



創建JobTable

維護Job的執行狀態和相關信息。



創建JobLeaderService

確保Job的領導者角色分配和狀態管理。



創建本地狀態根目錄和文件

為TaskExecutor創建本地存儲根目錄和文件,用於存儲本地狀態。



創建taskStateManager

用於存儲和管理TaskExecutor的本地狀態。



創建LibraryCacheManager

管理庫緩存,優化類載入性能。



啟動RPC伺服器

配置endpoint為TaskExecutor,啟動RPC伺服器,實現與外部系統的通信。



創建主線程執行器

啟動執行任務的主線程。



創建jobManagerHeartbeatManager

管理與JobManager的心跳通信,確保任務執行狀態的更新。



創建

管理與ResourceManager的心跳通信,監控資源使用情況。



啟動TaskManagerRunner



完成初始化和配置後,TaskManagerRunner向RPC端點發送啟動消息,啟動消息被接收並處理,最終開始執行。



至此,TaskManager的創建和啟動過程解析完畢。下一節,我們將深入探討TaskExecutor內部的onStart方法,繼續Flink源碼的探索之旅。

Ⅱ Flink源碼分析——Checkpoint源碼分析(二)

《Flink Checkpoint源碼分析》系列文章深入探討了Flink的Checkpoint機制,本文聚焦於Task內部狀態數據的存儲過程,深入剖析狀態數據的具體存儲方式。

Flink的Checkpoint核心邏輯被封裝在`snapshotStrategy.snapshot()`方法中,這一過程主要由`HeapSnapshotStrategy`實現。在進行狀態數據的快照操作時,首先對狀態數據進行拷貝,這里採取的是引用拷貝而非實例拷貝,速度快且佔用內存較少。拷貝後的狀態數據被寫入到一個臨時的`CheckpointStateOutputStream`,即`$CHECKPOINT_DIR/$UID/chk-n`格式的目錄,這個並非最終數據存儲位置。

在拷貝和初始化輸出流後,`AsyncSnapshotCallable`被創建,其`callInternal()`方法中負責將狀態數據持久化至磁碟。這個過程分為幾個關鍵步驟:


  1. 獲取`CheckpointStateOutputStream`,寫入狀態數據元數據,如狀態名、序列化類型等。

  2. 對狀態數據按`keyGroupId`進行分組,依次將每個`keyGroupId`對應的狀態數據寫入文件。

  3. 封裝狀態數據的元數據信息,包括存儲路徑和大小,以及每個`keyGroupId`在文件中的偏移位置。

在分組過程中,狀態數據首先被扁平化並添加到`partitioningSource[]`中,同時記錄每個元素對應的`keyGroupId`在`counterHistogram[]`中的位置。構建直方圖後,數據依據`keyGroupId`進行排序並寫入文件,同時將偏移位置記錄在`keyGroupOffsets[]`中。

具體實現細節中,`FsCheckpointStateOutputStream`用於創建文件系統輸出流,配置包括基路徑、文件系統類型、緩沖大小、文件狀態閾值等。`StreamStateHandle`最終封裝了狀態數據的存儲文件路徑和大小信息,而`KeyedStateHandle`進一步包含`StreamStateHandle`和`keyGroupRangeOffsets`,後者記錄了每個`keyGroupId`在文件中的存儲位置,以供狀態數據檢索使用。

簡而言之,Flink在執行Checkpoint時,通過一系列精心設計的步驟,確保了狀態數據的高效、安全存儲。從狀態數據的拷貝到元數據的寫入,再到狀態數據的持久化,每一個環節都充分考慮了性能和數據完整性的需求,使得Flink的實時計算能力得以充分發揮。

閱讀全文

與flink源碼分析相關的資料

熱點內容
有什麼看漫畫的網站或者app 瀏覽:495
通達信底部形成副圖源碼 瀏覽:352
java運行命令行參數 瀏覽:164
盲盒小程序源碼下載 瀏覽:498
視頻如何刪除加密的 瀏覽:414
php點菜系統源碼 瀏覽:800
java線程傳參 瀏覽:527
Java編程全能詞典 瀏覽:72
javasdk64位 瀏覽:370
全隨文pdf 瀏覽:601
我是貓pdf 瀏覽:594
通達信大單主力線源碼 瀏覽:153
目前最火的編程語言 瀏覽:464
伺服器購買雲硬碟 瀏覽:64
python3安卓 瀏覽:353
雲伺服器節點ip 瀏覽:53
手機用久後很多無用的文件夾 瀏覽:319
為什麼江西人保app登錄不了 瀏覽:685
寫一個遞歸演算法 瀏覽:525
找書pdf版 瀏覽:94