1. What is RabbitMQ?
RabbitMQ is a “traditional” message broker that implements a variety of messaging protocols. It was one of the first open source message brokers to achieve a reasonable level of features, client libraries, dev tools, and quality documentation. RabbitMQ was originally developed to implement AMQP, an open wire protocol for messaging with powerful routing features. While Java has messaging standards like JMS, they do not help non-Java applications that need distributed messaging, which severely limits any integration scenario, whether microservice or monolithic. With the advent of AMQP, cross-language flexibility became real for open source message brokers.
2. Structure
- Exchanges and bindings
- Direct exchange
- Fanout exchange
- Topic exchange
- Headers exchange
- Queues
- Channels
3. What is AMQP?
4. Practice
使用 RabbitMQ 的 Python 库 pika 演示 RabbitMQ 的使用方法。
4.1 Simple Example
- send.py
import pika
# Open a blocking connection to the broker on this machine.
params = pika.ConnectionParameters('localhost')
conn = pika.BlockingConnection(params)
channel = conn.channel()

# Make sure the target queue exists before publishing to it.
channel.queue_declare(queue='hello')

# An empty exchange name selects the default exchange; the queue name is
# passed as the routing key.
channel.basic_publish(
    exchange='',
    routing_key='hello',
    body='Hello World',
)
print('Send Success')

conn.close()
- receive.py
import pika
params = pika.ConnectionParameters(host='localhost')
conn = pika.BlockingConnection(params)
channel = conn.channel()

# Same queue name as the one send.py publishes to.
channel.queue_declare(queue='hello')


def callback(ch, method, properties, body):
    """Print the body of each delivered message."""
    print(" [x] Receive %r" % (body,))


# Manual acknowledgement is on by default; passing no_ack=True switches this
# consumer to automatic acknowledgement.
channel.basic_consume(callback, queue='hello', no_ack=True)

print('Waiting for messages.')
channel.start_consuming()
4.2 Multiple Workers And Message durability
RabbitMQ 在消费者层面进行负载均衡(轮询)。
采用手动 ack
def callback(ch, method, properties, body):
    """Simulate work for one delivery, then acknowledge it manually.

    Sleeps one second for every '.' byte found in ``body`` and then
    acknowledges the delivery through ``ch.basic_ack``.

    Args:
        ch: channel the message was delivered on; used to send the ack.
        method: delivery metadata; ``method.delivery_tag`` identifies the
            message being acknowledged.
        properties: message properties (unused).
        body: message payload as bytes.
    """
    # Imported locally because this snippet has no import section of its own.
    import time

    print(" [x] Received %r" % (body,))
    # The payload is bytes under Python 3, so count a bytes pattern; a str
    # pattern would raise TypeError there.
    time.sleep(body.count(b'.'))
    print(" [x] Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)
# no_ack is not passed here, so each delivery stays unacknowledged until the
# callback calls basic_ack itself.
channel.basic_consume(callback, queue='hello')
- Message durability
(a) mark queue as durable
# Declare the queue as durable. The name matches the routing_key
# ("task_queue") that the persistent publish in step (b) uses. RabbitMQ
# refuses to redeclare an existing queue with different parameters, so the
# non-durable 'hello' queue from the earlier examples cannot be reused here.
channel.queue_declare(queue='task_queue', durable=True)
(b) mark message as durable
# delivery_mode=2 marks the message as persistent.
persistent = pika.BasicProperties(delivery_mode=2)
channel.basic_publish(
    exchange='',
    routing_key="task_queue",
    body=message,
    properties=persistent,
)
# NOTE:
# Marking messages as persistent doesn't fully guarantee that a message won't be lost. Although it tells RabbitMQ to save the message to disk,
# there is still a short time window when RabbitMQ has accepted a message and hasn't saved it yet. Also, RabbitMQ doesn't do fsync(2) for every message -- it may be
# just saved to cache and not really written to the disk. The persistence guarantees aren't strong, but it's more than enough for our simple task queue.
# If you need a stronger guarantee then you can use publisher confirms.
publisher confirms
import pika
conn = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
try:
    channel = conn.channel()
    # create queue
    channel.queue_declare(queue='hello')
    # Turn on publisher confirms for this channel.
    channel.confirm_delivery()
    # use default exchange
    # NOTE(review): this relies on basic_publish returning a truthy value
    # once the broker has confirmed the message, which is the pika 0.x
    # behaviour -- confirm against the pika version actually installed.
    confirmed = channel.basic_publish(
        exchange='',
        routing_key='hello',
        properties=pika.BasicProperties(content_type='text/plain', delivery_mode=1),
        body='Hello World')
    if confirmed:
        print('Message publish was confirmed')
    else:
        print('Message could not be confirmed')
finally:
    # Close on every path; previously the connection was only closed after a
    # confirmed publish and leaked otherwise.
    conn.close()
- 公平派遣任务
RabbitMQ 由于采用轮询的方式进行任务派遣,所以有可能由于任务的不同引起部分 Worker 一直繁忙,部分 Worker 空闲。
可以使用 basic.qos 方法设置 prefetch_count=1:
# basic.qos with prefetch_count=1: ask the broker not to deliver more than
# one unacknowledged message to this consumer at a time.
channel.basic_qos(prefetch_count=1)
4.3 Exchange types
Exchange types: direct, topic, headers and fanout
4.3.1 Publish/Subscribe
- send.py
import pika
params = pika.ConnectionParameters(host='localhost')
connection = pika.BlockingConnection(params)
channel = connection.channel()

# Declare the 'logs' exchange with type fanout.
channel.exchange_declare(exchange='logs', exchange_type='fanout')

# Publish to that exchange with an empty routing key.
channel.basic_publish(exchange='logs', routing_key='', body='Hello World')

connection.close()
- receive.py
import pika
params = pika.ConnectionParameters(host='localhost')
connection = pika.BlockingConnection(params)
channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='fanout')

