RabbitMQ(三)路由模式 Python实现

    科技2024-10-27  13

    1. 回顾发布/订阅模式

    在发布订阅模式中,exchange的类型设置为fanout, 所有提交到这个exchange的消息都会被推送到所有与之绑定的队列中如果希望把消息推送到一部分与该exchange绑定的队列,就要使用路由

    2. 路由

    在生产者产生消息时,通过如下代码把它提交给exchange: channel.basic_publish(exchange='logs', routing_key='', body=message) 我们通过以下代码把一个队列绑定到一个exchange上: channel.queue_bind(exchange='logs', queue=queue_name, routing_key='') 可以看出,上面的两行代码有相容的参数就是routing_key因此,只要提交消息时设置的routing_key和绑定队列时设置的routing_key一致,这个消息就会被推送到这个队列上在发布订阅模式中,exchange的类型被设置成fanout, 在这里,应该被设置成direct

    3. 代码

    以下的代码模拟了日志的产生和获取

    3.1 publish.py

    以下代码模拟了日志的产生,所谓的“日志”,就是通过参数输入的,比如执行: python3 publish.py error "this is an error" 就会向队列中加入等级为error的一条日志,内容为"this is an error"把日志等级作为routing_key,消费者就可以按照自己的需要,从队列中取出相应等级的日志 import pika import sys connection = pika.BlockingConnection( pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', exchange_type='direct') severity = sys.argv[1] if len(sys.argv) > 1 else 'info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish( exchange='direct_logs', routing_key=severity, body=message) print(" [x] Sent %r:%r" % (severity, message)) connection.close()

    3.2 subscribe.py

    在这个文件里,可以根据自定的日志等级查看日志信息,比如: python3 subscribe.py info debug 就可以取出等级为“info”和“debug”的日志条目 import pika import sys connection = pika.BlockingConnection( pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', exchange_type='direct') result = channel.queue_declare(queue='', exclusive=True) queue_name = result.method.queue severities = sys.argv[1:] if not severities: sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) sys.exit(1) for severity in severities: channel.queue_bind( exchange='direct_logs', queue=queue_name, routing_key=severity) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume( queue=queue_name, on_message_callback=callback, auto_ack=True) channel.start_consuming()
    Processed: 0.019, SQL: 8