[Tutorial] Support for MQTT with WSO2 Business Activity Monitor 2.5.0

  • By Shavantha Weerasinghe
  • 17 Feb, 2015
Archived Content
We do not provide support for WSO2 Business Activity Monitor since it is the predecessor of WSO2 Data Analytics Server (WSO2 DAS). For more information visit the WSO2 DAS product page or watch the Product Release Webinar.

This tutorial will demonstrate the support WSO2 Business Activity Monitor (BAM) 2.5.0 provides for MQTT - a machine-to-machine connectivity protocol - through the use of open source MQTT clients and brokers. The tutorial has been written to give readers a basic understanding of how to get started using MQTT with WSO2 BAM.


This sample demonstrates how to configure the WSO2 Business Activity Monitor 2.5.0 (BAM) to capture incoming MQTT events, store them within the Cassandra and view them from the dashboard. This tutorial will show you how to establish a connection between a MQTT client and a MQTT broker in order to publish events to the WSO2 BAM which will then listen to the incoming events through the input adaptors and store those events in the Cassandra.


To try out the example discussed in this tutorial we will be using the following;

  • Download the WSO2 Business Activity Monitor 2.5.0 with embedded Cassandra here
  • Download MQTT Spy an open source MQTT client here
  • Download HiveMQ (version 2.1.0) a MQTT broker here
  • Download the mqtt-client-0.4.0.jar here
  • The commands used in this tutorial assume that the operating system used is a Linux based operating system

Setting Up the Sample

Starting the MQTT broker

First we need to start the MQTT broker. The broker is responsible for validation, transformation routing and mediate communication amongst applications to interested clients based on the topic of a message. For this tutorial, we have downloaded HiveMQ which can be used for the scope of this example. For this use case we can use the broker without any changes to the configurations so once the broker is downloaded navigate to the path hivemq-2.1.0/bin and issue the command ./run.sh. This will result in the broker starting up with a generated message as shown below;

2015-01-21 11:27:25,230 INFO  - HiveMQ home directory: /home/shavantha/projects/software/MQTT/broker/hivemq-2.1.0
2015-01-21 11:27:25,236 INFO  - Starting HiveMQ Server
2015-01-21 11:27:27,029 WARN  - No license file found. Using free personal licensing with restrictions to 25 connections.
2015-01-21 11:27:27,130 INFO  - Activating statistics callbacks with an interval of 60 seconds
2015-01-21 11:27:27,130 INFO  - Activating $SYS topics with an interval of 60 seconds
2015-01-21 11:27:27,393 INFO  - Starting on all interfaces and port 1883
2015-01-21 11:27:27,401 INFO  - Started HiveMQ 2.1.0 in 2172ms
2015-01-21 11:27:27,864 INFO  - A new HiveMQ Version (2.2.0) is available. Visit https://www.hivemq.com/ for more details.

Note: This tutorial uses HiveMQ 2.1.0 version.

Configuring and running WSO2 BAM 2.5.0

Once the broker is configured and up and running, we can configure the WSO2 Business Activity Monitor to listen to the broker. To achieve this, first, add the mqtt-client-0.4.0.jar to the BAM_HOME/repository/components/lib directory. This is a prerequisite for configuring the BAM to listen and capture MQTT events.

Next, start the WSO2 BAM instance by navigating to BAM_HOME/bin and typing sh wso2server.sh command. Once the product is up and running, the first task is to upload the toolbox that will generate the stream given below. The event stream is where all the processing happens based on how it’s defined.

