❶ Kafka简介+Kafka Tool使用简介+使用实例
详细安装访问: https://www.jianshu.com/p/c74e0ec577b0
macOS 可以用homebrew快速安装,访问地址: https://www.jianshu.com/p/cddd25da8061
原文链接: https://www.jianshu.com/p/06884c5bf3f1
查看topic列表:
创建topic:
--create :创建命令;
--topic :后面指定topic名称;
--replication-factor :后面指定副本数;
--partitions :指定分区数,根据broker的数量决定;
--zookeeper :后面指定 zookeeper.connect 的zk链接
查看某个topic:
Kafka 作为消息系统的一种, 当然可 以像其他消 息中 间件一样作为消息数据中转的平台。 下面以 Java 语言为例,看一下如何使用 Kafka 来发送和接收消息。
1、引入依赖
2、消息生产者
示例 中用 KafkaProcer 类来创建一个消息生产者,该类的构造函数入参是一系列属性值。下面看一下这些属性具体都是什么含义。
bootstrap.servers 表示 Kafka 集群 。 如果集群中有多台物理服务器,则服务器地址之间用逗号分隔, 比如” 192.168.1.1 :9092,192.168.1.2:9092” 。 localhost 是笔者电脑的地址,9092 是 Kafka 服务器默认监听的端口号。
key.serializer 和 value.serializer 表示消息的序列化类型 。 Kafka 的消息是以键值对的形式发送到 Kafka 服务器的,在消息被发送到服务器之前,消息生产者需要把不同类型的 消息序列化为 二 进制类型,示例中是发送文本消息到服务器 , 所以使用的是StringSerializer。
key.deserializer 和 value.deserializer 表示消息的反序列化类型。把来自 Kafka 集群的二进制消 息反序列 化 为指定 的 类型,因为序列化用的是String类型,所以用StringDeserializer 来反序列化。
zk.connect 用于指定 Kafka 连接 ZooKeeper 的 URL ,提供了基于 ZooKeeper 的集群服务器自动感知功能, 可以动态从 ZooKeeper 中读取 Kafka 集群配置信息。
有 了 消息生产者之后 , 就可以调用 send 方法发送消息了。该方法的入参是 ProcerRecord类型对象 , ProcerRecord 类提供了多种构造函数形参,常见的有如下三种 :
ProcerRecord(topic,partition,key,value);
ProcerRecord(topic,key,value);
ProcerRecord(topic, value) ;
其中 topic 和 value 是必填的, partition 和 key 是可选的 。如果指定了 pa时tion,那么消息会被发送至指定的 partition ;如果没指定 partition 但指定了 Key,那么消息会按照 hash(key)发送至对应的 partition: 如果既没指定 partition 也没指定 key,那么 消息会按照 round-robin 模式发送(即以轮询的方式依次发送〉到每一个 partition。示例中将向 test-topic 主题发送三条消息。
3、消息消费者
和消息生产者类似,这里用 KafkaConsumer 类来创建一个消息消费者,该类的构造函数入参也是一系列属性值。
bootstrap. servers 和生产者一样,表示 Kafka 集群。
group.id 表示消费者的分组 ID。
enable.auto.commit 表示 Consumer 的 offset 是否自 动提交 。
auto.commit.interval .ms 用于设置自动提交 offset 到 ZooKeeper 的时间间隔,时间单位是毫秒。
key. deserializer 和 value.deserializer 表示用字符串来反序列化消息数据。
消息消费者使用 subscribe 方法 订阅了 Topic 为 test-topic 的消息。 Consumer 调用poll 方法来轮询 Kafka 集群的消息, 一直等到 Kafka 集群中没有消息或达到超时时间(示例中设置超时时间为 100 毫秒)为止 。 如果读取到消息,则打印出消息记录的 pa此ition, offset、key 等。
❷ 一文解密Kafka,Kafka源码设计与实现原理剖析,真正的通俗易懂
Apache Kafka (简称Kafka )最早是由Linkedln开源出来的分布式消息系统,现在是Apache旗下的一个子项目,并且已经成为开册、领域应用最广泛的消息系统之 Kafka社区也非常活跃,从 版本开始, Kafka 的标语已经从“一个高吞吐量、分布式的消息系统”改为“一个分布式的流平台”
关于Kafka,我打算从入门开始讲起,一直到它的底层实现逻辑个原理以及源码,建议大家花点耐心,从头开始看,相信会对你有所收获。
作为 个流式数据平台,最重要的是要具备下面 个特点
消息系统:
消息系统 也叫作消息队列)主要有两种消息模型:队列和发布订Kafka使用消费组( consumer group )统 上面两种消息模型 Kafka使用队列模型时,它可以将处理 作为平均分配给消费组中的消费者成员
下面我们会从 个角度分析Kafka 的几个基本概念,并尝试解决下面 个问题
消息由生产者发布到 fk 集群后,会被消费者消费 消息的消费模型有两种:推送模型( pu和拉取模型( pull 基于推送模型的消息系统,由消息代理记录消费者的消费状态 消息代理在将消息推送到消费者后 标记这条消息为已消费
但这种方式无法很好地保证消息的处理语义 比如,消息代理把消息发送出去后,当消费进程挂掉或者由于网络原因没有收到这条消息时,就有可能造成消息丢失(因为消息代理已经 这条消息标记为自己消费了,但实际上这条消息并没有被实际处理) 如果要保证消息的处理语义,消息代理发送完消息后,要设置状态为“已发送”,只有收到消费者的确认请求后才更新为“已消费”,这就需要在消息代理中记录所有消息的消费状态,这种做法也是不可取的
Kafka每个主题的多个分区日志分布式地存储在Kafka集群上,同时为了故障容错,每个分区都会以副本的方式复制到多个消息代理节点上 其中一个节点会作为主副本( Leader ),其 节点作为备份副本( Follower ,也叫作从副本)
主副本会负责所有的客户端读写操作,备份副本仅仅从主副本同步数据 当主副本 IH 现在故障时,备份副本中的 副本会被选择为新的主副本 因为每个分区的副本中只有主副本接受读写,所以每个服务端都会作为某些分区的主副本,以及另外一些分区的备份副本这样Kafka集群的所有服务端整体上对客户端是负载均衡的
消息系统通常由生产者“pro ucer 消费者( co sumer )和消息代理( broke 大部分组成,生产者会将消息写入消息代理,消费者会从消息代理中读取消息 对于消息代理而言,生产者和消费者都属于客户端:生产者和消费者会发送客户端请求给服务端,服务端的处理分别是存储消息和获取消息,最后服务端返回响应结果给客户端
新的生产者应用程序使用 af aP oce 对象代表 个生产者客户端进程 生产者要发送消息,并不是直接发送给 务端 ,而是先在客户端 消息放入队列 然后 一个 息发送线程从队列中消息,以 盐的方式发送消息给服务端 Kafka的记 集器( Reco dACCUl'lUlato )负责缓存生产者客户端产生的消息,发送线程( Sende )负责读取 集器的批 过网络发送给服务端为了保证客户端 络请求 快速 应, Kafka 用选择器( Selecto 络连接 读写 理,使网络连接( Netwo kCl i.ent )处理客户端 络请求
追加消息到记录收集器时按照分区进行分组,并放到batches集合中,每个分区的队列都保存了将发送到这个分区对应节点上的 记录,客户端的发送线程可 只使用 Sende 线程迭 batches的每个分区,获取分区对应的主剧本节点,取出分区对应的 列中的批记录就可以发送消息了
消息发送线程有两种消息发送方式 按照分区直接发送 按照分区的目标节点发迭 假设有两台服务器, 题有 个分区,那么每台服务器就有 个分区 ,消息发送线程迭代batches的每个分 接往分区的主副本节点发送消息,总共会有 个请求 所示,我 先按照分区的主副本节点进行分组, 属于同 个节点的所有分区放在一起,总共只有两个请求做法可以大大减少网络的开销
消息系统由生产者 存储系统和消费者组成 章分析了生产者发送消息给服务端的过程,本章分析消费者从服务端存储系统读取生产者写入消息的过程 首先我 来了解消费者的 些基础知识
作为分布式的消息系统, Kafka支持多个生产者和多个消费者,生产者可以将消息发布到集群中不同节点的不同分区上;“肖费者也可以消费集群中多个节点的多个分区上的消息 写消息时,多个生产者可以 到同 个分区 读消息时,如果多个消费者同时读取 个分区,为了保证将日志文件的不同数据分配给不同的消费者,需要采用加锁 同步等方式,在分区级别的日志文件上做些控制
相反,如果约定“同 个分区只可被 个消费者处理”,就不需要加锁同步了,从而可提升消费者的处理能力 而且这也并不违反消息的处理语义:原先需要多个消费者处理,现在交给一个消费者处理也是可以的 3- 给出了 种最简单的消息系统部署模式,生产者的数据源多种多样,它们都统写人Kafka集群 处理消息时有多个消费者分担任务 ,这些消费者的处理逻辑都相同, 每个消费者处理的分区都不会重复
因为分区要被重新分配,分区的所有者都会发生变 ,所以在还没有重新分配分区之前 所有消费者都要停止已有的拉取钱程 同时,分区分配给消费者都会在ZK中记录所有者信息,所以也要先删ZK上的节点数据 只有和分区相关的 所有者 拉取线程都释放了,才可以开始分配分区
如果说在重新分配分区前没有释放这些信息,再平衡后就可能造成同 个分区被多个消费者所有的情况 比如分区Pl 原先归消费者 所有,如果没有释放拉取钱程和ZK节点,再平衡后分区Pl 被分配给消费者 了,这样消费者 和消费者 就共享了分区Pl ,而这显然不符合 fka 中关于“一个分区只能被分配给 个消费者”的限制条件 执行再平衡操作的步骤如下
如果是协调者节点发生故障,服务端会有自己的故障容错机制,选出管理消费组所有消费者的新协调者节,点消费者客户端没有权利做这个工作,它能做的只是等待一段时间,查询服务端是否已经选出了新的协调节点如果消费者查到现在已经有管理协调者的协调节点,就会连接这个新协调节,哉由于这个协调节点是服务端新选出来的,所以每个消费者都应该重新连接协调节点
消费者重新加入消费组,在分配到分区的前后,都会对消费者的拉取工作产生影响 消费者发送“加入组请求”之前要停止拉取消息,在收到“加入组响应”中的分区之后要重新开始拉取消息时,为了能够让客户端应用程序感知消费者管理的分区发生变化,在加入组前后,客户端还可以设置自定义的“消费者再平衡监听器”,以便对分区的变化做出合适的处理
❸ Kafka 基础原理及工作流程简述
Kafka 工作流程
基础总结:
1)broker :broker代表kafka的节点, Broker是分布式部署并且相互之间相互独立的, 启动的时候向zookeeper 注册,在Zookeeper上会有一个专门 用来进行Broker服务器列表记录 的节点:/brokers/ids。每个Broker在启动时,都会到Zookeeper上进行注册,即到/brokers/ids下创建属于自己的节点,如/brokers/ids/[0...N]。Kafka使用了全局唯一的数字来指代每个Broker服务器,不同的Broker必须使用不同的Broker ID进行注册,创建完节点后, 每个Broker就会将自己的IP地址和端口信息记录 到该节点中去。其中,Broker创建的节点类型是 临时节点 ,一旦Broker 宕机 ,则 对应的临时节点也会被自动删除 。
2)topic:消息主题,在Kafka中,同一个 Topic的消息会被分成多个分区 并将其分布在多个Broker上, 这些分区信息及与Broker的对应关系 也都是由Zookeeper在维护,由专门的节点来记录,如:/borkers/topics Kafka中每个Topic都会以/brokers/topics/[topic]的形式被记录,如/brokers/topics/login和/brokers/topics/search等。Broker服务器启动后,会到对应Topic节点(/brokers/topics)上注册自己的Broker ID并写入针对该Topic的分区总数,如/brokers/topics/login/3->2,这个节点表示Broker ID为3的一个Broker服务器,对于"login"这个Topic的消息,提供了2个分区进行消息存储,同样,这个分区节点也是临时节点。
3)partition :同一topic类型消息的分区,如图,每个分区都存在一个leader 和N个follower(副本),副本个数在创建topic的时候可以指定创建多少个。消息生产者生产消息和消费组消费消息都是通过leader完成,副本的存在是为了防止发生节点宕机,导致leader挂了,follower随时顶上去变成leader,继续恢复生产。重点来了,leader所在节点挂了,会有follower变成leader,所以同一个topic的同一个partition的leader与follower不可能在同一个broker,这样才能做到这个broker上的某个topic的某个partition的leader挂了,其他正常节点上的这个topic的这个partition的follower会顶上来。
4)生产者发送消息的 负载均衡 :由于同一个Topic消息会被分区并将其分布在多个Broker上,因此, 生产者需要将消息合理地发送到这些分布式的Broker上 ,那么如何实现生产者的负载均衡,Kafka支持传统的四层负载均衡,也支持Zookeeper方式实现负载均衡。 (4.1) 四层负载均衡,根据生产者的IP地址和端口来为其确定一个相关联的Broker。通常,一个生产者只会对应单个Broker,然后该生产者产生的消息都发往该Broker。这种方式逻辑简单,每个生产者不需要同其他系统建立额外的TCP连接,只需要和Broker维护单个TCP连接即可。但是,其无法做到真正的负载均衡,因为实际系统中的每个生产者产生的消息量及每个Broker的消息存储量都是不一样的,如果有些生产者产生的消息远多于其他生产者的话,那么会导致不同的Broker接收到的消息总数差异巨大,同时,生产者也无法实时感知到Broker的新增和删除。 (4.2) 使用Zookeeper进行负载均衡,由于每个Broker启动时,都会完成Broker注册过程,生产者会通过该节点的变化来动态地感知到Broker服务器列表的变更,这样就可以实现动态的负载均衡机制。
5)消费者负载均衡:与生产者类似,Kafka中的消费者同样需要进行负载均衡来实现多个消费者合理地从对应的Broker服务器上接收消息,每个消费组分组包含若干消费者, 每条消息都只会发送给分组中的一个消费者 ,不同的消费者分组消费自己特定的Topic下面的消息,互不干扰。
6)分区与消费者 的关系: 消费组 (Consumer Group) consumer group 下有多个 Consumer(消费者)。对于每个消费者组 (Consumer Group),Kafka都会为其分配一个全局唯一的Group ID,Group 内部的所有消费者共享该 ID。订阅的topic下的每个分区只能分配给某个 group 下的一个consumer(当然该分区还可以被分配给其他group)。同时,Kafka为每个消费者分配一个Consumer ID,通常采用"Hostname:UUID"形式表示。在Kafka中,规定了 每个消息分区 只能被同组的一个消费者进行消费 ,因此,需要在 Zookeeper 上记录 消息分区 与 Consumer 之间的关系,每个消费者一旦确定了对一个消息分区的消费权力,需要将其Consumer ID 写入到 Zookeeper 对应消息分区的临时节点上,例如:/consumers/[group_id]/owners/[topic]/[broker_id-partition_id] 其中,[broker_id-partition_id]就是一个 消息分区 的标识,节点内容就是该 消息分区 上 消费者的Consumer ID。
7)消息的消费进度Offset 记录:在消费者对指定消息分区进行消息消费的过程中, 需要定时地将分区消息的消费进度Offset记录到Zookeeper上 ,以便在该消费者进行重启或者其他消费者重新接管该消息分区的消息消费后,能够从之前的进度开始继续进行消息消费。Offset在Zookeeper中由一个专门节点进行记录,其节点路径为:/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id] 节点内容就是Offset的值。这是kafka0.9和之前版本offset记录的方式,之后的版本offset都改为存在kafka本地,当然了这里的本地是指磁盘不是内存。。。
8)消费者注册:每个消费者服务器启动时,都会到Zookeeper的指定节点下创建一个属于自己的消费者节点,例如/consumers/[group_id]/ids/[consumer_id],完成节点创建后,消费者就会将自己订阅的Topic信息写入该临时节点。 对 消费者分组 中的 消费者 的变化注册监听 。每个 消费者 都需要关注所属 消费者分组 中其他消费者服务器的变化情况,即对/consumers/[group_id]/ids节点注册子节点变化的Watcher监听,一旦发现消费者新增或减少,就触发消费者的负载均衡。 对Broker服务器变化注册监听 。消费者需要对/broker/ids/[0-N]中的节点进行监听,如果发现Broker服务器列表发生变化,那么就根据具体情况来决定是否需要进行消费者负载均衡。 进行消费者负载均衡 。为了让同一个Topic下不同分区的消息尽量均衡地被多个 消费者 消费而进行 消费者 与 消息 分区分配的过程,通常,对于一个消费者分组,如果组内的消费者服务器发生变更或Broker服务器发生变更,会发出消费者负载均衡。
❹ 如何使用python 连接kafka 并获取数据
连接
kafka
的库有两种类型,一种是直接连接
kafka
的,存储
offset
的事情要自己在客户端完成。还有一种是先连接
zookeeper
然后再通过
zookeeper
获取
kafka
的
brokers
信息,
offset
存放在
zookeeper
上面,由
zookeeper
来协调。
我现在使用
samsa
这个
highlevel
库
Procer示例
from
kazoo.client
import
KazooClientfrom
samsa.cluster
import
Clusterzookeeper
=
KazooClient()zookeeper.start()cluster
=
Cluster(zookeeper)topic
=
cluster.topics['topicname']topic.publish('msg')
**
Consumer示例
**
from
kazoo.client
import
KazooClientfrom
samsa.cluster
import
Clusterzookeeper
=
KazooClient()zookeeper.start()cluster
=
Cluster(zookeeper)topic
=
cluster.topics['topicname']consumer
=
topic.subscribe('groupname')for
msg
in
consumer:
print
msg
Tip
consumer
必需在
procer
向
kafka
的
topic
里面提交数据后才能连接,否则会出错。
在
Kafka
中一个
consumer
需要指定
groupname
,
groue
中保存着
offset
等信息,新开启一个
group
会从
offset
0
的位置重新开始获取日志。
kafka
的配置参数中有个
partition
,默认是
1
,这个会对数据进行分区,如果多个
consumer
想连接同个
group
就必需要增加
partition
,
partition
只能大于
consumer
的数量,否则多出来的
consumer
将无法获取到数据。
❺ Kafka核心组件之控制器和协调器
[TOC]
我们已经知道Kafka的集群由n个的broker所组成,每个broker就是一个kafka的实例或者称之为kafka的服务。其实控制器也是一个broker,控制器也叫leader broker。
他除了具有一般broker的功能外,还负责分区leader的选取,也就是负责选举partition的leader replica。
kafka每个broker启动的时候,都会实例化一个KafkaController,并将broker的id注册到zookeeper,集群在启动过程中,通过选举机制选举出其中一个broker作为leader,也就是前面所说的控制器。
包括集群启动在内,有三种情况触发控制器选举:
1、集群启动
2、控制器所在代理发生故障
3、zookeeper心跳感知,控制器与自己的session过期
按照惯例,先看图。我们根据下图来讲解集群启动时,控制器选举过程。
假设此集群有三个broker,同时启动。
(一)3个broker从zookeeper获取/controller临时节点信息。/controller存储的是选举出来的leader信息。此举是为了确认是否已经存在leader。
(二)如果还没有选举出leader,那么此节点是不存在的,返回-1。如果返回的不是-1,而是leader的json数据,那么说明已经有leader存在,选举结束。
(三)三个broker发现返回-1,了解到目前没有leader,于是均会触发向临时节点/controller写入自己的信息。最先写入的就会成为leader。
(四)假设broker 0的速度最快,他先写入了/controller节点,那么他就成为了leader。而broker1、broker2很不幸,因为晚了一步,他们在写/controller的过程中会抛出ZkNodeExistsException,也就是zk告诉他们,此节点已经存在了。
经过以上四步,broker 0成功写入/controller节点,其它broker写入失败了,所以broker 0成功当选leader。
此外zk中还有controller_epoch节点,存储了leader的变更次数,初始值为0,以后leader每变一次,该值+1。所有向控制器发起的请求,都会携带此值。如果控制器和自己内存中比较,请求值小,说明kafka集群已经发生了新的选举,此请求过期,此请求无效。如果请求值大于控制器内存的值,说明已经有新的控制器当选了,自己已经退位,请求无效。kafka通过controller_epoch保证集群控制器的唯一性及操作的一致性。
由此可见,Kafka控制器选举就是看谁先争抢到/controller节点写入自身信息。
控制器的初始化,其实是初始化控制器所用到的组件及监听器,准备元数据。
前面提到过每个broker都会实例化并启动一个KafkaController。KafkaController和他的组件关系,以及各个组件的介绍如下图:
图中箭头为组件层级关系,组件下面还会再初始化其他组件。可见控制器内部还是有些复杂的,主要有以下组件:
1、ControllerContext,此对象存储了控制器工作需要的所有上下文信息,包括存活的代理、所有主题及分区分配方案、每个分区的AR、leader、ISR等信息。
2、一系列的listener,通过对zookeeper的监听,触发相应的操作,黄色的框的均为listener
3、分区和副本状态机,管理分区和副本。
4、当前代理选举器ZookeeperLeaderElector,此选举器有上位和退位的相关回调方法。
5、分区leader选举器,PartitionLeaderSelector
6、主题删除管理器,TopicDeletetionManager
7、leader向broker批量通信的ControllerBrokerRequestBatch。缓存状态机处理后产生的request,然后统一发送出去。
8、控制器平衡操作的KafkaScheler,仅在broker作为leader时有效。
Kafka集群的一些重要信息都记录在ZK中,比如集群的所有代理节点、主题的所有分区、分区的副本信息(副本集、主副本、同步的副本集)。每个broker都有一个控制器,为了管理整个集群Kafka选利用zk选举模式,为整个集群选举一个“中央控制器”或”主控制器“,控制器其实就是一个broker节点,除了一般broker功能外,还具有分区首领选举功能。中央控制器管理所有节点的信息,并通过向ZK注册各种监听事件来管理整个集群节点、分区的leader的选举、再平衡等问题。外部事件会更新ZK的数据,ZK中的数据一旦发生变化,控制器都要做不同的响应处理。
故障转移其实就是leader所在broker发生故障,leader转移为其他的broker。转移的过程就是重新选举leader的过程。
重新选举leader后,需要为该broker注册相应权限,调用的是ZookeeperLeaderElector的onControllerFailover()方法。在这个方法中初始化和启动了一系列的组件来完成leader的各种操作。具体如下,其实和控制器初始化有很大的相似度。
1、注册分区管理的相关监听器
2、注册主题管理的相关监听
3、注册代理变化监听器
4、重新初始化ControllerContext,
5、启动控制器和其他代理之间通信的ControllerChannelManager
6、创建用于删除主题的TopicDeletionManager对象,并启动。
7、启动分区状态机和副本状态机
8、轮询每个主题,添加监听分区变化的
9、如果设置了分区平衡定时操作,那么创建分区平衡的定时任务,默认300秒检查并执行。
除了这些组件的启动外,onControllerFailover方法中还做了如下操作:
1、/controller_epoch值+1,并且更新到ControllerContext
2、检查是否出发分区重分配,并做相关操作
3、检查需要将优先副本选为leader,并做相关操作
4、向kafka集群所有代理发送更新元数据的请求。
下面来看leader权限被取消时,调用的方法onControllerResignation
1、该方法中注销了控制器的权限。取消在zookeeper中对于分区、副本感知的相应监听器的监听。
2、关闭启动的各个组件
3、最后把ControllerContext中记录控制器版本的数值清零,并设置当前broker为RunnignAsBroker,变为普通的broker。
通过对控制器启动过程的学习,我们应该已经对kafka工作的原理有了了解, 核心是监听zookeeper的相关节点,节点变化时触发相应的操作 。
有新的broker加入集群时,称为代理上线。反之,当broker关闭,推出集群时,称为代理下线。
代理上线:
1、新代理启动时向/brokers/ids写数据
2、BrokerChangeListener监听到变化。对新上线节点调用controllerChannelManager.addBroker(),完成新上线代理网络层初始化
3、调用KafkaController.onBrokerStartup()处理
3.5恢复因新代理上线暂停的删除主题操作线程
代理下线:
1、查找下线节点集合
2、轮询下线节点,调用controllerChannelManager.removeBroker(),关闭每个下线节点网络连接。清空下线节点消息队列,关闭下线节点request请求
3、轮询下线节点,调用KafkaController.onBrokerFailure处理
4、向集群全部存活代理发送updateMetadataRequest请求
顾名思义,协调器负责协调工作。本节所讲的协调器,是用来协调消费者工作分配的。简单点说,就是消费者启动后,到可以正常消费前,这个阶段的初始化工作。消费者能够正常运转起来,全有赖于协调器。
主要的协调器有如下两个:
1、消费者协调器(ConsumerCoordinator)
2、组协调器(GroupCoordinator)
kafka引入协调器有其历史过程,原来consumer信息依赖于zookeeper存储,当代理或消费者发生变化时,引发消费者平衡,此时消费者之间是互不透明的,每个消费者和zookeeper单独通信,容易造成羊群效应和脑裂问题。
为了解决这些问题,kafka引入了协调器。服务端引入组协调器(GroupCoordinator),消费者端引入消费者协调器(ConsumerCoordinator)。每个broker启动的时候,都会创建GroupCoordinator实例,管理部分消费组(集群负载均衡)和组下每个消费者消费的偏移量(offset)。每个consumer实例化时,同时实例化一个ConsumerCoordinator对象,负责同一个消费组下各个消费者和服务端组协调器之前的通信。如下图:
消费者协调器,可以看作是消费者做操作的代理类(其实并不是),消费者很多操作通过消费者协调器进行处理。
消费者协调器主要负责如下工作:
1、更新消费者缓存的MetaData
2、向组协调器申请加入组
3、消费者加入组后的相应处理
4、请求离开消费组
5、向组协调器提交偏移量
6、通过心跳,保持组协调器的连接感知。
7、被组协调器选为leader的消费者的协调器,负责消费者分区分配。分配结果发送给组协调器。
8、非leader的消费者,通过消费者协调器和组协调器同步分配结果。
消费者协调器主要依赖的组件和说明见下图:
可以看到这些组件和消费者协调器担负的工作是可以对照上的。
组协调器负责处理消费者协调器发过来的各种请求。它主要提供如下功能:
组协调器在broker启动的时候实例化,每个组协调器负责一部分消费组的管理。它主要依赖的组件见下图:
这些组件也是和组协调器的功能能够对应上的。具体内容不在详述。
下图展示了消费者启动选取leader、入组的过程。
消费者入组的过程,很好的展示了消费者协调器和组协调器之间是如何配合工作的。leader consumer会承担分区分配的工作,这样kafka集群的压力会小很多。同组的consumer通过组协调器保持同步。消费者和分区的对应关系持久化在kafka内部主题。
消费者消费时,会在本地维护消费到的位置(offset),就是偏移量,这样下次消费才知道从哪里开始消费。如果整个环境没有变化,这样做就足够了。但一旦消费者平衡操作或者分区变化后,消费者不再对应原来的分区,而每个消费者的offset也没有同步到服务器,这样就无法接着前任的工作继续进行了。
因此只有把消费偏移量定期发送到服务器,由GroupCoordinator集中式管理,分区重分配后,各个消费者从GroupCoordinator读取自己对应分区的offset,在新的分区上继续前任的工作。
下图展示了不提交offset到服务端的问题:
开始时,consumer 0消费partition 0 和1,后来由于新的consumer 2入组,分区重新进行了分配。consumer 0不再消费partition2,而由consumer 2来消费partition 2,但由于consumer之间是不能通讯的,所有consumer2并不知道从哪里开始自己的消费。
因此consumer需要定期提交自己消费的offset到服务端,这样在重分区操作后,每个consumer都能在服务端查到分配给自己的partition所消费到的offset,继续消费。
由于kafka有高可用和横向扩展的特性,当有新的分区出现或者新的消费入组后,需要重新分配消费者对应的分区,所以如果偏移量提交的有问题,会重复消费或者丢消息。偏移量提交的时机和方式要格外注意!!
1、自动提交偏移量
设置 enable.auto.commit为true,设定好周期,默认5s。消费者每次调用轮询消息的poll() 方法时,会检查是否超过了5s没有提交偏移量,如果是,提交上一次轮询返回的偏移量。
这样做很方便,但是会带来重复消费的问题。假如最近一次偏移量提交3s后,触发了再均衡,服务器端存储的还是上次提交的偏移量,那么再均衡结束后,新的消费者会从最后一次提交的偏移量开始拉取消息,此3s内消费的消息会被重复消费。
2、手动提交偏移量
设置 enable.auto.commit为false。程序中手动调用commitSync()提交偏移量,此时提交的是poll方法返回的最新的偏移量。
commitSync()是同步提交偏移量,主程序会一直阻塞,偏移量提交成功后才往下运行。这样会限制程序的吞吐量。如果降低提交频次,又很容易发生重复消费。
这里我们可以使用commitAsync()异步提交偏移量。只管提交,而不会等待broker返回提交结果
commitSync只要没有发生不可恢复错误,会进行重试,直到成功。而commitAsync不会进行重试,失败就是失败了。commitAsync不重试,是因为重试提交时,可能已经有其它更大偏移量已经提交成功了,如果此时重试提交成功,那么更小的偏移量会覆盖大的偏移量。那么如果此时发生再均衡,新的消费者将会重复消费消息。
❻ python 消费kafka 写入es 小记
# -*- coding: utf8 -*-
# __author__ = '小红帽'
# Date: 2020-05-11
"""Naval Fate.
Usage:
py_kafka_protobuf_consume.py --bootstrap-servers=<host:port,host2:port2..> --groupId=<groupId> --topic=<topic_name> --es-servers=<host:port> --index=<schema> --type=<doc> --id=<order_id>
py_kafka_protobuf_consume.py -h | --help
py_kafka_protobuf_consume.py --version
Options:
-h --help 打印帮助信息.
--bootstrap_servers=<host:port,host2:port2..> kafka servers
--groupId=<groupId> kafka消费组
--topic=<topic_name> topic名称
--es-servers=<host:port> ES 地址
--index=<index_name> ES 索引
--type=<doc> ES type
--id=<order_id> 指定id主键,快速更新
"""
import json
from kafka import KafkaConsumer
from docopt import docopt
from elasticsearch import Elasticsearch
from elasticsearch import helpers
class Kafka_consumer():
def __init__(self,args):
self.topic = args['--topic']
self.bootstrapServers = args['--bootstrap-servers']
self.groupId = args['--groupId']
self.id = args['--id']
self.es_host = args['--es-servers'].split(':')[0]
self.es_port = args['--es-servers'].split(':')[1]
self.es_index = args['--index']
self.es_type = args['--type']
self.consumer = KafkaConsumer(
bootstrap_servers=self.bootstrapServers,
group_id=self.groupId,
enable_auto_commit = True,
auto_commit_interval_ms=5000,
consumer_timeout_ms=5000
)
def consume_data_es(self):
while True:
try:
es = Elasticsearch([{'host': self.es_host, 'port': self.es_port}], timeout=3600)
self.consumer.subscribe([self.topic])
actions=[]
for message in self.consumer:
if message is not None:
query = json.loads(message.value)['data'][0]
action = {
"_index": self.es_index,
"_type": self.es_type,
"_id": json.loads(message.value)['data'][0][self.id],
"_source": query
}
actions.append(action)
if len(actions) > 50:
helpers.bulk(client=es, actions=actions)
print("插入es %s 条数据" % len(actions))
actions = []
if len(actions) > 0:
helpers.bulk(client=es, actions=actions)
print("等待超时时间,插入es %s 条数据" % len(actions))
actions=[]
except BaseException as e:
print(e)
if __name__ == '__main__':
arguments = docopt(__doc__,version='sbin 1.0')
consumer = Kafka_consumer(arguments)
consumer.consume_data_es()
❼ 服务端技术实战系列——Kafka篇
一.概念&原理
[if !supportLists]1. [endif]主题(topic):主题是对消息的分类。
[if !supportLists]2. [endif]消息(message):消息是kafka通信的基本单位。
[if !supportLists]3. [endif]分区(partition): 一组 消息对应 一个 主题, 一个 主题对应 一个或多个 分区。每个分区为一系列有序消息组成的 有序队列 ;每个分区在物理上对应一个文件夹。
[if !supportLists]4. [endif]副本(replica):每个分区有 一个或多个 副本,分区的副本分布在集群的 不同 代理(机器)上,以提高可用性;分区的副本与日志对象是一一对应的。
[if !supportLists]5. [endif]Kafka只保证一个 分区内 的消息 有序性 ,不保证跨分区消息的有序性。消息被追加到相应分区中, 顺序写入磁盘 ,效率非常高。
[if !supportLists]6. [endif]Kafka选取某个某个分区的 一个 副本作为leader副本,该分区的 其他 副本为follower副本。 只有leader副本负责处理客户端读/写请求 ,follower副本从leader副本同步数据。
[if !supportLists]7. [endif]任何发布到分区的消息都会追加到日志文件的尾部, 每条消息 在日志文件中的 位置 都对应一个 按序递增的偏移量 ;偏移量在一个分区下严格有序。
[if !supportLists]8. [endif]Kafka不允许对消息进行随机读写。
[if !supportLists]9. [endif]新版消费者将 消费偏移量 保存到kafka内部的一个主题中。
[if !supportLists]10. [endif]Kafka集群由 一个或多个代理 (Broker,也称为kafka实例)构成。可以在 一台 服务器上配置 一个或多个代理 ,每个代理具有唯一标识broker.id。
[if !supportLists]11. [endif]生产者将消息 发送给代理 (Broker)。
[if !supportLists]12. [endif]消费者以 拉取 (pull)方式拉取数据,每个消费者都属于一个消费组。
[if !supportLists]13. [endif]同一个主题的一条消息只能被 同一个消费组 下的某一个消费者消费,但 不同消费组 的消费者可以 同时 消费该消息。
[if !supportLists]14. [endif]消息 广播 :指定各消费者属于不同消费组;消息 单播 :指定各消费者属于同一个消费组。
[if !supportLists]15. [endif]Kafka启动时在Zookeeper上创建相应节点来保存 元数据 ,元数据包括:代理节点信息、集群信息、主题信息、分区状态信息、分区副本分配方案、动态配置等;
[if !supportLists]16. [endif]Kafka通过 监听 机制在节点注册监听器来监听节点元数据变化;
[if !supportLists]17. [endif]Kafka将数据写入 磁盘 ,以文件系统来存数据;
[if !supportLists]18. [endif]生产环境一般将zookeeper集群和kafka集群 分机架 部署;
[if !supportLists]二.[endif] Kafka Procer
配置:
/**
* xTestProxy——KafkaConfigConstant
*
* @author ZhangChi
* @date 2018年6月20日---下午5:50:44
* @version 1.0
*/
public class KafkaConfigConstant {
public static final String KAFKA_CLUSTER = "fa-common1.hangzhou-1.kafka.internal.lede.com:9200,fa-common2.hangzhou-1.kafka.internal.lede.com:9200,fa-common3.hangzhou-1.kafka.internal.lede.com:9200";
}
生产者配置:
/**
* xTestProxy——HttpKafkaProcerFactory
*
* @author ZhangChi
* @date 2018年6月11日---下午2:37:51
* @version 1.0
*/
public class HttpKafkaProcerFactory {
// 真正的KafkaProcer仅有一份
private static KafkaProcer kafkaProcer = null ;
private static Properties property ;
public static KafkaProcer getKafkaProcer() {
if ( kafkaProcer == null ) {
synchronized (HttpKafkaProcerFactory. class ) {
if ( kafkaProcer == null ) {
property = buildKafkaProperty ();
kafkaProcer = new KafkaProcer( property );
}
}
}
return kafkaProcer ;
}
public static Properties buildKafkaProperty() {
Properties props = new Properties();
props.put(ProcerConfig. BOOTSTRAP_SERVERS_CONFIG , KafkaConfigConstant. KAFKA_CLUSTER );
props.put(ProcerConfig. ACKS_CONFIG , "all");
props.put(ProcerConfig. RETRIES_CONFIG , 0);
props.put(ProcerConfig. BATCH_SIZE_CONFIG , 16384);
props.put(ProcerConfig. BUFFER_MEMORY_CONFIG , 33554432);
props.put(ProcerConfig. LINGER_MS_CONFIG , 1);
props.put(ProcerConfig. KEY_SERIALIZER_CLASS_CONFIG , "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProcerConfig. VALUE_SERIALIZER_CLASS_CONFIG ,
"org.apache.kafka.common.serialization.StringSerializer");
return props;
}
}
生产者线程组:
/**
* xTestProxy——HttpKafkaProcerThread
* 多线程每次new一个实例
*
* @author ZhangChi
* @date 2018年6月25日---下午2:09:39
* @version 1.0
*/
public class HttpKafkaProcerThread implements Runnable {
private static Logger logger = LoggerFactory. getLogger ("HttpKafkaProcerThread");
private final String KAFKA_TOPIC = KafkaConstant. HTTP_REQ_RESP_TOPIC ;
private String kafkaMessageJson;
private KafkaProcer procer;
public String messageType;
public String originalMessage;
private static KafkaMessage kafkaMessage = new KafkaMessage();
public HttpKafkaProcerThread(KafkaProcer procer, String messageType, String originalMessage) {
this .procer = procer;
this .messageType = messageType;
this .originalMessage = originalMessage;
}
@Override
public void run() {
// TODO Auto-generated method stub
/* 1.构建kafka消息*/
kafkaMessageJson = generateKafkaMessage( this .messageType, this .originalMessage);
/* 2.发送kafka消息*/
if (kafkaMessageJson != null && !StringUtils. isEmpty (kafkaMessageJson)) {
logger .info("create message start:" + kafkaMessageJson);
procer.send( new ProcerRecord( this .KAFKA_TOPIC, kafkaMessageJson));
} else {
logger .info("kafkaMessageJson is null!");
}
}
private String generateKafkaMessage(String messageType, String originalMessage) {
if (StringUtils. isBlank (messageType) || StringUtils. isBlank (originalMessage)) {
return null ;
}
kafkaMessage .setMessageId(KafkaMessageUtils. generateId ());
kafkaMessage .setMessageTime(KafkaMessageUtils. generateTime ());
kafkaMessage .setMessageType(messageType);
kafkaMessage .setMessage(originalMessage);
String kafkaMessageToJson = null ;
try {
kafkaMessageToJson = KafkaMessageUtils. objectToJson ( kafkaMessage );
} catch (JsonProcessingException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
kafkaMessageJson = kafkaMessageToJson;
return kafkaMessageToJson;
}
}
[if !supportLists]三.[endif] Kafka Consumer
消费者配置:
private static Properties buildKafkaProperty() {
Properties properties = new Properties();
// 测试环境kafka的端口号是9200
properties.put(ConsumerConfig. BOOTSTRAP_SERVERS_CONFIG , KafkaConfigConstant. KAFKA_CLUSTER );
// 消费组名称
properties.put(ConsumerConfig. GROUP_ID_CONFIG , KafkaConfigConstant. GROUP_ID );
properties.put(ConsumerConfig. CLIENT_ID_CONFIG , "test");
// 从头消费
properties.put(ConsumerConfig. AUTO_OFFSET_RESET_CONFIG , "earliest");
// 自动提交偏移量
properties.put(ConsumerConfig. ENABLE_AUTO_COMMIT_CONFIG , "true");
// 时间间隔1s
properties.put(ConsumerConfig. AUTO_COMMIT_INTERVAL_MS_CONFIG , "1000");
properties.put(ConsumerConfig. KEY_DESERIALIZER_CLASS_CONFIG ,
"org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig. VALUE_DESERIALIZER_CLASS_CONFIG ,
"org.apache.kafka.common.serialization.StringDeserializer");
return properties;
}
消费者线程组:
/**
* AnalysisEngine——HttpKafkaConsumerGroup
*
* @author ZhangChi
* @date 2018年6月11日---下午6:20:47
* @version 1.0
*/
@Service("httpKafkaConsumerGroup")
public class HttpKafkaConsumerGroup {
@Autowired
private RequestAnalyzer requestAnalyzer;
@Autowired
private EsDocumentServiceImpl esDocumentServiceImpl;
@Autowired
private AnalysisEngineClient analysisEngineClient;
@Autowired
private MongoTemplate mongoTemplate;
private List httpKafkaConsumerList = new ArrayList();
public void initHttpKafkaConsumerGroup( int consumerNumber, RunModeEnum mode) {
for ( int i = 0; i < consumerNumber; i++) {
/**
* 将注入的服务当做构造参数,这样保证每个子线程都能拿到服务实例而不是空指针!
*/
HttpKafkaConsumer consumerThread = new HttpKafkaConsumer(requestAnalyzer, esDocumentServiceImpl, mode, analysisEngineClient, mongoTemplate);
httpKafkaConsumerList.add(consumerThread);
}
}
public void consumeGroupStart() {
for (HttpKafkaConsumer item : httpKafkaConsumerList) {
LogConstant. runLog .info("httpKafkaConsumerList size : " + httpKafkaConsumerList.size());
Thread consumerThread = new Thread(item);
consumerThread.start();
}
}
}
先逐个初始化消费者实例,然后将这些消费者加入到消费组列表中。消费组启动后,会循环产生消费者线程。
❽ kafka入门:一个开源的、轻量级、高吞吐、高可用的分布式消息系统
随着信息技术的快速发展及互联网用户规模的急剧增长,计算机所存储的信息量正呈爆炸式增长,目前数据量已进入大规模和超大规模的海量数据时代, 如何高效地存储、分析、处理和挖掘海量数据 已成为技术研究领域的热点和难点问题。而 如何采集和运营管理、分析这些数据 也是大数据处理中一个至关重要的组成环节,这就需要相应的基础设施对其提供支持。针对这个需求,当前业界已有很多开源的消息系统应运而生,kafka就是一款当然非常流行的消息系统。
Kafka是一款开源的、轻量级的、分布式、可分区和具有复制备份的(Replicated)、基于ZooKeeper协调管理的分布式流平台的功能强大的消息系统。作为一个流式处理平台,必须具备以下3个关键特性:
1) 能够允许发布和订阅流数据。
2) 存储流数据时提供相应的容错机制。
3) 当流数据到达时能够被及时处理。
消息流系统kafka的基本结构包括生产者和消费者,以及kafka集群。
生产者负责生产消息,将消息写入Kafka集群;消费者从Kafka集群中拉取消息。
消息是Kafka通信的基本单位 ,由一个 固定长度的消息头 和一个 可变长度的消息体 构成。
Kafka将 一组消息 抽象归纳为一个主题(Topic),也就是说,一个主题是对消息的一个分类。 生产者将消息指定主题发送到kafka集群,消费者订阅主题或主题的某些分区进行消费。
Kafka将一组消息归纳为一个主题,而 每个主题又被分成一个或多个分区(Partition) 。每个分区由一系列有序、不可变的消息组成,是一个有序队列。 每个分区在物理上对应为一个文件夹 ,分区的命名规则为主题名称后接“—”连接符,之后再接分区编号,分区编号从0开始,编号最大值为分区的总数减1。
分区使得Kafka在并发处理上变得更加容易,理论上来说,分区数越多吞吐量越高,但这要根据集群实际环境及业务场景而定。同时,分区也是Kafka保证消息被顺序消费以及对消息进行负载均衡的基础。
疑问和答案 :分区如何保证消息被顺序消费?每个分区内的消息是有序的,但不同分区间如何保证?猜测是分区从存储空间上比较大,分区个数少。顺序消费的主要因素在分区内的消息,分区间的可以忽略。高吞吐率顺序写磁盘估计也是这个原因。
Kafka只能保证一个分区之内消息的有序性,并不能保证跨分区消息的有序性。 每条消息被追加到相应的分区中,是顺序写磁盘,因此效率非常高,这是Kafka高吞吐率的一个重要保证 。同时与传统消息系统不同的是,Kafka并不会立即删除已被消费的消息,由于磁盘的限制消息也不会一直被存储,因此 Kafka提供两种删除老数据的策略 ,一是基于消息已存储的时间长度,二是基于分区的大小。这两种策略都能通过配置文件进行配置。
每个分区又有一至多个副本(Replica),分区的副本分布在集群的不同代理上,以提高可用性。
从存储角度上分析,分区的每个副本在逻辑上抽象为一个日志(Log)对象,即分区的副本与日志对象是一一对应的。每个主题对应的 分区数 可以在Kafka启动时所加载的配置文件中配置,也可以在创建主题时指定。当然,客户端还可以在主题创建后修改主题的分区数。
为什么副本要分Leader和Follower? 如果没有Leader副本,就需要所有的副本都同时负责读/写请求处理,同时还得保证这些副本之间数据的一致性,假设有n个副本则需要有n×n条通路来同步数据,这样数据的一致性和有序性就很难保证。
为解决这个问题,Kafka选择分区的一个副本为Leader,该分区其他副本为Follower,只有 Leader副本 才负责处理客户端 读/写请求 ,Follower副本从Leader副本同步数据。
引入Leader副本后客户端只需与Leader副本进行交互,这样数据一致性及顺序性就有了保证。Follower副本从Leader副本同步消息,对于n个副本只需n-1条通路即可,这样就使得系统更加简单而高效。
副本Follower与Leader的角色并不是固定不变的,如果Leader失效,通过相应的选举算法将从其他Follower副本中选出新的Leader副本。
疑问 :leader副本和follower副本是如何选出来的?通过zookeeper选举的嘛?
Kafka在ZooKeeper中动态维护了一个 ISR(In-sync Replica) ,即保存同步的副本列表,该列表中保存的是与Leader副本保持消息同步的所有副本对应的代理节点id。 如果一个Follower副本宕机或是落后太多 ,则该Follower副本节点将 从ISR列表中移除 。 本书用宕机 来特指某个代理失效的情景,包括但不限于代理被关闭,如代理被人为关闭或是发生物理故障、心跳检测过期、网络延迟、进程崩溃等。
任何发布到分区的消息会被直接追加到日志文件的尾部(分区目录下以“.log”为文件名后缀的数据文件),而每条 消息 在日志文件中的位置都会对应一个按序递增的 偏移量 。偏移量是一个分区下严格有序的 逻辑值 ,它并不表示消息在磁盘上的物理位置。由于Kafka几乎不允许对消息进行随机读写,因此Kafka并没有提供额外索引机制到存储偏移量。
消费者可以通过控制消息偏移量来对消息进行消费 ,如消费者可以指定消费的起始偏移量。 为了保证消息被顺序消费,消费者已消费的消息对应的偏移量也需要保存 。需要说明的是,消费者对消息偏移量的操作并不会影响消息本身的偏移量。旧版消费者将消费偏移量保存到ZooKeeper当中, 而新版消费者是将消费偏移量保存到Kafka内部一个主题当中。 当然,消费者也可以自己在外部系统保存消费偏移量,而无需保存到Kafka中。
推测 :一个主题有多个分区,一个分区有多个副本。一个主题(一类消息)有多个分区(消息被分段),一个分区(每段消息)有多个副本(每段消息的副本数)。消息一旦发给kafka,就会分配一个偏移量,在多个副本中的偏移量是一样的。这样的话,消费者通过偏移量消费时对于多个副本就没有差异性。
Kafka集群由一个或多个Kafka实例构成,每一个Kafka实例称为代理(Broker),通常也称代理为Kafka服务器(KafkaServer)。在生产环境中Kafka集群一般包括一台或多台服务器,我们可以在一台服务器上配置一个或多个代理。 每一个代理都有唯一的标识id,这个id是一个非负整数 。在一个Kafka集群中,每增加一个代理就需要为这个代理配置一个与该集群中其他代理不同的id, id值可以选择任意非负整数即可,只要保证它在整个Kafka集群中唯一,这个id就是代理的名字,也就是在启动代理时配置的broker.id对应的值。
生产者(Procer)负责将消息发送给代理,也就是向Kafka代理发送消息的客户端。
消费者(Comsumer)以拉取(pull)方式拉取数据,它是消费的客户端。在Kafka中 每一个消费者都属于一个特定消费组 (ConsumerGroup),可以为每个消费者指定一个消费组,以groupId代表消费组名称,通过group.id配置设置。 如果不指定消费组 ,则该消费者属于默认消费组test-consumer-group。
每个消费者有一个全局唯一的id ,通过配置项client.id指定, 如果客户端没有指定消费者的id, Kafka会自动为该消费者生成一个全局唯一的id,格式为${groupId}-${hostName}-${timestamp}-${UUID前8位字符}。 同一个主题的一条消息只能被同一个消费组下某一个消费者消费 ,但不同消费组的消费者可同时消费该消息。 消费组是Kafka用来实现对一个主题消息进行广播和单播的手段 ,实现消息广播只需指定各消费者均属于不同的消费组,消息单播则只需让各消费者属于同一个消费组。
推论: kafka消息是按照消息类型(主题),在一个消费者组中只能消费一次。也就是一个消费者组只消费一类型的消息。如果某个服务要消费一类消息,必须将自己置为不同的消费者组。
Kafka利用ZooKeeper保存相应元数据信息, Kafka元数据信息包括如代理节点信息、Kafka集群信息、旧版消费者信息及其消费偏移量信息、主题信息、分区状态信息、分区副本分配方案信息、动态配置信息等。 Kafka在启动或运行过程当中会在ZooKeeper上创建相应节点 来保存元数据信息, Kafka通过监听机制在这些节点注册相应监听器来监听节点元数据的变化 ,从而由ZooKeeper负责管理维护Kafka集群,同时通过ZooKeeper我们能够很方便地对Kafka集群进行水平扩展及数据迁移。
❾ kafka原理分析
作为一款典型的消息中间件产品,kafka系统仍然由procer、broker、consumer三部分组成。kafka涉及的几个常用概念和组件简单介绍如下:
当consumer group的状态发生变化(如有consumer故障、增减consumer成员等)或consumer group消费的topic状态发生变化(如增加了partition,消费的topic发生变化),kafka集群会自动调整和重新分配consumer消费的partition,这个过程就叫做rebalance(再平衡)。
__consumer_offsets是kafka集群自己维护的一个特殊的topic,它里面存储的是每个consumer group已经消费了每个topic partition的offset。__consumer_offsets中offset消息的key由group id,topic name,partition id组成,格式为 {topic name}-${partition id},value值就是consumer提交的已消费的topic partition offset值。__consumer_offsets的分区数和副本数分别由offsets.topic.num.partitions(默认值为50)和offsets.topic.replication.factor(默认值为1)参数配置。我们通过公式 hash(group id) % offsets.topic.num.partitions 就可以计算出指定consumer group的已提交offset存储的partition。由于consumer group提交的offset消息只有最后一条消息有意义,所以__consumer_offsets是一个compact topic,kafka集群会周期性的对__consumer_offsets执行compact操作,只保留最新的一次提交offset。
group coordinator运行在kafka某个broker上,负责consumer group内所有的consumer成员管理、所有的消费的topic的partition的消费关系分配、offset管理、触发rebalance等功能。group coordinator管理partition分配时,会指定consumer group内某个consumer作为group leader执行具体的partition分配任务。存储某个consumer group已提交offset的__consumer_offsets partition leader副本所在的broker就是该consumer group的协调器运行的broker。
跟大多数分布式系统一样,集群有一个master角色管理整个集群,协调集群中各个成员的行为。kafka集群中的controller就相当于其它分布式系统的master,用来负责集群topic的分区分配,分区leader选举以及维护集群的所有partition的ISR等集群协调功能。集群中哪个borker是controller也是通过一致性协议选举产生的,2.8版本之前通过zookeeper进行选主,2.8版本后通过kafka raft协议进行选举。如果controller崩溃,集群会重新选举一个broker作为新的controller,并增加controller epoch值(相当于zookeeper ZAB协议的epoch,raft协议的term值)
当kafka集群新建了topic或为一个topic新增了partition,controller需要为这些新增加的partition分配到具体的broker上,并把分配结果记录下来,供procer和consumer查询获取。
因为只有partition的leader副本才会处理procer和consumer的读写请求,而partition的其他follower副本需要从相应的leader副本同步消息,为了尽量保证集群中所有broker的负载是均衡的,controller在进行集群全局partition副本分配时需要使partition的分布情况是如下这样的:
在默认情况下,kafka采用轮询(round-robin)的方式分配partition副本。由于partition leader副本承担的流量比follower副本大,kafka会先分配所有topic的partition leader副本,使所有partition leader副本全局尽量平衡,然后再分配各个partition的follower副本。partition第一个follower副本的位置是相应leader副本的下一个可用broker,后面的副本位置依此类推。
举例来说,假设我们有两个topic,每个topic有两个partition,每个partition有两个副本,这些副本分别标记为1-1-1,1-1-2,1-2-1,1-2-2,2-1-1,2-1-2,2-2-1,2-2-2(编码格式为topic-partition-replia,编号均从1开始,第一个replica是leader replica,其他的是follower replica)。共有四个broker,编号是1-4。我们先对broker按broker id进行排序,然后分配leader副本,最后分配foller副本。
1)没有配置broker.rack的情况
现将副本1-1-1分配到broker 1,然后1-2-1分配到broker 2,依此类推,2-2-1会分配到broker 4。partition 1-1的leader副本分配在broker 1上,那么下一个可用节点是broker 2,所以将副本1-1-2分配到broker 2上。同理,partition 1-2的leader副本分配在broker 2上,那么下一个可用节点是broker 3,所以将副本1-1-2分配到broker 3上。依此类推分配其他的副本分片。最后分配的结果如下图所示:
2)配置了broker.rack的情况
假设配置了两个rack,broker 1和broker 2属于Rack 1,broker 3和broker 4属于Rack 2。我们对rack和rack内的broker分别排序。然后先将副本1-1-1分配到Rack 1的broker 1,然后将副本1-2-1分配到下一个Rack的第一个broker,即Rack 2的broker 3。其他的parttition leader副本依此类推。然后分配follower副本,partition 1-1的leader副本1-1-1分配在Rack 1的broker上,下一个可用的broker是Rack 2的broker 3,所以分配到broker 3上,其他依此类推。最后分配的结果如下图所示:
kafka除了按照集群情况自动分配副本,也提供了reassign工具人工分配和迁移副本到指定broker,这样用户可以根据集群实际的状态和各partition的流量情况分配副本
kafka集群controller的一项功能是在partition的副本中选择一个副本作为leader副本。在topic的partition创建时,controller首先分配的副本就是leader副本,这个副本又叫做preference leader副本。
当leader副本所在broker失效时(宕机或网络分区等),controller需要为在该broker上的有leader副本的所有partition重新选择一个leader,选择方法就是在该partition的ISR中选择第一个副本作为新的leader副本。但是,如果ISR成员只有一个,就是失效的leader自身,其余的副本都落后于leader怎么办?kafka提供了一个unclean.leader.election配置参数,它的默认值为true。当unclean.leader.election值为true时,controller还是会在非ISR副本中选择一个作为leader,但是这时候使用者需要承担数据丢失和数据不一致的风险。当unclean.leader.election值为false时,则不会选择新的leader,该partition处于不可用状态,只能恢复失效的leader使partition重新变为可用。
当preference leader失效后,controller重新选择一个新的leader,但是preference leader又恢复了,而且同步上了新的leader,是ISR的成员,这时候preference leader仍然会成为实际的leader,原先的新leader变为follower。因为在partition leader初始分配时,使按照集群副本均衡规则进行分配的,这样做可以让集群尽量保持平衡。
为了保证topic的高可用,topic的partition往往有多个副本,所有的follower副本像普通的consumer一样不断地从相应的leader副本pull消息。每个partition的leader副本会维护一个ISR列表存储到集群信息库里,follower副本成为ISR成员或者说与leader是同步的,需要满足以下条件:
1)follower副本处于活跃状态,与zookeeper(2.8之前版本)或kafka raft master之间的心跳正常
2)follower副本最近replica.lag.time.max.ms(默认是10秒)时间内从leader同步过最新消息。需要注意的是,一定要拉取到最新消息,如果最近replica.lag.time.max.ms时间内拉取过消息,但不是最新的,比如落后follower在追赶leader过程中,也不会成为ISR。
follower在同步leader过程中,follower和leader都会维护几个参数,来表示他们之间的同步情况。leader和follower都会为自己的消息队列维护LEO(Last End Offset)和HW(High Watermark)。leader还会为每一个follower维护一个LEO。LEO表示leader或follower队列写入的最后一条消息的offset。HW表示的offset对应的消息写入了所有的ISR。当leader发现所有follower的LEO的最小值大于HW时,则会增加HW值到这个最小值LEO。follower拉取leader的消息时,同时能获取到leader维护的HW值,如果follower发现自己维护的HW值小于leader发送过来的HW值,也会增加本地的HW值到leader的HW值。这样我们可以得到一个不等式: follower HW <= leader HW <= follower LEO <= leader LEO 。HW对应的log又叫做committed log,consumer消费partititon的消息时,只能消费到offset值小于或等于HW值的消息的,由于这个原因,kafka系统又称为分布式committed log消息系统。
kafka的消息内容存储在log.dirs参数配置的目录下。kafka每个partition的数据存放在本地磁盘log.dirs目录下的一个单独的目录下,目录命名规范为 ${topicName}-${partitionId} ,每个partition由多个LogSegment组成,每个LogSegment由一个数据文件(命名规范为: {baseOffset}.index)和一个时间戳索引文件(命名规范为:${baseOffset}.timeindex)组成,文件名的baseOffset就是相应LogSegment中第一条消息的offset。.index文件存储的是消息的offset到该消息在相应.log文件中的偏移,便于快速在.log文件中快速找到指定offset的消息。.index是一个稀疏索引,每隔一定间隔大小的offset才会建立相应的索引(比如每间隔10条消息建立一个索引)。.timeindex也是一个稀疏索引文件,这样可以根据消息的时间找到对应的消息。
可以考虑将消息日志存放到多个磁盘中,这样多个磁盘可以并发访问,增加消息读写的吞吐量。这种情况下,log.dirs配置的是一个目录列表,kafka会根据每个目录下partition的数量,将新分配的partition放到partition数最少的目录下。如果我们新增了一个磁盘,你会发现新分配的partition都出现在新增的磁盘上。
kafka提供了两个参数log.segment.bytes和log.segment.ms来控制LogSegment文件的大小。log.segment.bytes默认值是1GB,当LogSegment大小达到log.segment.bytes规定的阈值时,kafka会关闭当前LogSegment,生成一个新的LogSegment供消息写入,当前供消息写入的LogSegment称为活跃(Active)LogSegment。log.segment.ms表示最大多长时间会生成一个新的LogSegment,log.segment.ms没有默认值。当这两个参数都配置了值,kafka看哪个阈值先达到,触发生成新的LogSegment。
kafka还提供了log.retention.ms和log.retention.bytes两个参数来控制消息的保留时间。当消息的时间超过了log.retention.ms配置的阈值(默认是168小时,也就是一周),则会被认为是过期的,会被kafka自动删除。或者是partition的总的消息大小超过了log.retention.bytes配置的阈值时,最老的消息也会被kafka自动删除,使相应partition保留的总消息大小维持在log.retention.bytes阈值以下。这个地方需要注意的是,kafka并不是以消息为粒度进行删除的,而是以LogSegment为粒度删除的。也就是说,只有当一个LogSegment的最后一条消息的时间超过log.retention.ms阈值时,该LogSegment才会被删除。这两个参数都配置了值时,也是只要有一个先达到阈值,就会执行相应的删除策略
当我们使用KafkaProcer向kafka发送消息时非常简单,只要构造一个包含消息key、value、接收topic信息的ProcerRecord对象就可以通过KafkaProcer的send()向kafka发送消息了,而且是线程安全的。KafkaProcer支持通过三种消息发送方式
KafkaProcer客户端虽然使用简单,但是一条消息从客户端到topic partition的日志文件,中间需要经历许多的处理过程。KafkaProcer的内部结构如下所示:
从图中可以看出,消息的发送涉及两类线程,一类是调用KafkaProcer.send()方法的应用程序线程,因为KafkaProcer.send()是多线程安全的,所以这样的线程可以有多个;另一类是与kafka集群通信,实际将消息发送给kafka集群的Sender线程,当我们创建一个KafkaProcer实例时,会创建一个Sender线程,通过该KafkaProcer实例发送的所有消息最终通过该Sender线程发送出去。RecordAccumulator则是一个消息队列,是应用程序线程与Sender线程之间消息传递的桥梁。当我们调用KafkaProcer.send()方法时,消息并没有直接发送出去,只是写入了RecordAccumulator中相应的队列中,最终需要Sender线程在适当的时机将消息从RecordAccumulator队列取出来发送给kafka集群。
消息的发送过程如下:
在使用KafkaConsumer实例消费kafka消息时,有一个特性我们要特别注意,就是KafkaConsumer不是多线程安全的,KafkaConsumer方法都在调用KafkaConsumer的应用程序线程中运行(除了consumer向kafka集群发送的心跳,心跳在一个专门的单独线程中发送),所以我们调用KafkaConsumer的所有方法均需要保证在同一个线程中调用,除了KafkaConsumer.wakeup()方法,它设计用来通过其它线程向consumer线程发送信号,从而终止consumer执行。
跟procer一样,consumer要与kafka集群通信,消费kafka消息,首先需要获取消费的topic partition leader replica所在的broker地址等信息,这些信息可以通过向kafka集群任意broker发送Metadata请求消息获取。
我们知道,一个consumer group有多个consumer,一个topic有多个partition,而且topic的partition在同一时刻只能被consumer group内的一个consumer消费,那么consumer在消费partition消息前需要先确定消费topic的哪个partition。partition的分配通过group coordinator来实现。基本过程如下:
我们可以通过实现接口org.apache.kafka.clients.consumer.internals.PartitionAssignor自定义partition分配策略,但是kafka已经提供了三种分配策略可以直接使用。
partition分配完后,每个consumer知道了自己消费的topic partition,通过metadata请求可以获取相应partition的leader副本所在的broker信息,然后就可以向broker poll消息了。但是consumer从哪个offset开始poll消息?所以consumer在第一次向broker发送FetchRequest poll消息之前需要向Group Coordinator发送OffsetFetchRequest获取消费消息的起始位置。Group Coordinator会通过key {topic}-${partition}查询 __consumer_offsets topic中是否有offset的有效记录,如果存在,则将consumer所属consumer group最近已提交的offset返回给consumer。如果没有(可能是该partition是第一次分配给该consumer group消费,也可能是该partition长时间没有被该consumer group消费),则根据consumer配置参数auto.offset.reset值确定consumer消费的其实offset。如果auto.offset.reset值为latest,表示从partition的末尾开始消费,如果值为earliest,则从partition的起始位置开始消费。当然,consumer也可以随时通过KafkaConsumer.seek()方法人工设置消费的起始offset。
kafka broker在收到FetchRequest请求后,会使用请求中topic partition的offset查一个skiplist表(该表的节点key值是该partition每个LogSegment中第一条消息的offset值)确定消息所属的LogSegment,然后继续查LogSegment的稀疏索引表(存储在.index文件中),确定offset对应的消息在LogSegment文件中的位置。为了提升消息消费的效率,consumer通过参数fetch.min.bytes和max.partition.fetch.bytes告诉broker每次拉取的消息总的最小值和每个partition的最大值(consumer一次会拉取多个partition的消息)。当kafka中消息较少时,为了让broker及时将消息返回给consumer,consumer通过参数fetch.max.wait.ms告诉broker即使消息大小没有达到fetch.min.bytes值,在收到请求后最多等待fetch.max.wait.ms时间后,也将当前消息返回给consumer。fetch.min.bytes默认值为1MB,待fetch.max.wait.ms默认值为500ms。
为了提升消息的传输效率,kafka采用零拷贝技术让内核通过DMA把磁盘中的消息读出来直接发送到网络上。因为kafka写入消息时将消息写入内存中就返回了,如果consumer跟上了procer的写入速度,拉取消息时不需要读磁盘,直接从内存获取消息发送出去就可以了。
为了避免发生再平衡后,consumer重复拉取消息,consumer需要将已经消费完的消息的offset提交给group coordinator。这样发生再平衡后,consumer可以从上次已提交offset出继续拉取消息。
kafka提供了多种offset提交方式
partition offset提交和管理对kafka消息系统效率来说非常关键,它直接影响了再平衡后consumer是否会重复拉取消息以及重复拉取消息的数量。如果offset提交的比较频繁,会增加consumer和kafka broker的消息处理负载,降低消息处理效率;如果offset提交的间隔比较大,再平衡后重复拉取的消息就会比较多。还有比较重要的一点是,kafka只是简单的记录每次提交的offset值,把最后一次提交的offset值作为最新的已提交offset值,作为再平衡后消息的起始offset,而什么时候提交offset,每次提交的offset值具体是多少,kafka几乎不关心(这个offset对应的消息应该存储在kafka中,否则是无效的offset),所以应用程序可以先提交3000,然后提交2000,再平衡后从2000处开始消费,决定权完全在consumer这边。
kafka中的topic partition与consumer group中的consumer的消费关系其实是一种配对关系,当配对双方发生了变化时,kafka会进行再平衡,也就是重新确定这种配对关系,以提升系统效率、高可用性和伸缩性。当然,再平衡也会带来一些负面效果,比如在再平衡期间,consumer不能消费kafka消息,相当于这段时间内系统是不可用的。再平衡后,往往会出现消息的重复拉取和消费的现象。
触发再平衡的条件包括:
需要注意的是,kafka集群broker的增减或者topic partition leader重新选主这类集群状态的变化并不会触发在平衡
有两种情况与日常应用开发比较关系比较密切:
consumer在调用subscribe()方法时,支持传入一个ConsumerRebalanceListener监听器,ConsumerRebalanceListener提供了两个方法,onPartitionRevoked()方法在consumer停止消费之后,再平衡开始之前被执行。可以发现,这个地方是提交offset的好时机。onPartitonAssigned()方法则会在重新进行partition分配好了之后,但是新的consumer还未消费之前被执行。
我们在提到kafka时,首先想到的是它的吞吐量非常大,这也是很多人选择kafka作为消息传输组件的重要原因。
以下是保证kafka吞吐量大的一些设计考虑:
但是kafka是不是总是这么快?我们同时需要看到kafka为了追求快舍弃了一些特性:
所以,kafka在消息独立、允许少量消息丢失或重复、不关心消息顺序的场景下可以保证非常高的吞吐量,但是在需要考虑消息事务、严格保证消息顺序等场景下procer和consumer端需要进行复杂的考虑和处理,可能会比较大的降低kafka的吞吐量,例如对可靠性和保序要求比较高的控制类消息需要非常谨慎的权衡是否适合使用kafka。
我们通过procer向kafka集群发送消息,总是期望消息能被consumer成功消费到。最不能忍的是procer收到了kafka集群消息写入的正常响应,但是consumer仍然没有消费到消息。
kafka提供了一些机制来保证消息的可靠传递,但是有一些因素需要仔细权衡考虑,这些因素往往会影响kafka的吞吐量,需要在可靠性与吞吐量之间求得平衡:
kafka只保证partition消息顺序,不保证topic级别的顺序,而且保证的是partition写入顺序与读取顺序一致,不是业务端到端的保序。
如果对保序要求比较高,topic需要只设置一个partition。这时可以把参数max.in.flight.requests.per.connection设置为1,而retries设置为大于1的数。这样即使发生了可恢复型错误,仍然能保证消息顺序,但是如果发生不可恢复错误,应用层进行重试的话,就无法保序了。也可以采用同步发送的方式,但是这样也极大的降低了吞吐量。如果消息携带了表示顺序的字段,可以在接收端对消息进行重新排序以保证最终的有序。
❿ 如何在kafka-python和confluent-kafka之间做出选择
kafka-python:蛮荒的西部
kafka-python是最受欢迎的Kafka Python客户端。我们过去使用时从未出现过任何问题,在我的《敏捷数据科学2.0》一书中我也用过它。然而在最近这个项目中,它却出现了一个严重的问题。我们发现,当以文档化的方式使用KafkaConsumer、Consumer迭代式地从消息队列中获取消息时,最终到达主题topic的由Consumer携带的消息通常会丢失。我们通过控制台Consumer的分析验证了这一点。
需要更详细说明的是,kafka-python和KafkaConsumer是与一个由SSL保护的Kafka服务(如Aiven Kafka)一同使用的,如下面这样:
kafka_consumer = KafkaConsumer(
topic,
enable_auto_commit=True,
group_id=group_id,
bootstrap_servers=config.kafka.host,
api_version=(0, 10),
security_protocol='SSL',
ssl_check_hostname=True,
ssl_cafile=config.kafka.ca_pem,
ssl_certfile=config.kafka.service_cert,
ssl_keyfile=config.kafka.service_key
)
for message in kafka_consumer:
application_message = json.loads(message.value.decode())
...
当以这样的推荐方式使用时,KafkaConsumer会丢失消息。但有一个变通方案,就是保留所有消息。这个方案是Kafka服务提供商Aiven support提供给我们的。它看起来像这样:
while True:
raw_messages = consumer.poll(timeout_ms=1000, max_records=5000)
for topic_partition, messages in raw_messages.items():
application_message = json.loads(message.value.decode())
...
虽然这个变通方案可能有用,但README中的方法会丢弃消息使我对其失去兴趣。所以我找到了一个替代方案。
confluent-kafka:企业支持
发现coufluent-kafka Python模块时,我感到无比惊喜。它既能做librdkafka的外封装,又非常小巧。librdkafka是一个用C语言写的kafka库,它是Go和.NET的基础。更重要的是,它由Confluent公司支持。我爱开源,但是当“由非正式社区拥有或支持”这种方式效果不行的时候,或许该考虑给替代方案印上公章、即该由某个公司拥有或支持了。不过,我们并未购买商业支持。我们知道有人会维护这个库的软件质量,而且可以选择买或不买商业支持,这一点真是太棒了。
用confluent-kafka替换kafka-python非常简单。confluent-kafka使用poll方法,它类似于上面提到的访问kafka-python的变通方案。
kafka_consumer = Consumer(
{
"api.version.request": True,
"enable.auto.commit": True,
"group.id": group_id,
"bootstrap.servers": config.kafka.host,
"security.protocol": "ssl",
"ssl.ca.location": config.kafka.ca_pem,
"ssl.certificate.location": config.kafka.service_cert,
"ssl.key.location": config.kafka.service_key,
"default.topic.config": {"auto.offset.reset": "smallest"}
}
)
consumer.subscribe([topic])
# Now loop on the consumer to read messages
running = True
while running:
message = kafka_consumer.poll()
application_message = json.load(message.value.decode())
kafka_consumer.close()
现在我们能收到所有消息了。我并不是说kafka-python工具不好,我相信社区会对它的问题做出反应并解决。但从现在开始,我会一直坚持使用confluent-kafka。
开源治理
开源是强大的,但是涉及到复杂的“大数据”和NoSQL工具时,通常需要有一家大公司在背后推动工具的开发。这样你就知道,如果那个公司可以使用工具,那么该工具应该拥有很好的基本功能。它的出现可能是非正式的,就像某公司发布类似FOSS的项目一样,但也可能是正式的,就像某公司为工具提供商业支持一样。当然,从另一个角度来看,如果一家与开源社区作对的公司负责开发某个工具,你便失去了控制权。你的意见可能无关紧要,除非你是付费客户。
理想情况是采取开源治理,就像Apache基金会一样,还有就是增加可用的商业支持选项。这对互联网上大部分的免费软件来说根本不可能。限制自己只使用那些公司盖章批准后的工具将非常限制你的自由。这对于一些商店可能是正确选择,但对于我们不是。我喜欢工具测试,如果工具很小,而且只专心做一件事,我就会使用它。
信任开源
对于更大型的工具,以上决策评估过程更为复杂。通常,我会看一下提交问题和贡献者的数量,以及最后一次commit的日期。我可能会问朋友某个工具的情况,有时也会在推特上问。当你进行嗅探检查后从Github选择了一个项目,即说明你信任社区可以产出好的工具。对于大多数工具来说,这是没问题的。
但信任社区可能存在问题。对于某个特定的工具,可能并没有充分的理由让你信任社区可以产出好的软件。社区在目标、经验和开源项目的投入时间方面各不相同。选择工具时保持审慎态度十分重要,不要让理想蒙蔽了判断。