Asynchronous Message-Based Communication with WSO2 Enterprise Integrator

  • By Hasitha Abeykoon
  • 18 Dec, 2019

Acronyms

SMS - Short Message Service

JMS - Java Message Service

AMQP - Advanced Message Queuing Protocol

HTTP - HyperText Transfer Protocol

JNDI - Java Naming and Directory Interface

Amazon SQS - Amazon Simple Queue Service

HA - High Availability

Introduction

In the previous article, we discussed what asynchronous messaging is and what its use cases are. It is a popular messaging pattern not only within middleware systems but also at edge client systems. The software that supports this pattern is known as message-oriented middleware (MOM). However, we need an integration tool to perform the protocol conversions and integrate different systems in an asynchronous manner, so that the business logic components do not need to bear the complexities related to messaging and associated protocols. In this article, we discuss WSO2 Enterprise Integrator, one such integration platform, and give an overview of its integration capabilities for asynchronous messaging.

WSO2 Enterprise Integrator as the integration tool

WSO2 Enterprise Integrator is an integration tool developed by WSO2. It is an agile integration platform for quick, iterative integration of any application, data, or system. It has rich support for asynchronous messaging. It can integrate with a number of open-source and commercial message brokers and messaging platforms, and it supports a broad range of messaging patterns, protocol conversion, and much more.

Figure 1: WSO2 EI capabilities

Let’s take a look at the above capabilities. 

WSO2 Enterprise Integrator’s capabilities

The following capabilities are available in WSO2 Enterprise Integrator’s integrator profile and in the WSO2 Micro Integrator runtime. To develop deployable artifacts (Carbon Applications) for both runtimes, you need to use WSO2 Integration Studio. To use the connectors described in the sections below, download them from the WSO2 connector store using Integration Studio itself and import them into the project. More details can be found here.

After development is done, the project can be compiled into a Carbon Application (CAR file) and deployed into WSO2 Enterprise Integrator or Micro Integrator runtimes.

Connect to any JMS-supported broker

Java Message Service (JMS) is a standard Java API for asynchronous messaging. As WSO2 Enterprise Integrator runs on Java, it is designed to use JMS. If you need to connect to a JMS broker (e.g., ActiveMQ or Artemis), all you need to do is import the client library JAR file, which comes with the broker, and make a few configurations. The platform provides support for JMS in several flavors.

Axis2 JMS transport

Axis2 is the basis of many protocol-level transports in WSO2 Enterprise Integrator. JMS is implemented as one of the Axis2 transports.

This transport has two sections: JMSTransportReceiver and JMSTransportSender. JMSTransportReceiver is used to poll messages from a JMS provider (queue or topic), making WSO2 Enterprise Integrator/Micro Integrator act as a JMS message consumer. JMSTransportSender is used to publish messages to a JMS provider, making the solution act as a JMS message producer.

One important feature to mention is that it supports the Java Naming and Directory Interface (JNDI). It is also capable of communicating over SSL or TLS. Transaction support provided by JMS is exposed by Axis2 JMS transport.

Please refer here for information on JMS transport and its features.

Inbound JMS transport

WSO2 Enterprise Integrator has an Axis2-independent implementation to receive JMS messages. This is known as Inbound JMS transport. It can be configured to poll messages from a queue or topic of a JMS provider and inject each message into a mediation sequence. The mediation logic can be arbitrarily complex (e.g., invoking an API with the message, transforming the message into a CSV file, or storing the message on a remote FTP server).
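The poll-and-inject behavior described above can be illustrated with a minimal sketch. It uses an in-memory Python queue in place of a JMS destination and a plain function in place of a mediation sequence. The names `poll_and_inject` and `to_csv_line` are hypothetical and are not part of any WSO2 API.

```python
import queue


def poll_and_inject(source, sequence, max_messages=10):
    """Drain up to max_messages from source and hand each one to sequence."""
    results = []
    for _ in range(max_messages):
        try:
            message = source.get_nowait()   # non-blocking poll
        except queue.Empty:
            break                           # nothing left in this polling cycle
        results.append(sequence(message))   # "inject" into the mediation logic
    return results


def to_csv_line(message):
    # Hypothetical mediation step: flatten a dict into one CSV line.
    return ",".join(str(message[key]) for key in sorted(message))


orders = queue.Queue()                      # stands in for a JMS queue
orders.put({"id": 1, "item": "book"})
orders.put({"id": 2, "item": "pen"})

csv_lines = poll_and_inject(orders, to_csv_line)
print(csv_lines)  # ['1,book', '2,pen']
```

In the product, the polling interval, destination, and target sequence are set through configuration rather than code; the sketch only shows the control flow.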

Please refer to Inbound JMS transport and its features here.

In summary, you can use either JMSTransportReceiver or Inbound JMS transport to poll messages into an integration flow.

JMS connector

This is an alternative to the Axis2-based JMSTransportSender implementation. WSO2 Enterprise Integrator has a connector store with 100+ connectors that enable the solution to communicate with on-premises or cloud systems. The JMS connector is one of them. As stated at the beginning of this section, you can import the JMS connector into the mediation project using Integration Studio.

Read more about the JMS connector here.

To publish a message from WSO2 Enterprise Integrator to a JMS broker, you can use either JMSTransportSender or the JMS connector. The former is the usual choice as it does not need any external download or import.

IBM MQ integration

IBM provides two flavors of MQ. One is the IBM WebSphere MQ server, which is purely for messaging. The other is an MQ bus running on IBM’s application server. IBM provides a comparison between the two here.

  • To send and receive messages from WebSphere MQ, we can use the JMS transport described above, as it is a JMS broker. IBM provides a way to generate JNDI bindings; this bindings file needs to be configured in the JMS transport. Please refer to more details here. You need to import IBM MQ-specific client libraries into WSO2 Enterprise Integrator. Note that the solution re-packages these libraries to make them OSGi compliant.
  • To send and receive messages from the IBM WebSphere Application Server, the same JMS transport is used. However, we need to use a different initial context factory provided by IBM: com.ibm.websphere.naming.WsnInitialContextFactory. Read more about this capability here.

Microsoft MSMQ integration

Microsoft Message Queuing or MSMQ is a message queue implementation developed by Microsoft. As it is a different protocol, it has a separate Axis2-based transport to send and receive messages. Refer here to read more.

Oracle AQ integration

Oracle AQ is a messaging platform provided by Oracle. To send and receive messages asynchronously with this platform, you can use the same JMS transport. The details are here.

Other JMS brokers supported

RabbitMQ integration

RabbitMQ is a popular broker among integration architects for implementing asynchronous message flows. Hence, WSO2 Enterprise Integrator includes built-in support for messaging with RabbitMQ.

Axis2 RabbitMQ transport

Similar to JMS transport, RabbitMQ has Axis2-based RabbitMQListener and RabbitMQSender implementations. The former is used to receive messages from the RabbitMQ server and the latter is used to publish messages to the RabbitMQ server. This transport uses Advanced Message Queuing Protocol (AMQP) for the communication. It is also capable of making connections over SSL so that communication is encrypted.

You can reliably receive and process messages or reliably publish messages to the server using transaction support in RabbitMQ transport.

Read about Axis2 RabbitMQ transport in the WSO2 documentation here.

RabbitMQ inbound

Similar to JMS inbound transport, RabbitMQ also has an inbound transport. It can be used to poll messages from the RabbitMQ server and inject them for further mediation in WSO2 Enterprise Integrator. The RabbitMQ inbound transport has all the features available in Axis2 RabbitMQ transport.

Read about RabbitMQ inbound transport here.

Apache Kafka integration

Kafka is a distributed, partitioned, and replicated commit log service. It provides the functionality of a messaging system. Kafka maintains feeds of messages in topics. Producers write data to topics and consumers read from topics.
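To make the commit log idea concrete, here is a minimal in-memory sketch. It is a toy model, not the Kafka client API: it assumes a single partition per topic, no replication, and a hypothetical `TinyLog` class in which each consumer group tracks its own read offset.

```python
from collections import defaultdict


class TinyLog:
    """Toy append-only log: one list per topic, one offset per consumer group."""

    def __init__(self):
        self._topics = defaultdict(list)    # topic name -> ordered records
        self._offsets = defaultdict(int)    # (group, topic) -> next offset to read

    def produce(self, topic, record):
        """Append a record and return the offset it was written at."""
        self._topics[topic].append(record)
        return len(self._topics[topic]) - 1

    def consume(self, group, topic, max_records=10):
        """Return the next batch for this group and advance its offset."""
        start = self._offsets[(group, topic)]
        batch = self._topics[topic][start:start + max_records]
        self._offsets[(group, topic)] = start + len(batch)
        return batch


log = TinyLog()
log.produce("orders", "order-1")
log.produce("orders", "order-2")

billing_batch = log.consume("billing", "orders")     # reads both records
audit_batch = log.consume("audit", "orders", 1)      # independent offset, reads one
billing_again = log.consume("billing", "orders")     # nothing new for billing
```

Because records are never removed on read, each group sees the full feed at its own pace, which is the main difference from a queue where a consumed message is gone.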

Kafka inbound endpoint

The Kafka inbound endpoint acts as a message consumer. It can be configured to read from a specific topic, and, additionally, topic filters can be applied. To use the transport, you need to import Kafka client libraries into the WSO2 EI runtime. The recommended version of Kafka is kafka_2.9.2-0.8.1.1.

Read more about the Kafka inbound endpoint here.

Kafka Connector

The Kafka Connector is used to publish messages to Kafka topics. It can be configured to use SSL so that messages published to the server are encrypted at the transport layer. As stated earlier, the connector needs to be downloaded from the Connector Store and used in Integration Studio.

Read more about the Kafka connector here.

Amazon SQS integration

Amazon Simple Queue Service (SQS) is a fully managed message queuing service that enables you to decouple and scale microservices, distributed systems, and serverless applications. SQS offers two types of message queues. Standard queues offer maximum throughput, best-effort ordering, and at-least-once delivery. FIFO queues are designed to guarantee that messages are processed exactly once, in the exact order in which they are sent.

WSO2 Enterprise Integrator includes the transports required to connect to a configured Amazon SQS account and to send and receive messages.

Amazon SQS Streaming Inbound Endpoint

An inbound endpoint is available to poll messages from Amazon SQS. It creates a connection to an Amazon SQS account, consumes messages from the Amazon SQS queue, and injects the messages into the ESB sequence. It supports transactions, so that you can roll back messages if processing is unsuccessful.

The feature needs to be imported into WSO2 EI, and is not shipped out of the box. Please refer to the documentation here.
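The rollback behavior mentioned for this inbound endpoint can be sketched in a few lines. The example below is an assumption-laden illustration rather than the endpoint’s implementation: a `deque` stands in for the queue, and the hypothetical function `consume_transactionally` returns a message to the head of the queue when processing raises an error.

```python
from collections import deque


def consume_transactionally(pending, process):
    """Process the head message; commit on success, roll back on failure.

    Returns True on commit, False on rollback, None if the queue is empty.
    """
    if not pending:
        return None
    message = pending.popleft()         # tentatively receive the message
    try:
        process(message)
    except Exception:
        pending.appendleft(message)     # rollback: message stays for redelivery
        return False
    return True                         # commit: message is removed for good


attempts = []


def flaky_backend(message):
    # Hypothetical processing step that fails on the first attempt only.
    attempts.append(message)
    if len(attempts) == 1:
        raise RuntimeError("backend unavailable")


inbox = deque(["order-1"])
first = consume_transactionally(inbox, flaky_backend)    # rolled back
second = consume_transactionally(inbox, flaky_backend)   # committed
```

The message is processed twice in total, which is why consumers behind a transactional or at-least-once source should tolerate redelivery.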

Amazon SQS connector

The connector implementation is used to send messages from WSO2 Enterprise Integrator to an Amazon SQS queue. Developers are issued an AWS access key ID and an AWS secret access key when they register. These credentials need to be configured in the connector so that it can connect to SQS using the API provided by Amazon.

The features and configurations are described here.

NATS integration (coming soon)

NATS is a messaging platform that is becoming popular today, especially in the microservices space. It aims to make messaging simple, scalable, secure, location-independent, and observable.

WSO2 is yet to provide integration support for NATS. However, the features are under development and will be released soon.

Message store processor pattern

As described in “Making HTTP calls asynchronous”, store and forward is one of the commonly used integration patterns. WSO2 Enterprise Integrator provides this feature using the following components.

  • Message store - This component is an abstraction of a queue of a messaging provider. Incoming messages are stored in a message store for asynchronous, reliable processing.
  • Message forwarding processor - This component points to a particular defined message store. It polls messages from that store and forwards them to an “endpoint”. The endpoint can be any protocol-based outbound channel from WSO2 Enterprise Integrator (HTTP, JMS, file, SMS, email, etc.). This means the message forwarder can reliably invoke an API in a service with the message, store the message in a file, or send the message out as an email. If forwarding fails, the operation is retried automatically up to a maximum number of attempts. After that, the processor either stops or drops the message and continues with the next message. Hence, the message forwarding processor provides reliable, in-order, guaranteed message delivery.
  • Message sampling processor - This component also points to a particular defined message store. The difference between the two processors is that the sampling processor forwards the polled message from the store to a mediation sequence for further mediation, whereas the forwarding processor just sends the message to an endpoint. The sampling processor does not provide any delivery reliability; it just provides asynchronous injection of messages.
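The retry behavior of the forwarding processor can be sketched as follows. This is a simplified model under stated assumptions, not the WSO2 implementation: the class `ForwardingProcessor`, its `max_attempts` and `drop_on_failure` parameters, and the in-memory `deque` used as the message store are all hypothetical names chosen for illustration.

```python
from collections import deque


class ForwardingProcessor:
    """Forward messages from a store to an endpoint, in order, with retries."""

    def __init__(self, store, endpoint, max_attempts=3, drop_on_failure=False):
        self.store = store                  # deque acting as the message store
        self.endpoint = endpoint            # callable that raises on failure
        self.max_attempts = max_attempts
        self.drop_on_failure = drop_on_failure
        self.active = True
        self.dropped = []

    def run_once(self):
        """Try to deliver the head message. Returns True if it was delivered."""
        if not self.active or not self.store:
            return False
        message = self.store[0]             # peek: stays stored until delivered
        for _ in range(self.max_attempts):
            try:
                self.endpoint(message)
            except Exception:
                continue                    # retry the same message
            self.store.popleft()            # delivered, safe to remove
            return True
        if self.drop_on_failure:
            self.dropped.append(self.store.popleft())   # skip to next message
        else:
            self.active = False             # stop so ordering is preserved
        return False


calls = {"count": 0}
delivered = []


def flaky_endpoint(message):
    # Hypothetical endpoint that is down for the first two calls.
    calls["count"] += 1
    if calls["count"] <= 2:
        raise ConnectionError("endpoint down")
    delivered.append(message)


store = deque(["m1", "m2"])
processor = ForwardingProcessor(store, flaky_endpoint, max_attempts=3)
processor.run_once()    # m1 is delivered on the third attempt
processor.run_once()    # m2 is delivered on the first attempt
print(delivered)        # ['m1', 'm2']
```

Peeking at the head message and removing it only after a successful call is what keeps delivery in order: a later message is never sent while an earlier one is still pending.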

When the message store and processor pattern is used, the message store needs to be backed by a platform that holds the messages. In the current implementation, the following can be configured as message storing platforms.

Extending transports

If you need to integrate WSO2 Enterprise Integrator with a messaging platform that is not supported out of the box, you can write your own transport, build a JAR file with the implementation, import it into the runtime, and use it. The following extension points are available:

A summary of WSO2 Enterprise Integrator’s capabilities

The following table presents a summary of WSO2 EI’s integration capabilities.

HA support

When a system is deployed and running in production, it should have fault tolerance.

  • Failover support: Messaging platforms may have their own methods to cater to this non-functional requirement of fault tolerance. Some brokers behave as active-active while some are active-passive. When messaging clients connect to nodes, they can use a “failover” pattern to achieve high availability. The client application is aware of a list of nodes it can connect to, and it connects to the first node. If the connected node crashes, the client fails over to the next available node seamlessly and continues sending or receiving messages.
  • Cluster coordination: The message store and forward pattern needs to provide in-order message delivery. At the same time, it needs to provide high availability in the deployment. To achieve both, a message processor runs on only one of the WSO2 Enterprise Integrator nodes in the cluster, and if that node crashes, another node picks it up and runs it.
  • Inbound transport sync: Every inbound transport has a coordination parameter that can be enabled. Once it is enabled, only one node in the cluster polls for messages, and if that node crashes, another node in the cluster takes over.
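The client-side failover pattern from the first point above can be sketched as shown below. It is a minimal illustration under assumptions: broker nodes are modeled as plain callables that raise `ConnectionError` when down, and `FailoverClient` and `make_node` are hypothetical names, not part of any broker client library.

```python
class FailoverClient:
    """Send through the current node; move to the next node when it fails."""

    def __init__(self, nodes):
        self.nodes = list(nodes)    # ordered list of candidate nodes
        self.current = 0            # index of the node in use

    def send(self, message):
        for _ in range(len(self.nodes)):
            node = self.nodes[self.current]
            try:
                return node(message)
            except ConnectionError:
                # Fail over to the next node in the list and try again.
                self.current = (self.current + 1) % len(self.nodes)
        raise ConnectionError("no broker node available")


def make_node(name, state):
    # Hypothetical broker node: accepts messages only while state[name] is True.
    def node(message):
        if not state[name]:
            raise ConnectionError(name + " is down")
        return name + " accepted " + message
    return node


state = {"broker-a": True, "broker-b": True}
client = FailoverClient([make_node("broker-a", state), make_node("broker-b", state)])

first = client.send("m1")       # 'broker-a accepted m1'
state["broker-a"] = False       # simulate a crash of the connected node
second = client.send("m2")      # 'broker-b accepted m2'
```

Real broker clients usually take the node list as a failover connection URL and also handle reconnect delays; the sketch only shows the node selection logic.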

Conclusion

In this article, we discussed what asynchronous messaging is and what its use cases are. It is a popular messaging pattern not only within middleware systems but also at edge client systems. To integrate synchronous systems and asynchronous systems and to deliver value-added integration patterns to the components of a system using async messaging, an integration tool is required. This article considered a popular integration tool by WSO2, Enterprise Integrator, and summarized asynchronous messaging capabilities provided by it.

About Author

  • Hasitha Abeykoon
  • Associate Technical Lead
  • WSO2
Hasitha is an Associate Technical Lead at WSO2. He has been part of the Message Broker team since he joined in December 2011. He holds a B.Sc. in Computer Science & Engineering from the Department of Computer Science and Engineering of the University of Moratuwa, Sri Lanka. Hasitha has been a consultant for several customers on behalf of WSO2, for products such as WSO2 ESB, WSO2 Message Broker, and WSO2 Data Services Server. He focuses his research and development on asynchronous messaging, messaging reliability, NoSQL databases, and big data concepts for performance.