『壹』 一文解密Kafka,Kafka源碼設計與實現原理剖析,真正的通俗易懂
Apache Kafka (簡稱Kafka )最早是由Linkedln開源出來的分布式消息系統,現在是Apache旗下的一個子項目,並且已經成為開冊、領域應用最廣泛的消息系統之 Kafka社區也非常活躍,從 版本開始, Kafka 的標語已經從「一個高吞吐量、分布式的消息系統」改為「一個分布式的流平台」
關於Kafka,我打算從入門開始講起,一直到它的底層實現邏輯個原理以及源碼,建議大家花點耐心,從頭開始看,相信會對你有所收獲。
作為 個流式數據平台,最重要的是要具備下面 個特點
消息系統:
消息系統 也叫作消息隊列)主要有兩種消息模型:隊列和發布訂Kafka使用消費組( consumer group )統 上面兩種消息模型 Kafka使用隊列模型時,它可以將處理 作為平均分配給消費組中的消費者成員
下面我們會從 個角度分析Kafka 的幾個基本概念,並嘗試解決下面 個問題
消息由生產者發布到 fk 集群後,會被消費者消費 消息的消費模型有兩種:推送模型( pu和拉取模型( pull 基於推送模型的消息系統,由消息代理記錄消費者的消費狀態 消息代理在將消息推送到消費者後 標記這條消息為已消費
但這種方式無法很好地保證消息的處理語義 比如,消息代理把消息發送出去後,當消費進程掛掉或者由於網路原因沒有收到這條消息時,就有可能造成消息丟失(因為消息代理已經 這條消息標記為自己消費了,但實際上這條消息並沒有被實際處理) 如果要保證消息的處理語義,消息代理發送完消息後,要設置狀態為「已發送」,只有收到消費者的確認請求後才更新為「已消費」,這就需要在消息代理中記錄所有消息的消費狀態,這種做法也是不可取的
Kafka每個主題的多個分區日誌分布式地存儲在Kafka集群上,同時為了故障容錯,每個分區都會以副本的方式復制到多個消息代理節點上 其中一個節點會作為主副本( Leader ),其 節點作為備份副本( Follower ,也叫作從副本)
主副本會負責所有的客戶端讀寫操作,備份副本僅僅從主副本同步數據 當主副本 IH 現在故障時,備份副本中的 副本會被選擇為新的主副本 因為每個分區的副本中只有主副本接受讀寫,所以每個服務端都會作為某些分區的主副本,以及另外一些分區的備份副本這樣Kafka集群的所有服務端整體上對客戶端是負載均衡的
消息系統通常由生產者「pro ucer 消費者( co sumer )和消息代理( broke 大部分組成,生產者會將消息寫入消息代理,消費者會從消息代理中讀取消息 對於消息代理而言,生產者和消費者都屬於客戶端:生產者和消費者會發送客戶端請求給服務端,服務端的處理分別是存儲消息和獲取消息,最後服務端返回響應結果給客戶端
新的生產者應用程序使用 af aP oce 對象代表 個生產者客戶端進程 生產者要發送消息,並不是直接發送給 務端 ,而是先在客戶端 消息放入隊列 然後 一個 息發送線程從隊列中消息,以 鹽的方式發送消息給服務端 Kafka的記 集器( Reco dACCUl'lUlato )負責緩存生產者客戶端產生的消息,發送線程( Sende )負責讀取 集器的批 過網路發送給服務端為了保證客戶端 絡請求 快速 應, Kafka 用選擇器( Selecto 絡連接 讀寫 理,使網路連接( Netwo kCl i.ent )處理客戶端 絡請求
追加消息到記錄收集器時按照分區進行分組,並放到batches集合中,每個分區的隊列都保存了將發送到這個分區對應節點上的 記錄,客戶端的發送線程可 只使用 Sende 線程迭 batches的每個分區,獲取分區對應的主劇本節點,取出分區對應的 列中的批記錄就可以發送消息了
消息發送線程有兩種消息發送方式 按照分區直接發送 按照分區的目標節點發迭 假設有兩台伺服器, 題有 個分區,那麼每台伺服器就有 個分區 ,消息發送線程迭代batches的每個分 接往分區的主副本節點發送消息,總共會有 個請求 所示,我 先按照分區的主副本節點進行分組, 屬於同 個節點的所有分區放在一起,總共只有兩個請求做法可以大大減少網路的開銷
消息系統由生產者 存儲系統和消費者組成 章分析了生產者發送消息給服務端的過程,本章分析消費者從服務端存儲系統讀取生產者寫入消息的過程 首先我 來了解消費者的 些基礎知識
作為分布式的消息系統, Kafka支持多個生產者和多個消費者,生產者可以將消息發布到集群中不同節點的不同分區上;「肖費者也可以消費集群中多個節點的多個分區上的消息 寫消息時,多個生產者可以 到同 個分區 讀消息時,如果多個消費者同時讀取 個分區,為了保證將日誌文件的不同數據分配給不同的消費者,需要採用加鎖 同步等方式,在分區級別的日誌文件上做些控制
相反,如果約定「同 個分區只可被 個消費者處理」,就不需要加鎖同步了,從而可提升消費者的處理能力 而且這也並不違反消息的處理語義:原先需要多個消費者處理,現在交給一個消費者處理也是可以的 3- 給出了 種最簡單的消息系統部署模式,生產者的數據源多種多樣,它們都統寫人Kafka集群 處理消息時有多個消費者分擔任務 ,這些消費者的處理邏輯都相同, 每個消費者處理的分區都不會重復
因為分區要被重新分配,分區的所有者都會發生變 ,所以在還沒有重新分配分區之前 所有消費者都要停止已有的拉取錢程 同時,分區分配給消費者都會在ZK中記錄所有者信息,所以也要先刪ZK上的節點數據 只有和分區相關的 所有者 拉取線程都釋放了,才可以開始分配分區
如果說在重新分配分區前沒有釋放這些信息,再平衡後就可能造成同 個分區被多個消費者所有的情況 比如分區Pl 原先歸消費者 所有,如果沒有釋放拉取錢程和ZK節點,再平衡後分區Pl 被分配給消費者 了,這樣消費者 和消費者 就共享了分區Pl ,而這顯然不符合 fka 中關於「一個分區只能被分配給 個消費者」的限制條件 執行再平衡操作的步驟如下
如果是協調者節點發生故障,服務端會有自己的故障容錯機制,選出管理消費組所有消費者的新協調者節,點消費者客戶端沒有權利做這個工作,它能做的只是等待一段時間,查詢服務端是否已經選出了新的協調節點如果消費者查到現在已經有管理協調者的協調節點,就會連接這個新協調節,哉由於這個協調節點是服務端新選出來的,所以每個消費者都應該重新連接協調節點
消費者重新加入消費組,在分配到分區的前後,都會對消費者的拉取工作產生影響 消費者發送「加入組請求」之前要停止拉取消息,在收到「加入組響應」中的分區之後要重新開始拉取消息時,為了能夠讓客戶端應用程序感知消費者管理的分區發生變化,在加入組前後,客戶端還可以設置自定義的「消費者再平衡監聽器」,以便對分區的變化做出合適的處理
『貳』 Kafka 源碼解析之 Consumer 兩種 commit 機制和 partition 分配機制
先看下兩種不同的 commit 機制,一種是同步 commit,一種是非同步 commit,既然其作用都是 offset commit,應該不難猜到它們底層使用介面都是一樣的
同步 commit
同步 commit 的實現方式,client.poll() 方法會阻塞直到這個request 完成或超時才會返回。
非同步 commit
而對於非同步的 commit,最後調用的都是 doCommitOffsetsAsync 方法,其具體實現如下:
在非同步 commit 中,可以添加相應的回調函數,如果 request 處理成功或處理失敗,ConsumerCoordinator 會通過 () 方法喚醒相應的回調函數。
關鍵區別在於future是否會get,同步提交就是future會get.
consumer 提供的兩種不同 partition 分配策略,可以通過 partition.assignment.strategy 參數進行配置,默認情況下使用的是 org.apache.kafka.clients.consumer.RangeAssignor,Kafka 中提供另一種 partition 的分配策略 org.apache.kafka.clients.consumer.RoundRobinAssignor
用戶可以自定義相應的 partition 分配機制,只需要繼承這個 AbstractPartitionAssignor 抽象類即可。
AbstractPartitionAssignor
AbstractPartitionAssignor 有一個抽象方法,如下所示:
assign() 這個方法,有兩個參數:
RangeAssignor 和 RoundRobinAssignor 通過這個方法 assign() 的實現,來進行相應的 partition 分配。
直接看一下這個方法的實現:
假設 topic 的 partition 數為 numPartitionsForTopic,group 中訂閱這個 topic 的 member 數為 consumersForTopic.size(),首先需要算出兩個值:
分配的規則是:對於剩下的那些 partition 分配到前 consumersWithExtraPartition 個 consumer 上,也就是前 consumersWithExtraPartition 個 consumer 獲得 topic-partition 列表會比後面多一個。
在上述的程序中,舉了一個例子,假設有一個 topic 有 7 個 partition,group 有5個 consumer,這個5個 consumer 都訂閱這個 topic,那麼 range 的分配方式如下:
而如果 group 中有 consumer 沒有訂閱這個 topic,那麼這個 consumer 將不會參與分配。下面再舉個例子,將有兩個 topic,一個 partition 有5個,一個 partition 有7個,group 有5個 consumer,但是只有前3個訂閱第一個 topic,而另一個 topic 是所有 consumer 都訂閱了,那麼其分配結果如下:
這個是 roundrobin 的實現,其實現方法如下:
roundrobin 的實現原則,簡單來說就是:列出所有 topic-partition 和列出所有的 consumer member,然後開始分配,一輪之後繼續下一輪,假設有有一個 topic,它有7個 partition,group 有3個 consumer 都訂閱了這個 topic,那麼其分配方式為:
對於多個 topic 的訂閱,將有兩個 topic,一個 partition 有5個,一個 partition 有7個,group 有5個 consumer,但是只有前3個訂閱第一個 topic,而另一個 topic 是所有 consumer 都訂閱了,那麼其分配結果如下:
roundrobin 分配方式與 range 的分配方式還是略有不同。
『叄』 使用IDEA 打開、調試kafka源碼
kafka的啟動類是隱激:
core/src/main/scala/kafka/Kafka.scala
我嘗試在本地運晌譽行,死灶謹襪活跑不起來,報錯如下。網上也沒有找到靠譜的解決辦法。
嘗試本地運行失敗後,又嘗試了遠程調試的方式。