RabbitMQ(二)发布订阅模式 Python实现

    科技2024-08-16  35

    1. Publish/Subscribe

    在work queue(工作队列)模式中,一个消息被一个消费者处理(一项工作被一个人完成)而使用发布/订阅,一个消息可以传递给多个消费者。

    2. Exchange

    当生产者产出消息时,它把消息提交给Exchange,而不是直接提交到队列中。Exchange再把消息分到不同的队列那么消息应该分发给哪个队列?一个还是多个队列?这是由Exchange的类型决定的

    2.1 Exchange的类型

    direct 把消息加入与该消息的“键”一致的队列topicheadersfanout 把消息加入到所有已知队列

    2.2 创建Exchange

    以下代码创建了一个名为logs的exchange为了把消息添加到所有已知队列,把它的类型设置为fanout channel.exchange_declare(exchange='logs', exchange_type='fanout')

    3 发布消息

    在工作队列模式中,我们用如下代码发布消息 channel.basic_publish(exchange='', routing_key='hello', body=message) exchange参数设置成空字符,也就是使用默认的exchange, routing_key设置为"hello", 也就是在所有队列中查找名为hello的队列,并把消息添加到其中而在发布订阅模式中,我们通过如下代码发布消息: channel.basic_publish(exchange='logs', routing_key='', body=message) 指定exchange的名字,而routing_key设置为空字符

    4. 绑定exchange和队列

    把若干个队列绑定到一个fanout类型的exchange上,那么消息就可以被推送到这些队列中了: channel.queue_bind(exchange='logs', queue="some_queue_name")

    5. 整体代码

    5.1 发布者和订阅者

    publish.py 代表发布者(生产者) import sys import pika from credentials import credentials cred = pika.PlainCredentials(**credentials) with pika.BlockingConnection(pika.ConnectionParameters('localhost', credentials=cred)) as connection: channel = connection.channel() channel.exchange_declare(exchange="logs", exchange_type='fanout') message = ' '.join(sys.argv[1:]) or "info: Hello World!" channel.basic_publish(exchange='logs', routing_key='', body=message) print(" [x] Sent %r" % message) 解析: 通过channel的exchange_declare声明一个exchange,类型为fanout通过basic_publish方法把消息发布出去 subscribe.py 代表订阅者(消费者) import pika from credentials import credentials def callback(ch, method, properties, body): print(" [x] %r" % body) cred = pika.PlainCredentials(**credentials) with pika.BlockingConnection(pika.ConnectionParameters('localhost', credentials=cred)) as connection: channel = connection.channel() channel = connection.channel() channel.exchange_declare(exchange='logs', exchange_type='fanout') result = channel.queue_declare(queue='', exclusive=True) queue_name = result.method.queue channel.queue_bind(exchange='logs', queue=queue_name) print(' [*] Waiting for logs. To exit press CTRL+C') channel.basic_consume( queue=queue_name, on_message_callback=callback, auto_ack=True) channel.start_consuming()

    5.2 运行结果

    可以尝试运行多个消费者,并且在同一个exchange上绑定多个队列会发现,虽然生产者只生产了一次,但所有的消费者都收到了来自所有队列上的消息这和工作队列中只有一个消费者能接收消息并处理不同
    Processed: 0.008, SQL: 8