導航:首頁 > 編程語言 > kafkajava消費

kafkajava消費

發布時間:2022-09-10 15:02:20

① 消息隊列原理及選型

消息隊列(Message Queue)是一種進程間通信或同一進程的不同線程間的通信方式。

Broker(消息伺服器)
Broker的概念來自與Apache ActiveMQ,通俗的講就是MQ的伺服器。

Procer(生產者)
業務的發起方,負責生產消息傳輸給broker

Consumer(消費者)
業務的處理方,負責從broker獲取消息並進行業務邏輯處理

Topic(主題)
發布訂閱模式下的消息統一匯集地,不同生產者向topic發送消息,由MQ伺服器分發到不同的訂閱 者,實現消息的廣播

Queue(隊列)
PTP模式下,特定生產者向特定queue發送消息,消費者訂閱特定的queue完成指定消息的接收。

Message(消息體)
根據不同通信協議定義的固定格式進行編碼的數據包,來封裝業務數據,實現消息的傳輸

點對點模型用於消息生產者和消息消費者之間點到點的通信。

點對點模式包含三個角色:

每個消息都被發送到一個特定的隊列,接收者從隊列中獲取消息。隊列保留著消息,可以放在內存 中也可以持久化,直到他們被消費或超時。

特點:

發布訂閱模型包含三個角色:

多個發布者將消息發送到Topic,系統將這些消息傳遞給多個訂閱者。

特點:

AMQP即Advanced Message Queuing Protocol,是應用層協議的一個開放標准,為面向消息的中間件設計。消息中間件主要用於組件之間的解耦,消息的發送者無需知道消息使用者的存在,反之亦然。AMQP 的主要特徵是面向消息、隊列、路由(包括點對點和發布/訂閱)、可靠性、安全。

優點:可靠、通用

MQTT(Message Queuing Telemetry Transport,消息隊列遙測傳輸)是IBM開發的一個即時通訊協議,有可能成為物聯網的重要組成部分。該協議支持所有平台,幾乎可以把所有聯網物品和外部連接起來,被用來當做感測器和致動器(比如通過Twitter讓房屋聯網)的通信協議。

優點:格式簡潔、佔用帶寬小、移動端通信、PUSH、嵌入式系統

STOMP(Streaming Text Orientated Message Protocol)是流文本定向消息協議,是一種為MOM(Message Oriented Middleware,面向消息的中間件)設計的簡單文本協議。STOMP提供一個可互操作的連接格式,允許客戶端與任意STOMP消息代理(Broker)進行交互。

優點:命令模式(非topicqueue模式)

XMPP(可擴展消息處理現場協議,Extensible Messaging and Presence Protocol)是基於可擴展標記語言(XML)的協議,多用於即時消息(IM)以及在線現場探測。適用於伺服器之間的准即時操作。核心是基於XML流傳輸,這個協議可能最終允許網際網路用戶向網際網路上的其他任何人發送即時消息,即使其操作系統和瀏覽器不同。

優點:通用公開、兼容性強、可擴展、安全性高,但XML編碼格式佔用帶寬大

RabbitMQ 是實現 AMQP(高級消息隊列協議)的消息中間件的一種,最初起源於金融系統,用於在分布式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。 RabbitMQ 主要是為了實現系統之間的雙向解耦而實現的。當生產者大量產生數據時,消費者無法快速消費,那麼需要一個中間層。保存這個數據。

RabbitMQ 是一個開源的 AMQP 實現,伺服器端用Erlang語言編寫,支持多種客戶端,如:Python、Ruby、.NET、java、JMS、C、PHP、ActionScript、XMPP、STOMP 等,支持 AJAX。用於在分布式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。

Channel(通道)
道是兩個管理器之間的一種單向點對點的的通信連接,如果需要雙向交流,可以建立一對通道。

Exchange(消息交換機)
Exchange類似於數據通信網路中的交換機,提供消息路由策略。

RabbitMq中,procer不是通過信道直接將消息發送給queue,而是先發送給Exchange。一個Exchange可以和多個Queue進行綁定,procer在傳遞消息的時候,會傳遞一個ROUTING_KEY,Exchange會根據這個ROUTING_KEY按照特定的路由演算法,將消息路由給指定的queue。和Queue一樣,Exchange也可設置為持久化,臨時或者自動刪除。

Exchange有4種類型:direct(默認),fanout, topic, 和headers。
不同類型的Exchange轉發消息的策略有所區別:

Binding(綁定)
所謂綁定就是將一個特定的 Exchange 和一個特定的 Queue 綁定起來。Exchange 和Queue的綁定可以是多對多的關系。

Routing Key(路由關鍵字)
exchange根據這個關鍵字進行消息投遞。

vhost(虛擬主機)
在RabbitMq server上可以創建多個虛擬的message broker,又叫做virtual hosts (vhosts)。每一個vhost本質上是一個mini-rabbitmq server,分別管理各自的exchange,和bindings。vhost相當於物理的server,可以為不同app提供邊界隔離,使得應用安全的運行在不同的vhost實例上,相互之間不會干擾。procer和consumer連接rabbit server需要指定一個vhost。

