安装并配置 rabbitmq
安装 pika
pip3 install pika
生产者
1.send.py
import pika
# 连接服务器
rabbit_username = ''
rabbit_password = ''
credentials = pika.PlainCredentials(rabbit_username,rabbit_password)
connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.173.32.59',port='5672',credentials=credentials))
# channel 是进行消息读写的通道
channel = connection.channel()
# 第二步,创建一个名为 queue 的队列,然后把消息发送到这个队列
channel.queue_declare(queue='queue')
# 第三步,现在可以发送消息,但是 RabbitMQ 不能直接把消息发送到队列,需要先发送到交换机,这里先使用默认交换机(exchange),它使用一个空字符串表示,routing_key
# 参数必须指定队列名称,这里为 queue
channel.basic_publish(exchange='',routing_key='queue',body='hello world')
print('send:send message ‘hello world')
connection.close()
执行后得到
send:send message ‘hello world
消费者
2.receive.py
import pika
# 先连接服务器
rabbit_username = ''
rabbit_password = ''
credentials = pika.PlainCredentials(rabbit_username,rabbit_password)
connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.173.32.59',port='5672',credentials=credentials))
channel = connection.channel()
# 为确保队列存在,在执行一次 queue_declare 创建一个队列,我们可以多次运行该命令,但是只有一个队列会被创建
channel.queue_declare(queue='queue')
# 第三步,定义一个回调函数,获得消息时,pika 哭调用这个回调函数来处理消息,该回调消息将打印到屏幕
def callback(ch,method,properties,body):
print("receive: %r" %(body,))
channel.basic_consume(callback,queue='queue',no_ack=True)
print( 'Waiting for messages.')
channel.start_consuming()
执行得到
Waiting for messages.
receive: b'hello world'
该程序不会中断,我们修改生产者的代码中的 body 内容为’hello world1’
channel.basic_publish(exchange='',routing_key='queue',body='hello world1')
然后执行后观察 receive.py
可以看到该程序收到了发送的内容。
现在登录 web 管理平台,可以看到当前的连接情况、channel、queue 等信息。
一些概念
Broker:消息队列服务器实体
消息:每个消息都有一个路由键(routing key)的属性。就是一个简单的字符串。
connection:应用程序与broker的网络连接。
channel:几乎所有的操作都在channel中进行,channel是进行消息读写的通道。客户端可建立多个channel,每个channel代表一个会话任务。
交换机:接收消息,根据路由键转发消息到绑定的队列。
绑定:一个绑定就是基于路由键将交换机和队列连接起来的路由规则,所以交换机不过就是一个由绑定构成的路由表。
举例:一个具有路由键“key1”的消息要发送到两个队列,queueA和queueB。要做到这点就要建立两个绑定,每个绑定连接一个交换机和一个队列。两者都是由路由键“key1”触发,这种情况,交换机会复制一份消息并把它们分别发送到两个队列中。
队列:消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。
交换机
交换机用来接收消息,转发消息到绑定的队列,是rabbitMq中的核心。
交换机共有4种类型:direct,topic,headers和fanout。
为什么不创建一种交换机来处理所有类型的路由规则?因为每种规则匹配时的CPU开销是不同的,所以根据不同需求选择合适交换机。
Direct交换机:转发消息到routingKey指定队列(完全匹配,单播)
routingKey与队列名完全匹配,如果一个队列绑定到交换机要求路由键为“dog”,则只转发routingkey标记为dog的消息,不会转发dog.puppy,也不会转发dog.guard等。
Topic交换机:按规则转发消息(最灵活,组播)
Topic类型交换机通过模式匹配分配消息的routing-key属性。将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。
它将routing-key和binding-key的字符串切分成单词。这些单词之间用点隔开。它同样也会识别两个通配符:符号“#”和符号“”。#匹配0个或多个单词,匹配不多不少一个单词。
Fanout交换机:转发消息到所有绑定队列(最快,广播)
fanout交换机不处理路由键,简单的将队列绑定到交换机上,每个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。
很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout交换机转发消息是最快的。
如果没有队列绑定在交换机上,则发送到该交换机上的消息会丢失。
一个交换机可以绑定多个队列,一个队列可以被多个交换机绑定。
还有一些其他类型的交换机类型,如header、failover、system等,现在在当前的RabbitMQ版本中均未实现。
因为交换机是命名实体,声明一个已经存在的交换机,但是试图赋予不同类型是会导致错误。客户端需要删除这个已经存在的交换机,然后重新声明并且赋予新的类型。
交换机的属性:
- 持久性:如果启用,交换机将会在server重启前都有效。
- 自动删除:如果启用,那么交换机将会在其绑定的队列都被删掉之后删除自身。
- 惰性:如果没有声明交换机,那么在执行到使用的时候会导致异常,并不会主动声明。
队列的属性:
- 持久性:如果启用,队列将在Server服务重启前都有效。
- 自动删除:如果启用,那么队列将会在所有的消费者停止使用之后自动删除自身。
- 惰性:如果没有声明队列,那么在执行到使用的时候会导致异常,并不会主动声明。
- 排他性:如果启用,队列只能被声明它的消费者使用。