‘壹’ 一文解密Kafka,Kafka源码设计与实现原理剖析,真正的通俗易懂
Apache Kafka (简称Kafka )最早是由Linkedln开源出来的分布式消息系统,现在是Apache旗下的一个子项目,并且已经成为开册、领域应用最广泛的消息系统之 Kafka社区也非常活跃,从 版本开始, Kafka 的标语已经从“一个高吞吐量、分布式的消息系统”改为“一个分布式的流平台”
关于Kafka,我打算从入门开始讲起,一直到它的底层实现逻辑个原理以及源码,建议大家花点耐心,从头开始看,相信会对你有所收获。
作为 个流式数据平台,最重要的是要具备下面 个特点
消息系统:
消息系统 也叫作消息队列)主要有两种消息模型:队列和发布订Kafka使用消费组( consumer group )统 上面两种消息模型 Kafka使用队列模型时,它可以将处理 作为平均分配给消费组中的消费者成员
下面我们会从 个角度分析Kafka 的几个基本概念,并尝试解决下面 个问题
消息由生产者发布到 fk 集群后,会被消费者消费 消息的消费模型有两种:推送模型( pu和拉取模型( pull 基于推送模型的消息系统,由消息代理记录消费者的消费状态 消息代理在将消息推送到消费者后 标记这条消息为已消费
但这种方式无法很好地保证消息的处理语义 比如,消息代理把消息发送出去后,当消费进程挂掉或者由于网络原因没有收到这条消息时,就有可能造成消息丢失(因为消息代理已经 这条消息标记为自己消费了,但实际上这条消息并没有被实际处理) 如果要保证消息的处理语义,消息代理发送完消息后,要设置状态为“已发送”,只有收到消费者的确认请求后才更新为“已消费”,这就需要在消息代理中记录所有消息的消费状态,这种做法也是不可取的
Kafka每个主题的多个分区日志分布式地存储在Kafka集群上,同时为了故障容错,每个分区都会以副本的方式复制到多个消息代理节点上 其中一个节点会作为主副本( Leader ),其 节点作为备份副本( Follower ,也叫作从副本)
主副本会负责所有的客户端读写操作,备份副本仅仅从主副本同步数据 当主副本 IH 现在故障时,备份副本中的 副本会被选择为新的主副本 因为每个分区的副本中只有主副本接受读写,所以每个服务端都会作为某些分区的主副本,以及另外一些分区的备份副本这样Kafka集群的所有服务端整体上对客户端是负载均衡的
消息系统通常由生产者“pro ucer 消费者( co sumer )和消息代理( broke 大部分组成,生产者会将消息写入消息代理,消费者会从消息代理中读取消息 对于消息代理而言,生产者和消费者都属于客户端:生产者和消费者会发送客户端请求给服务端,服务端的处理分别是存储消息和获取消息,最后服务端返回响应结果给客户端
新的生产者应用程序使用 af aP oce 对象代表 个生产者客户端进程 生产者要发送消息,并不是直接发送给 务端 ,而是先在客户端 消息放入队列 然后 一个 息发送线程从队列中消息,以 盐的方式发送消息给服务端 Kafka的记 集器( Reco dACCUl'lUlato )负责缓存生产者客户端产生的消息,发送线程( Sende )负责读取 集器的批 过网络发送给服务端为了保证客户端 络请求 快速 应, Kafka 用选择器( Selecto 络连接 读写 理,使网络连接( Netwo kCl i.ent )处理客户端 络请求
追加消息到记录收集器时按照分区进行分组,并放到batches集合中,每个分区的队列都保存了将发送到这个分区对应节点上的 记录,客户端的发送线程可 只使用 Sende 线程迭 batches的每个分区,获取分区对应的主剧本节点,取出分区对应的 列中的批记录就可以发送消息了
消息发送线程有两种消息发送方式 按照分区直接发送 按照分区的目标节点发迭 假设有两台服务器, 题有 个分区,那么每台服务器就有 个分区 ,消息发送线程迭代batches的每个分 接往分区的主副本节点发送消息,总共会有 个请求 所示,我 先按照分区的主副本节点进行分组, 属于同 个节点的所有分区放在一起,总共只有两个请求做法可以大大减少网络的开销
消息系统由生产者 存储系统和消费者组成 章分析了生产者发送消息给服务端的过程,本章分析消费者从服务端存储系统读取生产者写入消息的过程 首先我 来了解消费者的 些基础知识
作为分布式的消息系统, Kafka支持多个生产者和多个消费者,生产者可以将消息发布到集群中不同节点的不同分区上;“肖费者也可以消费集群中多个节点的多个分区上的消息 写消息时,多个生产者可以 到同 个分区 读消息时,如果多个消费者同时读取 个分区,为了保证将日志文件的不同数据分配给不同的消费者,需要采用加锁 同步等方式,在分区级别的日志文件上做些控制
相反,如果约定“同 个分区只可被 个消费者处理”,就不需要加锁同步了,从而可提升消费者的处理能力 而且这也并不违反消息的处理语义:原先需要多个消费者处理,现在交给一个消费者处理也是可以的 3- 给出了 种最简单的消息系统部署模式,生产者的数据源多种多样,它们都统写人Kafka集群 处理消息时有多个消费者分担任务 ,这些消费者的处理逻辑都相同, 每个消费者处理的分区都不会重复
因为分区要被重新分配,分区的所有者都会发生变 ,所以在还没有重新分配分区之前 所有消费者都要停止已有的拉取钱程 同时,分区分配给消费者都会在ZK中记录所有者信息,所以也要先删ZK上的节点数据 只有和分区相关的 所有者 拉取线程都释放了,才可以开始分配分区
如果说在重新分配分区前没有释放这些信息,再平衡后就可能造成同 个分区被多个消费者所有的情况 比如分区Pl 原先归消费者 所有,如果没有释放拉取钱程和ZK节点,再平衡后分区Pl 被分配给消费者 了,这样消费者 和消费者 就共享了分区Pl ,而这显然不符合 fka 中关于“一个分区只能被分配给 个消费者”的限制条件 执行再平衡操作的步骤如下
如果是协调者节点发生故障,服务端会有自己的故障容错机制,选出管理消费组所有消费者的新协调者节,点消费者客户端没有权利做这个工作,它能做的只是等待一段时间,查询服务端是否已经选出了新的协调节点如果消费者查到现在已经有管理协调者的协调节点,就会连接这个新协调节,哉由于这个协调节点是服务端新选出来的,所以每个消费者都应该重新连接协调节点
消费者重新加入消费组,在分配到分区的前后,都会对消费者的拉取工作产生影响 消费者发送“加入组请求”之前要停止拉取消息,在收到“加入组响应”中的分区之后要重新开始拉取消息时,为了能够让客户端应用程序感知消费者管理的分区发生变化,在加入组前后,客户端还可以设置自定义的“消费者再平衡监听器”,以便对分区的变化做出合适的处理
‘贰’ Kafka 源码解析之 Consumer 两种 commit 机制和 partition 分配机制
先看下两种不同的 commit 机制,一种是同步 commit,一种是异步 commit,既然其作用都是 offset commit,应该不难猜到它们底层使用接口都是一样的
同步 commit
同步 commit 的实现方式,client.poll() 方法会阻塞直到这个request 完成或超时才会返回。
异步 commit
而对于异步的 commit,最后调用的都是 doCommitOffsetsAsync 方法,其具体实现如下:
在异步 commit 中,可以添加相应的回调函数,如果 request 处理成功或处理失败,ConsumerCoordinator 会通过 () 方法唤醒相应的回调函数。
关键区别在于future是否会get,同步提交就是future会get.
consumer 提供的两种不同 partition 分配策略,可以通过 partition.assignment.strategy 参数进行配置,默认情况下使用的是 org.apache.kafka.clients.consumer.RangeAssignor,Kafka 中提供另一种 partition 的分配策略 org.apache.kafka.clients.consumer.RoundRobinAssignor
用户可以自定义相应的 partition 分配机制,只需要继承这个 AbstractPartitionAssignor 抽象类即可。
AbstractPartitionAssignor
AbstractPartitionAssignor 有一个抽象方法,如下所示:
assign() 这个方法,有两个参数:
RangeAssignor 和 RoundRobinAssignor 通过这个方法 assign() 的实现,来进行相应的 partition 分配。
直接看一下这个方法的实现:
假设 topic 的 partition 数为 numPartitionsForTopic,group 中订阅这个 topic 的 member 数为 consumersForTopic.size(),首先需要算出两个值:
分配的规则是:对于剩下的那些 partition 分配到前 consumersWithExtraPartition 个 consumer 上,也就是前 consumersWithExtraPartition 个 consumer 获得 topic-partition 列表会比后面多一个。
在上述的程序中,举了一个例子,假设有一个 topic 有 7 个 partition,group 有5个 consumer,这个5个 consumer 都订阅这个 topic,那么 range 的分配方式如下:
而如果 group 中有 consumer 没有订阅这个 topic,那么这个 consumer 将不会参与分配。下面再举个例子,将有两个 topic,一个 partition 有5个,一个 partition 有7个,group 有5个 consumer,但是只有前3个订阅第一个 topic,而另一个 topic 是所有 consumer 都订阅了,那么其分配结果如下:
这个是 roundrobin 的实现,其实现方法如下:
roundrobin 的实现原则,简单来说就是:列出所有 topic-partition 和列出所有的 consumer member,然后开始分配,一轮之后继续下一轮,假设有有一个 topic,它有7个 partition,group 有3个 consumer 都订阅了这个 topic,那么其分配方式为:
对于多个 topic 的订阅,将有两个 topic,一个 partition 有5个,一个 partition 有7个,group 有5个 consumer,但是只有前3个订阅第一个 topic,而另一个 topic 是所有 consumer 都订阅了,那么其分配结果如下:
roundrobin 分配方式与 range 的分配方式还是略有不同。
‘叁’ 使用IDEA 打开、调试kafka源码
kafka的启动类是隐激:
core/src/main/scala/kafka/Kafka.scala
我尝试在本地运晌誉行,死灶谨袜活跑不起来,报错如下。网上也没有找到靠谱的解决办法。
尝试本地运行失败后,又尝试了远程调试的方式。