假設P1和C1注冊了相同的Broker,Exchange和Queue。P1發送的消息最終會被C1消費。
基本的通信流程大概如下所示:

Consumer收到消息時需要顯式的向rabbit broker發送basic。ack消息或者consumer訂閱消息時設置auto_ack參數為true。

在通信過程中,隊列對ACK的處理有以下幾種情況:

即消息的Ackownledge確認機制,為了保證消息不丟失,消息隊列提供了消息Acknowledge機制,即ACK機制,當Consumer確認消息已經被消費處理,發送一個ACK給消息隊列,此時消息隊列便可以刪除這個消息了。如果Consumer宕機/關閉,沒有發送ACK,消息隊列將認為這個消息沒有被處理,會將這個消息重新發送給其他的Consumer重新消費處理。

消息的收發處理支持事務,例如:在任務中心場景中,一次處理可能涉及多個消息的接收、處理,這應該處於同一個事務范圍內,如果一個消息處理失敗,事務回滾,消息重新回到隊列中。

消息的持久化,對於一些關鍵的核心業務來說是非常重要的,啟用消息持久化後,消息隊列宕機重啟後,消息可以從持久化存儲恢復,消息不丟失,可以繼續消費處理。

fanout 模式
模式特點:

direct 模式
任何發送到Direct Exchange的消息都會被轉發到routing_key中指定的Queue。

如果一個exchange 聲明為direct,並且bind中指定了routing_key,那麼發送消息時需要同時指明該exchange和routing_key。

簡而言之就是:生產者生成消息發送給Exchange, Exchange根據Exchange類型和basic_publish中的routing_key進行消息發送 消費者:訂閱Exchange並根據Exchange類型和binding key(bindings 中的routing key) ,如果生產者和訂閱者的routing_key相同,Exchange就會路由到那個隊列。

topic 模式
前面講到direct類型的Exchange路由規則是完全匹配binding key與routing key,但這種嚴格的匹配方式在很多情況下不能滿足實際業務需求。

topic類型的Exchange在匹配規則上進行了擴展,它與direct類型的Exchage相似,也是將消息路由到binding key與routing key相匹配的Queue中,但這里的匹配規則有些不同。
它約定:

以上圖中的配置為例,routingKey=」quick.orange.rabbit」的消息會同時路由到Q1與Q2,routingKey=」lazy.orange.fox」的消息會路由到Q1,routingKey=」lazy.brown.fox」的消息會路由到Q2,routingKey=」lazy.pink.rabbit」的消息會路由到Q2(只會投遞給Q2一次,雖然這個routingKey與Q2的兩個bindingKey都匹配);routingKey=」quick.brown.fox」、routingKey=」orange」、routingKey=」quick.orange.male.rabbit」的消息將會被丟棄,因為它們沒有匹配任何bindingKey。

RabbitMQ,部署分三種模式:單機模式,普通集群模式,鏡像集群模式。

普通集群模式
多台機器部署,每個機器放一個rabbitmq實例,但是創建的queue只會放在一個rabbitmq實例上,每個實例同步queue的元數據。

如果消費時連的是其他實例,那個實例會從queue所在實例拉取數據。這就會導致拉取數據的開銷,如果那個放queue的實例宕機了,那麼其他實例就無法從那個實例拉取,即便開啟了消息持久化,讓rabbitmq落地存儲消息的話,消息不一定會丟,但得等這個實例恢復了,然後才可以繼續從這個queue拉取數據, 這就沒什麼高可用可言,主要是提供吞吐量 ,讓集群中多個節點來服務某個queue的讀寫操作。

鏡像集群模式

queue的元數據和消息都會存放在多個實例,每次寫消息就自動同步到多個queue實例里。這樣任何一個機器宕機,其他機器都可以頂上,但是性能開銷太大,消息同步導致網路帶寬壓力和消耗很重,另外,沒有擴展性可言,如果queue負載很重,加機器,新增的機器也包含了這個queue的所有數據,並沒有辦法線性擴展你的queue。此時,需要開啟鏡像集群模式,在rabbitmq管理控制台新增一個策略,將數據同步到指定數量的節點,然後你再次創建queue的時候,應用這個策略,就會自動將數據同步到其他的節點上去了。

Kafka 是 Apache 的子項目,是一個高性能跨語言的分布式發布/訂閱消息隊列系統(沒有嚴格實現 JMS 規范的點對點模型,但可以實現其效果),在企業開發中有廣泛的應用。高性能是其最大優勢,劣勢是消息的可靠性(丟失或重復),這個劣勢是為了換取高性能,開發者可以以稍降低性能,來換取消息的可靠性。

一個Topic可以認為是一類消息,每個topic將被分成多個partition(區),每個partition在存儲層面是append log文件。任何發布到此partition的消息都會被直接追加到log文件的尾部,每條消息在文件中的位置稱為offset(偏移量),offset為一個long型數字,它是唯一標記一條消息。它唯一的標記一條消息。kafka並沒有提供其他額外的索引機制來存儲offset,因為在kafka中幾乎不允許對消息進行「隨機讀寫」。

