运行环境
使用docker 部署
客户端连接
无密码连接
使用docker 启动环境
1
| docker run -d --hostname rabbit-svr --name rabbit -p 5672:5672 -p 15672:15672 -p 25672:25672 rabbitmq:management
|
python 客户端连接
1 2 3 4 5 6 7 8 9 10
| import pika
parameters = pika.ConnectionParameters( host='localhost', port=5672, virtual_host='/' ) connection = pika.BlockingConnection(parameters)
channel = connection.channel()
|
客户端使用密码连接
下文直接省略 channel
创建之前的代码
简单模式
flowchart LR
P((P)) -->|发送消息| hello[hello]
hello --> C((C))
producer
1 2 3 4 5 6 7 8 9 10 11
| channel.queue_declare(queue='hello', durable=True, auto_delete=True)
channel.basic_publish(exchange='', routing_key='hello', body=b'Hello World!')
channel.basic_publish(exchange='', routing_key='hello', body=b'Hello Rabbit!') print(" [x] Send messages")
|
consumer
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| channel.queue_declare(queue='hello', durable=True, auto_delete=True)
def callback(ch, method, properties, body): print(" [x] Received %r" % ch) print(" [x] Received %r" % method) print(" [x] Received %r" % properties) print(f"[x] Received {body.decode()}") channel.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(queue='hello', auto_ack=False, on_message_callback=callback)
print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
|
channel.queue_declare
参数解读
- queue,str,queue name
- passive,bool, Only check to see if the queue exists and raise
- durable,bool,Survive reboots of the broker
- exclusive,bool,Only allow access by the current connection
- auto_delete,bool,Delete after consumer cancels or disconnects
在producer
和consumer
绑定的queue_declare
必须有同样的设置,才能生效,不然会报错.
consumer 其他获取方式
1 2 3
| dat = channel.basic_get('hello',auto_ack=True)
print(dat)
|
工作模式
flowchart LR
P((P)) -->|发送消息| hello[hello]
hello --> C1((C1))
hello --> C2((C2))
工作队列(也称为:任务队列)的主要思想是避免立即执行资源密集型任务并等待其完成。相反,我们将任务安排稍后执行。我们将任务封装为消息并发送到队列中。一个在后台运行的工作进程将弹出任务,并最终执行该工作。当你运行多个工作进程时,任务将在它们之间共享。
现在我们将发送代表复杂任务的字符串。我们没有现实中的任务,比如需要调整大小的图像或需要渲染的 PDF 文件,所以让我们通过假装忙碌来模拟它——使用 time.sleep()
函数。我们将字符串中点的数量作为其复杂度;每个点将代表一秒钟的”工作”。例如,由 Hello...
描述的模拟任务将需要三秒钟
允许从命令行发送任意消息。该程序将任务安排到我们的工作队列中,所以让我们将其命名为 new_task.py
:
1 2 3 4 5 6 7
| import sys
message = ' '.join(sys.argv[1:]) or "Hello World!" channel.basic_publish(exchange='', routing_key='task', body=message) print(f" [x] Sent {message}")
|
以下脚本为消息正文中的每个点模拟一秒钟的工作。它将弹出队列中的消息并执行任务,所以让我们将其称为 worker.py
:
1 2 3 4 5 6
| import time
def callback(ch, method, properties, body): print(f" [x] Received {body.decode()}") time.sleep(body.count(b'.')) print(" [x] Done")
|
在两个控制台先运行worker.py
,再运行new_task.py
将数据分发下去.
1 2 3 4 5 6 7 8 9 10 11 12
| # console 1 python worker.py
# console 2 python worker.py
# console 3 python new_task.py First message. python new_task.py Second message.. python new_task.py Third message... python new_task.py Fourth message.... python new_task.py Fifth message.....
|
观察到以下内容
1 2 3 4 5 6
| # shell 1 python worker.py # => [*] Waiting for messages. To exit press CTRL+C # => [x] Received 'First message.' # => [x] Received 'Third message...' # => [x] Received 'Fifth message.....'
|
1 2 3 4 5
| # shell 2 python worker.py # => [*] Waiting for messages. To exit press CTRL+C # => [x] Received 'Second message..' # => [x] Received 'Fourth message....'
|
默认情况下,RabbitMQ
会将每条消息按顺序发送给下一个消费者。平均每个消费者会收到相同数量的消息。这种消息分发方式称为轮询。
消息确认
为了确保消息永远不会丢失,RabbitMQ 支持消息确认。确认是由消费者发送回 RabbitMQ 的,以告诉 RabbitMQ 某个特定的消息已经被接收、处理,并且 RabbitMQ 可以删除它。
如果一个消费者死亡(其通道关闭、连接关闭或 TCP 连接丢失),且未发送 ack,RabbitMQ 会理解消息未被完全处理,并将重新入队。如果同时有其他消费者在线,它会迅速将消息重新投递给另一个消费者。这样,你可以确保即使工作进程偶尔死亡,也不会丢失任何消息。
一旦任务完成,就是时候从工作进程发送正确的确认了这里使用ch.basic_ack(delivery_tag = method.delivery_tag)
1 2 3 4 5 6 7
| def callback(ch, method, properties, body): print(f" [x] Received {body.decode()}") time.sleep(body.count(b'.') ) print(" [x] Done") ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_consume(queue='hello', on_message_callback=callback)
|
如果忘记确认消息可以在控制台中使用以下命令来查看没有确认的消息.
rabbitmqctl list_queues name messages_ready messages_unacknowledged
消息持久化
当 RabbitMQ 退出或崩溃时,它会忘记队列和消息,除非你告诉它不要这样做。确保消息不会丢失需要两件事:我们需要将队列和消息都标记为持久的。
首先,我们需要确保队列能够在 RabbitMQ 节点重启后继续存在。为此,我们需要将其声明为持久的:
1
| channel.queue_declare(queue='hello', durable=True)
|
这个 queue_declare
的更改需要同时应用于生产者和消费者代码。
现在即使 RabbitMQ 重启, task_queue
队列也不会丢失。现在我们需要将消息标记为持久性——通过提供一个值为 pika.DeliveryMode.Persistent
的 delivery_mode
属性
1 2 3 4 5 6
| channel.basic_publish(exchange='', routing_key="task_queue", body=message, properties=pika.BasicProperties( delivery_mode = pika.DeliveryMode.Persistent ))
|
channel 的公平分发
现在调度仍然不完全符合我们的预期。例如,在有两个工作者的情况下,如果所有奇数消息都很重而偶数消息都很轻,一个工作者将一直很忙,而另一个工作者几乎不做任何工作。嗯,RabbitMQ 对此一无所知,仍然会均匀地分发消息。
我们可以使用 Channel#basic_qos
通道方法和 prefetch_count=1
设置。这使用 basic.qos
协议方法来告诉 RabbitMQ 一次不要给一个工作进程超过一条消息。换句话说,在它处理并确认前一条消息之前,不要向工作进程发送新消息。相反,它会将其发送给下一个不忙的工作进程。
1
| channel.basic_qos(prefetch_count=1)
|
flowchart LR
P((P)) -->|发送消息| hello[hello]
hello -->|prefetch=1| C1((C1))
hello -->|prefetch=1| C2((C2))
完整代码示例 new_task.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| import sys
import pika
connection = pika.BlockingConnection( pika.ConnectionParameters(host="localhost"), ) channel = connection.channel()
channel.queue_declare(queue="task_queue", durable=True)
message = " ".join(sys.argv[1:]) or "Hello World!" channel.basic_publish( exchange="", routing_key="task_queue", body=message, properties=pika.BasicProperties( delivery_mode=pika.DeliveryMode.Persistent, ), ) print(f" [x] Sent {message}")
connection.close()
|
worker.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| import pika import time
connection = pika.BlockingConnection( pika.ConnectionParameters(host='localhost')) channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True) print(' [*] Waiting for messages. To exit press CTRL+C')
def callback(ch, method, properties, body): print(f" [x] Received {body.decode()}") time.sleep(body.count(b'.')) print(" [x] Done") ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1) channel.basic_consume(queue='task_queue', on_message_callback=callback)
channel.start_consuming()
|
路由模式
flowchart LR
DL(direct_logs):::exchange -->|debug| Q_DEBUG[debug]:::queue
DL -->|error| Q_ERROR[error]:::queue
DL -->|info| Q_INFO[info]:::queue
publisher.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| import pika import sys
connection = pika.BlockingConnection( pika.ConnectionParameters(host='localhost')) channel = connection.channel()
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
severity = sys.argv[1] if len(sys.argv) > 1 else 'info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish( exchange='direct_logs', routing_key=severity, body=message) print(" [x] Sent %r:%r" % (severity, message)) connection.close()
|
woker.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| import sys
import pika
queue_name = '' connection = pika.BlockingConnection( pika.ConnectionParameters(host='localhost')) channel = connection.channel()
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
severities = sys.argv[1:] if not severities: sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) sys.exit(1)
for severity in severities: queue_name = severity print(queue_name) channel.queue_declare(queue=queue_name, exclusive=True) channel.queue_bind( exchange='direct_logs', queue=queue_name, routing_key=queue_name)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body))
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
|
RabbitMQ 的消息传递流程是:生产者 → 交换机 → 队列 → 消费者。其中,交换机的核心作用是根据绑定关系(Binding)和路由键(Routing Key)将消息路由到对应的队列。
- 交换机必须通过绑定关系与队列关联,绑定规则决定了消息如何从交换机 “转发” 到队列。
- 如果交换机没有绑定任何队列,消息到达交换机后,由于没有可路由的目标队列,将无法被存储,最终被直接丢弃。
参考文档
1.官方文档rabbitmq-libraries
2.RabbitMQ(三)路由模式 Python实现