不做大哥好多年 不做大哥好多年
首页
  • MySQL
  • Redis
  • Elasticsearch
  • Kafka
  • Etcd
  • MongoDB
  • TiDB
  • RabbitMQ
  • 01.GO基础
  • 02.面向对象
  • 03.并发编程
  • 04.常用库
  • 05.数据库操作
  • 06.Beego框架
  • 07.Beego商城
  • 08.GIN框架
  • 09.GIN论坛
  • 10.微服务
  • 01.Python基础
  • 02.Python模块
  • 03.Django
  • 04.Flask
  • 05.SYL
  • 06.Celery
  • 10.微服务
  • 01.Java基础
  • 02.面向对象
  • 03.Java进阶
  • 04.Web基础
  • 05.Spring框架
  • 100.微服务
  • Docker
  • K8S
  • 容器原理
  • Istio
  • 数据结构
  • 算法基础
  • 算法题分类
  • 前置知识
  • PyTorch
  • 01.Python
  • 02.GO
  • 03.Java
  • 04.业务问题
  • 05.关键技术
  • 06.项目常识
  • 10.计算机基础
  • Linux基础
  • Linux高级
  • Nginx
  • KeepAlive
  • ansible
  • zabbix
  • Shell
  • Linux内核

逍遥子

不做大哥好多年
首页
  • MySQL
  • Redis
  • Elasticsearch
  • Kafka
  • Etcd
  • MongoDB
  • TiDB
  • RabbitMQ
  • 01.GO基础
  • 02.面向对象
  • 03.并发编程
  • 04.常用库
  • 05.数据库操作
  • 06.Beego框架
  • 07.Beego商城
  • 08.GIN框架
  • 09.GIN论坛
  • 10.微服务
  • 01.Python基础
  • 02.Python模块
  • 03.Django
  • 04.Flask
  • 05.SYL
  • 06.Celery
  • 10.微服务
  • 01.Java基础
  • 02.面向对象
  • 03.Java进阶
  • 04.Web基础
  • 05.Spring框架
  • 100.微服务
  • Docker
  • K8S
  • 容器原理
  • Istio
  • 数据结构
  • 算法基础
  • 算法题分类
  • 前置知识
  • PyTorch
  • 01.Python
  • 02.GO
  • 03.Java
  • 04.业务问题
  • 05.关键技术
  • 06.项目常识
  • 10.计算机基础
  • Linux基础
  • Linux高级
  • Nginx
  • KeepAlive
  • ansible
  • zabbix
  • Shell
  • Linux内核
  • MySQL

  • Redis

  • Elasticsearch

  • Kafka

  • Etcd

  • MongoDB

  • TiDB

  • RabbitMQ

    • 01.RabbitMQ基础
    • 02.RabbitMQ原理 ✅
    • 03.保证可靠消费 ✅
    • 04.安装RabbitMQ
    • 05.消息分发
    • 06.消息持久化
      • 01.消息持久化
        • 1.1 只需要添加这两句即可
        • 1.2 生产者
        • 1.3 消费者
        • 1.4 重启服务
    • 07.消息广播
    • 08.rpc实现
    • 09.RabbitMQ集群
  • 数据库
  • RabbitMQ
xiaonaiqiang
2021-05-02
目录

06.消息持久化

# 01.消息持久化

# 1.1 只需要添加这两句即可

  • 生产者
# 声明队列queue
channel.queue_declare(queue='hello',durable=True,passive=True)
properties=pika.BasicProperties(
    delivery_mode=2,  # 第二步:如果只有这一条重启服RabbitMQ服务后只有队列,但没数据
)  # 消息持久化
1
2
3
4
5
  • 消费者
# 声明队列queue
channel.queue_declare(queue='hello',durable=True)
1
2

# 1.2 生产者

sender.py

# -*- coding: utf-8 -*-
import pika

