RabbitMQ Tutorial
On this page we are going to cover how to install/setup/manage RabbitMQ and how to work with it using Python.
Index
- RabbitMQ Install
- Installation RHEL / CentOS / Fedora
- Installation Debian / Ubuntu
- Installation Generic Binary Build
- Install Erlang
- Rabbit MQ
- Service Management
- Users and Authentication
- RabbitMQ Ports
- Tuning for Production
- Systemd
- Without systemd
- Verify Limits
- Some Nice CLI Tools
- Logs
- Python - Part 0
- Python - Part 1
- Sending
- Receiving
- Python - Part 2 - Work queues
- Acks
- Message durability
- Python - Part 3 - Publish/Subscribe
- Bindings
- Python - Part 4 - Routing
- Python - Part 5 - Topics
- Python - Part 6 - RPC
- Python - Part 7 - Publisher Confirms
- Management and Tools
- References
RabbitMQ Install
We are going to cover a few different methods of installing RabbitMQ.
Installation RHEL / CentOS / Fedora
You have a few different options if you want to install RabbitMQ on a Red Hat based system:
- Install from the Fedora or RHEL repo. These may not have the most up to date packages.
- Install from Package Cloud or Bintray Yum repos. Should be more up to date.
- Download the RPM from https://www.rabbitmq.com/install-rpm.html#downloads. You will need to install any dependencies.
For more details check: The Official Documentation.
Installation Debian / Ubuntu
You have a few options if you want to install RabbitMQ on Debian or Ubuntu:
- Install from standard Ubuntu or Debian repos. These are likely to be old versions and not the most up to date.
- Install from Package Cloud or Bintray apt repos.
- Download and install manually. This will require installing dependencies. https://www.rabbitmq.com/install-debian.html#manual-installation
Installation Generic Binary Build
Installing this way gives you a bit more control.
- No root access needed
- No extra repos needed
- Easy to run multiple versions ( ex. for dev )
Install Erlang
- For 3.8.9 you should install a version of Erlang between 22.3 and 23.x.
- Check this page to see what is compatible: https://www.rabbitmq.com/which-erlang.html
You can download a zero dependency version of Elang from RabbitMQ on Github. They provide RPMs and sourced archives.
The RPM is useful if the following is true:
- You are using RHEL/CentOS/Fedora
- You don’t want to add an extra repo
- You have root access and don’t mind installing a package
If you don’t want to install a package, you can install from source with the .tar.gz file:
- https://github.com/rabbitmq/erlang-rpm/releases
- https://github.com/rabbitmq/erlang-rpm/archive/v23.0.4.tar.gz
My Install of the Zero Dependency Version of Erlang
I installed this on Ubuntu 20.04. I couldn’t really install the RPM since I wasn’t using a Red Hat based system. I didn’t want to build from source since this is also geared towards Red Hat based systems ( didn’t look into this too far ). I decided to just unpack the RPM file and copy the files over. This turned out to be very easy and presented practically no issues.
Install the rpm2cpio tool. This requires root if you pull it from the repo but you could do this on a separate machine where you have permissions.
sudo apt install rpm2cpio
Download the RPM from Github:
wget https://github.com/rabbitmq/erlang-rpm/releases/download/v23.1.5/erlang-23.1.5-1.el8.x86_64.rpm
Unpack the RPM and rename the directory:
rpm2cpio erlang-23.1.5-1.el8.x86_64.rpm| cpio -idmv
mv usr erlang
Change the path inside this script:
vi erlang/lib64/erlang/erts-11.1.3/bin/erl
ROOTDIR="/home/user1/erlang/lib64/erlang"
Setup the PATH variable:
export PATH=$PATH:/home/user1/erlang/lib64/erlang/erts-11.1.3/bin/
echo "export PATH=\$PATH:/home/user1/erlang/lib64/erlang/erts-11.1.3/bin/" >> .bashrc
Test it out by launching Erlang in a fresh terminal:
erl
Rabbit MQ
- Grab the latest version here on GitHub
- Check if a newer version exists and install that if this is out of date.
Paths/Dirs:
sbin | add to path |
var | data will go here by default |
- unpack tar ball
- copy into place
- setup the PATH
- setup RABBITMQ_HOME
Download the compressed archive:
wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.9/rabbitmq-server-generic-unix-3.8.9.tar.xz
Unpack it and setup the PATH variable:
unxz rabbitmq-server-generic-unix-3.8.9.tar.xz
tar xvf rabbitmq-server-generic-unix-3.8.9.tar
export PATH=$PATH:/home/user1/rabbitmq_server-3.8.9/sbin
echo "export PATH=\$PATH:/home/user1/rabbitmq_server-3.8.9/sbin" >> .bashrc
Setup the RABBITMQ_HOME variable:
export RABBITMQ_HOME=/home/user1/rabbitmq_server-3.8.9
echo "export RABBITMQ_HOME=/home/user1/rabbitmq_server-3.8.9" >> .bashrc
Use these commands to start, stop, and check status:
sbin/rabbitmq-server # run server in the foreground
rabbitmq-server -detached # run server detached in the background
sbin/rabbitmqctl shutdown # shutdown the server
sbin/rabbitmqctl stop # also shutdown the server
sbin/rabbitmqctl status # show status
Config files:
$RABBITMQ_HOME/etc/rabbitmq/rabbitmq.conf | The node is configured here |
$RABBITMQ_HOME/etc/rabbitmq/rabbitmq-env.conf | You can place environment variables here |
Service Management
This is going to be relevant if you installed using a package or if you set up RabbitMQ as a systemd service.
You can use the following commands to manage the service:
chkconfig rabbitmq-server on
sudo service rabbitmq-server start
sudo service rabbitmq-server stop
sudo service rabbitmq-server status
The service runs as the “rabbitmq” user by default.
Users and Authentication
Users can be setup for connections to and from RabbitMQ. These are different from OS users.
Default access:
- Default RabbitMQ user: guest
- Default RabbitMQ password: guest
- only on local host, won’t work on remote hosts
- used by server and client
RabbitMQ Ports
These ports may be needed depending on what is enabled and what plugins you are using:
4369 | epmd, peer discovery service |
5672, 5671 | AMQP 0-9-1 and 1.0 clients |
25672 | used for inter-node and CLI tools communication |
35672-35682 | used by CLI tools |
15672 | HTTP API clients, management UI and rabbitmqadmin |
61613, 61614 | STOMP |
1883, 8883 | MQTT |
15674 | STOMP-over-WebSockets |
15675 | MQTT-over-WebSockets |
15692 | Prometheus metrics |
Tuning for Production
ulimit -n
65536 | at least this for prod |
4096 | probably OK for dev |
“There are two limits in play: the maximum number of open files the OS kernel allows (fs.file-max) and the per-user limit (ulimit -n). The former must be higher than the latter.”
Systemd
/etc/systemd/system/rabbitmq-server.service.d/limits.conf[Service] LimitNOFILE=64000
Docker
/etc/docker/daemon.json....
Without systemd
- Edit one of these to adjust the per user limit:
- /etc/default/rabbitmq-server
- rabbitmq-env.conf
-
edit hard limit here: /etc/security/limits.conf
- requires enabling the pam_limits.so
Verify Limits
Three ways to verify these limits:
Use the status command:
rabbitmqctl status
Check in the proc file system:
cat /proc/$RABBITMQ_BEAM_PROCESS_PID/limits
This is the PID of the RabbitMQ Erlang VM: $RABBITMQ_BEAM_PROCESS_PID
You can also see these values in the management UI.
Some Nice CLI Tools
sudo rabbitmq-diagnostics ping # verify local node running and cli can authenticate
sudo rabbitmq-diagnostics status # show components, mem usage, etc
sudo rabbitmq-diagnostics environment # show node configuration
sudo rabbitmq-diagnostics node_health_check # more extensive health check
sudo rabbitmqctl list_queues # show queues and how many messages in each
Logs
Logs usually go here:
/var/log/rabbitmq |
Logs are kept here: $RABBITMQ_LOG_BASE
sudo journalctl --system # show system service logs
sudo journalctl --system | grep rabbitmq # RabbitMQ specific
- Log rotation is usually done weekly. You can configure it here:
/etc/logrotate.d/rabbitmq-server |
Python - Part 0
In this section we are going to get ready to start using Python to work with RabbitMQ.
Here are some basic terms to be familiar with:
- producer - sends
- consumer - receives
- producing - sending
- consuming - receiving
- queue - large message buffer
- broker - has a queue, holds messages and routes stuff
We will be using the following:
- AMQP 0-9-1
- Pika 1.0.0,
Install Pika:
python -m pip install pika --upgrade
You might need to explicitly specify python3 like this;
python3 -m pip install pika --upgrade
NOTE - I’ve tested these examples with Python 3, NOT Python 2. When I ran these on my system I explicitly used the “python3” because my system came with both 2 and 3 installed by default.
- Messages to a non-existing queue are just dropped.
- You can use an IP or hostname instead of ‘localhost’ in the connection .
Python - Part 1
In this section we are going to cover basic sending and receiving.
Sending
Place the following snippets inside a script, for example: send.py
Establish connection:
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
Create a queue ( makes sure it exists ):
channel.queue_declare(queue='hello')
Send a message:
channel.basic_publish(exchange='', # default exchange
routing_key='hello', # specify queue
body='Hello World!') # message
Close the connection ( flushes buffers and makes sure message was actually sent ):
connection.close()
Receiving
Place the following snippets inside a script, for example: receive.py
Create a queue again ( makes sure it exists ):
channel.queue_declare(queue='hello')
Subscribe a callback function that will be called by the Pika library any time a message is received.
Create the callback function:
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
Tell Pika to call this function anytime something comes in in the specified queue:
channel.basic_consume(queue='hello',
auto_ack=True,
on_message_callback=callback)
Infinite loop that takes data and runs callbacks, [CTRL] - C to stop:
channel.start_consuming()
Example and Test
Full example send.py script:
send.py
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='', # default exchange
routing_key='hello', # specify queue
body='Hello World!') # message
connection.close()
Full example receive.py script:
receive.py
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
channel.basic_consume(queue='hello',
auto_ack=True,
on_message_callback=callback)
channel.start_consuming()
connection.close()
Run both of these scripts, each in a separate terminal like this.
Terminal 1:
python receive.py
Terminal 2:
python send.py
Try re-running send.py in another terminal.
You can show queues and how many messages each one has with the following:
sudo rabbitmqctl list_queues
Python - Part 2 - Work queues
Round-robin dispatching can be used to parallelise tasks.
We can use this code to create a new task. You can think of it as the client.
new_task.pyimport pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.queue_declare(queue='work_queue1') message = ' '.join(sys.argv[1:]) or "Hello World!" channel.basic_publish(exchange='', routing_key='hello', body=message) print(" [x] Sent %r" % message) connection.close()
We can use this code to create a worker process. You can think of it as a server.
worker.pyimport pika import time connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.queue_declare(queue='work_queue1') def callback(ch, method, properties, body): print(" [x] Received %r" % body.decode()) time.sleep(body.count(b'.')) print(" [x] Done") channel.basic_consume(queue='work_queue1', auto_ack=True, on_message_callback=callback) channel.start_consuming() connection.close()
Open two terminals. Run the worker script in each of them like this to create two workers that we can watch.
python worker.py
Launch the new_task.py script to send some tasks out to the workers:
python new_task.py First message.
python new_task.py Second message..
python new_task.py Third message...
python new_task.py Fourth message....
python new_task.py Fifth message.....
Acks
- If the channel or connection is closed without sending an ack the message will be re-queued and another consumer will get it.
- There are no message timeouts.
- Acks are on by default ( when not useing auto_ack=True )
You can use manual acknowlegements by leaving out “auto_ack=True” when you call basic_consume like this:
channel.basic_consume(queue='work_queue1', on_message_callback=callback)
If you do this you will need to manually send an acknowlegement when you are done processing the message. Basically just put this at the end of your callback:
ch.basic_ack(delivery_tag = method.delivery_tag)
DON’T FORGET THE ACK:
- messages will be redelivered on client exit (may look random)
- memory will grow due to unacked messages
Troubleshoot forgotten ack:
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Message durability
If you want messages to survive a broker restart you will want to declare them durable.
Note - You can’t reuse the same queue name with different parameters.
Declare queue durable:
channel.queue_declare(queue='work_queue1', durable=True)
Make message persistent:
channel.basic_publish(exchange='',
routing_key="work_queue1",
body=message,
properties=pika.BasicProperties(
delivery_mode = 2, # make message persistent
))
NOTE
- There may be a slight delay after accepting message and writing to disk.
- Message may be cached and RabbitMQ doesn’t call fsync().
- Use publisher confirms for a stronger guarantee.
QOS
- Use QOS to only send one message to a worker at a time.
- Helps to avoid overworking any single worker.
- Queue could fill up if all are busy.
You can enable it like this:
channel.basic_qos(prefetch_count=1) # just before basic_consume()
Example with Acks, Durability, and QOS
NOTE - We’ve changed the queue name here so that these examples can be used without removing the old queue. You can’t redefine a queue with different parameters.
This is the updated new task script:
new_task2.py
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='work_queue2', durable=True)
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
routing_key='hello',
body=message,
properties=pika.BasicProperties(
delivery_mode = 2,
))
print(" [x] Sent %r" % message)
connection.close()
This is the updated worker script:
worker2.py
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='work_queue2', durable=True)
def callback(ch, method, properties, body):
print(" [x] Received %r" % body.decode())
time.sleep(body.count(b'.'))
print(" [x] Done")
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='work_queue2', on_message_callback=callback)
channel.start_consuming()
connection.close()
Python - Part 3 - Publish/Subscribe
Exchange types:
- direct
- topic
- headers
- fanout
Default exchange - nameless - routing_key specifies the queue name
Show exchanges from the CLI:
sudo rabbitmqctl list_exchanges
Create fanout exchange called “logs”:
channel.exchange_declare(exchange='logs', exchange_type='fanout')
- Routing key is ignored for fanout exchanges
- Since you are publishing to all queues each consumer can create its own temporary queue. In this case you only want new messages, not old messages.
Publish to fanout exchange:
channel.basic_publish(exchange='logs', routing_key='', body=message)
Connect to a random queue. Make sure it is deleted on consumer connection close.
result = channel.queue_declare(queue='', exclusive=True)
Bindings
Binding - relationship between exchange and queue
Show bindings from the CLI:
rabbitmqctl list_bindings
Create a binding between a queue and exchange:
channel.queue_bind(exchange='logs', queue=result.method.queue)
Here is an example of a publisher:
emit_log.pyimport pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs', exchange_type='fanout') message = ' '.join(sys.argv[1:]) or "info: Hello World!" channel.basic_publish(exchange='logs', routing_key='', body=message) print(" [x] Sent %r" % message) connection.close()
Here is an example of a subscriber:
receive_logs.pyimport pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs', exchange_type='fanout') result = channel.queue_declare(queue='', exclusive=True) queue_name = result.method.queue channel.queue_bind(exchange='logs', queue=queue_name) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume( queue=queue_name, on_message_callback=callback, auto_ack=True) channel.start_consuming()
You can run these commands in two spearate terminals. One can be used to just print the received messages while the other would write them to a log file.
python receive_logs.py
python receive_logs.py > logs_from_rabbit.log
You publish like this:
python emit_log.py
Python - Part 4 - Routing
Use a routing key:
channel.queue_bind(exchange=exchange_name,
queue=queue_name,
routing_key='black')
Direct exchange
- queue receives messages when the routing key and binding key match
- A queue can have multiple binding keys
- Multiple queues can have the same binding key
Create a direct exchange:
channel.exchange_declare(exchange='direct_logs',
exchange_type='direct')
Publish using routing key:
channel.basic_publish(exchange='direct_logs',
routing_key=severity,
body=message)
Create a queue and create multiple bindings:
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
for severity in severities:
channel.queue_bind(exchange='direct_logs',
queue=queue_name,
routing_key=severity)
With this example you can specify the severity of the alert that you want to send as an argument. This is sent as the routing key.
emit_log_direct.py
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(
exchange='direct_logs', routing_key=severity, body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()
In this example we can specify serverities that we want to see messages for. These are specified as arguments to the script. They used as routing keys. More than one can be specified.
receive_logs_direct.py
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
severities = sys.argv[1:]
if not severities:
sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
sys.exit(1)
for severity in severities:
channel.queue_bind(
exchange='direct_logs', queue=queue_name, routing_key=severity)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body))
channel.basic_consume(
queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
Run the following on three different terminals:
python receive_logs_direct.py warning error > logs_from_rabbit.log
python receive_logs_direct.py info warning error
python emit_log_direct.py error "Run. Run. Or it will explode."
The first command will creates a queue that uses the routing keys “warning” and “error”. Any message sent with these routing keys will go to this queue. The output of the script is written to a log file. The second command will create a queue that uses the routing keys “info”, “warning”, and “error”. These will just be printed to the console. The last command just sends a message with the routing key “error”. It will be received by the queues for both scripts.
Python - Part 5 - Topics
Topic exchange:
- Routing key needs to be a list of words that are delimited by dots
- Routing key is limited to 255 bytes
- Example routing keys: “stock.usd.nyse”, “nyse.vmw”, “quick.orange.rabbit”
Binding key wildcards:
* (star) | matches exactly one word |
# (hash) | matches zero or more words |
Just using a “#” for the binding key will match all routing keys.
Example routing keys:
- “quick.orange.rabbit”
- “quick.orange.rabbit”
- “quick.orange.fox”
- “lazy.brown.fox”
- “lazy.pink.rabbit”
- “quick.brown.fox”
- “quick.orange.male.rabbit”
- “*.orange.fox”
- “lazy.brown.*”
- “lazy.*.rabbit”
Here is an example using a topic exchange and wildcards in the routing keys.
This is the publisher. It takes a list of routing keys as arguments.
emit_log_topic.py
import pika
import sys
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
routing_key = sys.argv[1] if len(sys.argv) > 2 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(
exchange='topic_logs', routing_key=routing_key, body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()
This is the consumer. It takes routing keys as arguments. It will bind to each of them.
receive_logs_topic.py
import pika
import sys
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
result = channel.queue_declare('', exclusive=True)
queue_name = result.method.queue
binding_keys = sys.argv[1:]
if not binding_keys:
sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
sys.exit(1)
for binding_key in binding_keys:
channel.queue_bind(
exchange='topic_logs', queue=queue_name, routing_key=binding_key)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body))
channel.basic_consume(
queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
Here are examples of how we would call these scripts and pass routing keys to them:
python receive_logs_topic.py "#"
python receive_logs_topic.py "kern.*"
python receive_logs_topic.py "*.critical"
python receive_logs_topic.py "kern.*" "*.critical"
python emit_log_topic.py "kern.critical" "A critical kernel error"
Python - Part 6 - RPC
Using a callback queue:
result = channel.queue_declare(queue='', exclusive=True)
callback_queue = result.method.queue
channel.basic_publish(exchange='',
routing_key='rpc_queue',
properties=pika.BasicProperties(
reply_to = callback_queue,
),
body=request)
Commonly used AMQP 0-9-1 properties:
delivery_mode | 2 for persistent, anything else for transient |
content_type | mime-type, ex: application/json for json |
reply_to | often used to specify a callback queue |
correlation_id | good for correlating RPC requests and responses |
It is better to create a queue per client instead of a queue per call. When you do this you need to match calls to responses on that queue. The correlation_id is useful for this.
The client has an anonymous queue and the server has a named queue. The client sends a parameter to the server’s queue. The server passes this value to a function. The value returned from the function is then sent back to the client’s queue and printed. A correlation_id is used to match the request and response.
This is the server:
rpc_server.py
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='rpc_queue')
def fib(n):
if n == 0:
return 0
elif n == 1:
return 1
else:
return fib(n - 1) + fib(n - 2)
def on_request(ch, method, props, body):
n = int(body)
print(" [.] fib(%s)" % n)
response = fib(n)
ch.basic_publish(exchange='',
routing_key=props.reply_to,
properties=pika.BasicProperties(correlation_id = \
props.correlation_id),
body=str(response))
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)
print(" [x] Awaiting RPC requests")
channel.start_consuming()
This is the client:
rpc_client.py
import pika
import uuid
class FibonacciRpcClient(object):
def __init__(self):
self.connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
self.channel = self.connection.channel()
result = self.channel.queue_declare(queue='', exclusive=True)
self.callback_queue = result.method.queue
self.channel.basic_consume(
queue=self.callback_queue,
on_message_callback=self.on_response,
auto_ack=True)
def on_response(self, ch, method, props, body):
if self.corr_id == props.correlation_id:
self.response = body
def call(self, n):
self.response = None
self.corr_id = str(uuid.uuid4())
self.channel.basic_publish(
exchange='',
routing_key='rpc_queue',
properties=pika.BasicProperties(
reply_to=self.callback_queue,
correlation_id=self.corr_id,
),
body=str(n))
while self.response is None:
self.connection.process_data_events()
return int(self.response)
fibonacci_rpc = FibonacciRpcClient()
print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print(" [.] Got %r" % response)
Launch the scripts like this to test them out:
python rpc_server.py
python rpc_client.py
Just run more server instances to scale up.
Unanswered questions:
- How should the client react if there are no servers running?
- Should a client have some kind of timeout for the RPC?
- If the server malfunctions and raises an exception, should it be forwarded to the client?
- Protecting against invalid incoming messages (eg checking bounds) before processing.
Python - Part 7 - Publisher Confirms
Publisher confirms are an extension for reliable publishing. They allow the broker to confirm that messages have been received.
It doesn’t look like these are supported for Python but we may update this later. You are probably going to need Java or C# for this.
Management and Tools
Management UI - see here: Management