kafka-2-11集群部署
文章发布较早,内容可能过时,阅读注意甄别。
# 1,准备工作
# 1,主机准备。
准备三台主机,都是 CentOS-7,IP 如下:
- 192.168.106.7
- 192.168.106.8
- 192.168.106.9
为了方便后续操作,先配置一下主机免密码登陆。
[root@localhost ~]$ssh-keygen
[root@localhost ~]$ssh-copy-id root@192.168.106.7
[root@localhost ~]$ssh-copy-id root@192.168.106.8
[root@localhost ~]$ssh-copy-id root@192.168.106.9
1
2
3
4
2
3
4
更改三节点的主机名。
[root@localhost ~]$hostnamectl set-hostname node1
[root@localhost ~]$hostnamectl set-hostname node2
[root@localhost ~]$hostnamectl set-hostname node3
1
2
3
2
3
配置 hosts。
[root@localhost ~]$ cat >> /etc/hosts << EOF
127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4
::1 localhost localhost.localdomain localhost6 localhost6.localdomain6
192.168.106.7 node1
192.168.106.8 node2
192.168.106.9 node3
EOF
1
2
3
4
5
6
7
2
3
4
5
6
7
# 2,依赖准备。
只需准备 jdk 环境即可,通过如下命令安装:curl 192.168.1.1/a | sh
。
此种安装方式可以参考这里 (opens new window)。
# 3,安装包。
下载 kafka。
wget https://mirrors.cnnic.cn/apache/kafka/0.11.0.2/kafka_2.11-0.11.0.2.tgz
1
# 2,配置集群。
# 1,解压分发。
首先将 kafka 解压,然后分发给集群中各个 node。
[root@localhost opt]$tar xf kafka_2.11-0.11.0.2.tgz
[root@localhost opt]$mv kafka_2.11-0.11.0.2 kafka
1
2
2
使用如下脚本进行分发。
NODE_IPS=(192.168.106.7 192.168.106.8 192.168.106.9)
for node_ip in ${NODE_IPS[@]}
do
echo ">>> ${node_ip}"
scp -r /opt/kafka root@${node_ip}:/usr/local
done
1
2
3
4
5
6
2
3
4
5
6
# 2,配置 zookeeper 集群。
要搭建 kafka 集群,首先要部署 zookeeper 集群,这里直接使用 kafka 内置的 zookeeper 进行部署。
[root@localhost config]$egrep -v '^#|^$' zookeeper.properties
dataDir=/usr/local/kafka/zookeeper
clientPort=2181
maxClientCnxns=0
initLimit=10
syncLimit=5
server.1=node1:2888:3888
server.2=node2:2889:3889
server.3=node3:2890:3890
1
2
3
4
5
6
7
8
9
2
3
4
5
6
7
8
9
- dataDir:zookeeper 的快照存储地址
- clientPort:zookeeper 端口
- maxClientCnxns:单个客户端与单台服务器之间的连接数的限制
- initLimit:默认值是 10,即 tickTime 属性值的 10 倍。它用于配置允许 Followers 连接并同步到 Leader 的最大时间。如果 ZooKeeper 管理的数据量很大的话可以增加这个值
- syncLimit:默认值是 5,即 tickTime 属性值的 5 倍。它用于配置 Leader 和 Followers 间进行心跳检测的最大延迟时间。如果在设置的时间内 Followers 无法与 Leader 进行通信,那么 Followers 将会被丢弃
- server.1:server.x 中的 x 要与 myid 中的数字一致,node1 为集群 IP,当然前边配置了 hosts 可以这么写,2888 用于 follower 与 leader 之间的数据同步与其他通信;3888 用于 leader 选举时的通信
配置完成之后,将文件分发出去。
NODE_IPS=(192.168.106.7 192.168.106.8 192.168.106.9)
for node_ip in ${NODE_IPS[@]}
do
echo ">>> ${node_ip}"
scp /opt/kafka/config/zookeeper.properties root@${node_ip}:/usr/local/kafka/config
done
1
2
3
4
5
6
2
3
4
5
6
创建对应的 myid,用于区分不同节点的身份。
NODE_IPS=(192.168.106.7 192.168.106.8 192.168.106.9)
for ((i=1;i<=${#NODE_IPS[@]};i++))
do
node_ip=${NODE_IPS[$i-1]}
echo ">>> ${node_ip}"
ssh root@${node_ip} 'mkdir -p /usr/local/kafka/zookeeper'
ssh root@${node_ip} "echo ${i} > /usr/local/kafka/zookeeper/myid"
done
1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
启动 zookeeper 服务。
NODE_IPS=(192.168.106.7 192.168.106.8 192.168.106.9)
for node_ip in ${NODE_IPS[@]}
do
echo ">>> ${node_ip}"
ssh root@${node_ip} '/usr/local/kafka/bin/zookeeper-server-start.sh -daemon /usr/local/kafka/config/zookeeper.properties'
done
1
2
3
4
5
6
2
3
4
5
6
查看服务状态:
[root@localhost opt]$jps -m
2818 QuorumPeerMain config/zookeeper.properties
8901 Jps -m
1
2
3
2
3
观察一下日志,查看是否有日常。
# 3,配置 kafka 集群。
同样是先调整 kafka 的配置。
[root@localhost config]$egrep -v '^#|^$' server.properties
broker.id=1 #三个节点不能一样
listeners=PLAINTEXT://:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/usr/local/kafka/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.106.7:2181,192.168.106.8:2181,192.168.106.9:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
分发配置文件。
NODE_IPS=(192.168.106.7 192.168.106.8 192.168.106.9)
for node_ip in ${NODE_IPS[@]}
do
echo ">>> ${node_ip}"
scp /opt/kafka/config/server.properties root@${node_ip}:/usr/local/kafka/config
done
1
2
3
4
5
6
2
3
4
5
6
分发之后记得更改一下每个配置文件的 broker.id
。
启动 kafka。
NODE_IPS=(192.168.106.7 192.168.106.8 192.168.106.9)
for node_ip in ${NODE_IPS[@]}
do
echo ">>> ${node_ip}"
ssh root@${node_ip} '/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties'
done
1
2
3
4
5
6
2
3
4
5
6
这样整个集群就搭建完成了。
# 3,验证集群。
# 1,创建一个 topic。
[root@localhost opt]$/usr/local/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.106.7:2181,192.168.106.8:2181,192.168.106.9:2181 --replication-factor 3 --partitions 1 --topictest-topic
Created topic "test-topic".
1
2
2
# 2,查看 topic。
[root@localhost opt]$/usr/local/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.106.7:2181,192.168.106.8:2181,192.168.106.9:2181
test-topic
1
2
2
# 3,生产消息。
[root@localhost opt]$/usr/local/kafka/bin/kafka-console-producer.sh --broker-list 192.168.106.7:9092,192.168.106.8:9092,192.168.106.9:9092 --topic test-topic
>this is test
1
2
2
# 4,消费消息。
[root@localhost opt]$/usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper 192.168.106.7:2181,192.168.106.8:2181,192.168.106.9:2181 --topic test-topic --from-beginning
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
this is test
1
2
3
2
3
如此,一个完整的集群就配置完成了。
上次更新: 2025/01/18, 09:43:53