02.RabbitMQ原理常识原理
RabbitMQ 是基于 AMQP 协议的消息中间件,具有高可靠性、灵活路由、集群化、高可用队列等特点,广泛用于异步处理、系统解耦、流量削峰等场景。
消息生产者通过 Exchange 将消息路由到 Queue,消费者从 Queue 获取消息并处理。
支持多种 Exchange 类型:Direct、Fanout、Topic、Headers 来实现不同的路由机制。
RabbitMQ 通过持久化、ACK确认、消费端限流、TTL、死信队列等机制保障消息可靠传递与系统健壮性。
# 01.RabbitMQ使用场景
# 1、What 是什么?
- RabbitMQ 是采用 Erlang 语言实现 AMQP(Advanced Message Queuing Protocol,高级消息队列协议)的消息中间件
- 它最初起源于金融系统,用于在分布式系统中存储转发消息
可靠性(Reliablity)
:使用了一些机制来保证可靠性,比如持久化、传输确认、发布确认灵活的路由(Flexible Routing)
:- 在消息进入队列之前,通过 Exchange 来路由消息
- 对于典型的路由功能,Rabbit 已经提供了一些内置的 Exchange 来实现
- 针对更复杂的路由功能,可以将多个Exchange绑定在一起,也通过插件机制实现自己的 Exchange
消息集群(Clustering)
:多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker高可用(Highly Avaliable Queues)
:- 队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用
多种协议(Multi-protocol)
:支持多种消息队列协议,如 STOMP、MQTT 等多种语言客户端(Many Clients)
:几乎支持所有常用语言,比如 Java、.NET、Ruby 等管理界面(Management UI)
:提供了易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方面跟踪机制(Tracing)
:如果消息异常,RabbitMQ 提供了消息的跟踪机制,使用者可以找出发生了什么插件机制(Plugin System)
:提供了许多插件,来从多方面进行扩展,也可以编辑自己的插件
# 2. Why 为什么使用RabbitMQ?
- RabbitMQ适用于多种场景,尤其是需要可靠消息传递和灵活路由的系统中
- 异步处理:
- 通过消息队列,能够减少请求的延迟,提升用户体验
- 例如,在用户注册时,消息队列可以异步发送邮件通知,降低实时响应的延迟
- 解耦:
- RabbitMQ可以使系统模块独立工作
- 例如,
- 在电商系统中,订单与库存系统通过消息队列进行解耦
- 确保高效处理订单的同时,降低对库存系统的直接依赖
- 流量削锋:
- 在高并发的秒杀活动中,RabbitMQ可以用于缓冲请求,防止系统被瞬间的高流量压垮
- 可靠传递:
- RabbitMQ支持消息确认机制,确保消息从生产者到消费者的可靠传递,避免消息丢失
# 3. Where 使用场景?
- 金融交易系统:需要高可靠性和顺序性的场景
- 电商平台:订单处理、库存管理和通知服务等模块间的解耦
- 分布式系统中的服务通信:不同服务之间的消息传递,例如微服务架构中的服务通信
- 日志和监控系统:处理大量实时数据时,RabbitMQ可用作消息传递的中间层,保证高效、低延迟的数据流
# 4、How 如何工作?
- Producer:
- 消息生产者,它负责生成消息并通过Exchange发送到RabbitMQ系统
- Exchange:
- 消息交换机,负责根据消息的Routing Key和绑定的路由规则,将消息路由到不同的Queue
- RabbitMQ提供了多种Exchange类型,如Direct、Fanout、Topic和Headers
- Queue:
- 消息队列,存储从Exchange接收到的消息,等待被消费者消费
- 队列支持持久化、临时和自动删除等特性
- Consumer:
- 消息消费者,从队列中消费消息,并通过ACK机制确认消息的接收情况
- Routing Key:
- 用于消息路由的关键字段,Exchange根据它决定将消息分发到哪个Queue
# 02.RabbitMQ原理
# 0、架构图
# 1、核心组件
# 1)Publisher(生产者)
- 生产者负责生成消息并发送给
RabbitMQ 的 Exchange
- Publisher 通常是业务系统的一个部分
- 比如订单系统在订单创建后发送一条消息,通知其他系统
# 2)Consumer(消费者)
- 消费者从 RabbitMQ 中接收消息并进行处理
- 它是消息的最终接收者,可能是多个不同的服务,如库存系统、通知系统等
- 消费者在收到消息后会发送 ACK 确认消息已成功处理
# 3)Exchange(交换机)
- 消息并
不是直接被投递到 Queue(消息队列)
中的,中间还必须经过 Exchange(交换器)
这一层- Exchange(交换器) 用来接收生产者发送的消息,并将这些消息路由给服务器中的队列中
- Exchange(交换器) 会把我们的消息分配到对应的 Queue(消息队列) 中
- RabbitMQ 的 Exchange(交换器) 有4种类型,不同的类型对应着不同的路由策略
direct (默认)
:把消息路由到那些 Bindingkey 与 RoutingKey 完全匹配的 Queue 中fanout
:广播模式
,将消息路由到所有与该 Exchange 绑定的队列,不考虑Routing Key
topic
:基于模式匹配
的路由机制,允许使用通配符进行复杂的路由逻辑
headers
:根据消息头部属性进行路由,灵活性更强,但应用相对少见Exchange(交换器) 示意图如下
- 生产者将消息发给交换器的时候,一般会指定一个 RoutingKey(路由键),用来指定这个消息的路由规则
- 而这个 RoutingKey 需要与交换器类型和绑定键(BindingKey)联合使用才能最终生效
- RabbitMQ 中通过 Binding(绑定) 将 Exchange(交换器) 与 Queue(消息队列) 关联起来
- 在绑定的时候一般会指定一个 BindingKey(绑定建)
- 这样 RabbitMQ 就知道如何正确将消息路由到队列了
# 4)Queue(队列)
- Queue 是消息最终的存储地点,消费者从队列中获取消息并进行处理
- RabbitMQ 提供了三种不同类型的队列配置
- 持久化队列(Durable Queue):
- 消息持久化到硬盘,即使 RabbitMQ 重启,队列中的消息也不会丢失,适合需要高可靠性保证的场景
- 临时队列(Transient Queue):
- 消息存储在内存中,系统重启时数据会丢失,适用于对可靠性要求不高的场景
- 自动删除队列(Auto-Delete Queue):
- 当没有消费者连接到队列时,队列会自动删除,用于短期使用的队列场景
# 5)其他组件
Routing Key(路由键)
Routing Key 是生产者在发送消息时附带的一个标识,用来指示消息应该如何被路由到队列
它是 Exchange 根据不同的路由规则选择队列的重要依据
Broker(代理)
RabbitMQ 作为一个消息代理(Broker),负责接收和分发消息
Broker 是生产者和消费者之间的中介,保证消息能够可靠地从生产者传递到消费者
Virtual Host(虚拟主机)
Virtual Host 是用来隔离不同用户或不同应用的权限机制
在一个 RabbitMQ 实例中可以创建多个 Virtual Host,每个 Virtual Host 可以独立配置队列、交换机、绑定等
Connection
- 是客户端与 RabbitMQ Broker 之间的 TCP 连接
- 每个生产者和消费者都需要先建立连接才能发送或接收消息
Channel
- 是多路复用的机制,允许在一个 Connection 上开启多个 Channel,减少了大量连接的开销
- 每个 Channel 是一个轻量级的通信通道,通常用于不同的消息流或队列操作
# 2、消息流转过程
- 消息生成:生产者(Publisher)生成一条消息,并将其发送给指定的 Exchange
- 消息路由:
- Exchange 根据
Routing Key
和其类型(Direct、Fanout、Topic、Headers),将消息分发到绑定的队列(Queue)中
- Exchange 根据
- 消息存储:
- 消息到达队列后,等待消费者(Consumer)读取如果队列配置为持久化,消息会被存储到磁盘中
- 消息消费:
- 消费者从队列中获取消息并进行处理
- 处理完成后,消费者会发送一个 ACK 给 RabbitMQ,确认消息已经被成功处理
- 消息确认:
- 当 RabbitMQ 收到消费者的 ACK 后,会删除该条消息
- 如果消费者处理失败,RabbitMQ 可以重新将消息放回队列中,等待下一个消费者处理
# 3、Exchange 和 Queue 的具体交互
- RabbitMQ 的核心组件是 Exchange 和 Queue
- 两者通过绑定关系和 Routing Key 进行消息路由
生产者发送消息:生产者将消息发送给 Exchange,并附带一个 Routing Key
消息路由到队列:Exchange 根据消息的 Routing Key 和与队列的绑定规则,将消息路由到对应的队列中
消息进入队列:队列接收到消息并存储,等待消费者进行消费
消费者消费消息:消费者从队列中读取消息并处理
确认消息:消费者处理完消息后,发送 ACK 确认,队列从中删除该消息
通过这种机制,RabbitMQ 实现了消息的灵活路由、可靠存储和消费确认
# 03.重要功能
- 消费端限流用于防止消费者过载;
- 重回队列确保未成功消费的消息不会丢失;
- TTL用于控制消息的存活时间,防止过期消息积压;
- 死信队列用于处理无法正常消费的消息,保证系统的健壮性
# 1、消费端限流
概念:
RabbitMQ通过消费端限流来控制消费者一次性从队列中获取的消息数量
以防止消费者处理不过来,导致消息堆积
原理:
在RabbitMQ中,消费端可以设置
prefetch
值,限制每个消费者一次能接收的最大消息数量消费者只有在处理并确认完当前消息后,才会接收新的消息
这样避免了消费者一次性获取过多消息导致消息积压或消费者过载
举例:
- 假设有一个消费者处理较慢,它可以设置
prefetch=1
,表示每次只接收一条消息 - 处理完成并手动ACK后,才能再接收下一条消息
- 这样可以防止这个消费者一次性接收太多消息导致处理不过来
- 假设有一个消费者处理较慢,它可以设置
# 设置prefetch
channel.basic_qos(prefetch_count=1)
2
# 2、重回队列
概念:
- RabbitMQ允许消费者在消费失败时将消息重新放回队列,以便重新被消费
- 这种机制可以确保未成功消费的消息不会被丢弃
原理:
当消费者未能成功处理消息时,可以通过拒绝(
basic.reject
或basic.nack
)将消息重新投递回队列重回队列的消息会继续等待其他消费者处理
举例:
- 假设一个消费者在处理消息时发生了异常,那么可以通过
basic.nack
方法将消息放回队列,等待其他消费者重新消费
- 假设一个消费者在处理消息时发生了异常,那么可以通过
channel.basic_nack(delivery_tag=delivery_tag, requeue=True)
# 3、TTL(消息存活时间)
概念:
- TTL是指消息在队列中的存活时间
- 超过指定的时间后,消息会自动过期,通常会被发送到
死信队列或直接丢弃
原理:RabbitMQ支持两种方式设置TTL
队列级别的TTL:为整个队列设置消息存活时间,队列中的所有消息都共享同一个TTL
消息级别的TTL:为每条消息单独设置TTL
消息超过TTL后会被认为过期,过期消息可以被路由到死信队列或直接丢弃
举例:
- 在一个用户注册的系统中,用户注册验证码有有效时间(比如5分钟),可以为这些验证码消息设置TTL
- 如果消费者在5分钟内没有处理这些验证码消息,消息将被认为过期,进入死信队列
args = {"x-message-ttl": 300000} # TTL设为5分钟
channel.queue_declare(queue='queue_with_ttl', arguments=args)
2
# 4、死信队列(DLX)
概念:
- 死信队列用于接收那些无法被正常消费的消息
- 如TTL过期、队列满了或者被消费者拒绝但不重回队列的消息
原理:
当消息无法被正常处理时,它们可以被转发到一个特殊的交换器
称为死信交换器(DLX),并存入对应的死信队列
可以基于死信队列中的消息做后续分析、补偿处理等
举例:
- 例如,在订单处理系统中,如果某个订单消息由于队列已满、TTL过期或处理失败无法被消费
- 可以将这些消息发送到一个死信队列进行后续人工干预或异常处理
args = {
"x-dead-letter-exchange": "dlx_exchange"
}
channel.queue_declare(queue="normal_queue", arguments=args)
# 定义死信队列
channel.exchange_declare(exchange="dlx_exchange", exchange_type="direct")
channel.queue_declare(queue="dead_letter_queue")
channel.queue_bind(exchange="dlx_exchange", queue="dead_letter_queue")
2
3
4
5
6
7
8
9