Kafka单机部署

释放双眼,带上耳机,听听看~!

这篇文章直接上手安装,如需了解Kafka原理的同学请看Kafka的第一篇文章:https://abcops.cn/kafka-theory-one/

下载安装

目前Kafka的最新的源码包版本为2.2.1,二进制版本为2.12-2.2.1
一般好多人都采用二进制部署,因为解压即用,快捷方便,再此也选择二进制安装,官方建议用二进制的2.12版本
Kafka官方下载地址:http://kafka.apache.org/downloads
Kafka_2.12下载地址:wget http://mirror.bit.edu.cn/apache/kafka/2.2.1/kafka_2.12-2.2.1.tgz

解压kafka

wget http://mirror.bit.edu.cn/apache/kafka/2.2.1/kafka_2.12-2.2.1.tgz
tar xf kafka_2.12-2.2.1.tgz -C /usr/local/
mv /usr/local/kafka_2.12-2.2.1 /usr/local/kafka

添加Kafka环境变量

cat << EOF >> /etc/profile
export KAFKA_HOME=/usr/local/kafka
export PATH=\$PATH:\$KAFKA_HOME/bin
EOF

source /etc/profile

ZooKeeper部署

ZooKeeper官方站点:https://zookeeper.apache.org/
Kafka使用ZooKeeper来管理,因此需要安装ZooKeeper,并且要先启动ZooKeeper。
但是在Kafka中内置了ZooKeeper配置文件:/usr/local/kafka/config/zookeeper.properties,单机部署我们选择Kafka所提供的ZK配置文件来使用ZK,但是如果是集群环境的Kafka,建议使用单独的ZooKeeper来管理Kafka集群,以下我们选择使用Kafka内置的ZooKeeper来管理单节点的Kafka,如果你想单独部署ZooKeeper,可以参考:https://abcops.cn/zookeeper-single-deployment/

注意:ZooKeeper需要允许Java环境,需要先部署JDK

JDK下载链接:https://pan.baidu.com/s/1uvktlm-6AqhQKJQZR1nQuw
tar xf jdk-8u161-linux-x64.tar.gz  -C /usr/local/

添加环境变量

cat << EOF >> /etc/profile
#################JAVA#################
export JAVA_HOME=/usr/local/jdk1.8.0_161
export JRE_HOME=\$JAVA_HOME/jre
export CLASSPATH=.:\$JAVA_HOME/lib/dt.jar:\$JAVA_HOME/lib/tools.jar:\$JRE_HOME/lib
export PATH=\$JAVA_HOME/bin:\$JRE_HOME/bin:\$PATH
EOF

source /etc/profile
java -version

修改ZooKeeper配置文件

配置文件详解

cat /usr/local/kafka/config/zookeeper.properties
tickTime=2000       
#ZooKeeper服务器之间或客户单与服务器之间维持心跳的时间间隔,单位是毫秒,默认为2000。

initLimit=10
#zookeeper接受客户端(这里所说的客户端不是用户连接zookeeper服务器的客户端,而是zookeeper服务器集群中连接到leader的follower 服务器)初始化连接时最长能忍受多少个心跳时间间隔数。
#当已经超过10个心跳的时间(也就是tickTime)长度后 zookeeper 服务器还没有收到客户端的返回信息,那么表明这个客户端连接失败。总的时间长度就是 10*2000=20秒。

syncLimit=5
#标识ZooKeeper的leader和follower之间同步消息,请求和应答时间长度,最长不超过多少个tickTime的时间长度,总的时间长度就是5*2000=10秒。

dataDir=/data/ZK
#存储内存数据库快照的位置;ZooKeeper保存Client的数据都是在内存中的,如果ZooKeeper节点故障或者服务停止,那么ZooKeeper就会将数据快照到该目录当中。