Kafka和JMS(Java Message Service)實現(activeMQ)不同的是:即使消息被消費,消息仍然不會被立即刪除。日誌文件將會根據broker中的配置要求,保留一定的時間之後刪除;比如log文件保留2天,那麼兩天後,文件會被清除,無論其中的消息是否被消費。kafka通過這種簡單的手段,來釋放磁碟空間,以及減少消息消費之後對文件內容改動的磁碟IO開支。

對於consumer而言,它需要保存消費消息的offset,對於offset的保存和使用,有consumer來控制;當consumer正常消費消息時,offset將會"線性"的向前驅動,即消息將依次順序被消費。事實上consumer可以使用任意順序消費消息,它只需要將offset重置為任意值。(offset將會保存在zookeeper中,參見下文)

kafka集群幾乎不需要維護任何consumer和procer狀態信息,這些信息有zookeeper保存;因此procer和consumer的客戶端實現非常輕量級,它們可以隨意離開,而不會對集群造成額外的影響。

partitions的設計目的有多個。最根本原因是kafka基於文件存儲。通過分區,可以將日誌內容分散到多個server上,來避免文件尺寸達到單機磁碟的上限,每個partiton都會被當前server(kafka實例)保存;可以將一個topic切分多任意多個partitions,來消息保存/消費的效率。此外越多的partitions意味著可以容納更多的consumer,有效提升並發消費的能力。(具體原理參見下文)。

一個Topic的多個partitions,被分布在kafka集群中的多個server上;每個server(kafka實例)負責partitions中消息的讀寫操作;此外kafka還可以配置partitions需要備份的個數(replicas),每個partition將會被備份到多台機器上,以提高可用性。

基於replicated方案,那麼就意味著需要對多個備份進行調度;每個partition都有一個server為"leader";leader負責所有的讀寫操作,如果leader失效,那麼將會有其他follower來接管(成為新的leader);follower只是單調的和leader跟進,同步消息即可。由此可見作為leader的server承載了全部的請求壓力,因此從集群的整體考慮,有多少個partitions就意味著有多少個"leader",kafka會將"leader"均衡的分散在每個實例上,來確保整體的性能穩定。

Procers
Procer將消息發布到指定的Topic中,同時Procer也能決定將此消息歸屬於哪個partition;比如基於"round-robin"方式或者通過其他的一些演算法等。

Consumers
本質上kafka只支持Topic。每個consumer屬於一個consumer group;反過來說,每個group中可以有多個consumer。發送到Topic的消息,只會被訂閱此Topic的每個group中的一個consumer消費。

如果所有的consumer都具有相同的group,這種情況和queue模式很像;消息將會在consumers之間負載均衡。

如果所有的consumer都具有不同的group,那這就是"發布-訂閱";消息將會廣播給所有的消費者。

在kafka中,一個partition中的消息只會被group中的一個consumer消費;每個group中consumer消息消費互相獨立;我們可以認為一個group是一個"訂閱"者,一個Topic中的每個partions,只會被一個"訂閱者"中的一個consumer消費,不過一個consumer可以消費多個partitions中的消息。kafka只能保證一個partition中的消息被某個consumer消費時,消息是順序的。事實上,從Topic角度來說,消息仍不是有序的。

Kafka的設計原理決定,對於一個topic,同一個group中不能有多於partitions個數的consumer同時消費,否則將意味著某些consumer將無法得到消息。

Guarantees

Kafka就比較適合高吞吐量並且允許少量數據丟失的場景,如果非要保證「消息可靠傳輸」,可以使用JMS。

Kafka Procer 消息發送有兩種方式(配置參數 procer.type):

對於同步方式(procer.type=sync)?Kafka Procer 消息發送有三種確認方式(配置參數 acks):

kafka的設計初衷是希望作為一個統一的信息收集平台,能夠實時的收集反饋信息,並需要能夠支撐較大的數據量,且具備良好的容錯能力。

持久性
kafka使用文件存儲消息,這就直接決定kafka在性能上嚴重依賴文件系統的本身特性。且無論任何OS下,對文件系統本身的優化幾乎沒有可能。文件緩存/直接內存映射等是常用的手段。因為kafka是對日誌文件進行append操作,因此磁碟檢索的開支是較小的;同時為了減少磁碟寫入的次數,broker會將消息暫時buffer起來,當消息的個數(或尺寸)達到一定閥值時,再flush到磁碟,這樣減少了磁碟IO調用的次數。

性能
需要考慮的影響性能點很多,除磁碟IO之外,我們還需要考慮網路IO,這直接關繫到kafka的吞吐量問題。kafka並沒有提供太多高超的技巧;對於procer端,可以將消息buffer起來,當消息的條數達到一定閥值時,批量發送給broker;對於consumer端也是一樣,批量fetch多條消息。不過消息量的大小可以通過配置文件來指定。對於kafka broker端,似乎有個sendfile系統調用可以潛在的提升網路IO的性能:將文件的數據映射到系統內存中,socket直接讀取相應的內存區域即可,而無需進程再次和交換。 其實對於procer/consumer/broker三者而言,CPU的開支應該都不大,因此啟用消息壓縮機制是一個良好的策略;壓縮需要消耗少量的CPU資源,不過對於kafka而言,網路IO更應該需要考慮。可以將任何在網路上傳輸的消息都經過壓縮。kafka支持gzip/snappy等多種壓縮方式。

