
RabbitMQ Tutorial

Index

RabbitMQ Install

Installation RHEL / CentOS / Fedora

There are a few options:

Install from the Fedora or RHEL repos. These may not have the most up-to-date packages.
Install from the Package Cloud or Bintray Yum repos. These should be more up to date.
Download the RPM from https://www.rabbitmq.com/install-rpm.html#downloads and install it manually. You will need to install any dependencies yourself.
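
For example, installing from the distro repo ( rabbitmq-server is the package name in the Fedora/RHEL repos ):

sudo yum install rabbitmq-server              # dnf on newer Fedora
sudo systemctl enable --now rabbitmq-server   # start now and enable at boot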

Details: https://www.rabbitmq.com/install-rpm.html

Installation Debian / Ubuntu

There are a few options here as well:

Install from the standard Ubuntu or Debian repos. These are likely to be older versions, not the most up to date.
Install from the Package Cloud or Bintray apt repos. These should be more up to date.
Download and install the package manually. This will require installing dependencies yourself. See https://www.rabbitmq.com/install-debian.html#manual-installation
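
For example, installing from the standard repos:

sudo apt-get update
sudo apt-get install rabbitmq-server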

Installation Generic Binary Build

Installing this way gives you a bit more control.

No root access needed
No extra repos needed
Easy to run multiple versions ( ex. for dev )

Grab the latest version here:
https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.8/rabbitmq-server-generic-unix-3.8.8.tar.xz

Check if a newer version exists and install that if this is out of date.

Install Erlang. For RabbitMQ 3.8.8 you should install a version of Erlang between 21.3 and 23.x. Check this page to see what is compatible: https://www.rabbitmq.com/which-erlang.html

After unpacking the archive:

sbin - add this directory to your PATH
var - data will go here by default

Set up the PATH and RABBITMQ_HOME environment variables.
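
A minimal sketch of that setup ( unpacking under /opt is an assumption, adjust to taste ):

tar -xf rabbitmq-server-generic-unix-3.8.8.tar.xz -C /opt     # unpack the archive
export RABBITMQ_HOME=/opt/rabbitmq_server-3.8.8               # point at the unpacked dir
export PATH="$RABBITMQ_HOME/sbin:$PATH"                       # put sbin on the PATH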


sbin/rabbitmq-server            # run server in the foreground
sbin/rabbitmq-server -detached  # run server detached in the background
sbin/rabbitmqctl shutdown       # shutdown the server
sbin/rabbitmqctl stop           # also shuts down the server
sbin/rabbitmqctl status         # show status

The node is configured here: $RABBITMQ_HOME/etc/rabbitmq/rabbitmq.conf
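
For example, a couple of commonly tuned settings ( illustrative values, not required changes ):

# $RABBITMQ_HOME/etc/rabbitmq/rabbitmq.conf
listeners.tcp.default = 5672     # AMQP listener port
log.file.level = info            # log verbosity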

You can place environment variables here: $RABBITMQ_HOME/etc/rabbitmq/rabbitmq-env.conf
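
For example ( variables in this file drop their RABBITMQ_ prefix; these values are illustrative ):

# $RABBITMQ_HOME/etc/rabbitmq/rabbitmq-env.conf
NODENAME=rabbit@localhost     # sets RABBITMQ_NODENAME
NODE_PORT=5672                # sets RABBITMQ_NODE_PORT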

You can download a zero-dependency version of Erlang from RabbitMQ on GitHub. They provide RPMs and source archives. The RPM is useful in the following cases:

You are using RHEL/CentOS/Fedora
You don't want to add an extra repo
You have root access and don't mind installing a package

If you don’t want to install a package, you can install from source with the .tar.gz file:

https://github.com/rabbitmq/erlang-rpm/releases
https://github.com/rabbitmq/erlang-rpm/archive/v23.0.4.tar.gz

Service Management


sudo chkconfig rabbitmq-server on
sudo service rabbitmq-server start
sudo service rabbitmq-server stop
sudo service rabbitmq-server status

Runs as the rabbitmq user by default.

Default access is the user guest with password guest, used by both server and client. It only works on localhost and won't work from remote hosts.
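
To connect from a remote host, create your own user ( the name and password here are placeholders ):

sudo rabbitmqctl add_user myuser mypassword                   # create the user
sudo rabbitmqctl set_user_tags myuser administrator           # give it the admin tag
sudo rabbitmqctl set_permissions -p / myuser ".*" ".*" ".*"   # configure/write/read on vhost /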

RabbitMQ Ports

These ports may be needed depending on what is enabled and what plugins you are using:

4369 epmd, peer discovery service
5672, 5671 AMQP 0-9-1 and 1.0 clients
25672 used for inter-node and CLI tools communication
35672-35682 used by CLI tools
15672 HTTP API clients, management UI and rabbitmqadmin
61613, 61614 STOMP
1883, 8883 MQTT
15674 STOMP-over-WebSockets
15675 MQTT-over-WebSockets
15692 Prometheus metrics

Tuning for Production


ulimit -n    # show the current open file limit

65536 - at least this for prod
4096 - probably OK for dev

“There are two limits in play: the maximum number of open files the OS kernel allows (fs.file-max) and the per-user limit (ulimit -n). The former must be higher than the latter.”
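
You can check both limits like this:

cat /proc/sys/fs/file-max    # kernel-wide limit ( fs.file-max )
ulimit -n                    # per-user limit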

Systemd

/etc/systemd/system/rabbitmq-server.service.d/limits.conf
[Service]
LimitNOFILE=64000
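
After adding the file, reload systemd and restart the service:

sudo systemctl daemon-reload
sudo systemctl restart rabbitmq-server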

Docker

/etc/docker/daemon.json
....
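
A sketch of the relevant stanza, assuming you want a 64000 default for containers:

{
  "default-ulimits": {
    "nofile": {
      "Name": "nofile",
      "Hard": 64000,
      "Soft": 64000
    }
  }
}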

Without systemd

Verify Limits

Three ways to verify these limits:

Check status like this: rabbitmqctl status

cat /proc/$RABBITMQ_BEAM_PROCESS_PID/limits

$RABBITMQ_BEAM_PROCESS_PID - this is the PID of the RabbitMQ Erlang VM

You can also see these values in the management UI.

Some Nice CLI Tools


sudo rabbitmq-diagnostics ping              # verify local node running and cli can authenticate
sudo rabbitmq-diagnostics status            # show components, mem usage, etc
sudo rabbitmq-diagnostics environment       # show node configuration
sudo rabbitmq-diagnostics node_health_check # more extensive health check

Logs

Logs usually go here:

/var/log/rabbitmq

RABBITMQ_LOG_BASE - logs are kept here


sudo journalctl --system     # show system service logs
sudo journalctl --system | grep rabbitmq     # RabbitMQ specific

Log rotation is usually done weekly. You can configure it here:

/etc/logrotate.d/rabbitmq-server

Python - Part 1

Install the Pika client library:


python -m pip install pika --upgrade

Messages sent to a non-existent queue are simply dropped.

You can use an IP or hostname instead of 'localhost' in the connection parameters.
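
For example ( the host, user, and password here are hypothetical ):

import pika

# assumed credentials created with rabbitmqctl add_user; guest/guest won't work remotely
credentials = pika.PlainCredentials('myuser', 'mypassword')
parameters = pika.ConnectionParameters(host='10.0.0.5',
                                       port=5672,
                                       credentials=credentials)
connection = pika.BlockingConnection(parameters)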

Sending

establish connection:


import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

Create a queue ( makes sure it exists ):


channel.queue_declare(queue='hello')

Send a message:


channel.basic_publish(exchange='',      # default exchange
                      routing_key='hello',          # specify queue
                      body='Hello World!')         # message

Close the connection ( flushes buffers and makes sure message was actually sent ):


connection.close()

Show queues and how many messages each one has:


sudo rabbitmqctl list_queues

Receiving

Create a queue again ( makes sure it exists ):


channel.queue_declare(queue='hello')

Subscribe a callback function that the Pika library will call any time a message is received.

Create the callback function:


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)

Tell Pika to call this function any time a message arrives on the specified queue:


channel.basic_consume(queue='hello',
                      auto_ack=True,
                      on_message_callback=callback)

Infinite loop that receives data and runs callbacks; press [CTRL]-C to stop:


channel.start_consuming()

This wrapper isn't needed in this example, but it is a common pattern for clean shutdown:


import os
import sys

if __name__ == '__main__':
    try:
        main()    # main() would set up the connection and start consuming
    except KeyboardInterrupt:
        print('Interrupted')
        try:
            sys.exit(0)
        except SystemExit:
            os._exit(0)

Terminal 1:

python receive.py

Terminal 2:

python send.py

Try re-running send.py in another terminal.

Python - Part 2 - Work queues

Round-robin dispatching can be used to parallelise tasks.

new_task.py
import sys

message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='', routing_key='hello', body=message)
print(" [x] Sent %r" % message)
worker.py
import time

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body.decode())
    time.sleep(body.count(b'.'))    # simulate work: one second per dot
    print(" [x] Done")

Two terminals, both running this as workers:

python worker.py

Send some tasks:

python new_task.py First message.
python new_task.py Second message..
python new_task.py Third message...
python new_task.py Fourth message....
python new_task.py Fifth message.....

Acks

If the channel or connection is closed without sending an ack, the message will be re-queued. There are no message timeouts.

Acks are on by default ( when not using auto_ack=True ):

channel.basic_consume(queue='hello', on_message_callback=callback)

Put this in your callback:

ch.basic_ack(delivery_tag = method.delivery_tag)
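
A complete callback with the ack at the end looks like this:

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body.decode())
    time.sleep(body.count(b'.'))    # simulate work
    print(" [x] Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)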

DON’T FORGET THE ACK

Troubleshoot forgotten ack:

sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

Message durability

Note - you can't redeclare an existing queue with different parameters.

Declare queue durable:

channel.queue_declare(queue='task_queue', durable=True)

Make message persistent:

channel.basic_publish(exchange='',
                      routing_key="task_queue",
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2,                   # make message persistent
                      ))

NOTE

Slight delay between accepting a message and writing it to disk
Messages may be cached and RabbitMQ doesn't call fsync() for every message
Use publisher confirms for a stronger guarantee

QOS

Use QOS to only send one message to a worker at a time:

channel.basic_qos(prefetch_count=1)    # just before basic_consume()

Python - Part 3 - Publish/Subscribe

Exchange types: direct, topic, headers, fanout

Default exchange - nameless - the routing_key specifies the queue name

Show exchanges from the CLI:

sudo rabbitmqctl list_exchanges

Create fanout exchange called “logs”:

channel.exchange_declare(exchange='logs', exchange_type='fanout')

Routing key is ignored for fanout exchanges

Since you are publishing to all bound queues, each consumer can create its own temporary queue. In this case you only want new messages, not old ones.

Publish to fanout exchange:

channel.basic_publish(exchange='logs', routing_key='', body=message)

Declare a temporary queue with a random name. exclusive=True makes sure it is deleted when the consumer's connection closes.

result = channel.queue_declare(queue='', exclusive=True)

Bindings

Binding - relationship between exchange and queue

Show bindings from the CLI:

sudo rabbitmqctl list_bindings

Create a binding between a queue and exchange:

channel.queue_bind(exchange='logs', queue=result.method.queue)
emit_log.py
import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='fanout')

message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs', routing_key='', body=message)
print(" [x] Sent %r" % message)
connection.close()
receive_logs.py
import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='fanout')

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange='logs', queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(
    queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()
python receive_logs.py
python receive_logs.py > logs_from_rabbit.log
python emit_log.py

Python - Part 4 - Routing

Use a routing key:

channel.queue_bind(exchange=exchange_name,
                   queue=queue_name,
                   routing_key='black')

Direct exchange

Create a direct exchange:

channel.exchange_declare(exchange='direct_logs',
                         exchange_type='direct')

Publish using routing key:

channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)

Create a queue and create multiple bindings:

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)

emit_log_direct.py

import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(
    exchange='direct_logs', routing_key=severity, body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()

receive_logs_direct.py

import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

for severity in severities:
    channel.queue_bind(
        exchange='direct_logs', queue=queue_name, routing_key=severity)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(
    queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()

On three different terminals:

python receive_logs_direct.py warning error > logs_from_rabbit.log
python receive_logs_direct.py info warning error
python emit_log_direct.py error "Run. Run. Or it will explode."

Python - Part 5 - Topics

Topic exchange:

The routing key must be a list of words delimited by dots
The routing key is limited to 255 bytes
Example routing keys: "stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit"

Binding key wildcards:

* (star) matches exactly one word
# (hash) matches zero or more words

Just using a “#” for the binding key will match all routing keys.

For example, a queue bound with "*.orange.*" matches routing keys like "quick.orange.rabbit", while a queue bound with "lazy.#" matches "lazy.brown.fox" or even just "lazy".

Big example:

emit_log_topic.py

import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 2 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(
    exchange='topic_logs', routing_key=routing_key, body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()

receive_logs_topic.py

import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

result = channel.queue_declare('', exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)

for binding_key in binding_keys:
    channel.queue_bind(
        exchange='topic_logs', queue=queue_name, routing_key=binding_key)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(
    queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()

python receive_logs_topic.py "#"
python receive_logs_topic.py "kern.*"
python receive_logs_topic.py "*.critical"
python receive_logs_topic.py "kern.*" "*.critical"
python emit_log_topic.py "kern.critical" "A critical kernel error"

Python - Part 6 - RPC

Using a callback queue:

result = channel.queue_declare(queue='', exclusive=True)
callback_queue = result.method.queue

channel.basic_publish(exchange='',
                      routing_key='rpc_queue',
                      properties=pika.BasicProperties(
                            reply_to=callback_queue,
                      ),
                      body=request)

Commonly used AMQP 0-9-1 properties:

delivery_mode - 2 for persistent, anything else for transient
content_type - mime-type, ex: application/json for JSON
reply_to - often used to specify a callback queue
correlation_id - good for correlating RPC requests and responses

It is better to create one callback queue per client instead of one per call. When you do this you need to match responses to requests on that queue; the correlation_id is useful for this.

rpc_server.py

import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))

channel = connection.channel()

channel.queue_declare(queue='rpc_queue')

def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)

def on_request(ch, method, props, body):
    n = int(body)

    print(" [.] fib(%s)" % n)
    response = fib(n)

    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(
                         correlation_id=props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)

print(" [x] Awaiting RPC requests")
channel.start_consuming()

rpc_client.py

import pika
import uuid


class FibonacciRpcClient(object):

    def __init__(self):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host='localhost'))

        self.channel = self.connection.channel()

        result = self.channel.queue_declare(queue='', exclusive=True)
        self.callback_queue = result.method.queue

        self.channel.basic_consume(
            queue=self.callback_queue,
            on_message_callback=self.on_response,
            auto_ack=True)

    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(
            exchange='',
            routing_key='rpc_queue',
            properties=pika.BasicProperties(
                reply_to=self.callback_queue,
                correlation_id=self.corr_id,
            ),
            body=str(n))
        while self.response is None:
            self.connection.process_data_events()
        return int(self.response)


fibonacci_rpc = FibonacciRpcClient()

print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print(" [.] Got %r" % response)

python rpc_server.py
python rpc_client.py

Just run more server instances to scale up.

Unanswered questions:

How should the client react if there are no servers running?
Should a client have some kind of timeout for the RPC?
If the server malfunctions and raises an exception, should it be forwarded to the client?
Protecting against invalid incoming messages ( eg. checking bounds ) before processing.

Python - Part 7 - Publisher Confirms

Publisher confirms are an extension for reliable publishing. They allow the broker to confirm that messages have been received.

Pika does support publisher confirms on a blocking channel via confirm_delivery(), though the official tutorial's complete examples lean on Java and C#.
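
A minimal sketch using Pika's confirm_delivery() ( queue name reused from Part 1, error handling simplified ):

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')

channel.confirm_delivery()    # enable publisher confirms on this channel

try:
    channel.basic_publish(exchange='',
                          routing_key='hello',
                          body='Hello World!',
                          mandatory=True)    # raise if the message can't be routed
    print(" [x] Message confirmed by the broker")
except pika.exceptions.UnroutableError:
    print(" [!] Message was returned as unroutable")

connection.close()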

Management and Tools

Management UI - see here: https://www.rabbitmq.com/management.html

References

https://www.rabbitmq.com/documentation.html
https://www.rabbitmq.com/configure.html
https://www.rabbitmq.com/production-checklist.html
https://www.rabbitmq.com/access-control.html
https://www.rabbitmq.com/cli.html