本文介绍的的是kafka集群搭建、kafka权限认证(SASL/SCRAM)、整合springboot项目。
1、创建kafka日志和zookeeper文件目录:
/data/kafka/kafka-logs
/data/zookeeper/zkdata
/data/zookeeper/zklogs
2、修改kafka配置文件server.properties
2.1、将原文件重命名为server.properties_bak0311
2.2、新建server.properties,编辑:
# fixed params
listeners=PLAINTEXT://192.168.0.1:9092
log.segment.bytes=1073741824
socket.send.buffer.bytes=202400
work.threads=3
port=9092
num.recovery.threads.per.data.dir=1
log.dirs=/data/kafka/kafka-2.12-logs
log.flush.interval.messages=10000
zookeeper.connection.timeout.ms=6000
log.retention.check.interval.ms=300000
zookeeper.session.timeout.ms=6000
log.flush.interval.ms=1000
replica.fetch.max.bytes=1000000
# params input by user
advertised.port=9092
auto.create.topics.enable=true
compression.type=producer
default.replication.factor=3
delete.topic.enable=true
kafka-manager.basicAuthentication.enabled=false
kafka-manager.basicAuthentication.password=password
kafka-manager.basicAuthentication.username=admin
kafka-manager.port=9000
log.cleaner.enable=false
log.cleanup.policy=delete
log.retention.bytes=9663676416
log.retention.hours=48
log.roll.hours=168
log.segment.bytes=1073741824
log.segment.delete.delay.ms=60000
message.max.bytes=1000000
num.io.threads=8
num.partitions=5
num.replica.fetchers=1
offsets.topic.replication.factor=3
queued.max.requests=500
socket.receive.buffer.bytes=102400
socket.send.buffer.bytes=102400
unclean.leader.election.enable=true
# dependency
zookeeper.connect=192.168.0.1:2181,192.168.0.2:2181,192.168.0.3:2181,192.168.0.4:2181,192.168.0.5:2181
zookeeper.connection.timeout.ms=400000
# self
#host.name=100.126.6.14
broker.id=1
advertised.host.name=192.168.0.1
3、修改zookeeper配置文件zookeeper.properties
3.1、将原文件重命名为zookeeper.properties_bak
3.2、新建zookeeper.properties,编辑:
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/data/zookeeper/zkdata
dataLogDir=/data/zookeeper/zklogs
clientPort=2181
maxClientCnxns=0
autopurge.snapRetainCount=30
autopurge.purgeInterval=48
server.1=192.168.0.1:2888:3888
server.2=192.168.0.2:2888:3888
server.3=192.168.0.3:2888:3888
server.4=192.168.0.4:2888:3888
server.5=192.168.0.5:2888:3888
3.3、zookeeper数据目录添加myid配置
在各台服务器的zookeeper数据目录/data/zookeeper/zkdata添加myid文件,写入服务broker.id属性值。分别是1、2、3、4、5
1命令:
$ echo 0 > myid #创建
$ cat myid #查看
4、停止服务
5、重启集群
nohup /home/appadmin/kafka/bin/zookeeper-server-start.sh /home/appadmin/kafka/config/zookeeper.properties >/data/zookeeper/zklogs/zookeeper.log 2>1 &
nohup /home/appadmin/kafka/bin/kafka-server-start.sh /home/appadmin/kafka/config/server.properties >/data/kafka/kafka-logs/kafka.log 2>1 &
6、权限认证配置
6.1、创建用户:
[appadmin@p0-insurance2-cskafka-web01 root]$ /home/appadmin/kafka/bin/kafka-configs.sh --zookeeper 192.168.0.1:2181,192.168.0.2:2181,192.168.0.3:2181,192.168.0.4:2181,192.168.0.5:2181 --alter --entity-type users --entity-name admin --add-config 'SCRAM-SHA-256=[password=admin],SCRAM-SHA-512=[password=admin]'
Completed Updating config for entity: user-principal 'admin'.
查看SCRAM证书:
$ /home/appadmin/kafka/bin/kafka-configs.sh --zookeeper 192.168.0.1:2181,192.168.0.2:2181,192.168.0.3:2181,192.168.0.4:2181,192.168.0.5:2181 --describe --entity-type users --entity-name admin
Configs for user-principal 'admin' are SCRAM-SHA-512=salt=c2E2d3ZnM3RpZjF3cmR1dXN4MGpxcGM5YQ==,stored_key=PiSVOcujx71an/hf/GWGexdp5R3Pv3648tHSgPrn0SHS4vIULhCKo87wH62yF24OlfPvDiavRC9a6X/K9QDULQ==,server_key=FyJtKnuW5uqRdgBwQlt1kteIGwOT3S57CSq9zDkMFUjBeWhdCmCcvHgw60ToHeCxFFJG+al8uwdlgpqDmz1nSg==,iterations=4096,SCRAM-SHA-256=salt=NHk1dnJjaWRwdjYxbHBoY2tiOHg2Z2pnYw==,stored_key=CgFKII54oa/Gwi4ARXh4onXk41M03U/qzEVc2ts6tYI=,server_key=aawRFr6VJMSjDMZlY7w1XHlMCxqBVsKK+NZNcngoo0c=,iterations=4096
6.2、在config目录中创建kafka_server_jaas.conf文件
touch kafka_server_jaas.conf
vi kafka_server_jaas.conf
KafkaServer {
org.mon.security.scram.ScramLoginModule required
username="admin"
password="admin";
};
6.3、修改配置文件server.properties,在原有基础上添加以下内容:
#认证配置
listeners=SASL_PLAINTEXT://0.0.0.0:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
sasl.enabled.mechanisms=SCRAM-SHA-256
#ACL配置
allow.everyone.if.no.acl.found=false
super.users=User:admin
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
6.4、在Kafka启动脚本中添加配置文件路径
vi kafka-server-start.sh
#!/bin/bash
export KAFKA_OPTS="-Djava.security.auth.login.config=/home/appadmin/kafka/config/kafka_server_jaas.conf"
6.5、重启brokers
先停kafka再停zookeeper、重启所有zookeeper、重启所有kafka
7、创建用户及赋权
7.1、针对核心消息服务创建个性化用户:
/home/appadmin/kafka/bin/kafka-configs.sh --zookeeper 192.168.0.1:2181,192.168.0.2:2181,192.168.0.3:2181,192.168.0.4:2181,192.168.0.5:2181 --alter --entity-type users --entity-name testuser --add-config 'SCRAM-SHA-256=[password=123456],SCRAM-SHA-512=[password=123456]'
查看SCRAM证书:
/home/appadmin/kafka/bin/kafka-configs.sh --zookeeper 192.168.0.1:2181,192.168.0.2:2181,192.168.0.3:2181,192.168.0.4:2181,192.168.0.5:2181 --describe --entity-type users --entity-name testuser
7.2、在服务器端创建topic: test-topic,并向testuser用户赋权,groupid = test_groupid
bin/kafka-topics.sh --zookeeper 192.168.0.1:2181,192.168.0.2:2181,192.168.0.3:2181,192.168.0.4:2181,192.168.0.5:2181 --create --topic test-topic --partitions 1 --replication-factor 1
bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=192.168.0.1:2181,192.168.0.2:2181,192.168.0.3:2181,192.168.0.4:2181,192.168.0.5:2181 --add --allow-principal User:testuser --operation Write --topic test-topic
bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=192.168.0.1:2181,192.168.0.2:2181,192.168.0.3:2181,192.168.0.4:2181,192.168.0.5:2181 --add --allow-principal User:testuser --operation Read --topic test-topic --group test_groupid
查看topics:
bin/kafka-topics.sh --zookeeper 192.168.0.1:2181,192.168.0.2:2181,192.168.0.3:2181,192.168.0.4:2181,192.168.0.5:2181 --list
查看acl配置
bin/kafka-acls.sh --list --authorizer-properties zookeeper.connect=192.168.0.1:2181,192.168.0.2:2181,192.168.0.3:2181,192.168.0.4:2181,192.168.0.5:2181