1. Kafka相關內容總結(Kafka集群搭建手記)
Kafka is a distributed,partitioned,replicated commit logservice。它提供了類似於JMS的特性,但是在設計實現上完全不同,此外它並不是JMS規范的實現。kafka對消息保存時根據Topic進行歸類,發送消息者成為Procer,消息接受者成為Consumer,此外kafka集群有多個kafka實例組成,每個實例(server)成為broker。無論是kafka集群,還是procer和consumer都依賴於zookeeper來保證系統可用性集群保存一些meta信息。
入門請參照: https://www.ibm.com/developerworks/cn/opensource/os-cn-kafka/index.html
在此不再贅述。
這部分不是本文的重點,但是kafka需要用到kafka集群,所以先搭建kafka集群。
從kafka官方文檔看到,kafka似乎在未來的版本希望拋棄zookeep集群,自己維護集群的一致性,拭目以待吧。
我們搭建集群使用的是三台同機房的機器,因為zookeeper不怎麼占資源也不怎麼占空間(我們的業務目前比較簡單),所以三台機器上都搭建了zookeeper集群。
搭建zookeeper集群沒什麼難度,參考文檔: http://www.cnblogs.com/huangxincheng/p/5654170.html
下面列一下我的配置並解析:
一共用三台物理機器,搭建一個Kafka集群。
每台伺服器的硬碟劃分都是一樣的,每個獨立的物理磁碟掛在一個單獨的分區裡面,這樣很方便用於Kafka多個partition的數據讀寫與冗餘。
/data1比較小,為了不成為集群的瓶頸,所以/data1用於存放kafka以及Zookeeper
每台機器的磁碟分布如下:
下面是kafka的簡單配置,三台伺服器都一樣,如有不一致的在下文有說明。
kafka安裝在目錄/usr/local/kafka/下,下面的說明以10.1.xxx.57為例。
最重要的配置文件server.properties,需要配置的信息如下:
從上面的配置看到,kafka集群不需要像hadoop集群那樣,配置ssh通訊,而且一個kafka伺服器(官方文檔稱之為broker,下面統一使用這個稱呼)並不知道其他的kafka伺服器的存在,因此你需要逐個broker去啟動kafka。各個broker根據自己的配置,會自動去配置文件上的zk伺服器報到,這就是一個有zk伺服器粘合起來的kafka集群。
我寫了一個啟動腳本,放在 /usr/local/kafka/bin 下面。啟動腳本每個broker都一樣:
如同kafka集群裡面每一個broker都需要單獨啟動一樣,kafka集群裡面每一個broker都需要單獨關閉。
官方給出的關閉腳本是單獨運行 bin/kafka-server-stop.sh
但是我運行的結果是無法關閉。打開腳本一看,才發現是最簡單的辦法,發一個TERM信號到kafka的java進程,官方腳本給出的grep有點問題。
發信號之後,一直tail著kafka日誌,看到正常關閉。
指定zookeeper伺服器,topic名稱是LvsKafka(注意topic名稱不能有英文句號(.)和下劃線(_),否則會通不過,理由是名稱會沖突,下文對此略有解析)
replication-factor指出重復因子是2,也就是每條數據有兩個拷貝,可靠性考慮。
partitions 指出需要多少個partition,數據量大的多一點,無論生產和消費,這是負載均衡和高並發的需要。
可以看到剛才新建的24個partition,比如partition 5, 他的leader是broker 59,也就是10.1.xxx.59這台機器。
建立topic時我們指出需要2個拷貝,從上面的輸出的Replicas欄位看到,這兩個拷貝放在59,58兩個機器,也就是10.1.xxx.59和10.1.xxx.58.
Isr表示當前partition的所有拷貝所在的機器中,哪些是還活著(可以提供服務)的。現在是59和58都還存活。
這個命令另外還會看到一些類似於下面的內容:
__consumer_offsets到底是什麼呢?其實就是客戶端的消費進度,客戶端會定時上報到kafka集群,而kafka集群會把每個客戶端的消費進度放入一個自己內部的topic中,這個topic就是__consumer_offsets。我查看過__consumer_offsets的內容,其實就是每個客戶端的消費進度作為一條消息,放入__consumer_offsets這個topic中。
這里給了我們兩個提示:
1、kafka自己管理客戶端的消費進度,而不是依靠zk,這就是kafka官方文檔說的kafka未來會拋棄zk的底氣之一;
2、留意到這個kafka自己的topic是帶下劃線的,也就是,kafka擔心我們自己建的topic如果帶下劃線的話會跟這些內部自用的topic沖突;
2. kafka入門:一個開源的、輕量級、高吞吐、高可用的分布式消息系統
隨著信息技術的快速發展及互聯網用戶規模的急劇增長,計算機所存儲的信息量正呈爆炸式增長,目前數據量已進入大規模和超大規模的海量數據時代, 如何高效地存儲、分析、處理和挖掘海量數據 已成為技術研究領域的熱點和難點問題。而 如何採集和運營管理、分析這些數據 也是大數據處理中一個至關重要的組成環節,這就需要相應的基礎設施對其提供支持。針對這個需求,當前業界已有很多開源的消息系統應運而生,kafka就是一款當然非常流行的消息系統。
Kafka是一款開源的、輕量級的、分布式、可分區和具有復制備份的(Replicated)、基於ZooKeeper協調管理的分布式流平台的功能強大的消息系統。作為一個流式處理平台,必須具備以下3個關鍵特性:
1) 能夠允許發布和訂閱流數據。
2) 存儲流數據時提供相應的容錯機制。
3) 當流數據到達時能夠被及時處理。
消息流系統kafka的基本結構包括生產者和消費者,以及kafka集群。
生產者負責生產消息,將消息寫入Kafka集群;消費者從Kafka集群中拉取消息。
消息是Kafka通信的基本單位 ,由一個 固定長度的消息頭 和一個 可變長度的消息體 構成。
Kafka將 一組消息 抽象歸納為一個主題(Topic),也就是說,一個主題是對消息的一個分類。 生產者將消息指定主題發送到kafka集群,消費者訂閱主題或主題的某些分區進行消費。
Kafka將一組消息歸納為一個主題,而 每個主題又被分成一個或多個分區(Partition) 。每個分區由一系列有序、不可變的消息組成,是一個有序隊列。 每個分區在物理上對應為一個文件夾 ,分區的命名規則為主題名稱後接「—」連接符,之後再接分區編號,分區編號從0開始,編號最大值為分區的總數減1。
分區使得Kafka在並發處理上變得更加容易,理論上來說,分區數越多吞吐量越高,但這要根據集群實際環境及業務場景而定。同時,分區也是Kafka保證消息被順序消費以及對消息進行負載均衡的基礎。
疑問和答案 :分區如何保證消息被順序消費?每個分區內的消息是有序的,但不同分區間如何保證?猜測是分區從存儲空間上比較大,分區個數少。順序消費的主要因素在分區內的消息,分區間的可以忽略。高吞吐率順序寫磁碟估計也是這個原因。
Kafka只能保證一個分區之內消息的有序性,並不能保證跨分區消息的有序性。 每條消息被追加到相應的分區中,是順序寫磁碟,因此效率非常高,這是Kafka高吞吐率的一個重要保證 。同時與傳統消息系統不同的是,Kafka並不會立即刪除已被消費的消息,由於磁碟的限制消息也不會一直被存儲,因此 Kafka提供兩種刪除老數據的策略 ,一是基於消息已存儲的時間長度,二是基於分區的大小。這兩種策略都能通過配置文件進行配置。
每個分區又有一至多個副本(Replica),分區的副本分布在集群的不同代理上,以提高可用性。
從存儲角度上分析,分區的每個副本在邏輯上抽象為一個日誌(Log)對象,即分區的副本與日誌對象是一一對應的。每個主題對應的 分區數 可以在Kafka啟動時所載入的配置文件中配置,也可以在創建主題時指定。當然,客戶端還可以在主題創建後修改主題的分區數。
為什麼副本要分Leader和Follower? 如果沒有Leader副本,就需要所有的副本都同時負責讀/寫請求處理,同時還得保證這些副本之間數據的一致性,假設有n個副本則需要有n×n條通路來同步數據,這樣數據的一致性和有序性就很難保證。
為解決這個問題,Kafka選擇分區的一個副本為Leader,該分區其他副本為Follower,只有 Leader副本 才負責處理客戶端 讀/寫請求 ,Follower副本從Leader副本同步數據。
引入Leader副本後客戶端只需與Leader副本進行交互,這樣數據一致性及順序性就有了保證。Follower副本從Leader副本同步消息,對於n個副本只需n-1條通路即可,這樣就使得系統更加簡單而高效。
副本Follower與Leader的角色並不是固定不變的,如果Leader失效,通過相應的選舉演算法將從其他Follower副本中選出新的Leader副本。
疑問 :leader副本和follower副本是如何選出來的?通過zookeeper選舉的嘛?
Kafka在ZooKeeper中動態維護了一個 ISR(In-sync Replica) ,即保存同步的副本列表,該列表中保存的是與Leader副本保持消息同步的所有副本對應的代理節點id。 如果一個Follower副本宕機或是落後太多 ,則該Follower副本節點將 從ISR列表中移除 。 本書用宕機 來特指某個代理失效的情景,包括但不限於代理被關閉,如代理被人為關閉或是發生物理故障、心跳檢測過期、網路延遲、進程崩潰等。
任何發布到分區的消息會被直接追加到日誌文件的尾部(分區目錄下以「.log」為文件名後綴的數據文件),而每條 消息 在日誌文件中的位置都會對應一個按序遞增的 偏移量 。偏移量是一個分區下嚴格有序的 邏輯值 ,它並不表示消息在磁碟上的物理位置。由於Kafka幾乎不允許對消息進行隨機讀寫,因此Kafka並沒有提供額外索引機制到存儲偏移量。
消費者可以通過控制消息偏移量來對消息進行消費 ,如消費者可以指定消費的起始偏移量。 為了保證消息被順序消費,消費者已消費的消息對應的偏移量也需要保存 。需要說明的是,消費者對消息偏移量的操作並不會影響消息本身的偏移量。舊版消費者將消費偏移量保存到ZooKeeper當中, 而新版消費者是將消費偏移量保存到Kafka內部一個主題當中。 當然,消費者也可以自己在外部系統保存消費偏移量,而無需保存到Kafka中。
推測 :一個主題有多個分區,一個分區有多個副本。一個主題(一類消息)有多個分區(消息被分段),一個分區(每段消息)有多個副本(每段消息的副本數)。消息一旦發給kafka,就會分配一個偏移量,在多個副本中的偏移量是一樣的。這樣的話,消費者通過偏移量消費時對於多個副本就沒有差異性。
Kafka集群由一個或多個Kafka實例構成,每一個Kafka實例稱為代理(Broker),通常也稱代理為Kafka伺服器(KafkaServer)。在生產環境中Kafka集群一般包括一台或多台伺服器,我們可以在一台伺服器上配置一個或多個代理。 每一個代理都有唯一的標識id,這個id是一個非負整數 。在一個Kafka集群中,每增加一個代理就需要為這個代理配置一個與該集群中其他代理不同的id, id值可以選擇任意非負整數即可,只要保證它在整個Kafka集群中唯一,這個id就是代理的名字,也就是在啟動代理時配置的broker.id對應的值。
生產者(Procer)負責將消息發送給代理,也就是向Kafka代理發送消息的客戶端。
消費者(Comsumer)以拉取(pull)方式拉取數據,它是消費的客戶端。在Kafka中 每一個消費者都屬於一個特定消費組 (ConsumerGroup),可以為每個消費者指定一個消費組,以groupId代表消費組名稱,通過group.id配置設置。 如果不指定消費組 ,則該消費者屬於默認消費組test-consumer-group。
每個消費者有一個全局唯一的id ,通過配置項client.id指定, 如果客戶端沒有指定消費者的id, Kafka會自動為該消費者生成一個全局唯一的id,格式為${groupId}-${hostName}-${timestamp}-${UUID前8位字元}。 同一個主題的一條消息只能被同一個消費組下某一個消費者消費 ,但不同消費組的消費者可同時消費該消息。 消費組是Kafka用來實現對一個主題消息進行廣播和單播的手段 ,實現消息廣播只需指定各消費者均屬於不同的消費組,消息單播則只需讓各消費者屬於同一個消費組。
推論: kafka消息是按照消息類型(主題),在一個消費者組中只能消費一次。也就是一個消費者組只消費一類型的消息。如果某個服務要消費一類消息,必須將自己置為不同的消費者組。
Kafka利用ZooKeeper保存相應元數據信息, Kafka元數據信息包括如代理節點信息、Kafka集群信息、舊版消費者信息及其消費偏移量信息、主題信息、分區狀態信息、分區副本分配方案信息、動態配置信息等。 Kafka在啟動或運行過程當中會在ZooKeeper上創建相應節點 來保存元數據信息, Kafka通過監聽機制在這些節點注冊相應監聽器來監聽節點元數據的變化 ,從而由ZooKeeper負責管理維護Kafka集群,同時通過ZooKeeper我們能夠很方便地對Kafka集群進行水平擴展及數據遷移。