生產者
負載均衡: procer將會和Topic下所有partition leader保持socket連接;消息由procer直接通過socket發送到broker,中間不會經過任何「路由層「。事實上,消息被路由到哪個partition上,有procer客戶端決定。比如可以採用「random「「key-hash「「輪詢「等,如果一個topic中有多個partitions,那麼在procer端實現「消息均衡分發「是必要的。

其中partition leader的位置(host:port)注冊在zookeeper中,procer作為zookeeper client,已經注冊了watch用來監聽partition leader的變更事件。
非同步發送:將多條消息暫且在客戶端buffer起來,並將他們批量的發送到broker,小數據IO太多,會拖慢整體的網路延遲,批量延遲發送事實上提升了網路效率。不過這也有一定的隱患,比如說當procer失效時,那些尚未發送的消息將會丟失。

消費者
consumer端向broker發送「fetch」請求,並告知其獲取消息的offset;此後consumer將會獲得一定條數的消息;consumer端也可以重置offset來重新消費消息。

在JMS實現中,Topic模型基於push方式,即broker將消息推送給consumer端。不過在kafka中,採用了pull方式,即consumer在和broker建立連接之後,主動去pull(或者說fetch)消息;這中模式有些優點,首先consumer端可以根據自己的消費能力適時的去fetch消息並處理,且可以控制消息消費的進度(offset);此外,消費者可以良好的控制消息消費的數量,batch fetch。

其他JMS實現,消息消費的位置是有prodiver保留,以便避免重復發送消息或者將沒有消費成功的消息重發等,同時還要控制消息的狀態。這就要求JMS broker需要太多額外的工作。在kafka中,partition中的消息只有一個consumer在消費,且不存在消息狀態的控制,也沒有復雜的消息確認機制,可見kafka broker端是相當輕量級的。當消息被consumer接收之後,consumer可以在本地保存最後消息的offset,並間歇性的向zookeeper注冊offset。由此可見,consumer客戶端也很輕量級。

對於JMS實現,消息傳輸擔保非常直接:有且只有一次(exactly once)。
在kafka中稍有不同:

at most once: 消費者fetch消息,然後保存offset,然後處理消息;當client保存offset之後,但是在消息處理過程中出現了異常,導致部分消息未能繼續處理。那麼此後"未處理"的消息將不能被fetch到,這就是"at most once"。

at least once: 消費者fetch消息,然後處理消息,然後保存offset。如果消息處理成功之後,但是在保存offset階段zookeeper異常導致保存操作未能執行成功,這就導致接下來再次fetch時可能獲得上次已經處理過的消息,這就是"at least once",原因offset沒有及時的提交給zookeeper,zookeeper恢復正常還是之前offset狀態。

exactly once: kafka中並沒有嚴格的去實現(基於2階段提交,事務),我們認為這種策略在kafka中是沒有必要的。

通常情況下「at-least-once」是我們首選。(相比at most once而言,重復接收數據總比丟失數據要好)。

kafka高可用由多個broker組成,每個broker是一個節點;

創建一個topic,這個topic會劃分為多個partition,每個partition存在於不同的broker上,每個partition就放一部分數據。

kafka是一個分布式消息隊列,就是說一個topic的數據,是分散放在不同的機器上,每個機器就放一部分數據。

在0.8版本以前,是沒有HA機制的,就是任何一個broker宕機了,那個broker上的partition就廢了,沒法寫也沒法讀,沒有什麼高可用性可言。

0.8版本以後,才提供了HA機制,也就是就是replica副本機制。每個partition的數據都會同步到其他的機器上,形成自己的多個replica副本。然後所有replica會選舉一個leader出來,那麼生產和消費都跟這個leader打交道,然後其他replica就是follower。

寫的時候,leader會負責把數據同步到所有follower上去,讀的時候就直接讀leader上數據即可。

kafka會均勻的將一個partition的所有replica分布在不同的機器上,從而提高容錯性。

如果某個broker宕機了也沒事,它上面的partition在其他機器上都有副本的,如果這上面有某個partition的leader,那麼此時會重新選舉一個新的leader出來,大家繼續讀寫那個新的leader即可。這就有所謂的高可用性了。

寫數據的時候,生產者就寫leader,然後leader將數據落地寫本地磁碟,接著其他follower自己主動從leader來pull數據。一旦所有follower同步好數據了,就會發送ack給leader,leader收到所有follower的ack之後,就會返回寫成功的消息給生產者。

消息丟失會出現在三個環節,分別是生產者、mq中間件、消費者:

RabbitMQ

Kafka
大體和RabbitMQ相同。

Rabbitmq
需要保證順序的消息投遞到同一個queue中,這個queue只能有一個consumer,如果需要提升性能,可以用內存隊列做排隊,然後分發給底層不同的worker來處理。

