導航:首頁 > 配伺服器 > 如何將kafka部署到伺服器上

如何將kafka部署到伺服器上

發布時間:2023-01-27 01:35:07

Ⅰ Kafka集群部署(Docker容器的方式)

文章主要介紹以docker容器的方式部署kafka集群。

上述配置文件中的server.x,數字x對應到data/myid文件中的值。三台機器x的值分別就是1,2,3。參數詳細說明請參考 官網文檔 。

1.--net=host: 容器與主機共享同一Network Namespace,即容器與網路看到的是相同的網路視圖(host模式存在一定的風險,對安全要求很高的生產環境最好不要用host模式,應考慮除此之外的其他幾種模式)
2.-v: 指定主機到容器的目錄映射關系
這樣就以容器的方式啟動了zookeeper的服務,可以通過 "docker exec -it zookeeper bash" 命令進入容器中進行一些操作,例如查看服務啟動是否正常。也可以通過查看2181埠是否被監聽判斷zookeeper的服務是否運行

詳細的參數配置說明請參考 官方文檔 ,參數不僅可以通過上述文件的方式來配置,也可以通過容器環境變數的方式來配置,這里結合兩種方式使用。

1.KAFKA_ADVERTISED_HOST_NAME、KAFKA_BROKER_ID的值要結合每台機器自身設置
2./etc/hosts文件中最好配置ip與hostname的映射關系,否則會報出如下錯誤" Error: Exception thrown by the agent : java.net.MalformedURLException: Local host name unknown: java.net.UnknownHostException: node0: node0: System error "
3.通過-e 指定的環境變數與在server.properties中配置的選項其效果是一樣的
4.配置文件中的選項若要通過環境變數來指定,方式為:如broker.id對應KAFKA_BROKER_ID,類似的log.dirs對應KAFKA_LOG_DIRS
5.KAFKA_HEAP_OPTS="-Xmx6G -Xms6G"指java堆內存大小的設置,6G大小是kafka官網給出的數值,此數值要結合機器的內存大小給出。超過6G的內存,可以設置為6G;若機器的內存低於6G而設置6G,則會報錯。
5.啟動成功後,可以通過"docker logs kafka"命令查看日誌

1.ZK_HOSTS:ZooKeeper訪問地址(需指定機器的ip,localhost:2181或127.0.0.1:2181均會報 "java.net.ConnectException: Connection refused" 異常)

Ⅱ Kafka 基礎原理及工作流程簡述

Kafka 工作流程

基礎總結:

1)broker :broker代表kafka的節點, Broker是分布式部署並且相互之間相互獨立的, 啟動的時候向zookeeper 注冊,在Zookeeper上會有一個專門 用來進行Broker伺服器列表記錄 的節點:/brokers/ids。每個Broker在啟動時,都會到Zookeeper上進行注冊,即到/brokers/ids下創建屬於自己的節點,如/brokers/ids/[0...N]。Kafka使用了全局唯一的數字來指代每個Broker伺服器,不同的Broker必須使用不同的Broker ID進行注冊,創建完節點後, 每個Broker就會將自己的IP地址和埠信息記錄 到該節點中去。其中,Broker創建的節點類型是 臨時節點 ,一旦Broker 宕機 ,則 對應的臨時節點也會被自動刪除 。

2)topic:消息主題,在Kafka中,同一個 Topic的消息會被分成多個分區 並將其分布在多個Broker上, 這些分區信息及與Broker的對應關系 也都是由Zookeeper在維護,由專門的節點來記錄,如:/borkers/topics Kafka中每個Topic都會以/brokers/topics/[topic]的形式被記錄,如/brokers/topics/login和/brokers/topics/search等。Broker伺服器啟動後,會到對應Topic節點(/brokers/topics)上注冊自己的Broker ID並寫入針對該Topic的分區總數,如/brokers/topics/login/3->2,這個節點表示Broker ID為3的一個Broker伺服器,對於"login"這個Topic的消息,提供了2個分區進行消息存儲,同樣,這個分區節點也是臨時節點。

