導航:首頁 > 程序命令 > kafka消費命令

kafka消費命令

發布時間:2022-01-15 18:16:40

❶ 怎麼將shell腳本的變數內容,通過kafka命令發送到kafka的topic中呢,求助

kafka $變數名

java工程kafka傳遞自定義對象,消費端獲取到的是null

3. 啟服務
3.1 啟zookeeper
啟zk兩種式第種使用kafka自帶zk
bin/zookeeper-server-start.sh config/zookeeper.properties&
另種使用其zookeeper位於本機位於其址種情況需要修改config面sercer.properties面zookeeper址
例zookeeper.connect=10.202.4.179:2181
3.2 啟 kafka
bin/kafka-server-start.sh config/server.properties
4.創建topic
bin/kafka-topics.sh --create --zookeeper 10.202.4.179:2181 --replication-factor 1 --partitions 1 --topic test
創建名testtopic副本區
通list命令查看剛剛創建topic
bin/kafka-topics.sh -list -zookeeper 10.202.4.179:2181
5.啟procer並發送消息啟procer
bin/kafka-console-procer.sh --broker-list localhost:9092 --topic test
啟發送消息

test
hello boy
按Ctrl+C退發送消息
6.啟consumer
bin/kafka-console-consumer.sh --zookeeper 10.202.4.179:2181 --topic test --from-beginning
啟consumerconsole看procer發送消息
啟兩終端發送消息接受消息
都行查看zookeeper進程kafkatopic步步排查原吧

❸ 如何利用pykafka遠程消費 zookeeper+kafka集群 python腳本

#從kafka消費
#consumer_area = topic_area.get_simple_consumer(auto_offset_reset=OffsetType.LATEST)

#從ZOOKEEPER消費
consumer_area = topic_area.get_balanced_consumer(
consumer_group=b'zs_download_04', # 自己命令
auto_offset_reset=OffsetType.LATEST,#在consumer_group存在的情況下,設置此變數,表示從最新的開始取
#auto_offset_reset=OffsetType.EARLIEST,
#reset_offset_on_start=True,
auto_commit_enable=True,
#auto_commit_interval_ms=1,
zookeeper_connect=ZK_LIST
)

❹ kafka查看消費了多少條數據

如何查看目前的消費者是否已經讀到最新的數據:

kafka-run-class.sh kafka.tools.ConsumerOffsetChecker
#kafka查看topic各個分區的消息的信息
kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group ** --topic *** --zookeeper *:2181,*:2181,*:2181/kafka
--zookeeper 那裡是指kafka在zk中的path,即使zk有多個機器,如果在其中一台上執行此命令,顯示連接不上,只寫那台機器的地址埠+kafka的path即可
指定自己的分組 自己消費的topic會顯示kafka總共有多少數據,以及已經被消費了多少條
結果:
GROUP TOPIC PID OFFSET LOGSIZE LAG
消費者組 話題id 分區id 當前已消費的條數 總條數 未消費的條數

注意:以kafkaspout類作為消費者去讀kafka數據,相當於直接從kafka server上取文件,沒有消費者組的概念
每次讀的數據存在自己zk的offet中
所以不能通過上述命令查看

❺ kafka只能用命令來創建topic么

在Kafka上創建一個Topic的步驟:進入伺服器後,找到kafka安裝目錄進入bin文件夾,輸入命令--- 查看kafka現有主題命令:。/kafka-topics.sh --list --zookeeper zk_host:port望採納。

linux 怎樣查看kafka的某 topic數據

1、創建一個需要增加備份因子的topic列表的文件,文件格式是json格式的。

注意事項:

Kafka的目的是通過Hadoop的並行載入機制來統一線上和離線的消息處理,也是為了通過集群來提供實時的消息。

❼ 如何查看kafka命令 找不到

基於0.8.0版本。

##查看topic分布情況kafka-list-topic.sh
bin/kafka-list-topic.sh --zookeeper 192.168.197.170:2181,192.168.197.171:2181 (列出所有topic的分區情況)
bin/kafka-list-topic.sh --zookeeper 192.168.197.170:2181,192.168.197.171:2181 --topic test (查看test的分區情況)

其實kafka-list-topic.sh裡面就一句
exec $(dirname $0)/kafka-run-class.sh kafka.admin.ListTopicCommand $@
實際是通過
kafka-run-class.sh腳本執行的包kafka.admin下面的類
##創建TOPIC kafka-create-topic.sh
bin/kafka-create-topic.sh --replica 2 --partition 8 --topic test --zookeeper 192.168.197.170:2181,192.168.197.171:2181
創建名為test的topic, 8個分區分別存放數據,數據備份總共2份

