Ⅰ 十一、flink源码解析-创建和启动TaskManager【一】
在深入探讨Flink源码以解析创建和启动TaskManager的过程之前,我们先简要回顾上一节的内容。上一节讲解了Job的执行和调度流程,其中,当所有slot准备就绪后,JobMaster中的slotPool会向ResourceManager(RM)请求这些slot。RM在内部的slotManager中查找可用slot,若无,RM会启动一个容器并在此容器中启动TaskExecutor(即TaskManager进程),以承载任务执行。本节将专注于TaskManager进程的创建和启动流程,具体从YarnResourceManager的startTaskExecutorInContainer方法出发。
在开始之前,有必要了解一些预备知识,包括java动态代理、AKKA通信原理、部分Scala语言、以及CompletableFuture的异步编程。
TaskManager(亦称worker)主要负责执行作业流中的任务,同时负责缓存和交换数据流。一个TaskManager始终至少需要存在,并且任务槽(task slot)是资源调度的最小单位,表示并发处理任务的数量。一个task slot中可以执行多个算子(请参考Tasks和算子链)。
对于分布式执行,Flink会将算子的subtasks链接成任务(tasks)。每个任务由一个线程执行。链接算子成任务是一个优化策略,能减少线程间切换、缓冲开销,同时在不增加延迟的情况下提升整体吞吐量。链接行为是可以配置的;请参考链接文档获取详细信息。
本节的目标在于,了解如何在容器中启动TaskExecutor。
内部生成了TaskExecutor的启动命令,执行如下内容:在JAVA_HOME/bin目录下,通过java命令运行包含配置参数的类org.apache.flink.yarn.YarnTaskExecutorRunner。这些参数包括内存配置、日志路径、RPC地址等。
入口类为org.apache.flink.yarn.YarnTaskExecutorRunner。
执行该类的main方法,这是启动TaskExecutor容器的核心步骤。
在这一阶段,创建了多线程环境,确保线程数量与CPU核心数相匹配,为高可用服务的构建打下基础。
构建服务,确保在集群中提供稳定、可靠的访问。
构建RpcService,与JobManager中所使用的机制相呼应。
实现心跳机制,保证TaskManager与ResourceManager之间的通信。
提供数据缓存服务,用于数据的高效存储和访问。
负责管理并提供外部资源信息,包括集群资源、网络配置等。
通过一系列初始化步骤,构建TaskExecutor,包括:
确保临时目录可用,初始化TaskEventDispatcher、ioManager、shuffleEnvironment、kvStateService等关键组件。
启动shuffleEnvironment,提供交换shuffle数据的端口。
用于注册kvState,支持键值状态的管理。
创建并启动服务,用于处理未解构的任务管理器位置信息。
管理广播变量,支持高效数据广播。
管理task slot的分配和使用。
维护Job的执行状态和相关信息。
确保Job的领导者角色分配和状态管理。
为TaskExecutor创建本地存储根目录和文件,用于存储本地状态。
用于存储和管理TaskExecutor的本地状态。
管理库缓存,优化类加载性能。
配置endpoint为TaskExecutor,启动RPC服务器,实现与外部系统的通信。
启动执行任务的主线程。
管理与JobManager的心跳通信,确保任务执行状态的更新。
管理与ResourceManager的心跳通信,监控资源使用情况。
完成初始化和配置后,TaskManagerRunner向RPC端点发送启动消息,启动消息被接收并处理,最终开始执行。
至此,TaskManager的创建和启动过程解析完毕。下一节,我们将深入探讨TaskExecutor内部的onStart方法,继续Flink源码的探索之旅。
Ⅱ Flink源码分析——Checkpoint源码分析(二)
《Flink Checkpoint源码分析》系列文章深入探讨了Flink的Checkpoint机制,本文聚焦于Task内部状态数据的存储过程,深入剖析状态数据的具体存储方式。
Flink的Checkpoint核心逻辑被封装在`snapshotStrategy.snapshot()`方法中,这一过程主要由`HeapSnapshotStrategy`实现。在进行状态数据的快照操作时,首先对状态数据进行拷贝,这里采取的是引用拷贝而非实例拷贝,速度快且占用内存较少。拷贝后的状态数据被写入到一个临时的`CheckpointStateOutputStream`,即`$CHECKPOINT_DIR/$UID/chk-n`格式的目录,这个并非最终数据存储位置。
在拷贝和初始化输出流后,`AsyncSnapshotCallable`被创建,其`callInternal()`方法中负责将状态数据持久化至磁盘。这个过程分为几个关键步骤:
在分组过程中,状态数据首先被扁平化并添加到`partitioningSource[]`中,同时记录每个元素对应的`keyGroupId`在`counterHistogram[]`中的位置。构建直方图后,数据依据`keyGroupId`进行排序并写入文件,同时将偏移位置记录在`keyGroupOffsets[]`中。
具体实现细节中,`FsCheckpointStateOutputStream`用于创建文件系统输出流,配置包括基路径、文件系统类型、缓冲大小、文件状态阈值等。`StreamStateHandle`最终封装了状态数据的存储文件路径和大小信息,而`KeyedStateHandle`进一步包含`StreamStateHandle`和`keyGroupRangeOffsets`,后者记录了每个`keyGroupId`在文件中的存储位置,以供状态数据检索使用。
简而言之,Flink在执行Checkpoint时,通过一系列精心设计的步骤,确保了状态数据的高效、安全存储。从状态数据的拷贝到元数据的写入,再到状态数据的持久化,每一个环节都充分考虑了性能和数据完整性的需求,使得Flink的实时计算能力得以充分发挥。