3)partition :同一topic類型消息的分區,如圖,每個分區都存在一個leader 和N個follower(副本),副本個數在創建topic的時候可以指定創建多少個。消息生產者生產消息和消費組消費消息都是通過leader完成,副本的存在是為了防止發生節點宕機,導致leader掛了,follower隨時頂上去變成leader,繼續恢復生產。重點來了,leader所在節點掛了,會有follower變成leader,所以同一個topic的同一個partition的leader與follower不可能在同一個broker,這樣才能做到這個broker上的某個topic的某個partition的leader掛了,其他正常節點上的這個topic的這個partition的follower會頂上來。

4)生產者發送消息的 負載均衡 :由於同一個Topic消息會被分區並將其分布在多個Broker上,因此, 生產者需要將消息合理地發送到這些分布式的Broker上 ,那麼如何實現生產者的負載均衡,Kafka支持傳統的四層負載均衡,也支持Zookeeper方式實現負載均衡。 (4.1) 四層負載均衡,根據生產者的IP地址和埠來為其確定一個相關聯的Broker。通常,一個生產者只會對應單個Broker,然後該生產者產生的消息都發往該Broker。這種方式邏輯簡單,每個生產者不需要同其他系統建立額外的TCP連接,只需要和Broker維護單個TCP連接即可。但是,其無法做到真正的負載均衡,因為實際系統中的每個生產者產生的消息量及每個Broker的消息存儲量都是不一樣的,如果有些生產者產生的消息遠多於其他生產者的話,那麼會導致不同的Broker接收到的消息總數差異巨大,同時,生產者也無法實時感知到Broker的新增和刪除。 (4.2) 使用Zookeeper進行負載均衡,由於每個Broker啟動時,都會完成Broker注冊過程,生產者會通過該節點的變化來動態地感知到Broker伺服器列表的變更,這樣就可以實現動態的負載均衡機制。

5)消費者負載均衡:與生產者類似,Kafka中的消費者同樣需要進行負載均衡來實現多個消費者合理地從對應的Broker伺服器上接收消息,每個消費組分組包含若干消費者, 每條消息都只會發送給分組中的一個消費者 ,不同的消費者分組消費自己特定的Topic下面的消息,互不幹擾。

6)分區與消費者 的關系: 消費組 (Consumer Group)  consumer group 下有多個 Consumer(消費者)。對於每個消費者組 (Consumer Group),Kafka都會為其分配一個全局唯一的Group ID,Group 內部的所有消費者共享該 ID。訂閱的topic下的每個分區只能分配給某個 group 下的一個consumer(當然該分區還可以被分配給其他group)。同時,Kafka為每個消費者分配一個Consumer ID,通常採用"Hostname:UUID"形式表示。在Kafka中,規定了 每個消息分區 只能被同組的一個消費者進行消費 ,因此,需要在 Zookeeper 上記錄 消息分區 與 Consumer 之間的關系,每個消費者一旦確定了對一個消息分區的消費權力,需要將其Consumer ID 寫入到 Zookeeper 對應消息分區的臨時節點上,例如:/consumers/[group_id]/owners/[topic]/[broker_id-partition_id]  其中,[broker_id-partition_id]就是一個 消息分區 的標識,節點內容就是該 消息分區 上 消費者的Consumer ID。

7)消息的消費進度Offset 記錄:在消費者對指定消息分區進行消息消費的過程中, 需要定時地將分區消息的消費進度Offset記錄到Zookeeper上 ,以便在該消費者進行重啟或者其他消費者重新接管該消息分區的消息消費後,能夠從之前的進度開始繼續進行消息消費。Offset在Zookeeper中由一個專門節點進行記錄,其節點路徑為:/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id] 節點內容就是Offset的值。這是kafka0.9和之前版本offset記錄的方式,之後的版本offset都改為存在kafka本地,當然了這里的本地是指磁碟不是內存。。。