Kafka
寫入一個partition中的數據一定是有序的。生產者在寫的時候 ,可以指定一個key,比如指定訂單id作為key,這個訂單相關數據一定會被分發到一個partition中去。消費者從partition中取出數據的時候也一定是有序的,把每個數據放入對應的一個內存隊列,一個partition中有幾條相關數據就用幾個內存隊列,消費者開啟多個線程,每個線程處理一個內存隊列。

② java工程kafka傳遞自定義對象,消費端獲取到的是null

3. 啟服務
3.1 啟zookeeper
啟zk兩種式第種使用kafka自帶zk
bin/zookeeper-server-start.sh config/zookeeper.properties&
另種使用其zookeeper位於本機位於其址種情況需要修改config面sercer.properties面zookeeper址
例zookeeper.connect=10.202.4.179:2181
3.2 啟 kafka
bin/kafka-server-start.sh config/server.properties
4.創建topic
bin/kafka-topics.sh --create --zookeeper 10.202.4.179:2181 --replication-factor 1 --partitions 1 --topic test
創建名testtopic副本區
通list命令查看剛剛創建topic
bin/kafka-topics.sh -list -zookeeper 10.202.4.179:2181
5.啟procer並發送消息啟procer
bin/kafka-console-procer.sh --broker-list localhost:9092 --topic test
啟發送消息

test
hello boy
按Ctrl+C退發送消息
6.啟consumer
bin/kafka-console-consumer.sh --zookeeper 10.202.4.179:2181 --topic test --from-beginning
啟consumerconsole看procer發送消息
啟兩終端發送消息接受消息
都行查看zookeeper進程kafkatopic步步排查原吧

③ 用Kafka和Java搭建的項目,Kafka管理中心在什麼情況下會重復發送消息消費端的程序接收到消息,進入方法

非手動提交offset

消費者只要讀取到數據,就會修改offset,不需要方法體執行完

手動提交

需要手動提交代碼執行完畢

針對你的問題,情況有很多種可能。

  1. 你是否開啟手動提交offset

  2. 你的消費者,有幾個?是否是同一個組?

④ clickhouse與kafka集成

clickhouse支持與多種存儲引擎集成,可以從集成的引擎裡面讀取消息,然後寫到真正的數據存儲表裡。

clickhouse批量寫入的性能比較好,我們的業務場景下會大批量的產生數據,如果使用clickhouse-jdbc去寫的,寫入時機和每批次寫入的數量不好把控,最終選擇了先將消息寫入kafka,然後由clickhouse從kafka消費數據,clickhouse server消費到數據之後寫入真正的數據表。

clickhouse集成kafka引擎見官方文檔:
https://clickhouse.com/docs/zh/engines/table-engines/integrations/kafka/

下面的介紹會與官方文檔有重復,然後補充一些集成過程中遇到的坑。

下面介紹clickhouse與kafka集成的步驟,clickhouse版本是22.1.3.7

必要參數

可選參數

關於必選參數中的kafka_format參數,參見Formats部分,format具體解釋如下
https://clickhouse.com/docs/zh/interfaces/formats/ 。

JSONEachRow, JSONStringsEachRow, JSONCompactEachRow, JSONCompactStringsEachRow
這幾種格式,ClickHouse會將行輸出為用換行符分隔的JSON值,這些輸出數據作為一個整體時,由於沒有分隔符(,)因而不是有效的JSON文檔。
官方文檔給了一些示例。

由於我的真實的數據表,有一個欄位是json類型的字元串,但是一開始設置kafka_format的類型為JSONEachRow時,從kafka消費數據會報錯,所以kafka_format格式設置成了JSONAsString,具體的錯誤後面貼出來。

創建kafka引擎表,用於從kafka消費數據

由於我的數據結構里有嵌套json,如果使用JSONEachRow,有個欄位是json類型的字元串,帶轉義字元,導致clickhouse解析失敗,沒找到解決辦法,所以使用了JSONAsString格式。

一個簡單的MergeTree引擎的表,其中content是json格式的字元串。

創建的物化視圖用於把從kafka消費到的數據,寫到真實的數據表裡,在這個例子里,msg_json_source從kafka消費到數據,然後通過物化視圖msg_json_source_consumer將消費到的數據寫到真實的數據表msg_target中。

由於從kafka消費到的數據就是一個json字元串,在這里使用JSONExtractString等json欄位提取工具,提取msg里的欄位,比如biz,sender_id,content等欄位。

status_time原本計劃用DatTime64類型的,但是這個時間格式有坑,最終選擇了使用UInt64存毫秒級時間戳,具體的問題下面再介紹。

在clickhouse創建好3張表之後(kafka引擎表,真實數據表,物化視圖表),往kafka發消息
本地安裝一個簡易的kafka服務端,然後創建topic

創建好topic之後,使用Java客戶端往kafka發消息,使用confluent client發也可以。
添加kafka依賴

實體類,使用fastjson的@JSONField註解,實體類轉字元串的時候,將駝峰轉換為下劃線

