不做大哥好多年 不做大哥好多年
首页
  • MySQL
  • Redis
  • Elasticsearch
  • Kafka
  • Etcd
  • MongoDB
  • TiDB
  • RabbitMQ
  • 01.GO基础
  • 02.面向对象
  • 03.并发编程
  • 04.常用库
  • 05.数据库操作
  • 06.Beego框架
  • 07.Beego商城
  • 08.GIN框架
  • 09.GIN论坛
  • 10.微服务
  • 01.Python基础
  • 02.Python模块
  • 03.Django
  • 04.Flask
  • 05.SYL
  • 06.Celery
  • 10.微服务
  • 01.Java基础
  • 02.面向对象
  • 03.Java进阶
  • 04.Web基础
  • 05.Spring框架
  • 100.微服务
  • Docker
  • K8S
  • 容器原理
  • Istio
  • 数据结构
  • 算法基础
  • 算法题分类
  • 前置知识
  • PyTorch
  • 01.Python
  • 02.GO
  • 03.Java
  • 04.业务问题
  • 05.关键技术
  • 06.项目常识
  • 10.计算机基础
  • Linux基础
  • Linux高级
  • Nginx
  • KeepAlive
  • ansible
  • zabbix
  • Shell
  • Linux内核

逍遥子

不做大哥好多年
首页
  • MySQL
  • Redis
  • Elasticsearch
  • Kafka
  • Etcd
  • MongoDB
  • TiDB
  • RabbitMQ
  • 01.GO基础
  • 02.面向对象
  • 03.并发编程
  • 04.常用库
  • 05.数据库操作
  • 06.Beego框架
  • 07.Beego商城
  • 08.GIN框架
  • 09.GIN论坛
  • 10.微服务
  • 01.Python基础
  • 02.Python模块
  • 03.Django
  • 04.Flask
  • 05.SYL
  • 06.Celery
  • 10.微服务
  • 01.Java基础
  • 02.面向对象
  • 03.Java进阶
  • 04.Web基础
  • 05.Spring框架
  • 100.微服务
  • Docker
  • K8S
  • 容器原理
  • Istio
  • 数据结构
  • 算法基础
  • 算法题分类
  • 前置知识
  • PyTorch
  • 01.Python
  • 02.GO
  • 03.Java
  • 04.业务问题
  • 05.关键技术
  • 06.项目常识
  • 10.计算机基础
  • Linux基础
  • Linux高级
  • Nginx
  • KeepAlive
  • ansible
  • zabbix
  • Shell
  • Linux内核
  • MySQL

  • Redis

  • Elasticsearch

  • Kafka

  • Etcd

  • MongoDB

  • TiDB

  • RabbitMQ

    • 01.RabbitMQ基础
    • 02.RabbitMQ原理 ✅
      • 01.RabbitMQ使用场景
        • 1、What 是什么?
        • 2. Why 为什么使用RabbitMQ?
        • 3. Where 使用场景?
        • 4、How 如何工作?
      • 02.RabbitMQ原理
        • 0、架构图
        • 1、核心组件
        • 1)Publisher(生产者)
        • 2)Consumer(消费者)
        • 3)Exchange(交换机)
        • 4)Queue(队列)
        • 5)其他组件
        • 2、消息流转过程
        • 3、Exchange 和 Queue 的具体交互
      • 03.重要功能
        • 1、消费端限流
        • 2、重回队列
        • 3、TTL(消息存活时间)
        • 4、死信队列(DLX)
    • 03.保证可靠消费 ✅
    • 04.安装RabbitMQ
    • 05.消息分发
    • 06.消息持久化
    • 07.消息广播
    • 08.rpc实现
    • 09.RabbitMQ集群
  • 数据库
  • RabbitMQ
xiaonaiqiang
2021-04-30
目录

02.RabbitMQ原理 ✅常识原理

RabbitMQ 是基于 AMQP 协议的消息中间件,具有高可靠性、灵活路由、集群化、高可用队列等特点,广泛用于异步处理、系统解耦、流量削峰等场景。

消息生产者通过 Exchange 将消息路由到 Queue,消费者从 Queue 获取消息并处理。

支持多种 Exchange 类型:Direct、Fanout、Topic、Headers 来实现不同的路由机制。

RabbitMQ 通过持久化、ACK确认、消费端限流、TTL、死信队列等机制保障消息可靠传递与系统健壮性。

