05.消息分发
# 01.安装pike
**注:**为了支持RabbitMQ,每种语言都有自己专有的模块去与RabbitMQ通信,在python中最常用的是pika
安装pika的方法:
(django2.2) C:\Users\Lenovo>pip install pika -i https://pypi.tuna.tsinghua.edu.cn/simple/
1
# 02.最简单的队列通信
重启RabbitMq会丢失所有的queue中的数据
RabbitMQ默认采取轮询方法,将第一个消息发给a第二个发送给b以此类推
默认生产者会将消息发送消费者后会等待确认,如果加上no_ack=True就不会等确认
如果消费者还未执行完就突然断了,那么这条数据不会被其他消费者接收到
# 2.1 生产者
# -*- coding: utf-8 -*-
import pika
class Sender(object):
def __init__(self):
# 远程rabbitmq服务的配置信息
self.username = 'admin' # 指定远程rabbitmq的用户名密码
self.pwd = '123456'
self.ip_addr = '192.168.56.66'
self.port_num = 5672
self.connection = None
self.channel = None
def create_connect(self):
# 消息队列服务的连接和队列的创建
credentials = pika.PlainCredentials(self.username, self.pwd)
connection = pika.BlockingConnection(pika.ConnectionParameters(self.ip_addr, self.port_num, '/', credentials))
channel = connection.channel() # 建立rabbitmq协议的通道
# 声明队列queue
channel.queue_declare(queue='hello')
self.connection = connection
self.channel = channel
def send(self,msg=None):
self.create_connect()
msg = msg if msg else 'Hello World!'
channel = self.create_connect()
# 通过刚刚建立的通道channel调用basic_publish发送消息
self.channel.basic_publish(exchange='',
routing_key='hello', # routing_key就是队列的名字,这里指定向队列hello发消息
body=msg # body里面是要向队列发送的消息
)
print(" [x] Sent %s"%msg)
self.connection.close() # 发送完毕后关掉
if __name__ == '__main__':
send = Sender()
send.send('aaaaaaaaa')
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# 2.2 消费者
# -*- coding: utf-8 -*-
import pika
class Receiver(object):
def __init__(self):
# 远程rabbitmq服务的配置信息
self.username = 'admin' # 指定远程rabbitmq的用户名密码
self.pwd = '123456'
self.ip_addr = '192.168.56.66'
self.port_num = 5672
self.connection = None
self.channel = None
def create_connect(self):
# 消息队列服务的连接和队列的创建
credentials = pika.PlainCredentials(self.username, self.pwd)
connection = pika.BlockingConnection(pika.ConnectionParameters(self.ip_addr, self.port_num, '/', credentials))
channel = connection.channel() # 建立rabbitmq协议的通道
# 声明队列queue
channel.queue_declare(queue='hello')
self.connection = connection
self.channel = channel
def run(self):
self.create_connect()
# 这里仅仅是声明了收队列hello中的内容,并调用callback处理,实际上在这里还没有收
# 只有执行下面的channel.start_consuming()才会正真的收
self.channel.basic_consume('hello', self.callback, False)
# 'hello': # 指明要从那个队列里收消息
# callback: # 如果收到消息,调用callback函数来处理消息,标准格式带四个参数
# False: # no_ack=False,消费者必须确认已经消费了,否则数据会一直在队列中不丢失
print(' [*] Waiting for messages. To exit press CTRL+C')
self.channel.start_consuming() # 开始消息,是个死循环,一直监听收消息
# 收到消息后,回调函数处理任务
def callback(self, ch, method, properties, body):
'''
:param ch: ch是刚刚声明的channel = connection.channel()对象
:param method: method是指定要把消息发送给那些queue的信息
:param properties: 其他属性
:param body: 收到的消息
'''
print(" [x] Received %r" % body)
ch.basic_ack(delivery_tag=method.delivery_tag)
# 如果去掉no_ack = False 必须加ch.basic_ack,让消费者收到消息主动发送确认
# 否则消费者如果收到消息后断掉,那么这条消息依然会发送给另一个,再断开又发送下一个,无限循环
if __name__ == '__main__':
receive = Receiver()
receive.run()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# 2.3 测试消费者异常断开
- 因为 no_ack=False, 所以消费者必须确认已经消费了,否则数据会一直在队列中不丢失
- 当收到消费的客户端异常断开,rabbitmq会捕获到,就会将消息推送给下一个消费者
[root@k8s-node2 ~]# rabbitmq list_queues
1
上次更新: 2024/3/13 15:35:10