8)消費者注冊:每個消費者伺服器啟動時,都會到Zookeeper的指定節點下創建一個屬於自己的消費者節點,例如/consumers/[group_id]/ids/[consumer_id],完成節點創建後,消費者就會將自己訂閱的Topic信息寫入該臨時節點。 對 消費者分組 中的 消費者 的變化注冊監聽 。每個 消費者 都需要關注所屬 消費者分組 中其他消費者伺服器的變化情況,即對/consumers/[group_id]/ids節點注冊子節點變化的Watcher監聽,一旦發現消費者新增或減少,就觸發消費者的負載均衡。 對Broker伺服器變化注冊監聽 。消費者需要對/broker/ids/[0-N]中的節點進行監聽,如果發現Broker伺服器列表發生變化,那麼就根據具體情況來決定是否需要進行消費者負載均衡。 進行消費者負載均衡 。為了讓同一個Topic下不同分區的消息盡量均衡地被多個 消費者 消費而進行 消費者 與 消息 分區分配的過程,通常,對於一個消費者分組,如果組內的消費者伺服器發生變更或Broker伺服器發生變更,會發出消費者負載均衡。

Ⅲ 服務端技術實戰系列——Kafka篇

一.概念&原理

[if !supportLists]1. [endif]主題(topic):主題是對消息的分類。

[if !supportLists]2. [endif]消息(message):消息是kafka通信的基本單位。

[if !supportLists]3. [endif]分區(partition): 一組 消息對應 一個 主題, 一個 主題對應 一個或多個 分區。每個分區為一系列有序消息組成的 有序隊列 ;每個分區在物理上對應一個文件夾

[if !supportLists]4. [endif]副本(replica):每個分區有 一個或多個 副本,分區的副本分布在集群的 不同 代理(機器)上,以提高可用性;分區的副本與日誌對象是一一對應的。

[if !supportLists]5. [endif]Kafka只保證一個 分區內 的消息 有序性 ,不保證跨分區消息的有序性。消息被追加到相應分區中, 順序寫入磁碟 ,效率非常高。

[if !supportLists]6. [endif]Kafka選取某個某個分區的 一個 副本作為leader副本,該分區的 其他 副本為follower副本。 只有leader副本負責處理客戶端讀/寫請求 ,follower副本從leader副本同步數據。

[if !supportLists]7. [endif]任何發布到分區的消息都會追加到日誌文件的尾部, 每條消息 在日誌文件中的 位置 都對應一個 按序遞增的偏移量 ;偏移量在一個分區下嚴格有序。

[if !supportLists]8. [endif]Kafka不允許對消息進行隨機讀寫。

[if !supportLists]9. [endif]新版消費者將 消費偏移量 保存到kafka內部的一個主題中。

[if !supportLists]10. [endif]Kafka集群由 一個或多個代理 (Broker,也稱為kafka實例)構成。可以在 一台 伺服器上配置 一個或多個代理 ,每個代理具有唯一標識broker.id。

[if !supportLists]11. [endif]生產者將消息 發送給代理 (Broker)。

[if !supportLists]12. [endif]消費者以 拉取 (pull)方式拉取數據,每個消費者都屬於一個消費組。

[if !supportLists]13. [endif]同一個主題的一條消息只能被 同一個消費組 下的某一個消費者消費,但 不同消費組 的消費者可以 同時 消費該消息。

[if !supportLists]14. [endif]消息 廣播 :指定各消費者屬於不同消費組;消息 單播 :指定各消費者屬於同一個消費組。

[if !supportLists]15. [endif]Kafka啟動時在Zookeeper上創建相應節點來保存 元數據 ,元數據包括:代理節點信息、集群信息、主題信息、分區狀態信息、分區副本分配方案、動態配置等;

[if !supportLists]16. [endif]Kafka通過 監聽 機制在節點注冊監聽器來監聽節點元數據變化;

