基本概念
- publisher:生产者(建立连接 -> 创建 channel -> 声明队列 -> 发送消息 -> 关闭连接和 channel)
- consumer:消费者(建立连接 -> 创建 channel -> 声明队列 -> 订阅消息)
- exchange:交换机,负责消息路由
- Fanout Exchange:广播(所有绑定到该交换机的队列都能收到消息)
- Direct Exchange:路由(根据 routingKey 来发送到指定的队列)
- Topic Exchange: 主题(通过 routingKey 通配符来发送到指定的队列,#匹配一个或对个,*匹配一个)
- queue:队列,存储消息
- 基本消息队列 BasicQueue:一对一
- 工作消息队列 WorkQueue:一对多
- virtualHost:虚拟主机,消息隔离
WorkQueue(TaskQueue)任务模型
多个消费者绑定到一个队列,共同消费队列中的消息
- 默认是平均给每个消费者,如果考虑消费者的处理能力,需要设置 prefetch 属性,每次获取一条,消费一条再获取,达到能者多劳的目的
- 同一条消息只会被一个消费者处理
发布/订阅模型
对比 WorkQueue,多了一个 exchange 交换机
- 生产者发送消息不是发送到队列,而是发送至交换机
- 消费者订阅队列
- exchange 只负责转发消息,不会对消息进行存储,如果没有正确配置队列或路由规则,消息就会丢失
如何保证消息可靠性
要确保发送的消息至少被消费一次,需要从源头开始,生产者是否成功发送消息到交换机,交换机是否正常发送消息到队列,队列里的消息是否成功被消费者消费
RabbitMQ 提供了 publisher confirm 和 publish-return 机制来避免消息在发送的过程中丢失
publish-confirm
- 如果消息成功投递到交换机,返回 ack
- 如果消息未投递到交换机,返回 nack
publish-return
- 如果消息投递到交换机了,但是没有路由到队列,返回 ack 及路由失败原因
考虑到 MQ 本身的稳定,可以分别将交换机,队列,消息都设置为持久化
如何解决消息堆积问题
当发送消息超过了处理消息的速度,就会造成消息堆积,如果队列中的消息达到上限,那么最早受到的消息,可能就会变成死信,然后被丢弃
消息堆积的本质是消费者消费消息的速度跟不上,所以可以从下面几个方面来进行考虑
- 增加消费者个数,那么速度就成倍提高了
- 增加队列长度,可以暂存更多的消息