Ⅰ 十一、flink源碼解析-創建和啟動TaskManager【一】
在深入探討Flink源碼以解析創建和啟動TaskManager的過程之前,我們先簡要回顧上一節的內容。上一節講解了Job的執行和調度流程,其中,當所有slot准備就緒後,JobMaster中的slotPool會向ResourceManager(RM)請求這些slot。RM在內部的slotManager中查找可用slot,若無,RM會啟動一個容器並在此容器中啟動TaskExecutor(即TaskManager進程),以承載任務執行。本節將專注於TaskManager進程的創建和啟動流程,具體從YarnResourceManager的startTaskExecutorInContainer方法出發。
在開始之前,有必要了解一些預備知識,包括java動態代理、AKKA通信原理、部分Scala語言、以及CompletableFuture的非同步編程。
TaskManager(亦稱worker)主要負責執行作業流中的任務,同時負責緩存和交換數據流。一個TaskManager始終至少需要存在,並且任務槽(task slot)是資源調度的最小單位,表示並發處理任務的數量。一個task slot中可以執行多個運算元(請參考Tasks和運算元鏈)。
對於分布式執行,Flink會將運算元的subtasks鏈接成任務(tasks)。每個任務由一個線程執行。鏈接運算元成任務是一個優化策略,能減少線程間切換、緩沖開銷,同時在不增加延遲的情況下提升整體吞吐量。鏈接行為是可以配置的;請參考鏈接文檔獲取詳細信息。
本節的目標在於,了解如何在容器中啟動TaskExecutor。
內部生成了TaskExecutor的啟動命令,執行如下內容:在JAVA_HOME/bin目錄下,通過java命令運行包含配置參數的類org.apache.flink.yarn.YarnTaskExecutorRunner。這些參數包括內存配置、日誌路徑、RPC地址等。
入口類為org.apache.flink.yarn.YarnTaskExecutorRunner。
執行該類的main方法,這是啟動TaskExecutor容器的核心步驟。
在這一階段,創建了多線程環境,確保線程數量與CPU核心數相匹配,為高可用服務的構建打下基礎。
構建服務,確保在集群中提供穩定、可靠的訪問。
構建RpcService,與JobManager中所使用的機制相呼應。
實現心跳機制,保證TaskManager與ResourceManager之間的通信。
提供數據緩存服務,用於數據的高效存儲和訪問。
負責管理並提供外部資源信息,包括集群資源、網路配置等。
通過一系列初始化步驟,構建TaskExecutor,包括:
確保臨時目錄可用,初始化TaskEventDispatcher、ioManager、shuffleEnvironment、kvStateService等關鍵組件。
啟動shuffleEnvironment,提供交換shuffle數據的埠。
用於注冊kvState,支持鍵值狀態的管理。
創建並啟動服務,用於處理未解構的任務管理器位置信息。
管理廣播變數,支持高效數據廣播。
管理task slot的分配和使用。
維護Job的執行狀態和相關信息。
確保Job的領導者角色分配和狀態管理。
為TaskExecutor創建本地存儲根目錄和文件,用於存儲本地狀態。
用於存儲和管理TaskExecutor的本地狀態。
管理庫緩存,優化類載入性能。
配置endpoint為TaskExecutor,啟動RPC伺服器,實現與外部系統的通信。
啟動執行任務的主線程。
管理與JobManager的心跳通信,確保任務執行狀態的更新。
管理與ResourceManager的心跳通信,監控資源使用情況。
完成初始化和配置後,TaskManagerRunner向RPC端點發送啟動消息,啟動消息被接收並處理,最終開始執行。
至此,TaskManager的創建和啟動過程解析完畢。下一節,我們將深入探討TaskExecutor內部的onStart方法,繼續Flink源碼的探索之旅。
Ⅱ Flink源碼分析——Checkpoint源碼分析(二)
《Flink Checkpoint源碼分析》系列文章深入探討了Flink的Checkpoint機制,本文聚焦於Task內部狀態數據的存儲過程,深入剖析狀態數據的具體存儲方式。
Flink的Checkpoint核心邏輯被封裝在`snapshotStrategy.snapshot()`方法中,這一過程主要由`HeapSnapshotStrategy`實現。在進行狀態數據的快照操作時,首先對狀態數據進行拷貝,這里採取的是引用拷貝而非實例拷貝,速度快且佔用內存較少。拷貝後的狀態數據被寫入到一個臨時的`CheckpointStateOutputStream`,即`$CHECKPOINT_DIR/$UID/chk-n`格式的目錄,這個並非最終數據存儲位置。
在拷貝和初始化輸出流後,`AsyncSnapshotCallable`被創建,其`callInternal()`方法中負責將狀態數據持久化至磁碟。這個過程分為幾個關鍵步驟:
在分組過程中,狀態數據首先被扁平化並添加到`partitioningSource[]`中,同時記錄每個元素對應的`keyGroupId`在`counterHistogram[]`中的位置。構建直方圖後,數據依據`keyGroupId`進行排序並寫入文件,同時將偏移位置記錄在`keyGroupOffsets[]`中。
具體實現細節中,`FsCheckpointStateOutputStream`用於創建文件系統輸出流,配置包括基路徑、文件系統類型、緩沖大小、文件狀態閾值等。`StreamStateHandle`最終封裝了狀態數據的存儲文件路徑和大小信息,而`KeyedStateHandle`進一步包含`StreamStateHandle`和`keyGroupRangeOffsets`,後者記錄了每個`keyGroupId`在文件中的存儲位置,以供狀態數據檢索使用。
簡而言之,Flink在執行Checkpoint時,通過一系列精心設計的步驟,確保了狀態數據的高效、安全存儲。從狀態數據的拷貝到元數據的寫入,再到狀態數據的持久化,每一個環節都充分考慮了性能和數據完整性的需求,使得Flink的實時計算能力得以充分發揮。