
RabbitMQ Tutorial

On this page we are going to cover how to install/setup/manage RabbitMQ and how to work with it using Python.


RabbitMQ Install

We are going to cover a few different methods of installing RabbitMQ.

Installation RHEL / CentOS / Fedora

You have a few different options if you want to install RabbitMQ on a Red Hat based system:

For more details check: The Official Documentation.

Installation Debian / Ubuntu

You have a few options if you want to install RabbitMQ on Debian or Ubuntu:

Installation Generic Binary Build

Installing this way gives you a bit more control.

Install Erlang

You can download a zero dependency version of Erlang from RabbitMQ on GitHub. They provide RPMs and source archives.

The RPM is useful if you are running a Red Hat based system.

If you don’t want to install a package, you can install from source with the .tar.gz file:

My Install of the Zero Dependency Version of Erlang

I installed this on Ubuntu 20.04. I couldn’t really install the RPM since I wasn’t using a Red Hat based system. I didn’t want to build from source since this is also geared towards Red Hat based systems ( didn’t look into this too far ). I decided to just unpack the RPM file and copy the files over. This turned out to be very easy and presented practically no issues.

Install the rpm2cpio tool. Installing it from the repo requires root, but you could do this step on a separate machine where you have the necessary permissions.

sudo apt install rpm2cpio

Download the RPM from GitHub:

wget https://github.com/rabbitmq/erlang-rpm/releases/download/v23.1.5/erlang-23.1.5-1.el8.x86_64.rpm

Unpack the RPM and rename the directory:

rpm2cpio erlang-23.1.5-1.el8.x86_64.rpm | cpio -idmv
mv usr erlang

Change the ROOTDIR path inside this script so it points at the unpacked location:

vi erlang/lib64/erlang/erts-11.1.3/bin/erl
ROOTDIR="/home/user1/erlang/lib64/erlang"

Setup the PATH variable:

export PATH=$PATH:/home/user1/erlang/lib64/erlang/erts-11.1.3/bin/
echo "export PATH=\$PATH:/home/user1/erlang/lib64/erlang/erts-11.1.3/bin/" >>  .bashrc

Test it out by launching Erlang in a fresh terminal:

erl

RabbitMQ

Paths/Dirs:

sbin - add this to your PATH
var - data will go here by default

Download the compressed archive:

wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.9/rabbitmq-server-generic-unix-3.8.9.tar.xz

Unpack it and setup the PATH variable:

unxz rabbitmq-server-generic-unix-3.8.9.tar.xz 
tar xvf rabbitmq-server-generic-unix-3.8.9.tar
export PATH=$PATH:/home/user1/rabbitmq_server-3.8.9/sbin
echo "export PATH=\$PATH:/home/user1/rabbitmq_server-3.8.9/sbin" >> .bashrc

Setup the RABBITMQ_HOME variable:

export RABBITMQ_HOME=/home/user1/rabbitmq_server-3.8.9
echo "export RABBITMQ_HOME=/home/user1/rabbitmq_server-3.8.9" >> .bashrc

Use these commands to start, stop, and check status:


sbin/rabbitmq-server             # run server in the foreground
sbin/rabbitmq-server -detached   # run server detached in the background
sbin/rabbitmqctl shutdown        # shut down the server
sbin/rabbitmqctl stop            # also shuts down the server
sbin/rabbitmqctl status          # show status

Config files:

$RABBITMQ_HOME/etc/rabbitmq/rabbitmq.conf       - the node is configured here
$RABBITMQ_HOME/etc/rabbitmq/rabbitmq-env.conf   - environment variables can be placed here
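These files are typically not created for you. As a rough sketch of what a minimal rabbitmq.conf could look like (the key names are from the new-style config format; the values are just examples, not recommendations):

# rabbitmq.conf - example settings
listeners.tcp.default = 5672    # port for AMQP 0-9-1 clients
log.file.level = info           # log verbosity
default_user = admin            # replace the default guest account
default_pass = changeme         # example only - use a real secret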

Service Management

This is going to be relevant if you installed using a package or if you set up RabbitMQ as a systemd service.

You can use the following commands to manage the service:


sudo chkconfig rabbitmq-server on     # start the service at boot
sudo service rabbitmq-server start
sudo service rabbitmq-server stop
sudo service rabbitmq-server status

The service runs as the “rabbitmq” user by default.

Users and Authentication

Users can be set up for connections to and from RabbitMQ. These are different from OS users.

Default access: a default user “guest” with password “guest” is created automatically. By default it can only connect from localhost.
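To add your own users from the CLI (the user name, password, and vhost here are just placeholders):

sudo rabbitmqctl add_user user1 s3cret                      # create a user
sudo rabbitmqctl set_user_tags user1 administrator          # grant the administrator tag
sudo rabbitmqctl set_permissions -p / user1 ".*" ".*" ".*"  # configure/write/read on vhost /
sudo rabbitmqctl list_users                                 # verify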

RabbitMQ Ports

These ports may be needed depending on what is enabled and what plugins you are using:

4369           epmd, peer discovery service
5672, 5671     AMQP 0-9-1 and 1.0 clients
25672          inter-node and CLI tool communication
35672-35682    used by CLI tools
15672          HTTP API clients, management UI and rabbitmqadmin
61613, 61614   STOMP
1883, 8883     MQTT
15674          STOMP-over-WebSockets
15675          MQTT-over-WebSockets
15692          Prometheus metrics

Tuning for Production


ulimit -n

65536 - at least this for production
4096 - probably OK for development

“There are two limits in play: the maximum number of open files the OS kernel allows (fs.file-max) and the per-user limit (ulimit -n). The former must be higher than the latter.”
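You can check both of these limits from a shell before tuning:

ulimit -n             # per-user limit for the current shell
sysctl fs.file-max    # kernel-wide maximum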

Systemd

/etc/systemd/system/rabbitmq-server.service.d/limits.conf

[Service]
LimitNOFILE=64000

Docker

/etc/docker/daemon.json
....
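As a hedged sketch, raising the default nofile ulimit for all containers via /etc/docker/daemon.json might look like this (restart the Docker daemon afterwards):

{
  "default-ulimits": {
    "nofile": {
      "Name": "nofile",
      "Hard": 64000,
      "Soft": 64000
    }
  }
}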

Without systemd

Verify Limits

Three ways to verify these limits:

Use the status command:

rabbitmqctl status

Check in the proc file system:

cat /proc/$RABBITMQ_BEAM_PROCESS_PID/limits

Here, $RABBITMQ_BEAM_PROCESS_PID is the PID of the RabbitMQ Erlang VM.
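If you don't have that PID handy, one way to find it (this assumes only one Erlang VM is running on the machine):

RABBITMQ_BEAM_PROCESS_PID=$(pgrep -f beam.smp)
cat /proc/$RABBITMQ_BEAM_PROCESS_PID/limits | grep 'open files'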

You can also see these values in the management UI.

Some Nice CLI Tools

sudo rabbitmq-diagnostics ping              # verify local node running and cli can authenticate
sudo rabbitmq-diagnostics status            # show components, mem usage, etc
sudo rabbitmq-diagnostics environment       # show node configuration
sudo rabbitmq-diagnostics node_health_check # more extensive health check
sudo rabbitmqctl list_queues                # show queues and how many messages in each

Logs

Logs usually go here:

/var/log/rabbitmq

The base log directory can be changed with the $RABBITMQ_LOG_BASE variable.


sudo journalctl --system                    # show system service logs
sudo journalctl --system | grep rabbitmq    # RabbitMQ specific
/etc/logrotate.d/rabbitmq-server            # log rotation is configured here

Python - Part 0

In this section we are going to get ready to start using Python to work with RabbitMQ.

Here are some basic terms to be familiar with:

Producing - sending a message
Queue - the buffer inside RabbitMQ where messages are stored
Consuming - receiving a message

We will be using Python 3 with the Pika client library.

Install Pika:

python -m pip install pika --upgrade

You might need to explicitly specify python3 like this:

python3 -m pip install pika --upgrade
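Either way, you can verify that the library is importable before moving on:

python3 -c 'import pika; print(pika.__version__)'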

NOTE - I’ve tested these examples with Python 3, NOT Python 2. When I ran them on my system I explicitly used the “python3” command because my system came with both 2 and 3 installed by default.

Python - Part 1

In this section we are going to cover basic sending and receiving.

Sending

Place the following snippets inside a script, for example: send.py

Establish connection:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

Create a queue ( makes sure it exists ):

channel.queue_declare(queue='hello')

Send a message:

channel.basic_publish(exchange='',  # default exchange
       routing_key='hello',         # specify queue
       body='Hello World!')         # message

Close the connection ( flushes buffers and makes sure message was actually sent ):

connection.close()

Receiving

Place the following snippets inside a script, for example: receive.py

Create a queue again ( makes sure it exists ):

channel.queue_declare(queue='hello')

Subscribe a callback function that will be called by the Pika library any time a message is received.

Create the callback function:

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)

Tell Pika to call this function anytime something comes in in the specified queue:

channel.basic_consume(queue='hello',
                      auto_ack=True,
                      on_message_callback=callback)

Enter an infinite loop that receives data and runs callbacks; press [CTRL]-C to stop:

channel.start_consuming()

Example and Test

Full example send.py script:

send.py
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')

channel.basic_publish(exchange='',          # default exchange
                      routing_key='hello',  # specify queue
                      body='Hello World!')  # message

connection.close()

Full example receive.py script:

receive.py
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)

channel.basic_consume(queue='hello',
                      auto_ack=True,
                      on_message_callback=callback)

channel.start_consuming()

Run both of these scripts, each in a separate terminal, like this.

Terminal 1:

python receive.py

Terminal 2:

python send.py

Try re-running send.py in another terminal.

You can show queues and how many messages each one has with the following:

sudo rabbitmqctl list_queues

Python - Part 2 - Work queues

Round-robin dispatching can be used to parallelise tasks.

We can use this code to create a new task. You can think of it as the client.

new_task.py
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='work_queue1')

message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
                      routing_key='work_queue1',  # must match the declared queue
                      body=message)
print(" [x] Sent %r" % message)
connection.close()

We can use this code to create a worker process. You can think of it as a server.

worker.py
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='work_queue1')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body.decode())
    time.sleep(body.count(b'.'))   # fake work: one second per dot in the message
    print(" [x] Done")

channel.basic_consume(queue='work_queue1',
                      auto_ack=True,
                      on_message_callback=callback)

channel.start_consuming()

Open two terminals. Run the worker script in each of them like this to create two workers that we can watch.

python worker.py

Launch the new_task.py script to send some tasks out to the workers:

python new_task.py First message.
python new_task.py Second message..
python new_task.py Third message...
python new_task.py Fourth message....
python new_task.py Fifth message.....
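RabbitMQ dispatches messages round-robin by default, so the first worker should receive the first, third, and fifth messages and the second worker the rest. The output should look roughly like this (each dot makes the worker sleep one second):

# Terminal 1 (worker 1)
 [x] Received 'First message.'
 [x] Done
 [x] Received 'Third message...'
 [x] Done
 [x] Received 'Fifth message.....'
 [x] Done

# Terminal 2 (worker 2)
 [x] Received 'Second message..'
 [x] Done
 [x] Received 'Fourth message....'
 [x] Done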

Acks

You can use manual acknowledgements by leaving out “auto_ack=True” when you call basic_consume, like this:

channel.basic_consume(queue='work_queue1', on_message_callback=callback)

If you do this you will need to manually send an acknowledgement when you are done processing the message. Basically, just put this at the end of your callback:

ch.basic_ack(delivery_tag = method.delivery_tag)
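Putting those two pieces together, a minimal sketch of the worker callback with a manual ack would be:

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body.decode())
    # ... do the actual work here ...
    print(" [x] Done")
    # acknowledge only after the work is finished
    ch.basic_ack(delivery_tag=method.delivery_tag)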

DON’T FORGET THE ACK. If you forget it, messages will be redelivered when your worker exits, and RabbitMQ will use more and more memory as it holds on to unacknowledged messages.

Troubleshoot forgotten acks like this:

sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

Message durability

If you want messages to survive a broker restart you will want to declare them durable.

Note - You can’t reuse the same queue name with different parameters.

Declare queue durable:

channel.queue_declare(queue='work_queue1', durable=True)

Make message persistent:

channel.basic_publish(exchange='',
                      routing_key="work_queue1",
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2,                   # make message persistent
                      ))

NOTE - Marking messages as persistent doesn’t fully guarantee that a message won’t be lost. There is a short window after RabbitMQ accepts a message and before it is written to disk.

QOS

Fair dispatch: tell RabbitMQ not to give more than one message to a worker at a time, so a worker only receives a new message after it has processed and acknowledged the previous one. You can enable it like this:

channel.basic_qos(prefetch_count=1)    # just before basic_consume()

Example with Acks, Durability, and QOS

NOTE - We’ve changed the queue name here so that these examples can be used without removing the old queue. You can’t redefine a queue with different parameters.

This is the updated new task script:

new_task2.py
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='work_queue2', durable=True)

message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
                      routing_key='work_queue2',  # must match the declared queue
                      body=message,
                      properties=pika.BasicProperties(
                          delivery_mode=2,        # make message persistent
                      ))
print(" [x] Sent %r" % message)
connection.close()

This is the updated worker script:

worker2.py
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='work_queue2', durable=True)

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body.decode())
    time.sleep(body.count(b'.'))
    print(" [x] Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)   # manual ack

channel.basic_qos(prefetch_count=1)                  # fair dispatch
channel.basic_consume(queue='work_queue2', on_message_callback=callback)

channel.start_consuming()

Python - Part 3 - Publish/Subscribe

Exchange types: direct, topic, headers, and fanout.

Default exchange - nameless - the routing_key specifies the queue name

Show exchanges from the CLI:

sudo rabbitmqctl list_exchanges

Create fanout exchange called “logs”:

channel.exchange_declare(exchange='logs', exchange_type='fanout')

Publish to fanout exchange:

channel.basic_publish(exchange='logs', routing_key='', body=message)

Declare a temporary queue with a random, server-generated name. exclusive=True makes sure it is deleted when the consumer’s connection closes.

result = channel.queue_declare(queue='', exclusive=True)

Bindings

Binding - a relationship between an exchange and a queue

Show bindings from the CLI:

sudo rabbitmqctl list_bindings

Create a binding between a queue and exchange:

channel.queue_bind(exchange='logs', queue=result.method.queue)

Here is an example of a publisher:

emit_log.py
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='fanout')

message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs', routing_key='', body=message)
print(" [x] Sent %r" % message)
connection.close()

Here is an example of a subscriber:

receive_logs.py
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='fanout')

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange='logs', queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()

You can run these commands in two separate terminals. One just prints the received messages while the other writes them to a log file.

python receive_logs.py
python receive_logs.py > logs_from_rabbit.log

You publish like this:

python emit_log.py

Python - Part 4 - Routing

Use a routing key:

channel.queue_bind(exchange=exchange_name,
                   queue=queue_name,
                   routing_key='black')

Direct exchange

Create a direct exchange:

channel.exchange_declare(exchange='direct_logs',
                         exchange_type='direct')

Publish using routing key:

channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)

Create a queue and create multiple bindings:

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)

With this example you can specify the severity of the alert that you want to send as an argument. This is sent as the routing key.

emit_log_direct.py
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()

In this example we can specify the severities that we want to see messages for. These are specified as arguments to the script and are used as routing keys. More than one can be specified.

receive_logs_direct.py
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

for severity in severities:
    channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()

Run the following on three different terminals:

python receive_logs_direct.py warning error > logs_from_rabbit.log
python receive_logs_direct.py info warning error
python emit_log_direct.py error "Run. Run. Or it will explode."

The first command creates a queue bound with the routing keys “warning” and “error”. Any message sent with these routing keys will go to this queue, and the output of the script is written to a log file. The second command creates a queue bound with the routing keys “info”, “warning”, and “error”; these messages are just printed to the console. The last command sends a message with the routing key “error”. It will be received by both scripts.

Python - Part 5 - Topics

Topic exchange - routing keys are lists of words delimited by dots, for example “kern.critical”.

Binding key wildcards:

* (star) matches exactly one word
# (hash) matches zero or more words

Just using a “#” for the binding key will match all routing keys.

Example routing keys:

kern.critical
anonymous.info

Here is an example using a topic exchange and wildcards in the routing keys.

This is the publisher. It takes a list of routing keys as arguments.

emit_log_topic.py
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs', routing_key=routing_key, body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()

This is the consumer. It takes routing keys as arguments. It will bind to each of them.

receive_logs_topic.py
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)

for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=binding_key)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()

Here are examples of how we would call these scripts and pass routing keys to them:

python receive_logs_topic.py "#"
python receive_logs_topic.py "kern.*"
python receive_logs_topic.py "*.critical"
python receive_logs_topic.py "kern.*" "*.critical"
python emit_log_topic.py "kern.critical" "A critical kernel error"

Python - Part 6 - RPC

Using a callback queue:

result = channel.queue_declare(queue='', exclusive=True)
callback_queue = result.method.queue

channel.basic_publish(exchange='',
                      routing_key='rpc_queue',
                      properties=pika.BasicProperties(
                            reply_to = callback_queue,
                            ),
                      body=request)

Commonly used AMQP 0-9-1 properties:

delivery_mode    - 2 for persistent, anything else for transient
content_type     - MIME type, e.g. application/json for JSON
reply_to         - often used to specify a callback queue
correlation_id   - good for correlating RPC requests and responses

It is better to create a queue per client instead of a queue per call. When you do this you need to match calls to responses on that queue. The correlation_id is useful for this.

The client has an anonymous queue and the server has a named queue. The client sends a parameter to the server’s queue. The server passes this value to a function. The value returned from the function is then sent back to the client’s queue and printed. A correlation_id is used to match the request and response.

This is the server:

rpc_server.py
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='rpc_queue')

def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)

def on_request(ch, method, props, body):
    n = int(body)
    print(" [.] fib(%s)" % n)
    response = fib(n)
    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id=props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)

print(" [x] Awaiting RPC requests")
channel.start_consuming()

This is the client:

rpc_client.py
import pika
import uuid

class FibonacciRpcClient(object):

    def __init__(self):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host='localhost'))
        self.channel = self.connection.channel()

        result = self.channel.queue_declare(queue='', exclusive=True)
        self.callback_queue = result.method.queue

        self.channel.basic_consume(
            queue=self.callback_queue,
            on_message_callback=self.on_response,
            auto_ack=True)

    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(
            exchange='',
            routing_key='rpc_queue',
            properties=pika.BasicProperties(
                reply_to=self.callback_queue,
                correlation_id=self.corr_id,
            ),
            body=str(n))
        while self.response is None:
            self.connection.process_data_events()
        return int(self.response)

fibonacci_rpc = FibonacciRpcClient()

print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print(" [.] Got %r" % response)

Launch the scripts like this, each in its own terminal, to test them out:

python rpc_server.py
python rpc_client.py

Just run more server instances to scale up.

Unanswered questions:

Python - Part 7 - Publisher Confirms

Publisher confirms are an extension for reliable publishing. They allow the broker to confirm that messages have been received.

Pika does support a basic form of publisher confirms on a BlockingConnection via channel.confirm_delivery(), though the official tutorial for this part targets Java and C#.
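Here is a minimal sketch using confirm mode on a blocking channel; with mandatory=True, Pika raises UnroutableError if the broker cannot route the message (the queue name and message body are just examples):

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')

channel.confirm_delivery()   # put the channel into confirm mode

try:
    channel.basic_publish(exchange='',
                          routing_key='hello',
                          body='Hello World!',
                          mandatory=True)
    print(" [x] Publish confirmed by the broker")
except pika.exceptions.UnroutableError:
    print(" [!] Message could not be routed")

connection.close()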

Management and Tools

Management UI - see the official Management documentation.
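Enabling it with the generic binary install from above looks like this (the plugin ships with the server):

sbin/rabbitmq-plugins enable rabbitmq_management

Then browse to http://localhost:15672 and log in ( guest/guest only works from localhost ).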