測試類

最終發送完,我們查看一下clickhouse里的數據表的數據,可以發現我們發送到kakfa里的數據,已經成功的消費,並且寫入到真實的數據表裡了。

當時測試環境部署的版本是21.9,但是這個版本有問題,不推薦安裝,建議直接部署22以上的clickhouse

我一開始就是使用的JSONEachRow格式,但是我的消息體里還有嵌套的json,類似下面這種格式,裡面有個欄位還是個json,轉行成字元串帶轉義字元。
然後消息體的string字元串貼一條在這里

然後clickhouse解析消息體報錯,當時的錯找不到了,現在復現不出來了,非常的難頂。。。。
後來因為趕版本的原因把kafka_format換成了JSONAsString。

clickhouse是支持DateTime64格式的,可以到毫秒級,但是實際使用過程中卻有些坑在,

首先是有的客戶端解析毫秒字元串有問題,其次是使用JSONExtract*的方法,會有差異,再然後是jdbc查詢的時候,也會導致時間查詢有問題。
拿毫秒時間戳和秒級時間戳做試驗,clickhouse-server版本是22.3.1.1

把上面的kafka引擎表拿出來改一下

其中status_time這個欄位的類型改成DateTime64(3, 'Asia/Shanghai'),使用JSONExtractUInt提取時間,看下效果

首先發條數據,數據內容如下

傳入的是毫秒級時間戳,然後數據表存儲的時候就變成了2282年

然後如果傳入秒級的時間戳,真實的數據是這樣

clickhouse存儲的時候看著時間正常了,但是毫秒丟失了

然後修改一下物化視圖的欄位提取方式,之前是 JSONExtractUInt(msg,'status_time') as status_time,現在改成使用 JSONExtractString(msg,'status_time') as status_time提取時間
會發現時間類型又正常了。

這一條數據內容如下

最終使用JSONExtractString提取毫秒時間戳,得到了正確的DateTime64的時間,非常的神奇

最終我決定來了個釜底抽薪的方法,時間直接用UInt64存,因為我發送出去的數據是毫秒級時間戳,最終存時間戳,查詢時間范圍的時候直接用long類型的數據between好了。

這也是無奈之舉,萬一哪天server更新版本,導致時間出現問題,那就完蛋了,希望後面時間可以穩定一點吧。

⑤ kafka消費者java版本讀取不到消息怎麼辦

