[Tutorial] Scheduled database polling with WSO2 Data Services Server
- Nadeesha Gamage
- Lead Solutions Engineer - WSO2
Objectives
This tutorial demonstrates how the WSO2 DSS can poll a database table at predefined intervals, identify changes in the table contents, and trigger an event
that calls a proxy service within the WSO2 ESB.
Pre-requisites
WSO2 Data Services Server
WSO2 Enterprise Service Bus
Applies to
WSO2 ESB | 4.0.0 or above
WSO2 DSS | 3.0.0 or above
Table of contents
Problem
Task scheduling with WSO2 DSS
Scenario
Setting up the proxy service in the ESB
Adding the scheduler for the Polling Service
Improvements
Resources
Problem
Organizations depend heavily on databases as a data storage medium that can be accessed and queried by multiple systems within the organization’s IT
infrastructure. A common use case is listening to a database table to detect when it is updated and triggering an event in response.
Such a trigger often also needs to invoke a web service that resides on a different system and pass it some information from the database. Some
database vendors support triggers natively, but others lack this functionality. The WSO2 DSS can be used to implement database polling and
triggers irrespective of the underlying database architecture of the organization.
Task scheduling with WSO2 DSS
Let’s look at how WSO2 DSS can be used to schedule tasks. Figure 1.1 illustrates the process flow for the scheduled database polling scenario using the WSO2
DSS and WSO2 ESB.
WSO2 DSS polls the database at predefined intervals to identify any change in the table contents. If a change is identified, the WSO2 DSS triggers an
event by calling a service exposed by the WSO2 ESB. A result-set from the database table is sent to the WSO2 ESB as a SOAP message. Alternatively,
WSO2 DSS can be configured to support email notifications, where the result-set is sent via email.
Scenario
In this scenario WSO2 DSS polls an H2 database table at predefined intervals. Each polling operation executes a select statement that extracts data from a
given database table, and it extracts only the data added since the previous poll.
The H2 database contains two tables. One table holds the data to be polled and the other maintains the timestamp of
the last poll. The timestamp table contains only one record, the timestamp of the latest retrieved record, which is updated with every
poll operation.
Similarly, the WSO2 DSS contains two queries. The first query reads the Student_registration table and the timestamp table to find the
table entries added after the timestamp stored in the timestamp table. WSO2 DSS sends the query results to the WSO2 ESB as a SOAP message. The WSO2 ESB
processes the message and invokes a service exposed by the WSO2 DSS, passing the latest timestamp. That service runs the second query, which updates the
timestamp table with the latest timestamp. This process is illustrated in figure 1.2.
The table structure of the two tables is given below
Student_registration table
FIELD | TYPE | NULL | KEY | DEFAULT
ID | INTEGER | YES | NULL |
NAME | VARCHAR | YES | NULL |
CONTACT | VARCHAR | YES | NULL |
REGDATETIME | TIMESTAMP | YES | NULL |
Timestamp table
FIELD | TYPE | NULL | KEY | DEFAULT
TIMESTAMP | TIMESTAMP(23) | YES | NULL |
ID | INTEGER(10) | YES | NULL |
The SQL statements required to create the above tables are given below.
CREATE TABLE Student_registration (ID INT, Name VARCHAR, Contact VARCHAR, regdatetime TIMESTAMP);
CREATE TABLE timestamp_table(ID INT, timestamp TIMESTAMP);
Start the WSO2 DSS and log in to its admin console. Click on the create data service icon on the dashboard as shown below.
This opens the ‘create data service’ page. Enter an appropriate data service name and add the following Service
Namespace. Then set the advanced configuration as shown below and select next.
Service Namespace - https://ws.wso2.org/dataservice/samples/eventing_sample
On the next page a data source needs to be created. We will use the sample H2 database that is shipped with the WSO2 DSS distribution. To create a
datasource, provide a name for the datasource Id, select the database type, and provide the database credentials as shown in the figure below. Once the
details are entered the connection can be tested with the ‘Test Connection’ button. The database URL and credentials are given below.
Database - RDBMS
Database Engine - H2
URL - jdbc:h2:file:./samples/database/DATA_SERV_SAMP
Username - wso2ds
Password - wso2ds
Once the data source is created, save and move to the next page, which allows the user to add queries. This scenario needs
two separate queries. Let’s add the first query.
Click on the ‘Add New Query’ button to open the add query page. Enter an appropriate name in the Query ID field and select the
datasource name that was entered in the previous screen from the datasource drop-down menu. Add the SQL statement given below in the SQL textbox indicated
in the figure below.
select * from Student_registration LEFT JOIN (select count(*) from Student_registration) where RegDateTime > (select timestamp from timestamp_table where id= 1);
This statement selects all records from the Student_registration table that were added after the timestamp stored in the timestamp table.
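To see which rows a poll would pick up, the filter part of the statement can be run on its own in the H2 console. The query below is a simplified sketch for manual checking, not the query configured in the data service: it drops the count join and assumes the timestamp table already holds a row with id 1.

```sql
-- Manual check: list the registrations newer than the stored timestamp.
-- Assumes timestamp_table already contains a row with id = 1.
SELECT ID, Name, Contact, regdatetime
FROM Student_registration
WHERE regdatetime > (SELECT timestamp FROM timestamp_table WHERE id = 1)
ORDER BY regdatetime;
```

If no row with id 1 exists, the subquery returns NULL and the comparison matches no rows, so the poll would appear to find nothing.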
Once the SQL statement is added we need to add the output mapping for the SQL query. The output mapping requires the ‘grouped by element’
and ‘row name’ fields to be set before any mapping is added. Enter ‘Students’ and ‘student’ in the ‘grouped by element’ and ‘row name’ fields respectively,
as shown in the figure below. Once these fields are filled click on the ‘Add new output mapping’ icon as shown below.
Once inside the ‘Add output mapping’ page add the following output mapping details.
Mapping Type | Data Source Type | Output Field Name | Data Source Column Name | Parameter Type | Schema Type
element | column | count | COUNT(*) | Scalar | integer
element | column | ID | ID | Scalar | string
element | column | Name | Name | Scalar | string
element | column | Contact | Contact | Scalar | string
element | column | RegDateTime | RegDateTime | Scalar | string
After adding the output mappings go back to the main configuration page by clicking on the ‘main configuration’ button. On the
main configuration page select the manage events section as shown below.
Add the following configuration in the add event page and select save. The configuration defines the condition that must be satisfied
to trigger an event. In this case, whenever the count is greater than 0 an event is triggered that calls the PollingProxy service
in the WSO2 ESB.
Event Id | Xpath | Target Topic | Event Sink URL
pollingTrigger | //*[local-name()='count' and namespace-uri()='https://ws.wso2.org/dataservice/samples/eventing_sample']>0 | polling_Topic |
After the event is added click on the ‘main configuration’ button. The polling event is now available in the ‘Output Event Trigger’ dropdown as
shown below.
Select the ‘pollingTrigger’ event from the ‘Output Event Trigger’ dropdown and click on the save button. This creates the
query required for polling data from the database table. A second query is required to update the timestamp once the polling is done. Click
on the ‘Add New Query’ button.
Provide a name for the query and select the datasource from the dropdown menu as we did for the previous query. Add the SQL statement below in the Query
Details section.
update timestamp_table set timestamp=:timestamp where id=1;
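For a manual test, the same update can be run in the H2 console with a literal value in place of the :timestamp parameter. The timestamp value below is an arbitrary example, not a value from the scenario.

```sql
-- Same statement as the update query, with a literal instead of :timestamp.
-- '2013-01-15 10:30:00' is a made-up example value.
UPDATE timestamp_table SET timestamp = '2013-01-15 10:30:00' WHERE id = 1;

-- Confirm the stored value.
SELECT timestamp FROM timestamp_table WHERE id = 1;
```

The update only has an effect if a row with id 1 already exists in the timestamp table.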
Once this is done we need to add a new input mapping. This can be done by clicking on the ‘Add New Input Mapping’ button as shown below.
Add the following values, save, and return to the ‘Add Query’ page.
Mapping Name | Parameter Type | SQL Type | Default Value | IN/OUT Type | Ordinal
timestamp | Scalar | String | - | IN | 2
Once this is done the ‘Add Query’ page should look like the following.
If all the above steps are done then click on the save button which would save the second query.
After adding both the queries required click on the next button that would redirect to the Operations page.
Click on the ‘Add New Operation’ button and add the following two operations.
Polling Operation
Add the below in the text boxes provided.
Operation | QueryId
pollingOperation | pollingQuery
Update Timestamp Operation
Update Timestamp operation would require operation parameters to be added to accept the timestamp parameter passed from the WSO2 ESB. Add the following
values in the textboxes given and the WSO2 DSS would automatically add the relevant parameters for this operation.
Operation | QueryId
updateTimestampOperation | updateTimestamp
Once the operation details are added the page should look like the following.
After adding both operations, the operations page should look like the following.
Once both operations are added click on the finish button. This adds the pollingService to the WSO2 DSS.
Setting up the proxy service in the ESB
If both the WSO2 ESB and the WSO2 DSS are running on the same machine, the WSO2 ESB has to be started with a port offset of 1. The port offset can be
set in the <ESB_HOME>/repository/conf/carbon.xml file, where <ESB_HOME> is the ESB binary distribution folder. Set the offset value as below.
<Offset>1</Offset>
The WSO2 ESB needs a proxy service that is exposed to the WSO2 DSS. WSO2 DSS sends a SOAP envelope containing the result-set from the
database poll. The ESB service processes the result-set and sends the latest timestamp back to the WSO2 DSS. WSO2 DSS in turn updates the timestamp
table with the latest timestamp. This ensures that each poll returns only new records. The synapse configuration for the WSO2
ESB proxy service is given below. The configuration logs the result-set passed from the WSO2 DSS and updates the timestamp in the
database.
Adding the scheduler for the Polling Service
After creating the Polling Service we can add the scheduler required for this service. This can be done by clicking the ‘Scheduled Task’ link as shown
below.
In the scheduled task page click on the ‘Add New Task’ link
Fill the fields in the scheduled task page as shown above and click on the schedule button.
The task is now scheduled, and the database table will be polled based on the parameters in the ‘New Scheduled Task’ page.
Set a default timestamp in the timestamp table and insert values into the Student_registration table. The new records will be fetched by the WSO2 DSS and
passed to the WSO2 ESB. The WSO2 ESB will log these messages and update the timestamp in the timestamp table.
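The seeding step could look like the following when run in the H2 console. This is a sketch: the seed timestamp and the student names and contact numbers are invented sample values, and the only requirement taken from the queries above is that the timestamp row uses id 1.

```sql
-- Seed the single timestamp row that both queries address with id = 1.
-- The date is an arbitrary value older than any sample registration.
INSERT INTO timestamp_table (ID, timestamp) VALUES (1, '2000-01-01 00:00:00');

-- Add sample registrations (invented data) stamped with the current time,
-- so they are newer than the seed timestamp.
INSERT INTO Student_registration (ID, Name, Contact, regdatetime)
VALUES (1, 'Alice', '0110000001', CURRENT_TIMESTAMP);
INSERT INTO Student_registration (ID, Name, Contact, regdatetime)
VALUES (2, 'Bob', '0110000002', CURRENT_TIMESTAMP);
```

On the next scheduled poll both sample rows should be newer than the stored timestamp and therefore be included in the result-set.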
Improvements
· The ESB proxy service can be extended to pass the polling result-set to a queue.
· This scenario doesn’t have any error handling. To introduce error handling, an error handling sequence must be added to the proxy service in the
WSO2 ESB.
· Endpoints are specified inside synapse.xml, but it is a best practice to store the endpoints in the Governance Registry of the WSO2 ESB.
Reference
https://wso2.com/products/enterprise-service-bus
https://wso2.com/products/data-services-server/
https://docs.wso2.org/display/DSS310/RDBMS
https://docs.wso2.org/display/DSS310/Scheduling+Tasks
Author
Nadeesha Gamage, Senior Engineer - Technical Sales, WSO2 Inc.