Fusion: Eventing with SOA Part 3 - Implementation and Development of EDA
- Asanka Abeysinghe
- CTO - WSO2
Introduction
In the very first article in the series, I explained the concepts eventing and an Event Driven Architecture (EDA). In the second, I discussed detailed implementation details of the WSO2 ESB's eventing capabilities. In this final article of the series, we use those concepts introduced in the first and implementation details of the second, to guide you through the process of build an application with eventing capabilities, with a practical example.
A hypothetical trading application will be used to illustrate the architecture and the implementation. To get a better understanding of the requirements, I recommend you read the article titled 'Developing Financial Applications with WSO2 ESB', also available on OxygenTank. The system used here is similar to the trading application described there.
Additional Related Articles
Fusion: Eventing with SOA - Introduction
Fusion: Eventing with SOA Part 2 - Eventing using Synapse and WSO2 ESB
Applies To
WSO2 Carbon | 1.5.x/2.0 |
WSO2 ESB | 2.x |
Apache Synapse | 1.2 SNAPSHOT |
Table of Content
- Introduction
- Applies To
- Abbreviations
- System Requirement
- EDA Approach
- Implementation
- Summary
- References
- Author
Abbreviations
Blotter - the graphical user interface use to enter orders and view market data
OMS - Order Management Server
MDD - Market Data Distribution
Order - instructions to buy/sell in the market
Submission - splitting an Order to multiple instructions
Execution - Information send after a successful execution of a submission
RRSubMan - Remote Registry based SUBscription MANager
ERSubMan - embedded Registry based SUBscription MANager
G-Reg - Governance Registry
System Requirement
Requirement
A trader requires to build a system to be used in the trading floor. This system needs to be consist with trading blotters, order management, market data management and linked to back office servers. Order management server needs to be connected to external order gateways, and the market data management server needs to be connected to the market data feeds.
Figure 1 - Requirement
Usage of the system
Traders who sit at trading desks will use the blotter to enter orders. Traders will get instructions from clients via phone calls, email, sms etc. Orders entered will be submitted to the OMS, and the OMS will convert them submissions, and forward to the order gateway. Order gateway will send execution reports and acknowledgments back to the OMS, which will then transfer the acknowledgments back to relevant blotters. Traders make decisions based on market data displayed on blotters were picked up from the MDD, that got picked up from the market data source.
A trader might trade on specific symbols for the day, or he might trade on all symbols.
Figure 2 - Usage
EDA Approach
Proposed system uses the WSO2 ESB as the eventing hub/broker, and use events to communicate between system components. System components are built as services, and publish relevant operations. There will be many event generators and event sinks that publish and consume various event types. Events will be categorized based on topics, where subscribers are able to subscribe using these topics.
Architecture
Figure 3 - Architecture with EDA
In this first approach, we have a single OMS service for the system. The WSO2 ESB will come in at the center as the central communication hub, handling all requests and responses. The WSO2 ESB uses the WSO2 Governance Registry as the repository to store subscriptions. Order blotter will publish order events (event source) and consume execution/ack, market data events (event sink). MDD will act as an event source and publish market data events. OMS will act as an event sink to capture market data and order events, and as an event source it will publish submissions.
Topic Hierarchy
Events will be categorized based on topics and interested parties are able to subscribe to these topics using subscriptions. Subscriptions are stored in the WSO2 Registry, in a hierarchical tree structure. In this system, topics created are based on the <event types>/<symbol>. System will have several event types,
- Market Data (mdd)
- Orders (ord)
- Submissions (sub)
- Executions (exc)
- Acknowledgments (ack)
System will create topics like mdd/msft, mdd/sunw, ord/msft, ack/goog. Each component in the system could subscribe and publish to the the topics available.
Implementation
WSO2 ESB Configuration
<definitions xmlns="http://ws.apache.org/ns/synapse"> <eventSource name="TradingEventSource"> <subscriptionManager class="org.wso2.carbon.eventing.impl.RemoteRegistryBasedSubscriptionManager"> <property name="registryURL" value="https://localhost:9446/registry/" /> <property name="username" value="admin" /> <property name="password" value="admin" /> <property name="topicHeaderName" value="Topic" /> <property name="topicHeaderNS" value="https://apache.org/aip" /> <property name="subscriptionStoragePath" value="/topicspace"/> </subscriptionManager> </eventSource> <sequence name="TradingEventSeq"> <log level="full" /> <eventPublisher eventSourceName="TradingEventSource" /> </sequence> <proxy name="TradingEventProxy"> <target inSequence="TradingEventSeq" /> </proxy> </definitions>
The WSO2 ESB uses the WSO2 Registry as the topic space/subscription persistent storage. In order for that to get going, configurations needs to use the RRSubMan as the implementation. Subscription manager stores subscriptions under the patch defined in the property "subscriptionStoragePath". A proxy service named "TradingEventProxy" with an event-publisher mediator is used to dispatch trading events. Events are published asynchronously as one way messages, so there will be only a sequence defined in the proxy service.
Subscriber
There are two event subscribers that exists within the architecture described. They are blotters, OMS and the order gateway adaptor. A simple Axis2 client code can be used to perform the subscriptions to the event-Source (WSO2 ESB). Subscribers can subscribe, unsubscribe, renew subscriptions in addition to check the status of their subscriptions by sending relevant requests to the event-Source. OMS subscription for Orders will look like the following code snippet:
Options options = new Options(); ServiceClient serviceClient; ConfigurationContext configContext = null; if (repo != null && !"null".equals(repo)) { configContext = ConfigurationContextFactory. createConfigurationContextFromFileSystem(repo, repo + File.separator + "conf" + File.separator + "axis2.xml"); serviceClient = new ServiceClient(configContext, null); } else { serviceClient = new ServiceClient(); } OMFactory factory = OMAbstractFactory.getOMFactory(); OMNamespace nsxmlins = factory.createOMNamespace("https://www.w3.org/2001/XMLSchema", "xmlns"); OMNamespace nss11 = factory.createOMNamespace("https://schemas.xmlsoap.org/soap/envelope", "s11"); OMNamespace nswsa = factory.createOMNamespace( "https://schemas.xmlsoap.org/ws/2004/08/addressing", "wsa"); OMNamespace nswse = factory.createOMNamespace("https://schemas.xmlsoap.org/ws/2004/08/eventing", "wse"); /** * Constuct the subscription in this place and add to subscribeOM */ OMElement subscribeOm = factory.createOMElement("Subscribe", nswse); OMElement deliveryOm = factory.createOMElement("Delivery", nswse); deliveryOm.addAttribute(factory.createOMAttribute("Mode", null, "https://schemas.xmlsoap.org/ws/2004/08/eventing/DeliveryModes/Push")); OMElement notifyToOm = factory.createOMElement("NotifyTo", nswse); OMElement addressOm = factory.createOMElement("Address", nswsa); factory.createOMText(addressOm, address); OMElement expiresOm = factory.createOMElement("Expires", nswse); factory.createOMText(expiresOm, expires); OMElement filterOm = factory.createOMElement("Filter", nswse); filterOm.addAttribute(factory.createOMAttribute("Dialect", null, "https://synapse.apache.org/eventing/dialect/topicFilter")); factory.createOMText(filterOm, topic); notifyToOm.addChild(addressOm); deliveryOm.addChild(notifyToOm); subscribeOm.addChild(deliveryOm); if (!(expires.equals("*"))) { subscribeOm.addChild(expiresOm); // Add only if the value // provided } subscribeOm.addChild(filterOm); // set addressing, transport and proxy url serviceClient.engageModule("addressing"); options.setTo(new EndpointReference(addUrl)); options.setAction("https://schemas.xmlsoap.org/ws/2004/08/eventing/Subscribe"); serviceClient.setOptions(options); System.out.println("Subscribing \n" + subscribeOm.toString()); try { OMElement response = serviceClient.sendReceive(subscribeOm); System.out.println("Subscribed to topic " + topic); Thread.sleep(1000); System.out.println("Response Received: " + response.toString()); String subId = response.getFirstChildWithName( new QName(nswse.getNamespaceURI(), "SubscriptionManager")) .getFirstChildWithName( new QName(nswsa.getNamespaceURI(), "ReferenceParameters")) .getFirstChildWithName( new QName(nswse.getNamespaceURI(), "Identifier")).getText(); System.out.println("Subscription identifier: " + subId); } catch (AxisFault e) { System.out.println("Fault Received : " + e.toString()); System.out.println("Fault Code : " + e.getFaultCode().toString()); }
A working sample can find from the WSO2 ESB binary distribution under ${ESB_HOME}/samples/axis2Client/src/samples/userguide/EventSubscriber.java.
Subscription
There are several types of subscriptions required for the above application architecture. They are order subscriptions, submission subscriptions, execution subscriptions, ACK subscriptions and market data subscriptions. Each subscription message will be identical other than the subscription topic.
<?xml version='1.0' encoding='UTF-8'?> <soapenv:Envelope xmlns:soapenv="https://schemas.xmlsoap.org/soap/envelope/"> <soapenv:Header xmlns:wsa="https://www.w3.org/2005/08/addressing"> <wsa:To> https://localhost:8280/services/TradingEventSource </wsa:To> <wsa:MessageID> urn:uuid:268BD7DAE421979A381246694624277 </wsa:MessageID> <wsa:Action> https://schemas.xmlsoap.org/ws/2004/08/eventing/Subscribe </wsa:Action> </soapenv:Header> <soapenv:Body> <wse:Subscribe xmlns:wse="https://schemas.xmlsoap.org/ws/2004/08/eventing"> <wse:Delivery Mode="https://schemas.xmlsoap.org/ws/2004/08/eventing/DeliveryModes/Push"> <wse:NotifyTo> <wsa:Address xmlns:wsa="https://schemas.xmlsoap.org/ws/2004/08/addressing"> https://localhost:7070/services/MDDEventSink </wsa:Address> </wse:NotifyTo> </wse:Delivery> <wse:Filter Dialect="https://synapse.apache.org/eventing/dialect/topicFilter"> /mdd/ </wse:Filter> </wse:Subscribe> </soapenv:Body> </soapenv:Envelope>
Event Publisher
Several application components in the above architecture will act as event publishers. Blotter, OMS, MDD and the order gateway adaptor will publish different types of events. Events will be constructed based on user and system activities, and sent to the event broker (WSO2 ESB) to be published based on topics. Event publishers can use similar service client codes as event subscribers. A working sample can be found in the WSO2 ESB binary distribution, under ${ESB_HOME}/samples/axis2Client/src/samples/userguide/EventSender.java
Event
Events will not have a standard format, and can be constructed based on input and output data requirements of the event publisher and the event sink. However, it is mandatory to define the topic of the event that the event broker will use to filter and publish the events to the sinks. A sample Order event will look like the following:
<?xml version='1.0' encoding='UTF-8'?> <soapenv:Envelope xmlns:soapenv="https://schemas.xmlsoap.org/soap/envelope/"> <soapenv:Header xmlns:wsa="https://www.w3.org/2005/08/addressing"> <aip:Topic xmlns:aip="https://apache.org/aip"> /mdd/DELL </aip:Topic> <wsa:To> https://localhost:8280/services/TradingEventProxy </wsa:To> <wsa:ReplyTo> <wsa:Address> https://www.w3.org/2005/08/addressing/none </wsa:Address> </wsa:ReplyTo> <wsa:MessageID> urn:uuid:F189B018E12DAA55AE1246696970729 </wsa:MessageID> <wsa:Action>urn:event</wsa:Action> </soapenv:Header> <soapenv:Body> <m:placeOrder xmlns:m="https://services.samples"> <m:Order> <m:CIOrdID>ORD_1</m:CIOrdID> <m:HandInst Value="2" /> <m:MinQty>1000</m:MinQty> <m:Instrument> <m:Symbol>DELL</m:Symbol> <m:IDSource>1</m:IDSource> <m:SecurityID>277461109</m:SecurityID> </m:Instrument> <m:Side Value="1" /> <m:TransactTime>20000907.09:25:56</m:TransactTime> <m:OrderQuantity> <m:OrderQty>5000</m:OrderQty> </m:OrderQuantity> <m:OrderType> <m:LimitOrder Value="2"> <m:Price>62.5</m:Price> </m:LimitOrder> </m:OrderType> <m:Currency Value="usd" /> <m:Rule80A Value="A" /> </m:Order> </m:placeOrder> </soapenv:Body> </soapenv:Envelope>
Improvements
Once the above architecture is implemented during peak trading hours and peak trading days, a single OMS will prove insufficient to process orders in the trading floor. With the loosely-coupled nature of an EDA, the system can be plugged into additional OMS quite easily and specify the a set of symbols to service. So each OMS will subscribe to events specific for that set of symbols associated with a instance of the OMS. To balance work distribution among the OMS, symbols can be divided using a weighted average algorithm to create topic tables. Topic tables will be create daily, prior to trading. Accumelated total of the previous day's closing balance for each symbol will be used to calculate the weighted average. Each OMS will use the topic table for the instance for subscription. Before the day closes, during the post trading session, OMS instances can unsubscribe for the topics subscribed for the day. Similar to the multiple instances of OMS, system can devide the blotters based on symbols and allow traders to trade on specific symbols from different trading desks.
Figure 4 - Architecture with multiple OMS instances
Summary
WSO2 ESB 2.x ships with in-built WS-Eventing capability that allows users architect and build applications using EDA. The WSO2 ESB does not only provide eventing capability, but it also provides sample code, sample configuration and documentation for many audiences including architects, analysts and developers who build such systems. Enhanced mediation and routing capabilities of the WSO2 ESB powers the eventing functionality. Topic based event filtering allows process-to-process messages, and thereby limiting the exchange of unwanted messages in the wire. EDA provides user the ability to build sopisticated loosely-coupled systems, and facilitated the creation of dynamic, static load balancing systems using topics.
Fusion: Eventing with SOA - Introduction
Fusion: Eventing with SOA Part 2 - Eventing using Synapse and WSO2 ESB
References
[1] WS-Eventing Specification - https://www.w3.org/Submission/WS-Eventing/
[2] WSO2 ESB Eventing Samples - https://wso2.org/project/esb/java/2.0.2/docs/samples/eventing_samples.html
[3] Building Financial Applications using WSO2 ESB - https://wso2.org/library/articles/developing-financial-applications-wso2-esb
Author
Asanka Abeysinghe is an Architect at WSO2. asankaa at wso2 dot com