08.rpc实现
# 01.rpc
RabbitMQ rpc实现(消费者可以将执行结果返回给生产者)
# 1.1 rpc作用
1.在前面的方法中只能实现生产者发消息,消费者收消息,消息流是单向的
2.如果生产者发一条消息,需要让消费者执行,并让消费者将执行结果返回给生产者,前面的方法就无法完成
3.rpc可以实现生产者与消费者互发消息(两边即是生产者又是消费者)
4.为了实现即使生产者又是消费者,需要建立两个queue,一个queue用来放命令,一个queue用来放执行结果
# 1.2 rpc类型的exchange(server端执行命令返回结果)
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel() #实例化连接实例对象
channel.queue_declare(queue='rpc_queue') #客户端将消息发送到rpc_queue服务器端也要定义这个queue
def fib(n): #这里就是求斐波那契的一个函数,不是重点
if n == 0:
return 0
elif n == 1:
return 1
else:
return fib(n-1) + fib(n-2)
#1 on_request中做了哪些事:收取命令,执行命令,返回结果
#2 这里的props参数就是客户端告诉服务器端要将结果发送到哪个queue中
def on_request(ch, method, props, body): #调试的时候,消费者只要发送消息就会调用这个函数
n = int(body) #这里的body就是从客户端收取的内容
print(" [.] fib(%s)" % n)
response = fib(n) #直接调用定义的fib斐波那契函数得到执行结果
#1在这一步服务器端已经的到了执行结果,但是怎样才能将结果返回给客户端
#2 这里是在客户端发送消息的时候添加了一个新的字段,字段里指明服务器将结果发到哪个队列里
ch.basic_publish(exchange='',
routing_key=props.reply_to, #是发送方随机生成的那个队列的名字
#1 correlation_id是客户端发送给服务器端uuid值,服务端会将这个id再返回给客户端
#2 这样客户端就可以确定这条消息就是客户端刚刚发送给你的执行结果
properties=pika.BasicProperties(correlation_id = props.correlation_id),
body=str(response)) # 将消息返回给客户端
ch.basic_ack(delivery_tag = method.delivery_tag) #收到消息后主动发送确认
#收到消息就调用回调函数on_request,并指定从rpc_queue中收取消息
channel.basic_consume(on_request, queue='rpc_queue')
print(" [x] Awaiting RPC requests")
channel.start_consuming() #调试的时候,消费者没有消息时就卡在这里,有消息就自动调用on_request
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# 1.3 rpc类型的exchange(client端发送命令接收命令执行结果)
import pika
import uuid,time
class FibonacciRpcClient(object):
def __init__(self):
self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
self.channel = self.connection.channel() #实例化连接实例对象
result = self.channel.queue_declare(exclusive=True) #生成一个随机queue对象
self.callback_queue = result.method.queue #获得这个随机queue
self.channel.basic_consume(self.on_response, no_ack=True, #声明调用on_response处理
queue=self.callback_queue) #从刚刚指定的随机queue中取结果
def on_response(self, ch, method, props, body): #只要一收到消息就自动调用这个回调函数
#1 corr_id是服务器端自己生成的uuid值,在发送消息时已经一起发送给服务器端
#2 服务器端会将执行结果和客户端发送给自己uuid值一起发送给客户端
#3 客户端通过判断如果,uuid值相同那么这条消息就是我刚刚发送消息的执行结果
#4 得到了执行结果body后才会将None变成body执行结果,结束下面的while循环
if self.corr_id == props.correlation_id:
self.response = body #想让下面的while循环结束就在这里让self.response不为None
#这里的body是我们收到的内容
def call(self, n):
self.response = None
self.corr_id = str(uuid.uuid4())
self.channel.basic_publish(exchange='', #在广播模式可以指定转发器名字
routing_key='rpc_queue', #将命令发发送到rpc_queue
properties=pika.BasicProperties( #消息持久化
reply_to = self.callback_queue,
#让服务器将执行结果返回到这个queue里,
# reply_to这个queue是前面生成的随机queue
correlation_id = self.corr_id,#corr_id随机生成的uuid值
),
body=str(n)) #发送的消息
while self.response is None: #想让这个死循环结束,就必须使self.response不为None
self.connection.process_data_events()
#非阻塞版的start_consumer,有消息就收消息,没消息不阻塞继续往下走
#如果有消息就会自动触发on_response函数,
#在on_response中将self.response改成body(服务端返回的结果)了所以就跳出while了
print("只要能走到这里就证明没消息")
time.sleep(0.5)
return int(self.response)
fibonacci_rpc = FibonacciRpcClient() #实例化
imp_num = input('please input fib num:')
response = fibonacci_rpc.call(imp_num) #调用实例的call方法 计算斐波那契第六个数的值
print(" 第%s个fib数是 %r" %(imp_num,response))
#注:uuid的作用是如果我们在while循环中没有sleep而是发送了其他命令,不用uuid标示就会接收乱
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
上次更新: 2024/3/13 15:35:10