bin/kafka-create-topic.sh --replica 1 --partition 1 --topic test2 --zookeeper 192.168.197.170:2181,192.168.197.171:2181
結果 topic: test2 partition: 0 leader: 170 replicas: 170 isr: 170
##重新分配分區kafka-reassign-partitions.sh
這個命令可以分區指定到想要的--broker-list上
bin/kafka-reassign-partitions.sh --topics-to-move-json-file topics-to-move.json --broker-list "171" --zookeeper 192.168.197.170:2181,192.168.197.171:2181 --execute
cat topic-to-move.json
{"topics":
[{"topic": "test2"}],
"version":1
}
##為Topic增加 partition數目kafka-add-partitions.sh
bin/kafka-add-partitions.sh --topic test --partition 2 --zookeeper 192.168.197.170:2181,192.168.197.171:2181 (為topic test增加2個分區)

##控制台接收消息
bin/kafka-console-consumer.sh --zookeeper 192.168.197.170:2181,192.168.197.171:2181 --from-beginning --topic test
##控制台發送消息
bin/kafka-console-procer.sh --broker-list 192.168.197.170:9092,192.168.197.171: 9092 --topic test
##手動均衡topic, kafka-preferred-replica-election.sh
bin/kafka-preferred-replica-election.sh --zookeeper 192.168.197.170:2181,192.168.197.171:2181 --path-to-json-file preferred-click.json

cat preferred-click.json
{
"partitions":
[
{"topic": "click", "partition": 0},
{"topic": "click", "partition": 1},
{"topic": "click", "partition": 2},
{"topic": "click", "partition": 3},
{"topic": "click", "partition": 4},
{"topic": "click", "partition": 5},
{"topic": "click", "partition": 6},
{"topic": "click", "partition": 7},
{"topic": "play", "partition": 0},
{"topic": "play", "partition": 1},
{"topic": "play", "partition": 2},
{"topic": "play", "partition": 3},
{"topic": "play", "partition": 4},
{"topic": "play", "partition": 5},
{"topic": "play", "partition": 6},
{"topic": "play", "partition": 7}

]
}

##刪除topic,慎用,只會刪除zookeeper中的元數據,消息文件須手動刪除
bin/kafka-run-class.sh kafka.admin.DeleteTopicCommand --topic test666 --zookeeper 192.168.197.170:2181 ,192.168.197.171:2181

❽ kafka 怎樣查看kafka狀態

輸入以下代碼即可查看kafka狀態:

BROKER_HOST是kafka server的ip地址,PORTt是server的監聽埠。多個host port之間用逗號隔開。

第一條命令是獲取group列表,一般而言,應用是知道消費者group的,通常在應用的配置里,如果已知,該步驟可以省略。

第二條命令是查看具體的消費者group的詳情信息,需要給出group的名稱。

Kafka是由Apache軟體基金會開發的一個開源流處理平台,由Scala和Java編寫。Kafka是一種高吞吐量的分布式發布訂閱消息系統,它可以處理消費者在網站中的所有動作流數據。

這種動作(網頁瀏覽,搜索和其他用戶的行動)是在現代網路上的許多社會功能的一個關鍵因素。 這些數據通常是由於吞吐量的要求而通過處理日誌和日誌聚合來解決。

對於像Hadoop一樣的日誌數據和離線分析系統,但又要求實時處理的限制,這是一個可行的解決方案。Kafka的目的是通過Hadoop的並行載入機制來統一線上和離線的消息處理,也是為了通過集群來提供實時的消息。

❾ 查看storm消費了多少kafka的數據

