導航:首頁 > 編程語言 > java消費kafka

java消費kafka

發布時間:2023-10-26 16:35:26

java工程kafka傳遞自定義對象,消費端獲取到的是null

3. 啟服務
3.1 啟zookeeper
啟zk兩種式第種使用kafka自帶zk
bin/zookeeper-server-start.sh config/zookeeper.properties&
另種使用其zookeeper位於本機位於其址種情況需要修改config面sercer.properties面zookeeper址
例zookeeper.connect=10.202.4.179:2181
3.2 啟 kafka
bin/kafka-server-start.sh config/server.properties
4.創建topic
bin/kafka-topics.sh --create --zookeeper 10.202.4.179:2181 --replication-factor 1 --partitions 1 --topic test
創建名testtopic副本區
通list命令查看剛剛創建topic
bin/kafka-topics.sh -list -zookeeper 10.202.4.179:2181
5.啟procer並發送消息啟procer
bin/kafka-console-procer.sh --broker-list localhost:9092 --topic test
啟發送消息

test
hello boy
按Ctrl+C退發送消息
6.啟consumer
bin/kafka-console-consumer.sh --zookeeper 10.202.4.179:2181 --topic test --from-beginning
啟consumerconsole看procer發送消息
啟兩終端發送消息接受消息
都行查看zookeeper進程kafkatopic步步排查原吧

❷ Kafka相關內容總結(Kafka集群搭建手記)

Kafka is a distributed,partitioned,replicated commit logservice。它提供了類似於JMS的特性,但是在設計實現上完全不同,此外它並不是JMS規范的實現。kafka對消息保存時根據Topic進行歸類,發送消息者成為Procer,消息接受者成為Consumer,此外kafka集群有多個kafka實例組成,每個實例(server)成為broker。無論是kafka集群,還是procer和consumer都依賴於zookeeper來保證系統可用性集群保存一些meta信息。
入門請參照: https://www.ibm.com/developerworks/cn/opensource/os-cn-kafka/index.html
在此不再贅述。

這部分不是本文的重點,但是kafka需要用到kafka集群,所以先搭建kafka集群。
從kafka官方文檔看到,kafka似乎在未來的版本希望拋棄zookeep集群,自己維護集群的一致性,拭目以待吧。
我們搭建集群使用的是三台同機房的機器,因為zookeeper不怎麼占資源也不怎麼占空間(我們的業務目前比較簡單),所以三台機器上都搭建了zookeeper集群。
搭建zookeeper集群沒什麼難度,參考文檔: http://www.cnblogs.com/huangxincheng/p/5654170.html
下面列一下我的配置並解析:

一共用三台物理機器,搭建一個Kafka集群。
每台伺服器的硬碟劃分都是一樣的,每個獨立的物理磁碟掛在一個單獨的分區裡面,這樣很方便用於Kafka多個partition的數據讀寫與冗餘。
/data1比較小,為了不成為集群的瓶頸,所以/data1用於存放kafka以及Zookeeper
每台機器的磁碟分布如下:

下面是kafka的簡單配置,三台伺服器都一樣,如有不一致的在下文有說明。
kafka安裝在目錄/usr/local/kafka/下,下面的說明以10.1.xxx.57為例。

最重要的配置文件server.properties,需要配置的信息如下:

從上面的配置看到,kafka集群不需要像hadoop集群那樣,配置ssh通訊,而且一個kafka伺服器(官方文檔稱之為broker,下面統一使用這個稱呼)並不知道其他的kafka伺服器的存在,因此你需要逐個broker去啟動kafka。各個broker根據自己的配置,會自動去配置文件上的zk伺服器報到,這就是一個有zk伺服器粘合起來的kafka集群。
我寫了一個啟動腳本,放在 /usr/local/kafka/bin 下面。啟動腳本每個broker都一樣:

如同kafka集群裡面每一個broker都需要單獨啟動一樣,kafka集群裡面每一個broker都需要單獨關閉。
官方給出的關閉腳本是單獨運行 bin/kafka-server-stop.sh
但是我運行的結果是無法關閉。打開腳本一看,才發現是最簡單的辦法,發一個TERM信號到kafka的java進程,官方腳本給出的grep有點問題。
發信號之後,一直tail著kafka日誌,看到正常關閉。

指定zookeeper伺服器,topic名稱是LvsKafka(注意topic名稱不能有英文句號(.)和下劃線(_),否則會通不過,理由是名稱會沖突,下文對此略有解析)
replication-factor指出重復因子是2,也就是每條數據有兩個拷貝,可靠性考慮。
partitions 指出需要多少個partition,數據量大的多一點,無論生產和消費,這是負載均衡和高並發的需要。

