2007/12/03
3 Dec, 2007

Writing a Task in WSO2 ESB

  • Upul G
  • - WSO2

Applies to

WSO2 ESB 1.5

Introduction

Writing a Task in ESBWSO2 ESB is a fast, light-weight and versatile Enterprise Service Bus product released under the Apache License v2.0. Using WSO2 ESB you can filter, transform, route and manipulate SOAP, binary, plain XML, and text messages that pass through your business systems by HTTP, HTTPS, JMS, mail etc.

A task in WSO2 ESB allows you to run a piece of code triggered by a timer. This way you can run scheduled jobs at specified intervals. WSO2 ESB is already bundled with a task which can be used to inject a message to ESB given by the configuration at a scheduled interval. See Sample 300 in the Samples Guide in the WSO2 ESB documentation to learn how to use it.

Place Stock Order Task

In this tutorial, we will create a task which will read a text file at a specified location and place orders for stocks that are given in the text file. The text file will have entries such as this one:

IBM,100,120.50

Each line in the text file contains details for a stock order: symbol, quantity and price. The place stock order task will read the text file, a line at a time, and create orders using the given values to be sent to a sample Axis2 server. The text file will then be tagged as processed to include a system time stamp. The task will be scheduled to run every 15 seconds.

Writing the Task Class

The PlaceStockOrderTask class implements org.apache.synapse.startup.Task. Each task should therefore implement the Task interface. This interface is almost as simple as any - it simply has a single execute method. This method contains the code that is to be run at the specified intervals.

In our execute() method, we first check whether the file exists at the desired location. If it does, we then read the file line by line composing place order messages for each line in the text file. Individual messages are then injected to the synapse environment with the given To endpoint reference. We set each message as OUT_ONLY since we do not expect any response for messages. In addition to the execute() method, you can also make the class implement a JavaBean interface. The WSO2 ESB console can then be used to configure the properties of this JavaBean. You can see this in the following example.

The complete code listing of the Task class is given below:

 

public class PlaceStockOrderTask implements Task, ManagedLifecycle {

private Log log = LogFactory.getLog(PlaceStockOrderTask.class);

private String to;
private String stockFile;
private SynapseEnvironment synapseEnvironment;

public void execute() {
log.debug("PlaceStockOrderTask begin");

if (synapseEnvironment == null) {
log.error("Synapse Environment not set");
return;
}
if (to == null) {
log.error("to not set");
return;
}

File existFile = new File(stockFile);
if(!existFile.exists()) {
log.debug("waiting for stock file");
return;
}

try {

// file format IBM,100,120.50
BufferedReader reader = new BufferedReader(new FileReader(stockFile));
String line = null;
while( (line = reader.readLine()) != null){
line = line.trim();
if(line == "") {
continue;
}
String[] split = line.split(",");
String symbol = split[0];
String quantity = split[1];
String price = split[2];

MessageContext mc = synapseEnvironment.createMessageContext();
mc.setTo(new EndpointReference(to));
mc.setSoapAction("urn:placeOrder");
mc.setProperty("OUT_ONLY", "true");

OMElement placeOrderRequest = createPlaceOrderRequest(symbol, quantity, price);
PayloadHelper.setXMLPayload(mc, placeOrderRequest);

synapseEnvironment.injectMessage(mc);

log.info("placed order symbol:" + symbol + " quantity:" + quantity + " price:" + price);
}

reader.close();

} catch (IOException e) {
throw new SynapseException("error reading file", e);
}

File renamefile = new File(stockFile);
renamefile.renameTo(new File(stockFile + "." + System.currentTimeMillis()));

log.debug("PlaceStockOrderTask end");
}

public static OMElement createPlaceOrderRequest(String symbol, String qty, String purchPrice) {
OMFactory factory = OMAbstractFactory.getOMFactory();
OMNamespace ns = factory.createOMNamespace("https://services.samples/xsd", "m0");
OMElement placeOrder= factory.createOMElement("placeOrder", ns);
OMElement order = factory.createOMElement("order", ns);
OMElement price = factory.createOMElement("price", ns);
OMElement quantity = factory.createOMElement("quantity", ns);
OMElement symb = factory.createOMElement("symbol", ns);
price.setText(purchPrice);
quantity.setText(qty);
symb.setText(symbol);
order.addChild(price);
order.addChild(quantity);
order.addChild(symb);
placeOrder.addChild(order);
return placeOrder;
}

public void destroy() {

}

public void init(SynapseEnvironment synapseEnvironment) {
this.synapseEnvironment = synapseEnvironment;
}

public SynapseEnvironment getSynapseEnvironment() {
return synapseEnvironment;
}

public void setSynapseEnvironment(SynapseEnvironment synapseEnvironment) {
this.synapseEnvironment = synapseEnvironment;
}

public String getTo() {
return to;
}

public void setTo(String to) {
this.to = to;
}

public String getStockFile() {
return stockFile;
}

public void setStockFile(String stockFile) {
this.stockFile = stockFile;
}


}

 As you can see, this is a bean implementing two properties - To and StockFile. These are used to configure the Task. You will also notice the use of the ManagedLifecycle interface. This is explained next.

Implementing ManagedLifecycle for Initialization and Cleanup

Since a task implements ManagedLifecyle interface, ESB will call the init() method at the initialization of a Task object and destroy() method when a Task object is destroyed.

public interface ManagedLifecycle {
public void init(SynapseEnvironment se);
public void destroy();
}

In PlaceStockOrderTask we store the Synapse environment object reference in an instance variable for later use with the init() method. The SynapseEnvironment is needed to inject messages into the ESB.

Customizing a task