dataLogDir=/data/zk-logs
#如果没提供的话使用的则是dataDir。zookeeper的持久化都存储在这两个目录里。dataLogDir里是放到的顺序日志(WAL)。而dataDir里放的是内存数据结构的snapshot,便于快速恢复。为了达到性能最大化,一般建议把dataDir和dataLogDir分到不同的磁盘上,这样就可以充分利用磁盘顺序写的特性

clientPort=2181
#ZooKeeper客户端连接ZooKeeper服务器的端口,监听端口

maxClientCnxns=60
#ZooKeeper可接受客户端连接的最大数量,默认为60

autopurge.snapRetainCount=3
#ZooKeeper要保留dataDir中快照的数量

autopurge.purgeInterval=1
#ZooKeeper清楚任务间隔(以小时为单位),设置为0表示禁用自动清除功能

server.1=localhost:2888:3888
#指定ZooKeeper集群主机地址及通信端口
#1 为集群主机的数字标识,一般从1开始,三台ZooKeeper集群一般都为123
#localhost 为集群主机的IP地址或者可解析主机名
#2888 端口用来集群成员的信息交换端口,用于ZooKeeper集群节点与leader进行信息同步
#3888 端口是在leader挂掉时或者刚启动ZK集群时专门用来进行选举leader所用的端口

ZK配置文件内容

cat /usr/local/kafka/config/zookeeper.properties
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/data/ZK
dataLogDir=/data/zk-logs
clientPort=2181
maxClientCnxns=60
autopurge.snapRetainCount=3
autopurge.purgeInterval=1
server.1=localhost:2888:3888

Kafka目录文件介绍

Kafka根目录文件介绍

ls kafka_2.12-2.2.1/ -l
total 52
drwxr-xr-x 3 root root  4096 May 14 00:18 bin                   #Kafka脚本程序目录
drwxr-xr-x 2 root root  4096 Jun 24 16:41 config                #Kafka配置文件目录
drwxr-xr-x 2 root root   215 Jun 23 00:56 kafka-logs            #Kafka数据存储地址
drwxr-xr-x 2 root root  4096 Jun 23 00:49 libs                  #Kafka开发程序包目录
-rw-rw-r-- 1 root root 32216 May 14 00:10 LICENSE
drwxr-xr-x 2 root root   182 Jun 23 00:51 logs                  #Kafka日志目录
-rw-rw-r-- 1 root root   336 May 14 00:10 NOTICE
drwxr-xr-x 2 root root    44 May 14 00:18 site-docs             #Kafka站点文档

Kafka执行程序介绍

ls /usr/local/kafka/bin/ -l
total 132
-rwxrwxr-x 1 root root  945 May 13 16:10 kafka-console-consumer.sh              #Kafka消费消息脚本
-rwxrwxr-x 1 root root  944 May 13 16:10 kafka-console-producer.sh              #Kafka生产消息脚本
-rwxrwxr-x 1 root root  871 May 13 16:10 kafka-consumer-groups.sh               #Kafka消费者组脚本
-rwxrwxr-x 1 root root  863 May 13 16:10 kafka-topics.sh                        #Kafka创建topic脚本
-rwxrwxr-x 1 root root 1393 May 13 16:10 zookeeper-server-start.sh              #Kafka内置ZK启动脚本
-rwxrwxr-x 1 root root 1001 May 13 16:10 zookeeper-server-stop.sh               #Kafka内置ZK关闭脚本

修改Kafka配置

server.properties
配置文件参数解释请看:

cat /usr/local/kafka/config/server.properties 
broker.id=1
listeners=PLAINTEXT://172.17.0.2:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka-logs/
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=172.17.0.2:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0

启动Kafka

启动ZooKeeper:

/usr/local/kafka/bin/zookeeper-server-start.sh -daemon /usr/local/kafka/config/zookeeper.properties

查看进程以确保zk启动
ss -naplt | grep 2181
LISTEN     0      50           *:2181                     *:*                   users:(("java",pid=830,fd=101))

启动Kafka

/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties

查看进程连接

jps
1858 QuorumPeerMain
2280 Jps
2201 Kafka