# 01.RabbitMQ使用场景

# 1、What 是什么?

  • RabbitMQ 是采用 Erlang 语言实现 AMQP(Advanced Message Queuing Protocol,高级消息队列协议)的消息中间件
  • 它最初起源于金融系统,用于在分布式系统中存储转发消息
  • 可靠性(Reliablity):使用了一些机制来保证可靠性,比如持久化、传输确认、发布确认
  • 灵活的路由(Flexible Routing):
    • 在消息进入队列之前,通过 Exchange 来路由消息
    • 对于典型的路由功能,Rabbit 已经提供了一些内置的 Exchange 来实现
    • 针对更复杂的路由功能,可以将多个Exchange绑定在一起,也通过插件机制实现自己的 Exchange
  • 消息集群(Clustering):多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker
  • 高可用(Highly Avaliable Queues):
    • 队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用
  • 多种协议(Multi-protocol):支持多种消息队列协议,如 STOMP、MQTT 等
  • 多种语言客户端(Many Clients):几乎支持所有常用语言,比如 Java、.NET、Ruby 等
  • 管理界面(Management UI):提供了易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方面
  • 跟踪机制(Tracing):如果消息异常,RabbitMQ 提供了消息的跟踪机制,使用者可以找出发生了什么
  • 插件机制(Plugin System):提供了许多插件,来从多方面进行扩展,也可以编辑自己的插件

# 2. Why 为什么使用RabbitMQ?

  • RabbitMQ适用于多种场景,尤其是需要可靠消息传递和灵活路由的系统中
  • 异步处理:
    • 通过消息队列,能够减少请求的延迟,提升用户体验
    • 例如,在用户注册时,消息队列可以异步发送邮件通知,降低实时响应的延迟
  • 解耦:
    • RabbitMQ可以使系统模块独立工作
    • 例如,
      • 在电商系统中,订单与库存系统通过消息队列进行解耦
      • 确保高效处理订单的同时,降低对库存系统的直接依赖
  • 流量削锋:
    • 在高并发的秒杀活动中,RabbitMQ可以用于缓冲请求,防止系统被瞬间的高流量压垮
  • 可靠传递:
    • RabbitMQ支持消息确认机制,确保消息从生产者到消费者的可靠传递,避免消息丢失

# 3. Where 使用场景?

  • 金融交易系统:需要高可靠性和顺序性的场景
  • 电商平台:订单处理、库存管理和通知服务等模块间的解耦
  • 分布式系统中的服务通信:不同服务之间的消息传递,例如微服务架构中的服务通信
  • 日志和监控系统:处理大量实时数据时,RabbitMQ可用作消息传递的中间层,保证高效、低延迟的数据流

# 4、How 如何工作?

  • Producer:
    • 消息生产者,它负责生成消息并通过Exchange发送到RabbitMQ系统
  • Exchange:
    • 消息交换机,负责根据消息的Routing Key和绑定的路由规则,将消息路由到不同的Queue
    • RabbitMQ提供了多种Exchange类型,如Direct、Fanout、Topic和Headers
  • Queue:
    • 消息队列,存储从Exchange接收到的消息,等待被消费者消费
    • 队列支持持久化、临时和自动删除等特性
  • Consumer:
    • 消息消费者,从队列中消费消息,并通过ACK机制确认消息的接收情况
  • Routing Key:
    • 用于消息路由的关键字段,Exchange根据它决定将消息分发到哪个Queue

# 02.RabbitMQ原理

# 0、架构图

# 1、核心组件

# 1)Publisher(生产者)

  • 生产者负责生成消息并发送给 RabbitMQ 的 Exchange
  • Publisher 通常是业务系统的一个部分
  • 比如订单系统在订单创建后发送一条消息,通知其他系统

# 2)Consumer(消费者)

  • 消费者从 RabbitMQ 中接收消息并进行处理
  • 它是消息的最终接收者,可能是多个不同的服务,如库存系统、通知系统等
  • 消费者在收到消息后会发送 ACK 确认消息已成功处理