[if !supportLists]17. [endif]Kafka將數據寫入 磁碟 ,以文件系統來存數據;

[if !supportLists]18. [endif]生產環境一般將zookeeper集群和kafka集群 分機架 部署;

[if !supportLists]二.[endif] Kafka Procer

配置:

/**

 * xTestProxy——KafkaConfigConstant

 *

 * @author  ZhangChi

 * @date  2018年6月20日---下午5:50:44

 * @version  1.0

 */

public   class  KafkaConfigConstant {

public   static   final  String KAFKA_CLUSTER  = "fa-common1.hangzhou-1.kafka.internal.lede.com:9200,fa-common2.hangzhou-1.kafka.internal.lede.com:9200,fa-common3.hangzhou-1.kafka.internal.lede.com:9200";

}

生產者配置:

/**

 * xTestProxy——HttpKafkaProcerFactory

 *

 * @author  ZhangChi

 * @date  2018年6月11日---下午2:37:51

 * @version  1.0

 */

public   class  HttpKafkaProcerFactory {

// 真正的KafkaProcer僅有一份

private   static  KafkaProcer kafkaProcer  = null ;

private   static  Properties property ;

public   static  KafkaProcer getKafkaProcer() {

if  ( kafkaProcer  == null ) {

synchronized  (HttpKafkaProcerFactory. class ) {

if  ( kafkaProcer  == null ) {

property  = buildKafkaProperty ();

kafkaProcer  = new  KafkaProcer( property );

}

}

}

return   kafkaProcer ;

}

public   static  Properties buildKafkaProperty() {

Properties props = new  Properties();

props.put(ProcerConfig. BOOTSTRAP_SERVERS_CONFIG , KafkaConfigConstant. KAFKA_CLUSTER );

props.put(ProcerConfig. ACKS_CONFIG , "all");

props.put(ProcerConfig. RETRIES_CONFIG , 0);

props.put(ProcerConfig. BATCH_SIZE_CONFIG , 16384);

props.put(ProcerConfig. BUFFER_MEMORY_CONFIG , 33554432);

props.put(ProcerConfig. LINGER_MS_CONFIG , 1);

props.put(ProcerConfig. KEY_SERIALIZER_CLASS_CONFIG , "org.apache.kafka.common.serialization.StringSerializer");

props.put(ProcerConfig. VALUE_SERIALIZER_CLASS_CONFIG ,

"org.apache.kafka.common.serialization.StringSerializer");

return  props;

}

}

生產者線程組:

/**

 * xTestProxy——HttpKafkaProcerThread

 * 多線程每次new一個實例

 *

 * @author  ZhangChi

 * @date  2018年6月25日---下午2:09:39

 * @version  1.0

 */

public   class  HttpKafkaProcerThread implements  Runnable {

private   static  Logger logger  = LoggerFactory. getLogger ("HttpKafkaProcerThread");

private   final  String KAFKA_TOPIC = KafkaConstant. HTTP_REQ_RESP_TOPIC ;

private  String kafkaMessageJson;

private  KafkaProcer procer;

public  String messageType;

public  String originalMessage;

private   static  KafkaMessage kafkaMessage  = new  KafkaMessage();

public  HttpKafkaProcerThread(KafkaProcer procer, String messageType, String originalMessage) {

this .procer = procer;

this .messageType = messageType;

this .originalMessage = originalMessage;

}

@Override

public   void  run() {

// TODO  Auto-generated method stub

/* 1.構建kafka消息*/

kafkaMessageJson = generateKafkaMessage( this .messageType, this .originalMessage);

/* 2.發送kafka消息*/

if  (kafkaMessageJson != null  && !StringUtils. isEmpty (kafkaMessageJson)) {

logger .info("create message start:" + kafkaMessageJson);

procer.send( new  ProcerRecord( this .KAFKA_TOPIC, kafkaMessageJson));

} else  {

logger .info("kafkaMessageJson is null!");

}

}

private  String generateKafkaMessage(String messageType, String originalMessage) {

if  (StringUtils. isBlank (messageType) || StringUtils. isBlank (originalMessage)) {

return   null ;

}

kafkaMessage .setMessageId(KafkaMessageUtils. generateId ());

kafkaMessage .setMessageTime(KafkaMessageUtils. generateTime ());

kafkaMessage .setMessageType(messageType);

kafkaMessage .setMessage(originalMessage);

String kafkaMessageToJson = null ;

try  {

kafkaMessageToJson = KafkaMessageUtils. objectToJson ( kafkaMessage );

} catch  (JsonProcessingException e) {

// TODO  Auto-generated catch block

e.printStackTrace();

}

kafkaMessageJson = kafkaMessageToJson;

return  kafkaMessageToJson;

}

}