下面通过过滤端口号可以看到
ZooKeeper监控本地地址TCP端口2181,可以ZooKeeper看到2181后面对应的还有一个端口号为43354 
Kafka监控本地地址TCP端口9092,可以看到Kafka9092后面也有对应的一个端口号55036
netstat -anplt | egrep "(2181|9092)"  
tcp        0      0 172.17.0.2:9092         0.0.0.0:*               LISTEN      1041/java           
tcp        0      0 0.0.0.0:2181            0.0.0.0:*               LISTEN      700/java            
tcp        0      0 172.17.0.2:2181         172.17.0.2:43354        ESTABLISHED 700/java            
tcp        0      0 172.17.0.2:43354        172.17.0.2:2181         ESTABLISHED 1041/java           
tcp        0      0 172.17.0.2:9092         172.17.0.2:55036        ESTABLISHED 1041/java           
tcp        0      0 172.17.0.2:55036        172.17.0.2:9092         ESTABLISHED 1041/java 

查看2181端口的监听状态
lsof -i:2181
COMMAND  PID USER   FD   TYPE DEVICE SIZE/OFF NODE NAME
java     700 root  101u  IPv4  23940      0t0  TCP *:eforward (LISTEN)
java     700 root  103u  IPv4  18120      0t0  TCP kafka_node1:eforward->kafka_node1:43354 (ESTABLISHED)
java    1041 root  101u  IPv4  25680      0t0  TCP kafka_node1:43354->kafka_node1:eforward (ESTABLISHED)

查看9092的端口监听状态
lsof -i:9092
COMMAND  PID USER   FD   TYPE DEVICE SIZE/OFF NODE NAME
java    1041 root  158u  IPv4  24974      0t0  TCP kafka_node1:XmlIpcRegSvc (LISTEN)
java    1041 root  174u  IPv4  30888      0t0  TCP kafka_node1:55036->kafka_node1:XmlIpcRegSvc (ESTABLISHED)
java    1041 root  177u  IPv4  29475      0t0  TCP kafka_node1:XmlIpcRegSvc->kafka_node1:55036 (ESTABLISHED)

管理Kafka

接下来我们操作下Kafka,我们会通过kafka-topics.sh新建一个Topic,然后使用kafka-console-producer.sh消息生产脚本来生产消息到Topic中,再由kafka-console-consumer.sh消息消费者消费消息,以及常用的选项介绍。

我们本章使用的Kafka版本为kafka_2.12-2.2.1.tgz,但是在Kafka的上一个版本中kafka_2.11-2.1.0.tgz,Kafka的命令配置发生了改变,以下我们把两个版本的命令都写出来

新版本:2.12-2.2.1
老版本:2.11-2.1.0

创建Topic主题

新版本:
/usr/local/kafka/bin/kafka-topics.sh --create --bootstrap-server 172.17.0.2:9092 --replication-factor 1 --partitions 1 --topic kafka_new_01

老版本:
/usr/local/kafka/bin/kafka-topics.sh --create --zookeeper 172.17.0.2:2181 --replication-factor 1 --partitions 1 --topic kafka_old_01

选项解释:
--create:创建新的Topic
--bootstrap-server:指定要哪台Kafka服务器上创建Topic,主机加端口,指定的主机地址一定要和配置文件中的listeners一致
--zookeeper:指定要哪台zookeeper服务器上创建Topic,主机加端口,指定的主机地址一定要和配置文件中的listeners一致
--replication-factor:创建Topic中的每个分区(partition)中的复制因子数量,即为Topic的副本数量,建议和Broker节点数量一致,如果复制因子超出Broker节点将无法创建
--partitions:创建该Topic中的分区(partition)数量
--topic:指定Topic名称

区别:
--bootstrap-server:新版本为bootstrap-server,新版本把Topic创建到了Kafka中,也就意味着信息要被存储到Broker节点的Topic中
--zookeeper:老版本为zookeeper,老版本是把Topic创建到了zookeeper中,意味着信息要被存储到ZooKeeper应用的Topic中

