导航:首页 > 编程语言 > java消费kafka

java消费kafka

发布时间:2023-10-26 16:35:26

java工程kafka传递自定义对象,消费端获取到的是null

3. 启服务
3.1 启zookeeper
启zk两种式第种使用kafka自带zk
bin/zookeeper-server-start.sh config/zookeeper.properties&
另种使用其zookeeper位于本机位于其址种情况需要修改config面sercer.properties面zookeeper址
例zookeeper.connect=10.202.4.179:2181
3.2 启 kafka
bin/kafka-server-start.sh config/server.properties
4.创建topic
bin/kafka-topics.sh --create --zookeeper 10.202.4.179:2181 --replication-factor 1 --partitions 1 --topic test
创建名testtopic副本区
通list命令查看刚刚创建topic
bin/kafka-topics.sh -list -zookeeper 10.202.4.179:2181
5.启procer并发送消息启procer
bin/kafka-console-procer.sh --broker-list localhost:9092 --topic test
启发送消息

test
hello boy
按Ctrl+C退发送消息
6.启consumer
bin/kafka-console-consumer.sh --zookeeper 10.202.4.179:2181 --topic test --from-beginning
启consumerconsole看procer发送消息
启两终端发送消息接受消息
都行查看zookeeper进程kafkatopic步步排查原吧

❷ Kafka相关内容总结(Kafka集群搭建手记)

Kafka is a distributed,partitioned,replicated commit logservice。它提供了类似于JMS的特性,但是在设计实现上完全不同,此外它并不是JMS规范的实现。kafka对消息保存时根据Topic进行归类,发送消息者成为Procer,消息接受者成为Consumer,此外kafka集群有多个kafka实例组成,每个实例(server)成为broker。无论是kafka集群,还是procer和consumer都依赖于zookeeper来保证系统可用性集群保存一些meta信息。
入门请参照: https://www.ibm.com/developerworks/cn/opensource/os-cn-kafka/index.html
在此不再赘述。

这部分不是本文的重点,但是kafka需要用到kafka集群,所以先搭建kafka集群。
从kafka官方文档看到,kafka似乎在未来的版本希望抛弃zookeep集群,自己维护集群的一致性,拭目以待吧。
我们搭建集群使用的是三台同机房的机器,因为zookeeper不怎么占资源也不怎么占空间(我们的业务目前比较简单),所以三台机器上都搭建了zookeeper集群。
搭建zookeeper集群没什么难度,参考文档: http://www.cnblogs.com/huangxincheng/p/5654170.html
下面列一下我的配置并解析:

一共用三台物理机器,搭建一个Kafka集群。
每台服务器的硬盘划分都是一样的,每个独立的物理磁盘挂在一个单独的分区里面,这样很方便用于Kafka多个partition的数据读写与冗余。
/data1比较小,为了不成为集群的瓶颈,所以/data1用于存放kafka以及Zookeeper
每台机器的磁盘分布如下:

下面是kafka的简单配置,三台服务器都一样,如有不一致的在下文有说明。
kafka安装在目录/usr/local/kafka/下,下面的说明以10.1.xxx.57为例。

最重要的配置文件server.properties,需要配置的信息如下:

从上面的配置看到,kafka集群不需要像hadoop集群那样,配置ssh通讯,而且一个kafka服务器(官方文档称之为broker,下面统一使用这个称呼)并不知道其他的kafka服务器的存在,因此你需要逐个broker去启动kafka。各个broker根据自己的配置,会自动去配置文件上的zk服务器报到,这就是一个有zk服务器粘合起来的kafka集群。
我写了一个启动脚本,放在 /usr/local/kafka/bin 下面。启动脚本每个broker都一样:

如同kafka集群里面每一个broker都需要单独启动一样,kafka集群里面每一个broker都需要单独关闭。
官方给出的关闭脚本是单独运行 bin/kafka-server-stop.sh
但是我运行的结果是无法关闭。打开脚本一看,才发现是最简单的办法,发一个TERM信号到kafka的java进程,官方脚本给出的grep有点问题。
发信号之后,一直tail着kafka日志,看到正常关闭。