[if !supportLists]三.[endif] Kafka Consumer

消費者配置:

private   static  Properties buildKafkaProperty() {

Properties properties = new  Properties();

// 測試環境kafka的埠號是9200

properties.put(ConsumerConfig. BOOTSTRAP_SERVERS_CONFIG , KafkaConfigConstant. KAFKA_CLUSTER );

// 消費組名稱

properties.put(ConsumerConfig. GROUP_ID_CONFIG , KafkaConfigConstant. GROUP_ID );

properties.put(ConsumerConfig. CLIENT_ID_CONFIG , "test");

// 從頭消費

properties.put(ConsumerConfig. AUTO_OFFSET_RESET_CONFIG , "earliest");

// 自動提交偏移量

properties.put(ConsumerConfig. ENABLE_AUTO_COMMIT_CONFIG , "true");

// 時間間隔1s

properties.put(ConsumerConfig. AUTO_COMMIT_INTERVAL_MS_CONFIG , "1000");

properties.put(ConsumerConfig. KEY_DESERIALIZER_CLASS_CONFIG ,

"org.apache.kafka.common.serialization.StringDeserializer");

properties.put(ConsumerConfig. VALUE_DESERIALIZER_CLASS_CONFIG ,

"org.apache.kafka.common.serialization.StringDeserializer");

return  properties;

}

消費者線程組:

/**

 * AnalysisEngine——HttpKafkaConsumerGroup

 *

 * @author  ZhangChi

 * @date  2018年6月11日---下午6:20:47

 * @version  1.0

 */

@Service("httpKafkaConsumerGroup")

public   class  HttpKafkaConsumerGroup {

@Autowired

private  RequestAnalyzer requestAnalyzer;

@Autowired

private  EsDocumentServiceImpl esDocumentServiceImpl;

@Autowired

private  AnalysisEngineClient analysisEngineClient;

@Autowired

private  MongoTemplate mongoTemplate;

private  List httpKafkaConsumerList = new  ArrayList();

public   void  initHttpKafkaConsumerGroup( int  consumerNumber, RunModeEnum mode) {

for  ( int  i = 0; i < consumerNumber; i++) {

/**

 * 將注入的服務當做構造參數,這樣保證每個子線程都能拿到服務實例而不是空指針!

 */

HttpKafkaConsumer consumerThread = new  HttpKafkaConsumer(requestAnalyzer, esDocumentServiceImpl, mode, analysisEngineClient, mongoTemplate);

httpKafkaConsumerList.add(consumerThread);

}

}

public   void  consumeGroupStart() {

for  (HttpKafkaConsumer item : httpKafkaConsumerList) {

LogConstant. runLog .info("httpKafkaConsumerList size : " + httpKafkaConsumerList.size());

Thread consumerThread = new  Thread(item);

consumerThread.start();

}

}

}

先逐個初始化消費者實例,然後將這些消費者加入到消費組列表中。消費組啟動後,會循環產生消費者線程。

 

Ⅳ Kafka相關內容總結(Kafka集群搭建手記)