# 3)Exchange(交换机)

  • 消息并不是直接被投递到 Queue(消息队列)中的,中间还必须经过 Exchange(交换器) 这一层
  • Exchange(交换器) 用来接收生产者发送的消息,并将这些消息路由给服务器中的队列中
  • Exchange(交换器) 会把我们的消息分配到对应的 Queue(消息队列) 中
  • RabbitMQ 的 Exchange(交换器) 有4种类型,不同的类型对应着不同的路由策略
  • direct (默认):把消息路由到那些 Bindingkey 与 RoutingKey 完全匹配的 Queue 中

  • fanout:广播模式,将消息路由到所有与该 Exchange 绑定的队列,不考虑 Routing Key

  • topic:基于模式匹配的路由机制,允许使用通配符进行复杂的路由逻辑

  • headers:根据消息头部属性进行路由,灵活性更强,但应用相对少见

  • Exchange(交换器) 示意图如下

    • 生产者将消息发给交换器的时候,一般会指定一个 RoutingKey(路由键),用来指定这个消息的路由规则
    • 而这个 RoutingKey 需要与交换器类型和绑定键(BindingKey)联合使用才能最终生效
    • RabbitMQ 中通过 Binding(绑定) 将 Exchange(交换器) 与 Queue(消息队列) 关联起来
    • 在绑定的时候一般会指定一个 BindingKey(绑定建)
    • 这样 RabbitMQ 就知道如何正确将消息路由到队列了

# 4)Queue(队列)

  • Queue 是消息最终的存储地点,消费者从队列中获取消息并进行处理
  • RabbitMQ 提供了三种不同类型的队列配置
  • 持久化队列(Durable Queue):
    • 消息持久化到硬盘,即使 RabbitMQ 重启,队列中的消息也不会丢失,适合需要高可靠性保证的场景
  • 临时队列(Transient Queue):
    • 消息存储在内存中,系统重启时数据会丢失,适用于对可靠性要求不高的场景
  • 自动删除队列(Auto-Delete Queue):
    • 当没有消费者连接到队列时,队列会自动删除,用于短期使用的队列场景

# 5)其他组件

  • Routing Key(路由键)

    • Routing Key 是生产者在发送消息时附带的一个标识,用来指示消息应该如何被路由到队列

    • 它是 Exchange 根据不同的路由规则选择队列的重要依据

  • Broker(代理)

    • RabbitMQ 作为一个消息代理(Broker),负责接收和分发消息

    • Broker 是生产者和消费者之间的中介,保证消息能够可靠地从生产者传递到消费者

  • Virtual Host(虚拟主机)

    • Virtual Host 是用来隔离不同用户或不同应用的权限机制

    • 在一个 RabbitMQ 实例中可以创建多个 Virtual Host,每个 Virtual Host 可以独立配置队列、交换机、绑定等

  • Connection

    • 是客户端与 RabbitMQ Broker 之间的 TCP 连接
    • 每个生产者和消费者都需要先建立连接才能发送或接收消息
  • Channel

    • 是多路复用的机制,允许在一个 Connection 上开启多个 Channel,减少了大量连接的开销
    • 每个 Channel 是一个轻量级的通信通道,通常用于不同的消息流或队列操作

# 2、消息流转过程

  • 消息生成:生产者(Publisher)生成一条消息,并将其发送给指定的 Exchange
  • 消息路由:
    • Exchange 根据 Routing Key 和其类型(Direct、Fanout、Topic、Headers),将消息分发到绑定的队列(Queue)中
  • 消息存储:
    • 消息到达队列后,等待消费者(Consumer)读取如果队列配置为持久化,消息会被存储到磁盘中
  • 消息消费:
    • 消费者从队列中获取消息并进行处理
    • 处理完成后,消费者会发送一个 ACK 给 RabbitMQ,确认消息已经被成功处理
  • 消息确认:
    • 当 RabbitMQ 收到消费者的 ACK 后,会删除该条消息
    • 如果消费者处理失败,RabbitMQ 可以重新将消息放回队列中,等待下一个消费者处理

# 3、Exchange 和 Queue 的具体交互

  • RabbitMQ 的核心组件是 Exchange 和 Queue
  • 两者通过绑定关系和 Routing Key 进行消息路由
  • 生产者发送消息:生产者将消息发送给 Exchange,并附带一个 Routing Key

  • 消息路由到队列:Exchange 根据消息的 Routing Key 和与队列的绑定规则,将消息路由到对应的队列中

  • 消息进入队列:队列接收到消息并存储,等待消费者进行消费

  • 消费者消费消息:消费者从队列中读取消息并处理

  • 确认消息:消费者处理完消息后,发送 ACK 确认,队列从中删除该消息

  • 通过这种机制,RabbitMQ 实现了消息的灵活路由、可靠存储和消费确认