指定zookeeper服务器,topic名称是LvsKafka(注意topic名称不能有英文句号(.)和下划线(_),否则会通不过,理由是名称会冲突,下文对此略有解析)
replication-factor指出重复因子是2,也就是每条数据有两个拷贝,可靠性考虑。
partitions 指出需要多少个partition,数据量大的多一点,无论生产和消费,这是负载均衡和高并发的需要。

可以看到刚才新建的24个partition,比如partition 5, 他的leader是broker 59,也就是10.1.xxx.59这台机器。
建立topic时我们指出需要2个拷贝,从上面的输出的Replicas字段看到,这两个拷贝放在59,58两个机器,也就是10.1.xxx.59和10.1.xxx.58.
Isr表示当前partition的所有拷贝所在的机器中,哪些是还活着(可以提供服务)的。现在是59和58都还存活。

这个命令另外还会看到一些类似于下面的内容:

__consumer_offsets到底是什么呢?其实就是客户端的消费进度,客户端会定时上报到kafka集群,而kafka集群会把每个客户端的消费进度放入一个自己内部的topic中,这个topic就是__consumer_offsets。我查看过__consumer_offsets的内容,其实就是每个客户端的消费进度作为一条消息,放入__consumer_offsets这个topic中。
这里给了我们两个提示:
1、kafka自己管理客户端的消费进度,而不是依靠zk,这就是kafka官方文档说的kafka未来会抛弃zk的底气之一;
2、留意到这个kafka自己的topic是带下划线的,也就是,kafka担心我们自己建的topic如果带下划线的话会跟这些内部自用的topic冲突;

❸ kafka是干嘛的

Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。

这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。

对于像Hadoop一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。

主要特性

Kafka是一种高吞吐量 的分布式发布订阅消息系统,有如下特性:

通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。

高吞吐量:即使是非常普通的硬件Kafka也可以支持每秒数百万的消息。

支持通过Kafka服务器和消费机集群来分区消息。

支持Hadoop并行数据加载。

Kafka通过官网发布了最新版本3.0.0。

以上内容来自 网络-kafka

❹ kafka——消费者原理解析

kafka采用发布订阅模式:一对多。发布订阅模式又分两种:

Kafka为这两种模型提供了单一的消费者抽象模型: 消费者组 (consumer group)。 消费者用一个消费者组名标记自己。 一个发布在Topic上消息被分发给此消费者组中的一个消费者。 假如所有的消费者都在一个组中,那么这就变成了队列模型。 假如所有的消费者都在不同的组中,那么就完全变成了发布-订阅模型。 一个消费者组中消费者订阅同一个Topic,每个消费者接受Topic的一部分分区的消息,从而实现对消费者的横向扩展,对消息进行分流。

注意:当单个消费者无法跟上数据生成的速度,就可以增加更多的消费者分担负载,每个消费者只处理部分partition的消息,从而实现单个应用程序的横向伸缩。但是不要让消费者的数量多于partition的数量,此时多余的消费者会空闲。此外,Kafka还允许多个应用程序从同一个Topic读取所有的消息,此时只要保证每个应用程序有自己的消费者组即可。

消费者组的概念就是:当有多个应用程序都需要从Kafka获取消息时,让每个app对应一个消费者组,从而使每个应用程序都能获取一个或多个Topic的全部消息;在每个消费者组中,往消费者组中添加消费者来伸缩读取能力和处理能力,消费者组中的每个消费者只处理每个Topic的一部分的消息,每个消费者对应一个线程。

在同一个群组中,无法让一个线程运行多个消费者,也无法让多线线程安全地共享一个消费者。按照规则,一个消费者使用一个线程,如果要在同一个消费者组中运行多个消费者,需要让每个消费者运行在自己的线程中。最好把消费者的逻辑封装在自己的对象中,然后使用java的ExecutorService启动多个线程,使每个消费者运行在自己的线程上,可参考 https://www.confluent.io/blog

一个 consumer group 中有多个 consumer,一个 topic 有多个 partition,所以必然会涉及到 partition 的分配问题,即确定哪个 partition 由哪个 consumer 来消费。

关于如何设置partition值需要考虑的因素

Kafka 有两种分配策略,一个是 RoundRobin,一个是 Range,默认为Range,当消费者组内消费者发生变化时,会触发分区分配策略(方法重新分配)。

以上三种现象会使partition的所有权在消费者之间转移,这样的行为叫作再均衡。

再均衡的优点

再均衡的缺点

