導航:首頁 > 配伺服器 > kafka伺服器需要什麼配置

kafka伺服器需要什麼配置

發布時間:2023-01-22 14:41:49

① kafka參數配置

配置日誌落在哪些磁碟

配置使用哪個zookeeper

注意 ,只需要在最後面追加一個/chroot即可

具體配置格式 協議名稱,主機名稱, 埠號 寫法 protocol://hostname:port

這個是broker的全局配置,也可以在創建topic的時候 指定每個topic的配置,默認topic的配置覆蓋broker的配置。

kafka使用的scale編寫的,最終是通過jvm運行,所以需要設置jvm參數對kafka調優。kafka啟動的時候會讀取兩個環境變數

kafka並不需要設置太多的OS參數,通常需要關注下面幾個:

② Kafka Connect的安裝和配置

       在使用Kafka Connect時,需要注意一些事項,以幫助你構建適應長期需求的datapipeline。本章旨在提供有關的一些上下文。

       要開始使用Kafka Connect,只有一個硬性的先決條件:一個Kafka的broker集群。然而,隨著集群增長,有幾個問題需要提前考慮:

       在開始之前,確定哪種模式最適合您的環境非常有用。 對於適合單個代理的環境(例如從web伺服器向Kafka發送日誌),standalone模式非常適合。在單個source或sink可能需要大量數據的用例中(例如,將數據從Kafka發送到HDFS),分布式模式在可伸縮性方面更加靈活,並提供了高可用性服務,從而最小化停機時間。

       Kafka Connect插件是一組jar文件,Kafka Connect可以在其中找到一個或多個connector、transform、以及converter的實現。Kafka Connect將每個插件彼此隔離,這樣一個插件中的庫就不會受到其他插件庫的影響,這點非常重要。

Kafka Connect plugin是:
(1)在一個uber jar文件中包含插件及所有第三方依賴;或
(2)一個包含jar包和第三方依賴的目錄。

       Kafka Connect使用plugin path找到插件,這是Kafka Connect在worker配置文件中定義的一個以逗號分隔的目錄列表。要安裝插件,請將目錄或uber jar放在plugin path路徑中列出的目錄中。

        舉個例子 ,我們在每台機器上創建一個/usr/local/share/kafka/plugins目錄,然後將我們所有的插件jar或插件目錄放入其中。然後在worker的配置文件中加入如下配置項:

       現在,當我們啟動worker時,Kafka Connect可以發現這些插件中定義的所有connector、transform以及converter。Kafka Connect顯式地避免了其他插件中的庫, 並防止了沖突。

       如果要在同一個機器上運行多個standalone實例,有一些參數需要是獨一無二的:
(1)offset.storage.file.filename:connector偏移量的存儲。
(2)rest.port:用於監聽http請求的rest介面所佔用的埠。

       connector和task的配置,offsets和狀態會存儲在Kafka的內部主題中,Kafka Connect會自動創建這些主題,且所有topic都使用了壓縮清理策略。
       如果要手動創建這些topic,推薦使用如下命令

這里只列出一些有疑問的。

       配置了group.id的worker會自動發現彼此並形成集群。一個集群中的所有worker必須使用相同的三個Kafka topic來共享配置、偏移量以及狀態,所有worker必須配置相同的config.storage.topic、offset.storage.topic以及status.storage.topic。

       每個converter實現類都有自己的相關配置需求。下面的例子展示了一個worker屬性文件,其中使用的AvroConverter需要將Schema Registry的url作為屬性進行傳遞。

注意: 除了其配置覆蓋這些配置的connector,worker上運行的所有connector都使用這些converter。

③ Kafka身份認證與許可權控制配置

編輯原有配置文件vi /home/wucan/kafka/kafka_2.11-1.0.0/config/server.properties

listeners=SASL_PLAINTEXT://192.168.43.209:9092

security.inter.broker.protocol=SASL_PLAINTEXT

sasl.enabled.mechanisms=PLAIN

sasl.mechanism.inter.broker.protocol=PLAIN

allow.everyone.if.no.acl.found=true

super.users=User:root

authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer

創建新的配置文件vi /home/wucan/kafka/kafka_2.11-1.0.0/config/kafka_server_jaas.conf

KafkaServer{

       org.apache.kafka.common.security.plain.PlainLoginMole required

        username="kafka"

        password="kafkapswd"

        user_ kafkaa(用戶名)="kafkaapswd"(密碼)

        user_ kafkab(用戶名)=" kafkabpswd"(密碼)

user_ kafkac(用戶名)=" kafkacpswd"(密碼)

user_ kafkad(用戶名)=" kafkadpswd"(密碼);

};