# 03.重要功能

  • 消费端限流用于防止消费者过载;
  • 重回队列确保未成功消费的消息不会丢失;
  • TTL用于控制消息的存活时间,防止过期消息积压;
  • 死信队列用于处理无法正常消费的消息,保证系统的健壮性

# 1、消费端限流

  • 概念:

    • RabbitMQ通过消费端限流来控制消费者一次性从队列中获取的消息数量

    • 以防止消费者处理不过来,导致消息堆积

  • 原理:

    • 在RabbitMQ中,消费端可以设置prefetch值,限制每个消费者一次能接收的最大消息数量

    • 消费者只有在处理并确认完当前消息后,才会接收新的消息

    • 这样避免了消费者一次性获取过多消息导致消息积压或消费者过载

  • 举例:

    • 假设有一个消费者处理较慢,它可以设置prefetch=1,表示每次只接收一条消息
    • 处理完成并手动ACK后,才能再接收下一条消息
    • 这样可以防止这个消费者一次性接收太多消息导致处理不过来
# 设置prefetch
channel.basic_qos(prefetch_count=1)
1
2

# 2、重回队列

  • 概念:

    • RabbitMQ允许消费者在消费失败时将消息重新放回队列,以便重新被消费
    • 这种机制可以确保未成功消费的消息不会被丢弃
  • 原理:

    • 当消费者未能成功处理消息时,可以通过拒绝(basic.reject 或 basic.nack)将消息重新投递回队列

    • 重回队列的消息会继续等待其他消费者处理

  • 举例:

    • 假设一个消费者在处理消息时发生了异常,那么可以通过basic.nack方法将消息放回队列,等待其他消费者重新消费
channel.basic_nack(delivery_tag=delivery_tag, requeue=True)
1

# 3、TTL(消息存活时间)

  • 概念:

    • TTL是指消息在队列中的存活时间
    • 超过指定的时间后,消息会自动过期,通常会被发送到死信队列或直接丢弃
  • 原理:RabbitMQ支持两种方式设置TTL

    • 队列级别的TTL:为整个队列设置消息存活时间,队列中的所有消息都共享同一个TTL

    • 消息级别的TTL:为每条消息单独设置TTL

    • 消息超过TTL后会被认为过期,过期消息可以被路由到死信队列或直接丢弃

  • 举例:

    • 在一个用户注册的系统中,用户注册验证码有有效时间(比如5分钟),可以为这些验证码消息设置TTL
    • 如果消费者在5分钟内没有处理这些验证码消息,消息将被认为过期,进入死信队列
args = {"x-message-ttl": 300000}  # TTL设为5分钟
channel.queue_declare(queue='queue_with_ttl', arguments=args)
1
2

# 4、死信队列(DLX)

  • 概念:

    • 死信队列用于接收那些无法被正常消费的消息
    • 如TTL过期、队列满了或者被消费者拒绝但不重回队列的消息
  • 原理:

    • 当消息无法被正常处理时,它们可以被转发到一个特殊的交换器

    • 称为死信交换器(DLX),并存入对应的死信队列

    • 可以基于死信队列中的消息做后续分析、补偿处理等

  • 举例:

    • 例如,在订单处理系统中,如果某个订单消息由于队列已满、TTL过期或处理失败无法被消费
    • 可以将这些消息发送到一个死信队列进行后续人工干预或异常处理
args = {
    "x-dead-letter-exchange": "dlx_exchange"
}
channel.queue_declare(queue="normal_queue", arguments=args)

# 定义死信队列
channel.exchange_declare(exchange="dlx_exchange", exchange_type="direct")
channel.queue_declare(queue="dead_letter_queue")
channel.queue_bind(exchange="dlx_exchange", queue="dead_letter_queue")
1
2
3
4
5
6
7
8
9
上次更新: 2025/4/29 17:38:19
01.RabbitMQ基础
03.保证可靠消费 ✅

← 01.RabbitMQ基础 03.保证可靠消费 ✅→

最近更新
01
04.数组双指针排序_子数组
03-25
02
08.动态规划
03-25
03
06.回溯算法
03-25
更多文章>
Theme by Vdoing | Copyright © 2019-2025 逍遥子 技术博客 京ICP备2021005373号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式