❶ kafka在windows啟動下提示:命令語法不正確
Kafka是由LinkedIn設計的一個高吞吐量、分布式、基於發布訂閱模式的消息系統,使用Scala編寫,它以可水平擴展、可靠性、非同步通信和高吞吐率等特性而被廣泛使用。目前越來越多的開源分布式處理系統都支持與Kafka集成,其中Spark Streaming作為後端流引擎配合Kafka作為前端消息系統正成為當前流處理系統的主流架構之一。然而,當下越來越多的安全漏洞、數據泄露等問題的爆發,安全正成為系統選型不得不考慮的問題,Kafka由於其安全機制的匱乏,也導致其在數據敏感行業的部署存在嚴重的安全隱患。本文將圍繞Kafka,先介紹其整體架構和關鍵概念,再深入分析其架構之中存在的安全問題,最後分享下Transwarp在Kafka安全性上所做的工作及其使用方法。
❷ kafka的怪事
1、從zookeerer刪除信息./bin/kafka-run-class.sh kafka.admin.DeleteTopicCommand --zookeeper 10.0.1.10:2181,10.0.1.11:2181,10.0.1.12:2181 --topic test成功後返回信息:deletion succeeded!2、利用JPS命令查看kafka和zookeeper進程,kill掉QuorumPeerMain和Kafka進程3、從kafka的log.dirs目錄刪除文件,可以看到多個子目錄名字如test-0,test-1…test-n(就是你topic的partition個數)進入到kafka的log.dirs目錄,執行rm –fr test-0……test-n(4) 修改日誌目錄的recovery-point-offset-checkpoint和replication-offset-checkpoint文件(要小心刪除,否則待會kafka不能正常啟動起來)replication-offset-checkpoint格式如下:04(partition總數)test 0 0test 3 0hehe 0 0hehe 1 0修改後如下:02(partition總數)hehe 0 0hehe 1 0把含有test行全部去掉,並且把partition總數修改為減去test的partition的剩餘數目,同理recovery-point-offset-checkpoint也是這樣修改。完成後就可以正常啟動zookeeper和kafka。
❸ 怎麼設置kafka topic數據存儲時間
1、Kafka創建topic命令很簡單,一條命令足矣:bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic test 。
❹ 怎麼徹底刪除kafka的topic,然後重建
server.properties 中添加delete.topic.enable=true
執行D:\Developer\kafka_2.10-0.10.0.0\bin\windows>kafka-topics.bat --zookeeper 127.0.0.1:2181 --delete --topic TEST-TOPIC
執行zookeeper-server-stop.bat,後在重啟kafka
重新發送消息即可,會自動根據配置的partitions重建
注意:不執行1步驟,就是假刪除。
❺ kafka 重啟會把pagecache清理嗎
配置zookeeper集群和kafka集群中配置文件添加該伺服器的ip,類似的其他兩個配置文件中內容就不再寫了。因為同時打開了防火牆,修改完配置文件後重啟測試..
❻ kafka集群重啟,怎樣確
1、查詢當前線程是否包含kafka
2、查詢當前埠佔用線程是否歸屬kafka
3、開啟consumer線程查看運行
4、藉助第三方監控工具查看集群運行情況
❼ 在伺服器里怎麼查看kafka創建了哪些主題
進入伺服器後,找到kafka安裝目錄進入bin文件夾,輸入命令---
查看kafka現有主題命令:。/kafka-topics.sh --list --zookeeper zk_host:port
❽ Kafka總學不明白:
kafka的常用命令
新建一個topic
./bin/kafka-topics.sh --create --zookeeper hadoop04:2181,hadoop05:2181,hadoop06:2181 --partition 3 --replication-factor 3 --topic test-0
test-0是topic的名字
replication-factor 3 副本數
topic存儲的是元數據,通過它找到真實的數據
改變分區
./bin/kafka-topics.sh --alter --zookeeper 192.168.14.131:2181,192.168.14.131:2182,192.168.14.131:2183 --partition 5 --topic test03
分區信息查詢
./bin/kafka-topics.sh --describe --zookeeper hadoop04:2181,hadoop05:2181,hadoop06:2181 --topic test-0
結果為:leader主分區(broker Id) replicationfactor副本數 Isr是否存活(存儲的順序)
Topic:test-0 PartitionCount:3 ReplicationFactor:3 Configs:
Topic: test-0 Partition: 0 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1
Topic: test-0 Partition: 1 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
Topic: test-0 Partition: 2 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0
查詢語句:
bin/kafka-topics.sh --list --zookeeper hadoop04:2181,hadoop05:2181,hadoop06:2181
刪除語句
bin/kafka-topics.sh --delete --zookeeper hadoop04:2181,hadoop05:2181,hadoop06:2181 --topic test-0
結果是test-0 - marked for deletion
表示不能真正的刪除只是做了個標記要刪除取zookeeper去真正刪除(這個配置如果設置為true可以真正刪除,設置false不能刪除)
#刪除topic需要server.properties中設置delete.topic.enable=true否則只是標記刪除
delete.topic.enable=false
rmr /brokers/topics/test-0 ,刪除client中的brokers中topic對應的
rmr /config/topics/test-0 ,刪配置
rmr /admin/delete_topics,刪除admin中的標記為刪除的topics
分區設置後只可以增加不可以減少
六API操作
啟動生產者
./bin/kafka-console-procer.sh --broker-list 192.168.14.131:9092,192.168.14.132:9092,192.168.14.133:9092 --topic test03
啟動消費者
./bin/kafka-console-consumer.sh --zookeeper 192.168.247.131:2181,192.168.247.132:2181,192.168.247.133:2181 --topic test03 --from-beginning
❾ 如何查看kafka命令 找不到
基於0.8.0版本。
##查看topic分布情況kafka-list-topic.sh
bin/kafka-list-topic.sh --zookeeper 192.168.197.170:2181,192.168.197.171:2181 (列出所有topic的分區情況)
bin/kafka-list-topic.sh --zookeeper 192.168.197.170:2181,192.168.197.171:2181 --topic test (查看test的分區情況)
其實kafka-list-topic.sh裡面就一句
exec $(dirname $0)/kafka-run-class.sh kafka.admin.ListTopicCommand $@
實際是通過
kafka-run-class.sh腳本執行的包kafka.admin下面的類
##創建TOPIC kafka-create-topic.sh
bin/kafka-create-topic.sh --replica 2 --partition 8 --topic test --zookeeper 192.168.197.170:2181,192.168.197.171:2181
創建名為test的topic, 8個分區分別存放數據,數據備份總共2份
bin/kafka-create-topic.sh --replica 1 --partition 1 --topic test2 --zookeeper 192.168.197.170:2181,192.168.197.171:2181
結果 topic: test2 partition: 0 leader: 170 replicas: 170 isr: 170
##重新分配分區kafka-reassign-partitions.sh
這個命令可以分區指定到想要的--broker-list上
bin/kafka-reassign-partitions.sh --topics-to-move-json-file topics-to-move.json --broker-list "171" --zookeeper 192.168.197.170:2181,192.168.197.171:2181 --execute
cat topic-to-move.json
{"topics":
[{"topic": "test2"}],
"version":1
}
##為Topic增加 partition數目kafka-add-partitions.sh
bin/kafka-add-partitions.sh --topic test --partition 2 --zookeeper 192.168.197.170:2181,192.168.197.171:2181 (為topic test增加2個分區)
##控制台接收消息
bin/kafka-console-consumer.sh --zookeeper 192.168.197.170:2181,192.168.197.171:2181 --from-beginning --topic test
##控制台發送消息
bin/kafka-console-procer.sh --broker-list 192.168.197.170:9092,192.168.197.171: 9092 --topic test
##手動均衡topic, kafka-preferred-replica-election.sh
bin/kafka-preferred-replica-election.sh --zookeeper 192.168.197.170:2181,192.168.197.171:2181 --path-to-json-file preferred-click.json
cat preferred-click.json
{
"partitions":
[
{"topic": "click", "partition": 0},
{"topic": "click", "partition": 1},
{"topic": "click", "partition": 2},
{"topic": "click", "partition": 3},
{"topic": "click", "partition": 4},
{"topic": "click", "partition": 5},
{"topic": "click", "partition": 6},
{"topic": "click", "partition": 7},
{"topic": "play", "partition": 0},
{"topic": "play", "partition": 1},
{"topic": "play", "partition": 2},
{"topic": "play", "partition": 3},
{"topic": "play", "partition": 4},
{"topic": "play", "partition": 5},
{"topic": "play", "partition": 6},
{"topic": "play", "partition": 7}
]
}
##刪除topic,慎用,只會刪除zookeeper中的元數據,消息文件須手動刪除
bin/kafka-run-class.sh kafka.admin.DeleteTopicCommand --topic test666 --zookeeper 192.168.197.170:2181 ,192.168.197.171:2181
❿ kafka為什麼會自動關閉
Kafka在啟動一段時間後,如果出現服務自動關閉情況,可在啟動kafka的時使用守護進程模式啟動,即在原啟動命令中加 -daemon
kafka-server-start.sh -daemon config/server.properties &
原因參考:
kafka-run-class.sh
# Launch mode
if [ "x$DAEMON_MODE" = "xtrue" ]; then
#使用daemon執行命令
nohup $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@" > "$CONSOLE_OUTPUT_FILE" 2>&1 < /dev/null &
else
#不使用daemon執行命令
exec $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@"
fi