添加链接
link管理
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接

一、安装rabbitMQ的python库:

安装 http://www.rabbitmq.com/install-standalone-mac.html

安装python rabbitMQ module

pip install pika
easy_install pika
https://pypi.python.org/pypi/pika

实现最简单的队列通信:

二、消息队列适用场景

消息队列在什么场景适用下适用?

可以异步执行的场景,不要求立即返回结果的场景,不要求同步的场景。如,秒杀场景,并发大但不要求同步执行,不要求立即返回结果。

三、python连接rabbitMQ服务器前的准备工作

远程连接rabbitmq server的话,需要配置权限,需要先配置

rabbitMQ管理工具:rabbitmqctl

创建用户:

sudo rabbitmqctl add_user 用户名 密码 

配置权限,允许从外面访问:

sudo rabbitmqctl set_permissions -p / 用户名".*" ".*" ".*"

远程连接时,在使用python连接rabbitMQ服务器时,在使用python创建连接时,客户端连接的时候需要配置认证参数:

credentials = pika.PlainCredentials('用户名', '密码')
onnection = pika.BlockingConnection(pika.ConnectionParameters(
    '192.168.10.10',5672,'/', credentials))
channel = connection.channel()

四、一个简单的示例

Producer(生产者): 消息的生产者,负责产生消息并把消息发到交换机

Consumer (消费者) :使用队列 Queue 从 Exchange 中获取消息的应用。

Exchange (交换机 ):负责接收生产者的消息并把它转到到合适的队列。

Queue (队列) :一个存储Exchange 发来的消息的缓冲,并将消息主动发送给Consumer,或者 Consumer 主动来获取消息。

Binding (绑定) :队列 和 交换机 之间的关系。Exchange 根据消息的属性和 Binding 的属性来转发消息。绑定的一个重要属性是 binding_key。

Connection (连接)和 Channel (通道) :生产者和消费者需要和 RabbitMQ 建立 TCP 连接。一些应用需要多个connection,为了节省TCP 连接,可以使用 Channel,它可以被认为是一种轻型的共享 TCP 连接的连接。连接需要用户认证,并且支持 TLS (SSL)。连接需要显式关闭。

Message (消息) : RabbitMQ 转发的二进制对象,包括Headers(头)、Properties (属性)和 Data (数据),其中数据部分不是必要的。

routing_key: 路由键、路由密钥。

在python A端发送消息:

import pika
# 建立socket连接
credentials = pika.PlainCredentials("username", "password")
connection = pika.BlockingConnection(pika.ConnectionParameters("192.168.1.10", credentials=credentials))
# 建立rabbitMQ协议的通道
channel = connection.channel()
# 声明队列:通过通道申明队列
channel.queue_declare(queue="abc")
# 发送、发布消息:routing_key--队列名称,body--消息
channel.basic_publish(exchange="",
                      routing_key="abc",
                      body="this is a message.")
print ("[x] sent a message.")
# 关闭socket连接
connection.close()

[x] sent a message.

在rabbitMQ服务器查看消息队列:

rabbitmqctl list_queue

在python B端接收消息:

import pika
# 建立socket连接
credentials = pika.PlainCredentials("username", "password")
connection = pika.BlockingConnection(pika.ConnectionParameters("192.168.1.10", credentials=credentials))
# 建立rabbitMQ协议的通道
channel = connection.channel()
# 声明队列:通过通道申明队列
channel.queue_declare(queue="abc")
# ch:rabbitMQ通道。method:附带的一些参数,类型http的头信息。properties属性。body:消息体。
def callback(ch, method, properties, body):
    print("received [x] message: %r" % ch, method, property, body)
# 接收消息:routing_key--队列名称,body--消息
channel.basic_consume(callback,
                      queue="abc",
                      no_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

[*] Waiting for messages. To exit press CTRL+C

received [x] message: <BlockingChannel impl=<Channel number=1 OPEN conn=<SelectConnection OPEN socket=('127.0.0.1', 54177)->('127.0.0.1', 5672) params=<ConnectionParameters host=127.0.0.1 port=5672 virtual_host=/ ssl=False>>>> <Basic.Deliver(['consumer_tag=ctag1.6a3c21cbd6ab41e8a3eb328c527d0805', 'delivery_tag=1', 'exchange=', 'redelivered=False', 'routing_key=abc'])> <class 'property'> b'this is a message.'

在以前代码中,生产者和消费都声明了同一个queue,但是起作用的只有第一次声明,第二次声明不起作用。

不管谁申请,都会先检查queue是否已经存在。如果已存在,则不会重复声明;如果不存在,则声明。