• 准备

Ubuntu 18.04

server-1: 172.16.10.121

server-2: 172.16.10.122

server-3: 172.16.10.123

kafka : kafka_2.12-2.0.0

zookeeper : zookeeper-3.4.13

服务器新建kafka用户目录

  • Zookeeper集群搭建

三节点:server-1、server-2、server-3

  1. 解压文件
tar -zxvf  zookeeper-3.4.13.tar.gz

mv zookeeper-3.4.13 zookeeper
  1. 新建data、log目录
cd zookeeper/

mkdir data/

mkdir log/
  1. 修改配置
cd zookeeper/conf/

cp zoo-sample.cfg zoo.cfg

vim zoo.cfg

修改添加以下配置

dataDir=/home/kafka/zookeeper/data
dataLogDir=/home/kafka/zookeeper/log

server.0=172.16.10.121:2888:3888
server.1=172.16.10.122:2888:3888
server.2=172.16.10.123:2888:3888
说明:

server.1 这个1是服务器的标识也可以是其他的数字, 表示这个是第几号服务器,用来标识服务器,这个标识要写到快照目录下面myid文件里,172.16.10.121为集群里的IP地址,第一个端口是master和slave之间的通信端口,默认是2888,第二个端口是leader选举的端口,集群刚启动的时候选举或者leader挂掉之后进行新的选举的端口默认是3888

配置解释:
tickTime # 这个时间是作为 Zookeeper 服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个 tickTime 时间就会发送一个心跳。
initLimit # 这个配置项是用来配置 Zookeeper 接受客户端(这里所说的客户端不是用户连接 Zookeeper 服务器的客户端,而是 Zookeeper 服务器集群中连接到 Leader 的 Follower 服务器)初始化连接时最长能忍受多少个心跳时间间隔数。当已经超过 5个心跳的时间(也就是 tickTime)长度后 Zookeeper 服务器还没有收到客户端的返回信息,那么表明这个客户端连接失败。总的时间长度就是 5*2000=10 秒
syncLimit # 这个配置项标识 Leader 与Follower 之间发送消息,请求和应答时间长度,最长不能超过多少个 tickTime 的时间长度,总的时间长度就是5*2000=10秒
dataDir # 快照日志的存储路径
dataLogDir # 事物日志的存储路径,如果不配置这个那么事物日志会默认存储到dataDir制定的目录,这样会严重影响zk的性能,当zk吞吐量较大的时候,产生的事物日志、快照日志太多
clientPort # 这个端口就是客户端连接 Zookeeper 服务器的端口,Zookeeper 会监听这个端口,接受客户端的访问请求。修改他的端口改大点
  1. 创建myid文件
##  server1
echo "1" > /home/kafka/zookeeper/data/myid
##  server2
echo "2" > /home/kafka/zookeeper/data/myid
##  server3
echo "3" > /home/kafka/zookeeper/data/myid
  1. 日志清理
未完待续
  1. 启动测试

进入到zookeeper的bin 目录下启动(三台服务器以此启动):

cd zookeeper/bin

./zkServer.sh start

输出如下:

[email protected]:~/zookeeper/bin$ ./zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /home/kafka/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED

依次查看各节点状态

./zkServer.sh status
##  server-1
[email protected]:~/zookeeper/bin$ ./zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /home/kafka/zookeeper/bin/../conf/zoo.cfg
Mode: follower

##  server-2
[email protected]:~/zookeeper/bin$ ./zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /home/kafka/zookeeper/bin/../conf/zoo.cfg
Mode: leader

##  server-3
[email protected]:~/zookeeper/bin$ ./zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /home/kafka/zookeeper/bin/../conf/zoo.cfg
Mode: follower
说明:

zk集群一般只有一个leader,多个follower,主一般是相应客户端的读写请求,而从主同步数据,当主挂掉之后就会从follower里投票选举一个leader出来。

使用jps 查看进程

jps

[email protected]:~/zookeeper/bin$ jps
81147 QuorumPeerMain
81322 Jps
  • Kafka集群搭建
  1. 解压文件并创建kafka-logs目录
tar -zxvf kafka_2.12-2.0.0.tgz

mv kafka_2.12-2.0.0 kafka

cd kafka

mkdir kafka-logs/
  1. 修改配置文件

参数解释:

broker.id=0  #当前机器在集群中的唯一标识,和zookeeper的myid性质一样
port=19092 #当前kafka对外提供服务的端口默认是9092
host.name=172.16.10.200 #这个参数默认是关闭的,在0.8.1有个bug,DNS解析问题,失败率的问题。
num.network.threads=3 #这个是borker进行网络处理的线程数
num.io.threads=8 #这个是borker进行I/O处理的线程数
log.dirs=/home/kafka/kafka/kafka-logs #消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数这个目录,如果配置多个目录,新创建的topic他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个
socket.send.buffer.bytes=102400 #发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能
socket.receive.buffer.bytes=102400 #kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘
socket.request.max.bytes=104857600 #这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小
num.partitions=1 #默认的分区数,一个topic默认1个分区数
log.retention.hours=168 #默认消息的最大持久化时间,168小时,7天
message.max.byte=5242880  #消息保存的最大值5M
default.replication.factor=2  #kafka保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务
replica.fetch.max.bytes=5242880  #取消息的最大直接数
log.segment.bytes=1073741824 #这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件
log.retention.check.interval.ms=300000 #每隔300000毫秒去检查上面配置的log失效时间(log.retention.hours=168 ),到目录查看是否有过期的消息如果有,删除
log.cleaner.enable=false #是否启用log压缩,一般不用启用,启用的话可以提高性能
zookeeper.connect=172.16.10.121:2181,172.16.10.122:2181,172.16.10.123:2181 #设置zookeeper的连接端口
cd kafka/config/