总结:我们的这个2.2.1版本这两种目前都支持,但是官方建议使用新版本的选项

查看已创建的Topic

新版本:
/usr/local/kafka/bin/kafka-topics.sh --list --bootstrap-server 172.17.0.2:9092
kafka_new_01
kafka_old_01

老版本:
/usr/local/kafka/bin/kafka-topics.sh --list --zookeeper 172.17.0.2:2181
kafka_new_01
kafka_old_01

区别:
使用不同的选项要指定不同的服务和端口

生产消息
注意:生产消息在新版本中未发生改变,同样是向Broker节点发送消息,这也证实了producer生产者无需和ZooKeeper节点产生连接,只需和Broker建立连接即可,如果了解更多kafka的架构及工作原理,请看该篇文章的架构图部分:https://abcops.cn/kafka-theory-one/

以下向两个Topic中发送不同的消息,该消息内容为手动输入
/usr/local/kafka/bin/kafka-console-producer.sh --broker-list 172.17.0.2:9092 --topic kafka_new_01
>Hello Kafka_new_01
>I'm the new version

/usr/local/kafka/bin/kafka-console-producer.sh --broker-list 172.17.0.2:9092 --topic kafka_old_01
>Hello Kafka_old_01
>I'm the old version


参数解释:
--broker-list:指定使用哪台broker来生产消息
--topic:指定要往哪个Topic中生产消息

消费消息
消费信息命令也没发生什么改变

参数解释:
--bootstrap-server:指定要在哪台broker节点消费消息,指明IP和端口。
--topic:指定要在哪个Topic中消费消息。
--from-beginning:获取所有未被消费的信息

消费Kafka_new_01
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 172.17.0.2:9092 --topic kafka_new_01 --from-beginning
Hello Kafka_new_01
I'm the new version

消费Kafka_old_01
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 172.17.0.2:9092 --topic kafka_old_01 --from-beginning
Hello Kafka_old_01
I'm the old version

查看Topic详情

新版本命令:
/usr/local/kafka/bin/kafka-topics.sh --describe --bootstrap-server 172.17.0.2:9092 --topic kafka_new_01
Topic:kafka_new_01  PartitionCount:1    ReplicationFactor:1 Configs:segment.bytes=1073741824
    Topic: kafka_new_01 Partition: 0    Leader: 1   Replicas: 1 Isr: 1

老版本命令:
/usr/local/kafka/bin/kafka-topics.sh --zookeeper 172.17.0.2:2181 --describe --topic kafka_new_01
Topic:kafka_new_01  PartitionCount:1    ReplicationFactor:1 Configs:
    Topic: kafka_new_01 Partition: 0    Leader: 1   Replicas: 1 Isr: 1
    
参数解释:
Topic:kafka_new_01:topic名称
PartitionCount:1:分片数量
ReplicationFactor:1:Topic副本数量

删除Topic

#使用--bootstrap-server删除
/usr/local/kafka/bin/kafka-topics.sh --delete --bootstrap-server 172.17.0.2:9092 --topic kafka_new_01

#使用--zookeeper删除
/usr/local/kafka/bin/kafka-topics.sh --delete --zookeeper 172.17.0.2:2181 --topic kafka_old_01
Topic kafka_old_01 is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
虽然有提示说"kafka_old_01"只是标记被删除,但是没关系

查看已创建Topic,我们创建的两个Topic已经被删除掉了,但是Kafka自动生成了一个叫__consumer_offsets的Topic
/usr/local/kafka/bin/kafka-topics.sh --list --bootstrap-server 172.17.0.2:9092
__consumer_offsets
Kafka

Kafka深入原理

2019-6-17 12:42:01

Kafka

Kafka集群搭建部署

2019-7-3 16:20:37

0 条回复 A文章作者 M管理员
    暂无讨论,说说你的看法吧
个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索