Large Scale Data Transfer with Data Services Streaming

Archived Content
This article is provided for historical perspective only, and may not reflect current conditions. Please refer to the relevant product page for more up-to-date product information and resources.
  • By Anjana Fernando
  • 5 Jan, 2010

Applies To


WSO2 Data-Services Server 2.2.0

Objective

The objective of this tutorial is to present the data streaming capability of WSO2 Data Services Server. With data streaming, there is theoretically no limit to the size of a data service response. The main advantages of this approach are:

  • Memory efficiency – The server does not build up the full result in memory; instead, the data is streamed to the client as it is generated.
  • Low response time – Data is returned as soon as the server generates it, so the client receives the first part of the response almost immediately and can process the data in real time as it is streamed.
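
To make the memory-efficiency point concrete, the following Java sketch writes rows to an output stream as they are produced, using the standard StAX XMLStreamWriter. It only illustrates the streaming idea and is not the Data Services Server's actual implementation; the class name StreamingWriterSketch, the CountingSink helper, and the row count are assumptions made for this example.

```java
import java.io.OutputStream;
import javax.xml.stream.XMLOutputFactory;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamWriter;

/* Illustration only: rows are written out as they are produced and never collected. */
class StreamingWriterSketch {

    /* Hypothetical sink that discards the data but counts the bytes passing through. */
    static class CountingSink extends OutputStream {
        long bytes = 0;

        @Override
        public void write(int b) {
            bytes++;
        }

        @Override
        public void write(byte[] buf, int off, int len) {
            bytes += len;
        }
    }

    /* Builds a string of the given length filled with 'x'. */
    static String filler(int length) {
        StringBuilder sb = new StringBuilder(length);
        for (int i = 0; i < length; i++) {
            sb.append('x');
        }
        return sb.toString();
    }

    /* Streams rowCount rows; only the current row is held by this method at any time. */
    static void streamRows(OutputStream out, long rowCount, String payload)
            throws XMLStreamException {
        XMLStreamWriter writer =
                XMLOutputFactory.newInstance().createXMLStreamWriter(out, "UTF-8");
        writer.writeStartElement("Data");
        for (long id = 1; id <= rowCount; id++) {
            writer.writeStartElement("DataElement");
            writer.writeStartElement("id");
            writer.writeCharacters(Long.toString(id));
            writer.writeEndElement();
            writer.writeStartElement("field1");
            writer.writeCharacters(payload);
            writer.writeEndElement();
            writer.writeEndElement();
        }
        writer.writeEndElement();
        writer.flush();
        writer.close();
    }

    public static void main(String[] args) throws Exception {
        CountingSink sink = new CountingSink();
        streamRows(sink, 100000, filler(1024));
        System.out.println("Bytes streamed: " + sink.bytes);
    }
}
```

Because each row is handed to the writer and then forgotten, the amount of data written can grow far beyond the memory available to the process; a design that first collected every row in a list would not have that property.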

Using a sample MySQL database, a data service will be created to show data streaming in action.

Prerequisites

  • Download the latest build of WSO2 Data Services Server from http://wso2.org/downloads/data-services-server.
  • Download and install Apache Ant from http://ant.apache.org/.
  • Install WSO2 Data Services Server as a stand-alone server (the install location will be referred to as DS_HOME hereafter).
  • Start the server (run DS_HOME/bin/wso2server.bat on Windows or DS_HOME/bin/wso2server.sh on Unix/Linux).
  • Open a web browser and navigate to https://localhost:9443/carbon.
  • First-time users can log in with the default credentials: username=admin, password=admin.

Step 1 – Create Database

We will create a sample database with a single table that contains an id field and two text fields, each holding 1 KB of data. The table has 524288 records, which adds up to a gigabyte of textual data. The following script creates the database and populates it.
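
The size estimate can be checked with a few lines of Java. This is only a back-of-the-envelope sketch: the class name DataSizeCheck is made up for the example, and the calculation assumes one byte per character and ignores the id column and any storage or XML overhead.

```java
/* Back-of-the-envelope check of the text volume held in the sample table. */
class DataSizeCheck {

    /* Total bytes of text, assuming one byte per character. */
    static long totalBytes(long records, int textFields, int bytesPerField) {
        return records * textFields * bytesPerField;
    }

    public static void main(String[] args) {
        long bytes = totalBytes(524288L, 2, 1024);
        long gib = bytes / (1024L * 1024L * 1024L);
        // prints "1073741824 bytes = 1 GiB"
        System.out.println(bytes + " bytes = " + gib + " GiB");
    }
}
```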

stream_db.sql

DROP DATABASE IF EXISTS StreamTestDB;
CREATE DATABASE StreamTestDB;
GRANT all on StreamTestDB.* to 'user1'@'localhost' identified by 'pass1';
USE StreamTestDB;
CREATE TABLE Data (id INT, field1 VARCHAR(1024), field2 VARCHAR(1024));

DELIMITER $$
 DROP PROCEDURE IF EXISTS PopulateData$$
 CREATE PROCEDURE PopulateData()
       BEGIN
               DECLARE count INT;
               DECLARE strDataEntry VARCHAR(1024);
               SET count = 1;
               SET strDataEntry =  '';
               WHILE count <= 1024 DO
                           SET strDataEntry = CONCAT(strDataEntry, 'x');
                           SET count = count + 1;
               END WHILE;
               SET count = 1;
               WHILE count <= 524288 DO
                           INSERT INTO Data VALUES (count, strDataEntry, strDataEntry);
                           SET count = count + 1;
               END WHILE;
               SELECT strDataEntry;
       END$$
   DELIMITER ;

CALL PopulateData();

Run the above script with the following command:

# mysql -u root -p < stream_db.sql

You will first be prompted for the MySQL root password, and then the database will be created.

Step 2 – Create the Data Service

Here we are defining a rather simple data service with a single operation which will return all the records in the database we created earlier. The data service definition is as follows.

StreamingTestDS.dbs

<?xml version="1.0" encoding="UTF-8"?> 
<data name="StreamingTestDS"> 
    <config id="ds1"> 
        <property name="org.wso2.ws.dataservice.driver">com.mysql.jdbc.Driver</property> 
        <property name="org.wso2.ws.dataservice.protocol">jdbc:mysql://localhost:3306/StreamTestDB</property> 
        <property name="org.wso2.ws.dataservice.user">user1</property> 
        <property name="org.wso2.ws.dataservice.password">pass1</property> 
        <property name="org.wso2.ws.dataservice.minpoolsize">2</property> 
        <property name="org.wso2.ws.dataservice.maxpoolsize">10</property> 
    </config> 
    <query id="dataQuery" useConfig="ds1"> 
        <sql>select id, field1, field2 from Data</sql> 
        <result element="Data" rowName="DataElement" defaultNamespace="http://org.test.streaming"> 
            <element name="id" column="id" xsdType="xs:integer"/> 
            <element name="field1" column="field1" xsdType="xs:string"/> 
            <element name="field2" column="field2" xsdType="xs:string"/> 
        </result> 
    </query> 
    <operation name="getData"> 
        <call-query href="dataQuery"/> 
    </operation> 
</data>

Save the above contents to a file called 'StreamingTestDS.dbs' and upload it to Data Services as shown in Figure 2.1.

Figure 2.1: Data Service Upload Screen

Step 3 – Create Service Client

After the data service is deployed, we have to write a service client in order to access it. Here we will be writing an Apache Axis2 client for consuming the data service.

StreamingDSClient.java

import javax.xml.namespace.QName;
import javax.xml.stream.XMLStreamConstants;
import javax.xml.stream.XMLStreamReader;

import org.apache.axiom.om.OMAbstractFactory;
import org.apache.axiom.om.OMElement;
import org.apache.axiom.om.OMFactory;
import org.apache.axis2.addressing.EndpointReference;
import org.apache.axis2.client.Options;
import org.apache.axis2.client.ServiceClient;

public class StreamingDSClient {

	private static OMElement getPayload() {
		OMFactory fac = OMAbstractFactory.getOMFactory();
		OMElement op = fac.createOMElement(new QName("getData"));
		return op;
	}

	public static void main(String[] args) throws Exception {
		/* service target end-point */
		String epr = "http://localhost:9763/services/StreamingTestDS";
		
		/* create the service client */
		ServiceClient client = new ServiceClient();
		Options options = new Options();
		client.setOptions(options);
		options.setTo(new EndpointReference(epr));
		options.setAction("urn:getData");
		
		/* invoke service */
		long beginTime = System.currentTimeMillis();
		OMElement res = client.sendReceive(getPayload());
		
		/* process the result */
		XMLStreamReader reader = res.getXMLStreamReaderWithoutCaching();
		int eventType;
		long count = 0, idSum = 0;
		boolean isId = false;
		while (reader.hasNext()) {
			eventType = reader.next();
			if (eventType == XMLStreamConstants.START_ELEMENT) {
				if (reader.getLocalName().equals("id")) {
					isId = true;
					/* each record carries two 1 KB text fields, so count is in KB */
					count += 2;
					if (count % 1024 == 0) {
						System.out.println("Data Read: " + count / 1024 + " MB");
					}
				}
			} else if (isId && eventType == XMLStreamConstants.CHARACTERS) {
				idSum += Integer.parseInt(reader.getText());
				isId = false;
			}
		}
		long endTime = System.currentTimeMillis();
		System.out.println("\nID Sum: " + idSum);
		System.out.println("Time: " + ((endTime - beginTime) / 1000) + " seconds.");		
	}
}
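
The result-processing loop can also be tried in isolation, without a running server. The sketch below applies the same pull-parsing idea to a small hand-written XML document using the StAX parser that ships with the JDK. The class name IdSumSketch and the three-row sample document are assumptions for illustration (the real response may be shaped differently), and it reads each id with getElementText() instead of tracking a flag as the client above does.

```java
import java.io.StringReader;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamConstants;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamReader;

/* Illustration only: sums the <id> values of a result document with a pull parser. */
class IdSumSketch {

    /* Hypothetical sample response with three rows; not captured from a real server. */
    static final String SAMPLE =
            "<Data xmlns=\"http://org.test.streaming\">"
            + "<DataElement><id>1</id><field1>x</field1><field2>x</field2></DataElement>"
            + "<DataElement><id>2</id><field1>x</field1><field2>x</field2></DataElement>"
            + "<DataElement><id>3</id><field1>x</field1><field2>x</field2></DataElement>"
            + "</Data>";

    /* Walks the event stream once and adds up every id element it meets. */
    static long sumIds(XMLStreamReader reader) throws XMLStreamException {
        long idSum = 0;
        while (reader.hasNext()) {
            int eventType = reader.next();
            if (eventType == XMLStreamConstants.START_ELEMENT
                    && "id".equals(reader.getLocalName())) {
                /* getElementText() consumes the text up to the closing tag */
                idSum += Long.parseLong(reader.getElementText().trim());
            }
        }
        return idSum;
    }

    /* Convenience wrapper that parses an XML string. */
    static long sumIds(String xml) throws XMLStreamException {
        XMLStreamReader reader =
                XMLInputFactory.newInstance().createXMLStreamReader(new StringReader(xml));
        try {
            return sumIds(reader);
        } finally {
            reader.close();
        }
    }

    public static void main(String[] args) throws Exception {
        // prints "ID Sum: 6"
        System.out.println("ID Sum: " + sumIds(SAMPLE));
    }
}
```

The parser only ever holds the current event, which is why the same loop works for three rows or for half a million.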

We will use Apache Ant to build and run the client. The following build file is used for that purpose.

build.xml

<?xml version="1.0" encoding="UTF-8"?> 
<project default="run"> 
    <property environment="env"/> 
    <property name="axis2.home" value="${env.AXIS2_HOME}"/> 
    <path id="class.path"> 
        <pathelement path="."/> 
        <pathelement path="${java.class.path}"/> 
        <fileset dir="${axis2.home}"> 
            <include name="lib/*.jar"/> 
        </fileset> 
    </path> 
    <target name="compile"> 
        <javac fork="true" destdir="." srcdir="."> 
            <classpath refid="class.path"/> 
        </javac> 
    </target> 
    <target name="run" depends="compile"> 
        <java fork="true" classname="StreamingDSClient"> 
            <classpath> 
                <path refid="class.path"/> 
            </classpath> 
        </java> 
    </target> 
</project>

Keep build.xml and StreamingDSClient.java in the same directory. Before running the client, set the AXIS2_HOME environment variable to the DS_HOME/repository path, because the build file adds the JAR files under its lib directory to the classpath.

For example:

Unix/Linux:

# export AXIS2_HOME=/home/laf/home/laf/dev/bin/wso2dataservices-2.2.0/repository

Windows:

# set AXIS2_HOME=c:\wso2dataservices-2.2.0\repository

To build and run the client, navigate to the directory that contains the source code and type 'ant'. The code will be compiled and then run. The output should resemble the following.

 

.
.
.
     [java] Data Read: 1019 MB 
     [java] Data Read: 1020 MB 
     [java] Data Read: 1021 MB 
     [java] Data Read: 1022 MB 
     [java] Data Read: 1023 MB 
     [java] Data Read: 1024 MB 
     [java] 
     [java] ID Sum: 137439215616 
     [java] Time: 92 seconds. 

BUILD SUCCESSFUL 
Total time: 1 minute 34 seconds 

The service client prints a message for every megabyte of streamed data. It also calculates the sum of all the 'id' values that it reads. The id sum lets us confirm that all 524288 records were read: the sum of the consecutive integers from 1 to n is (n+1)*n/2, which equals 137439215616 when n=524288.
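
The expected value is easy to recompute. The short Java sketch below (the class name IdSumCheck is made up for this example) evaluates the formula and also shows that the result is larger than Integer.MAX_VALUE, which is why the client accumulates the sum in a long.

```java
/* Recomputes the expected id sum for the sample table. */
class IdSumCheck {

    /* Sum of the integers 1..n, computed in 64-bit arithmetic. */
    static long expectedIdSum(long n) {
        return (n + 1) * n / 2;
    }

    public static void main(String[] args) {
        long expected = expectedIdSum(524288L);
        // prints "137439215616"
        System.out.println(expected);
        // prints "true": the sum does not fit in a 32-bit int
        System.out.println(expected > Integer.MAX_VALUE);
    }
}
```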

This single web service invocation lasted 92 seconds, and in that time a gigabyte of data was streamed to the client. The time will most probably differ for each user, depending on the user's environment.

Summary

This tutorial looked into the streaming functionality of the WSO2 Data Services Server. The advantages of data services streaming are low memory consumption on the server and quick response times for the client. With this functionality, a client can make data service calls whose result sets are, in theory, unlimited in size.

Author
Anjana Fernando, Software Engineer, WSO2 Inc. anjana at wso2 dot com