基於0.8.0版本。 ##查看topic分布情況kafka-list-topic.sh bin/kafka-list-topic.sh --zookeeper 192.168.197.170:2181,192.168.197.171:2181 (列出所有topic的分區情況) bin/kafka-list-topic.sh --zookeeper 192.168.197.170:2181,192.168.197.171:2181 --topic test (查看test的分區情況) 其實kafka-list-topic.sh裡面就一句 exec $(dirname $0)/kafka-run-class.sh kafka.admin.ListTopicCommand $@ 實際是通過 kafka-run-class.sh腳本執行的包kafka.admin下面的類 ##創建TOPIC kafka-create-topic.sh bin/kafka-create-topic.sh --replica 2 --partition 8 --topic test --zookeeper 192.168.197.170:2181,192.168.197.171:2181 創建名為test的topic, 8個分區分別存放數據,數據備份總共2份 bin/kafka-create-topic.sh --replica 1 --partition 1 --topic test2 --zookeeper 192.168.197.170:2181,192.168.197.171:2181 結果 topic: test2 partition: 0 leader: 170 replicas: 170 isr: 170 ##重新分配分區kafka-reassign-partitions.sh 這個命令可以分區指定到想要的--broker-list上 bin/kafka-reassign-partitions.sh --topics-to-move-json-file topics-to-move.json --broker-list "171" --zookeeper 192.168.197.170:2181,192.168.197.171:2181 --execute cat topic-to-move.json {"topics": [{"topic": "test2"}], "version":1 } ##為Topic增加 partition數目kafka-add-partitions.sh bin/kafka-add-partitions.sh --topic test --partition 2 --zookeeper 192.168.197.170:2181,192.168.197.171:2181 (為topic test增加2個分區) ##控制台接收消息 bin/kafka-console-consumer.sh --zookeeper 192.168.197.170:2181,192.168.197.171:2181 --from-beginning --topic test ##控制台發送消息 bin/kafka-console-procer.sh --broker-list 192.168.197.170:9092,192.168.197.171: 9092 --topic test ##手動均衡topic, kafka-preferred-replica-election.sh bin/kafka-preferred-replica-election.sh --zookeeper 192.168.197.170:2181,192.168.197.171:2181 --path-to-json-file preferred-click.json cat preferred-click.json { "partitions": [ {"topic": "click", "partition": 0}, {"topic": "click", "partition": 1}, {"topic": "click", "partition": 2}, {"topic": "click", "partition": 3}, {"topic": "click", "partition": 4}, {"topic": "click", "partition": 5}, {"topic": "click", "partition": 6}, {"topic": "click", "partition": 7}, {"topic": "play", "partition": 0}, {"topic": "play", "partition": 1}, {"topic": "play", "partition": 2}, {"topic": "play", "partition": 3}, {"topic": "play", "partition": 4}, {"topic": "play", "partition": 5}, {"topic": "play", "partition": 6}, {"topic": "play", "partition": 7} ] } ##刪除topic,慎用,只會刪除zookeeper中的元數據,消息文件須手動刪除 bin/kafka-run-class.sh kafka.admin.DeleteTopicCommand --topic test666 --zookeeper 192.168.197.170:2181 ,192.168.197.171:2181

❿ 怎麼查看kafka集群中所有的broker節點

