1. Work Queue模式工作流程
Producer 生产消息,并把消息提交到ExchangeExchange把消息放入相应的Queue(队列)Cosumer监听队列,当有消息时取出并通过回调函数作处理
2. 代码:
2.1 基本实现
以下是send.py 代表了Producer
import sys
import pika
from credentials import credentials
msg = ''.join(sys.argv[1:]) or "Hello World...."
cred = pika.PlainCredentials(**credentials)
with pika.BlockingConnection(pika.ConnectionParameters('localhost', credentials=cred)) as connection:
channel = connection.channel()
channel.declare_queue("hello")
channel.basic_publish(exchange='', routing_key='hello', body=msg)
print(" [x] Sent %r" % msg)
解析:
credentials 是登录RabbitMQ客户端的账号密码,格式是: {
"username": "xxx",
"password": "xxx"
}
用channel.declare_queue(“hello”)声明一个队列,队列声明是幂等的,意味着如果已存在同名的队列就不会重复创建使用basic_publish发布一个消息 ,其中:
exchange参数设置为空字符,就是使用默认的其中routing_key参数就是队列的名字 receive.py 消费者
import time
import pika
from credentials import credentials
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
time.sleep(body.count(b'.'))
print(" [x] Done")
cred = pika.PlainCredentials(**credentials)
with pika.BlockingConnection(pika.ConnectionParameters('localhost', credentials=cred)) as connection:
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_consume(queue='hello',
auto_ack=True,
on_message_callback=callback)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
解析:
创建连接、声明队列的代码和生产者一致使用basic_consume方法获取消息,在收到消息后,触发回调callback在callback中通过time.sleep()模拟一个耗时的任务
2.2 运行
启动send.py, 它马上就返回了,不会等待消费者接收或是callback的执行。启动receive.py, 打印出:
[*] Waiting for messages. To exit press CTRL+C
[x] Received b'Hello World....'
[x] Done
2.3 进阶
消费者意外结束?消息确认
消费者有可能会去处理一些耗时任务,有可能在执行一半就“死”了,然而它接收的消息已经被删除了,这是不可复原的因此,我们可以让消费者在成功接收、处理后回传给RabbitMQ服务器一个确认信号,在收到确认信号后,RabbitMQ服务器再删除这个消息。如果一个消费者意外结束了,并且RabbitMQ没有收到确认信号,消息就会被重新入列,并传送给另一个消费者,这样,消息就不会意外丢失。这种机制不是通过“超时”来实现的,因为消费者处理消息可能用时会非常长,RabbitMQ会判断消费者是否“死”了(Channel关闭,连接关闭,或TCP连接中断)从而将消息重新分发。再看receive.py,只需要在basic_consume方法中,把auto_ack(自动确认)参数去掉:
channel.basic_consume(queue='hello',
on_message_callback=callback)
然后,在回调结束的位置发送确认即可:
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
time.sleep(body.count(b'.'))
print(" [x] Done")
ch.basic_ack(delivery_tag=method.delivery_tag)
RabbitMQ服务器崩溃?
以上确认机制确保消费者意外结束而导致消息丢失,但如果RabbitMQ服务器意外结束,我们创建的队列、消息都会消失。通过durable参数使队列持久化:
channel.queue_declare(queue='task_queue', durable=True)
通过使delivery_mode=2,在生产者提交一个消息时,使消息持久化:
channel.basic_publish(exchange='',
routing_key="task_queue",
body=message,
properties=pika.BasicProperties(
delivery_mode = 2,
))
公平分发
Round Robin算法使得不同的消费者得到的消息数量是大致相等的,但这不是绝对公平的比如,有两个消费者进程,排在奇数位的消息很复杂,处理很耗时;而排在偶数位的消息很简单,处理时间很短,那么复杂的任务总是给其中一个进程,而另一个进程却很空闲。为此,我们可以在receive.py中对channel实例这样配置:
channel.basic_qos(prefetch_count=1)
这样的话,只有在消费者发送确认,确认它的上一个任务已经完成,它才会收到下一个。
转载请注明原文地址:https://blackberry.8miu.com/read-33172.html