修改執行文件vi /home/wucan/kafka/kafka_2.11-1.0.0/bin/kafka-server-start.sh

if ["x$KAFKA_OPTS" ]; then

    export KAFKA_OPTS="-Djava.security.auth.login.config=/home/wucan/kafka/kafka_2.11-1.0.0/config/kafka_server_jaas.conf"

fi

修改執行文件vi /home/wucan/kafka/kafka_2.11-1.0.0/bin/kafka-run-class.sh

KAFKA_SASL_OPTS='-Djava.security.auth.login.config=/home/wucan/kafka/kafka_2.11-1.0.0/config/kafka_server_jaas.conf'

if ["x$DAEMON_MODE" = "xtrue" ]; then

  nohup $JAVA $KAFKA_HEAP_OPTS$KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_SASL_OPTS  $KAFKA_JMX_OPTS$KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@" >"$CONSOLE_OUTPUT_FILE" 2>&1 < /dev/null &

else

  exec $JAVA $KAFKA_HEAP_OPTS$KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_SASL_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH$KAFKA_OPTS "$@"

fi

創建新的配置文件vi /home/wucan/kafka/kafka_2.11-1.0.0/config/kafka_client_jaas.conf

KafkaClient{

       org.apache.kafka.common.security.plain.PlainLoginMole required

        username=" kafkaa"

        password=" kafkaapswd";

};

修改執行文件

vi /home/wucan/kafka/kafka_2.11-1.0.0/bin/kafka-console-consumer.sh

vi /home/wucan/kafka/kafka_2.11-1.0.0/bin/kafka-console-procer.sh

if ["x$KAFKA_OPTS" ]; then

    export KAFKA_OPTS="-Djava.security.auth.login.config=/home/wucan/kafka/kafka_2.11-1.0.0/config/kafka_client_jaas.conf"

fi

運行jar包的伺服器的指定路徑下必須有kafka_ client_ jaas.conf文件

在程序中添加如下配置

System.setProperty("java.security.auth.login.config","xx/kafka_client_jaas.conf");

props.put("security.protocol","SASL_PLAINTEXT");

props.put("sasl.mechanism","PLAIN");

問題描述:發布消息、訂閱消息時,出現如下錯誤,WARN [Consumer clientId=consumer-1, groupId=console-consumer-20752]Error while fetching metadata with correlation id 2 :{test2=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)

解決方法:各客戶端的用戶名設置為相同,多個客戶端同時管理會產生沖突。

④ kafka配置參數詳解

kafka的配置分為 broker、procter、consumer三個不同的配置

一 BROKER 的全局配置
最為核心的三個配置 broker.id、log.dir、zookeeper.connect 。

------------------------------------------- 系統 相關 -------------------------------------------

broker.id =1

log.dirs = /tmp/kafka-logs

port =6667

message.max.bytes =1000000

num.network.threads =3

num.io.threads =8

background.threads =4

queued.max.requests =500

host.name

advertised.host.name

advertised.port

socket.send.buffer.bytes =100*1024

socket.receive.buffer.bytes =100*1024

socket.request.max.bytes =100 1024 1024

------------------------------------------- LOG 相關 -------------------------------------------

log.segment.bytes =1024 1024 1024

log.roll.hours =24*7

log.cleanup.policy = delete

log.retention.minutes=7days

指定日誌每隔多久檢查看是否可以被刪除,默認1分鍾
log.cleanup.interval.mins=1

log.retention.bytes=-1

log.retention.check.interval.ms=5minutes

log.cleaner.enable=false

log.cleaner.threads =1

log.cleaner.io.max.bytes.per.second=None

log.cleaner.depe.buffer.size=500 1024 1024

log.cleaner.io.buffer.size=512*1024

log.cleaner.io.buffer.load.factor =0.9

log.cleaner.backoff.ms =15000

log.cleaner.min.cleanable.ratio=0.5

log.cleaner.delete.retention.ms =1day

log.index.size.max.bytes =10 1024 1024

log.index.interval.bytes =4096

log.flush.interval.messages=None

log.flush.scheler.interval.ms =3000

log.flush.interval.ms = None

log.delete.delay.ms =60000

log.flush.offset.checkpoint.interval.ms =60000

------------------------------------------- TOPIC 相關 -------------------------------------------

auto.create.topics.enable =true

default.replication.factor =1

num.partitions =1