可以看到剛才新建的24個partition,比如partition 5, 他的leader是broker 59,也就是10.1.xxx.59這台機器。
建立topic時我們指出需要2個拷貝,從上面的輸出的Replicas欄位看到,這兩個拷貝放在59,58兩個機器,也就是10.1.xxx.59和10.1.xxx.58.
Isr表示當前partition的所有拷貝所在的機器中,哪些是還活著(可以提供服務)的。現在是59和58都還存活。

這個命令另外還會看到一些類似於下面的內容:

__consumer_offsets到底是什麼呢?其實就是客戶端的消費進度,客戶端會定時上報到kafka集群,而kafka集群會把每個客戶端的消費進度放入一個自己內部的topic中,這個topic就是__consumer_offsets。我查看過__consumer_offsets的內容,其實就是每個客戶端的消費進度作為一條消息,放入__consumer_offsets這個topic中。
這里給了我們兩個提示:
1、kafka自己管理客戶端的消費進度,而不是依靠zk,這就是kafka官方文檔說的kafka未來會拋棄zk的底氣之一;
2、留意到這個kafka自己的topic是帶下劃線的,也就是,kafka擔心我們自己建的topic如果帶下劃線的話會跟這些內部自用的topic沖突;

❸ kafka是幹嘛的

Kafka是由Apache軟體基金會開發的一個開源流處理平台,由Scala和Java編寫。Kafka是一種高吞吐量的分布式發布訂閱消息系統,它可以處理消費者在網站中的所有動作流數據。

這種動作(網頁瀏覽,搜索和其他用戶的行動)是在現代網路上的許多社會功能的一個關鍵因素。 這些數據通常是由於吞吐量的要求而通過處理日誌和日誌聚合來解決。

對於像Hadoop一樣的日誌數據和離線分析系統,但又要求實時處理的限制,這是一個可行的解決方案。Kafka的目的是通過Hadoop的並行載入機制來統一線上和離線的消息處理,也是為了通過集群來提供實時的消息。

主要特性

Kafka是一種高吞吐量 的分布式發布訂閱消息系統,有如下特性:

通過O(1)的磁碟數據結構提供消息的持久化,這種結構對於即使數以TB的消息存儲也能夠保持長時間的穩定性能。

高吞吐量:即使是非常普通的硬體Kafka也可以支持每秒數百萬的消息。

支持通過Kafka伺服器和消費機集群來分區消息。

支持Hadoop並行數據載入。

Kafka通過官網發布了最新版本3.0.0。

以上內容來自 網路-kafka

❹ kafka——消費者原理解析

kafka採用發布訂閱模式:一對多。發布訂閱模式又分兩種:

Kafka為這兩種模型提供了單一的消費者抽象模型: 消費者組 (consumer group)。 消費者用一個消費者組名標記自己。 一個發布在Topic上消息被分發給此消費者組中的一個消費者。 假如所有的消費者都在一個組中,那麼這就變成了隊列模型。 假如所有的消費者都在不同的組中,那麼就完全變成了發布-訂閱模型。 一個消費者組中消費者訂閱同一個Topic,每個消費者接受Topic的一部分分區的消息,從而實現對消費者的橫向擴展,對消息進行分流。

注意:當單個消費者無法跟上數據生成的速度,就可以增加更多的消費者分擔負載,每個消費者只處理部分partition的消息,從而實現單個應用程序的橫向伸縮。但是不要讓消費者的數量多於partition的數量,此時多餘的消費者會空閑。此外,Kafka還允許多個應用程序從同一個Topic讀取所有的消息,此時只要保證每個應用程序有自己的消費者組即可。

消費者組的概念就是:當有多個應用程序都需要從Kafka獲取消息時,讓每個app對應一個消費者組,從而使每個應用程序都能獲取一個或多個Topic的全部消息;在每個消費者組中,往消費者組中添加消費者來伸縮讀取能力和處理能力,消費者組中的每個消費者只處理每個Topic的一部分的消息,每個消費者對應一個線程。

在同一個群組中,無法讓一個線程運行多個消費者,也無法讓多線線程安全地共享一個消費者。按照規則,一個消費者使用一個線程,如果要在同一個消費者組中運行多個消費者,需要讓每個消費者運行在自己的線程中。最好把消費者的邏輯封裝在自己的對象中,然後使用java的ExecutorService啟動多個線程,使每個消費者運行在自己的線程上,可參考 https://www.confluent.io/blog

一個 consumer group 中有多個 consumer,一個 topic 有多個 partition,所以必然會涉及到 partition 的分配問題,即確定哪個 partition 由哪個 consumer 來消費。

關於如何設置partition值需要考慮的因素

Kafka 有兩種分配策略,一個是 RoundRobin,一個是 Range,默認為Range,當消費者組內消費者發生變化時,會觸發分區分配策略(方法重新分配)。