Kafka is a distributed,partitioned,replicated commit logservice。它提供了類似於JMS的特性,但是在設計實現上完全不同,此外它並不是JMS規范的實現。kafka對消息保存時根據Topic進行歸類,發送消息者成為Procer,消息接受者成為Consumer,此外kafka集群有多個kafka實例組成,每個實例(server)成為broker。無論是kafka集群,還是procer和consumer都依賴於zookeeper來保證系統可用性集群保存一些meta信息。
入門請參照: https://www.ibm.com/developerworks/cn/opensource/os-cn-kafka/index.html
在此不再贅述。

這部分不是本文的重點,但是kafka需要用到kafka集群,所以先搭建kafka集群。
從kafka官方文檔看到,kafka似乎在未來的版本希望拋棄zookeep集群,自己維護集群的一致性,拭目以待吧。
我們搭建集群使用的是三台同機房的機器,因為zookeeper不怎麼占資源也不怎麼占空間(我們的業務目前比較簡單),所以三台機器上都搭建了zookeeper集群。
搭建zookeeper集群沒什麼難度,參考文檔: http://www.cnblogs.com/huangxincheng/p/5654170.html
下面列一下我的配置並解析:

一共用三台物理機器,搭建一個Kafka集群。
每台伺服器的硬碟劃分都是一樣的,每個獨立的物理磁碟掛在一個單獨的分區裡面,這樣很方便用於Kafka多個partition的數據讀寫與冗餘。
/data1比較小,為了不成為集群的瓶頸,所以/data1用於存放kafka以及Zookeeper
每台機器的磁碟分布如下:

下面是kafka的簡單配置,三台伺服器都一樣,如有不一致的在下文有說明。
kafka安裝在目錄/usr/local/kafka/下,下面的說明以10.1.xxx.57為例。

最重要的配置文件server.properties,需要配置的信息如下:

從上面的配置看到,kafka集群不需要像hadoop集群那樣,配置ssh通訊,而且一個kafka伺服器(官方文檔稱之為broker,下面統一使用這個稱呼)並不知道其他的kafka伺服器的存在,因此你需要逐個broker去啟動kafka。各個broker根據自己的配置,會自動去配置文件上的zk伺服器報到,這就是一個有zk伺服器粘合起來的kafka集群。
我寫了一個啟動腳本,放在 /usr/local/kafka/bin 下面。啟動腳本每個broker都一樣:

如同kafka集群裡面每一個broker都需要單獨啟動一樣,kafka集群裡面每一個broker都需要單獨關閉。
官方給出的關閉腳本是單獨運行 bin/kafka-server-stop.sh
但是我運行的結果是無法關閉。打開腳本一看,才發現是最簡單的辦法,發一個TERM信號到kafka的java進程,官方腳本給出的grep有點問題。
發信號之後,一直tail著kafka日誌,看到正常關閉。

指定zookeeper伺服器,topic名稱是LvsKafka(注意topic名稱不能有英文句號(.)和下劃線(_),否則會通不過,理由是名稱會沖突,下文對此略有解析)
replication-factor指出重復因子是2,也就是每條數據有兩個拷貝,可靠性考慮。
partitions 指出需要多少個partition,數據量大的多一點,無論生產和消費,這是負載均衡和高並發的需要。

可以看到剛才新建的24個partition,比如partition 5, 他的leader是broker 59,也就是10.1.xxx.59這台機器。
建立topic時我們指出需要2個拷貝,從上面的輸出的Replicas欄位看到,這兩個拷貝放在59,58兩個機器,也就是10.1.xxx.59和10.1.xxx.58.
Isr表示當前partition的所有拷貝所在的機器中,哪些是還活著(可以提供服務)的。現在是59和58都還存活。

這個命令另外還會看到一些類似於下面的內容:

