Vadim Klimov

Uniting AMQP and MQTT Message Brokering with RabbitMQ


In this blog, I would like to share my experience of embedding sensor devices into an existing infrastructure for message brokering, log data processing and visualization. Some elements of this infrastructure are already used successfully in their traditional application areas, where they have become well known and widespread. The motivation for this blog is to describe how the use cases of these well-known components can be rethought and extended.

The prototype aims to demonstrate the capabilities of a message broker in processing a variety of transmitted data that is generated not only by enterprise systems, but also by Internet of Things (IoT) devices and applied computing units. It also illustrates yet another use case for log data processing and visualization tools.


Below is a diagram depicting the components involved in the prototype:

Component communication diagram.png

The following tools were used in the prototype:

  • A Raspberry Pi single-board computer (Raspberry Pi – Teach, Learn, and Make with Raspberry Pi) – a gateway responsible for initial processing of data submitted from sensors via GPIO and for sending that data to a message broker via MQTT. With its General Purpose Input/Output (GPIO) and USB interfaces, it can be flexibly extended with connected devices, including those capable of collecting a variety of metering and telemetry information. The ability to install and run a full-fledged, adapted Linux system on it provides the runtime and environment required to execute shipped and custom-developed programs (written in Python, Java, Ruby, Perl, C/C++ and some other languages). I used the official Raspbian OS distribution (a Debian-based Linux build for Raspberry Pi) on the Raspberry Pi device and Python for developing the required script;
  • Raspberry Pi Sense HAT (Hardware Attached on Top) – a sensor block in the form of an expansion board mounted on the Raspberry Pi main board;
  • RabbitMQ (RabbitMQ – Messaging that just works) – a message broker. RabbitMQ is originally an AMQP message broker that became popular in heterogeneous IT landscapes. It is used effectively in enterprise application integration and in message queuing and exchange between IT infrastructure components, where the key aspects are high performance and throughput, scalability and high availability, and technological diversity of the integrated message producers and consumers;
  • ELK stack – Elasticsearch, Logstash, Kibana (Elastic · Revealing Insights from Data (Formerly Elasticsearch)). Thanks to its open architecture, large number of available plugins and flexibility in customization, the ELK stack is used in many companies for log data aggregation and processing (Logstash), indexing and search (Elasticsearch), and visualization (Kibana).


One of the key aspects of this prototype is the use of the Message Queue Telemetry Transport (MQTT) protocol, one of the de facto standard protocols for machine-to-machine (M2M) connectivity, which is gaining popularity as IoT concepts evolve and spread. MQTT can be thought of as a lightweight and simplified analogue of the Advanced Message Queuing Protocol (AMQP): it is also based on the publish/subscribe pattern, but has a much smaller network footprint and can be used by small devices.


A few notes regarding the prototype:

  • The Raspberry Pi was connected to a Wi-Fi network to simplify access to it, while RabbitMQ and the ELK tools were installed in a virtualized environment on other machines. Even though technically they could have been installed on the Raspberry Pi itself, my intention was to logically decouple the “IoT” part of the prototype from central components such as the message broker and the data collection, processing and visualization tools. This gave the prototype high flexibility and portability, as well as constant remote access to all its components from literally any location with Internet access;
  • Data indexing and visualization tools were used for the sake of demonstrating the solution end to end, but they are not the focus of this blog;
  • In the Python script and the Logstash configuration, I used some techniques that simplify error analysis (such as printing MQTT callback output to the console). This tactic is relevant for prototyping, but should be disabled if the solution is brought to production or applied to volume-intensive processing;
  • To keep the Python script simple, MQTT broker connection parameters and user credentials were hard-coded in it. In real-life scenarios, such properties should be decoupled from the script and stored in dedicated configuration files or secure areas;
  • User credentials are intentionally replaced with placeholders in the source code and configuration file listings.
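
As an illustration of the note about decoupling connection parameters from the script, here is a minimal sketch that reads them from environment variables instead. It is not part of the prototype: the variable names (MQTT_HOST, MQTT_PORT, MQTT_USER, MQTT_PASSWORD, MQTT_TOPIC), the MqttSettings class and the load_mqtt_settings function are all hypothetical, and a real deployment might prefer a configuration file or a secret store.

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class MqttSettings:
    host: str
    port: int
    user: str
    password: str
    topic: str


def load_mqtt_settings(env=os.environ):
    """Read MQTT connection settings from environment variables.

    All variable names here are hypothetical, chosen for this sketch only.
    """
    required = ("MQTT_HOST", "MQTT_USER", "MQTT_PASSWORD")
    missing = [name for name in required if not env.get(name)]
    if missing:
        # Fail early instead of connecting with empty credentials
        raise RuntimeError("Missing required settings: " + ", ".join(missing))
    return MqttSettings(
        host=env["MQTT_HOST"],
        port=int(env.get("MQTT_PORT", "1883")),
        user=env["MQTT_USER"],
        password=env["MQTT_PASSWORD"],
        topic=env.get("MQTT_TOPIC", "RPi.Data"),
    )
```

The script could then call load_mqtt_settings() once at start-up and pass the resulting values to the MQTT client, so that no credentials remain in the source code.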


Now let us go through the major aspects of the prototype configuration and see it in action.



Raspberry Pi: Sensor data collection and publishing to MQTT broker

A Python script was developed to collect telemetry information from the sensor block (to keep the prototype simple, only temperature, humidity and pressure metrics were collected) and publish it via MQTT to a topic registered at the message broker. The following Python libraries were used in the script, as its import statements show:

  • paho.mqtt – the Eclipse Paho MQTT client library for Python;
  • sense_hat – the Python API for the Sense HAT.

Below is the listing of the script source code:


import datetime
import json
import paho.mqtt.client
import sense_hat
import time

# Delay between sensor readings, in seconds
sleepTime = 1

# MQTT details
mqttDeviceId = "Raspberry-Pi:Prototype"
mqttBrokerHost = ""  # broker host name or IP address (left blank in this listing)
mqttBrokerPort = 1883
mqttUser = "<user name>"
mqttPassword = "<password>"
mqttTelemetryTopic = "RPi.Data"

sense = sense_hat.SenseHat()

# Callback methods
def on_connect(client, userdata, flags, rc):
    if rc == 0:
        print("Connected to MQTT broker (RC: %s)" % rc)
    else:
        print("Connection to MQTT broker failed (RC: %s)" % rc)

def on_log(client, userdata, level, buf):
    # Print client log messages to the console (helpful while prototyping)
    print(buf)

def on_publish(client, userdata, mid):
    print("Data published (Mid: %s)" % mid)

def on_disconnect(client, userdata, rc):
    if rc != 0:
        print("Unexpected disconnect")
    print("Disconnected from MQTT broker")

# Callback signatures follow paho-mqtt 1.x; release 2.x additionally expects
# a callback API version as the first argument of Client()
mqttClient = paho.mqtt.client.Client()
mqttClient.username_pw_set(mqttUser, mqttPassword)

# Register callbacks
mqttClient.on_connect = on_connect
mqttClient.on_log = on_log
mqttClient.on_publish = on_publish
mqttClient.on_disconnect = on_disconnect

# Connect to MQTT broker and start the background network loop,
# which dispatches the callbacks and handles reconnects
mqttClient.connect(mqttBrokerHost, mqttBrokerPort, 60)
mqttClient.loop_start()

# Collect telemetry information from Sense HAT and publish to MQTT broker in JSON format
while True:
    telemetryData = {}
    telemetryData["DeviceId"] = mqttDeviceId
    telemetryData["Timestamp"] = datetime.datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
    telemetryData["Temperature"] = str(round(sense.get_temperature(), 2))
    telemetryData["Humidity"] = str(round(sense.get_humidity(), 2))
    telemetryData["Pressure"] = str(round(sense.get_pressure(), 2))
    telemetryDataJson = json.dumps(telemetryData)
    # Publish with QoS 1, then wait before taking the next reading
    mqttClient.publish(mqttTelemetryTopic, telemetryDataJson, 1)
    time.sleep(sleepTime)
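
To inspect the message format without Sense HAT hardware or a broker, the payload construction can be pulled into a small function. This is only a sketch for experimentation: the build_payload name and the sample readings are made up, while the field names and the timestamp format are taken from the script.

```python
import datetime
import json


def build_payload(device_id, temperature, humidity, pressure, now=None):
    """Build the JSON telemetry message in the same shape as the script."""
    if now is None:
        now = datetime.datetime.now(datetime.timezone.utc)
    return json.dumps({
        "DeviceId": device_id,
        # Timestamp truncated to milliseconds, e.g. 2016-01-31 12:00:00.123
        "Timestamp": now.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3],
        # Readings are rounded and sent as strings, as in the script
        "Temperature": str(round(temperature, 2)),
        "Humidity": str(round(humidity, 2)),
        "Pressure": str(round(pressure, 2)),
    })


# Made-up readings, used only to show the message format
sample = build_payload(
    "Raspberry-Pi:Prototype", 31.4159, 42.0, 1013.256,
    now=datetime.datetime(2016, 1, 31, 12, 0, 0, 123456),
)
print(sample)
```

Because the readings travel as strings, a consumer that wants to aggregate them has to convert them back to numbers first.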


While usage of the Sense HAT APIs is out of the scope of this blog and is well covered in the official documentation, there are several key points of MQTT protocol usage that I would like to draw attention to. Even though MQTT is based on the publish/subscribe pattern, similarly to other message queuing techniques that are common in the enterprise integration world (such as the JMS API or the AMQP standard), it differs from them in several key ways:

  • The MQTT protocol implements session handling mechanisms, but they are much more lightweight than those used in enterprise-level message queuing standards. After a connection to a broker is established, the client can start publishing or consuming data by sending or accepting the respective packets;
  • The MQTT protocol can be used effectively in infrastructures where network communication between client and broker is likely to be interrupted (for example, when the MQTT client does not have a stable network connection, which can result in abnormal connection terminations). Reconnection attempts can be handled by the client; this functionality is built into the corresponding MQTT library APIs;
  • Data exchange between client and broker is based on the transmission of a series of messages called control packets. The complete list of supported control packets, their structure and their sequence is defined in the MQTT protocol specification. Control packets are used for client/broker communication management, payload data transmission and acknowledgements. Control packets (both header and payload parts) are much smaller than the messages normally used in enterprise-level message-oriented integration. The MQTT protocol is not intended for the transmission of large messages, but rather for the exchange of a large number of very small messages;
  • MQTT client libraries make active use of the callback pattern. The client can implement custom logic for handling control packets of particular types that are received from the broker. The corresponding callback methods are registered on the MQTT client object before the connection request is sent to the broker.
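
Client-side reconnection, mentioned in the list above, usually comes down to retrying with a growing delay between attempts. The Paho client, for instance, lets the delay bounds be set with reconnect_delay_set(min_delay, max_delay). The sketch below is not Paho's implementation, only a standalone illustration of a doubling, capped delay; the reconnect_delays name and the default values are assumptions made for this example.

```python
import itertools


def reconnect_delays(min_delay=1, max_delay=120):
    """Yield wait times in seconds: start at min_delay, double up to max_delay."""
    delay = min_delay
    while True:
        yield delay
        delay = min(delay * 2, max_delay)


# Waits a client would use between its first ten reconnection attempts
print(list(itertools.islice(reconnect_delays(), 10)))
```

Capping the delay keeps a device that has been offline for a long time from waiting excessively once the network returns.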


For those who are interested in the details of the MQTT protocol, I would encourage getting familiar with the MQTT protocol specification, which is available on the MQTT website and contains extensive information about the protocol and the control packet types that can be exchanged between an MQTT client and a broker.
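
To give a feeling for how compact a control packet is, the following sketch assembles a PUBLISH packet by hand according to the MQTT 3.1.1 layout: a fixed header with a variable-length "remaining length" field, then the topic name, an optional packet identifier and the payload. It is for illustration only, since a client library does this internally; the function names and the shortened sample payload are assumptions of this example.

```python
import struct


def encode_remaining_length(length):
    """Encode the MQTT 3.1.1 variable-length 'remaining length' field."""
    encoded = bytearray()
    while True:
        digit = length % 128
        length //= 128
        if length > 0:
            digit |= 0x80  # continuation bit: more length bytes follow
        encoded.append(digit)
        if length == 0:
            return bytes(encoded)


def encode_publish(topic, payload, qos=0, packet_id=1):
    """Build a PUBLISH control packet (DUP and RETAIN flags left at 0)."""
    topic_bytes = topic.encode("utf-8")
    # Variable header: 2-byte big-endian topic length, then the topic itself
    variable_header = struct.pack("!H", len(topic_bytes)) + topic_bytes
    if qos > 0:
        # A packet identifier is only present for QoS 1 and 2
        variable_header += struct.pack("!H", packet_id)
    first_byte = (3 << 4) | (qos << 1)  # packet type 3 = PUBLISH
    body = variable_header + payload
    return bytes([first_byte]) + encode_remaining_length(len(body)) + body


payload = b'{"Temperature": "31.42"}'
packet = encode_publish("RPi.Data", payload, qos=1)
print(len(packet) - len(payload), "bytes of protocol overhead")
```

For a short topic name such as "RPi.Data", the protocol adds only a handful of bytes on top of the payload, which is what makes MQTT attractive for constrained devices and networks.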

RabbitMQ: MQTT and AMQP message brokering

RabbitMQ is the core integration component of the entire prototype: it acts as a bridge between MQTT and AMQP message queuing by providing MQTT broker services for the Raspberry Pi (which publishes payloads containing sensor data to the corresponding topic) and AMQP broker services for Logstash (which consumes sensor data in the form of log events from the corresponding queue).

The first step is to enable MQTT support in RabbitMQ by means of the MQTT adapter plugin (RabbitMQ – RabbitMQ MQTT Adapter). The standard RabbitMQ distribution already contains this adapter, so we only need to enable it by running the following command:

rabbitmq-plugins enable rabbitmq_mqtt

The next step is to adapt the RabbitMQ configuration file if non-default values need to be assigned to MQTT brokering properties. The complete syntax is provided in the official documentation referred to above. In the prototype, the topic “RPi.Data” is used as the default topic to which messages coming from the MQTT client (Raspberry Pi) will be published.

Finally, the corresponding objects are created in RabbitMQ just as they would be in a usual AMQP-based integration model:

RabbitMQ objects.png

  • “RPi.Data” is the topic to which the Raspberry Pi will publish packets with telemetry data;
  • “RPi.Telemetry” is the queue from which Logstash will consume messages with telemetry data;
  • “RPi.Data” is the routing key that identifies the queue “RPi.Telemetry” as the target for messages published to the topic “RPi.Data”. The routing key name has to match the topic name here.
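
The requirement that the routing key match the topic name follows from how an AMQP topic exchange compares binding keys with routing keys. The sketch below reimplements that matching rule in plain Python to make it concrete. It is not RabbitMQ code, the topic_matches name is made up, and it assumes the messages are routed through a topic-type exchange, which is what the MQTT adapter publishes to.

```python
def topic_matches(binding_key, routing_key):
    """Return True if a topic-exchange binding key matches a routing key.

    '*' stands for exactly one dot-separated word, '#' for zero or more words.
    """
    def match(pattern, words):
        if not pattern:
            return not words
        head, rest = pattern[0], pattern[1:]
        if head == "#":
            # '#' may absorb any number of words, including none
            return any(match(rest, words[i:]) for i in range(len(words) + 1))
        if not words:
            return False
        if head == "*" or head == words[0]:
            return match(rest, words[1:])
        return False

    return match(binding_key.split("."), routing_key.split("."))


print(topic_matches("RPi.Data", "RPi.Data"))           # exact binding, as in the prototype
print(topic_matches("RPi.*", "RPi.Data"))              # one-word wildcard
print(topic_matches("RPi.#", "RPi.Data.Temperature"))  # multi-word wildcard
print(topic_matches("RPi.*", "RPi.Data.Temperature"))  # too many words for '*'
```

With the exact binding key used in the prototype, only messages published to “RPi.Data” reach the queue. A wildcard binding would be one way to collect data from several topics in a single queue.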

Logstash and Elasticsearch: Data processing from AMQP broker and indexing

Logstash ships with a standard input plugin for RabbitMQ, which enables it to consume messages from RabbitMQ queues. Here, we instruct Logstash to consume messages from the queue “RPi.Telemetry”.

After messages are retrieved from the RabbitMQ queue, Logstash performs minor conversions on the data (most of them are data type conversions, so that the corresponding field values can be used as metrics when developing queries in Kibana in later steps).
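
In Python terms, these conversions amount to roughly the following: parse the timestamp string into a date value and turn the three readings from strings into floating-point numbers. The sketch only mirrors the idea and is not what Logstash executes; the convert_event name and the sample message are made up, while the field names and the timestamp layout come from the Python script.

```python
import datetime
import json


def convert_event(raw_json):
    """Mimic the prototype's conversions on one message (illustration only)."""
    event = json.loads(raw_json)
    # Timestamp arrives as text such as "2016-01-31 12:00:00.123"
    event["Timestamp"] = datetime.datetime.strptime(
        event["Timestamp"], "%Y-%m-%d %H:%M:%S.%f"
    )
    # The script sends readings as strings, so cast them to float
    for field in ("Temperature", "Humidity", "Pressure"):
        event[field] = float(event[field])
    return event


message = (
    '{"DeviceId": "Raspberry-Pi:Prototype", "Timestamp": "2016-01-31 12:00:00.123", '
    '"Temperature": "31.42", "Humidity": "42.0", "Pressure": "1013.26"}'
)
print(convert_event(message))
```

Without the numeric conversion, the readings would be indexed as text and could not be averaged or plotted as metrics.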

After the necessary conversions are done, Logstash sends the messages to Elasticsearch using the standard output plugin for Elasticsearch. In turn, Elasticsearch persists and indexes the data so that we can consume it and develop visualizations on top of it later on.

Below is the Logstash configuration file that was used in the prototype:


input {
    rabbitmq {
        type => "rpi"
        host => ""
        user => "<user name>"
        password => "<password>"
        queue => "RPi.Telemetry"
        durable => true
        codec => "json"
    }
}
filter {
    if [type] == "rpi" {
        date {
            match => [ "Timestamp", "yyyy-MM-dd HH:mm:ss.SSS" ]
            target => "Timestamp"
        }
        mutate {
            convert => {
                "Temperature" => "float"
                "Humidity" => "float"
                "Pressure" => "float"
            }
        }
    }
    if "_grokparsefailure" in [tags] {
        drop { }
    }
}
output {
    if [type] == "rpi" {
        stdout {
            codec => rubydebug
        }
        elasticsearch {
            hosts => [ "" ]
            index => "rpi"
        }
    }
}


No configuration specific to the prototype was carried out for Elasticsearch, so the standard guidelines and configuration steps apply here.

Kibana: Sensor data visualization

Finally, after the necessary data has been passed to Elasticsearch and indexed by it, we are ready to visualize it. I used Kibana for this purpose and prepared a simple dashboard with diagrams reflecting the changes in collected sensor data over time. Developing the Kibana dashboard follows the standard steps that are common when using Kibana, namely:

  • Data discovery. Elasticsearch is used as the data search engine. A search object was created and saved based on the Elasticsearch index in use;
  • Visualization development. Line chart visualizations were used for individual metrics, complemented with pie chart and metric visualizations for temperature to display the distribution of temperature ranges and the minimum/maximum/average temperature values;
  • Dashboard development. The visualization objects prepared earlier were placed on a single dashboard.


And here is a look at the results visualized in the prepared Kibana dashboard after the prototype was left running for several hours:

Kibana dashboard.png

The Python script output provides details about the MQTT communication:

Python script output.png

For every published MQTT control packet containing telemetry data, the Logstash output contains a corresponding processed entry that was consumed from RabbitMQ and sent to Elasticsearch:

Logstash output.png

A brief look at RabbitMQ monitoring gives a view of the MQTT publish / AMQP consume flow from the message broker's perspective:

RabbitMQ queue statistics.png

Statistics for the index in use (“rpi”) in Elasticsearch confirm successful indexing of the collected messages with telemetry data:

Elasticsearch index statistics.png

The presented telemetry data is near real time:

  • Sensor data is obtained and transmitted by the Raspberry Pi every second (customizable in the script);
  • Telemetry data presented in the Kibana dashboard is auto-refreshed every 5 seconds (customizable);
  • Network latency is minimal since all components are located in the same sub-network. This is normally not the case in real-life scenarios, where sensors may be distributed across a large area and reach gateways and central infrastructure components through various networks;
  • Processing time in RabbitMQ, Logstash and Elasticsearch is also minimal (all these components were designed for processing large volumes and can be scaled).