可以連接到一個網路伺服器並且能夠從這個伺服器下載指定的URL,程序中直接使用HTTP協議。程序將定義一個輸出流,下載的URL的內容將來被寫入這個流,通過socket來獲得輸入和輸出流:viewsourceprint?01importjava.io.*;02importjava.net.*;03publicclassHttpClient{04publicstaticvoidmain(String[]args){05try{06//Demo參數:07if((args.length!=1)&&(args.length!=2))("Wrongnumberofargs");09//定義輸出流,下載的URL內容被寫入這個流10OutputStreamto_file;11if(args.length==2)to_file=newFileOutputStream(args[1]);12elseto_file=System.out;13//使用URL類來把用戶指定的URL解析成幾個部分14URLurl=newURL(args[0]);15Stringprotocol=url.getProtocol();16if(!protocol.equals("http"))//檢驗是否滿足支持的協議("Mustuse'http:'protocol");18Stringhost=url.getHost();19intport=url.getPort();20if(port==-1)port=80;//如果沒有指定埠,用默認埠21Stringfilename=url.getFile();22//打開一個連接到指定主機和埠的網路socket連接23Socketsocket=newSocket(host,port);24//通過socket來獲得輸入和輸出流25InputStreamfrom_server=socket.getInputStream();26PrintWriterto_server=newPrintWriter(socket.getOutputStream());2728//發送HTTPGET命令給網路伺服器,指定要下載的文件29//使用了一個老版本非常簡單的HTTP協議30to_server.print("GET"+filename+"\n\n");31to_server.flush();//立即發送32//現在讀取伺服器的響應,把接收到的內容寫入文件33byte[]buffer=newbyte[4096];34intbytes_read;35while((bytes_read=from_server.read(buffer))!=-1)36to_file.write(buffer,0,bytes_read);37//當伺服器關閉連接時,也關閉stuff38socket.close();39to_file.close();40}41catch(Exceptione){//發布錯誤42System.err.println(e);43System.err.println("Usage:javaHttpClient[]");44}45}46}

⑥ kafka——消費者原理解析

kafka採用發布訂閱模式:一對多。發布訂閱模式又分兩種:

Kafka為這兩種模型提供了單一的消費者抽象模型: 消費者組 (consumer group)。 消費者用一個消費者組名標記自己。 一個發布在Topic上消息被分發給此消費者組中的一個消費者。 假如所有的消費者都在一個組中,那麼這就變成了隊列模型。 假如所有的消費者都在不同的組中,那麼就完全變成了發布-訂閱模型。 一個消費者組中消費者訂閱同一個Topic,每個消費者接受Topic的一部分分區的消息,從而實現對消費者的橫向擴展,對消息進行分流。

注意:當單個消費者無法跟上數據生成的速度,就可以增加更多的消費者分擔負載,每個消費者只處理部分partition的消息,從而實現單個應用程序的橫向伸縮。但是不要讓消費者的數量多於partition的數量,此時多餘的消費者會空閑。此外,Kafka還允許多個應用程序從同一個Topic讀取所有的消息,此時只要保證每個應用程序有自己的消費者組即可。

消費者組的概念就是:當有多個應用程序都需要從Kafka獲取消息時,讓每個app對應一個消費者組,從而使每個應用程序都能獲取一個或多個Topic的全部消息;在每個消費者組中,往消費者組中添加消費者來伸縮讀取能力和處理能力,消費者組中的每個消費者只處理每個Topic的一部分的消息,每個消費者對應一個線程。

在同一個群組中,無法讓一個線程運行多個消費者,也無法讓多線線程安全地共享一個消費者。按照規則,一個消費者使用一個線程,如果要在同一個消費者組中運行多個消費者,需要讓每個消費者運行在自己的線程中。最好把消費者的邏輯封裝在自己的對象中,然後使用java的ExecutorService啟動多個線程,使每個消費者運行在自己的線程上,可參考 https://www.confluent.io/blog

一個 consumer group 中有多個 consumer,一個 topic 有多個 partition,所以必然會涉及到 partition 的分配問題,即確定哪個 partition 由哪個 consumer 來消費。

關於如何設置partition值需要考慮的因素

Kafka 有兩種分配策略,一個是 RoundRobin,一個是 Range,默認為Range,當消費者組內消費者發生變化時,會觸發分區分配策略(方法重新分配)。

以上三種現象會使partition的所有權在消費者之間轉移,這樣的行為叫作再均衡。

再均衡的優點

再均衡的缺點

RoundRobin 輪詢方式將分區所有作為一個整體進行 Hash 排序,消費者組內分配分區個數最大差別為 1,是按照組來分的,可以解決多個消費者消費數據不均衡的問題。

但是,當消費者組內訂閱不同主題時,可能造成消費混亂,如下圖所示,Consumer0 訂閱主題 A,Consumer1 訂閱主題 B。

將 A、B 主題的分區排序後分配給消費者組,TopicB 分區中的數據可能 分配到 Consumer0 中。

Range 方式是按照主題來分的,不會產生輪詢方式的消費混亂問題。

但是,如下圖所示,Consumer0、Consumer1 同時訂閱了主題 A 和 B,可能造成消息分配不對等問題,當消費者組內訂閱的主題越多,分區分配可能越不均衡。

由於 consumer 在消費過程中可能會出現斷電宕機等故障,consumer 恢復後,需要從故障前的位置繼續消費,所以 consumer 需要實時記錄自己消費到了哪個 offset,以便故障恢復後繼續消費。

consumer group +topic + partition 唯一確定一個offest

Kafka 0.9 版本之前,consumer 默認將 offset 保存在 Zookeeper 中,從 0.9 版本開始,
consumer 默認將 offset 保存在 Kafka 一個內置的 topic 中,該 topic 為__consumer_offsets。

你如果特別好奇,實在想看看offset什麼的,也可以執行下面操作:

修改配置文件 consumer.properties

再啟動一個消費者

當消費者崩潰或者有新的消費者加入,那麼就會觸發再均衡(rebalance),完成再均衡後,每個消費者可能會分配到新的分區,而不是之前處理那個,為了能夠繼續之前的工作,消費者需要讀取每個partition最後一次提交的偏移量,然後從偏移量指定的地方繼續處理。

case1:如果提交的偏移量小於客戶端處理的最後一個消息的偏移量,那麼處於兩個偏移量之間的消息就會被重復處理。

case2:如果提交的偏移量大於客戶端處理的最後一個消息的偏移量,那麼處於兩個偏移量之間的消息將會丟失。

自動提交的優點是方便,但是可能會重復處理消息

不足:broker在對提交請求作出回應之前,應用程序會一直阻塞,會限制應用程序的吞吐量。

因此,在消費者關閉之前一般會組合使用commitAsync和commitSync提交偏移量。

ConsumerRebalanceListener需要實現的兩個方法

下面的例子演示如何在失去partition的所有權之前通過onPartitionRevoked()方法來提交偏移量。

Consumer有個Rebalance的特性,即重新負載均衡,該特性依賴於一個協調器來實現。每當Consumer Group中有Consumer退出或有新的Consumer加入都會觸發Rebalance。

之所以要重新負載均衡,是為了將退出的Consumer所負責處理的數據再重新分配到組內的其他Consumer上進行處理。或當有新加入的Consumer時,將組內其他Consumer的負載壓力,重新進均勻分配,而不會說新加入一個Consumer就閑在那。

下面就用幾張圖簡單描述一下,各種情況觸發Rebalance時,組內成員是如何與協調器進行交互的。

Tips :圖中的Coordinator是協調器,而generation則類似於樂觀鎖中的版本號,每當成員入組成功就會更新,也是起到一個並發控制的作用。

參考:
https://blog.csdn.net/weixin_46122692/article/details/109270433

http://www.dockone.io/article/9956

https://www.cnblogs.com/sodawoods-blogs/p/8969774.html

https://blog.csdn.net/weixin_44367006/article/details/103075173

https://blog.51cto.com/zero01/2498017

⑦ kafka在java應用中怎麼設置每次只消費一條消息

:數據直接從通信網關過來?那最近每個map得到的數據是怎麼區分的是發數據端按規則把數據配發到每個map?咱整過的一個例子是多個map同時從一張數據表取數進行數據處理在hdfs的輸入目錄給每個map指定一個輸入文件map讀取這個輸入文件

⑧ kafka消費者java版本讀取不到消息怎麼辦

3.啟動服務3.1啟動zookeeper啟動zk有兩種方式,第一種是使用kafka自己帶的一個zk。bin/zookeeper-server-start.shconfig/zookeeper.properties&另一種是使用其它的zookeeper,可以位於本機也可以位於其它地址。這種情況需要修改config下面的sercer.properties裡面的zookeeper地址。例如zookeeper.connect=10.202.4.179:21813.2啟動kafkabin/kafka-server-start.shconfig/server.properties4.創建topicbin/kafka-topics.sh--create--zookeeper10.202.4.179:2181--replication-factor1--partitions1--topictest創建一個名為test的topic,只有一個副本,一個分區。通過list命令查看剛剛創建的topicbin/kafka-topics.sh-list-zookeeper10.202.4.179:21815.啟動procer並發送消息啟動procerbin/kafka-console-procer.sh--broker-listlocalhost:9092--topictest啟動之後就可以發送消息了比如testhelloboy按Ctrl+C退出發送消息6.啟動consumerbin/kafka-console-consumer.sh--zookeeper10.202.4.179:2181--topictest--from-beginning啟動consumer之後就可以在console中看到procer發送的消息了可以開啟兩個終端,一個發送消息,一個接受消息。如果這樣都不行的話,查看zookeeper進程和kafka的topic,一步步排查原因吧。

⑨ kafka消費者java版本讀取不到消息怎麼辦

主要屬性設置如下:
Properties pro = new Properties();
pro.put("auto.offset.reset", "smallest");
pro.put("zookeeper.connect", 「ip:port」);
pro.put("zookeeper.session.timeout.ms", "20000");
pro.put("zookeeper.sync.time.ms", "10000");
pro.put("group.id", consumerId);
pro.put("auto.commit.enable", "true");

⑩ kafka消費者java版本讀取不到消息怎麼辦

3. 啟動服務
3.1 啟動zookeeper
啟動zk有兩種方式,第一種是使用kafka自己帶的一個zk。
bin/zookeeper-server-start.sh config/zookeeper.properties&
另一種是使用其它的zookeeper,可以位於本機也可以位於其它地址。這種情況需要修改config下面的sercer.properties裡面的zookeeper地址
。例如zookeeper.connect=10.202.4.179:2181
3.2 啟動 kafka
bin/kafka-server-start.sh config/server.properties
4.創建topic
bin/kafka-topics.sh --create --zookeeper 10.202.4.179:2181 --replication-factor 1 --partitions 1 --topic test
創建一個名為test的topic,只有一個副本,一個分區。
通過list命令查看剛剛創建的topic
bin/kafka-topics.sh -list -zookeeper 10.202.4.179:2181
5.啟動procer並發送消息啟動procer
bin/kafka-console-procer.sh --broker-list localhost:9092 --topic test
啟動之後就可以發送消息了
比如
test
hello boy
按Ctrl+C退出發送消息
6.啟動consumer
bin/kafka-console-consumer.sh --zookeeper 10.202.4.179:2181 --topic test --from-beginning
啟動consumer之後就可以在console中看到procer發送的消息了
可以開啟兩個終端,一個發送消息,一個接受消息。
如果這樣都不行的話,查看zookeeper進程和kafka的topic,一步步排查原因吧。

閱讀全文

與kafkajava消費相關的資料

熱點內容
華為交換機dhcp配置命令 瀏覽:314
androidbitmap縮小 瀏覽:270
單片機串口控制燈 瀏覽:83
大訊雲伺服器安裝視頻 瀏覽:783
華為演算法領先世界 瀏覽:653
linux路由重啟 瀏覽:565
php的模板編程 瀏覽:319
編譯器原理與實現書 瀏覽:708
dos選擇命令 瀏覽:16
apm固件編譯到單片機 瀏覽:120
聯通深藍卡都包含什麼app 瀏覽:263
如何判斷網路伺服器正常 瀏覽:649
路由器搭橋遠端伺服器地址是什麼 瀏覽:516
編譯動態庫時會連接依賴庫嗎 瀏覽:708
淘寶手機加密是隨機的嗎 瀏覽:672
解壓包子怎麼裝飾 瀏覽:586
四個數湊24演算法 瀏覽:676
哪一種不是vi編譯器的模式 瀏覽:170
xp在此處打開命令窗口 瀏覽:128
代碼編譯運行用什麼軟體 瀏覽:999