以上三種現象會使partition的所有權在消費者之間轉移,這樣的行為叫作再均衡。

再均衡的優點

再均衡的缺點

RoundRobin 輪詢方式將分區所有作為一個整體進行 Hash 排序,消費者組內分配分區個數最大差別為 1,是按照組來分的,可以解決多個消費者消費數據不均衡的問題。

但是,當消費者組內訂閱不同主題時,可能造成消費混亂,如下圖所示,Consumer0 訂閱主題 A,Consumer1 訂閱主題 B。

將 A、B 主題的分區排序後分配給消費者組,TopicB 分區中的數據可能 分配到 Consumer0 中。

Range 方式是按照主題來分的,不會產生輪詢方式的消費混亂問題。

但是,如下圖所示,Consumer0、Consumer1 同時訂閱了主題 A 和 B,可能造成消息分配不對等問題,當消費者組內訂閱的主題越多,分區分配可能越不均衡。

由於 consumer 在消費過程中可能會出現斷電宕機等故障,consumer 恢復後,需要從故障前的位置繼續消費,所以 consumer 需要實時記錄自己消費到了哪個 offset,以便故障恢復後繼續消費。

consumer group +topic + partition 唯一確定一個offest

Kafka 0.9 版本之前,consumer 默認將 offset 保存在 Zookeeper 中,從 0.9 版本開始,
consumer 默認將 offset 保存在 Kafka 一個內置的 topic 中,該 topic 為__consumer_offsets。

你如果特別好奇,實在想看看offset什麼的,也可以執行下面操作:

修改配置文件 consumer.properties

再啟動一個消費者

當消費者崩潰或者有新的消費者加入,那麼就會觸發再均衡(rebalance),完成再均衡後,每個消費者可能會分配到新的分區,而不是之前處理那個,為了能夠繼續之前的工作,消費者需要讀取每個partition最後一次提交的偏移量,然後從偏移量指定的地方繼續處理。

case1:如果提交的偏移量小於客戶端處理的最後一個消息的偏移量,那麼處於兩個偏移量之間的消息就會被重復處理。

case2:如果提交的偏移量大於客戶端處理的最後一個消息的偏移量,那麼處於兩個偏移量之間的消息將會丟失。

自動提交的優點是方便,但是可能會重復處理消息

不足:broker在對提交請求作出回應之前,應用程序會一直阻塞,會限制應用程序的吞吐量。

因此,在消費者關閉之前一般會組合使用commitAsync和commitSync提交偏移量。

ConsumerRebalanceListener需要實現的兩個方法

下面的例子演示如何在失去partition的所有權之前通過onPartitionRevoked()方法來提交偏移量。

Consumer有個Rebalance的特性,即重新負載均衡,該特性依賴於一個協調器來實現。每當Consumer Group中有Consumer退出或有新的Consumer加入都會觸發Rebalance。

之所以要重新負載均衡,是為了將退出的Consumer所負責處理的數據再重新分配到組內的其他Consumer上進行處理。或當有新加入的Consumer時,將組內其他Consumer的負載壓力,重新進均勻分配,而不會說新加入一個Consumer就閑在那。

下面就用幾張圖簡單描述一下,各種情況觸發Rebalance時,組內成員是如何與協調器進行交互的。

Tips :圖中的Coordinator是協調器,而generation則類似於樂觀鎖中的版本號,每當成員入組成功就會更新,也是起到一個並發控制的作用。

參考:
https://blog.csdn.net/weixin_46122692/article/details/109270433

http://www.dockone.io/article/9956

https://www.cnblogs.com/sodawoods-blogs/p/8969774.html

https://blog.csdn.net/weixin_44367006/article/details/103075173

https://blog.51cto.com/zero01/2498017

閱讀全文

與java消費kafka相關的資料

熱點內容
銀行人員如何解壓 瀏覽:827
newfile命令快捷鍵 瀏覽:567
阿里雲物理伺服器 瀏覽:953
靈狐視頻app哪個好 瀏覽:257
大廠退役程序員自述 瀏覽:252
linux命令watch 瀏覽:889
加密幣哪些平台不撤出中國 瀏覽:553
max加線命令 瀏覽:424
app胖瘦模式哪個好用 瀏覽:724
可以下載源碼的軟體 瀏覽:487
程序員寫一天代碼累嗎 瀏覽:628
ie文件夾禁止訪問 瀏覽:545
百川互聯網程序員 瀏覽:785
linuxpython解釋器 瀏覽:669
興安得力軟體加密狗 瀏覽:494
智能網路攝像頭加密 瀏覽:574
軟體畢業程序員培訓 瀏覽:654
安卓陀螺儀低怎麼辦 瀏覽:248
一級建造師復習題集pdf 瀏覽:905
法理學pdf海默 瀏覽:394