RoundRobin 轮询方式将分区所有作为一个整体进行 Hash 排序,消费者组内分配分区个数最大差别为 1,是按照组来分的,可以解决多个消费者消费数据不均衡的问题。

但是,当消费者组内订阅不同主题时,可能造成消费混乱,如下图所示,Consumer0 订阅主题 A,Consumer1 订阅主题 B。

将 A、B 主题的分区排序后分配给消费者组,TopicB 分区中的数据可能 分配到 Consumer0 中。

Range 方式是按照主题来分的,不会产生轮询方式的消费混乱问题。

但是,如下图所示,Consumer0、Consumer1 同时订阅了主题 A 和 B,可能造成消息分配不对等问题,当消费者组内订阅的主题越多,分区分配可能越不均衡。

由于 consumer 在消费过程中可能会出现断电宕机等故障,consumer 恢复后,需要从故障前的位置继续消费,所以 consumer 需要实时记录自己消费到了哪个 offset,以便故障恢复后继续消费。

consumer group +topic + partition 唯一确定一个offest

Kafka 0.9 版本之前,consumer 默认将 offset 保存在 Zookeeper 中,从 0.9 版本开始,
consumer 默认将 offset 保存在 Kafka 一个内置的 topic 中,该 topic 为__consumer_offsets。

你如果特别好奇,实在想看看offset什么的,也可以执行下面操作:

修改配置文件 consumer.properties

再启动一个消费者

当消费者崩溃或者有新的消费者加入,那么就会触发再均衡(rebalance),完成再均衡后,每个消费者可能会分配到新的分区,而不是之前处理那个,为了能够继续之前的工作,消费者需要读取每个partition最后一次提交的偏移量,然后从偏移量指定的地方继续处理。

case1:如果提交的偏移量小于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息就会被重复处理。

case2:如果提交的偏移量大于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息将会丢失。

自动提交的优点是方便,但是可能会重复处理消息

不足:broker在对提交请求作出回应之前,应用程序会一直阻塞,会限制应用程序的吞吐量。

因此,在消费者关闭之前一般会组合使用commitAsync和commitSync提交偏移量。

ConsumerRebalanceListener需要实现的两个方法

下面的例子演示如何在失去partition的所有权之前通过onPartitionRevoked()方法来提交偏移量。

Consumer有个Rebalance的特性,即重新负载均衡,该特性依赖于一个协调器来实现。每当Consumer Group中有Consumer退出或有新的Consumer加入都会触发Rebalance。

之所以要重新负载均衡,是为了将退出的Consumer所负责处理的数据再重新分配到组内的其他Consumer上进行处理。或当有新加入的Consumer时,将组内其他Consumer的负载压力,重新进均匀分配,而不会说新加入一个Consumer就闲在那。

下面就用几张图简单描述一下,各种情况触发Rebalance时,组内成员是如何与协调器进行交互的。

Tips :图中的Coordinator是协调器,而generation则类似于乐观锁中的版本号,每当成员入组成功就会更新,也是起到一个并发控制的作用。

参考:
https://blog.csdn.net/weixin_46122692/article/details/109270433

http://www.dockone.io/article/9956

https://www.cnblogs.com/sodawoods-blogs/p/8969774.html

https://blog.csdn.net/weixin_44367006/article/details/103075173

https://blog.51cto.com/zero01/2498017

阅读全文

与java消费kafka相关的资料

热点内容
linuxpython解释器 浏览:665
兴安得力软件加密狗 浏览:488
智能网络摄像头加密 浏览:570
软件毕业程序员培训 浏览:650
安卓陀螺仪低怎么办 浏览:245
一级建造师复习题集pdf 浏览:901
法理学pdf海默 浏览:390
服务器内存储器是用什么的 浏览:817
微帮同城分类信息源码 浏览:806
安卓系统ad是什么 浏览:471
python输出中不加占位符 浏览:594
linux文件夹权限控制 浏览:728
雅虎邮箱怎么加密码 浏览:819
为什么安卓手机登录不了苹果账号 浏览:535
如何复制usb加密狗 浏览:799
哪个app看你微笑时很美 浏览:908
mac启动命令 浏览:602
ngc服务器是什么的简称 浏览:73
深度系统如何创建文件夹 浏览:847
临汾单片机步进电机驱动电路 浏览:76