在Zookeeper的官 網上有這么一句話:ZooKeeper is a centralized service for maintaining configuration information, naming, providing distributed synchronization, and providing group services.
這大概描述了Zookeeper主要可以干哪些事情:配置管理,名字服務,提供分布式同步以及集群管理。那這些服務又到底是什麼呢?我們為什麼需要這樣的服務?我們又為什麼要使用Zookeeper來實現呢,使用Zookeeper有什麼優勢?接下來我會挨個介紹這些到底是什麼,以及有哪些開源系統中使用了。
配置管理
在我們的應用中除了代碼外,還有一些就是各種配置。比如資料庫連接等。一般我們都是使用配置文件的方式,在代碼中引入這些配置文件。但是當我們只有一種配置,只有一台伺服器,並且不經常修改的時候,使用配置文件是一個很好的做法,但是如果我們配置非常多,有很多伺服器都需要這個配置,而且還可能是動態的話使用配置文件就不是個好主意了。這個時候往往需要尋找一種集中管理配置的方法,我們在這個集中的地方修改了配置,所有對這個配置感興趣的都可以獲得變更。比如我們可以把配置放在資料庫里,然後所有需要配置的服務都去這個資料庫讀取配置。但是,因為很多服務的正常運行都非常依賴這個配置,所以需要這個集中提供配置服務的服務具備很高的可靠性。一般我們可以用一個集群來提供這個配置服務,但是用集群提升可靠性,那如何保證配置在集群中的一致性呢? 這個時候就需要使用一種實現了一致性協議的服務了。Zookeeper就是這種服務,它使用Zab這種一致性協議來提供一致性。現在有很多開源項目使用Zookeeper來維護配置,比如在HBase中,客戶端就是連接一個Zookeeper,獲得必要的HBase集群的配置信息,然後才可以進一步操作。還有在開源的消息隊列Kafka中,也使用Zookeeper來維護broker的信息。在Alibaba開源的SOA框架Dubbo中也廣泛的使用Zookeeper管理一些配置來實現服務治理。
名字服務
名字服務這個就很好理解了。比如為了通過網路訪問一個系統,我們得知道對方的IP地址,但是IP地址對人非常不友好,這個時候我們就需要使用域名來訪問。但是計算機是不能是別域名的。怎麼辦呢?如果我們每台機器里都備有一份域名到IP地址的映射,這個倒是能解決一部分問題,但是如果域名對應的IP發生變化了又該怎麼辦呢?於是我們有了DNS這個東西。我們只需要訪問一個大家熟知的(known)的點,它就會告訴你這個域名對應的IP是什麼。在我們的應用中也會存在很多這類問題,特別是在我們的服務特別多的時候,如果我們在本地保存服務的地址的時候將非常不方便,但是如果我們只需要訪問一個大家都熟知的訪問點,這里提供統一的入口,那麼維護起來將方便得多了。
分布式鎖
其實在第一篇文章中已經介紹了Zookeeper是一個分布式協調服務。這樣我們就可以利用Zookeeper來協調多個分布式進程之間的活動。比如在一個分布式環境中,為了提高可靠性,我們的集群的每台伺服器上都部署著同樣的服務。但是,一件事情如果集群中的每個伺服器都進行的話,那相互之間就要協調,編程起來將非常復雜。而如果我們只讓一個服務進行操作,那又存在單點。通常還有一種做法就是使用分布式鎖,在某個時刻只讓一個服務去幹活,當這台服務出問題的時候鎖釋放,立即fail over到另外的服務。這在很多分布式系統中都是這么做,這種設計有一個更好聽的名字叫Leader Election(leader選舉)。比如HBase的Master就是採用這種機制。但要注意的是分布式鎖跟同一個進程的鎖還是有區別的,所以使用的時候要比同一個進程里的鎖更謹慎的使用。
集群管理
在分布式的集群中,經常會由於各種原因,比如硬體故障,軟體故障,網路問題,有些節點會進進出出。有新的節點加入進來,也有老的節點退出集群。這個時候,集群中其他機器需要感知到這種變化,然後根據這種變化做出對應的決策。比如我們是一個分布式存儲系統,有一個中央控制節點負責存儲的分配,當有新的存儲進來的時候我們要根據現在集群目前的狀態來分配存儲節點。這個時候我們就需要動態感知到集群目前的狀態。還有,比如一個分布式的SOA架構中,服務是一個集群提供的,當消費者訪問某個服務時,就需要採用某種機制發現現在有哪些節點可以提供該服務(這也稱之為服務發現,比如Alibaba開源的SOA框架Dubbo就採用了Zookeeper作為服務發現的底層機制)。還有開源的Kafka隊列就採用了Zookeeper作為Cosnumer的上下線管理。
後記
在這篇文章中,列出了一些Zookeeper可以提供的服務,並給出了一些開源系統裡面的實例。後面我們從Zookeeper的安裝配置開始,並用示例進一步介紹Zookeeper如何使用。
(轉載)

閱讀全文

與kafka消費命令相關的資料

熱點內容
為什麼vc6編譯是灰色 瀏覽:385
python音標讀法 瀏覽:573
反轉語句python 瀏覽:21
哪個app搞英雄聯盟手游活動 瀏覽:583
如何查看郵箱收發伺服器 瀏覽:519
極簡歐洲史中文版pdf 瀏覽:908
python顯示變數值 瀏覽:387
副路由器為什麼要關伺服器 瀏覽:575
國家反詐騙app蘋果怎麼設置 瀏覽:464
我的世界如何用指令造伺服器方熊 瀏覽:304
鴨題庫是哪裡的培訓機構app 瀏覽:689
如何對伺服器取證 瀏覽:440
有什麼系統像友價源碼 瀏覽:570
圓柱彈簧壓縮量 瀏覽:811
我的世界國際版為什麼沒法進去伺服器 瀏覽:103
我的世界如何創造一個伺服器地址 瀏覽:837
皮皮蝦app怎麼玩視頻教程 瀏覽:253
python整型轉化字元串 瀏覽:804
android數據共享方式 瀏覽:375
編譯環境控制台 瀏覽:621