2009/07/14

Fusion: Eventing with SOA Part 3 - Implementation and Development of EDA

  • Asanka Abeysinghe
  • CTO - WSO2

Introduction

In the first article in this series, I explained the concepts of eventing and Event Driven Architecture (EDA). In the second, I discussed the implementation details of the WSO2 ESB's eventing capabilities. In this final article, we use the concepts introduced in the first and the implementation details covered in the second to guide you through the process of building an application with eventing capabilities, using a practical example.

A hypothetical trading application will be used to illustrate the architecture and the implementation. To get a better understanding of the requirements, I recommend you read the article titled 'Developing Financial Applications with WSO2 ESB', also available on OxygenTank. The system used here is similar to the trading application described there.

Additional Related Articles

Fusion: Eventing with SOA - Introduction

Fusion: Eventing with SOA Part 2 - Eventing using Synapse and WSO2 ESB

Applies To

WSO2 Carbon 1.5.x/2.0
WSO2 ESB 2.x
Apache Synapse 1.2 SNAPSHOT

 

Abbreviations

Blotter - the graphical user interface used to enter orders and view market data

OMS - Order Management Server

MDD - Market Data Distribution

Order - an instruction to buy or sell in the market

Submission - one of the multiple instructions an Order is split into

Execution - information sent after the successful execution of a Submission

RRSubMan - Remote Registry based SUBscription MANager

ERSubMan - Embedded Registry based SUBscription MANager

G-Reg - Governance Registry

System Requirement

Requirement

A trader needs to build a system for use on the trading floor. The system must consist of trading blotters, order management and market data management, and must be linked to the back office servers. The order management server needs to be connected to external order gateways, and the market data management server needs to be connected to the market data feeds.

Figure 1 - Requirement

Usage of the system

Traders who sit at trading desks will use the blotter to enter orders. Traders will get instructions from clients via phone calls, email, SMS, etc. Orders entered will be submitted to the OMS, and the OMS will convert them into submissions and forward them to the order gateway. The order gateway will send execution reports and acknowledgments back to the OMS, which will then transfer them back to the relevant blotters. Traders make decisions based on the market data displayed on the blotters, which is picked up from the MDD, which in turn receives it from the market data source.

A trader might trade on specific symbols for the day, or he might trade on all symbols.

Figure 2 - Usage

EDA Approach

The proposed system uses the WSO2 ESB as the eventing hub/broker, and uses events to communicate between system components. System components are built as services and publish the relevant operations. There will be many event generators and event sinks that publish and consume various event types. Events will be categorized by topic, and subscribers subscribe using these topics.

Architecture

Figure 3 - Architecture with EDA

In this first approach, we have a single OMS service for the system. The WSO2 ESB sits at the center as the communication hub, handling all requests and responses. The WSO2 ESB uses the WSO2 Governance Registry as the repository to store subscriptions. The order blotter will publish order events (event source) and consume execution, acknowledgment and market data events (event sink). The MDD will act as an event source and publish market data events. The OMS will act as an event sink to capture market data and order events, and as an event source to publish submissions.
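The source and sink roles described above can be illustrated with a minimal in-memory sketch. This is not how the WSO2 ESB is implemented: the class MiniBroker, its methods and the simple prefix-matching rule are assumptions made purely for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical in-memory stand-in for the eventing hub; not the WSO2 ESB API. */
final class MiniBroker {
    // Sinks grouped by the topic prefix they subscribed with.
    private final Map<String, List<Consumer<String>>> sinksByPrefix = new LinkedHashMap<>();

    /** Registers an event sink for every topic that starts with topicPrefix. */
    void subscribe(String topicPrefix, Consumer<String> sink) {
        sinksByPrefix.computeIfAbsent(topicPrefix, k -> new ArrayList<>()).add(sink);
    }

    /** Delivers the payload to each matching sink and returns the number of deliveries. */
    int publish(String topic, String payload) {
        int delivered = 0;
        for (Map.Entry<String, List<Consumer<String>>> entry : sinksByPrefix.entrySet()) {
            if (topic.startsWith(entry.getKey())) {
                for (Consumer<String> sink : entry.getValue()) {
                    sink.accept(payload);
                    delivered++;
                }
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        MiniBroker broker = new MiniBroker();
        List<String> omsInbox = new ArrayList<>();
        List<String> blotterInbox = new ArrayList<>();

        // The OMS is a sink for orders and market data.
        broker.subscribe("ord/", omsInbox::add);
        broker.subscribe("mdd/", omsInbox::add);
        // The blotter is a sink for executions, acknowledgments and market data.
        broker.subscribe("exc/", blotterInbox::add);
        broker.subscribe("ack/", blotterInbox::add);
        broker.subscribe("mdd/", blotterInbox::add);

        broker.publish("ord/msft", "order entered on a blotter"); // blotter as event source
        broker.publish("mdd/msft", "quote from the MDD");         // MDD as event source
        broker.publish("sub/msft", "submission from the OMS");    // OMS as event source

        System.out.println("OMS inbox: " + omsInbox);
        System.out.println("Blotter inbox: " + blotterInbox);
    }
}
```

In this sketch nobody has subscribed to the sub/ topics, so the submission is simply dropped; in the architecture above the order gateway adaptor would be the interested sink.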

Topic Hierarchy

Events will be categorized by topic, and interested parties subscribe to these topics using subscriptions. Subscriptions are stored in the WSO2 Registry in a hierarchical tree structure. In this system, topics follow the pattern <event type>/<symbol>. The system will have several event types:

  • Market Data (mdd)
  • Orders (ord)
  • Submissions (sub)
  • Executions (exc)
  • Acknowledgments (ack)

The system will create topics such as mdd/msft, mdd/sunw, ord/msft and ack/goog. Each component in the system can subscribe and publish to the available topics.
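As a small illustration of the <event type>/<symbol> convention, the sketch below builds and parses topic names. The class Topics and its method names are hypothetical, and the format without a leading slash simply follows the examples in the paragraph above.

```java
/** Hypothetical helper for the <event type>/<symbol> topic convention. */
final class Topics {
    /** Event types and their topic codes, as listed in the article. */
    enum EventType {
        MARKET_DATA("mdd"),
        ORDER("ord"),
        SUBMISSION("sub"),
        EXECUTION("exc"),
        ACKNOWLEDGMENT("ack");

        final String code;

        EventType(String code) {
            this.code = code;
        }

        /** Looks up an event type by its topic code, for example "ord". */
        static EventType fromCode(String code) {
            for (EventType type : values()) {
                if (type.code.equals(code)) {
                    return type;
                }
            }
            throw new IllegalArgumentException("Unknown event type code: " + code);
        }
    }

    /** Builds a topic such as "mdd/msft" from an event type and a symbol. */
    static String topicFor(EventType type, String symbol) {
        if (symbol == null || symbol.trim().isEmpty() || symbol.contains("/")) {
            throw new IllegalArgumentException("Invalid symbol: " + symbol);
        }
        return type.code + "/" + symbol.trim();
    }

    /** Extracts the event type from a topic such as "ack/goog". */
    static EventType typeOf(String topic) {
        int slash = topic.indexOf('/');
        if (slash <= 0) {
            throw new IllegalArgumentException("Topic has no event type: " + topic);
        }
        return EventType.fromCode(topic.substring(0, slash));
    }

    public static void main(String[] args) {
        System.out.println(topicFor(EventType.MARKET_DATA, "msft"));
        System.out.println(typeOf("ack/goog"));
    }
}
```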

Implementation

WSO2 ESB Configuration

<definitions xmlns="http://ws.apache.org/ns/synapse">
	<eventSource name="TradingEventSource">
		<subscriptionManager
			class="org.wso2.carbon.eventing.impl.RemoteRegistryBasedSubscriptionManager">
			<property name="registryURL"
				value="https://localhost:9446/registry/" />
			<property name="username" value="admin" />
			<property name="password" value="admin" />
			<property name="topicHeaderName" value="Topic" />
			<property name="topicHeaderNS"
				value="https://apache.org/aip" />
			<property name="subscriptionStoragePath" value="/topicspace"/>
		</subscriptionManager>
	</eventSource>

	<sequence name="TradingEventSeq">
		<log level="full" />
		<eventPublisher eventSourceName="TradingEventSource" />
	</sequence>

	<proxy name="TradingEventProxy">
		<target inSequence="TradingEventSeq" />
	</proxy>
</definitions>

The WSO2 ESB uses the WSO2 Registry as the persistent storage for the topic space and subscriptions. For this to work, the configuration needs to use the RRSubMan as the subscription manager implementation. The subscription manager stores subscriptions under the path defined in the property "subscriptionStoragePath". A proxy service named "TradingEventProxy" with an event-publisher mediator is used to dispatch trading events. Events are published asynchronously as one-way messages, so the proxy service defines only an in-sequence.

Subscriber

There are three kinds of event subscribers within the architecture described: the blotters, the OMS and the order gateway adaptor. Simple Axis2 client code can be used to perform subscriptions to the event source (WSO2 ESB). Subscribers can subscribe, unsubscribe and renew subscriptions, as well as check the status of their subscriptions, by sending the relevant requests to the event source. The OMS subscription for Orders will look like the following code snippet:

import java.io.File;
import javax.xml.namespace.QName;
import org.apache.axiom.om.OMAbstractFactory;
import org.apache.axiom.om.OMElement;
import org.apache.axiom.om.OMFactory;
import org.apache.axiom.om.OMNamespace;
import org.apache.axis2.AxisFault;
import org.apache.axis2.addressing.EndpointReference;
import org.apache.axis2.client.Options;
import org.apache.axis2.client.ServiceClient;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.ConfigurationContextFactory;

        // Fragment of a method body. The variables repo, addUrl, address, expires and topic
        // are supplied by the surrounding code. The enclosing method must declare
        // "throws Exception", because not every checked exception is caught here.
        Options options = new Options();
        ServiceClient serviceClient;
        ConfigurationContext configContext = null;

        // Use the given Axis2 client repository if there is one, else the defaults
        if (repo != null && !"null".equals(repo)) {
            configContext =
                    ConfigurationContextFactory.
                            createConfigurationContextFromFileSystem(repo,
                                    repo + File.separator + "conf" + File.separator + "axis2.xml");
            serviceClient = new ServiceClient(configContext, null);
        } else {
            serviceClient = new ServiceClient();
        }
        OMFactory factory = OMAbstractFactory.getOMFactory();

        // Namespace URIs must match the specifications exactly (http, not https)
        OMNamespace nswsa = factory.createOMNamespace(
                "http://schemas.xmlsoap.org/ws/2004/08/addressing", "wsa");
        OMNamespace nswse =
                factory.createOMNamespace("http://schemas.xmlsoap.org/ws/2004/08/eventing", "wse");

        // Construct the subscription and add its parts to subscribeOm
        OMElement subscribeOm = factory.createOMElement("Subscribe", nswse);
        OMElement deliveryOm = factory.createOMElement("Delivery", nswse);
        deliveryOm.addAttribute(factory.createOMAttribute("Mode", null,
                "http://schemas.xmlsoap.org/ws/2004/08/eventing/DeliveryModes/Push"));
        OMElement notifyToOm = factory.createOMElement("NotifyTo", nswse);
        OMElement addressOm = factory.createOMElement("Address", nswsa);
        factory.createOMText(addressOm, address);
        OMElement expiresOm = factory.createOMElement("Expires", nswse);
        factory.createOMText(expiresOm, expires);
        OMElement filterOm = factory.createOMElement("Filter", nswse);
        filterOm.addAttribute(factory.createOMAttribute("Dialect", null,
                "http://synapse.apache.org/eventing/dialect/topicFilter"));
        factory.createOMText(filterOm, topic);

        notifyToOm.addChild(addressOm);
        deliveryOm.addChild(notifyToOm);
        subscribeOm.addChild(deliveryOm);
        if (!(expires.equals("*"))) {
            // Add the expiry only if a value was provided
            subscribeOm.addChild(expiresOm);
        }
        subscribeOm.addChild(filterOm);

        // Set addressing, the target URL and the WS-Eventing action
        serviceClient.engageModule("addressing");
        options.setTo(new EndpointReference(addUrl));
        options.setAction("http://schemas.xmlsoap.org/ws/2004/08/eventing/Subscribe");
        serviceClient.setOptions(options);
        System.out.println("Subscribing \n" + subscribeOm.toString());
        try {
            OMElement response = serviceClient.sendReceive(subscribeOm);
            System.out.println("Subscribed to topic " + topic);
            Thread.sleep(1000);
            System.out.println("Response Received: " + response.toString());
            // The subscription identifier is nested in the SubscriptionManager reference
            String subId =
                    response.getFirstChildWithName(
                            new QName(nswse.getNamespaceURI(), "SubscriptionManager"))
                            .getFirstChildWithName(
                                    new QName(nswsa.getNamespaceURI(), "ReferenceParameters"))
                            .getFirstChildWithName(
                                    new QName(nswse.getNamespaceURI(), "Identifier")).getText();
            System.out.println("Subscription identifier: " + subId);
        } catch (AxisFault e) {
            System.out.println("Fault Received : " + e.toString());
            // String concatenation tolerates a null fault code
            System.out.println("Fault Code     : " + e.getFaultCode());
        }

A working sample can be found in the WSO2 ESB binary distribution under ${ESB_HOME}/samples/axis2Client/src/samples/userguide/EventSubscriber.java.
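Unsubscribing, which the text above mentions but does not show, follows the same request pattern. As a rough sketch based on the WS-Eventing submission rather than on the ESB samples, the following code builds an Unsubscribe envelope as plain text using only the Java standard library. The class and method names are hypothetical, the subscription identifier is assumed to be the one returned in the subscribe response, and the exact headers the ESB expects should be checked against its own samples.

```java
import java.util.UUID;

/** Hypothetical builder for a WS-Eventing Unsubscribe request (SOAP 1.1). */
final class UnsubscribeMessage {
    static final String SOAP_NS = "http://schemas.xmlsoap.org/soap/envelope/";
    static final String WSA_NS = "http://www.w3.org/2005/08/addressing";
    static final String WSE_NS = "http://schemas.xmlsoap.org/ws/2004/08/eventing";

    /** Escapes the characters that are not allowed in XML element text. */
    static String escape(String text) {
        return text.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;");
    }

    /**
     * Builds the envelope. managerUrl is the subscription manager endpoint and
     * subscriptionId is the identifier received when the subscription was created.
     */
    static String build(String managerUrl, String subscriptionId) {
        return "<soapenv:Envelope xmlns:soapenv=\"" + SOAP_NS + "\""
                + " xmlns:wsa=\"" + WSA_NS + "\""
                + " xmlns:wse=\"" + WSE_NS + "\">"
                + "<soapenv:Header>"
                + "<wsa:To>" + escape(managerUrl) + "</wsa:To>"
                + "<wsa:MessageID>urn:uuid:" + UUID.randomUUID() + "</wsa:MessageID>"
                + "<wsa:Action>" + WSE_NS + "/Unsubscribe</wsa:Action>"
                // The identifier tells the manager which subscription to cancel.
                + "<wse:Identifier>" + escape(subscriptionId) + "</wse:Identifier>"
                + "</soapenv:Header>"
                + "<soapenv:Body><wse:Unsubscribe/></soapenv:Body>"
                + "</soapenv:Envelope>";
    }

    public static void main(String[] args) {
        // Both arguments are made-up example values.
        System.out.println(build("http://localhost:8280/services/TradingEventSource",
                "urn:uuid:example-subscription-id"));
    }
}
```

With Axis2, the same message would normally be assembled with the client API used in the subscriber snippet instead of string concatenation; the text form is used here only to keep the sketch free of dependencies.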

Subscription

Several types of subscriptions are required for the above application architecture: order subscriptions, submission subscriptions, execution subscriptions, acknowledgment subscriptions and market data subscriptions. The subscription messages are identical apart from the subscription topic and the address of the event sink to notify. The following message subscribes the MDDEventSink service to the market data topics:

<?xml version='1.0' encoding='UTF-8'?>
<soapenv:Envelope
	xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/">
	<soapenv:Header xmlns:wsa="http://www.w3.org/2005/08/addressing">
		<wsa:To>https://localhost:8280/services/TradingEventSource</wsa:To>
		<wsa:MessageID>urn:uuid:268BD7DAE421979A381246694624277</wsa:MessageID>
		<wsa:Action>http://schemas.xmlsoap.org/ws/2004/08/eventing/Subscribe</wsa:Action>
	</soapenv:Header>
	<soapenv:Body>
		<wse:Subscribe
			xmlns:wse="http://schemas.xmlsoap.org/ws/2004/08/eventing">
			<wse:Delivery
				Mode="http://schemas.xmlsoap.org/ws/2004/08/eventing/DeliveryModes/Push">
				<wse:NotifyTo>
					<wsa:Address
						xmlns:wsa="http://schemas.xmlsoap.org/ws/2004/08/addressing">https://localhost:7070/services/MDDEventSink</wsa:Address>
				</wse:NotifyTo>
			</wse:Delivery>
			<wse:Filter
				Dialect="http://synapse.apache.org/eventing/dialect/topicFilter">/mdd/</wse:Filter>
		</wse:Subscribe>
	</soapenv:Body>
</soapenv:Envelope>
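To make the point that the subscription messages differ only in their variable parts, the sketch below generates the Subscribe body from a sink address and a topic. The class name, method name and the sink URL in the usage are hypothetical, and the inputs are assumed to contain no XML special characters.

```java
import java.util.List;

/** Hypothetical template for the body of a WS-Eventing Subscribe request. */
final class SubscribeBodies {
    static final String WSE_NS = "http://schemas.xmlsoap.org/ws/2004/08/eventing";
    static final String WSA_NS = "http://schemas.xmlsoap.org/ws/2004/08/addressing";
    static final String TOPIC_DIALECT = "http://synapse.apache.org/eventing/dialect/topicFilter";

    /** Builds the Subscribe element; only sinkAddress and topic vary between subscriptions. */
    static String subscribeBody(String sinkAddress, String topic) {
        return "<wse:Subscribe xmlns:wse=\"" + WSE_NS + "\">"
                + "<wse:Delivery Mode=\"" + WSE_NS + "/DeliveryModes/Push\">"
                + "<wse:NotifyTo>"
                + "<wsa:Address xmlns:wsa=\"" + WSA_NS + "\">" + sinkAddress + "</wsa:Address>"
                + "</wse:NotifyTo>"
                + "</wse:Delivery>"
                + "<wse:Filter Dialect=\"" + TOPIC_DIALECT + "\">" + topic + "</wse:Filter>"
                + "</wse:Subscribe>";
    }

    public static void main(String[] args) {
        String omsSink = "http://localhost:7070/services/OMSEventSink"; // made-up address
        // One subscription per event type the OMS consumes in this architecture.
        for (String topic : List.of("/ord/", "/mdd/")) {
            System.out.println(subscribeBody(omsSink, topic));
        }
    }
}
```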

Event Publisher

Several application components in the above architecture will act as event publishers. The blotter, OMS, MDD and the order gateway adaptor will publish different types of events. Events will be constructed based on user and system activities, and sent to the event broker (WSO2 ESB) to be published based on topics. Event publishers can use service client code similar to that of the event subscribers. A working sample can be found in the WSO2 ESB binary distribution under ${ESB_HOME}/samples/axis2Client/src/samples/userguide/EventSender.java.

Event

Events do not have a standard format, and can be constructed based on the input and output data requirements of the event publisher and the event sink. However, it is mandatory to define the topic of the event, which the event broker uses to filter events and publish them to the sinks. A sample Order event will look like the following:

<?xml version='1.0' encoding='UTF-8'?>
<soapenv:Envelope
	xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/">
	<soapenv:Header xmlns:wsa="http://www.w3.org/2005/08/addressing">
		<aip:Topic xmlns:aip="https://apache.org/aip">/ord/DELL</aip:Topic>
		<wsa:To>https://localhost:8280/services/TradingEventProxy</wsa:To>
		<wsa:ReplyTo>
			<wsa:Address>http://www.w3.org/2005/08/addressing/none</wsa:Address>
		</wsa:ReplyTo>
		<wsa:MessageID>urn:uuid:F189B018E12DAA55AE1246696970729</wsa:MessageID>
		<wsa:Action>urn:event</wsa:Action>
	</soapenv:Header>
	<soapenv:Body>
		<m:placeOrder xmlns:m="https://services.samples">
			<m:Order>
				<m:CIOrdID>ORD_1</m:CIOrdID>
				<m:HandInst Value="2" />
				<m:MinQty>1000</m:MinQty>
				<m:Instrument>
					<m:Symbol>DELL</m:Symbol>
					<m:IDSource>1</m:IDSource>
					<m:SecurityID>277461109</m:SecurityID>
				</m:Instrument>
				<m:Side Value="1" />
				<m:TransactTime>20000907.09:25:56</m:TransactTime>
				<m:OrderQuantity>
					<m:OrderQty>5000</m:OrderQty>
				</m:OrderQuantity>
				<m:OrderType>
					<m:LimitOrder Value="2">
						<m:Price>62.5</m:Price>
					</m:LimitOrder>
				</m:OrderType>
				<m:Currency Value="usd" />
				<m:Rule80A Value="A" />
			</m:Order>
		</m:placeOrder>
	</soapenv:Body>
</soapenv:Envelope>
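Because the broker routes on the topic header, it can help to see how that header might be read from an event. The following is a minimal sketch using only the JDK's DOM parser; it is not the ESB's own code, and the class and method names are hypothetical. The header namespace and name are passed in so that they can match the topicHeaderNS and topicHeaderName properties of the event source configuration.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import javax.xml.XMLConstants;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.NodeList;

/** Hypothetical reader for the topic header of an event envelope. */
final class TopicHeaderReader {
    /**
     * Returns the trimmed text of the first element with the given namespace and
     * local name, or null if there is none. For brevity the whole document is
     * searched rather than only the SOAP header.
     */
    static String readTopic(String envelopeXml, String headerNs, String headerName) {
        try {
            DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
            dbf.setNamespaceAware(true); // required for namespace-based lookups
            dbf.setFeature(XMLConstants.FEATURE_SECURE_PROCESSING, true);
            Document doc = dbf.newDocumentBuilder().parse(
                    new ByteArrayInputStream(envelopeXml.getBytes(StandardCharsets.UTF_8)));
            NodeList matches = doc.getElementsByTagNameNS(headerNs, headerName);
            if (matches.getLength() == 0) {
                return null;
            }
            return matches.item(0).getTextContent().trim();
        } catch (Exception e) {
            throw new IllegalArgumentException("Could not parse the event envelope", e);
        }
    }

    public static void main(String[] args) {
        // Shortened example event; the topic value is illustrative.
        String event =
                "<soapenv:Envelope xmlns:soapenv=\"http://schemas.xmlsoap.org/soap/envelope/\">"
                + "<soapenv:Header>"
                + "<aip:Topic xmlns:aip=\"https://apache.org/aip\"> /ord/DELL </aip:Topic>"
                + "</soapenv:Header><soapenv:Body/></soapenv:Envelope>";
        System.out.println(readTopic(event, "https://apache.org/aip", "Topic"));
    }
}
```

Trimming the header text means that a pretty-printed event, with the topic on its own line, yields the same topic as a compact one.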

Improvements

Once the above architecture is implemented, a single OMS will prove insufficient to process all the orders on the trading floor during peak trading hours and peak trading days. Thanks to the loosely-coupled nature of an EDA, additional OMS instances can be plugged into the system quite easily, each servicing a specified set of symbols. Each OMS instance will subscribe only to the events for the set of symbols associated with it. To balance the work among the OMS instances, symbols can be divided using a weighted average algorithm to create topic tables. Topic tables will be created daily, prior to trading. The accumulated total of the previous day's closing balance for each symbol will be used to calculate the weighted average. Each OMS instance will use its own topic table for subscription. Before the day closes, during the post-trading session, OMS instances can unsubscribe from the topics subscribed to for the day. As with the multiple OMS instances, the system can divide the blotters based on symbols and allow traders to trade on specific symbols from different trading desks.

Figure 4 - Architecture with multiple OMS instances
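The article does not spell out the weighted average algorithm, so the sketch below swaps in a simple greedy heuristic instead: symbols are sorted by weight, and each one is assigned to the OMS instance with the lowest accumulated weight so far. The class and method names are hypothetical, the weight is assumed to be the previous day's accumulated total per symbol, and the figures in the usage are made up.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical planner that splits symbols into one topic table per OMS instance. */
final class TopicTablePlanner {
    /**
     * Greedy least-loaded assignment: heaviest symbols first, each to the OMS
     * instance whose table currently carries the least total weight.
     */
    static List<List<String>> plan(Map<String, Long> weightBySymbol, int omsCount) {
        if (omsCount < 1) {
            throw new IllegalArgumentException("omsCount must be at least 1");
        }
        List<List<String>> tables = new ArrayList<>();
        for (int i = 0; i < omsCount; i++) {
            tables.add(new ArrayList<>());
        }
        long[] load = new long[omsCount];

        List<Map.Entry<String, Long>> entries = new ArrayList<>(weightBySymbol.entrySet());
        entries.sort((a, b) -> {
            int byWeight = Long.compare(b.getValue(), a.getValue()); // heaviest first
            return byWeight != 0 ? byWeight : a.getKey().compareTo(b.getKey());
        });

        for (Map.Entry<String, Long> entry : entries) {
            int target = 0;
            for (int i = 1; i < omsCount; i++) {
                if (load[i] < load[target]) {
                    target = i; // ties stay with the lower-numbered instance
                }
            }
            tables.get(target).add(entry.getKey());
            load[target] += entry.getValue();
        }
        return tables;
    }

    public static void main(String[] args) {
        Map<String, Long> yesterday = new LinkedHashMap<>();
        yesterday.put("msft", 50L);
        yesterday.put("goog", 30L);
        yesterday.put("sunw", 20L);
        yesterday.put("dell", 10L);

        List<List<String>> tables = plan(yesterday, 2);
        for (int i = 0; i < tables.size(); i++) {
            // Each OMS instance would subscribe to ord/<symbol> for its own table.
            System.out.println("OMS " + (i + 1) + " services " + tables.get(i));
        }
    }
}
```

Each OMS instance would then issue one subscription per symbol in its table before trading starts, and unsubscribe from the same topics in the post-trading session.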

Summary

WSO2 ESB 2.x ships with built-in WS-Eventing capability that allows users to architect and build applications using EDA. The WSO2 ESB not only provides eventing capability, but also provides sample code, sample configurations and documentation for many audiences, including the architects, analysts and developers who build such systems. The enhanced mediation and routing capabilities of the WSO2 ESB power the eventing functionality. Topic-based event filtering allows process-to-process messaging, thereby limiting the exchange of unwanted messages on the wire. EDA gives users the ability to build sophisticated loosely-coupled systems, and facilitates the creation of dynamic and static load balancing systems using topics.

References

 [1] WS-Eventing Specification - https://www.w3.org/Submission/WS-Eventing/

 [2] WSO2 ESB Eventing Samples - https://wso2.org/project/esb/java/2.0.2/docs/samples/eventing_samples.html

 [3] Developing Financial Applications with WSO2 ESB - https://wso2.org/library/articles/developing-financial-applications-wso2-esb

Author

Asanka Abeysinghe is an Architect at WSO2. asankaa at wso2 dot com

https://www.asankama.com

 