__consumer_offsets到底是什麼呢?其實就是客戶端的消費進度,客戶端會定時上報到kafka集群,而kafka集群會把每個客戶端的消費進度放入一個自己內部的topic中,這個topic就是__consumer_offsets。我查看過__consumer_offsets的內容,其實就是每個客戶端的消費進度作為一條消息,放入__consumer_offsets這個topic中。
這里給了我們兩個提示:
1、kafka自己管理客戶端的消費進度,而不是依靠zk,這就是kafka官方文檔說的kafka未來會拋棄zk的底氣之一;
2、留意到這個kafka自己的topic是帶下劃線的,也就是,kafka擔心我們自己建的topic如果帶下劃線的話會跟這些內部自用的topic沖突;

Ⅳ windows 下遠程連接kafka伺服器並創建topic 部署服務

一.打包項目鏡像:

利用Dockerfile 來打包項目的鏡像
本次項目共依賴兩個鏡像(一個基礎系統環境和一個項目鏡像)
本次直接將Dockerfile寫好後,用shell腳本build.sh啟動打包:

然後切換到項目的目錄下找到build.sh,運行即可打包項目鏡像



報錯:"failed to dial gRPC: cannot connect to the Docker daemon. Is 'docker daemon' running on this host?: dial unix /var/run/docker.sock: connect: permission denied
"
就用

出現以下說明打包成功,接下來可以開始部署:

https://jingyan..com/article/9113f81b49ed2f2b3214c7fa.html

注意:如果遇到只讀許可權不能修改時,將host文件復制一份到桌面,修改後在替換原來的host文件
在hosts文件末尾加上kafka伺服器< !外網! 39. 0.25...>地址,修改後的格式如下:
1.1注意: 修改阿里雲伺服器的hosts 文件來配置 kafka的伺服器地址:

在hosts 文件最後加入:

添加的 kafka-server 就是以下創建topic命令中的 kafka-server別名,

監聽遠程kafka:新建消費者:

遠程創建topic的實例:

查看遠程已創建的topc:

本地:

遠程修改後的kafka topic:

2.通過git Bash 切換到kafka客戶端的bin目錄:
桌面打開 gitBash,切換到本地kafka軟體目錄:

這里一定要切換為windows

3.查看已經有的topic

--topic 指定topic名字
--replication-factor 指定副本數,因為我的是集群環境,這里副本數就為3
--partitions 指定分區數,這個參數需要根據broker數和數據量決定,正常情況下,每個broker上兩個partition最好

注意:伺服器部署時候一定要用內網172. .開頭的,外部訪問設為外網ip
不然會導致Kafka寫入數據的時候報錯 : TImeout

4.1本地docker創建topic:

4.2 本地windows 創建topic
進入本地軟體路徑KAFKA/BIN/WIONDOWS
創建topic

5.修改伺服器的host:
一定要注意加sudo 不然會導致readonly 無法修改

在host 文件的末尾加上以下:

6.切換到工程部署的目錄

7.清理redis,不然數據有殘留:
7.1伺服器上的redis掛載清除:
在 docker-compose.yml中注銷這幾行: 目的是每次啟動不必記錄上次沒有執行完的數據.

這個是用來記錄redis中假如上次指定的是1到100萬塊,沒有執行完.下次接著執行沒執行完的任務,測試時暫時關閉

7.2刪除volume:

7.3 如果volume文件被佔用時,先刪除佔用容器:

7.4 清除redis中的數據
進入redis容器中:

8.部署命令:
8.1開啟docker可視化web上監控docker:

然後訪問: http://39.100.48.41:9000
宿主機IP + 9000埠

8.2執行部署命令,啟動服務:

9.部署時報錯: yaml: line 46: did not find expected key
原因: docker-compose.yml文件中第46行 報錯

解決:將所有數據對齊,不要有多餘的空格.

Ⅵ arm架構伺服器kafka安裝

主要操作在主機vcapp250上進行

編輯config/zookeeper.properties

編輯config/server.properties

編輯bin/kafka-run-class.sh

將安裝目錄拷貝到剩餘主機的對應目錄中

啟動zookeeper

啟動kafka

