2016/06/15
15 Jun, 2016

[Article] The Basics of MQTT and How WSO2 Products Support MQTT Protocol

  • Buddhima Wijeweera
  • Software Engineer - WSO2

Table of contents


Applies to

WSO2 Enterprise Service Bus Version 4.9.0 and above
WSO2 Developer Studio Version 3.6.0
WSO2 Message Broker Version 3.1.0

Introduction

A typical (and much sought after) IoT concept would be your air conditioner turning off automatically when you wake up, an automatic preparation of a cup of coffee, and just when you’re about to hit the road, information about the current weather and a traffic update. The real implementation of IoT is leveraged by IPv6 and lightweight protocols like message queue telemetry transport (MQTT). MQTT was initially developed by Andy Stanford-Clark and Arlen Nipper to inspect oil pipelines through a desert. Therefore, the aim of this protocol was to use bandwidth efficiently and consume minimum power. MQTT uses publish-subscribe architecture as opposed to request-response architecture used by HTTP.

MQTT-Broker is the central communication point of the MQTT architecture. The clients publishing data includes a Topic into the message. The Topic contains the routing information for the broker. The client who wants to receive messages can subscribe to Topics and receive messages. The architecture keeps data producers and data receivers independent and provides a highly scalable architecture.

The receiving client won’t need to pull data from the broker; instead, the receiving client will subscribe to the broker for topic(s) and the broker will pushes message that are relevant to that topic. The publishing client will publish data for the relevant topic and the broker will maintain information about topics. Topics in this world is merely a string. Therefore, this string contains slashes to make it hierarchical. The subscribing clients can use wildcard characters (# sign and + sign) to subscribe to more than a single topic.


Protocol features

  • The publish/subscribe message pattern to provide one-­to-­many message distribution and decoupling of applications.
  • A messaging transport that is agnostic to the content of the payload.
  • The use of TCP/IP to provide basic network connectivity.
  • Three qualities of service for message delivery:
    • "At most once", where messages are delivered according to the best efforts of the underlying TCP/IP network. Message loss or duplication can occur. This level could be used, for example, with ambient sensor data where it does not matter if an individual reading is lost as the next one will be published soon after.
    • "At least once", where messages are assured to arrive, but duplicates may occur.
    • "Exactly once", where messages are assured to arrive exactly once. This level could be used, for example, with billing systems where duplicate or lost messages could lead to incorrect charges being applied.
  • A small transport overhead (the fixed length header is just 2 bytes), and protocol exchanges minimized to reduce network traffic.
  • A mechanism to notify interested parties to an abnormal disconnection of a client using the Last Will and Testament feature.

Connecting with the outside world

No protocol can work standalone in the technology world. A protocol will definitely need to exchange data with other protocols in order to provide a more facilitated service. The same principle applies to MQTT as well. This means there should be a way in which MQTT-based devices send/retrieve data using other protocols like HTTP, HL7, FIX, SMTP, etc. The solution for this problem is to use a component that can exchange messaging protocols. In the enterprise software application space this component is known as an enterprise service bus (ESB). Selecting a suitable ESB for a solution should be done carefully when crafting a solution. We have selected WSO2 Enterprise Service Bus (WSO2 ESB) to demonstrate how the MQTT protocol can be integrated. You can refer WSO2 ESB product page to see more details on benefits you can obtain by incorporating it to your solution. Moreover, you can select a MQTT broker as you wish. For this article, we have selected Mosquitto broker as the MQTT broker.


MQTT support in WSO2 ESB

WSO2 ESB can play both the MQTT publishing client and receiving client role. When the ESB acts as a publishing client, it can publish messages to a given topic in a broker. On the other hand, when acting as a receiving client, the ESB can subscribe to a given topic in a broker and receive messages pushed by the broker.


Configure WSO2 ESB to receive messages via MQTT

WSO2 ESB provides 2 ways to subscribe to a MQTT broker. They are as follows:

  • MQTT inbound endpoint (with WSO2 ESB 4.9.0 and later)
  • MQTT transport from Axis2 (conventional way)

Configuring MQTT inbound endpoint

  1. Install Mosquitto broker and start the broker.

    Official page: https://mosquitto.org/

    You can install Mosquitto and client by following the steps below:
    Adding the repository: sudo apt-add-repository ppa:mosquitto-dev/mosquitto-ppa
    Update: sudo apt-get update
    Install: sudo apt-get install mosquitto
    Install client: sudo apt-get install mosquitto-clients

    Hint: Usually Mosquitto server startup when the operating system starts. To avoid that you can comment out start on net-device-up line in /etc/init/mosquitto.conf file.
    Start Mosquitto broker: type “mosquitto” in terminal. It will display a log as follows:

  2. Start WSO2 ESB and login to the management console
    Download WSO2 ESB 4.9.0 (or later version) from the product page and extract the zip file.
    Download axis2-transport-mqtt-1.0.0.jar and mqtt-client-0.4.0.jar files from this link.
    Move axis2-transport-mqtt-1.0.0.jar and mqtt-client-0.4.0.jar file to <ESB_HOME>/repository/components/lib folder.
    Go to <ESB_HOME>/bin folder and execute wso2server.sh (wso2server.bat for Windows) file.
    At the end of execution through terminal, you’ll see the Management Console URL.

  3. Create an MQTT inbound endpoint

    Login to the management console.
    Click on the Inbound Endpoint link on the left panel.
    Click on Add Inbound Endpoint link and give a name and select the type as MQTT.
    Click on Next button and you’ll get a configuration window as follow:
    The following table explains the meaning of each parameter:
    Parameter Name Description Value
    Sequence Once inbound endpoint receives a message from the broker, the message gets injected to the sequence mentioned here TestIn (later we’ll see how this sequence is created)
    Error sequence Error sequence which the message should be injected in case of issue with processing at inbound endpoint Fault (this is the default error sequence that comes with the ESB)
    Suspend This is used to pause the execution of this inbound endpoint. If this is “true” the inbound endpoint will not accept messages pushed by the broker False (to make inbound endpoint ready to accept messages from the broker)
    Sequential This defines how messages should be injected to the sequence mentioned. Giving “false” will inject messages in parallel True (inject messages one after another)
    Coordination This indicates whether coordination should be done in a cluster of ESB nodes. If true, this inbound endpoint will run only on the leader node of the cluster. False (since still we are using a single node for this experiment)
    mqtt.connection.factory Name of the MQTT connection factory mqttConFactory (this is the out-of-box connection factory. This can be changed if there’s a custom implementation)
    mqtt.server.host.name Hostname of the MQTT message broker localhost (Since Mosquitto server setup on the same machine)
    mqtt.server.port Port of the Mosquitto server that clients can connect to. 1883 (This is the default port of MQTT Broker)
    mqtt.topic.name Topic name that ESB should subscribe to esb.test
    mqtt.client.id This is used to identify ESB from broker side client-id-1234 (this can be changed as desired)
    content.type Content type of the message receiving from Broker. XML or JSON message formats are supported by WSO2 ESB JSON
    Once the inbound endpoint is set up, the Mosquitto broker will show a new connection.

  4. Create a sequence for inbound endpoint

    Click on Sequences on left panel.
    Then click on Add Sequence link.
    Put the name as “TestIn” and construct a sequence as follows:
    In this configuration, we have used 2 mediators, Log Mediator and Drop Mediator.
    Log Mediator will log the full message (level is set to full).
    Drop Mediator will discard the message.

    The message receiving to Inbound Endpoint will inject the message to this TestIn sequence. This sequence will log the message in the terminal and drop it.

  5. Publish messages using Mosquitto client

    Use Mosquitto publishing client for publishing data via terminal.
    The following command will send a JSON payload to MQTT Broker, and eventually be received by the Inbound Endpoint and will be injected into the sequence:
    mosquitto_pub -h localhost -p 1883 -t esb.test  -m {"company":"WSO2"}
    
    If the execution is carried out perfectly, you will get a log message in the ESB terminal as follows:
    In the Payload section of the message, you’ll see the JSON message you sent.

Configuring MQTT transport from Axis2

This is an alternative (actually the conventional) way of configuring MQTT Transport. This configuration was the only way to setup MQTT before the WSO2 ESB 4.9.0 version. We will explain this method for the sake of completion, but please note that you won’t get the additional benefits of MQTT Inbound Endpoint (dynamic configuration changes, coordination) by this method.

  1. Start the mosquitto server

    You need to install mosquitto broker (if you haven’t done already).
    Use “mosquitto” command to start the broker.

  2. Change axis2 configuration

    Open <ESB_HOME>/repository/conf/axis2/axis2.xml file.
    Enable MQTT Listener configuration as follows.

  3. Start WSO2 ESB and login to the management console

    Download axis2-transport-mqtt-1.0.0.jar and mqtt-client-0.4.0.jar files from this link.
    Move axis2-transport-mqtt-1.0.0.jar & mqtt-client-0.4.0.jar file to <ESB_HOME>/repository/components/lib folder.
    Go to <ESB_HOME>/bin folder and execute wso2server.sh (wso2server.bat for Windows) file.
    At the end of execution through the terminal, you’ll see the Management Console URL.

  4. Create a proxy-service

    Unlike the previous case, here you can create a proxy-service that listens to the broker.
    So, if you have not used WSO2 ESB before, we recommend you refer to the following articles to get an idea about creating a proxy service.
    Adding a Proxy Service Custom Proxy Template

    The following should be the source view after creating the proxy service:
    Special values:
    Transport should include mqtt to listen to the MQTT Broker
    Messages receiving from broker will be injected to in-sequence. Similar to the sequence of Inbound Endpoint, this will also log and drop the messages.

Configure WSO2 ESB to publish messages via MQTT

There’s only one way to configure WSO2 ESB to act as the publishing client.

  1. Start the mosquitto server

    You need to install mosquitto broker (if you haven’t done already)
    Use “mosquitto” command to start the broker.

  2. Change axis2 configuration

    Open <ESB_HOME>/repository/conf/axis2/axis2.xml file.
    Enable MQTT Sender configuration as follows.

  3. Start WSO2 ESB and login to the management console

    Download axis2-transport-mqtt-1.0.0.jar & mqtt-client-0.4.0.jar files from this link.
    Move axis2-transport-mqtt-1.0.0.jar & mqtt-client-0.4.0.jar file to <ESB_HOME>/repository/components/lib folder.
    Go to the <ESB_HOME>/bin folder and execute wso2server.sh (wso2server.bat for Windows) file. At the end of execution through the terminal, you’ll see the Management Console URL.

  4. Create an Endpoint

    In the management console, you can navigate to the Endpoints section.
    In Add Endpoint tab click on Address Endpoint link to create an endpoint.
    As the URL of the endpoint, you can provide something like follows:
    mqtt:/PublishingProxy?mqtt.server.host.name=localhost&mqtt.server.port=1883&mqtt.client.id=esb.test.sender&mqtt.topic.name=esb.test&mqtt.subscription.qos=0&mqtt.blocking.sender=true
    
    The following table explains the meaning of some parameters:
    Parameter Name Description Value
    mqtt.server.host.name Hostname of the MQTT message broker localhost (since Mosquitto server setup on the same machine)
    mqtt.server.port Port of the Mosquitto server which clients can connect to. 1883 (this is the default port of MQTT Broker)
    mqtt.topic.name Topic name that ESB should be subscribe to esb.test
    mqtt.client.id This is used to identify ESB from broker side esb.test.sender (this can be changed on your desire)
    You can refer to this endpoint created from other synapse artifacts like proxy service, API, or sequence.

Quality of service

Quality of service is a very common topic that you’ll see in any article about the MQTT protocol. There are 3 types of QoS provided by MQTT. WSO2 ESB also supports all 3 types of QoS for sending and receiving MQTT messages. Here, we will briefly describe the QoS types for the sake of completion.

  • QoS 0 (at most once) - In this level, the message is sent only once, and receiver might/ might not receive the message. It’s known as the “fire and forget” strategy.
  • QoS 1 (at least once) - In this level, the sender stores the message and tries to redeliver until the sender gets a PUBACK (acknowledgement message) from the receiving side.
  • QoS 2 (only once) - In this level both the sender and receiving party guarantee that no duplicate messages go beyond the receiving side. Since this requires bidirectional communication, it is considered to be the safest and slowest quality of service.

Security features

Though MQTT is a lightweight transmission protocol, it is built on top of well-established protocols. Therefore, any state-of-art security that apply to the underlying transport layers can be applied to MQTT as well. Therefore, security in MQTT can be achieved by considering 3 layers in the network protocol stack.

Network Level - In this level, VPN connections can be established with the client and broker to provide more secure communication.

Transport Level - To preserve confidentiality, SSL/TLS mechanisms can be used to send encrypted messages. This will provide a proven secured way of transmitting messages where nobody can read.

Application Level - By using device credentials, username/password can be used to authenticate application level communication between devices.


Smart weather notification system

Smart weather notification system is an imaginary scenario where you’ll get weather information of a specific place. We can setup a GPS tracker that emits GPS information (longitude and latitude) about a place. Those geolocation data are published to a MQTT broker. On the other side, Weather Status Monitor is subscribed to a MQTT Broker topic to receive weather information.

In the middle we have placed WSO2 ESB that calls OpenWeatherMap API and gets the weather report of a given location.


Creating the project

For this purpose we have used WSO2 Dev Studio, which provides tooling support for constructing ESB configurations.

The above screenshot depicts the configuration of processing sequence that processes the incoming geo locations and gives out a weather report. To connect with OpenWeatherMaps API, we have used OpenWeatherMaps connector in WSO2 Connector Store. Refer to the following links for more information:

Smart Weather Project artifacts: https://svn.wso2.org/repos/wso2/people/buddhima/smart-weather-project/


Preparing WSO2 ESB

  1. Download WSO2 ESB 4.9.0 (or a later version) from the product page, and extract the zip file.
  2. Download axis2-transport-mqtt-1.0.0.jar & mqtt-client-0.4.0.jar files from this link.
  3. Move axis2-transport-mqtt-1.0.0.jar & mqtt-client-0.4.0.jar file to <ESB_HOME>/repository/components/lib folder.
  4. Go to <ESB_HOME>/bin folder and execute wso2server.sh (wso2server.bat for Windows) file.
  5. At the end of execution through terminal, you’ll see the Management Console URL.
  6. Upload the OpenWeatherMap connector and SmartWeatherCApp_1.0.0.car file in this location.

After successful deployment, WSO2 ESB will be ready for the project.


Starting the mosquitto server

  1. You need to install mosquitto broker (if you haven’t done already).
  2. Use “mosquitto” command to start the broker.
  3. Mocking GPS Tracker and Weather Status Monitor
    For the purpose of this demonstration, we’re going to mock GPS Tracker and Weather Status Monitor, which are MQTT message publisher and MQTT message receiver, respectively.
    So mostquitto_sub (subscribing client provided by mosquitto) need to subscribe to weather/report topic that mocks Weather Status Monitor.
    mosquitto_sub -h localhost -p 1883 -t weather/report
    
  4. Next, we’re using mosquitto_pub client for publishing geo location data, which simulates the GPS Tracker. For this, the client needs to publish data to weather/location topic.
    mosquitto_pub -h localhost -p 1883 -t weather/location  -m "{"lon":"81","lat":"5.5"}"
    

On the left-side you can see the message sent by the GPS Tracker, and on the right-side you can see the published message to the Weather Status Monitor. In this case the Weather Status Monitor can display the weather report according to the message received through the broker. WSO2 ESB is doing broker-to-broker (or same broker 2 different topics) communication in this case.


Troubleshooting system

Tracing wire level data transmission

To examine wire-level message exchange in MQTT protocol, external tool can be used. One such handy tool is Wireshark. Wireshark provides a graphical interface to inspect the message. Once you capture packets, you can use filter to filter out “MQTT” related messages.

Using MQTT Filter


MQTT Messages


View MQTT Message in-detail


Platform support for MQTT

So far we have discussed how WSO2 ESB supports the MQTT protocol and how to implement a use case using WSO2 ESB. WSO2 products are not just individual products, but series of capabilities that share a common platform known as Carbon. On top of the Carbon platform, WSO2 has introduced WSO2 Message Broker (WSO2 MB), which can act as the MQTT Broker.


MQTT support in WSO2 MB

In the above examples we have been using mosquitto as the MQTT broker. But if you are seriously considering the implementation of a use case based on the MQTT protocol, you’ll need get more scalability requirements. For that, WSO2 has taken steps to cater to that requirement through its message broker product. Refer to the following links to do some tryouts with MQTT transport and WSO2 MB.

WSO2 MB product page: https://wso2.com/products/message-broker/
MQTT Transport configuration in WSO2 MB: https://docs.wso2.com/display/MB310/Message+Queueing+and+Telemetry+Transport
Running sample MQTT sample with WSO2 MB: https://docs.wso2.com/display/MB310/Simple+MQTT+Client

Using WSO2 MB as a scalable MQTT Broker: https://wso2.com/library/articles/2015/10/article-using-wso2-message-broker-as-a-scalable-mqtt-broker/

Blog on MQTT architecture of WSO2 MB: https://techexplosives-pamod.blogspot.com/2014/05/mqtt-transport-architecture-wso2-mb-3x.html

With the WSO2 MB, you can do the same experiment. As mentioned in the document, we have managed to do some tweaks to SimpleMQTT client code and conduct the same experiment. The code can be found at here:

https://svn.wso2.org/repos/wso2/people/buddhima/MqttWeatherClient.zip

WSO2 MB working as MQTT broker and default port is 1883. This can be offset by WSO2 MB. Therefore, to make things easier, start WSO2 MB without a port offset and WSO2 ESB with offset of 2. Then once you build (with Maven) the customized MQTT client, it emits the same request that the publishing client did previously and the request is processed with WSO2 ESB.


Analyzing MQTT data

The WSO2 platform also provides real-time and batch processing analyts tools. Those products are:

WSO2 Complex Event Processor: https://wso2.com/products/complex-event-processor/

WSO2 Data Analytic Server: https://wso2.com/products/data-analytics-server/

You can configure those products to receive MQTT messages. Then upon those received messages you can write powerful queries to execute and extract information. Refer to the following for further details:

Configuring to receive MQTT Events: https://docs.wso2.com/display/CEP410/MQTT+Event+Receiver

Sample use-case for MQTT: https://docs.wso2.com/display/CEP410/Sample+0016+-+Receiving+JSON+Events+via+MQTT+Transport

Analyzing data using WSO2 Complex Event Processor: https://docs.wso2.com/display/CEP410/Analyzing++Data

Analyzing data using WSO2 Data Analytic Server: https://docs.wso2.com/display/DAS301/Analyzing+Data


Conclusion

The objective of this article is to provide you comprehensive guidance on the MQTT protocol and discuss to what extent the WSO2 platform supports MQTT. Specifically, we’ve focused on WSO2 ESB, which is the integration backbone of the WSO2 Platform, to demonstrate how you can configure MQTT Transport. In addition, we’ve also provided guidance and resources on how to do a small experiment with MQTT called “smart weather notification system”. In conclusion, we discussed other means of support for the MQTT protocol in the WSO2 platform; it specifically refers to MQTT broker architecturing and MQTT data analyzing for more advanced use cases.


References

 

About Author

  • Buddhima Wijeweera
  • Software Engineer
  • WSO2