06.消息持久化
# 01.消息持久化
# 1.1 只需要添加这两句即可
- 生产者
# 声明队列queue
channel.queue_declare(queue='hello',durable=True,passive=True)
properties=pika.BasicProperties(
delivery_mode=2, # 第二步:如果只有这一条重启服RabbitMQ服务后只有队列,但没数据
) # 消息持久化
1
2
3
4
5
2
3
4
5
- 消费者
# 声明队列queue
channel.queue_declare(queue='hello',durable=True)
1
2
2
# 1.2 生产者
sender.py
# -*- coding: utf-8 -*-
import pika
class Sender(object):
def __init__(self):
# 远程rabbitmq服务的配置信息
self.username = 'admin' # 指定远程rabbitmq的用户名密码
self.pwd = '123456'
self.ip_addr = '192.168.56.66'
self.port_num = 5672
self.connection = None
self.channel = None
def create_connect(self):
# 消息队列服务的连接和队列的创建
credentials = pika.PlainCredentials(self.username, self.pwd)
connection = pika.BlockingConnection(pika.ConnectionParameters(self.ip_addr, self.port_num, '/', credentials))
channel = connection.channel() # 建立rabbitmq协议的通道
# 声明队列queue
channel.queue_declare(queue='hello',durable=True,passive=True)
self.connection = connection
self.channel = channel
def send(self,msg=None):
self.create_connect()
msg = msg if msg else 'Hello World!'
channel = self.create_connect()
# 通过刚刚建立的通道channel调用basic_publish发送消息
self.channel.basic_publish(exchange='',
routing_key='hello', # routing_key就是队列的名字,这里指定向队列hello发消息
body=msg, # body里面是要向队列发送的消息
properties=pika.BasicProperties(
delivery_mode=2, # 第二步:如果只有这一条重启服RabbitMQ服务后只有队列,但没数据
) # 消息持久化
)
print(" [x] Sent %s"%msg)
self.connection.close() # 发送完毕后关掉
if __name__ == '__main__':
send = Sender()
send.send('aaaaaaaaa')
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# 1.3 消费者
receiver.py
# -*- coding: utf-8 -*-
import pika,time
class Receiver(object):
def __init__(self):
# 远程rabbitmq服务的配置信息
self.username = 'admin' # 指定远程rabbitmq的用户名密码
self.pwd = '123456'
self.ip_addr = '192.168.56.66'
self.port_num = 5672
self.connection = None
self.channel = None
def create_connect(self):
# 消息队列服务的连接和队列的创建
credentials = pika.PlainCredentials(self.username, self.pwd)
connection = pika.BlockingConnection(pika.ConnectionParameters(self.ip_addr, self.port_num, '/', credentials))
channel = connection.channel() # 建立rabbitmq协议的通道
# 声明队列queue
channel.queue_declare(queue='hello',durable=True)
self.connection = connection
self.channel = channel
def run(self):
self.create_connect()
# 这里仅仅是声明了收队列hello中的内容,并调用callback处理,实际上在这里还没有收
# 只有执行下面的channel.start_consuming()才会正真的收
self.channel.basic_consume('hello', self.callback, False)
# 'hello': # 指明要从那个队列里收消息
# callback: # 如果收到消息,调用callback函数来处理消息,标准格式带四个参数
# False: # no_ack=False,消费者必须确认已经消费了,否则数据会一直在队列中不丢失
print(' [*] Waiting for messages. To exit press CTRL+C')
self.channel.start_consuming() # 开始消息,是个死循环,一直监听收消息
# 收到消息后,回调函数处理任务
def callback(self, ch, method, properties, body):
'''
:param ch: ch是刚刚声明的channel = connection.channel()对象
:param method: method是指定要把消息发送给那些queue的信息
:param properties: 其他属性
:param body: 收到的消息
'''
time.sleep(60)
print(" [x] Received %r" % body)
ch.basic_ack(delivery_tag=method.delivery_tag)
# 如果去掉no_ack = False 必须加ch.basic_ack,让消费者收到消息主动发送确认
# 否则消费者如果收到消息后断掉,那么这条消息依然会发送给另一个,再断开又发送下一个,无限循环
if __name__ == '__main__':
receive = Receiver()
receive.run()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# 1.4 重启服务
[root@k8s-node2 ~]# systemctl restart rabbitmq-server
[root@k8s-node2 ~]# rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name messages
hello 2 # 重启rabbitmq服务后可以看到 hello 这个队里消息没有丢失
balance 1154400
1
2
3
4
5
6
7
2
3
4
5
6
7
上次更新: 2024/3/13 15:35:10