MENU

Kafka消息队列

• 2020 年 10 月 08 日 • 默认分类,微服务,Linux

Kafka:分布式,支持分区,复制,基于zookeeper协调的消息中间件

Kafka结构:
生产者:生产信息,并将消息推送到指定的kafka节点的分区中
kafka-server:消息队列,存储生产者推送的消息,并将消息推送给消费者消费
topic:主题,对不同类型的消息可以通过创建不同的主题进行区分
partition:分区,消息的载体,是一个日志文件,文件中存储的是生产者推送的消息,文件中的消息被成功消费之后不会被清除,在指定的时间周期内会被保留,分区支持复制,可以在其他节点创建从分区,消息的接收和推动都是由主分区完成,存储同一类型消息的分区属于同一个主题,也就是说可以把一个主题划分为一个或多个分区存储消息
Offset:偏移量,因为被消费成功的消息不会在分区中删除,所以消费者可以通过指定偏移量来消费不同的消息
消费者组:消费者以组的形式向kafk获取消息,组中是多个消费者实例,要求属于同一个组的消费者不能消费同一个分区中的消息,不同组的消费者可以消费同一个分区中的消息
Zookeeper:服务注册中心,分布式架构中的协调程序,用于记录每隔kafka节点的信息以及创建的主题,分区,消费者所使用的偏移量的信息,实现kafka节点之间的通信

节点状态:
follower:从节点,备份
leader:主节点,工作

服务器信息

IPHostname
172.16.1.12zoo-kafka-A
172.16.1.13zoo-kafka-B
172.16.1.14zoo-kafka-C

加压软件包

#所有节点执行
tar zxf kafka_2.11-1.0.1.tgz
tar zxf zookeeper-3.3.6.tar.gz
mv zookeeper-3.3.6 /usr/local/zookeeper
cp zoo_sample.cfg zoo.cfg
mv kafka_2.11-1.0.1 /usr/local/kafka

配置zookeeper

创建数据、日志存储目录
mkdir /usr/local/zookeeper/data
mkdir /usr/local/zookeeper/datalog
修改配置文件
# The number of milliseconds of each tick
tickTime=2000        # 节点之间发送心跳包的时间,单位毫秒
# The number of ticks that the initial 
# synchronization phase can take
initLimit=10        # 单位个,乘以心跳包就是时间,新加入节点初始化的时间
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=5    # 单位个,乘以心跳包就是时间,节点连接的超时等待时间
vi zoo.cfg
dataDir=/usr/local/zookeeper/data    # 数据存放目录
dataLogDir=/usr/local/zookeeper/datalog    # 日志存放目录
clientPort=2181
# 集群信息
server.1=172.16.1.12:2888:3888   # 节点之间通讯的端口和节点之间选取leader的端口
server.2=172.16.1.13:2888:3888
server.3=172.16.1.14:2888:3888

2181端口是zookeeper和kafka通信的端口
2888端口是三个节点互相通信的端口
3888端口是三个节点之间选举主主节点的端口

拷贝配置文件到其他节点
scp /usr/local/zookeeper/conf/zoo.cfg root@172.16.1.13:/usr/local/zookeeper/conf/
scp /usr/local/zookeeper/conf/zoo.cfg root@172.16.1.14:/usr/local/zookeeper/conf/
进入data目录创建一个myid文件

三台节点都需要创建,每个节点的id不同

#NodeA:
cd  /usr/local/zookeeper/data/
echo 1 > myid
#NodeB:
cd  /usr/local/zookeeper/data/
echo 2 > myid
#NodeC:
cd  /usr/local/zookeeper/data/
echo 3 > myid
放行防火墙
firewall-cmd --add-port=2181/tcp --permanent
firewall-cmd --add-port=2888/tcp --permanent
firewall-cmd --add-port=3888/tcp --permanent
firewall-cmd --reload
setenforce 0
启动服务
#NodeA:
ln -s /usr/local/zookeeper/bin/* 
zkServer.sh start
zkServer.sh status
#NodeB:
ln -s /usr/local/zookeeper/bin/* 
zkServer.sh start
zkServer.sh status
#NodeC:
ln -s /usr/local/zookeeper/bin/* 
zkServer.sh start
zkServer.sh status   # 查看状态,要确定有两个follower,一个leader

配置kafka

修改配置文件
vi /usr/local/kafka/config/server.properties
broker.id=1                                    //kafka节点的唯一标识,与myid符合
listeners=PLAINTEXT://172.16.1.12:9092        //指定当前节点监听的IP与端口
log.dirs=/usr/local/kafka/data                //指定该节点主题与分区存放位置,也就是消息存放位置需要创建
message.max.byte=1024000                    //指定生产者推送单个消息的最大字节数
defaults.replication.factors=2                //默认follower的个数
replica.fetch.max.bytes=1024000                //指定消费者单个消息的最大字节数
num.rtition=1                                //每个parttion创建一个队列
zookeeper.connect=172.16.1.12:2181,172.16.1.13:2181,172.16.1.14:2181        //指定zookeeper节点的IP与端口
将配置文件拷贝到其他节点
scp server.properties root@172.16.1.13:/usr/local/kafka/config/
scp server.properties root@172.16.1.14:/usr/local/kafka/config/
配置其他节点文件
[root@zoo-kafka-B ~]# vi /usr/local/kafka/config/server.properties
broker.id=2
listeners=PLAINTEXT://172.16.1.13:9092

[root@zoo-kafka-C ~]# vi /usr/local/kafka/config/server.properties 
broker.id=3
listeners=PLAINTEXT://172.16.1.14:9092
创建日志文件存放目录
mkdir /usr/local/kafka/data
放行端口
firewall-cmd --add-port=9092/tcp --permanent
firewall-cmd --reload
启动服务
ln -s /usr/local/kafka/bin/* /usr/local/bin
[root@zoo-kafka-A ~]# cd /usr/local/kafka/bin
[root@zoo-kafka-A bin]# ./kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
[root@zoo-kafka-B ~]# cd /usr/local/kafka/bin
[root@zoo-kafka-B bin]# ./kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties 
[root@zoo-kafka-C ~]# cd /usr/local/kafka/bin/
[root@zoo-kafka-C bin]# ./kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties 

netstat -anpt | grep 9092

选项:
–daemon:后台运行

创建主题分区
[root@zoo-kafka-A ~]# sh kafka-topics.sh --create --zookeeper 172.16.1.12:2181 --replication-factor 3 --partitions 1 --topic pay

选项:
–create:创建
–zookeeper:指定将kafka节点信息注册到哪个zookeeper节点
–replication-factor:指定该主题和分区创建在几个几点上
–partitions:该主题创建几个分区
–topic:主题的名字

模拟生产者
[root@zoo-kafka-A ~]# sh kafka-console-producer.sh --broker-list 172.16.1.12:9092 --topic pay

选项:
–broker-list:指定连接哪个kafka节点
–topic:指定将消息推送到哪个主题

模拟消费者
[root@zoo-kafka-B ~]# sh kafka-console-consumer.sh --zookeeper 172.16.1.12:2181 --from-beginning --topic pay

选项:
–zookeeper:将消费者的偏移量记录到指定的zookeeper
–from-beginning:消费者从第一条消息开始消费
–topic:指定从哪个主题消费信息