JMS Message Delivery Reliability and Acknowledgement Patterns
- Hasitha Abeykoon
- Associate Technical Lead - WSO2
Applies to
WSO2 Message Broker 1.0.2
Content
- Introduction
- Acknowledgement patterns
- Duplicates allowed acknowledgement pattern
- Auto Acknowledge acknowledgement pattern
- Client Acknowledge acknowledgement pattern
- Other Acknowledgement Patterns
- Scenarios demonstrating different acknowledgement patterns
- Conclusion
- References
Introduction
Messaging is a method of communication between software components or applications. A messaging system is a peer-to-peer facility: A messaging client can send messages to, and receive messages from, any other client. Each client connects to a messaging agent that provides facilities for creating, sending, receiving, and reading messages.
The idea of messaging is to make communication between distributed systems loosely coupled. One component sends messages to an intermediate destination, called a JMS broker, and the recipient receives messages from there. The sender and the receiver therefore do not have to be available at the same time. The sender knows nothing about the receiver; only the message format preferred by the receiver has to be known. The endpoint for a JMS client on the message broker, which is called a destination, can be either a queue or a topic that resides on the JMS broker. A queue or a topic provides the connection between the message producer and consumer, or the publisher and the subscriber.
The Java Message Service is a Java API that allows applications to create, send, receive, and read messages. JMS makes messaging not only loosely coupled but also asynchronous and reliable.
- Asynchronous: A JMS provider can deliver messages to a client as they arrive; a client does not have to request messages in order to receive them.
- Reliable: The JMS API can ensure that a message is delivered once and only once.
Reliability of message delivery plays a vital role here, as most applications cannot tolerate duplicate messages. On the other hand, avoiding duplicates can introduce the risk of message loss. The basic mechanisms for achieving or affecting reliable message delivery are as follows:
- Controlling message acknowledgement: You can specify various levels of control over message acknowledgement.
- Specifying message persistence: You can specify that messages are persistent, meaning that they must not be lost in the event of a provider failure.
- Setting message priority levels: You can set various priority levels for messages, which can affect the order in which the messages are delivered.
- Allowing messages to expire: You can specify an expiration time for messages so that they will not be delivered if they are obsolete.
- Creating temporary destinations: You can create temporary destinations that last only for the duration of the connection in which they are created.
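Two of the mechanisms above, priority levels and expiration, can be illustrated with a small in-memory model. This is only a sketch in plain Java and does not use the JMS API: the class names `SketchMessage` and `PriorityExpirySketch` are hypothetical, and a real provider only makes a best effort to deliver higher-priority messages first.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Simplified in-memory model (not the JMS API): each message carries a
// priority and an absolute expiration time, as a provider would track them.
class SketchMessage {
    final String body;
    final int priority;         // JMS priorities run from 0 (lowest) to 9 (highest); 4 is the default
    final long expiresAtMillis; // 0 means the message never expires

    SketchMessage(String body, int priority, long expiresAtMillis) {
        this.body = body;
        this.priority = priority;
        this.expiresAtMillis = expiresAtMillis;
    }
}

class PriorityExpirySketch {
    // Returns the bodies in the order this model delivers them at time 'nowMillis':
    // expired messages are dropped, higher priorities go first, ties keep arrival order.
    static List<String> deliveryOrder(List<SketchMessage> arrived, long nowMillis) {
        List<SketchMessage> live = new ArrayList<>();
        for (SketchMessage m : arrived) {
            if (m.expiresAtMillis == 0 || m.expiresAtMillis > nowMillis) {
                live.add(m);
            }
        }
        // List.sort is stable, so messages with equal priority stay in arrival order.
        live.sort(Comparator.comparingInt((SketchMessage m) -> m.priority).reversed());
        List<String> bodies = new ArrayList<>();
        for (SketchMessage m : live) {
            bodies.add(m.body);
        }
        return bodies;
    }

    public static void main(String[] args) {
        List<SketchMessage> arrived = new ArrayList<>();
        arrived.add(new SketchMessage("routine", 4, 0));
        arrived.add(new SketchMessage("stale-quote", 4, 1_000));
        arrived.add(new SketchMessage("urgent", 9, 0));
        System.out.println(deliveryOrder(arrived, 2_000)); // prints [urgent, routine]
    }
}
```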
Acknowledgement Patterns
Acknowledgement is a consumer-side concept. It is the way a consumer informs the JMS provider that it has successfully received a message. On the producer side, the only notion of acknowledgement is a successful invocation of either the topic publisher’s publish method or the queue sender’s send method.
An acknowledgement for a message indicates that the JMS provider must not deliver that message to the same consumer again, and that the provider can release any resources it is holding on behalf of that message (for example, in the case of a JMS queue, the message is removed from the queue once successful delivery has been acknowledged). To minimize resource consumption, consumer applications need to acknowledge messages as quickly as possible after successful delivery.
1. Duplicates allowed acknowledgement pattern
In this mode, the session acknowledges messages lazily. This pattern has pros and cons.
Pros
- Provides faster message processing as the client does not try to acknowledge immediately after a message is delivered.
Cons
- If the JMS provider fails, the same message can be delivered more than once, creating duplicates.
- Only applications that can tolerate message duplication should use this acknowledgement model.
In a general JMS client, this mode is selected with the DUPS_OK_ACKNOWLEDGE constant of the Session interface.
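The effect of lazy acknowledgement can be sketched with a plain Java simulation. It does not use the JMS API, and the fixed `batchSize` is an assumption made purely for illustration; how lazily a real provider acknowledges is provider dependent.

```java
import java.util.ArrayList;
import java.util.List;

class LazyAckSketch {
    // Simulates a consumer that processes 'processedBeforeCrash' messages out of
    // 'total', acknowledging only after every 'batchSize' messages, then crashes
    // and reconnects. Returns every message the application saw, in order.
    static List<Integer> deliveriesSeen(int total, int batchSize, int processedBeforeCrash) {
        List<Integer> seen = new ArrayList<>();
        int acknowledgedUpTo = 0; // messages [0, acknowledgedUpTo) have left the broker

        // First connection: process messages until the simulated crash.
        for (int i = 0; i < processedBeforeCrash; i++) {
            seen.add(i);
            if ((i + 1) % batchSize == 0) {
                acknowledgedUpTo = i + 1; // the lazy acknowledgement covers the whole batch
            }
        }

        // Second connection: the broker redelivers everything it never saw acknowledged.
        for (int i = acknowledgedUpTo; i < total; i++) {
            seen.add(i);
        }
        return seen;
    }

    public static void main(String[] args) {
        // Batch of 4, crash after 6 messages: 4 and 5 were processed but not yet acknowledged.
        System.out.println(deliveriesSeen(10, 4, 6));
        // prints [0, 1, 2, 3, 4, 5, 4, 5, 6, 7, 8, 9]
    }
}
```

Messages 4 and 5 appear twice, which is why only applications that tolerate duplicates should use this mode.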
2. Auto Acknowledge acknowledgement pattern
In the auto acknowledgement model, the session automatically acknowledges each message the client receives. It is important to understand which scenarios can cause reliability issues in this mode. A message can be received in two ways, synchronously and asynchronously. In these two models, the acknowledgement happens at different times and brings different results.
- Synchronous: acknowledgement happens just before the call to a message consumer's “receive” or “receiveNoWait” method returns a message. Thus, if the JMS provider or the message consumer crashes before “receive” returns the message to the consumer, the message will be lost.
- Asynchronous: acknowledgement happens right after the consumer’s MessageListener has been invoked and its “onMessage” method has returned successfully. Thus, if the JMS provider crashes after “onMessage” has completed but before the acknowledgement is recorded, the message will be redelivered as a duplicate.
These situations are not limitations of the JMS specification; they are inherent in automatic acknowledgement in a distributed system. The only way to guard against them is to maintain persistent state in the client or to use a distributed transaction.
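The two failure windows described above can be sketched as a tiny plain Java model. It does not use the JMS API; the method name `timesProcessed` and its flags are hypothetical and only encode the ordering of the two steps (acknowledge, hand the message to the application) as the text describes it.

```java
class AutoAckWindowSketch {
    // Counts how often the application processes one message when a crash
    // hits between the two steps of an auto-acknowledged delivery.
    static int timesProcessed(boolean ackBeforeHandOff, boolean crashBetweenSteps) {
        int processed = 0;
        boolean acknowledged = false;

        // First delivery attempt: two steps whose order depends on the receive style.
        if (ackBeforeHandOff) {
            acknowledged = true;            // synchronous: the acknowledgement comes first
            if (!crashBetweenSteps) {
                processed++;                // then receive() returns the message
            }
        } else {
            processed++;                    // asynchronous: onMessage() runs first
            if (!crashBetweenSteps) {
                acknowledged = true;        // then the acknowledgement is recorded
            }
        }

        // After a restart the provider redelivers only unacknowledged messages.
        if (crashBetweenSteps && !acknowledged) {
            processed++;
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println("sync, crash:  " + timesProcessed(true, true));   // 0, the message is lost
        System.out.println("async, crash: " + timesProcessed(false, true));  // 2, the message is duplicated
        System.out.println("no crash:     " + timesProcessed(true, false)
                + " / " + timesProcessed(false, false));                     // 1 / 1
    }
}
```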
3. Client Acknowledge acknowledgement pattern
This acknowledgement mode gives consumer applications more control over message acknowledgement. For example, the consumer can group a set of messages and then explicitly invoke the “acknowledge” method to tell the JMS provider that the message, together with all other messages received up to that point, has been successfully consumed.
There are a few things to note about this acknowledgement pattern.
- When a consumer uses client acknowledge, it can use the “recover” method to revert to its last checkpoint. This instructs the session to redeliver all messages that the consumer has not yet acknowledged.
- If a client crashes and later reconnects to its topic or queue, the session is recovered and all unacknowledged messages are delivered again.
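The checkpoint behaviour of “acknowledge” and “recover” can be sketched with an in-memory stand-in for a session. This is plain Java rather than the JMS API; the class `ClientAckSessionSketch` and its methods are hypothetical, messages are assumed to be unique strings, and a trailing `*` marks a redelivery.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Simplified in-memory stand-in for a CLIENT_ACKNOWLEDGE session (not the JMS API).
class ClientAckSessionSketch {
    private final List<String> queue = new ArrayList<>();        // every message the broker still holds
    private final Set<String> deliveredBefore = new HashSet<>(); // used to flag redeliveries
    private int cursor = 0;                                      // index of the next message to deliver

    ClientAckSessionSketch(List<String> messages) {
        queue.addAll(messages);
    }

    // Delivers the next message, suffixed with "*" when it is a redelivery; null when none is left.
    String receive() {
        if (cursor >= queue.size()) {
            return null;
        }
        String m = queue.get(cursor++);
        boolean redelivered = !deliveredBefore.add(m);
        return redelivered ? m + "*" : m;
    }

    // Acknowledges everything delivered so far on this session.
    void acknowledge() {
        queue.subList(0, cursor).clear();
        cursor = 0;
    }

    // Rewinds to the last acknowledged point; unacknowledged messages are delivered again.
    void recover() {
        cursor = 0;
    }

    int brokerDepth() {
        return queue.size();
    }

    public static void main(String[] args) {
        ClientAckSessionSketch s = new ClientAckSessionSketch(Arrays.asList("a", "b", "c", "d"));
        s.receive();
        s.receive();      // a and b delivered
        s.acknowledge();  // a and b leave the broker
        s.receive();      // c delivered but never acknowledged
        s.recover();      // back to the checkpoint after b
        System.out.println(s.receive() + " " + s.receive() + " depth=" + s.brokerDepth());
        // prints c* d depth=2
    }
}
```

In this model a client crash followed by a reconnect behaves like `recover()`: delivery restarts from the oldest unacknowledged message.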
4. Other Acknowledgement Patterns
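Beyond the three standard modes, some JMS providers offer acknowledgement modes of their own. Those are provider specific and therefore compromise the portability of a JMS application. Transactions, described next, offer another way to control when messages are acknowledged.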
Transacted Session
A transacted session treats a related group of consumed or produced messages as a single unit of work, a transaction. As with transactions in general, it can be either committed or rolled back.
- When the session’s “commit” method is called, all the consumed messages in that group are acknowledged or the produced messages are sent.
- When the session’s rollback method is called, produced messages are destroyed or consumed messages are recovered.
A transacted session always has a “current” transaction; applications do not start one explicitly. When commit or rollback is called, the current transaction ends and a new one starts immediately. Besides transacted sessions, there are two other ways acknowledgements can happen: message-driven beans with container-managed transaction demarcation (CMTD) and message-driven beans with bean-managed transaction demarcation (BMTD).
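The commit and rollback rules above can be sketched with an in-memory stand-in for a transacted session. This is plain Java rather than the JMS API; the class `TransactedSessionSketch`, its `inbound` and `outbound` collections, and the message strings are all hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Simplified in-memory stand-in for a transacted session (not the JMS API).
class TransactedSessionSketch {
    final Deque<String> inbound = new ArrayDeque<>();  // messages waiting on the source queue
    final List<String> outbound = new ArrayList<>();   // messages visible on the target queue
    private final List<String> consumedInTx = new ArrayList<>();
    private final List<String> producedInTx = new ArrayList<>();

    String receive() {
        String m = inbound.poll();
        if (m != null) {
            consumedInTx.add(m);
        }
        return m;
    }

    // Produced messages are held back until commit.
    void send(String m) {
        producedInTx.add(m);
    }

    // Ends the current transaction: consumed messages are acknowledged, produced ones are sent.
    void commit() {
        outbound.addAll(producedInTx);
        consumedInTx.clear();
        producedInTx.clear();
    }

    // Ends the current transaction: produced messages are discarded, consumed ones are recovered.
    void rollback() {
        for (int i = consumedInTx.size() - 1; i >= 0; i--) {
            inbound.addFirst(consumedInTx.get(i)); // put back in the original order
        }
        consumedInTx.clear();
        producedInTx.clear();
    }

    public static void main(String[] args) {
        TransactedSessionSketch s = new TransactedSessionSketch();
        s.inbound.add("order-1");
        s.inbound.add("order-2");

        s.send("receipt for " + s.receive());
        s.rollback();                                     // nothing sent, order-1 is back
        System.out.println(s.inbound + " " + s.outbound); // prints [order-1, order-2] []

        s.send("receipt for " + s.receive());
        s.commit();                                       // order-1 acknowledged, receipt sent
        System.out.println(s.inbound + " " + s.outbound); // prints [order-2] [receipt for order-1]
    }
}
```

Note that neither `commit()` nor `rollback()` needs an explicit "begin": the next `receive()` or `send()` simply belongs to the new current transaction.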
The following tree shows the transaction options a typical JMS client can use.
Scenarios demonstrating different acknowledgement patterns
Setting up the environment
To demonstrate the impact of the various acknowledgement patterns, as well as redelivery, I will use one sender. WSO2 Message Broker will be used as the JMS broker. The sender sends simple integers as object messages to a queue created in the Message Broker. Each transaction option has a different receiver. Each receiver demonstrates the impact of choosing a particular transaction option and highlights the impact on message redelivery. The sender and receivers share common administered objects: a connection factory and a queue. Note that the scenarios explained at [1] are adapted to suit WSO2 Message Broker for the following explanations.
First, set up the message sender as follows.
- Download WSO2 Message Broker 1.0.2 and extract it.
- Create a new Java project and add the Java libraries listed under wso2mb_Home/client lib to your classpath. You will also need the JARs required for logging, which must be downloaded separately.
JMS Sender
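Add the following class to your project as the JMS sender. Note that WSO2 Message Broker uses Qpid broker TCP port 5672 by default, whereas the connection string in this listing uses 5673; adjust it to match your broker's configuration.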
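    package org.sample.jms;

    import javax.naming.InitialContext;
    import javax.naming.NamingException;
    import javax.jms.*;
    import java.util.Properties;

    public class MessageSendingClient {

        public void sendMessage() {
            Properties initialContextProperties = new Properties();
            initialContextProperties.put("java.naming.factory.initial",
                    "org.apache.qpid.jndi.PropertiesFileInitialContextFactory");
            String connectionString = "amqp://admin:admin@clientID/carbon?brokerlist='tcp://localhost:5673'";
            initialContextProperties.put("connectionfactory.qpidConnectionfactory", connectionString);
            try {
                InitialContext initialContext = new InitialContext(initialContextProperties);
                QueueConnectionFactory queueConnectionFactory =
                        (QueueConnectionFactory) initialContext.lookup("qpidConnectionfactory");
                QueueConnection queueConnection = queueConnectionFactory.createQueueConnection();
                queueConnection.start();
                QueueSession queueSession = queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
                // Send message
                Queue queue = queueSession.createQueue("myQueue;{create:always, node:{durable: true}}");
                QueueSender queueSender = queueSession.createSender(queue);
                // NOTE: the original listing is cut off after "for(int count =0; count".
                // Everything from the loop condition onwards is a reconstruction, assuming
                // the ten Integer object messages (0 to 9) that the scenarios describe.
                for (int count = 0; count < 10; count++) {
                    ObjectMessage message = queueSession.createObjectMessage(Integer.valueOf(count));
                    queueSender.send(message);
                }
                queueSender.close();
                queueSession.close();
                queueConnection.close();
            } catch (NamingException e) {
                e.printStackTrace();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }

        // Assumed entry point so that the sender can be run directly.
        public static void main(String[] args) {
            new MessageSendingClient().sendMessage();
        }
    }
JMS Receiver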
The following class acts as a superclass for all the message receiver classes with different acknowledgement patterns; it does all the environment setup for message delivery. Later we create customized JMS receiver classes by extending it. As with the sender, make sure the port in the connection string matches your broker (WSO2 Message Broker uses Qpid broker TCP port 5672 by default).
    package org.sample.jms;

    import javax.jms.*;
    import javax.naming.InitialContext;
    import java.io.InputStreamReader;
    import java.util.Properties;

    public abstract class SuperJMSReceiver {

        protected void setBackground() {
            QueueConnectionFactory queueConnectionFactory = null;
            QueueConnection queueConnection = null;
            QueueSession queueSession = null;
            QueueReceiver queueReceiver = null;
            Properties initialContextProperties = new Properties();
            initialContextProperties.put("java.naming.factory.initial",
                    "org.apache.qpid.jndi.PropertiesFileInitialContextFactory");
            String connectionString = "amqp://admin:admin@clientID/carbon?brokerlist='tcp://localhost:5673'";
            initialContextProperties.put("connectionfactory.qpidConnectionfactory", connectionString);
            try {
                InitialContext initialContext = new InitialContext(initialContextProperties);
                queueConnectionFactory = (QueueConnectionFactory) initialContext.lookup("qpidConnectionfactory");
                queueConnection = queueConnectionFactory.createQueueConnection();
                queueSession = createQueueSession(queueConnection);
                final QueueSession queueSession1 = queueSession;
                Queue aQueue = queueSession.createQueue("myQueue");
                queueReceiver = queueSession.createReceiver(aQueue);
                MessageListener aML = new MessageListener() {
                    public void onMessage(Message aMessage) {
                        try {
                            processMessage(aMessage, queueSession1);
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                    }
                };
                //register the message listener
                queueReceiver.setMessageListener(aML);
                queueConnection.start();
                InputStreamReader aISR = new InputStreamReader(System.in);
                char aAnswer = ' ';
                do {
                    aAnswer = (char) aISR.read();
                    if ((aAnswer == 'r') || (aAnswer == 'R')) {
                        queueSession.recover();
                    }
                } while ((aAnswer != 'q') && (aAnswer != 'Q'));
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                try {
                    if (queueReceiver != null) {
                        queueReceiver.close();
                    }
                    if (queueSession != null) {
                        queueSession.close();
                    }
                    if (queueConnection != null) {
                        queueConnection.stop();
                        queueConnection.close();
                    }
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }

        protected void processMessage(Message aMessage, QueueSession queueSession) throws JMSException {
            if (aMessage instanceof ObjectMessage) {
                ObjectMessage aOM = (ObjectMessage) aMessage;
                System.out.print(aOM.getObject() + " ");
                //we will sleep the thread for some time so that we have enough time to observe
                try {
                    Thread.sleep(1500);
                } catch (InterruptedException e) {
                    System.out.println("interrupted");
                }
            }
        }

        //we will create different sessions on different ACK patterns in sub classes
        protected abstract QueueSession createQueueSession(QueueConnection aQC) throws JMSException;
    }
Scenario 1: send 10 messages to WSO2 Message Broker queue and then run consumer JMS client in Auto Acknowledgement mode.
Write a new JMS client extending the above JMS receiver class - AutoAcknowledgeJMSReceiver.
    // Same package as SuperJMSReceiver, so that the superclass resolves without an import.
    package org.sample.jms;

    import javax.jms.*;

    public class AutoAcknowledgeJMSReceiver extends SuperJMSReceiver {

        public static void main(String[] args) {
            System.out.println("Starting Receive...");
            new AutoAcknowledgeJMSReceiver().setBackground();
            System.out.println("Ending Receive...");
        }

        protected QueueSession createQueueSession(QueueConnection aQC) throws JMSException {
            return aQC.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
        }
    }
Run scenario 1:
- Publish 10 new messages running JMS sender above.
- Under Manage>>queues>>List in the Message Broker UI, you will see the queue with a message count of 10.
- When you run AutoAcknowledgeJMSReceiver, you will see the following output:
Starting Receive...
0 1 2 3 4 5 6 7 8 9
Here each message is automatically acknowledged after it returns from the onMessage() method. Thus the receiver receives all messages one after the other and continues to listen for new messages. As soon as a new message hits the queue, it is delivered to the receiver.
If you stop AutoAcknowledgeJMSReceiver and send some new messages to WSO2 Message Broker, you will see that the message count goes up. They are stored in the broker queue. Now as soon as you run AutoAcknowledgeJMSReceiver all of the new messages will be delivered to the JMS client, and message count of the queue will go down to zero.
Scenario 2: send 10 messages to WSO2 Message Broker queue and then run consumer JMS client in duplicates_OK acknowledgement mode.
Write a new JMS message receiver client extending above JMS receiver class - DuplicatesOkayReceiver.
    // Same package as SuperJMSReceiver, so that the superclass resolves without an import.
    package org.sample.jms;

    import javax.jms.*;

    // Class name corrected to DuplicatesOkayReceiver, matching the description of this scenario.
    public class DuplicatesOkayReceiver extends SuperJMSReceiver {

        public static void main(String[] args) {
            System.out.println("Starting Receive...");
            new DuplicatesOkayReceiver().setBackground();
            System.out.println("Ending Receive...");
        }

        protected QueueSession createQueueSession(QueueConnection aQC) throws JMSException {
            return aQC.createQueueSession(false, Session.DUPS_OK_ACKNOWLEDGE);
        }
    }
Run scenario 2:
- Publish 10 new messages running JMS sender above.
This is due to the lazy acknowledgement in duplicates_ok mode.
Scenario 3: send 10 messages to WSO2 Message Broker queue and then run consumer JMS client in Client_Acknowledge acknowledgement mode.
To use client acknowledgement mode, pass “false” as the first argument when creating the receiver’s session, to indicate a non-transacted session, and specify “CLIENT_ACKNOWLEDGE” as the acknowledgement mode in the second argument. In this mode, received messages are acknowledged by calling “acknowledge()”. Calling “acknowledge()” has no effect unless the session was created in client acknowledgement mode.
The following JMS client uses the client acknowledgement pattern. In its processMessage() method, ClientAckReceiver acknowledges only message “5”.
    // Same package as SuperJMSReceiver, so that the superclass resolves without an import.
    package org.sample.jms;

    import javax.jms.*;

    public class ClientAckReceiver extends SuperJMSReceiver {

        public static void main(String[] args) {
            System.out.println("Starting Receive...");
            new ClientAckReceiver().setBackground();
            System.out.println("Ending Receive...");
        }

        protected QueueSession createQueueSession(QueueConnection aQC) throws JMSException {
            return aQC.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);
        }

        protected void processMessage(Message aMessage, QueueSession aQS) throws JMSException {
            if (aMessage instanceof ObjectMessage) {
                ObjectMessage aObjectMessage = (ObjectMessage) aMessage;
                System.out.print(aObjectMessage.getObject() + " ");
                Integer value = (Integer) aObjectMessage.getObject();
                int intVal = value.intValue();
                if (intVal == 5) {
                    //send the acknowledge
                    aObjectMessage.acknowledge();
                }
                try {
                    Thread.sleep(1500);
                } catch (InterruptedException e) {
                    System.out.println("interrupted");
                }
            }
        }
    }
Run scenario 3:
- Publish 10 new messages running JMS sender above.
- Then run ClientAckReceiver JMS client class.
- When message “3” is delivered, stop the JMS client.
- Now go to Manage>>queues>>List in the Message Broker UI and check how many messages are there. All 10 messages remain; none have been removed from the queue.
- Run ClientAckReceiver again. Wait until message “6” is printed, then check the queue size. It will be four, because only message “5” is acknowledged: messages up to and including “5” are removed from the queue, while “6” and the messages after it are unacknowledged and remain in the queue.
- If you run ClientAckReceiver again, you will see message “6” redelivered.
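The queue sizes observed in these steps follow from simple arithmetic on the cumulative acknowledgement. The sketch below is plain Java, not JMS code; the helper `depthAfterRun` is hypothetical and assumes the ten messages carry the values 0 to 9, as in the sender.

```java
class Scenario3DepthSketch {
    // Models one run of a receiver over the messages still on the queue. The run
    // stops after delivering 'lastDelivered' and acknowledges only when it sees
    // 'ackValue'. Returns how many of the ten messages the broker holds afterwards.
    static int depthAfterRun(int depthBefore, int lastDelivered, int ackValue) {
        int firstValue = 10 - depthBefore; // oldest message still on the queue
        if (ackValue >= firstValue && ackValue <= lastDelivered) {
            // Cumulative acknowledgement: everything up to and including ackValue is removed.
            return 10 - (ackValue + 1);
        }
        return depthBefore; // nothing was acknowledged in this run
    }

    public static void main(String[] args) {
        int afterFirstRun = depthAfterRun(10, 3, 5);             // stopped at "3": "5" never delivered
        int afterSecondRun = depthAfterRun(afterFirstRun, 6, 5); // stopped at "6": "5" acknowledged
        int afterThirdRun = depthAfterRun(afterSecondRun, 9, 5); // "6" to "9" delivered, none acknowledged
        System.out.println(afterFirstRun + " " + afterSecondRun + " " + afterThirdRun);
        // prints 10 4 4
    }
}
```

Under these assumptions the last four messages are redelivered on every run, because this receiver never acknowledges anything after “5”.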
Things to note:
- Generally, acknowledging a particular message also acknowledges all prior messages the session has received.
- In the above example, only message “5” is explicitly acknowledged. All prior messages are implicitly acknowledged, and messages after “5” are not acknowledged.
- To stop message delivery within a session and redeliver all the unacknowledged messages, you can use the Session.recover method. When you call recover for a session, the message service:
  - Stops message delivery within the session.
  - Marks all messages that have been delivered but not acknowledged as “redelivered”.
  - Restarts sending all unacknowledged messages, beginning with the oldest message.
- The recover method can be called only on a non-transacted session; it throws an IllegalStateException if it is called on a transacted session.
Scenario 4: send 10 messages to WSO2 Message Broker queue and then run consumer JMS client in transacted acknowledgement mode.
To implement this acknowledgement pattern, set the first argument to “true” when creating the JMS receiver’s session. The second argument is ignored in that case; there is no “Session.TRANSACT_ACKNOWLEDGE” constant. The application indicates successful message processing by invoking the session’s commit() method, and rejects a message or a group of messages by invoking the session’s rollback() method. Calling commit() commits all messages the session has received in the current transaction; similarly, calling rollback() rejects all of them.
The session's commit() and rollback() methods can only be invoked when the transacted session option is set.
You can try this out by changing the input loop of the SuperJMSReceiver class as follows.
    do {
        aAnswer = (char) aISR.read();
        if ((aAnswer == 'r') || (aAnswer == 'R')) {
            queueSession.rollback();
        }
        if ((aAnswer == 'c') || (aAnswer == 'C')) {
            queueSession.commit();
        }
    } while ((aAnswer != 'q') && (aAnswer != 'Q'));
Conclusion
Choosing the correct acknowledgement pattern is very important when implementing JMS client applications, because it determines whether messages can be lost or duplicated. Using WSO2 Message Broker as the message broker, you can observe what happens in each case. These parameters are especially important when WSO2 ESB is used as a JMS client, to guarantee the delivery and recoverability of messages if a failure occurs during a transaction. According to [2] and the above scenarios, the following table explains how the delivery guarantee behaves with each acknowledgement pattern.
References
Author
Hasitha Abeykoon, Software Engineer, WSO2 inc.