中间件
目录
- RabbitMQ概述
- RabbitMQ安装Docker版
- RabbitMQ安装集群版
- 集群搭建步骤
- 搭建镜像队列
- Haproxy+Keepalive 实现高可用负载均衡
- RabbitMQ消息确认机制-可靠抵达
- 可靠抵达-服务端确认(confirmCallback 、returnCallback )
- 可靠抵达-消费端确认(ack)
- RabbitMQ延时队列
- 延时队列实战
- RabbitMQ死信队列
- 死信队列实战
RabbitMQ概述
RabbitMQ 是一个消息中间件:它接受并转发消息。你可以把它当做一个快递站点,当你要发送一个包 裹时,你把你的包裹放到快递站,快递员最终会把你的快递送到收件人那里,按照这种逻辑 RabbitMQ 是 一个快递站,一个快递员帮你传递快件。RabbitMQ 与快递站的主要区别在于,它不处理快件而是接收, 存储和转发消息数据。
四大核心概念
生产者
产生数据发送消息的程序是生产者
交换机
交换机是 RabbitMQ 非常重要的一个部件,一方面它接收来自生产者的消息,另一方面它将消息 推送到队列中。交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推 送到多个队列,亦或者是把消息丢弃,这个得有交换机类型决定
队列
队列是 RabbitMQ 内部使用的一种数据结构,尽管消息流经 RabbitMQ 和应用程序,但它们只能存 储在队列中。队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。许多生产者可 以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。这就是我们使用队列的方式
消费者
消费与接收具有相似的含义。消费者大多时候是一个等待接收消息的程序。请注意生产者,消费 者和消息中间件很多时候并不在同一机器上。同一个应用程序既可以是生产者又是可以是消费者。
RabbitMQ工作原理
Broker:接收和分发消息的应用,RabbitMQ Server就是Message Broker
Virtual host:出于多租户和安全因素设计的,把AMQP的基本组件划分到一个虚拟的分组中,类似于网络中的namespace概念。当多个不同的用户使用同一个 RabbitMQ Server提供的服务时,可以划分出多个vhost,每个用户在自己的vhost创建exchange/queue等
Connection:publisher/consumer和 broker之间的TCP连接
Channel:如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候建立TCP Connection的开销将是巨大的,效率也较低。Channel是在connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的channel进行通讯,AMQP method包含了channel id 帮助客户端和message broker识别channel,所以channel之间是完全隔离的。Channel作为轻量级的Connection极大减少了操作系统建立TCP connection的开销
Exchange:message到达broker的第一站,根据分发规则,匹配查询表中的routing key,分发消息到queue中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)
Queue:消息最终被送到这里等待consumer取走
Binding:exchange和queue之间的虚拟连接,binding中可以包含routing key,Binding信息被保存到exchange中的查询表中,用于message的分发依据
RabbitMQ安装Docker版
下载RabbitMQ并启动
docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management
设置开机启动RabbitMQ
docker update rabbitmq --restart=always
访问IP:15672端口,默认的登录账号密码为guest
SHELL方式添加添加新用户
- 创建账号
rabbitmqctl add_user admin 123
- 设置用户角色
rabbitmqctl set_user_tags admin administrator
- 设置用户权限
rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
表示:用户admin
具有/
这个virtual host中所有资源的配置、写、读权限 - 当前用户和角色
rabbitmqctl list_users
WEB界面方式添加新用户
RabbitMQ安装集群版
如果RabbitMQ服务器遇到内存崩溃、机器掉电或者主板故障等情况,该怎么办?单台 RabbitMQ 服务器可以满足每秒1000条消息的吞吐量,那么如果应用需要 RabbitMQ 服务满足每秒10万条消息的吞吐量呢?购买昂贵的服务器来增强单机RabbitMQ务的性能显得捉襟见肘,搭建一个RabbitMQ集群才是解决实际问题的关键
集群搭建步骤
修改 3 台机器的主机名称,并重启
vim /etc/hostname
配置各个节点的 hosts 文件,让各个节点都能互相识别对方
vim /etc/hosts
10.211.55.74 node1
10.211.55.75 node2
10.211.55.76 node3
确保各个节点的 cookie 文件使用的是同一个值,在node1上执行远程操作命令,将node1的cookie复制给node2和node3
scp /var/lib/rabbitmq/.erlang.cookie root@node2:/var/lib/rabbitmq/.erlang.cookie
scp /var/lib/rabbitmq/.erlang.cookie root@node3:/var/lib/rabbitmq/.erlang.cookie
启动 RabbitMQ 服务,顺带启动 Erlang 虚拟机和 RbbitMQ 应用服务(在三台节点上分别执行以下命令)
rabbitmq-server -detached
在节点 2 执行
rabbitmqctl stop_app
(rabbitmqctl stop 会将Erlang 虚拟机关闭,rabbitmqctl stop_app 只关闭 RabbitMQ 服务)
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@node1
rabbitmqctl start_app(只启动应用服务)
在节点 3 执行
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@node2
(rabbitmqctl join_cluster rabbit@主机名称 加哪个主机就指定哪个主机的名称)
rabbitmqctl start_app
集群状态
rabbitmqctl cluster_status
重新设置超级管理员用户,在一台机器上运行即可,并使用新账号访问登录
创建账号
rabbitmqctl add_user admin admin
设置用户角色
rabbitmqctl set_user_tags admin administrator
设置用户权限
rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
解除集群节点(node2 和 node3 机器分别执行)
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl start_app
rabbitmqctl cluster_status
忘记集群
rabbitmqctl forget_cluster_node rabbit@node2(node1 机器上执行)
搭建镜像队列
使用镜像队列的原因
如果RabbitMQ集群中只有一个Broker节点,那么该节点的失效将导致整体服务的临时性不可用,并且也可能会导致消息的丢失。可以将所有消息都设置为持久化,并且对应队列的durable属性也设置为true,但是这样仍然无法避免由于缓存导致的问题:因为消息在发送之后和被写入磁盘井执行刷盘动作之间存在一个短暂却会产生问题的时间窗。通过publisher confirm机制能够确保客户端知道哪些消息己经存入磁盘,尽管如此,一般不希望遇到因单点故障导致的服务不可用。引入镜像队列的机制,可以将队列镜像到集群中的其他Broker节点之上,如果集群中的一个节点失效了,队列能自动地切换到镜像中的另一个节点上以保证服务的可用性。
搭建步骤
启动三台集群节点
随便找一个节点添加 policy策略
- Name:策略名称,随便起
- Pattern:正则规则,只有满足该正则规则的队列,才会起作用
- Apply to:应用于交换机还是队列还是都应用
在 node1 上创建一个队列发送一条消息,队列存在镜像队列
停掉 node1 之后发现 node2 成为镜像队列
就算整个集群只剩下一台机器了 依然能消费队列里面的消息,说明队列里面的消息被镜像队列传递到相应机器里面了
Haproxy+Keepalive 实现高可用负载均衡
HAProxy 提供高可用性、负载均衡及基于TCPHTTP 应用的代理,支持虚拟主机,它是免费、快速并 且可靠的一种解决方案,包括 Twitter,Reddit,StackOverflow,GitHub 在内的多家知名互联网公司在使用。 HAProxy 实现了一种事件驱动、单一进程模型,此模型支持非常大的井发连接数。
高可用负载均衡搭建步骤
下载 haproxy(在 node1 和 node2)
yum -y install haproxy
修改 node1 和 node2 的 haproxy.cfg
vim /etc/haproxy/haproxy.cfg
需要修改红色 IP 为当前机器 IP
在两台节点启动 haproxy
haproxy -f /etc/haproxy/haproxy.cfg
ps -ef | grep haproxy
访问地址http://10.211.55.71:8888/stats
RabbitMQ消息确认机制-可靠抵达
- 保证消息不丢失,可靠抵达,可以使用事务消息,性能下降250倍,为此引入确认机制
- publisher confirmCallback确认模式(触发时机:服务端将消息发送给RabbitMQ所在的服务器)
- publisher returnCallback未投递到queue退出模式(触发时机:RabbitmQ所在的服务器调用交换机投递给对应队列)
- consumer ack机制(触发机制:消费端成功获取到消息队列的消息)
可靠抵达-服务端确认(confirmCallback 、returnCallback )
开启发送端确认
- NONE:禁用发布确认模式,默认模式
- CORRELATED:发布消息成功到交换器后会触发回调方法
- SIMPLE
# 开启发送端确认
spring.rabbitmq.publisher-confirm-type=correlated
开启发送端消息抵达队列的确认
# 开启发送端消息抵达队列的确认
spring.rabbitmq.publisher-returns=true
# 只要发送端消息抵达队列,以异步方式优先回调这个returnConfirm(绑定一起使用)
spring.rabbitmq.template.mandatory=true
定制RabbitTemplate自定义confirmCallback 、returnCallback 触发方法
/*** 定制RabbitTemplate* 1. MQ服务器收到消息就回调* 1. spring.rabbitmq.publisher-confirms=true* 2. 设置回调确认confirmCallback * 2. 消息正确抵达队列进行回调* 1. spring.rabbitmq.publisher-returns=true* 2. spring.rabbitmq.template.mandatory=true* 3. 设置回调确认returnCallback */// PostConstruct: 当MyRabbitConfig对象创建完再执行该方法@PostConstructpublic void initRabbitTemplate() {// 设置MQ服务器收到消息回调rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {/*** 只要消息抵达MQ服务器ack就为true* @param correlationData:当前消息的唯一关联数据(这个是消息的唯一id)即发送时传的CorrelationData参数* @param b:ack,消息是否成功还是失败* @param s:失败的原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean b, String s) {System.out.println("correlationData: " + correlationData);System.out.println("ack: " + b);System.out.println("s: " + s);}});// 设置消息抵达队列回调rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {/*** 只要消息没有投递给指定的队列,就触发这个失败回调* @param message:投递失败的消息详细信息* @param i:回复的状态码* @param s:回复的文本内容* @param s1:当时这个消息发送给哪个交换机* @param s2:当时这个消息发送给哪个路由键*/@Overridepublic void returnedMessage(Message message, int i, String s, String s1, String s2) {System.out.println("fail message: " + message);System.out.println("i: " + i);System.out.println("s: " + s);System.out.println("s1: " + s1);System.out.println("s2: " + s2);}});}
可靠抵达-消费端确认(ack)
保证每个消息被正确消费,此时才可以MQ删除这个消息
- basic.ack 用于肯定确认;MQ服务器会移除此消息
- basic.nack用于否定确认;可以指定MQ服务器是否丢弃此消息,可以批量
- basic.reject用于否定确认;跟nack使用一样,但是不能批量
- 默认是自动ack,只要消息接收到,客户端会自动确认,服务端就会移除这个消息,如果客户端在处理消息时候宕机则会丢失消息,因此要手动确认,保证消息不丢失。当客户端宕机后,消息会从unacked状态变成ready状态,当下一次新的客户端连接进来再将消息重新发送给客户端
# 设置客户端手动确认接受到消息
spring.rabbitmq.listener.simple.acknowledge-mode=manual
@RabbitListener(queues = {"hello-java-queue"})public void receiveMessage1(Message message, Content content, Channel channel) {System.out.println("content1: " + content.toString());// 通道内按顺序自增long deliveryTag = message.getMessageProperties().getDeliveryTag();try {// 确认消息接收成功,非批量签收模式// long deliveryTag, boolean multipe (当前消息的标签,是否批量签收)channel.basicAck(deliveryTag, false);// 消息接收成功,但是拒绝签收消息// long deliveryTag, boolean multipe, boolean requeue (当前消息的标签,是否批量签收,是否重新入队(false丢掉消息,true将消息重新入队))channel.basicNack(deliveryTag,false,false);} catch (IOException e) {// 网络中断}}
RabbitMQ延时队列
概念
延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。
使用场景
- 订单在十分钟之内未支付则自动取消
- 新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。
- 用户注册成功后,如果三天内没有登陆则进行短信提醒。
- 用户发起退款,如果三天内没有得到处理则通知相关运营人员。
- 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议
延时TTL
TTL 是 RabbitMQ 中一个消息或者队列的属性,表明一条消息或者该队列中的所有 消息的最大存活时间,单位是毫秒。换句话说,如果一条消息设置了 TTL 属性或者进入了设置TTL 属性的队列,那么这 条消息如果在TTL 设置的时间内没有被消费,则会成为"死信"。如果同时配置了队列的TTL 和消息的 TTL,那么较小的那个值将会被使用,有两种方式设置 TTL。
消息设置TTL
针对每条消息设置TTL
rabbitTemplate.convertAndSend("exchange", "route-key", "消息", new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {// 设置消息10秒过期message.getMessageProperties().setExpiration("10000");return message;}
});
队列设置TTL
在创建队列的时候设置队列的x-message-ttl
属性
@Bean("queue")
public Queue queueB() {Map<String, Object> args = new HashMap<>(3);//声明队列的 TTLargs.put("x-message-ttl", 40000);return QueueBuilder.durable("queue").withArguments(args).build();
}
两者的区别
如果设置了队列的TTL 属性,那么一旦消息过期,就会被队列丢弃(如果配置了死信队列被丢到死信队列中),而第二种方式,消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间;另外,还需 要注意的一点是,如果不设置 TTL,表示消息永远不会过期,如果将 TTL 设置为 0,则表示除非此时可以 直接投递该消息到消费者,否则该消息将会被丢弃。
延时队列实战
创建两个队列 QA 和 QB,两者队列 TTL 分别设置为 10S 和 40S,然后在创建一个交换机 X 和死信交 换机 Y,它们的类型都是direct,创建一个死信队列QD,它们的绑定关系如下:
延时队列架构代码
@Configuration
public class TtlQueueConfig {// 普通交换机名称public static final String X_EXCHANGE = "X";// 死信交换机名称public static final String Y_DEAD_LETTER_EXCHANGE = "Y";// 普通队列名称public static final String QUEUE_A = "QA";public static final String QUEUE_B = "QB";// 死信队列名称public static final String DEAD_LETTER_QUEUE = "QD";// 通用队列名称public static final String QUEUE_C = "QC";// 声明 xExchange@Bean("xExchange")public DirectExchange xExchange() {return new DirectExchange(X_EXCHANGE);}// 声明 yExchange@Bean("yExchange")public DirectExchange yExchange() {return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);}//声明队列 A ttl 为 10s 并绑定到对应的死信交换机@Bean("queueA")public Queue queueA() {Map<String, Object> args = new HashMap<>(3);//声明当前队列绑定的死信交换机args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);//声明当前队列的死信路由 keyargs.put("x-dead-letter-routing-key", "YD");//声明队列的 TTLargs.put("x-message-ttl", 10000);return QueueBuilder.durable(QUEUE_A).withArguments(args).build();}// 声明队列 A 绑定 X 交换机@Beanpublic Binding queueaBindingX(@Qualifier("queueA") Queue queueA,@Qualifier("xExchange") DirectExchange xExchange) {return BindingBuilder.bind(queueA).to(xExchange).with("XA");}//声明队列 B ttl 为 40s 并绑定到对应的死信交换机@Bean("queueB")public Queue queueB() {Map<String, Object> args = new HashMap<>(3);//声明当前队列绑定的死信交换机args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);//声明当前队列的死信路由 keyargs.put("x-dead-letter-routing-key", "YD");//声明队列的 TTLargs.put("x-message-ttl", 40000);return QueueBuilder.durable(QUEUE_B).withArguments(args).build();}//声明队列 B 绑定 X 交换机@Beanpublic Binding queuebBindingX(@Qualifier("queueB") Queue queue1B,@Qualifier("xExchange") DirectExchange xExchange) {return BindingBuilder.bind(queue1B).to(xExchange).with("XB");}//声明死信队列 QD@Bean("queueD")public Queue queueD() {return new Queue(DEAD_LETTER_QUEUE);}//声明死信队列 QD 绑定关系@Beanpublic Binding deadLetterBindingQAD(@Qualifier("queueD") Queue queueD,@Qualifier("yExchange") DirectExchange yExchange) {return BindingBuilder.bind(queueD).to(yExchange).with("YD");}//声明队列 C 死信交换机@Bean("queueC")public Queue queueC() {Map<String, Object> args = new HashMap<>(3);//声明当前队列绑定的死信交换机args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);//声明当前队列的死信路由 keyargs.put("x-dead-letter-routing-key", "YD");//没有声明 TTL 属性return QueueBuilder.durable(QUEUE_C).withArguments(args).build();}//声明队列 C 绑定 X 交换机@Beanpublic Binding queueCBindingX(@Qualifier("queueC") Queue queueC,@Qualifier("xExchange") DirectExchange xExchange) {return BindingBuilder.bind(queueC).to(xExchange).with("XC");}}
生产者代码
// 发送延时消息
@GetMapping("sendMsg/{msg}")
public R sendMsg(@PathVariable("msg") String msg) {rabbitTemplate.convertAndSend("X", "XA", "消息为10秒" + msg);rabbitTemplate.convertAndSend("X", "XB", "消息为40秒" + msg);return R.ok();
}// 发送指定延时时间的消息
@GetMapping("sendTtlMsg/{ttl}/{msg}")
public R sendTtlMsg(@PathVariable("ttl") String ttl, @PathVariable("msg") String msg) {rabbitTemplate.convertAndSend("X", "XC", msg, message -> {// 发送消息时候的延时时长message.getMessageProperties().setExpiration(ttl);return message;});return R.ok();
}
消费者代码
// 监听延时队列
@RabbitListener(queues = {"QD"})
public void receiveD(Message message, String content, Channel channel) {System.out.println("接受消息: " + content);
}
实现效果
第一条消息在10S后变成了死信消息然后被消费者消费掉,第二条消息在40S之后变成了死信消息然后被消费掉,这样一个延时队列就打造完成了。
RabbitMQ死信队列
死信概念
死信,顾名思义就是无法被消费的消息,一般来说,producer将消息投递到broker或者直接到queue 里了,consumer 从 queue 取出消息 进行消费,但某些时候由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没有 后续的处理,就变成了死信,有死信自然就有了死信队列。
应用场景:为了保证订单业务的消息数据不丢失,需要使用到RabbitMQ的死信队列机制,当消息消费发生异常时,将消息投入死信队列中,还有比如说用户在商城下单成功并点击去支付后在指定时间未支付时自动失效
死信的来源
消息 TTL 过期
队列达到最大长度(队列满了,无法再添加数据到 mq 中)
消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false
死信队列实战
死信架构代码
@Configuration
public class DeadQueueConfig {//普通交换机名称private static final String NORMAL_EXCHANGE = "normal_exchange";//死信交换机名称private static final String DEAD_EXCHANGE = "dead_exchange";// 声明死信交换机@Bean(DEAD_EXCHANGE)public DirectExchange deadExchange() {return new DirectExchange(DEAD_EXCHANGE);}// 声明死信队列@Bean("dead_queue")public Queue deadQueue() {return new Queue("dead_queue");}// 声明死信队列与死信交换机的绑定关系@Bean("deadBinding")public Binding deadBinding(@Qualifier("dead_queue") Queue deadQueue,@Qualifier(DEAD_EXCHANGE) DirectExchange deadExchange) {return BindingBuilder.bind(deadQueue).to(deadExchange).with("lisi");}// 声明普通交换机@Bean(NORMAL_EXCHANGE)public DirectExchange normalExchange() {return new DirectExchange(NORMAL_EXCHANGE);}// 声明普通队列@Bean("normal_queue")public Queue normalQueue() {Map<String, Object> args = new HashMap<>(3);//声明当前队列绑定的死信交换机args.put("x-dead-letter-exchange", DEAD_EXCHANGE);//声明当前队列的死信路由 keyargs.put("x-dead-letter-routing-key", "lisi");return QueueBuilder.durable("normal_queue").withArguments(args).build();}// 声明普通队列与普通交换机的绑定关系@Bean("normalBinding")public Binding normalBinding(@Qualifier("normal_queue") Queue normalQueue,@Qualifier(NORMAL_EXCHANGE) DirectExchange normalExchange) {return BindingBuilder.bind(normalQueue).to(normalExchange).with("zhansan");}}
生产者代码,设置消息过期TTL
@GetMapping("product")
public R product() {rabbitTemplate.convertAndSend("normal_exchange", "zhansan", "消息", new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {// 设置消息10秒过期message.getMessageProperties().setExpiration("10000");return message;}});return R.ok();
}
消费者代码(关闭正常接收队列,模拟接收不到消息,进入死信)
// 正常接受消息队列
// @RabbitListener(queues = {"normal_queue"})
// public void consumer(String content) {
// System.out.println("正常队列接受消息:" + content);
// }// 死信接受消息队列@RabbitListener(queues = {"dead_queue"})public void dead(String content) {System.out.println("死信队列接受消息:" + content);}
消费者代码(模拟队列达到最大长度,进入死信)
args.put("x-max-length", );
// 声明普通队列
@Bean("normal_queue")
public Queue normalQueue() {Map<String, Object> args = new HashMap<>(3);//声明当前队列绑定的死信交换机args.put("x-dead-letter-exchange", DEAD_EXCHANGE);//声明当前队列的死信路由 keyargs.put("x-dead-letter-routing-key", "lisi");// 设置队列长度args.put("x-max-length", 6);return QueueBuilder.durable("normal_queue").withArguments(args).build();
}
消费者代码(模拟消息被拒,进入死信)