模式
点对点模式:一个消息被一个消费者消费;
发布订阅模式:一个消息被多个消费者消费。
交换机
交换机负责消息转发,不具备存储消息的能力,没有匹配的队列时,消息会丢失。
扇形交换机:Fanout ,广播,将消息发给所有绑定到交换机的队列;
直连交换机:Direct,定向,消息的 routing key 和 队列 binding key 对应投递;
主题交换机:Topic,通配符,将消息交给符合指定 routing pattern 的队列;
消息流程
生产者向交换机发送一条消息,交换机通过 routing key 和队列绑定规则(binding key),将消息分发到队列。
消费模式
- 推:消费者用 channel.basicConsume 订阅队列,RabbitMQ 会推送消息给消费者;
- 拉:消费者用 channel.basicGet 主动从指定队列拉取消息;
消息队列用途
解耦:功能模块之间通过消息进行关系解耦;
削峰填谷:保护系统不因短期大流量冲击而服务崩溃;
冗余数据:防止系统在保存数据时崩溃导致数据丢失;
顺序消费:能保持消息被顺序消费;
死信交换机
死信(Dead Letter)产生原因可能是:
- 消费者拒收了,basic.reject、basic.nack、requeue=false
- 消息超时过期了,ttl
- 消息数量超出队列长度了
配置死信交换机
策略(Policy)和队列参数(Optional Queue Arguments)两种方式。
策略
1 | rabbitmqctl set_policy DLX ".*" '{"dead-letter-exchange":"my-dlx"}' --apply-to queues |
这里将名为 my-dlx 的死信交换机应用到全部队列。
队列参数
1 | Map<String, Object> queueArgs = new HashMap<String, Object>(); |
死信路由
- 队列参数中定义 x-dead-letter-routing-key;
- 否则使用消息上的 routing key 去找死信交换机;
死循环
死信路由出现死循环,死信就会被丢弃。如:死信没定义 routing key 并投递到默认交换机。
消息不丢失
- 消息生产阶段:从消息被生产出来,然后提交给 MQ 的过程中,只要能正常收到 MQ Broker 的 ack 确认响应,就表示发送成功,所以只要处理好返回值和异常,这个阶段是不会出现消息丢失的。
- 消息存储阶段:这个阶段一般会直接交给 MQ 消息中间件来保证,但是你要了解它的原理,比如 Broker 会做副本,保证一条消息至少同步两个节点再返回 ack。
- 消息消费阶段:消费端从 Broker 上拉取消息,只要消费端在收到消息后,不立即发送消费确认给 Broker,而是等到执行完业务逻辑后,再发送消费确认,也能保证消息的不丢失。
- 进阶:给每个发出的消息都指定一个全局唯一 ID,或者附加一个连续递增的版本号,然后在消费端做对应的版本校验。利用拦截器机制。在生产端发送消息之前,通过拦截器将消息版本号注入消息。
消息重复消费
建立消息消费日志表。查询消费日志后再去消费,再更新消费日志
消息堆积
原因
- 消费者数量不足;
- 消费者处理速度慢;
- 消息生产速度过快;
- 队列没有被正确地消费;
- 消费者拒绝消费(basic.reject、basic.nack):
- basic.reject 拒绝单个消息,requeue=true,重新排队,堆积
- basic.nack 一次性拒绝多个消息,requeue=true,重新排队,堆积
解决
- 增加消费者数量,加快消息的消费速度;
- 优化消费者的处理逻辑,提高消费速度;
- 增加队列的大小,缓解消息积累的问题;
- 限制消息的生产速度,防止消息过快地进入队列;
- 检查消费者是否正常工作,以及是否正确地处理了消息;
- 配置消息过期时间,及时清理过期消息;