① kafka参数配置
配置日志落在哪些磁盘
配置使用哪个zookeeper
注意 ,只需要在最后面追加一个/chroot即可
具体配置格式 协议名称,主机名称, 端口号 写法 protocol://hostname:port
这个是broker的全局配置,也可以在创建topic的时候 指定每个topic的配置,默认topic的配置覆盖broker的配置。
kafka使用的scale编写的,最终是通过jvm运行,所以需要设置jvm参数对kafka调优。kafka启动的时候会读取两个环境变量
kafka并不需要设置太多的OS参数,通常需要关注下面几个:
② Kafka Connect的安装和配置
在使用Kafka Connect时,需要注意一些事项,以帮助你构建适应长期需求的datapipeline。本章旨在提供有关的一些上下文。
要开始使用Kafka Connect,只有一个硬性的先决条件:一个Kafka的broker集群。然而,随着集群增长,有几个问题需要提前考虑:
在开始之前,确定哪种模式最适合您的环境非常有用。 对于适合单个代理的环境(例如从web服务器向Kafka发送日志),standalone模式非常适合。在单个source或sink可能需要大量数据的用例中(例如,将数据从Kafka发送到HDFS),分布式模式在可伸缩性方面更加灵活,并提供了高可用性服务,从而最小化停机时间。
Kafka Connect插件是一组jar文件,Kafka Connect可以在其中找到一个或多个connector、transform、以及converter的实现。Kafka Connect将每个插件彼此隔离,这样一个插件中的库就不会受到其他插件库的影响,这点非常重要。
Kafka Connect plugin是:
(1)在一个uber jar文件中包含插件及所有第三方依赖;或
(2)一个包含jar包和第三方依赖的目录。
Kafka Connect使用plugin path找到插件,这是Kafka Connect在worker配置文件中定义的一个以逗号分隔的目录列表。要安装插件,请将目录或uber jar放在plugin path路径中列出的目录中。
举个例子 ,我们在每台机器上创建一个/usr/local/share/kafka/plugins目录,然后将我们所有的插件jar或插件目录放入其中。然后在worker的配置文件中加入如下配置项:
现在,当我们启动worker时,Kafka Connect可以发现这些插件中定义的所有connector、transform以及converter。Kafka Connect显式地避免了其他插件中的库, 并防止了冲突。
如果要在同一个机器上运行多个standalone实例,有一些参数需要是独一无二的:
(1)offset.storage.file.filename:connector偏移量的存储。
(2)rest.port:用于监听http请求的rest接口所占用的端口。
connector和task的配置,offsets和状态会存储在Kafka的内部主题中,Kafka Connect会自动创建这些主题,且所有topic都使用了压缩清理策略。
如果要手动创建这些topic,推荐使用如下命令:
这里只列出一些有疑问的。
配置了group.id的worker会自动发现彼此并形成集群。一个集群中的所有worker必须使用相同的三个Kafka topic来共享配置、偏移量以及状态,所有worker必须配置相同的config.storage.topic、offset.storage.topic以及status.storage.topic。
每个converter实现类都有自己的相关配置需求。下面的例子展示了一个worker属性文件,其中使用的AvroConverter需要将Schema Registry的url作为属性进行传递。
注意: 除了其配置覆盖这些配置的connector,worker上运行的所有connector都使用这些converter。
③ Kafka身份认证与权限控制配置
编辑原有配置文件vi /home/wucan/kafka/kafka_2.11-1.0.0/config/server.properties
listeners=SASL_PLAINTEXT://192.168.43.209:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
allow.everyone.if.no.acl.found=true
super.users=User:root
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
创建新的配置文件vi /home/wucan/kafka/kafka_2.11-1.0.0/config/kafka_server_jaas.conf
KafkaServer{
org.apache.kafka.common.security.plain.PlainLoginMole required
username="kafka"
password="kafkapswd"
user_ kafkaa(用户名)="kafkaapswd"(密码)
user_ kafkab(用户名)=" kafkabpswd"(密码)
user_ kafkac(用户名)=" kafkacpswd"(密码)
user_ kafkad(用户名)=" kafkadpswd"(密码);
};
修改执行文件vi /home/wucan/kafka/kafka_2.11-1.0.0/bin/kafka-server-start.sh
if ["x$KAFKA_OPTS" ]; then
export KAFKA_OPTS="-Djava.security.auth.login.config=/home/wucan/kafka/kafka_2.11-1.0.0/config/kafka_server_jaas.conf"
fi
修改执行文件vi /home/wucan/kafka/kafka_2.11-1.0.0/bin/kafka-run-class.sh
KAFKA_SASL_OPTS='-Djava.security.auth.login.config=/home/wucan/kafka/kafka_2.11-1.0.0/config/kafka_server_jaas.conf'
if ["x$DAEMON_MODE" = "xtrue" ]; then
nohup $JAVA $KAFKA_HEAP_OPTS$KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_SASL_OPTS $KAFKA_JMX_OPTS$KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@" >"$CONSOLE_OUTPUT_FILE" 2>&1 < /dev/null &
else
exec $JAVA $KAFKA_HEAP_OPTS$KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_SASL_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH$KAFKA_OPTS "$@"
fi
创建新的配置文件vi /home/wucan/kafka/kafka_2.11-1.0.0/config/kafka_client_jaas.conf
KafkaClient{
org.apache.kafka.common.security.plain.PlainLoginMole required
username=" kafkaa"
password=" kafkaapswd";
};
修改执行文件
vi /home/wucan/kafka/kafka_2.11-1.0.0/bin/kafka-console-consumer.sh
vi /home/wucan/kafka/kafka_2.11-1.0.0/bin/kafka-console-procer.sh
if ["x$KAFKA_OPTS" ]; then
export KAFKA_OPTS="-Djava.security.auth.login.config=/home/wucan/kafka/kafka_2.11-1.0.0/config/kafka_client_jaas.conf"
fi
运行jar包的服务器的指定路径下必须有kafka_ client_ jaas.conf文件
在程序中添加如下配置
System.setProperty("java.security.auth.login.config","xx/kafka_client_jaas.conf");
props.put("security.protocol","SASL_PLAINTEXT");
props.put("sasl.mechanism","PLAIN");
问题描述:发布消息、订阅消息时,出现如下错误,WARN [Consumer clientId=consumer-1, groupId=console-consumer-20752]Error while fetching metadata with correlation id 2 :{test2=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
解决方法:各客户端的用户名设置为相同,多个客户端同时管理会产生冲突。
④ kafka配置参数详解
kafka的配置分为 broker、procter、consumer三个不同的配置
一 BROKER 的全局配置
最为核心的三个配置 broker.id、log.dir、zookeeper.connect 。
------------------------------------------- 系统 相关 -------------------------------------------
broker.id =1
log.dirs = /tmp/kafka-logs
port =6667
message.max.bytes =1000000
num.network.threads =3
num.io.threads =8
background.threads =4
queued.max.requests =500
host.name
advertised.host.name
advertised.port
socket.send.buffer.bytes =100*1024
socket.receive.buffer.bytes =100*1024
socket.request.max.bytes =100 1024 1024
------------------------------------------- LOG 相关 -------------------------------------------
log.segment.bytes =1024 1024 1024
log.roll.hours =24*7
log.cleanup.policy = delete
log.retention.minutes=7days
指定日志每隔多久检查看是否可以被删除,默认1分钟
log.cleanup.interval.mins=1
log.retention.bytes=-1
log.retention.check.interval.ms=5minutes
log.cleaner.enable=false
log.cleaner.threads =1
log.cleaner.io.max.bytes.per.second=None
log.cleaner.depe.buffer.size=500 1024 1024
log.cleaner.io.buffer.size=512*1024
log.cleaner.io.buffer.load.factor =0.9
log.cleaner.backoff.ms =15000
log.cleaner.min.cleanable.ratio=0.5
log.cleaner.delete.retention.ms =1day
log.index.size.max.bytes =10 1024 1024
log.index.interval.bytes =4096
log.flush.interval.messages=None
log.flush.scheler.interval.ms =3000
log.flush.interval.ms = None
log.delete.delay.ms =60000
log.flush.offset.checkpoint.interval.ms =60000
------------------------------------------- TOPIC 相关 -------------------------------------------
auto.create.topics.enable =true
default.replication.factor =1
num.partitions =1
实例 --replication-factor3--partitions1--topic replicated-topic :名称replicated-topic有一个分区,分区被复制到三个broker上。
----------------------------------复制(Leader、replicas) 相关 ----------------------------------
controller.socket.timeout.ms =30000
controller.message.queue.size=10
replica.lag.time.max.ms =10000
replica.lag.max.messages =4000
replica.socket.timeout.ms=30*1000
replica.socket.receive.buffer.bytes=64*1024
replica.fetch.max.bytes =1024*1024
replica.fetch.wait.max.ms =500
replica.fetch.min.bytes =1
num.replica.fetchers=1
replica.high.watermark.checkpoint.interval.ms =5000
controlled.shutdown.enable =false
controlled.shutdown.max.retries =3
controlled.shutdown.retry.backoff.ms =5000
auto.leader.rebalance.enable =false
leader.imbalance.per.broker.percentage =10
leader.imbalance.check.interval.seconds =300
offset.metadata.max.bytes
----------------------------------ZooKeeper 相关----------------------------------
zookeeper.connect = localhost:2181
zookeeper.session.timeout.ms=6000
zookeeper.connection.timeout.ms =6000
zookeeper.sync.time.ms =2000
配置的修改
其中一部分配置是可以被每个topic自身的配置所代替,例如
新增配置
bin/kafka-topics.sh --zookeeper localhost:2181--create --topic my-topic --partitions1--replication-factor1--config max.message.bytes=64000--config flush.messages=1
修改配置
bin/kafka-topics.sh --zookeeper localhost:2181--alter --topic my-topic --config max.message.bytes=128000
删除配置 :
bin/kafka-topics.sh --zookeeper localhost:2181--alter --topic my-topic --deleteConfig max.message.bytes
二 CONSUMER 配置
最为核心的配置是group.id、zookeeper.connect
group.id
consumer.id
client.id = group id value
zookeeper.connect=localhost:2182
zookeeper.session.timeout.ms =6000
zookeeper.connection.timeout.ms =6000
zookeeper.sync.time.ms =2000
auto.offset.reset = largest
socket.timeout.ms=30*1000
socket.receive.buffer.bytes=64*1024
fetch.message.max.bytes =1024*1024
auto.commit.enable =true
auto.commit.interval.ms =60*1000
queued.max.message.chunks =10
rebalance.max.retries =4
rebalance.backoff.ms =2000
refresh.leader.backoff.ms
fetch.min.bytes =1
fetch.wait.max.ms =100
consumer.timeout.ms = -1
三 PRODUCER 的配置
比较核心的配置:metadata.broker.list、request.required.acks、procer.type、serializer.class
metadata.broker.list
request.required.acks =0
request.timeout.ms =10000
send.buffer.bytes=100*1024
key.serializer.class
partitioner.class=kafka.procer.DefaultPartitioner
compression.codec = none
compressed.topics=null
message.send.max.retries =3
retry.backoff.ms =100
topic.metadata.refresh.interval.ms =600*1000
client.id=""
------------------------------------------- 消息模式 相关 -------------------------------------------
procer.type=sync
queue.buffering.max.ms =5000
queue.buffering.max.messages =10000
queue.enqueue.timeout.ms = -1
batch.num.messages=200
serializer.class= kafka.serializer.DefaultEncoder
⑤ Kafka相关内容总结(Kafka集群搭建手记)
Kafka is a distributed,partitioned,replicated commit logservice。它提供了类似于JMS的特性,但是在设计实现上完全不同,此外它并不是JMS规范的实现。kafka对消息保存时根据Topic进行归类,发送消息者成为Procer,消息接受者成为Consumer,此外kafka集群有多个kafka实例组成,每个实例(server)成为broker。无论是kafka集群,还是procer和consumer都依赖于zookeeper来保证系统可用性集群保存一些meta信息。
入门请参照: https://www.ibm.com/developerworks/cn/opensource/os-cn-kafka/index.html
在此不再赘述。
这部分不是本文的重点,但是kafka需要用到kafka集群,所以先搭建kafka集群。
从kafka官方文档看到,kafka似乎在未来的版本希望抛弃zookeep集群,自己维护集群的一致性,拭目以待吧。
我们搭建集群使用的是三台同机房的机器,因为zookeeper不怎么占资源也不怎么占空间(我们的业务目前比较简单),所以三台机器上都搭建了zookeeper集群。
搭建zookeeper集群没什么难度,参考文档: http://www.cnblogs.com/huangxincheng/p/5654170.html
下面列一下我的配置并解析:
一共用三台物理机器,搭建一个Kafka集群。
每台服务器的硬盘划分都是一样的,每个独立的物理磁盘挂在一个单独的分区里面,这样很方便用于Kafka多个partition的数据读写与冗余。
/data1比较小,为了不成为集群的瓶颈,所以/data1用于存放kafka以及Zookeeper
每台机器的磁盘分布如下:
下面是kafka的简单配置,三台服务器都一样,如有不一致的在下文有说明。
kafka安装在目录/usr/local/kafka/下,下面的说明以10.1.xxx.57为例。
最重要的配置文件server.properties,需要配置的信息如下:
从上面的配置看到,kafka集群不需要像hadoop集群那样,配置ssh通讯,而且一个kafka服务器(官方文档称之为broker,下面统一使用这个称呼)并不知道其他的kafka服务器的存在,因此你需要逐个broker去启动kafka。各个broker根据自己的配置,会自动去配置文件上的zk服务器报到,这就是一个有zk服务器粘合起来的kafka集群。
我写了一个启动脚本,放在 /usr/local/kafka/bin 下面。启动脚本每个broker都一样:
如同kafka集群里面每一个broker都需要单独启动一样,kafka集群里面每一个broker都需要单独关闭。
官方给出的关闭脚本是单独运行 bin/kafka-server-stop.sh
但是我运行的结果是无法关闭。打开脚本一看,才发现是最简单的办法,发一个TERM信号到kafka的java进程,官方脚本给出的grep有点问题。
发信号之后,一直tail着kafka日志,看到正常关闭。
指定zookeeper服务器,topic名称是LvsKafka(注意topic名称不能有英文句号(.)和下划线(_),否则会通不过,理由是名称会冲突,下文对此略有解析)
replication-factor指出重复因子是2,也就是每条数据有两个拷贝,可靠性考虑。
partitions 指出需要多少个partition,数据量大的多一点,无论生产和消费,这是负载均衡和高并发的需要。
可以看到刚才新建的24个partition,比如partition 5, 他的leader是broker 59,也就是10.1.xxx.59这台机器。
建立topic时我们指出需要2个拷贝,从上面的输出的Replicas字段看到,这两个拷贝放在59,58两个机器,也就是10.1.xxx.59和10.1.xxx.58.
Isr表示当前partition的所有拷贝所在的机器中,哪些是还活着(可以提供服务)的。现在是59和58都还存活。
这个命令另外还会看到一些类似于下面的内容:
__consumer_offsets到底是什么呢?其实就是客户端的消费进度,客户端会定时上报到kafka集群,而kafka集群会把每个客户端的消费进度放入一个自己内部的topic中,这个topic就是__consumer_offsets。我查看过__consumer_offsets的内容,其实就是每个客户端的消费进度作为一条消息,放入__consumer_offsets这个topic中。
这里给了我们两个提示:
1、kafka自己管理客户端的消费进度,而不是依靠zk,这就是kafka官方文档说的kafka未来会抛弃zk的底气之一;
2、留意到这个kafka自己的topic是带下划线的,也就是,kafka担心我们自己建的topic如果带下划线的话会跟这些内部自用的topic冲突;
⑥ kafka术语和配置介绍
procer 是生产者,负责消息生产,上游程序中按照标准的消息格式组装(按照每个消息事件的字段定义)发送到指定的topic。procer生产消息的时候,不会因为consumer处理能力不够,而阻塞procer的生产。consumer会从指定的topic 拉取消息,然后处理消费,并提交offset(消息处理偏移量,消费掉的消息并不会主动删除,而是kafka系统根据保存周期自动消除)。
topic是消费分类存储的队列,可以按照消息类型来分topic存储。
replication是topic复制副本个数,用于解决数据丢失,防止leader topic宕机后,其他副本可以快代替。
broker是缓存代理,Kafka集群中的一台或多台服务器统称broker,用来保存procer发送的消息。Broker没有副本机制,一旦broker宕机,该broker的消息将都不可用。
partition是topic的物理分组,在创建topic的时候,可以指定partition 数量。每个partition是逻辑有序的,保证每个消息都是顺序插入的,而且每个消息的offset在不同partition的是唯一不同的
偏移量。kafka为每条在分区的消息保存一个偏移量offset,这也是消费者在分区的位置。比如一个偏移量是5的消费者,表示已经消费了从0-4偏移量的消息,下一个要消费的消息的偏移量是5。每次消息处理完后,要么主动提交offset,要么自动提交,把offset偏移到下一位,如处理offset=6消息。在kafka配置中,如果enable_auto_commit=True和auto_commit_interval_ms=xx,那表示每xx 毫秒自动提交偏移量
分组。是指在消费同一topic的不同consumer。每个consumer都有唯一的groupId,同一groupId 属于同一个group。不同groupId的consumer相互不影响。对于一个topic,同一个group的consumer数量不能超过 partition数量。比如,Topic A 有 16个partition,某一个group下有2个consumer,那2个consumer分别消费8个partition,而这个group的consumer数量最多不能超过16个。
kafka的配置主要分四类,分别是zookeeper、server、consumer、procer。其他的配置可以忽略。
zk的配置比较简单,也可以默认不改.dataDir是zk存储节点配置的目录地址,clientPort是zk启动的端口,默认2181,maxClientCnxns是限制ip的连接此处,设置0表示无连接次数,一般情况根据业务部署情况,配置合理的值。
⑦ Kafka生产者开发,原理分析,以及参数配置
生产者开发(基于java),生产者发送消息主要有以下三步
那么我们进行抽象,大致可以得到这两个类。
另外Kafka为了表现以下封装的特性,把准备生产者的参数配成了一个Properties类,
以这个类为KafkaProcer构造函数入参。
那么KafkaProcer的参数具体可以配置什么呢?
由123步可知,可以配置拦截器,序列化器,分区器。
这些都可以自己实现特定接口(ProcerInterceptor,Serializer,Partioner),
然后放到Properties里面,最后给KafkaProcer
拦截器就是对ProcerRecord做一些处理,然后返回处理过的新的ProcerRecord(自定义拦截策略)
序列化器是要讲java对象转成byte[]数组然后进行网络传输(自己定义序列化策略)
分区器就是为消息选择分区(这里自己可以设计分区策略)
再次回到这张图
可以看到,有两个线程在完成消息的发送,一个是主线程,一个是Sender线程。
主线程经过123步后,会将同一个partition的多个Record封装(压缩)到一个ProcerBatch对象中,
这样的目的是方便传输,提高效率,RecordAccumulator里面维持着一个双端ProcerBatch队列数组,
然后Sender线程从队头取ProcerBatch封装成Request,这里设计到一个逻辑到物理的转换。
分区是逻辑的,而broker才是物理的,一个区对应一个broker,所以要转换。
另外Sender线程里面维持了一个以Nodeid(就是对应broker)为Key,Deque<Request>为值的Map,
这里面的Request指的是那种没有Response的Request。一旦有了Response就会清理掉的。
这个是由通过leastLoadedNode节点实现的,不多说了。
其实除了123步中的参数,还有其它参数,这里就说一个
ack
acks=1。默认值即为1。生产者发送消息之后,只要分区的leader副本成功写入消息,那么它就会收到来自服务端的成功响应。
acks=0。生产者发送消息之后不需要等待任何服务端的响应。
acks=-1或acks=all。生产者在消息发送之后,需要等待ISR中的所有副本都成功写入消息之后才能够收到来自服务端的成功响应。