# Declare a queue with a random name; with exclusive=True the queue is
# deleted automatically once the consumer's connection closes.
declared = channel.queue_declare(exclusive=True)
queue_name = declared.method.queue

# Attach the queue to the 'logs' exchange.
channel.queue_bind(exchange='logs', queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
    """Print the body of each delivered message."""
    print(" [x] %r" % (body,))


channel.basic_consume(callback, queue=queue_name, no_ack=True)
channel.start_consuming()
4.3.2 Routing
- send.py
import pika
params = pika.ConnectionParameters(host='localhost')
connection = pika.BlockingConnection(params)
channel = connection.channel()

# Declare the 'direct_logs' exchange with type direct.
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

# Publish with routing key 'test'.
channel.basic_publish(
    exchange='direct_logs', routing_key='test', body='Hello World')

connection.close()
- receive.py
import pika
params = pika.ConnectionParameters(host='localhost')
connection = pika.BlockingConnection(params)
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

# Declare a queue with a random name; with exclusive=True the queue is
# deleted automatically once the consumer's connection closes.
declared = channel.queue_declare(exclusive=True)
queue_name = declared.method.queue

# Bind with routing key 'test', the same key send.py publishes with.
channel.queue_bind(
    exchange='direct_logs', queue=queue_name, routing_key='test')

print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
    """Print the body of each delivered message."""
    print(" [x] %r" % (body,))


channel.basic_consume(callback, queue=queue_name, no_ack=True)
channel.start_consuming()
4.3.3 Topics
- send.py
import pika
params = pika.ConnectionParameters(host='localhost')
connection = pika.BlockingConnection(params)
channel = connection.channel()

# Declare the 'topic_logs' exchange with type topic.
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

# Publish with the dotted routing key 'test.update'.
channel.basic_publish(
    exchange='topic_logs', routing_key='test.update', body='Hello World')

connection.close()
- receive.py
import pika
params = pika.ConnectionParameters(host='localhost')
connection = pika.BlockingConnection(params)
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

# Declare a queue with a random name; with exclusive=True the queue is
# deleted automatically once the consumer's connection closes.
declared = channel.queue_declare(exclusive=True)
queue_name = declared.method.queue

# Bind with the pattern 'test.*'; send.py publishes with 'test.update'.
channel.queue_bind(
    exchange='topic_logs', queue=queue_name, routing_key='test.*')

print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
    """Print the body of each delivered message."""
    print(" [x] %r" % (body,))


channel.basic_consume(callback, queue=queue_name, no_ack=True)
channel.start_consuming()
4.3.4 RPC
RabbitMQ 可实现 RPC (Remote Procedure Call) 功能。
- rpc_server.py
import pika
params = pika.ConnectionParameters(host='localhost')
connection = pika.BlockingConnection(params)
channel = connection.channel()

# Queue on which the RPC requests arrive.
channel.queue_declare(queue='rpc_queue')
def fib(n):
    """Return the n-th Fibonacci number, with fib(0) == 0 and fib(1) == 1.

    Computed iteratively in n steps, instead of the doubly recursive form
    whose running time grows exponentially with n.

    Args:
        n: non-negative integer index.

    Returns:
        The n-th Fibonacci number as an int.

    Raises:
        ValueError: if n is negative (the recursive version never
            terminated for such input).
    """
    if n < 0:
        raise ValueError("fib() is not defined for negative n: %r" % (n,))
    prev, curr = 0, 1
    for _ in range(n):
        prev, curr = curr, prev + curr
    return prev
def on_request(ch, method, props, body):
    """Handle one RPC request: compute fib of the integer in ``body`` and reply.

    The result is published on the default exchange to the queue named in
    ``props.reply_to``, carrying the request's ``correlation_id``, and the
    request is then acknowledged.
    """
    number = int(body)
    print(" [.] fib(%s)" % number)

    reply_body = str(fib(number))
    reply_props = pika.BasicProperties(correlation_id=props.correlation_id)
    ch.basic_publish(
        exchange='',
        routing_key=props.reply_to,
        properties=reply_props,
        body=reply_body,
    )
    ch.basic_ack(delivery_tag=method.delivery_tag)
# Prefetch count of 1 on this channel before consuming starts.
channel.basic_qos(prefetch_count=1)
# Route every message on 'rpc_queue' to on_request; no_ack is not passed, and
# on_request acknowledges each request itself.
channel.basic_consume(on_request, queue='rpc_queue')
print(" [x] Awaiting RPC requests")
# Enter the consume loop.
channel.start_consuming()
- rpc_client.py
import uuid
import pika
class FibonacciRpcClient(object):
    """Blocking RPC client that requests Fibonacci numbers over 'rpc_queue'.

    Each instance opens its own connection and declares an exclusive reply
    queue. ``call`` publishes a request tagged with a fresh correlation id
    and waits until ``on_response`` has stored the matching reply.
    """

    def __init__(self):
        """Connect to the broker on localhost and start consuming replies."""
        # Per-call state, initialised up front so on_response can never hit a
        # missing attribute if a message arrives before the first call().
        self.response = None
        self.corr_id = None

        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host='localhost'))
        self.channel = self.connection.channel()

        # Exclusive queue on which the server's replies are received.
        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue

        self.channel.basic_consume(
            self.on_response, no_ack=True, queue=self.callback_queue)

    def on_response(self, ch, method, props, body):
        """Store ``body`` as the response if it answers the pending request.

        Replies whose correlation_id differs from the one generated by the
        current ``call`` are ignored.
        """
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        """Send ``n`` to the RPC server and return the reply as an int.

        Blocks, processing connection events, until a reply with the matching
        correlation id has been received.
        """
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(
            exchange='',
            routing_key='rpc_queue',
            properties=pika.BasicProperties(
                reply_to=self.callback_queue,
                correlation_id=self.corr_id,
            ),
            body=str(n))
        # on_response is invoked from inside process_data_events().
        while self.response is None:
            self.connection.process_data_events()
        return int(self.response)
# Constructing the client opens the connection and the reply queue.
fibonacci_rpc = FibonacciRpcClient()
print(" [x] Requesting fib(30)")
# Blocks until the server's reply has been received.
response = fibonacci_rpc.call(30)
print(" [.] Got %r" % response)