class Sender(object):
	def __init__(self):
		# 远程rabbitmq服务的配置信息
		self.username = 'admin'  # 指定远程rabbitmq的用户名密码
		self.pwd = '123456'
		self.ip_addr = '192.168.56.66'
		self.port_num = 5672
		self.connection = None
		self.channel = None

	def create_connect(self):
		# 消息队列服务的连接和队列的创建
		credentials = pika.PlainCredentials(self.username, self.pwd)
		connection = pika.BlockingConnection(pika.ConnectionParameters(self.ip_addr, self.port_num, '/', credentials))
		channel = connection.channel()   # 建立rabbitmq协议的通道
		# 声明队列queue
		channel.queue_declare(queue='hello',durable=True,passive=True)
		self.connection = connection
		self.channel = channel

	def send(self,msg=None):
		self.create_connect()
		msg = msg if msg else 'Hello World!'
		channel = self.create_connect()
		# 通过刚刚建立的通道channel调用basic_publish发送消息
		self.channel.basic_publish(exchange='',
								   routing_key='hello',  # routing_key就是队列的名字,这里指定向队列hello发消息
								   body=msg,  # body里面是要向队列发送的消息
								   properties=pika.BasicProperties(
									   delivery_mode=2,  # 第二步:如果只有这一条重启服RabbitMQ服务后只有队列,但没数据
								     )  # 消息持久化
								   )

		print(" [x] Sent %s"%msg)
		self.connection.close()  # 发送完毕后关掉

if __name__ == '__main__':
	send = Sender()
	send.send('aaaaaaaaa')
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42

# 1.3 消费者

receiver.py

# -*- coding: utf-8 -*-
import pika,time

class Receiver(object):
	def __init__(self):
		# 远程rabbitmq服务的配置信息
		self.username = 'admin'  # 指定远程rabbitmq的用户名密码
		self.pwd = '123456'
		self.ip_addr = '192.168.56.66'
		self.port_num = 5672
		self.connection = None
		self.channel = None

	def create_connect(self):
		# 消息队列服务的连接和队列的创建
		credentials = pika.PlainCredentials(self.username, self.pwd)
		connection = pika.BlockingConnection(pika.ConnectionParameters(self.ip_addr, self.port_num, '/', credentials))
		channel = connection.channel()   # 建立rabbitmq协议的通道
		# 声明队列queue
		channel.queue_declare(queue='hello',durable=True)
		self.connection = connection
		self.channel = channel

	def run(self):
		self.create_connect()
		# 这里仅仅是声明了收队列hello中的内容,并调用callback处理,实际上在这里还没有收
		# 只有执行下面的channel.start_consuming()才会正真的收
		self.channel.basic_consume('hello', self.callback, False)
		# 'hello':   # 指明要从那个队列里收消息
		# callback:  # 如果收到消息,调用callback函数来处理消息,标准格式带四个参数
		# False:      # no_ack=False,消费者必须确认已经消费了,否则数据会一直在队列中不丢失

		print(' [*] Waiting for messages. To exit press CTRL+C')
		self.channel.start_consuming()  # 开始消息,是个死循环,一直监听收消息

	# 收到消息后,回调函数处理任务
	def callback(self, ch, method, properties, body):
		'''
		:param ch:  ch是刚刚声明的channel = connection.channel()对象
		:param method:   method是指定要把消息发送给那些queue的信息
		:param properties: 其他属性
		:param body: 收到的消息
		'''
		time.sleep(60)
		print(" [x] Received %r" % body)
		ch.basic_ack(delivery_tag=method.delivery_tag)
		# 如果去掉no_ack = False 必须加ch.basic_ack,让消费者收到消息主动发送确认
		# 否则消费者如果收到消息后断掉,那么这条消息依然会发送给另一个,再断开又发送下一个,无限循环


if __name__ == '__main__':
	receive = Receiver()
	receive.run()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53

# 1.4 重启服务

[root@k8s-node2 ~]#  systemctl restart rabbitmq-server 
[root@k8s-node2 ~]#  rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name	messages
hello	2          # 重启rabbitmq服务后可以看到 hello 这个队里消息没有丢失
balance	1154400
1
2
3
4
5
6
7
上次更新: 2024/3/13 15:35:10
05.消息分发
07.消息广播

← 05.消息分发 07.消息广播→

最近更新
01
04.数组双指针排序_子数组
03-25
02
08.动态规划
03-25
03
06.回溯算法
03-25
更多文章>
Theme by Vdoing | Copyright © 2019-2025 逍遥子 技术博客 京ICP备2021005373号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式