vim server.properties
##每台服务器的broker.id都不能相同
broker.id=0  

log.dirs=/home/kafka/kafka/kafka-logs

#设置zookeeper的连接端口
zookeeper.connect=172.16.10.121:2181,172.16.10.122:2181,172.16.10.123:2181
  1. 启动kafka集群
##  进入到kafka的bin目录
cd kafka/bin/
##  启动(三台都要启动)
./kafka-server-start.sh -daemon ../config/server.properties
检查服务是否启动
jps

[email protected]:~/kafka/bin$ jps
81937 Kafka
81147 QuorumPeerMain
82013 Jps
  1. 创建Topic验证是否成功
在server-1 上创建一个名为meetads的topic消息队列:
./kafka-topics.sh --create --zookeeper 172.16.10.121:2181,172.16.10.122:2181,172.16.10.121:2181 --replication-factor 2 --partitions 1 --topic meetads
[email protected]:~/kafka/bin$ ./kafka-topics.sh --create --zookeeper 172.16.10.121:2181,172.16.10.122:2181,172.16.10.121:2181 --replication-factor 2 --partitions 1 --topic meetads
Created topic "meetads".

解释:

--replication-factor 2      #复制两份
--partitions 1              #创建1个分区
--topic                     #主题为meetads

然后在server-1上创建一个消费者consumer:

##  创建消费者,回车后进入等待状态
./kafka-console-consumer.sh --bootstrap-server 172.16.10.121:9092,172.16.10.122:9092,172.16.10.123:9092 --topic meetads --from-beginning

server-2上创建生产者producer:

##  创建生产者
./kafka-console-producer.sh --broker-list 172.16.10.121:9092,172.16.10.122:9092 --topic meetads
##  输出如下
[email protected]:~/kafka/bin$ ./kafka-console-producer.sh --broker-list 172.16.10.121:9092,172.16.10.122:9092 --topic meetads
>hello
>hello kafka!     
>

查看server-1的等待窗口可得到server-2的输入数据(server-3执行下面命令可得到同样结果):

[email protected]:~/kafka/bin$ ./kafka-console-consumer.sh --bootstrap-server 172.16.10.121:9092,172.16.10.122:9092,172.16.10.123:9092 --topic meetads --from-beginning
hello
hello kafka!
至此,kafka-zookeeper集群搭建完成!
  • Zookeeper集群动态扩容
说明:

由于zookeeper的特性:集群中只要有过半的机器是正常工作的,那么整个集群对外就是可用的。所以我们假设所有集群的数量都是奇数,我们以原集群是3台机器,现在将其扩展为5台为例

新增节点:

server-3: 172.16.10.124

server-4: 172.16.10.125

新增机器的安装目录与版本保持一致,

  1. 将原节点的配置文件zoo.cfg复制到新节点对应目录下,为避免出错这里用绝对路径
scp /home/kafka/zookeeper/conf/zoo.cfg [email protected]:/home/kafka/zookeeper/conf/
scp /home/kafka/zookeeper/conf/zoo.cfg [email protected]:/home/kafka/zookeeper/conf/
  1. 进入新机器,修改zoo.cfg,追加如下内容
## 原有
server.0=172.16.10.121:2888:3888 
server.1=172.16.10.122:2888:3888 
server.2=172.16.10.123:2888:3888
## 新增
server.3=172.16.10.124:2888:3888
server.4=172.16.10.125:2888:3888
  1. 新建myid
## server-3
echo "3" > /home/kafka/zookeeper/data/myid
## server-4
echo "4" > /home/kafka/zookeeper/data/myid
  1. 启动zookeeper,两个新节点都执行
cd zookeeper/bin
./zkServer.sh start
  1. 查看zookeeper状态
## 启动后使用命令./zkServer.sh status查看zookeeper状态。和单机模式下不同的是,这个时候,可以看到,新机器的模式为follower,而且没有出错
./zkServer.sh status
  1. 依次修改原节点,并重启zookeeper服务
## 将新机器的配置文件一次复制到三台旧机器,也可手动修改
scp /home/kafka/zookeeper/conf/zoo.cfg [email protected]:/home/kafka/zookeeper/conf/
scp /home/kafka/zookeeper/conf/zoo.cfg [email protected]:/home/kafka/zookeeper/conf/
scp /home/kafka/zookeeper/conf/zoo.cfg [email protected]:/home/kafka/zookeeper/conf/

## 进入到bin目录下重启zookeeper
./zkServer.sh restart
说明:

必须确定一台启动成功后再重新启动下一台

大吉大利

由于每个时刻只有一台机器会停止服务,所以整个集群中是有过半的机器正常工作的。所以整个集群在重启期间也是可以正常对外工作,对用户来说这个重启过程是无感知的
........