RabbitMQ(一)简单工作队列 Python实现

    科技2024-08-11  26

    1. Work Queue模式工作流程

    Producer 生产消息,并把消息提交到ExchangeExchange把消息放入相应的Queue(队列)Cosumer监听队列,当有消息时取出并通过回调函数作处理

    2. 代码:

    2.1 基本实现

    以下是send.py 代表了Producer import sys import pika from credentials import credentials msg = ''.join(sys.argv[1:]) or "Hello World...." cred = pika.PlainCredentials(**credentials) with pika.BlockingConnection(pika.ConnectionParameters('localhost', credentials=cred)) as connection: channel = connection.channel() channel.declare_queue("hello") channel.basic_publish(exchange='', routing_key='hello', body=msg) print(" [x] Sent %r" % msg) 解析: credentials 是登录RabbitMQ客户端的账号密码,格式是: { "username": "xxx", "password": "xxx" } 用channel.declare_queue(“hello”)声明一个队列,队列声明是幂等的,意味着如果已存在同名的队列就不会重复创建使用basic_publish发布一个消息 ,其中: exchange参数设置为空字符,就是使用默认的其中routing_key参数就是队列的名字 receive.py 消费者 import time import pika from credentials import credentials def callback(ch, method, properties, body): print(" [x] Received %r" % body) time.sleep(body.count(b'.')) print(" [x] Done") cred = pika.PlainCredentials(**credentials) with pika.BlockingConnection(pika.ConnectionParameters('localhost', credentials=cred)) as connection: channel = connection.channel() channel.queue_declare(queue='hello') channel.basic_consume(queue='hello', auto_ack=True, on_message_callback=callback) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming() 解析: 创建连接、声明队列的代码和生产者一致使用basic_consume方法获取消息,在收到消息后,触发回调callback在callback中通过time.sleep()模拟一个耗时的任务

    2.2 运行

    启动send.py, 它马上就返回了,不会等待消费者接收或是callback的执行。启动receive.py, 打印出: [*] Waiting for messages. To exit press CTRL+C [x] Received b'Hello World....' [x] Done

    2.3 进阶

    消费者意外结束?消息确认

    消费者有可能会去处理一些耗时任务,有可能在执行一半就“死”了,然而它接收的消息已经被删除了,这是不可复原的因此,我们可以让消费者在成功接收、处理后回传给RabbitMQ服务器一个确认信号,在收到确认信号后,RabbitMQ服务器再删除这个消息。如果一个消费者意外结束了,并且RabbitMQ没有收到确认信号,消息就会被重新入列,并传送给另一个消费者,这样,消息就不会意外丢失。这种机制不是通过“超时”来实现的,因为消费者处理消息可能用时会非常长,RabbitMQ会判断消费者是否“死”了(Channel关闭,连接关闭,或TCP连接中断)从而将消息重新分发。再看receive.py,只需要在basic_consume方法中,把auto_ack(自动确认)参数去掉: channel.basic_consume(queue='hello', on_message_callback=callback) 然后,在回调结束的位置发送确认即可: def callback(ch, method, properties, body): print(" [x] Received %r" % body) time.sleep(body.count(b'.')) print(" [x] Done") ch.basic_ack(delivery_tag=method.delivery_tag)

    RabbitMQ服务器崩溃?

    以上确认机制确保消费者意外结束而导致消息丢失,但如果RabbitMQ服务器意外结束,我们创建的队列、消息都会消失。通过durable参数使队列持久化: channel.queue_declare(queue='task_queue', durable=True) 通过使delivery_mode=2,在生产者提交一个消息时,使消息持久化: channel.basic_publish(exchange='', routing_key="task_queue", body=message, properties=pika.BasicProperties( delivery_mode = 2, # make message persistent ))

    公平分发

    Round Robin算法使得不同的消费者得到的消息数量是大致相等的,但这不是绝对公平的比如,有两个消费者进程,排在奇数位的消息很复杂,处理很耗时;而排在偶数位的消息很简单,处理时间很短,那么复杂的任务总是给其中一个进程,而另一个进程却很空闲。为此,我们可以在receive.py中对channel实例这样配置: channel.basic_qos(prefetch_count=1) 这样的话,只有在消费者发送确认,确认它的上一个任务已经完成,它才会收到下一个。
    Processed: 0.012, SQL: 8