❶ Kafka簡介+Kafka Tool使用簡介+使用實例
詳細安裝訪問: https://www.jianshu.com/p/c74e0ec577b0
macOS 可以用homebrew快速安裝,訪問地址: https://www.jianshu.com/p/cddd25da8061
原文鏈接: https://www.jianshu.com/p/06884c5bf3f1
查看topic列表:
創建topic:
--create :創建命令;
--topic :後面指定topic名稱;
--replication-factor :後面指定副本數;
--partitions :指定分區數,根據broker的數量決定;
--zookeeper :後面指定 zookeeper.connect 的zk鏈接
查看某個topic:
Kafka 作為消息系統的一種, 當然可 以像其他消 息中 間件一樣作為消息數據中轉的平台。 下面以 Java 語言為例,看一下如何使用 Kafka 來發送和接收消息。
1、引入依賴
2、消息生產者
示例 中用 KafkaProcer 類來創建一個消息生產者,該類的構造函數入參是一系列屬性值。下面看一下這些屬性具體都是什麼含義。
bootstrap.servers 表示 Kafka 集群 。 如果集群中有多台物理伺服器,則伺服器地址之間用逗號分隔, 比如」 192.168.1.1 :9092,192.168.1.2:9092」 。 localhost 是筆者電腦的地址,9092 是 Kafka 伺服器默認監聽的埠號。
key.serializer 和 value.serializer 表示消息的序列化類型 。 Kafka 的消息是以鍵值對的形式發送到 Kafka 伺服器的,在消息被發送到伺服器之前,消息生產者需要把不同類型的 消息序列化為 二 進制類型,示例中是發送文本消息到伺服器 , 所以使用的是StringSerializer。
key.deserializer 和 value.deserializer 表示消息的反序列化類型。把來自 Kafka 集群的二進制消 息反序列 化 為指定 的 類型,因為序列化用的是String類型,所以用StringDeserializer 來反序列化。
zk.connect 用於指定 Kafka 連接 ZooKeeper 的 URL ,提供了基於 ZooKeeper 的集群伺服器自動感知功能, 可以動態從 ZooKeeper 中讀取 Kafka 集群配置信息。
有 了 消息生產者之後 , 就可以調用 send 方法發送消息了。該方法的入參是 ProcerRecord類型對象 , ProcerRecord 類提供了多種構造函數形參,常見的有如下三種 :
ProcerRecord(topic,partition,key,value);
ProcerRecord(topic,key,value);
ProcerRecord(topic, value) ;
其中 topic 和 value 是必填的, partition 和 key 是可選的 。如果指定了 pa時tion,那麼消息會被發送至指定的 partition ;如果沒指定 partition 但指定了 Key,那麼消息會按照 hash(key)發送至對應的 partition: 如果既沒指定 partition 也沒指定 key,那麼 消息會按照 round-robin 模式發送(即以輪詢的方式依次發送〉到每一個 partition。示例中將向 test-topic 主題發送三條消息。
3、消息消費者
和消息生產者類似,這里用 KafkaConsumer 類來創建一個消息消費者,該類的構造函數入參也是一系列屬性值。
bootstrap. servers 和生產者一樣,表示 Kafka 集群。
group.id 表示消費者的分組 ID。
enable.auto.commit 表示 Consumer 的 offset 是否自 動提交 。
auto.commit.interval .ms 用於設置自動提交 offset 到 ZooKeeper 的時間間隔,時間單位是毫秒。
key. deserializer 和 value.deserializer 表示用字元串來反序列化消息數據。
消息消費者使用 subscribe 方法 訂閱了 Topic 為 test-topic 的消息。 Consumer 調用poll 方法來輪詢 Kafka 集群的消息, 一直等到 Kafka 集群中沒有消息或達到超時時間(示例中設置超時時間為 100 毫秒)為止 。 如果讀取到消息,則列印出消息記錄的 pa此ition, offset、key 等。
❷ 一文解密Kafka,Kafka源碼設計與實現原理剖析,真正的通俗易懂
Apache Kafka (簡稱Kafka )最早是由Linkedln開源出來的分布式消息系統,現在是Apache旗下的一個子項目,並且已經成為開冊、領域應用最廣泛的消息系統之 Kafka社區也非常活躍,從 版本開始, Kafka 的標語已經從「一個高吞吐量、分布式的消息系統」改為「一個分布式的流平台」
關於Kafka,我打算從入門開始講起,一直到它的底層實現邏輯個原理以及源碼,建議大家花點耐心,從頭開始看,相信會對你有所收獲。
作為 個流式數據平台,最重要的是要具備下面 個特點
消息系統:
消息系統 也叫作消息隊列)主要有兩種消息模型:隊列和發布訂Kafka使用消費組( consumer group )統 上面兩種消息模型 Kafka使用隊列模型時,它可以將處理 作為平均分配給消費組中的消費者成員
下面我們會從 個角度分析Kafka 的幾個基本概念,並嘗試解決下面 個問題
消息由生產者發布到 fk 集群後,會被消費者消費 消息的消費模型有兩種:推送模型( pu和拉取模型( pull 基於推送模型的消息系統,由消息代理記錄消費者的消費狀態 消息代理在將消息推送到消費者後 標記這條消息為已消費
但這種方式無法很好地保證消息的處理語義 比如,消息代理把消息發送出去後,當消費進程掛掉或者由於網路原因沒有收到這條消息時,就有可能造成消息丟失(因為消息代理已經 這條消息標記為自己消費了,但實際上這條消息並沒有被實際處理) 如果要保證消息的處理語義,消息代理發送完消息後,要設置狀態為「已發送」,只有收到消費者的確認請求後才更新為「已消費」,這就需要在消息代理中記錄所有消息的消費狀態,這種做法也是不可取的
Kafka每個主題的多個分區日誌分布式地存儲在Kafka集群上,同時為了故障容錯,每個分區都會以副本的方式復制到多個消息代理節點上 其中一個節點會作為主副本( Leader ),其 節點作為備份副本( Follower ,也叫作從副本)
主副本會負責所有的客戶端讀寫操作,備份副本僅僅從主副本同步數據 當主副本 IH 現在故障時,備份副本中的 副本會被選擇為新的主副本 因為每個分區的副本中只有主副本接受讀寫,所以每個服務端都會作為某些分區的主副本,以及另外一些分區的備份副本這樣Kafka集群的所有服務端整體上對客戶端是負載均衡的
消息系統通常由生產者「pro ucer 消費者( co sumer )和消息代理( broke 大部分組成,生產者會將消息寫入消息代理,消費者會從消息代理中讀取消息 對於消息代理而言,生產者和消費者都屬於客戶端:生產者和消費者會發送客戶端請求給服務端,服務端的處理分別是存儲消息和獲取消息,最後服務端返回響應結果給客戶端
新的生產者應用程序使用 af aP oce 對象代表 個生產者客戶端進程 生產者要發送消息,並不是直接發送給 務端 ,而是先在客戶端 消息放入隊列 然後 一個 息發送線程從隊列中消息,以 鹽的方式發送消息給服務端 Kafka的記 集器( Reco dACCUl'lUlato )負責緩存生產者客戶端產生的消息,發送線程( Sende )負責讀取 集器的批 過網路發送給服務端為了保證客戶端 絡請求 快速 應, Kafka 用選擇器( Selecto 絡連接 讀寫 理,使網路連接( Netwo kCl i.ent )處理客戶端 絡請求
追加消息到記錄收集器時按照分區進行分組,並放到batches集合中,每個分區的隊列都保存了將發送到這個分區對應節點上的 記錄,客戶端的發送線程可 只使用 Sende 線程迭 batches的每個分區,獲取分區對應的主劇本節點,取出分區對應的 列中的批記錄就可以發送消息了
消息發送線程有兩種消息發送方式 按照分區直接發送 按照分區的目標節點發迭 假設有兩台伺服器, 題有 個分區,那麼每台伺服器就有 個分區 ,消息發送線程迭代batches的每個分 接往分區的主副本節點發送消息,總共會有 個請求 所示,我 先按照分區的主副本節點進行分組, 屬於同 個節點的所有分區放在一起,總共只有兩個請求做法可以大大減少網路的開銷
消息系統由生產者 存儲系統和消費者組成 章分析了生產者發送消息給服務端的過程,本章分析消費者從服務端存儲系統讀取生產者寫入消息的過程 首先我 來了解消費者的 些基礎知識
作為分布式的消息系統, Kafka支持多個生產者和多個消費者,生產者可以將消息發布到集群中不同節點的不同分區上;「肖費者也可以消費集群中多個節點的多個分區上的消息 寫消息時,多個生產者可以 到同 個分區 讀消息時,如果多個消費者同時讀取 個分區,為了保證將日誌文件的不同數據分配給不同的消費者,需要採用加鎖 同步等方式,在分區級別的日誌文件上做些控制
相反,如果約定「同 個分區只可被 個消費者處理」,就不需要加鎖同步了,從而可提升消費者的處理能力 而且這也並不違反消息的處理語義:原先需要多個消費者處理,現在交給一個消費者處理也是可以的 3- 給出了 種最簡單的消息系統部署模式,生產者的數據源多種多樣,它們都統寫人Kafka集群 處理消息時有多個消費者分擔任務 ,這些消費者的處理邏輯都相同, 每個消費者處理的分區都不會重復
因為分區要被重新分配,分區的所有者都會發生變 ,所以在還沒有重新分配分區之前 所有消費者都要停止已有的拉取錢程 同時,分區分配給消費者都會在ZK中記錄所有者信息,所以也要先刪ZK上的節點數據 只有和分區相關的 所有者 拉取線程都釋放了,才可以開始分配分區
如果說在重新分配分區前沒有釋放這些信息,再平衡後就可能造成同 個分區被多個消費者所有的情況 比如分區Pl 原先歸消費者 所有,如果沒有釋放拉取錢程和ZK節點,再平衡後分區Pl 被分配給消費者 了,這樣消費者 和消費者 就共享了分區Pl ,而這顯然不符合 fka 中關於「一個分區只能被分配給 個消費者」的限制條件 執行再平衡操作的步驟如下
如果是協調者節點發生故障,服務端會有自己的故障容錯機制,選出管理消費組所有消費者的新協調者節,點消費者客戶端沒有權利做這個工作,它能做的只是等待一段時間,查詢服務端是否已經選出了新的協調節點如果消費者查到現在已經有管理協調者的協調節點,就會連接這個新協調節,哉由於這個協調節點是服務端新選出來的,所以每個消費者都應該重新連接協調節點
消費者重新加入消費組,在分配到分區的前後,都會對消費者的拉取工作產生影響 消費者發送「加入組請求」之前要停止拉取消息,在收到「加入組響應」中的分區之後要重新開始拉取消息時,為了能夠讓客戶端應用程序感知消費者管理的分區發生變化,在加入組前後,客戶端還可以設置自定義的「消費者再平衡監聽器」,以便對分區的變化做出合適的處理
❸ Kafka 基礎原理及工作流程簡述
Kafka 工作流程
基礎總結:
1)broker :broker代表kafka的節點, Broker是分布式部署並且相互之間相互獨立的, 啟動的時候向zookeeper 注冊,在Zookeeper上會有一個專門 用來進行Broker伺服器列表記錄 的節點:/brokers/ids。每個Broker在啟動時,都會到Zookeeper上進行注冊,即到/brokers/ids下創建屬於自己的節點,如/brokers/ids/[0...N]。Kafka使用了全局唯一的數字來指代每個Broker伺服器,不同的Broker必須使用不同的Broker ID進行注冊,創建完節點後, 每個Broker就會將自己的IP地址和埠信息記錄 到該節點中去。其中,Broker創建的節點類型是 臨時節點 ,一旦Broker 宕機 ,則 對應的臨時節點也會被自動刪除 。
2)topic:消息主題,在Kafka中,同一個 Topic的消息會被分成多個分區 並將其分布在多個Broker上, 這些分區信息及與Broker的對應關系 也都是由Zookeeper在維護,由專門的節點來記錄,如:/borkers/topics Kafka中每個Topic都會以/brokers/topics/[topic]的形式被記錄,如/brokers/topics/login和/brokers/topics/search等。Broker伺服器啟動後,會到對應Topic節點(/brokers/topics)上注冊自己的Broker ID並寫入針對該Topic的分區總數,如/brokers/topics/login/3->2,這個節點表示Broker ID為3的一個Broker伺服器,對於"login"這個Topic的消息,提供了2個分區進行消息存儲,同樣,這個分區節點也是臨時節點。
3)partition :同一topic類型消息的分區,如圖,每個分區都存在一個leader 和N個follower(副本),副本個數在創建topic的時候可以指定創建多少個。消息生產者生產消息和消費組消費消息都是通過leader完成,副本的存在是為了防止發生節點宕機,導致leader掛了,follower隨時頂上去變成leader,繼續恢復生產。重點來了,leader所在節點掛了,會有follower變成leader,所以同一個topic的同一個partition的leader與follower不可能在同一個broker,這樣才能做到這個broker上的某個topic的某個partition的leader掛了,其他正常節點上的這個topic的這個partition的follower會頂上來。
4)生產者發送消息的 負載均衡 :由於同一個Topic消息會被分區並將其分布在多個Broker上,因此, 生產者需要將消息合理地發送到這些分布式的Broker上 ,那麼如何實現生產者的負載均衡,Kafka支持傳統的四層負載均衡,也支持Zookeeper方式實現負載均衡。 (4.1) 四層負載均衡,根據生產者的IP地址和埠來為其確定一個相關聯的Broker。通常,一個生產者只會對應單個Broker,然後該生產者產生的消息都發往該Broker。這種方式邏輯簡單,每個生產者不需要同其他系統建立額外的TCP連接,只需要和Broker維護單個TCP連接即可。但是,其無法做到真正的負載均衡,因為實際系統中的每個生產者產生的消息量及每個Broker的消息存儲量都是不一樣的,如果有些生產者產生的消息遠多於其他生產者的話,那麼會導致不同的Broker接收到的消息總數差異巨大,同時,生產者也無法實時感知到Broker的新增和刪除。 (4.2) 使用Zookeeper進行負載均衡,由於每個Broker啟動時,都會完成Broker注冊過程,生產者會通過該節點的變化來動態地感知到Broker伺服器列表的變更,這樣就可以實現動態的負載均衡機制。
5)消費者負載均衡:與生產者類似,Kafka中的消費者同樣需要進行負載均衡來實現多個消費者合理地從對應的Broker伺服器上接收消息,每個消費組分組包含若干消費者, 每條消息都只會發送給分組中的一個消費者 ,不同的消費者分組消費自己特定的Topic下面的消息,互不幹擾。
6)分區與消費者 的關系: 消費組 (Consumer Group) consumer group 下有多個 Consumer(消費者)。對於每個消費者組 (Consumer Group),Kafka都會為其分配一個全局唯一的Group ID,Group 內部的所有消費者共享該 ID。訂閱的topic下的每個分區只能分配給某個 group 下的一個consumer(當然該分區還可以被分配給其他group)。同時,Kafka為每個消費者分配一個Consumer ID,通常採用"Hostname:UUID"形式表示。在Kafka中,規定了 每個消息分區 只能被同組的一個消費者進行消費 ,因此,需要在 Zookeeper 上記錄 消息分區 與 Consumer 之間的關系,每個消費者一旦確定了對一個消息分區的消費權力,需要將其Consumer ID 寫入到 Zookeeper 對應消息分區的臨時節點上,例如:/consumers/[group_id]/owners/[topic]/[broker_id-partition_id] 其中,[broker_id-partition_id]就是一個 消息分區 的標識,節點內容就是該 消息分區 上 消費者的Consumer ID。
7)消息的消費進度Offset 記錄:在消費者對指定消息分區進行消息消費的過程中, 需要定時地將分區消息的消費進度Offset記錄到Zookeeper上 ,以便在該消費者進行重啟或者其他消費者重新接管該消息分區的消息消費後,能夠從之前的進度開始繼續進行消息消費。Offset在Zookeeper中由一個專門節點進行記錄,其節點路徑為:/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id] 節點內容就是Offset的值。這是kafka0.9和之前版本offset記錄的方式,之後的版本offset都改為存在kafka本地,當然了這里的本地是指磁碟不是內存。。。
8)消費者注冊:每個消費者伺服器啟動時,都會到Zookeeper的指定節點下創建一個屬於自己的消費者節點,例如/consumers/[group_id]/ids/[consumer_id],完成節點創建後,消費者就會將自己訂閱的Topic信息寫入該臨時節點。 對 消費者分組 中的 消費者 的變化注冊監聽 。每個 消費者 都需要關注所屬 消費者分組 中其他消費者伺服器的變化情況,即對/consumers/[group_id]/ids節點注冊子節點變化的Watcher監聽,一旦發現消費者新增或減少,就觸發消費者的負載均衡。 對Broker伺服器變化注冊監聽 。消費者需要對/broker/ids/[0-N]中的節點進行監聽,如果發現Broker伺服器列表發生變化,那麼就根據具體情況來決定是否需要進行消費者負載均衡。 進行消費者負載均衡 。為了讓同一個Topic下不同分區的消息盡量均衡地被多個 消費者 消費而進行 消費者 與 消息 分區分配的過程,通常,對於一個消費者分組,如果組內的消費者伺服器發生變更或Broker伺服器發生變更,會發出消費者負載均衡。
❹ 如何使用python 連接kafka 並獲取數據
連接
kafka
的庫有兩種類型,一種是直接連接
kafka
的,存儲
offset
的事情要自己在客戶端完成。還有一種是先連接
zookeeper
然後再通過
zookeeper
獲取
kafka
的
brokers
信息,
offset
存放在
zookeeper
上面,由
zookeeper
來協調。
我現在使用
samsa
這個
highlevel
庫
Procer示例
from
kazoo.client
import
KazooClientfrom
samsa.cluster
import
Clusterzookeeper
=
KazooClient()zookeeper.start()cluster
=
Cluster(zookeeper)topic
=
cluster.topics['topicname']topic.publish('msg')
**
Consumer示例
**
from
kazoo.client
import
KazooClientfrom
samsa.cluster
import
Clusterzookeeper
=
KazooClient()zookeeper.start()cluster
=
Cluster(zookeeper)topic
=
cluster.topics['topicname']consumer
=
topic.subscribe('groupname')for
msg
in
consumer:
print
msg
Tip
consumer
必需在
procer
向
kafka
的
topic
裡面提交數據後才能連接,否則會出錯。
在
Kafka
中一個
consumer
需要指定
groupname
,
groue
中保存著
offset
等信息,新開啟一個
group
會從
offset
0
的位置重新開始獲取日誌。
kafka
的配置參數中有個
partition
,默認是
1
,這個會對數據進行分區,如果多個
consumer
想連接同個
group
就必需要增加
partition
,
partition
只能大於
consumer
的數量,否則多出來的
consumer
將無法獲取到數據。
❺ Kafka核心組件之控制器和協調器
[TOC]
我們已經知道Kafka的集群由n個的broker所組成,每個broker就是一個kafka的實例或者稱之為kafka的服務。其實控制器也是一個broker,控制器也叫leader broker。
他除了具有一般broker的功能外,還負責分區leader的選取,也就是負責選舉partition的leader replica。
kafka每個broker啟動的時候,都會實例化一個KafkaController,並將broker的id注冊到zookeeper,集群在啟動過程中,通過選舉機制選舉出其中一個broker作為leader,也就是前面所說的控制器。
包括集群啟動在內,有三種情況觸發控制器選舉:
1、集群啟動
2、控制器所在代理發生故障
3、zookeeper心跳感知,控制器與自己的session過期
按照慣例,先看圖。我們根據下圖來講解集群啟動時,控制器選舉過程。
假設此集群有三個broker,同時啟動。
(一)3個broker從zookeeper獲取/controller臨時節點信息。/controller存儲的是選舉出來的leader信息。此舉是為了確認是否已經存在leader。
(二)如果還沒有選舉出leader,那麼此節點是不存在的,返回-1。如果返回的不是-1,而是leader的json數據,那麼說明已經有leader存在,選舉結束。
(三)三個broker發現返回-1,了解到目前沒有leader,於是均會觸發向臨時節點/controller寫入自己的信息。最先寫入的就會成為leader。
(四)假設broker 0的速度最快,他先寫入了/controller節點,那麼他就成為了leader。而broker1、broker2很不幸,因為晚了一步,他們在寫/controller的過程中會拋出ZkNodeExistsException,也就是zk告訴他們,此節點已經存在了。
經過以上四步,broker 0成功寫入/controller節點,其它broker寫入失敗了,所以broker 0成功當選leader。
此外zk中還有controller_epoch節點,存儲了leader的變更次數,初始值為0,以後leader每變一次,該值+1。所有向控制器發起的請求,都會攜帶此值。如果控制器和自己內存中比較,請求值小,說明kafka集群已經發生了新的選舉,此請求過期,此請求無效。如果請求值大於控制器內存的值,說明已經有新的控制器當選了,自己已經退位,請求無效。kafka通過controller_epoch保證集群控制器的唯一性及操作的一致性。
由此可見,Kafka控制器選舉就是看誰先爭搶到/controller節點寫入自身信息。
控制器的初始化,其實是初始化控制器所用到的組件及監聽器,准備元數據。
前面提到過每個broker都會實例化並啟動一個KafkaController。KafkaController和他的組件關系,以及各個組件的介紹如下圖:
圖中箭頭為組件層級關系,組件下面還會再初始化其他組件。可見控制器內部還是有些復雜的,主要有以下組件:
1、ControllerContext,此對象存儲了控制器工作需要的所有上下文信息,包括存活的代理、所有主題及分區分配方案、每個分區的AR、leader、ISR等信息。
2、一系列的listener,通過對zookeeper的監聽,觸發相應的操作,黃色的框的均為listener
3、分區和副本狀態機,管理分區和副本。
4、當前代理選舉器ZookeeperLeaderElector,此選舉器有上位和退位的相關回調方法。
5、分區leader選舉器,PartitionLeaderSelector
6、主題刪除管理器,TopicDeletetionManager
7、leader向broker批量通信的ControllerBrokerRequestBatch。緩存狀態機處理後產生的request,然後統一發送出去。
8、控制器平衡操作的KafkaScheler,僅在broker作為leader時有效。
Kafka集群的一些重要信息都記錄在ZK中,比如集群的所有代理節點、主題的所有分區、分區的副本信息(副本集、主副本、同步的副本集)。每個broker都有一個控制器,為了管理整個集群Kafka選利用zk選舉模式,為整個集群選舉一個「中央控制器」或」主控制器「,控制器其實就是一個broker節點,除了一般broker功能外,還具有分區首領選舉功能。中央控制器管理所有節點的信息,並通過向ZK注冊各種監聽事件來管理整個集群節點、分區的leader的選舉、再平衡等問題。外部事件會更新ZK的數據,ZK中的數據一旦發生變化,控制器都要做不同的響應處理。
故障轉移其實就是leader所在broker發生故障,leader轉移為其他的broker。轉移的過程就是重新選舉leader的過程。
重新選舉leader後,需要為該broker注冊相應許可權,調用的是ZookeeperLeaderElector的onControllerFailover()方法。在這個方法中初始化和啟動了一系列的組件來完成leader的各種操作。具體如下,其實和控制器初始化有很大的相似度。
1、注冊分區管理的相關監聽器
2、注冊主題管理的相關監聽
3、注冊代理變化監聽器
4、重新初始化ControllerContext,
5、啟動控制器和其他代理之間通信的ControllerChannelManager
6、創建用於刪除主題的TopicDeletionManager對象,並啟動。
7、啟動分區狀態機和副本狀態機
8、輪詢每個主題,添加監聽分區變化的
9、如果設置了分區平衡定時操作,那麼創建分區平衡的定時任務,默認300秒檢查並執行。
除了這些組件的啟動外,onControllerFailover方法中還做了如下操作:
1、/controller_epoch值+1,並且更新到ControllerContext
2、檢查是否出發分區重分配,並做相關操作
3、檢查需要將優先副本選為leader,並做相關操作
4、向kafka集群所有代理發送更新元數據的請求。
下面來看leader許可權被取消時,調用的方法onControllerResignation
1、該方法中注銷了控制器的許可權。取消在zookeeper中對於分區、副本感知的相應監聽器的監聽。
2、關閉啟動的各個組件
3、最後把ControllerContext中記錄控制器版本的數值清零,並設置當前broker為RunnignAsBroker,變為普通的broker。
通過對控制器啟動過程的學習,我們應該已經對kafka工作的原理有了了解, 核心是監聽zookeeper的相關節點,節點變化時觸發相應的操作 。
有新的broker加入集群時,稱為代理上線。反之,當broker關閉,推出集群時,稱為代理下線。
代理上線:
1、新代理啟動時向/brokers/ids寫數據
2、BrokerChangeListener監聽到變化。對新上線節點調用controllerChannelManager.addBroker(),完成新上線代理網路層初始化
3、調用KafkaController.onBrokerStartup()處理
3.5恢復因新代理上線暫停的刪除主題操作線程
代理下線:
1、查找下線節點集合
2、輪詢下線節點,調用controllerChannelManager.removeBroker(),關閉每個下線節點網路連接。清空下線節點消息隊列,關閉下線節點request請求
3、輪詢下線節點,調用KafkaController.onBrokerFailure處理
4、向集群全部存活代理發送updateMetadataRequest請求
顧名思義,協調器負責協調工作。本節所講的協調器,是用來協調消費者工作分配的。簡單點說,就是消費者啟動後,到可以正常消費前,這個階段的初始化工作。消費者能夠正常運轉起來,全有賴於協調器。
主要的協調器有如下兩個:
1、消費者協調器(ConsumerCoordinator)
2、組協調器(GroupCoordinator)
kafka引入協調器有其歷史過程,原來consumer信息依賴於zookeeper存儲,當代理或消費者發生變化時,引發消費者平衡,此時消費者之間是互不透明的,每個消費者和zookeeper單獨通信,容易造成羊群效應和腦裂問題。
為了解決這些問題,kafka引入了協調器。服務端引入組協調器(GroupCoordinator),消費者端引入消費者協調器(ConsumerCoordinator)。每個broker啟動的時候,都會創建GroupCoordinator實例,管理部分消費組(集群負載均衡)和組下每個消費者消費的偏移量(offset)。每個consumer實例化時,同時實例化一個ConsumerCoordinator對象,負責同一個消費組下各個消費者和服務端組協調器之前的通信。如下圖:
消費者協調器,可以看作是消費者做操作的代理類(其實並不是),消費者很多操作通過消費者協調器進行處理。
消費者協調器主要負責如下工作:
1、更新消費者緩存的MetaData
2、向組協調器申請加入組
3、消費者加入組後的相應處理
4、請求離開消費組
5、向組協調器提交偏移量
6、通過心跳,保持組協調器的連接感知。
7、被組協調器選為leader的消費者的協調器,負責消費者分區分配。分配結果發送給組協調器。
8、非leader的消費者,通過消費者協調器和組協調器同步分配結果。
消費者協調器主要依賴的組件和說明見下圖:
可以看到這些組件和消費者協調器擔負的工作是可以對照上的。
組協調器負責處理消費者協調器發過來的各種請求。它主要提供如下功能:
組協調器在broker啟動的時候實例化,每個組協調器負責一部分消費組的管理。它主要依賴的組件見下圖:
這些組件也是和組協調器的功能能夠對應上的。具體內容不在詳述。
下圖展示了消費者啟動選取leader、入組的過程。
消費者入組的過程,很好的展示了消費者協調器和組協調器之間是如何配合工作的。leader consumer會承擔分區分配的工作,這樣kafka集群的壓力會小很多。同組的consumer通過組協調器保持同步。消費者和分區的對應關系持久化在kafka內部主題。
消費者消費時,會在本地維護消費到的位置(offset),就是偏移量,這樣下次消費才知道從哪裡開始消費。如果整個環境沒有變化,這樣做就足夠了。但一旦消費者平衡操作或者分區變化後,消費者不再對應原來的分區,而每個消費者的offset也沒有同步到伺服器,這樣就無法接著前任的工作繼續進行了。
因此只有把消費偏移量定期發送到伺服器,由GroupCoordinator集中式管理,分區重分配後,各個消費者從GroupCoordinator讀取自己對應分區的offset,在新的分區上繼續前任的工作。
下圖展示了不提交offset到服務端的問題:
開始時,consumer 0消費partition 0 和1,後來由於新的consumer 2入組,分區重新進行了分配。consumer 0不再消費partition2,而由consumer 2來消費partition 2,但由於consumer之間是不能通訊的,所有consumer2並不知道從哪裡開始自己的消費。
因此consumer需要定期提交自己消費的offset到服務端,這樣在重分區操作後,每個consumer都能在服務端查到分配給自己的partition所消費到的offset,繼續消費。
由於kafka有高可用和橫向擴展的特性,當有新的分區出現或者新的消費入組後,需要重新分配消費者對應的分區,所以如果偏移量提交的有問題,會重復消費或者丟消息。偏移量提交的時機和方式要格外注意!!
1、自動提交偏移量
設置 enable.auto.commit為true,設定好周期,默認5s。消費者每次調用輪詢消息的poll() 方法時,會檢查是否超過了5s沒有提交偏移量,如果是,提交上一次輪詢返回的偏移量。
這樣做很方便,但是會帶來重復消費的問題。假如最近一次偏移量提交3s後,觸發了再均衡,伺服器端存儲的還是上次提交的偏移量,那麼再均衡結束後,新的消費者會從最後一次提交的偏移量開始拉取消息,此3s內消費的消息會被重復消費。
2、手動提交偏移量
設置 enable.auto.commit為false。程序中手動調用commitSync()提交偏移量,此時提交的是poll方法返回的最新的偏移量。
commitSync()是同步提交偏移量,主程序會一直阻塞,偏移量提交成功後才往下運行。這樣會限製程序的吞吐量。如果降低提交頻次,又很容易發生重復消費。
這里我們可以使用commitAsync()非同步提交偏移量。只管提交,而不會等待broker返回提交結果
commitSync只要沒有發生不可恢復錯誤,會進行重試,直到成功。而commitAsync不會進行重試,失敗就是失敗了。commitAsync不重試,是因為重試提交時,可能已經有其它更大偏移量已經提交成功了,如果此時重試提交成功,那麼更小的偏移量會覆蓋大的偏移量。那麼如果此時發生再均衡,新的消費者將會重復消費消息。
❻ python 消費kafka 寫入es 小記
# -*- coding: utf8 -*-
# __author__ = '小紅帽'
# Date: 2020-05-11
"""Naval Fate.
Usage:
py_kafka_protobuf_consume.py --bootstrap-servers=<host:port,host2:port2..> --groupId=<groupId> --topic=<topic_name> --es-servers=<host:port> --index=<schema> --type=<doc> --id=<order_id>
py_kafka_protobuf_consume.py -h | --help
py_kafka_protobuf_consume.py --version
Options:
-h --help 列印幫助信息.
--bootstrap_servers=<host:port,host2:port2..> kafka servers
--groupId=<groupId> kafka消費組
--topic=<topic_name> topic名稱
--es-servers=<host:port> ES 地址
--index=<index_name> ES 索引
--type=<doc> ES type
--id=<order_id> 指定id主鍵,快速更新
"""
import json
from kafka import KafkaConsumer
from docopt import docopt
from elasticsearch import Elasticsearch
from elasticsearch import helpers
class Kafka_consumer():
def __init__(self,args):
self.topic = args['--topic']
self.bootstrapServers = args['--bootstrap-servers']
self.groupId = args['--groupId']
self.id = args['--id']
self.es_host = args['--es-servers'].split(':')[0]
self.es_port = args['--es-servers'].split(':')[1]
self.es_index = args['--index']
self.es_type = args['--type']
self.consumer = KafkaConsumer(
bootstrap_servers=self.bootstrapServers,
group_id=self.groupId,
enable_auto_commit = True,
auto_commit_interval_ms=5000,
consumer_timeout_ms=5000
)
def consume_data_es(self):
while True:
try:
es = Elasticsearch([{'host': self.es_host, 'port': self.es_port}], timeout=3600)
self.consumer.subscribe([self.topic])
actions=[]
for message in self.consumer:
if message is not None:
query = json.loads(message.value)['data'][0]
action = {
"_index": self.es_index,
"_type": self.es_type,
"_id": json.loads(message.value)['data'][0][self.id],
"_source": query
}
actions.append(action)
if len(actions) > 50:
helpers.bulk(client=es, actions=actions)
print("插入es %s 條數據" % len(actions))
actions = []
if len(actions) > 0:
helpers.bulk(client=es, actions=actions)
print("等待超時時間,插入es %s 條數據" % len(actions))
actions=[]
except BaseException as e:
print(e)
if __name__ == '__main__':
arguments = docopt(__doc__,version='sbin 1.0')
consumer = Kafka_consumer(arguments)
consumer.consume_data_es()
❼ 服務端技術實戰系列——Kafka篇
一.概念&原理
[if !supportLists]1. [endif]主題(topic):主題是對消息的分類。
[if !supportLists]2. [endif]消息(message):消息是kafka通信的基本單位。
[if !supportLists]3. [endif]分區(partition): 一組 消息對應 一個 主題, 一個 主題對應 一個或多個 分區。每個分區為一系列有序消息組成的 有序隊列 ;每個分區在物理上對應一個文件夾。
[if !supportLists]4. [endif]副本(replica):每個分區有 一個或多個 副本,分區的副本分布在集群的 不同 代理(機器)上,以提高可用性;分區的副本與日誌對象是一一對應的。
[if !supportLists]5. [endif]Kafka只保證一個 分區內 的消息 有序性 ,不保證跨分區消息的有序性。消息被追加到相應分區中, 順序寫入磁碟 ,效率非常高。
[if !supportLists]6. [endif]Kafka選取某個某個分區的 一個 副本作為leader副本,該分區的 其他 副本為follower副本。 只有leader副本負責處理客戶端讀/寫請求 ,follower副本從leader副本同步數據。
[if !supportLists]7. [endif]任何發布到分區的消息都會追加到日誌文件的尾部, 每條消息 在日誌文件中的 位置 都對應一個 按序遞增的偏移量 ;偏移量在一個分區下嚴格有序。
[if !supportLists]8. [endif]Kafka不允許對消息進行隨機讀寫。
[if !supportLists]9. [endif]新版消費者將 消費偏移量 保存到kafka內部的一個主題中。
[if !supportLists]10. [endif]Kafka集群由 一個或多個代理 (Broker,也稱為kafka實例)構成。可以在 一台 伺服器上配置 一個或多個代理 ,每個代理具有唯一標識broker.id。
[if !supportLists]11. [endif]生產者將消息 發送給代理 (Broker)。
[if !supportLists]12. [endif]消費者以 拉取 (pull)方式拉取數據,每個消費者都屬於一個消費組。
[if !supportLists]13. [endif]同一個主題的一條消息只能被 同一個消費組 下的某一個消費者消費,但 不同消費組 的消費者可以 同時 消費該消息。
[if !supportLists]14. [endif]消息 廣播 :指定各消費者屬於不同消費組;消息 單播 :指定各消費者屬於同一個消費組。
[if !supportLists]15. [endif]Kafka啟動時在Zookeeper上創建相應節點來保存 元數據 ,元數據包括:代理節點信息、集群信息、主題信息、分區狀態信息、分區副本分配方案、動態配置等;
[if !supportLists]16. [endif]Kafka通過 監聽 機制在節點注冊監聽器來監聽節點元數據變化;
[if !supportLists]17. [endif]Kafka將數據寫入 磁碟 ,以文件系統來存數據;
[if !supportLists]18. [endif]生產環境一般將zookeeper集群和kafka集群 分機架 部署;
[if !supportLists]二.[endif] Kafka Procer
配置:
/**
* xTestProxy——KafkaConfigConstant
*
* @author ZhangChi
* @date 2018年6月20日---下午5:50:44
* @version 1.0
*/
public class KafkaConfigConstant {
public static final String KAFKA_CLUSTER = "fa-common1.hangzhou-1.kafka.internal.lede.com:9200,fa-common2.hangzhou-1.kafka.internal.lede.com:9200,fa-common3.hangzhou-1.kafka.internal.lede.com:9200";
}
生產者配置:
/**
* xTestProxy——HttpKafkaProcerFactory
*
* @author ZhangChi
* @date 2018年6月11日---下午2:37:51
* @version 1.0
*/
public class HttpKafkaProcerFactory {
// 真正的KafkaProcer僅有一份
private static KafkaProcer kafkaProcer = null ;
private static Properties property ;
public static KafkaProcer getKafkaProcer() {
if ( kafkaProcer == null ) {
synchronized (HttpKafkaProcerFactory. class ) {
if ( kafkaProcer == null ) {
property = buildKafkaProperty ();
kafkaProcer = new KafkaProcer( property );
}
}
}
return kafkaProcer ;
}
public static Properties buildKafkaProperty() {
Properties props = new Properties();
props.put(ProcerConfig. BOOTSTRAP_SERVERS_CONFIG , KafkaConfigConstant. KAFKA_CLUSTER );
props.put(ProcerConfig. ACKS_CONFIG , "all");
props.put(ProcerConfig. RETRIES_CONFIG , 0);
props.put(ProcerConfig. BATCH_SIZE_CONFIG , 16384);
props.put(ProcerConfig. BUFFER_MEMORY_CONFIG , 33554432);
props.put(ProcerConfig. LINGER_MS_CONFIG , 1);
props.put(ProcerConfig. KEY_SERIALIZER_CLASS_CONFIG , "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProcerConfig. VALUE_SERIALIZER_CLASS_CONFIG ,
"org.apache.kafka.common.serialization.StringSerializer");
return props;
}
}
生產者線程組:
/**
* xTestProxy——HttpKafkaProcerThread
* 多線程每次new一個實例
*
* @author ZhangChi
* @date 2018年6月25日---下午2:09:39
* @version 1.0
*/
public class HttpKafkaProcerThread implements Runnable {
private static Logger logger = LoggerFactory. getLogger ("HttpKafkaProcerThread");
private final String KAFKA_TOPIC = KafkaConstant. HTTP_REQ_RESP_TOPIC ;
private String kafkaMessageJson;
private KafkaProcer procer;
public String messageType;
public String originalMessage;
private static KafkaMessage kafkaMessage = new KafkaMessage();
public HttpKafkaProcerThread(KafkaProcer procer, String messageType, String originalMessage) {
this .procer = procer;
this .messageType = messageType;
this .originalMessage = originalMessage;
}
@Override
public void run() {
// TODO Auto-generated method stub
/* 1.構建kafka消息*/
kafkaMessageJson = generateKafkaMessage( this .messageType, this .originalMessage);
/* 2.發送kafka消息*/
if (kafkaMessageJson != null && !StringUtils. isEmpty (kafkaMessageJson)) {
logger .info("create message start:" + kafkaMessageJson);
procer.send( new ProcerRecord( this .KAFKA_TOPIC, kafkaMessageJson));
} else {
logger .info("kafkaMessageJson is null!");
}
}
private String generateKafkaMessage(String messageType, String originalMessage) {
if (StringUtils. isBlank (messageType) || StringUtils. isBlank (originalMessage)) {
return null ;
}
kafkaMessage .setMessageId(KafkaMessageUtils. generateId ());
kafkaMessage .setMessageTime(KafkaMessageUtils. generateTime ());
kafkaMessage .setMessageType(messageType);
kafkaMessage .setMessage(originalMessage);
String kafkaMessageToJson = null ;
try {
kafkaMessageToJson = KafkaMessageUtils. objectToJson ( kafkaMessage );
} catch (JsonProcessingException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
kafkaMessageJson = kafkaMessageToJson;
return kafkaMessageToJson;
}
}
[if !supportLists]三.[endif] Kafka Consumer
消費者配置:
private static Properties buildKafkaProperty() {
Properties properties = new Properties();
// 測試環境kafka的埠號是9200
properties.put(ConsumerConfig. BOOTSTRAP_SERVERS_CONFIG , KafkaConfigConstant. KAFKA_CLUSTER );
// 消費組名稱
properties.put(ConsumerConfig. GROUP_ID_CONFIG , KafkaConfigConstant. GROUP_ID );
properties.put(ConsumerConfig. CLIENT_ID_CONFIG , "test");
// 從頭消費
properties.put(ConsumerConfig. AUTO_OFFSET_RESET_CONFIG , "earliest");
// 自動提交偏移量
properties.put(ConsumerConfig. ENABLE_AUTO_COMMIT_CONFIG , "true");
// 時間間隔1s
properties.put(ConsumerConfig. AUTO_COMMIT_INTERVAL_MS_CONFIG , "1000");
properties.put(ConsumerConfig. KEY_DESERIALIZER_CLASS_CONFIG ,
"org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig. VALUE_DESERIALIZER_CLASS_CONFIG ,
"org.apache.kafka.common.serialization.StringDeserializer");
return properties;
}
消費者線程組:
/**
* AnalysisEngine——HttpKafkaConsumerGroup
*
* @author ZhangChi
* @date 2018年6月11日---下午6:20:47
* @version 1.0
*/
@Service("httpKafkaConsumerGroup")
public class HttpKafkaConsumerGroup {
@Autowired
private RequestAnalyzer requestAnalyzer;
@Autowired
private EsDocumentServiceImpl esDocumentServiceImpl;
@Autowired
private AnalysisEngineClient analysisEngineClient;
@Autowired
private MongoTemplate mongoTemplate;
private List httpKafkaConsumerList = new ArrayList();
public void initHttpKafkaConsumerGroup( int consumerNumber, RunModeEnum mode) {
for ( int i = 0; i < consumerNumber; i++) {
/**
* 將注入的服務當做構造參數,這樣保證每個子線程都能拿到服務實例而不是空指針!
*/
HttpKafkaConsumer consumerThread = new HttpKafkaConsumer(requestAnalyzer, esDocumentServiceImpl, mode, analysisEngineClient, mongoTemplate);
httpKafkaConsumerList.add(consumerThread);
}
}
public void consumeGroupStart() {
for (HttpKafkaConsumer item : httpKafkaConsumerList) {
LogConstant. runLog .info("httpKafkaConsumerList size : " + httpKafkaConsumerList.size());
Thread consumerThread = new Thread(item);
consumerThread.start();
}
}
}
先逐個初始化消費者實例,然後將這些消費者加入到消費組列表中。消費組啟動後,會循環產生消費者線程。
❽ kafka入門:一個開源的、輕量級、高吞吐、高可用的分布式消息系統
隨著信息技術的快速發展及互聯網用戶規模的急劇增長,計算機所存儲的信息量正呈爆炸式增長,目前數據量已進入大規模和超大規模的海量數據時代, 如何高效地存儲、分析、處理和挖掘海量數據 已成為技術研究領域的熱點和難點問題。而 如何採集和運營管理、分析這些數據 也是大數據處理中一個至關重要的組成環節,這就需要相應的基礎設施對其提供支持。針對這個需求,當前業界已有很多開源的消息系統應運而生,kafka就是一款當然非常流行的消息系統。
Kafka是一款開源的、輕量級的、分布式、可分區和具有復制備份的(Replicated)、基於ZooKeeper協調管理的分布式流平台的功能強大的消息系統。作為一個流式處理平台,必須具備以下3個關鍵特性:
1) 能夠允許發布和訂閱流數據。
2) 存儲流數據時提供相應的容錯機制。
3) 當流數據到達時能夠被及時處理。
消息流系統kafka的基本結構包括生產者和消費者,以及kafka集群。
生產者負責生產消息,將消息寫入Kafka集群;消費者從Kafka集群中拉取消息。
消息是Kafka通信的基本單位 ,由一個 固定長度的消息頭 和一個 可變長度的消息體 構成。
Kafka將 一組消息 抽象歸納為一個主題(Topic),也就是說,一個主題是對消息的一個分類。 生產者將消息指定主題發送到kafka集群,消費者訂閱主題或主題的某些分區進行消費。
Kafka將一組消息歸納為一個主題,而 每個主題又被分成一個或多個分區(Partition) 。每個分區由一系列有序、不可變的消息組成,是一個有序隊列。 每個分區在物理上對應為一個文件夾 ,分區的命名規則為主題名稱後接「—」連接符,之後再接分區編號,分區編號從0開始,編號最大值為分區的總數減1。
分區使得Kafka在並發處理上變得更加容易,理論上來說,分區數越多吞吐量越高,但這要根據集群實際環境及業務場景而定。同時,分區也是Kafka保證消息被順序消費以及對消息進行負載均衡的基礎。
疑問和答案 :分區如何保證消息被順序消費?每個分區內的消息是有序的,但不同分區間如何保證?猜測是分區從存儲空間上比較大,分區個數少。順序消費的主要因素在分區內的消息,分區間的可以忽略。高吞吐率順序寫磁碟估計也是這個原因。
Kafka只能保證一個分區之內消息的有序性,並不能保證跨分區消息的有序性。 每條消息被追加到相應的分區中,是順序寫磁碟,因此效率非常高,這是Kafka高吞吐率的一個重要保證 。同時與傳統消息系統不同的是,Kafka並不會立即刪除已被消費的消息,由於磁碟的限制消息也不會一直被存儲,因此 Kafka提供兩種刪除老數據的策略 ,一是基於消息已存儲的時間長度,二是基於分區的大小。這兩種策略都能通過配置文件進行配置。
每個分區又有一至多個副本(Replica),分區的副本分布在集群的不同代理上,以提高可用性。
從存儲角度上分析,分區的每個副本在邏輯上抽象為一個日誌(Log)對象,即分區的副本與日誌對象是一一對應的。每個主題對應的 分區數 可以在Kafka啟動時所載入的配置文件中配置,也可以在創建主題時指定。當然,客戶端還可以在主題創建後修改主題的分區數。
為什麼副本要分Leader和Follower? 如果沒有Leader副本,就需要所有的副本都同時負責讀/寫請求處理,同時還得保證這些副本之間數據的一致性,假設有n個副本則需要有n×n條通路來同步數據,這樣數據的一致性和有序性就很難保證。
為解決這個問題,Kafka選擇分區的一個副本為Leader,該分區其他副本為Follower,只有 Leader副本 才負責處理客戶端 讀/寫請求 ,Follower副本從Leader副本同步數據。
引入Leader副本後客戶端只需與Leader副本進行交互,這樣數據一致性及順序性就有了保證。Follower副本從Leader副本同步消息,對於n個副本只需n-1條通路即可,這樣就使得系統更加簡單而高效。
副本Follower與Leader的角色並不是固定不變的,如果Leader失效,通過相應的選舉演算法將從其他Follower副本中選出新的Leader副本。
疑問 :leader副本和follower副本是如何選出來的?通過zookeeper選舉的嘛?
Kafka在ZooKeeper中動態維護了一個 ISR(In-sync Replica) ,即保存同步的副本列表,該列表中保存的是與Leader副本保持消息同步的所有副本對應的代理節點id。 如果一個Follower副本宕機或是落後太多 ,則該Follower副本節點將 從ISR列表中移除 。 本書用宕機 來特指某個代理失效的情景,包括但不限於代理被關閉,如代理被人為關閉或是發生物理故障、心跳檢測過期、網路延遲、進程崩潰等。
任何發布到分區的消息會被直接追加到日誌文件的尾部(分區目錄下以「.log」為文件名後綴的數據文件),而每條 消息 在日誌文件中的位置都會對應一個按序遞增的 偏移量 。偏移量是一個分區下嚴格有序的 邏輯值 ,它並不表示消息在磁碟上的物理位置。由於Kafka幾乎不允許對消息進行隨機讀寫,因此Kafka並沒有提供額外索引機制到存儲偏移量。
消費者可以通過控制消息偏移量來對消息進行消費 ,如消費者可以指定消費的起始偏移量。 為了保證消息被順序消費,消費者已消費的消息對應的偏移量也需要保存 。需要說明的是,消費者對消息偏移量的操作並不會影響消息本身的偏移量。舊版消費者將消費偏移量保存到ZooKeeper當中, 而新版消費者是將消費偏移量保存到Kafka內部一個主題當中。 當然,消費者也可以自己在外部系統保存消費偏移量,而無需保存到Kafka中。
推測 :一個主題有多個分區,一個分區有多個副本。一個主題(一類消息)有多個分區(消息被分段),一個分區(每段消息)有多個副本(每段消息的副本數)。消息一旦發給kafka,就會分配一個偏移量,在多個副本中的偏移量是一樣的。這樣的話,消費者通過偏移量消費時對於多個副本就沒有差異性。
Kafka集群由一個或多個Kafka實例構成,每一個Kafka實例稱為代理(Broker),通常也稱代理為Kafka伺服器(KafkaServer)。在生產環境中Kafka集群一般包括一台或多台伺服器,我們可以在一台伺服器上配置一個或多個代理。 每一個代理都有唯一的標識id,這個id是一個非負整數 。在一個Kafka集群中,每增加一個代理就需要為這個代理配置一個與該集群中其他代理不同的id, id值可以選擇任意非負整數即可,只要保證它在整個Kafka集群中唯一,這個id就是代理的名字,也就是在啟動代理時配置的broker.id對應的值。
生產者(Procer)負責將消息發送給代理,也就是向Kafka代理發送消息的客戶端。
消費者(Comsumer)以拉取(pull)方式拉取數據,它是消費的客戶端。在Kafka中 每一個消費者都屬於一個特定消費組 (ConsumerGroup),可以為每個消費者指定一個消費組,以groupId代表消費組名稱,通過group.id配置設置。 如果不指定消費組 ,則該消費者屬於默認消費組test-consumer-group。
每個消費者有一個全局唯一的id ,通過配置項client.id指定, 如果客戶端沒有指定消費者的id, Kafka會自動為該消費者生成一個全局唯一的id,格式為${groupId}-${hostName}-${timestamp}-${UUID前8位字元}。 同一個主題的一條消息只能被同一個消費組下某一個消費者消費 ,但不同消費組的消費者可同時消費該消息。 消費組是Kafka用來實現對一個主題消息進行廣播和單播的手段 ,實現消息廣播只需指定各消費者均屬於不同的消費組,消息單播則只需讓各消費者屬於同一個消費組。
推論: kafka消息是按照消息類型(主題),在一個消費者組中只能消費一次。也就是一個消費者組只消費一類型的消息。如果某個服務要消費一類消息,必須將自己置為不同的消費者組。
Kafka利用ZooKeeper保存相應元數據信息, Kafka元數據信息包括如代理節點信息、Kafka集群信息、舊版消費者信息及其消費偏移量信息、主題信息、分區狀態信息、分區副本分配方案信息、動態配置信息等。 Kafka在啟動或運行過程當中會在ZooKeeper上創建相應節點 來保存元數據信息, Kafka通過監聽機制在這些節點注冊相應監聽器來監聽節點元數據的變化 ,從而由ZooKeeper負責管理維護Kafka集群,同時通過ZooKeeper我們能夠很方便地對Kafka集群進行水平擴展及數據遷移。
❾ kafka原理分析
作為一款典型的消息中間件產品,kafka系統仍然由procer、broker、consumer三部分組成。kafka涉及的幾個常用概念和組件簡單介紹如下:
當consumer group的狀態發生變化(如有consumer故障、增減consumer成員等)或consumer group消費的topic狀態發生變化(如增加了partition,消費的topic發生變化),kafka集群會自動調整和重新分配consumer消費的partition,這個過程就叫做rebalance(再平衡)。
__consumer_offsets是kafka集群自己維護的一個特殊的topic,它裡面存儲的是每個consumer group已經消費了每個topic partition的offset。__consumer_offsets中offset消息的key由group id,topic name,partition id組成,格式為 {topic name}-${partition id},value值就是consumer提交的已消費的topic partition offset值。__consumer_offsets的分區數和副本數分別由offsets.topic.num.partitions(默認值為50)和offsets.topic.replication.factor(默認值為1)參數配置。我們通過公式 hash(group id) % offsets.topic.num.partitions 就可以計算出指定consumer group的已提交offset存儲的partition。由於consumer group提交的offset消息只有最後一條消息有意義,所以__consumer_offsets是一個compact topic,kafka集群會周期性的對__consumer_offsets執行compact操作,只保留最新的一次提交offset。
group coordinator運行在kafka某個broker上,負責consumer group內所有的consumer成員管理、所有的消費的topic的partition的消費關系分配、offset管理、觸發rebalance等功能。group coordinator管理partition分配時,會指定consumer group內某個consumer作為group leader執行具體的partition分配任務。存儲某個consumer group已提交offset的__consumer_offsets partition leader副本所在的broker就是該consumer group的協調器運行的broker。
跟大多數分布式系統一樣,集群有一個master角色管理整個集群,協調集群中各個成員的行為。kafka集群中的controller就相當於其它分布式系統的master,用來負責集群topic的分區分配,分區leader選舉以及維護集群的所有partition的ISR等集群協調功能。集群中哪個borker是controller也是通過一致性協議選舉產生的,2.8版本之前通過zookeeper進行選主,2.8版本後通過kafka raft協議進行選舉。如果controller崩潰,集群會重新選舉一個broker作為新的controller,並增加controller epoch值(相當於zookeeper ZAB協議的epoch,raft協議的term值)
當kafka集群新建了topic或為一個topic新增了partition,controller需要為這些新增加的partition分配到具體的broker上,並把分配結果記錄下來,供procer和consumer查詢獲取。
因為只有partition的leader副本才會處理procer和consumer的讀寫請求,而partition的其他follower副本需要從相應的leader副本同步消息,為了盡量保證集群中所有broker的負載是均衡的,controller在進行集群全局partition副本分配時需要使partition的分布情況是如下這樣的:
在默認情況下,kafka採用輪詢(round-robin)的方式分配partition副本。由於partition leader副本承擔的流量比follower副本大,kafka會先分配所有topic的partition leader副本,使所有partition leader副本全局盡量平衡,然後再分配各個partition的follower副本。partition第一個follower副本的位置是相應leader副本的下一個可用broker,後面的副本位置依此類推。
舉例來說,假設我們有兩個topic,每個topic有兩個partition,每個partition有兩個副本,這些副本分別標記為1-1-1,1-1-2,1-2-1,1-2-2,2-1-1,2-1-2,2-2-1,2-2-2(編碼格式為topic-partition-replia,編號均從1開始,第一個replica是leader replica,其他的是follower replica)。共有四個broker,編號是1-4。我們先對broker按broker id進行排序,然後分配leader副本,最後分配foller副本。
1)沒有配置broker.rack的情況
現將副本1-1-1分配到broker 1,然後1-2-1分配到broker 2,依此類推,2-2-1會分配到broker 4。partition 1-1的leader副本分配在broker 1上,那麼下一個可用節點是broker 2,所以將副本1-1-2分配到broker 2上。同理,partition 1-2的leader副本分配在broker 2上,那麼下一個可用節點是broker 3,所以將副本1-1-2分配到broker 3上。依此類推分配其他的副本分片。最後分配的結果如下圖所示:
2)配置了broker.rack的情況
假設配置了兩個rack,broker 1和broker 2屬於Rack 1,broker 3和broker 4屬於Rack 2。我們對rack和rack內的broker分別排序。然後先將副本1-1-1分配到Rack 1的broker 1,然後將副本1-2-1分配到下一個Rack的第一個broker,即Rack 2的broker 3。其他的parttition leader副本依此類推。然後分配follower副本,partition 1-1的leader副本1-1-1分配在Rack 1的broker上,下一個可用的broker是Rack 2的broker 3,所以分配到broker 3上,其他依此類推。最後分配的結果如下圖所示:
kafka除了按照集群情況自動分配副本,也提供了reassign工具人工分配和遷移副本到指定broker,這樣用戶可以根據集群實際的狀態和各partition的流量情況分配副本
kafka集群controller的一項功能是在partition的副本中選擇一個副本作為leader副本。在topic的partition創建時,controller首先分配的副本就是leader副本,這個副本又叫做preference leader副本。
當leader副本所在broker失效時(宕機或網路分區等),controller需要為在該broker上的有leader副本的所有partition重新選擇一個leader,選擇方法就是在該partition的ISR中選擇第一個副本作為新的leader副本。但是,如果ISR成員只有一個,就是失效的leader自身,其餘的副本都落後於leader怎麼辦?kafka提供了一個unclean.leader.election配置參數,它的默認值為true。當unclean.leader.election值為true時,controller還是會在非ISR副本中選擇一個作為leader,但是這時候使用者需要承擔數據丟失和數據不一致的風險。當unclean.leader.election值為false時,則不會選擇新的leader,該partition處於不可用狀態,只能恢復失效的leader使partition重新變為可用。
當preference leader失效後,controller重新選擇一個新的leader,但是preference leader又恢復了,而且同步上了新的leader,是ISR的成員,這時候preference leader仍然會成為實際的leader,原先的新leader變為follower。因為在partition leader初始分配時,使按照集群副本均衡規則進行分配的,這樣做可以讓集群盡量保持平衡。
為了保證topic的高可用,topic的partition往往有多個副本,所有的follower副本像普通的consumer一樣不斷地從相應的leader副本pull消息。每個partition的leader副本會維護一個ISR列表存儲到集群信息庫里,follower副本成為ISR成員或者說與leader是同步的,需要滿足以下條件:
1)follower副本處於活躍狀態,與zookeeper(2.8之前版本)或kafka raft master之間的心跳正常
2)follower副本最近replica.lag.time.max.ms(默認是10秒)時間內從leader同步過最新消息。需要注意的是,一定要拉取到最新消息,如果最近replica.lag.time.max.ms時間內拉取過消息,但不是最新的,比如落後follower在追趕leader過程中,也不會成為ISR。
follower在同步leader過程中,follower和leader都會維護幾個參數,來表示他們之間的同步情況。leader和follower都會為自己的消息隊列維護LEO(Last End Offset)和HW(High Watermark)。leader還會為每一個follower維護一個LEO。LEO表示leader或follower隊列寫入的最後一條消息的offset。HW表示的offset對應的消息寫入了所有的ISR。當leader發現所有follower的LEO的最小值大於HW時,則會增加HW值到這個最小值LEO。follower拉取leader的消息時,同時能獲取到leader維護的HW值,如果follower發現自己維護的HW值小於leader發送過來的HW值,也會增加本地的HW值到leader的HW值。這樣我們可以得到一個不等式: follower HW <= leader HW <= follower LEO <= leader LEO 。HW對應的log又叫做committed log,consumer消費partititon的消息時,只能消費到offset值小於或等於HW值的消息的,由於這個原因,kafka系統又稱為分布式committed log消息系統。
kafka的消息內容存儲在log.dirs參數配置的目錄下。kafka每個partition的數據存放在本地磁碟log.dirs目錄下的一個單獨的目錄下,目錄命名規范為 ${topicName}-${partitionId} ,每個partition由多個LogSegment組成,每個LogSegment由一個數據文件(命名規范為: {baseOffset}.index)和一個時間戳索引文件(命名規范為:${baseOffset}.timeindex)組成,文件名的baseOffset就是相應LogSegment中第一條消息的offset。.index文件存儲的是消息的offset到該消息在相應.log文件中的偏移,便於快速在.log文件中快速找到指定offset的消息。.index是一個稀疏索引,每隔一定間隔大小的offset才會建立相應的索引(比如每間隔10條消息建立一個索引)。.timeindex也是一個稀疏索引文件,這樣可以根據消息的時間找到對應的消息。
可以考慮將消息日誌存放到多個磁碟中,這樣多個磁碟可以並發訪問,增加消息讀寫的吞吐量。這種情況下,log.dirs配置的是一個目錄列表,kafka會根據每個目錄下partition的數量,將新分配的partition放到partition數最少的目錄下。如果我們新增了一個磁碟,你會發現新分配的partition都出現在新增的磁碟上。
kafka提供了兩個參數log.segment.bytes和log.segment.ms來控制LogSegment文件的大小。log.segment.bytes默認值是1GB,當LogSegment大小達到log.segment.bytes規定的閾值時,kafka會關閉當前LogSegment,生成一個新的LogSegment供消息寫入,當前供消息寫入的LogSegment稱為活躍(Active)LogSegment。log.segment.ms表示最大多長時間會生成一個新的LogSegment,log.segment.ms沒有默認值。當這兩個參數都配置了值,kafka看哪個閾值先達到,觸發生成新的LogSegment。
kafka還提供了log.retention.ms和log.retention.bytes兩個參數來控制消息的保留時間。當消息的時間超過了log.retention.ms配置的閾值(默認是168小時,也就是一周),則會被認為是過期的,會被kafka自動刪除。或者是partition的總的消息大小超過了log.retention.bytes配置的閾值時,最老的消息也會被kafka自動刪除,使相應partition保留的總消息大小維持在log.retention.bytes閾值以下。這個地方需要注意的是,kafka並不是以消息為粒度進行刪除的,而是以LogSegment為粒度刪除的。也就是說,只有當一個LogSegment的最後一條消息的時間超過log.retention.ms閾值時,該LogSegment才會被刪除。這兩個參數都配置了值時,也是只要有一個先達到閾值,就會執行相應的刪除策略
當我們使用KafkaProcer向kafka發送消息時非常簡單,只要構造一個包含消息key、value、接收topic信息的ProcerRecord對象就可以通過KafkaProcer的send()向kafka發送消息了,而且是線程安全的。KafkaProcer支持通過三種消息發送方式
KafkaProcer客戶端雖然使用簡單,但是一條消息從客戶端到topic partition的日誌文件,中間需要經歷許多的處理過程。KafkaProcer的內部結構如下所示:
從圖中可以看出,消息的發送涉及兩類線程,一類是調用KafkaProcer.send()方法的應用程序線程,因為KafkaProcer.send()是多線程安全的,所以這樣的線程可以有多個;另一類是與kafka集群通信,實際將消息發送給kafka集群的Sender線程,當我們創建一個KafkaProcer實例時,會創建一個Sender線程,通過該KafkaProcer實例發送的所有消息最終通過該Sender線程發送出去。RecordAccumulator則是一個消息隊列,是應用程序線程與Sender線程之間消息傳遞的橋梁。當我們調用KafkaProcer.send()方法時,消息並沒有直接發送出去,只是寫入了RecordAccumulator中相應的隊列中,最終需要Sender線程在適當的時機將消息從RecordAccumulator隊列取出來發送給kafka集群。
消息的發送過程如下:
在使用KafkaConsumer實例消費kafka消息時,有一個特性我們要特別注意,就是KafkaConsumer不是多線程安全的,KafkaConsumer方法都在調用KafkaConsumer的應用程序線程中運行(除了consumer向kafka集群發送的心跳,心跳在一個專門的單獨線程中發送),所以我們調用KafkaConsumer的所有方法均需要保證在同一個線程中調用,除了KafkaConsumer.wakeup()方法,它設計用來通過其它線程向consumer線程發送信號,從而終止consumer執行。
跟procer一樣,consumer要與kafka集群通信,消費kafka消息,首先需要獲取消費的topic partition leader replica所在的broker地址等信息,這些信息可以通過向kafka集群任意broker發送Metadata請求消息獲取。
我們知道,一個consumer group有多個consumer,一個topic有多個partition,而且topic的partition在同一時刻只能被consumer group內的一個consumer消費,那麼consumer在消費partition消息前需要先確定消費topic的哪個partition。partition的分配通過group coordinator來實現。基本過程如下:
我們可以通過實現介面org.apache.kafka.clients.consumer.internals.PartitionAssignor自定義partition分配策略,但是kafka已經提供了三種分配策略可以直接使用。
partition分配完後,每個consumer知道了自己消費的topic partition,通過metadata請求可以獲取相應partition的leader副本所在的broker信息,然後就可以向broker poll消息了。但是consumer從哪個offset開始poll消息?所以consumer在第一次向broker發送FetchRequest poll消息之前需要向Group Coordinator發送OffsetFetchRequest獲取消費消息的起始位置。Group Coordinator會通過key {topic}-${partition}查詢 __consumer_offsets topic中是否有offset的有效記錄,如果存在,則將consumer所屬consumer group最近已提交的offset返回給consumer。如果沒有(可能是該partition是第一次分配給該consumer group消費,也可能是該partition長時間沒有被該consumer group消費),則根據consumer配置參數auto.offset.reset值確定consumer消費的其實offset。如果auto.offset.reset值為latest,表示從partition的末尾開始消費,如果值為earliest,則從partition的起始位置開始消費。當然,consumer也可以隨時通過KafkaConsumer.seek()方法人工設置消費的起始offset。
kafka broker在收到FetchRequest請求後,會使用請求中topic partition的offset查一個skiplist表(該表的節點key值是該partition每個LogSegment中第一條消息的offset值)確定消息所屬的LogSegment,然後繼續查LogSegment的稀疏索引表(存儲在.index文件中),確定offset對應的消息在LogSegment文件中的位置。為了提升消息消費的效率,consumer通過參數fetch.min.bytes和max.partition.fetch.bytes告訴broker每次拉取的消息總的最小值和每個partition的最大值(consumer一次會拉取多個partition的消息)。當kafka中消息較少時,為了讓broker及時將消息返回給consumer,consumer通過參數fetch.max.wait.ms告訴broker即使消息大小沒有達到fetch.min.bytes值,在收到請求後最多等待fetch.max.wait.ms時間後,也將當前消息返回給consumer。fetch.min.bytes默認值為1MB,待fetch.max.wait.ms默認值為500ms。
為了提升消息的傳輸效率,kafka採用零拷貝技術讓內核通過DMA把磁碟中的消息讀出來直接發送到網路上。因為kafka寫入消息時將消息寫入內存中就返回了,如果consumer跟上了procer的寫入速度,拉取消息時不需要讀磁碟,直接從內存獲取消息發送出去就可以了。
為了避免發生再平衡後,consumer重復拉取消息,consumer需要將已經消費完的消息的offset提交給group coordinator。這樣發生再平衡後,consumer可以從上次已提交offset出繼續拉取消息。
kafka提供了多種offset提交方式
partition offset提交和管理對kafka消息系統效率來說非常關鍵,它直接影響了再平衡後consumer是否會重復拉取消息以及重復拉取消息的數量。如果offset提交的比較頻繁,會增加consumer和kafka broker的消息處理負載,降低消息處理效率;如果offset提交的間隔比較大,再平衡後重復拉取的消息就會比較多。還有比較重要的一點是,kafka只是簡單的記錄每次提交的offset值,把最後一次提交的offset值作為最新的已提交offset值,作為再平衡後消息的起始offset,而什麼時候提交offset,每次提交的offset值具體是多少,kafka幾乎不關心(這個offset對應的消息應該存儲在kafka中,否則是無效的offset),所以應用程序可以先提交3000,然後提交2000,再平衡後從2000處開始消費,決定權完全在consumer這邊。
kafka中的topic partition與consumer group中的consumer的消費關系其實是一種配對關系,當配對雙方發生了變化時,kafka會進行再平衡,也就是重新確定這種配對關系,以提升系統效率、高可用性和伸縮性。當然,再平衡也會帶來一些負面效果,比如在再平衡期間,consumer不能消費kafka消息,相當於這段時間內系統是不可用的。再平衡後,往往會出現消息的重復拉取和消費的現象。
觸發再平衡的條件包括:
需要注意的是,kafka集群broker的增減或者topic partition leader重新選主這類集群狀態的變化並不會觸發在平衡
有兩種情況與日常應用開發比較關系比較密切:
consumer在調用subscribe()方法時,支持傳入一個ConsumerRebalanceListener監聽器,ConsumerRebalanceListener提供了兩個方法,onPartitionRevoked()方法在consumer停止消費之後,再平衡開始之前被執行。可以發現,這個地方是提交offset的好時機。onPartitonAssigned()方法則會在重新進行partition分配好了之後,但是新的consumer還未消費之前被執行。
我們在提到kafka時,首先想到的是它的吞吐量非常大,這也是很多人選擇kafka作為消息傳輸組件的重要原因。
以下是保證kafka吞吐量大的一些設計考慮:
但是kafka是不是總是這么快?我們同時需要看到kafka為了追求快舍棄了一些特性:
所以,kafka在消息獨立、允許少量消息丟失或重復、不關心消息順序的場景下可以保證非常高的吞吐量,但是在需要考慮消息事務、嚴格保證消息順序等場景下procer和consumer端需要進行復雜的考慮和處理,可能會比較大的降低kafka的吞吐量,例如對可靠性和保序要求比較高的控制類消息需要非常謹慎的權衡是否適合使用kafka。
我們通過procer向kafka集群發送消息,總是期望消息能被consumer成功消費到。最不能忍的是procer收到了kafka集群消息寫入的正常響應,但是consumer仍然沒有消費到消息。
kafka提供了一些機制來保證消息的可靠傳遞,但是有一些因素需要仔細權衡考慮,這些因素往往會影響kafka的吞吐量,需要在可靠性與吞吐量之間求得平衡:
kafka只保證partition消息順序,不保證topic級別的順序,而且保證的是partition寫入順序與讀取順序一致,不是業務端到端的保序。
如果對保序要求比較高,topic需要只設置一個partition。這時可以把參數max.in.flight.requests.per.connection設置為1,而retries設置為大於1的數。這樣即使發生了可恢復型錯誤,仍然能保證消息順序,但是如果發生不可恢復錯誤,應用層進行重試的話,就無法保序了。也可以採用同步發送的方式,但是這樣也極大的降低了吞吐量。如果消息攜帶了表示順序的欄位,可以在接收端對消息進行重新排序以保證最終的有序。
❿ 如何在kafka-python和confluent-kafka之間做出選擇
kafka-python:蠻荒的西部
kafka-python是最受歡迎的Kafka Python客戶端。我們過去使用時從未出現過任何問題,在我的《敏捷數據科學2.0》一書中我也用過它。然而在最近這個項目中,它卻出現了一個嚴重的問題。我們發現,當以文檔化的方式使用KafkaConsumer、Consumer迭代式地從消息隊列中獲取消息時,最終到達主題topic的由Consumer攜帶的消息通常會丟失。我們通過控制台Consumer的分析驗證了這一點。
需要更詳細說明的是,kafka-python和KafkaConsumer是與一個由SSL保護的Kafka服務(如Aiven Kafka)一同使用的,如下面這樣:
kafka_consumer = KafkaConsumer(
topic,
enable_auto_commit=True,
group_id=group_id,
bootstrap_servers=config.kafka.host,
api_version=(0, 10),
security_protocol='SSL',
ssl_check_hostname=True,
ssl_cafile=config.kafka.ca_pem,
ssl_certfile=config.kafka.service_cert,
ssl_keyfile=config.kafka.service_key
)
for message in kafka_consumer:
application_message = json.loads(message.value.decode())
...
當以這樣的推薦方式使用時,KafkaConsumer會丟失消息。但有一個變通方案,就是保留所有消息。這個方案是Kafka服務提供商Aiven support提供給我們的。它看起來像這樣:
while True:
raw_messages = consumer.poll(timeout_ms=1000, max_records=5000)
for topic_partition, messages in raw_messages.items():
application_message = json.loads(message.value.decode())
...
雖然這個變通方案可能有用,但README中的方法會丟棄消息使我對其失去興趣。所以我找到了一個替代方案。
confluent-kafka:企業支持
發現coufluent-kafka Python模塊時,我感到無比驚喜。它既能做librdkafka的外封裝,又非常小巧。librdkafka是一個用C語言寫的kafka庫,它是Go和.NET的基礎。更重要的是,它由Confluent公司支持。我愛開源,但是當「由非正式社區擁有或支持」這種方式效果不行的時候,或許該考慮給替代方案印上公章、即該由某個公司擁有或支持了。不過,我們並未購買商業支持。我們知道有人會維護這個庫的軟體質量,而且可以選擇買或不買商業支持,這一點真是太棒了。
用confluent-kafka替換kafka-python非常簡單。confluent-kafka使用poll方法,它類似於上面提到的訪問kafka-python的變通方案。
kafka_consumer = Consumer(
{
"api.version.request": True,
"enable.auto.commit": True,
"group.id": group_id,
"bootstrap.servers": config.kafka.host,
"security.protocol": "ssl",
"ssl.ca.location": config.kafka.ca_pem,
"ssl.certificate.location": config.kafka.service_cert,
"ssl.key.location": config.kafka.service_key,
"default.topic.config": {"auto.offset.reset": "smallest"}
}
)
consumer.subscribe([topic])
# Now loop on the consumer to read messages
running = True
while running:
message = kafka_consumer.poll()
application_message = json.load(message.value.decode())
kafka_consumer.close()
現在我們能收到所有消息了。我並不是說kafka-python工具不好,我相信社區會對它的問題做出反應並解決。但從現在開始,我會一直堅持使用confluent-kafka。
開源治理
開源是強大的,但是涉及到復雜的「大數據」和NoSQL工具時,通常需要有一家大公司在背後推動工具的開發。這樣你就知道,如果那個公司可以使用工具,那麼該工具應該擁有很好的基本功能。它的出現可能是非正式的,就像某公司發布類似FOSS的項目一樣,但也可能是正式的,就像某公司為工具提供商業支持一樣。當然,從另一個角度來看,如果一家與開源社區作對的公司負責開發某個工具,你便失去了控制權。你的意見可能無關緊要,除非你是付費客戶。
理想情況是採取開源治理,就像Apache基金會一樣,還有就是增加可用的商業支持選項。這對互聯網上大部分的免費軟體來說根本不可能。限制自己只使用那些公司蓋章批准後的工具將非常限制你的自由。這對於一些商店可能是正確選擇,但對於我們不是。我喜歡工具測試,如果工具很小,而且只專心做一件事,我就會使用它。
信任開源
對於更大型的工具,以上決策評估過程更為復雜。通常,我會看一下提交問題和貢獻者的數量,以及最後一次commit的日期。我可能會問朋友某個工具的情況,有時也會在推特上問。當你進行嗅探檢查後從Github選擇了一個項目,即說明你信任社區可以產出好的工具。對於大多數工具來說,這是沒問題的。
但信任社區可能存在問題。對於某個特定的工具,可能並沒有充分的理由讓你信任社區可以產出好的軟體。社區在目標、經驗和開源項目的投入時間方面各不相同。選擇工具時保持審慎態度十分重要,不要讓理想蒙蔽了判斷。