實例 --replication-factor3--partitions1--topic replicated-topic :名稱replicated-topic有一個分區,分區被復制到三個broker上。

----------------------------------復制(Leader、replicas) 相關 ----------------------------------

controller.socket.timeout.ms =30000

controller.message.queue.size=10

replica.lag.time.max.ms =10000

replica.lag.max.messages =4000

replica.socket.timeout.ms=30*1000

replica.socket.receive.buffer.bytes=64*1024

replica.fetch.max.bytes =1024*1024

replica.fetch.wait.max.ms =500

replica.fetch.min.bytes =1

num.replica.fetchers=1

replica.high.watermark.checkpoint.interval.ms =5000

controlled.shutdown.enable =false

controlled.shutdown.max.retries =3

controlled.shutdown.retry.backoff.ms =5000

auto.leader.rebalance.enable =false

leader.imbalance.per.broker.percentage =10

leader.imbalance.check.interval.seconds =300

offset.metadata.max.bytes

----------------------------------ZooKeeper 相關----------------------------------

zookeeper.connect = localhost:2181

zookeeper.session.timeout.ms=6000

zookeeper.connection.timeout.ms =6000

zookeeper.sync.time.ms =2000
配置的修改
其中一部分配置是可以被每個topic自身的配置所代替,例如
新增配置
bin/kafka-topics.sh --zookeeper localhost:2181--create --topic my-topic --partitions1--replication-factor1--config max.message.bytes=64000--config flush.messages=1

修改配置
bin/kafka-topics.sh --zookeeper localhost:2181--alter --topic my-topic --config max.message.bytes=128000

刪除配置 :
bin/kafka-topics.sh --zookeeper localhost:2181--alter --topic my-topic --deleteConfig max.message.bytes
二 CONSUMER 配置
最為核心的配置是group.id、zookeeper.connect

group.id

consumer.id

client.id = group id value

zookeeper.connect=localhost:2182

zookeeper.session.timeout.ms =6000

zookeeper.connection.timeout.ms =6000

zookeeper.sync.time.ms =2000

auto.offset.reset = largest

socket.timeout.ms=30*1000

socket.receive.buffer.bytes=64*1024

fetch.message.max.bytes =1024*1024

auto.commit.enable =true

auto.commit.interval.ms =60*1000

queued.max.message.chunks =10

rebalance.max.retries =4

rebalance.backoff.ms =2000

refresh.leader.backoff.ms

fetch.min.bytes =1

fetch.wait.max.ms =100

consumer.timeout.ms = -1

三 PRODUCER 的配置
比較核心的配置:metadata.broker.list、request.required.acks、procer.type、serializer.class

metadata.broker.list

request.required.acks =0

request.timeout.ms =10000

send.buffer.bytes=100*1024

key.serializer.class

partitioner.class=kafka.procer.DefaultPartitioner

compression.codec = none

compressed.topics=null

message.send.max.retries =3

retry.backoff.ms =100

topic.metadata.refresh.interval.ms =600*1000

client.id=""

------------------------------------------- 消息模式 相關 -------------------------------------------

procer.type=sync

queue.buffering.max.ms =5000

queue.buffering.max.messages =10000

queue.enqueue.timeout.ms = -1

batch.num.messages=200

serializer.class= kafka.serializer.DefaultEncoder

⑤ Kafka相關內容總結(Kafka集群搭建手記)

Kafka is a distributed,partitioned,replicated commit logservice。它提供了類似於JMS的特性,但是在設計實現上完全不同,此外它並不是JMS規范的實現。kafka對消息保存時根據Topic進行歸類,發送消息者成為Procer,消息接受者成為Consumer,此外kafka集群有多個kafka實例組成,每個實例(server)成為broker。無論是kafka集群,還是procer和consumer都依賴於zookeeper來保證系統可用性集群保存一些meta信息。
入門請參照: https://www.ibm.com/developerworks/cn/opensource/os-cn-kafka/index.html
在此不再贅述。

這部分不是本文的重點,但是kafka需要用到kafka集群,所以先搭建kafka集群。
從kafka官方文檔看到,kafka似乎在未來的版本希望拋棄zookeep集群,自己維護集群的一致性,拭目以待吧。
我們搭建集群使用的是三台同機房的機器,因為zookeeper不怎麼占資源也不怎麼占空間(我們的業務目前比較簡單),所以三台機器上都搭建了zookeeper集群。
搭建zookeeper集群沒什麼難度,參考文檔: http://www.cnblogs.com/huangxincheng/p/5654170.html
下面列一下我的配置並解析:

一共用三台物理機器,搭建一個Kafka集群。
每台伺服器的硬碟劃分都是一樣的,每個獨立的物理磁碟掛在一個單獨的分區裡面,這樣很方便用於Kafka多個partition的數據讀寫與冗餘。
/data1比較小,為了不成為集群的瓶頸,所以/data1用於存放kafka以及Zookeeper
每台機器的磁碟分布如下:

下面是kafka的簡單配置,三台伺服器都一樣,如有不一致的在下文有說明。
kafka安裝在目錄/usr/local/kafka/下,下面的說明以10.1.xxx.57為例。

最重要的配置文件server.properties,需要配置的信息如下:

從上面的配置看到,kafka集群不需要像hadoop集群那樣,配置ssh通訊,而且一個kafka伺服器(官方文檔稱之為broker,下面統一使用這個稱呼)並不知道其他的kafka伺服器的存在,因此你需要逐個broker去啟動kafka。各個broker根據自己的配置,會自動去配置文件上的zk伺服器報到,這就是一個有zk伺服器粘合起來的kafka集群。
我寫了一個啟動腳本,放在 /usr/local/kafka/bin 下面。啟動腳本每個broker都一樣:

如同kafka集群裡面每一個broker都需要單獨啟動一樣,kafka集群裡面每一個broker都需要單獨關閉。
官方給出的關閉腳本是單獨運行 bin/kafka-server-stop.sh
但是我運行的結果是無法關閉。打開腳本一看,才發現是最簡單的辦法,發一個TERM信號到kafka的java進程,官方腳本給出的grep有點問題。
發信號之後,一直tail著kafka日誌,看到正常關閉。

指定zookeeper伺服器,topic名稱是LvsKafka(注意topic名稱不能有英文句號(.)和下劃線(_),否則會通不過,理由是名稱會沖突,下文對此略有解析)
replication-factor指出重復因子是2,也就是每條數據有兩個拷貝,可靠性考慮。
partitions 指出需要多少個partition,數據量大的多一點,無論生產和消費,這是負載均衡和高並發的需要。

可以看到剛才新建的24個partition,比如partition 5, 他的leader是broker 59,也就是10.1.xxx.59這台機器。
建立topic時我們指出需要2個拷貝,從上面的輸出的Replicas欄位看到,這兩個拷貝放在59,58兩個機器,也就是10.1.xxx.59和10.1.xxx.58.
Isr表示當前partition的所有拷貝所在的機器中,哪些是還活著(可以提供服務)的。現在是59和58都還存活。

這個命令另外還會看到一些類似於下面的內容:

__consumer_offsets到底是什麼呢?其實就是客戶端的消費進度,客戶端會定時上報到kafka集群,而kafka集群會把每個客戶端的消費進度放入一個自己內部的topic中,這個topic就是__consumer_offsets。我查看過__consumer_offsets的內容,其實就是每個客戶端的消費進度作為一條消息,放入__consumer_offsets這個topic中。
這里給了我們兩個提示:
1、kafka自己管理客戶端的消費進度,而不是依靠zk,這就是kafka官方文檔說的kafka未來會拋棄zk的底氣之一;
2、留意到這個kafka自己的topic是帶下劃線的,也就是,kafka擔心我們自己建的topic如果帶下劃線的話會跟這些內部自用的topic沖突;

⑥ kafka術語和配置介紹

procer 是生產者,負責消息生產,上遊程序中按照標準的消息格式組裝(按照每個消息事件的欄位定義)發送到指定的topic。procer生產消息的時候,不會因為consumer處理能力不夠,而阻塞procer的生產。consumer會從指定的topic 拉取消息,然後處理消費,並提交offset(消息處理偏移量,消費掉的消息並不會主動刪除,而是kafka系統根據保存周期自動消除)。

topic是消費分類存儲的隊列,可以按照消息類型來分topic存儲。

replication是topic復制副本個數,用於解決數據丟失,防止leader topic宕機後,其他副本可以快代替。

broker是緩存代理,Kafka集群中的一台或多台伺服器統稱broker,用來保存procer發送的消息。Broker沒有副本機制,一旦broker宕機,該broker的消息將都不可用。

partition是topic的物理分組,在創建topic的時候,可以指定partition 數量。每個partition是邏輯有序的,保證每個消息都是順序插入的,而且每個消息的offset在不同partition的是唯一不同的

