MENU

RabbitMQ消息队列

• 2020 年 10 月 23 日 • 默认分类,Linux,微服务

消息队列:用来存储不需要立即响应的消息,可以实现削峰、异步、解耦的作用;

消息队列中间件应用场景:

异步处理: 在注册服务的时候,如果同步串行化的方式处理,让存储数据、邮件通知等挨着完成,延迟较大,采用消息队列,可以将邮件服务分离开来,将邮件任务之间放入消息队列中,之后返回,减少了延迟,提高了用户体验;
流量消峰(消除高峰):如淘宝购物节,先进先出原则。 秒杀活动中,一般会因为流量过大,导致流量暴增,应用挂掉。为解决这个问题,一般需要在应用前端加入消息队列。 服务器在接收到用户请求后,首先写入消息队列。这时如果消息队列中消息数量超过最大数量,则直接拒绝用户请求或返回跳转到错误页面。秒杀业务根据秒杀规则读取消息队列中的请求信息,进行后续处理;
架构解耦: 电商里面,在订单与库存系统的中间添加一个消息队列服务器,在用户下单后,订单系统将数据先进行持久化处理,然后将消息写入消息队列,直接返回订单创建成功,然后库存系统使用拉/推的方式,获取订单信息再进行库存操作;

队列框架

RabbitMQ server:消息队列服务,用于接收生产者产生的消息,并将消息分配给消费者
producer:生产者,生产消息,消息分为两部分,标签(用于匹配规则)和数据
consumer:消费者,用来消费队列分配的消息,处理完请求之后要给队列发送ack回应
ack回应:用来判断当前的消息是否被消费,如果被消费,消息将会被删除,如果没有,则重新回转等待被消费
RoutingKey:路由关键字 ,exchange根据这个关键字进行消息投递。
exchange:代表交换机,接收生产者的消息,根据自身的匹配规则,路由到哪个队列queue
connection:用来tcp连接producer、consumer和exchange
channel:虚拟通道, 消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。
queue:队列(消息的载体)用于存储消息;

生产者流程

1.生产者连接RabbitMQ,建立TCP连接( Connection),开启信道(Channel)
2.生产者声明一个Exchange(交换器),并设置相关属性,比如交换器类型、是否持久化等
3.生产者声明一个队列井设置相关属性,比如是否排他、是否持久化、是否自动删除等
4.生产者通过 bindingKey (绑定Key)将交换器和队列绑定( binding )起来
5.生产者发送消息至RabbitMQ Broker,其中包含 routingKey (路由键)、交换器等信息
6.相应的交换器根据接收到的 routingKey 查找相匹配的队列。
7.如果找到,则将从生产者发送过来的消息存入相应的队列中。
8.如果没有找到,则根据生产者配置的属性选择丢弃还是回退给生产者

消费者流程

1.消费者连接到RabbitMQ Broker ,建立一个连接(Connection ) ,开启一个信道(Channel) 。
2.消费者向RabbitMQ Broker 请求消费相应队列中的消息,可能会设置相应的回调函数, 以及做一些准备工作
3.等待RabbitMQ Broker 回应并投递相应队列中的消息, 消费者接收消息。
4.消费者确认( ack) 接收到的消息。
5.RabbitMQ 从队列中删除相应己经被确认的消息。

RabbitMQ常用端口号:

5672: 程序使用端口号
15672: 控制台端口号
25672 : 集群使用端口号

不同MQ的作用:

1.ActiveMQ
ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。它是一个完全支持JMS规范的的消息中间件。丰富的API,多种集群架构模式让ActiveMQ在业界成为老牌的消息中间件,在中小型企业颇受欢迎!
2.Kafka
Kafka是LinkedIn开源的分布式发布-订阅消息系统,目前归属于Apache顶级项目。Kafka主要特点是基于Pull的模式来处理消息消费,
追求高吞吐量,一开始的目的就是用于日志收集和传输。0.8版本开始支持复制,不支持事务,对消息的重复、丢失、错误没有严格要求,
适合产生大量数据的互联网服务的数据收集业务。
3.RocketMQ
RocketMQ是阿里开源的消息中间件,它是纯Java开发,具有高吞吐量、高可用性、适合大规模分布式系统应用的特点。RocketMQ思路起
源于Kafka,但并不是Kafka的一个Copy,它对消息的可靠传输及事务性做了优化,目前在阿里集团被广泛应用于交易、充值、流计算、消
息推送、日志流式处理、binglog分发等场景。
4.RabbitMQ
RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。AMQP的主要特征是面向消息、队列、路由(包括点对点和
发布/订阅)、可靠性、安全。AMQP协议更多用在企业系统内对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在
其次。

服务器信息

IPHostname
172.16.1.12rabbit-A
172.16.1.13rabbit-B
172.16.1.14rabbit-C

修改主机名

CODE
rabbit-A:hostnamectl set-hostname rabbit-A
rabbit-B:hostnamectl set-hostname rabbit-B
rabbit-C:hostnamectl set-hostname rabbit-C

安装依赖

CODE
yum -y install socat

安装rpm软件

需要将软件包导入服务器中

CODE
rpm -ivh erlang-18.1-1.el7.centos.x86_64.rpm
rpm -ivh rabbitmq-server-3.6.15-1.el7.noarch.rpm

修改hosts

CODE
vi /etc/hosts
172.16.1.12 rabbit-A
172.16.1.13 rabbit-B
172.16.1.14 rabbit-C

scp /etc/hosts root@172.16.1.13:/etc/
scp /etc/hosts root@172.16.1.14:/etc/

启动服务

CODE
systemctl restart rabbitmq-server.service
netstat -lnpt | grep beam

放行防火墙

CODE
firewall-cmd --add-port=4369/tcp --permanent
firewall-cmd --add-port=5672/tcp --permanent
firewall-cmd --add-port=15672/tcp --permanent
firewall-cmd --add-port=25672/tcp --permanent
firewall-cmd --reload
setenforce 0

拷贝cookie文件到其他节点

保证cokkie文件一致

CODE
scp /var/lib/rabbitmq/.erlang.cookie root@172.16.1.13:/var/lib/rabbitmq/
scp /var/lib/rabbitmq/.erlang.cookie root@172.16.1.14:/var/lib/rabbitmq/

重启服务

CODE
# 三台节点
systemctl restart rabbitmq-server.service

rabbit-BC加入集群

CODE
rabbitmqctl stop_app
rabbitmqctl join_cluster --ram rabbit@rabbit-A
rabbitmqctl start_app

rabbit-A查看集群状态

CODE
rabbitmqctl cluster_status

加载web管理插件

CODE
rabbitmq-plugins enable rabbitmq_management

创建管理员用户

CODE
rabbitmqctl add_user admin {passwd}
rabbitmqctl set_user_tags admin administrator    //修改admin用户权限为管理员

访问

CODE
firefox 172.16.1.12:15672

节点剔除

CODE
# 在需要被剔除的节点执行
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl start_app

Docker安装

CODE
docker pull rabbitmq:3.7.7-management 拉取mq镜像
docker run -itd --name rabbit -p 127.0.0.1:5672:5672 -p 127.0.0.1:15672:15672 2888deb59dfc /bin/bash 创建容器
docker exect -it rabbit /bin/bash     // 进入容器,修改配置文件