Ⅶ Kafka Connect的安裝和配置

       在使用Kafka Connect時,需要注意一些事項,以幫助你構建適應長期需求的datapipeline。本章旨在提供有關的一些上下文。

       要開始使用Kafka Connect,只有一個硬性的先決條件:一個Kafka的broker集群。然而,隨著集群增長,有幾個問題需要提前考慮:

       在開始之前,確定哪種模式最適合您的環境非常有用。 對於適合單個代理的環境(例如從web伺服器向Kafka發送日誌),standalone模式非常適合。在單個source或sink可能需要大量數據的用例中(例如,將數據從Kafka發送到HDFS),分布式模式在可伸縮性方面更加靈活,並提供了高可用性服務,從而最小化停機時間。

       Kafka Connect插件是一組jar文件,Kafka Connect可以在其中找到一個或多個connector、transform、以及converter的實現。Kafka Connect將每個插件彼此隔離,這樣一個插件中的庫就不會受到其他插件庫的影響,這點非常重要。

Kafka Connect plugin是:
(1)在一個uber jar文件中包含插件及所有第三方依賴;或
(2)一個包含jar包和第三方依賴的目錄。

       Kafka Connect使用plugin path找到插件,這是Kafka Connect在worker配置文件中定義的一個以逗號分隔的目錄列表。要安裝插件,請將目錄或uber jar放在plugin path路徑中列出的目錄中。

        舉個例子 ,我們在每台機器上創建一個/usr/local/share/kafka/plugins目錄,然後將我們所有的插件jar或插件目錄放入其中。然後在worker的配置文件中加入如下配置項:

       現在,當我們啟動worker時,Kafka Connect可以發現這些插件中定義的所有connector、transform以及converter。Kafka Connect顯式地避免了其他插件中的庫, 並防止了沖突。

       如果要在同一個機器上運行多個standalone實例,有一些參數需要是獨一無二的:
(1)offset.storage.file.filename:connector偏移量的存儲。
(2)rest.port:用於監聽http請求的rest介面所佔用的埠。

       connector和task的配置,offsets和狀態會存儲在Kafka的內部主題中,Kafka Connect會自動創建這些主題,且所有topic都使用了壓縮清理策略。
       如果要手動創建這些topic,推薦使用如下命令:

這里只列出一些有疑問的。

       配置了group.id的worker會自動發現彼此並形成集群。一個集群中的所有worker必須使用相同的三個Kafka topic來共享配置、偏移量以及狀態,所有worker必須配置相同的config.storage.topic、offset.storage.topic以及status.storage.topic。

       每個converter實現類都有自己的相關配置需求。下面的例子展示了一個worker屬性文件,其中使用的AvroConverter需要將Schema Registry的url作為屬性進行傳遞。

注意: 除了其配置覆蓋這些配置的connector,worker上運行的所有connector都使用這些converter。

閱讀全文

與如何將kafka部署到伺服器上相關的資料

熱點內容
伺服器預留地址獲取 瀏覽:1002
雲庫文件夾怎麼設置 瀏覽:293
文件夾目錄製作自動跳轉 瀏覽:452
在哪個音樂app能聽exo的歌 瀏覽:847
pdf超級加密 瀏覽:47
蘋果手機app安裝包怎麼解壓並安裝 瀏覽:905
中原30系統源碼 瀏覽:184
程序員如何遵紀守法 瀏覽:499
java的webxml配置 瀏覽:962
如何封包遠程注入伺服器 瀏覽:864
監測機構資金動向源碼 瀏覽:967
android狀態欄字體50 瀏覽:767
python如何判斷文件後綴 瀏覽:126
龍空app哪裡下 瀏覽:348
阿里雲伺服器搭建網盤 瀏覽:690
京東軟體程序員 瀏覽:806
php游戲伺服器框架 瀏覽:391
導航開發演算法 瀏覽:432
為什麼30歲還想轉行程序員 瀏覽:380
推薦演算法的使用 瀏覽:41