偏移量。kafka為每條在分區的消息保存一個偏移量offset,這也是消費者在分區的位置。比如一個偏移量是5的消費者,表示已經消費了從0-4偏移量的消息,下一個要消費的消息的偏移量是5。每次消息處理完後,要麼主動提交offset,要麼自動提交,把offset偏移到下一位,如處理offset=6消息。在kafka配置中,如果enable_auto_commit=True和auto_commit_interval_ms=xx,那表示每xx 毫秒自動提交偏移量

分組。是指在消費同一topic的不同consumer。每個consumer都有唯一的groupId,同一groupId 屬於同一個group。不同groupId的consumer相互不影響。對於一個topic,同一個group的consumer數量不能超過 partition數量。比如,Topic A 有 16個partition,某一個group下有2個consumer,那2個consumer分別消費8個partition,而這個group的consumer數量最多不能超過16個。

kafka的配置主要分四類,分別是zookeeper、server、consumer、procer。其他的配置可以忽略。

zk的配置比較簡單,也可以默認不改.dataDir是zk存儲節點配置的目錄地址,clientPort是zk啟動的埠,默認2181,maxClientCnxns是限制ip的連接此處,設置0表示無連接次數,一般情況根據業務部署情況,配置合理的值。

⑦ Kafka生產者開發,原理分析,以及參數配置

生產者開發(基於java),生產者發送消息主要有以下三步

那麼我們進行抽象,大致可以得到這兩個類。

另外Kafka為了表現以下封裝的特性,把准備生產者的參數配成了一個Properties類,
以這個類為KafkaProcer構造函數入參。

那麼KafkaProcer的參數具體可以配置什麼呢?

由123步可知,可以配置攔截器,序列化器,分區器。
這些都可以自己實現特定介面(ProcerInterceptor,Serializer,Partioner),
然後放到Properties裡面,最後給KafkaProcer
攔截器就是對ProcerRecord做一些處理,然後返回處理過的新的ProcerRecord(自定義攔截策略)
序列化器是要講java對象轉成byte[]數組然後進行網路傳輸(自己定義序列化策略)
分區器就是為消息選擇分區(這里自己可以設計分區策略)

再次回到這張圖

可以看到,有兩個線程在完成消息的發送,一個是主線程,一個是Sender線程。
主線程經過123步後,會將同一個partition的多個Record封裝(壓縮)到一個ProcerBatch對象中,
這樣的目的是方便傳輸,提高效率,RecordAccumulator裡面維持著一個雙端ProcerBatch隊列數組,

然後Sender線程從隊頭取ProcerBatch封裝成Request,這里設計到一個邏輯到物理的轉換。
分區是邏輯的,而broker才是物理的,一個區對應一個broker,所以要轉換。

另外Sender線程裡面維持了一個以Nodeid(就是對應broker)為Key,Deque<Request>為值的Map,
這裡面的Request指的是那種沒有Response的Request。一旦有了Response就會清理掉的。

這個是由通過leastLoadedNode節點實現的,不多說了。

其實除了123步中的參數,還有其它參數,這里就說一個
ack
acks=1。默認值即為1。生產者發送消息之後,只要分區的leader副本成功寫入消息,那麼它就會收到來自服務端的成功響應。
acks=0。生產者發送消息之後不需要等待任何服務端的響應。
acks=-1或acks=all。生產者在消息發送之後,需要等待ISR中的所有副本都成功寫入消息之後才能夠收到來自服務端的成功響應。

閱讀全文

與kafka伺服器需要什麼配置相關的資料

熱點內容
怎麼用命令方塊控制僵屍 瀏覽:774
大型雲伺服器有哪些 瀏覽:466
解壓版三國街機 瀏覽:423
去中心化app裡麵包含什麼 瀏覽:948
密鑰安裝命令行 瀏覽:505
文獻編譯英文 瀏覽:659
php調用瀏覽器 瀏覽:527
數控車床編程初學實例 瀏覽:949
cad中篩選命令是什麼 瀏覽:800
數控銑床法蘭克編程 瀏覽:330
怎麼樣分解壓縮包圖標 瀏覽:619
php兩年工作經驗簡歷 瀏覽:765
怎麼提前解壓房貸 瀏覽:700
反詐宣傳app哪裡可以拿到用戶資料 瀏覽:856
華為交換機命令配置 瀏覽:12
電機pid演算法實例c語言 瀏覽:974
安裝ue5未找到金屬編譯器 瀏覽:965
l1壓縮性骨折微創手術 瀏覽:617
看電腦配置命令 瀏覽:110
單片機調用db數值偏移量 瀏覽:447