Note: Once created the stream will be saved within the path /_system/governance/StreamDefinitions. The stream definition for this tutorial is shown below;

  "name": "mqtt_stats",
  "version": "1.0.0",
  "nickName": "Statistics",
  "description": "mqtt statistics",
  "metaData": [
  	"name": "request_url",
  	"type": "STRING"
  	"name": "remote_address",
  	"type": "STRING"
  	"name": "content_type",
  	"type": "STRING"
  	"name": "user_agent",
  	"type": "STRING"
  	"name": "host",
  	"type": "STRING"
  	"name": "referer",
  	"type": "STRING"
  "payloadData": [
  	"name": "service_name",
  	"type": "STRING"
  	"name": "operation_name",
  	"type": "STRING"
  	"name": "timestamp",
  	"type": "LONG"
  	"name": "response_time",
  	"type": "LONG"
  	"name": "request_count",
  	"type": "INT"
  	"name": "response_count",
  	"type": "INT"
  	"name": "fault_count",
  	"type": "INT"

Next, we need to configure the business activity monitor to receive the events. For this we will use an input event adapter that handles MQTT events. It receives MQTT events over a TCP connection. To configure the input event adaptor, navigate to the configure menu tab and click on the option to create a new input adaptor. This will open up the window shown below;

Figure 01

In this screen the main input details are;

  • Event Adaptor Name: a unique name to identify the input adaptor
  • Event Adaptor Type: mqtt
  • Broker URL: specify the URL on which the MQTT broker is started
  • Username/Password: user credentials to access the broker
  • Keep-A-Live: the number of milliseconds to keep the connection alive

The creation of the input adaptor will result in the generation of an xml file with the details below;

Figure 02

Note: The xml file is available within wso2bam-2.5.0/repository/deployment/server/inputeventadaptors

As shown from the above configuration, the broker URL is the same as what the MQTT broker (HiveMQ) was started on to listen to events that are sent to that URL.

Create an event builder once the input adaptor is produced. An event builder converts different input event formats into a uniform format that is supported by the event consumer.

To create an event builder, click on the option “Receive from External Event Stream(via Event Builder)” under the InFlow option of the created stream. As shown below, within the event builder we specify a topic that will be given within the MQTT client to publish the payload.

Figure 03

In this screen the main input details are;

  • Event Builder Name: a unique name to identify the event builder
  • Input Adaptor Name: the MQTT input adapter should be selected from this list
  • Topic: a unique topic name
  • Client ID: a unique name for client ID
  • Input Mapping type: since we are generating the payload in xml format we select xml

The creation of the event builder will result in the generation of an xml file with the details shown below;

Figure 04

Note: the xml file is available within wso2bam-2.5.0/repository/deployment/server/eventbuilders

Starting the MQTT client

Now our configuration for the WSO2 BAM 2.5.0 is complete. Next, we need to start an MQTT client to publish the payload data. For the client in this tutorial we will be using MQTT Spy - an open source MQTT client. To start the client, execute the command java -jar mqtt-spy-0.1.5-jar-with-dependencies.jar*. Once the client is loaded we need to perform two initial configurations to enable the client to generate events.

  1. Connectivity: in this step we specify the server URL as well as the client ID that we will be assigning the MQTT client to
  2. Authentication: in this step we will specify the username / password combinations to obtain authenticated access

Figure 05

Note: For the client used in this tutorial you need to have Java 1.8.* installed.

Running the Sample

What has been explained above are the basic steps needed to setup the initial configurations for enabling MQTT transport for the BAM. Next, we will see how we can perform an easy test to determine whether the configured flow works. To achieve this we need to perform the below tasks;

  1. Select the activated connection from the client by clicking on the green tab
  2. Specify the topic and pass a payload as given below via the MQTT client

In this example the topic will be mqtt_topic and the payload data will be as shown below;

Figure 06


Note: the payload can be found under the created event stream in the business activity monitor.

Once events are published from the client we can verify whether our flow has worked successfully by accessing the Cassandra explorer and checking whether a record with the stream name exists under EVENT_KS. As shown in the below diagram, when we generate events from the client an event keystore is created with the name mqtt_stats within Cassandra.

Figure 07

Once we click on the event_ks named mqtt_stats the number of events captured will be listed as shown below;

Figure 08

To view the messages (events) stored on the Cassandra data store, click on the Message Console option under dashboards to view the events as shown below. This view can be useful if the users do not have access to the Cassandra data store.

Figure 09

Figure 10

How does it work?

In the previous section we have discussed how to test the WSO2 Business Activity Monitor 2.5.0 with the MQTT flow. In this section lets see what happens when the test is conducted.

As explained above, when we pass events as an xml payload through the MQTT client, the message is delivered via the MQTT broker to the business activity monitor based on the topic of a message. If the messages are generated in xml format and if the messages match the stream definition and the topic name, they are accepted and stored in the Cassandra data store.


This tutorial discussed how to configure the WSO2 Business Activity Monitor version 2.5.0 to capture and process MQTT events. The tutorial initially focused on configurations that we had to perform to get the client, the broker and the WSO2 BAM ready and then looked at how we can test the flow to understand how MQTT is supported by the WSO2 Business Activity Monitor.

All artefacts for this tutorial are available for download here.


[1] https://wso2.com/products/business-activity-monitor/

[2] https://code.google.com/p/mqtt-spy/