WSO2 MB Samples - SQS SOAP Client Sample

[Documentation Index]

WSO2 MB - Samples : SQS SOAP Client Sample

This guide demonstrates how to use the SQS web service of WSO2 Message Broker from a SOAP client.

Contents

SQS SOAP Client Sample

We will create a queue, perform operations such as sending, receiving, and deleting messages, and finally delete the queue. This way you can try out all the operations in the SQS web service.

You can generate client code with the wsdl2java tool from the Amazon Simple Queue Service (API Version 2009-02-01) WSDL and then try out the following sample. The latest SQS WSDL can be found at [1].

To get started with SQS, you need two keys: an Access Key ID (for example: 44b59c52fc336f1d57d4) and a Secret Access Key (for example: 6f1d57d425a077b579f4440e95cc8d8b384375d8). You can obtain these keys by logging into the Message Broker admin console and navigating to Manage > MessageBoxes/SQS > Access keys.

You can create a Java project using the pom.xml generated by the wsdl2java tool. SQSClient.java, shown below, can be used to create a queue and to send, receive, and delete messages, as done in its main method.

                package org.wso2.mb.sqs.sample;

                import org.apache.axiom.om.OMAbstractFactory;
                import org.apache.axiom.om.OMElement;
                import org.apache.axiom.om.OMFactory;
                import org.apache.axiom.om.OMNamespace;
                import org.apache.xml.security.utils.Base64;

                import javax.crypto.Mac;
                import javax.crypto.spec.SecretKeySpec;
                import java.math.BigInteger;
                import java.rmi.RemoteException;
                import java.security.SignatureException;
                import java.util.ArrayList;
                import java.util.Date;
                import java.util.List;

                /**
                 * Copyright (c) 2009, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
                 *
                 * Licensed under the Apache License, Version 2.0 (the "License");
                 * you may not use this file except in compliance with the License.
                 * You may obtain a copy of the License at
                 *
                 * http://www.apache.org/licenses/LICENSE-2.0
                 *
                 * Unless required by applicable law or agreed to in writing, software
                 * distributed under the License is distributed on an "AS IS" BASIS,
                 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
                 * See the License for the specific language governing permissions and
                 * limitations under the License.
                 */
                /**
                 * Sample SOAP client for the SQS web service exposed at {@link #EPR}.
                 *
                 * <p>The client creates (or looks up) the queue named {@link #QUEUE_NAME}, sends a
                 * message to it, receives messages, deletes the received messages and finally
                 * deletes the queue. Every request carries three SOAP headers
                 * ({@code AWSAccessKeyId}, {@code Timestamp} and {@code Signature}); the signature is
                 * the Base64-encoded HMAC-SHA1 of {@code action + timestamp} keyed with the secret
                 * access key.
                 *
                 * <p>{@code QueueServiceStub} and {@code MessageQueueStub} are the stubs generated by
                 * wsdl2java from the SQS WSDL.
                 */
                public class SQSClient {
                    public static final String QUEUE_NAME = "Shared";
                    public static final String DEFAULT_VISIBILITY_TIMEOUT = "60";
                    public static final String MAX_NUMBER_OF_MESSAGES = "10";
                    public static final String EPR = "http://localhost:9763/services/QueueService";

                    /** Namespace URI of the security header elements. */
                    private static final String AWS_SECURITY_NAMESPACE = "http://security.amazonaws.com/doc/2007-01-01/";
                    /** Namespace prefix of the security header elements. */
                    private static final String AWS_SECURITY_PREFIX = "aws";
                    /** Explicit charset so the signature does not depend on the platform default. */
                    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.Charset.forName("UTF-8");

                    private final String accessKey;
                    private final String secretAccessKey;

                    /**
                     * Creates a client that uses the sample key pair. Replace these values with the
                     * keys of your own account, or use {@link #SQSClient(String, String)}.
                     */
                    public SQSClient() {
                        this("4a73a5f10f05d95a61ed", "d95a61edf72803ee482f45f69e7b57d550e6183e");
                    }

                    /**
                     * Creates a client that signs its requests with the given key pair.
                     *
                     * @param accessKey       - access key id sent in the AWSAccessKeyId header
                     * @param secretAccessKey - secret access key used to calculate the signature
                     */
                    public SQSClient(String accessKey, String secretAccessKey) {
                        this.accessKey = accessKey;
                        this.secretAccessKey = secretAccessKey;
                    }

                    /**
                     * Runs the complete sample: send, receive, delete messages, delete queue.
                     *
                     * @param args - not used
                     * @throws RemoteException if any of the web service calls fails
                     */
                    public static void main(String[] args) throws RemoteException {
                        SQSClient sqsClient = new SQSClient();
                        sqsClient.sendMessages();
                        sqsClient.receiveMessages();
                        sqsClient.deleteMessages();
                        sqsClient.deleteQueue();
                    }

                    /**
                     * Create a queue, send a message
                     *
                     * @throws RemoteException if a web service call fails
                     */
                    public void sendMessages() throws RemoteException {
                        MessageQueueStub messageQueueStub = openQueue();
                        MessageQueueStub.SendMessage sendMessage = new MessageQueueStub.SendMessage();
                        sendMessage.setMessageBody("TEST MESSAGE");
                        addSoapHeader(messageQueueStub, "SendMessage");
                        messageQueueStub.sendMessage(sendMessage);
                    }

                    /**
                     * Receive messages from queue and print the body of each one
                     *
                     * @throws RemoteException if a web service call fails
                     */
                    public void receiveMessages() throws RemoteException {
                        MessageQueueStub messageQueueStub = openQueue();
                        MessageQueueStub.Message_type0[] messages = receive(messageQueueStub, "20");
                        if (messages != null) {
                            for (MessageQueueStub.Message_type0 message : messages) {
                                System.out.println("message_type0.getBody() = " + message.getBody());
                            }
                        }
                    }

                    /**
                     * Delete messages from queue. The messages are received first to obtain their
                     * receipt handles; no delete request is sent when nothing was received.
                     *
                     * @throws RemoteException if a web service call fails
                     */
                    public void deleteMessages() throws RemoteException {
                        MessageQueueStub messageQueueStub = openQueue();
                        // NOTE(review): main() calls this right after receiveMessages(), which asked
                        // for a visibility timeout of 20 on the messages it received. If the broker
                        // hides those messages for that period, this receive may return nothing -
                        // confirm against the broker's behaviour.
                        MessageQueueStub.Message_type0[] messages = receive(messageQueueStub, "20000");
                        List<String> receiptHandles = new ArrayList<String>();
                        if (messages != null) {
                            for (MessageQueueStub.Message_type0 message : messages) {
                                receiptHandles.add(message.getReceiptHandle());
                            }
                        }
                        if (receiptHandles.isEmpty()) {
                            // Nothing to delete; avoid sending a request without receipt handles.
                            return;
                        }
                        MessageQueueStub.DeleteMessage deleteMessage = new MessageQueueStub.DeleteMessage();
                        deleteMessage.setReceiptHandle(receiptHandles.toArray(new String[receiptHandles.size()]));
                        addSoapHeader(messageQueueStub, "DeleteMessage");
                        messageQueueStub.deleteMessage(deleteMessage);
                    }

                    /**
                     * Delete queue
                     *
                     * @throws RemoteException if a web service call fails
                     */
                    public void deleteQueue() throws RemoteException {
                        MessageQueueStub messageQueueStub = openQueue();
                        MessageQueueStub.DeleteQueue deleteQueue = new MessageQueueStub.DeleteQueue();
                        addSoapHeader(messageQueueStub, "DeleteQueue");
                        messageQueueStub.deleteQueue(deleteQueue);
                    }

                    /**
                     * Send a CreateQueue request for {@link #QUEUE_NAME}, print the returned queue url
                     * and create a message queue stub that targets that url.
                     *
                     * @return message queue stub bound to the queue url
                     * @throws RemoteException if the web service call fails
                     */
                    private MessageQueueStub openQueue() throws RemoteException {
                        QueueServiceStub queueServiceStub = new QueueServiceStub(EPR);
                        QueueServiceStub.CreateQueue createQueue = new QueueServiceStub.CreateQueue();
                        createQueue.setQueueName(QUEUE_NAME);
                        createQueue.setDefaultVisibilityTimeout(new BigInteger(DEFAULT_VISIBILITY_TIMEOUT));
                        // add security soap header for action CreateQueue
                        addSoapHeader(queueServiceStub, "CreateQueue");
                        QueueServiceStub.CreateQueueResponse createQueueResponse = queueServiceStub.createQueue(createQueue);
                        System.out.println("createQueueResponse.getCreateQueueResult().getQueueUrl() = " + createQueueResponse.getCreateQueueResult().getQueueUrl());
                        return new MessageQueueStub(createQueueResponse.getCreateQueueResult().getQueueUrl().toString());
                    }

                    /**
                     * Send a ReceiveMessage request asking for up to {@link #MAX_NUMBER_OF_MESSAGES}
                     * messages.
                     *
                     * @param messageQueueStub  - message queue service stub created with queue url
                     * @param visibilityTimeout - visibility timeout to request, as a decimal string
                     * @return the messages of the response; may be null
                     * @throws RemoteException if the web service call fails
                     */
                    private MessageQueueStub.Message_type0[] receive(MessageQueueStub messageQueueStub, String visibilityTimeout)
                            throws RemoteException {
                        MessageQueueStub.ReceiveMessage receiveMessage = new MessageQueueStub.ReceiveMessage();
                        receiveMessage.setMaxNumberOfMessages(new BigInteger(MAX_NUMBER_OF_MESSAGES));
                        receiveMessage.setVisibilityTimeout(new BigInteger(visibilityTimeout));
                        addSoapHeader(messageQueueStub, "ReceiveMessage");
                        MessageQueueStub.ReceiveMessageResponse receiveMessageResponse = messageQueueStub.receiveMessage(receiveMessage);
                        return receiveMessageResponse.getReceiveMessageResult().getMessage();
                    }

                    /**
                     * Add security headers for queue service stub
                     *
                     * @param queueServiceStub - queue service stub created with given end point
                     * @param action           - the action to be performed, e.g. CreateQueue
                     * @throws IllegalStateException if the signature cannot be calculated
                     */
                    private void addSoapHeader(QueueServiceStub queueServiceStub, String action) {
                        OMElement[] securityHeaders = createSecurityHeaders(OMAbstractFactory.getOMFactory(), action);

                        // Drop the headers of any previous request before adding the new ones.
                        queueServiceStub._getServiceClient().removeHeaders();
                        for (OMElement securityHeader : securityHeaders) {
                            queueServiceStub._getServiceClient().addHeader(securityHeader);
                        }
                    }

                    /**
                     * Add security headers for message queue service stub
                     *
                     * @param messageQueueStub - message queue service stub created with queue url
                     * @param action           - the action to be performed, e.g. SendMessage, DeleteMessage
                     * @throws IllegalStateException if the signature cannot be calculated
                     */
                    private void addSoapHeader(MessageQueueStub messageQueueStub, String action) {
                        OMElement[] securityHeaders = createSecurityHeaders(OMAbstractFactory.getSOAP11Factory(), action);

                        // Drop the headers of any previous request before adding the new ones.
                        messageQueueStub._getServiceClient().removeHeaders();
                        for (OMElement securityHeader : securityHeaders) {
                            messageQueueStub._getServiceClient().addHeader(securityHeader);
                        }
                    }

                    /**
                     * Build the AWSAccessKeyId, Timestamp and Signature header elements for a request.
                     *
                     * @param factory - factory used to create the namespace and the elements
                     * @param action  - the action to be performed; signed together with the timestamp
                     * @return the three header elements, in the order they are to be added
                     * @throws IllegalStateException if the signature cannot be calculated
                     */
                    private OMElement[] createSecurityHeaders(OMFactory factory, String action) {
                        OMNamespace awsNs = factory.createOMNamespace(AWS_SECURITY_NAMESPACE, AWS_SECURITY_PREFIX);
                        OMElement accessKeyId = factory.createOMElement("AWSAccessKeyId", awsNs);
                        accessKeyId.setText(accessKey);
                        OMElement timestamp = factory.createOMElement("Timestamp", awsNs);
                        timestamp.setText(new Date().toString());
                        OMElement signature = factory.createOMElement("Signature", awsNs);

                        try {
                            signature.setText(calculateRFC2104HMAC(action + timestamp.getText(), secretAccessKey));
                        } catch (SignatureException e) {
                            // Fail here instead of sending a request with an empty signature.
                            throw new IllegalStateException("Failed to sign the " + action + " request", e);
                        }

                        return new OMElement[]{accessKeyId, timestamp, signature};
                    }

                    /**
                     * Calculate signature for given data using secret access key
                     *
                     * @param data - data to be signed, action+timestamp
                     * @param key  - secret access key
                     * @return Base64-encoded HMAC-SHA1 of the data
                     * @throws java.security.SignatureException if the signature cannot be calculated;
                     *                                          the underlying failure is set as its cause
                     */
                    public static String calculateRFC2104HMAC(String data, String key)
                            throws java.security.SignatureException {
                        final String HMAC_SHA1_ALGORITHM = "HmacSHA1";
                        try {
                            SecretKeySpec signingKey = new SecretKeySpec(key.getBytes(UTF_8), HMAC_SHA1_ALGORITHM);
                            Mac mac = Mac.getInstance(HMAC_SHA1_ALGORITHM);
                            mac.init(signingKey);
                            byte[] rawHmac = mac.doFinal(data.getBytes(UTF_8));
                            return Base64.encode(rawHmac);
                        } catch (Exception e) {
                            // Any failure (including null arguments) is reported as a SignatureException.
                            throw new SignatureException("Failed to generate HMAC : " + e.getMessage(), e);
                        }
                    }


                }


            

References