We can pass values to a task at run time using property elements. In this example, we give the location of the stock order file and its address using two properties within the Task object. The properties can be either String type or OMElement type. For OMElement type you can pass XML elements as values in the configuration file. When creating a Task object, WSO2 ESB will initialize the properties with the given values in the configuration file.

For example, the following properties in our Task class,

public String getStockFile() {
return stockFile;
}

public void setStockFile(String stockFile) {
this.stockFile = stockFile;
}

 

are initialized with the given values within the property element of the task in the configuration.

<syn:property xmlns="http://ws.apache.org/ns/synapse" name="stockFile"
value="/home/upul/test/stock.txt"/>

 

For those properties given as XML elements, properties need to be defined within the Task class using the following format:

public void setMessage(OMElement elem) {
message = elem;
}

 

It can be initialized with an XML elmenent as follows:
(OMElement comes from Apache AXIOM which is used by WSO2 ESB. AXIOM is an object model similar to DOM. To learn more about AXIOM, see the AXIOM tutorial.)

<property name="message">
<m0:getQuote xmlns:m0="https://services.samples/xsd">
<m0:request>
<m0:symbol>IBM</m0:symbol>
</m0:request>
</m0:getQuote>
</property>

Packaging the Custom Mediator

After compiling and bundling the Task class you need to add it to the WSO2 ESB class path. For this, you can add the packaged jar file to the ESB lib directory and the Task class will be available for use from the next time you start WSO2 ESB.

Scheduling the Task

The task can be scheduled in different ways. You can use either interval and count attributes to run the task a specified number of times at a given interval or you can give the scheduled time as a cron style entry. Otherwise you can make the task run only once after ESB starts by using the attribute once. Some examples are given below:

to run every 5 seconds continuously -

<task name="CheckPrice" class="org.wso2.esb.tutorial.tasks.PlaceStockOrderTask"> 
<trigger interval="5000"/>
</task>

 

To run every 5 seconds for 10 times -

<task name="CheckPrice" class="org.wso2.esb.tutorial.tasks.PlaceStockOrderTask">
<trigger interval="5000" count="10"/>
</task>

 

You can also give cron-style values. .

To run daily at 1:30 AM -

<task name="CheckPrice" class="org.wso2.esb.tutorial.tasks.PlaceStockOrderTask">
<trigger cron="30 1 * * * *"/>
</task>

 

To run only once after ESB starts -

<task name="CheckPrice" class="org.wso2.esb.tutorial.tasks.PlaceStockOrderTask">
<trigger once="true"/>
</task>

Testing the Mediator

Deploy the SimpleStockQuoteService to sample Axis2 server. Start the sample Axis2 server. For further instructions, see ESB Samples Guide.

We are going to use the following configuration file to test the task.

<?xml version="1.0" encoding="UTF-8"?>
<syn:definitions xmlns:syn="http://ws.apache.org/ns/synapse">
<syn:registry provider="org.wso2.esb.registry.ESBRegistry">
<syn:parameter name="root">file:registry/</syn:parameter>
</syn:registry>
<syn:sequence name="main">
<syn:in>
<syn:log level="full"/>
<syn:filter source="get-property('To')" regex="https://localhost:9000.*">
<syn:send/>
</syn:filter>
</syn:in>
<syn:out>
<syn:send/>
</syn:out>
</syn:sequence>
<syn:sequence name="fault">
<syn:log/>
</syn:sequence>
<syn:task name="placeOrder" class="org.wso2.esb.tutorial.tasks.PlaceStockOrderTask">
<syn:trigger interval="15000"/>
<syn:property xmlns="http://ws.apache.org/ns/synapse" name="to" value="https://localhost:9000/soap/SimpleStockQuoteService"/>
<syn:property xmlns="http://ws.apache.org/ns/synapse" name="stockFile" value="/home/upul/test/stock.txt"/>
</syn:task>
</syn:definitions>

 

After adding the Task class to class path and restarting the WSO2 ESB, you can add configuration settings using the ESB console. Follow the steps given below:

  1. Log in to the ESB console.
  2. Click Tasks in Manage group.
  3. Click Add. New Task page will open.

    New Task page
    Figure 1

  4. Enter placeOrder as Task name.
  5. Enter org.wso2.esb.tutorial.tasks.PlaceStockOrderTask as Task Implementation and click Load Class. ESB console will load the custom Task class and show you the properties it finds.
  6. Enter https://localhost:9000/soap/SimpleStockQuoteService for To property. Keep property type as Literal.
  7. Copy the stockfile.txt to a new directory. The stockfile.txt will contain the following entries:
    IBM,100,120.50
    MSFT,200,70.25
    SUN,400,60.75
  8. Enter stock.txt file location for stockFile property. Keep property type as Literal.
  9. Select Trigger Type as Simple.
  10. Enter 15000 to Interval.
  11. Click Schedule.

 

ESB will now execute the task every 15 seconds. It will read stockfile.txt and place orders to the sample Axis2 server. The file will then be renamed to include a timestamp.

 

In the sample Axis2 server console you can see the placed orders as follows.

Sun Nov 25 16:58:46 LKT 2007 samples.services.SimpleStockQuoteService  :: Accepted order for : 100 stocks of IBM at $ 120.5
Sun Nov 25 16:58:46 LKT 2007 samples.services.SimpleStockQuoteService :: Accepted order for : 200 stocks of MSFT at $ 70.25
Sun Nov 25 16:58:46 LKT 2007 samples.services.SimpleStockQuoteService :: Accepted order for : 400 stocks of SUN at $ 60.75

 

In this fashion you can write you own tasks to create scheduled jobs within the WSO2 ESB.

Source code: place-stock-order-task.zip

 

Author

Upul Godage, Senior Software Engineer, WSO2 Inc. upul AT wso2 DOT com

 

About Author

  • Upul G
  • IT