RabbitMQ

Posted by ZYT on February 11, 2019

1. What is RabbitMQ?

RabbitMQ is a “traditional” message broker that implements a variety of messaging protocols. It was one of the first open source message brokers to achieve a reasonable level of features, client libraries, dev tools, and quality documentation. RabbitMQ was originally developed to implement AMQP, an open wire protocol for messaging with powerful routing features. Java has messaging standards such as JMS, but they do not help non-Java applications that need distributed messaging, which severely limits any integration scenario, whether microservice or monolithic. With the advent of AMQP, cross-language flexibility became real for open source message brokers.

2. Structure

  • Exchanges and bindings
    • Direct exchange
    • Fanout exchange
    • Topic exchange
    • Headers exchange
  • Queues
  • Channels
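
To make the relationship between exchanges, bindings and queues concrete, here is a minimal in-memory sketch. `ToyBroker` and its methods are hypothetical names made up for illustration; they are not part of pika or RabbitMQ, and only the direct and fanout rules are modelled.

```python
from collections import defaultdict


class ToyBroker:
    """In-memory stand-in for a broker (illustration only, not a real API)."""

    def __init__(self):
        self.exchange_types = {}            # exchange name -> "direct" or "fanout"
        self.bindings = defaultdict(list)   # exchange name -> [(binding key, queue name)]
        self.queues = defaultdict(list)     # queue name -> list of message bodies

    def declare_exchange(self, name, exchange_type):
        self.exchange_types[name] = exchange_type

    def bind(self, exchange, queue, binding_key=""):
        self.bindings[exchange].append((binding_key, queue))

    def publish(self, exchange, routing_key, body):
        kind = self.exchange_types[exchange]
        for binding_key, queue in self.bindings[exchange]:
            # fanout ignores the routing key; direct needs an exact match
            if kind == "fanout" or binding_key == routing_key:
                self.queues[queue].append(body)


broker = ToyBroker()
broker.declare_exchange("logs", "fanout")
broker.declare_exchange("direct_logs", "direct")
broker.bind("logs", "q1")
broker.bind("logs", "q2")
broker.bind("direct_logs", "q1", binding_key="error")

broker.publish("logs", "", "to everyone")
broker.publish("direct_logs", "error", "only q1")
broker.publish("direct_logs", "info", "no matching binding")

print(dict(broker.queues))
```

The publisher only ever names an exchange and a routing key; which queues receive the message is decided entirely by the bindings.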

3. What is AMQP?

AMQP 0-9-1 Model Explained

4. Practice

This section uses pika, the Python client for RabbitMQ, to demonstrate how RabbitMQ is used.

4.1 Simple Example

  • send.py
import pika

conn = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = conn.channel()

# create queue
channel.queue_declare(queue='hello')

# use default exchange
channel.basic_publish(
    exchange='', routing_key='hello', body='Hello World')
print('Send Success')
conn.close()
  • receive.py
import pika

conn = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = conn.channel()

channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):
    print(" [x] Receive %r" % body)

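# Manual acknowledgement is the default; no_ack=True switches to automatic acknowledgement.
# This is the pika 0.x signature; pika 1.0+ uses basic_consume(queue, on_message_callback, auto_ack=True).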
channel.basic_consume(
    callback, queue='hello', no_ack=True)

print('Waiting for messages.')

channel.start_consuming()

4.2 Multiple Workers And Message durability

  • RabbitMQ load-balances across consumers (round-robin).

Use manual acks:

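import time

def callback(ch, method, properties, body):
    print(" [x] Received %r" % (body,))
    # simulate work: one second per '.' in the message (body is bytes on Python 3)
    time.sleep(body.count(b'.'))
    print(" [x] Done")
    # acknowledge only after the work is finished
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(
    callback, queue='hello')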
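
Why manual acks matter can be sketched without a broker. The toy model below assumes the usual behaviour that a message delivered but not yet acknowledged goes back to the queue when its consumer goes away; `ToyQueue` and its method names are hypothetical and only illustrate the bookkeeping.

```python
from collections import deque


class ToyQueue:
    """Tracks ready and unacknowledged messages (illustration only)."""

    def __init__(self, messages):
        self.ready = deque(messages)
        self.unacked = {}       # delivery tag -> message body
        self.next_tag = 1

    def deliver(self):
        body = self.ready.popleft()
        tag = self.next_tag
        self.next_tag += 1
        self.unacked[tag] = body    # held until the consumer acks
        return tag, body

    def ack(self, tag):
        del self.unacked[tag]       # safe to forget the message now

    def consumer_died(self):
        # put every unacknowledged message back at the head of the queue
        for tag in sorted(self.unacked, reverse=True):
            self.ready.appendleft(self.unacked.pop(tag))


q = ToyQueue(["a", "b", "c"])
tag, body = q.deliver()     # "a" is processed and acknowledged
q.ack(tag)
tag, body = q.deliver()     # "b" is delivered, but the worker crashes before acking
q.consumer_died()
print(list(q.ready))
```

With automatic acknowledgement the second message would have been forgotten as soon as it was delivered.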
  • Message durability

(a) mark queue as durable

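# An existing queue cannot be redeclared with different parameters, so the
# durable queue gets a new name instead of reusing the non-durable 'hello'.
channel.queue_declare(queue='task_queue', durable=True)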

(b) mark message as durable

channel.basic_publish(
    exchange='', 
    routing_key="task_queue", 
    body=message, 
    properties=pika.BasicProperties( delivery_mode = 2, ) # make message persistent
)
# Note:
# Marking messages as persistent doesn't fully guarantee that a message won't be lost. Although it tells RabbitMQ to save the message to disk, 
# there is still a short time window when RabbitMQ has accepted a message and hasn't saved it yet. Also, RabbitMQ doesn't do fsync(2) for every message -- it may be
# just saved to cache and not really written to the disk. The persistence guarantees aren't strong, but it's more than enough for our simple task queue. 
# If you need a stronger guarantee then you can use publisher confirms.

publisher confirms

import pika

conn = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = conn.channel()

# create queue
channel.queue_declare(queue='hello')
channel.confirm_delivery()

# use default exchange
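# With confirm_delivery() enabled, pika 0.x returns True/False from basic_publish;
# pika 1.0+ raises pika.exceptions.NackError or UnroutableError on failure instead.
if channel.basic_publish(
    exchange='', 
    routing_key='hello', 
    properties=pika.BasicProperties(content_type='text/plain', delivery_mode=1),  # 1 = non-persistent
    body='Hello World'):
    print('Message publish was confirmed')
else:
    print('Message could not be confirmed')
# close the connection in both cases
conn.close()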

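  • Fair dispatch

Because RabbitMQ dispatches tasks round-robin, uneven task sizes can leave some workers constantly busy while others sit idle.

Use the basic.qos method with prefetch_count=1 so that a worker is not sent a new message until it has acknowledged the previous one: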

channel.basic_qos(prefetch_count=1)
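
The difference between the two dispatch strategies can be estimated with a small simulation. This is a rough sketch that assumes each task's duration is known in advance and that "fair" means the next task goes to whichever worker frees up first; the function names are made up for illustration.

```python
import heapq


def round_robin(durations, workers):
    """Task i always goes to worker i % workers, however busy it is."""
    busy = [0] * workers
    for i, d in enumerate(durations):
        busy[i % workers] += d
    return busy


def fair_dispatch(durations, workers):
    """Next task goes to the worker that becomes free first (the prefetch_count=1 idea)."""
    heap = [(0, w) for w in range(workers)]   # (time the worker is free, worker id)
    busy = [0] * workers
    for d in durations:
        free_at, w = heapq.heappop(heap)
        busy[w] += d
        heapq.heappush(heap, (free_at + d, w))
    return busy


tasks = [5, 1, 5, 1, 5, 1]          # alternating heavy and light tasks
print(round_robin(tasks, 2))        # total busy time per worker
print(fair_dispatch(tasks, 2))
```

With alternating heavy and light tasks, round-robin sends every heavy task to the same worker, while the fair variant spreads the load more evenly.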

4.3 Exchange types

Exchange types: direct, topic, headers and fanout
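
The headers exchange is the only type not demonstrated in the subsections that follow, so here is a small sketch of its matching rule. It assumes the common semantics: the binding's `x-match` argument is `all` (the default) or `any`, and keys starting with `x-` are treated as options rather than match criteria. `headers_match` is a hypothetical helper, not a pika function.

```python
_MISSING = object()   # sentinel so an absent header never equals a wanted value


def headers_match(binding_args, message_headers):
    """Return True if a message's headers satisfy a headers-exchange binding."""
    mode = binding_args.get("x-match", "all")
    # keys starting with "x-" are binding options, not match criteria
    wanted = {k: v for k, v in binding_args.items() if not k.startswith("x-")}
    hits = [message_headers.get(k, _MISSING) == v for k, v in wanted.items()]
    return any(hits) if mode == "any" else all(hits)


binding = {"x-match": "any", "format": "pdf", "type": "report"}
headers = {"format": "pdf", "type": "log"}

print(headers_match(binding, headers))                         # one header matches
print(headers_match({**binding, "x-match": "all"}, headers))   # "type" differs
```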

4.3.1 Publish/Subscribe

  • send.py
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(
    exchange='logs',
    exchange_type='fanout'
)

channel.basic_publish(
    exchange='logs',
    routing_key='',
    body='Hello World'
)

connection.close()
  • receive.py
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(
    exchange='logs',
    exchange_type='fanout'
)

# Declare a queue with a broker-generated name; with exclusive=True the queue is deleted when the consumer's connection closes.
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

channel.queue_bind(
    exchange='logs',
    queue=queue_name
)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(
    callback,
    queue=queue_name,
    no_ack=True
)

channel.start_consuming()

4.3.2 Routing

  • send.py
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(
    exchange='direct_logs',
    exchange_type='direct'
)

channel.basic_publish(
    exchange='direct_logs',
    routing_key='test',
    body='Hello World'
)

connection.close()
  • receive.py
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(
    exchange='direct_logs',
    exchange_type='direct'
)

# Declare a queue with a broker-generated name; with exclusive=True the queue is deleted when the consumer's connection closes.
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

channel.queue_bind(
    exchange='direct_logs',
    queue=queue_name,
    routing_key='test'
)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(
    callback,
    queue=queue_name,
    no_ack=True
)

channel.start_consuming()

4.3.3 Topics

  • send.py
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(
    exchange='topic_logs',
    exchange_type='topic'
)

channel.basic_publish(
    exchange='topic_logs',
    routing_key='test.update',
    body='Hello World'
)

connection.close()
  • receive.py
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(
    exchange='topic_logs',
    exchange_type='topic'
)

# Declare a queue with a broker-generated name; with exclusive=True the queue is deleted when the consumer's connection closes.
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

channel.queue_bind(
    exchange='topic_logs',
    queue=queue_name,
    routing_key='test.*'
)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(
    callback,
    queue=queue_name,
    no_ack=True
)

channel.start_consuming()
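
The binding key `test.*` above relies on topic wildcards: `*` stands for exactly one dot-separated word and `#` for zero or more words. The matcher below is a minimal sketch of that rule for experimenting with patterns offline; `topic_matches` is a hypothetical helper and not how the broker implements it.

```python
def topic_matches(pattern, routing_key):
    """Return True if a topic binding pattern matches a routing key."""

    def match(p, k):
        if not p:
            return not k                # pattern used up: key must be used up too
        head, rest = p[0], p[1:]
        if head == "#":
            # "#" may absorb zero or more words
            return any(match(rest, k[i:]) for i in range(len(k) + 1))
        if not k:
            return False                # pattern still expects a word
        return (head == "*" or head == k[0]) and match(rest, k[1:])

    return match(pattern.split("."), routing_key.split("."))


print(topic_matches("test.*", "test.update"))        # one word after "test"
print(topic_matches("test.*", "test.update.done"))   # two words: "*" is not enough
print(topic_matches("test.#", "test.update.done"))   # "#" absorbs both words
```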

4.3.4 RPC

RabbitMQ can be used to implement RPC (Remote Procedure Call).

  • rpc_server.py
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='rpc_queue')

def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n-1) + fib(n-2)

def on_request(ch, method, props, body):
    n = int(body)

    print(" [.] fib(%s)" % n)
    response = fib(n)

    ch.basic_publish(
        exchange='',
        routing_key=props.reply_to,
        properties=pika.BasicProperties(correlation_id=props.correlation_id),
        body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue='rpc_queue')

print(" [x] Awaiting RPC requests")
channel.start_consuming()
  • rpc_client.py
import uuid
import pika

class FibonacciRpcClient(object):
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
        self.channel = self.connection.channel()
        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue
        self.channel.basic_consume(self.on_response, no_ack=True, queue=self.callback_queue)

    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(
            exchange='',
            routing_key='rpc_queue',
            properties=pika.BasicProperties(
                reply_to=self.callback_queue,
                correlation_id=self.corr_id,
            ),
            body=str(n))

        while self.response is None:
            self.connection.process_data_events()
        return int(self.response)

fibonacci_rpc = FibonacciRpcClient()

print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print(" [.] Got %r" % response)
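
The role of `correlation_id` is easier to see with the broker taken out of the picture. The sketch below passes plain dictionaries around instead of AMQP messages; `serve`, `call` and the `stale_replies` parameter are hypothetical, and `fib` is rewritten iteratively only to keep the example fast.

```python
import uuid


def fib(n):
    """Iterative Fibonacci (same results as the recursive server version for n >= 0)."""
    a, b = 0, 1
    for _ in range(n):
        a, b = b, a + b
    return a


def serve(request):
    """Server side: copy the request's correlation_id onto the reply."""
    return {
        "correlation_id": request["correlation_id"],
        "body": str(fib(int(request["body"]))),
    }


def call(n, stale_replies=()):
    """Client side: accept only the reply whose correlation_id matches this call."""
    corr_id = str(uuid.uuid4())
    request = {"correlation_id": corr_id, "reply_to": "callback_queue", "body": str(n)}
    # the callback queue may still hold replies that belong to earlier requests
    reply_queue = list(stale_replies) + [serve(request)]
    for reply in reply_queue:
        if reply["correlation_id"] == corr_id:
            return int(reply["body"])
    return None


leftover = [{"correlation_id": "some-older-request", "body": "999"}]
print(call(10, stale_replies=leftover))
```

Without the check, the client would return the leftover `999` instead of the answer to its own request.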