ei
2014/04/22
22 Apr, 2014

Demystifying WSO2 ESB Pass-Through Transport - Part III

  • Kasun Indrasiri
  • Director - Integration Technologies - WSO2

Applies to

WSO2 ESB 4.8.x

Table of contents

Introduction

WSO2 ESB is the world’s fastest open source ESB and has been battle tested with thousands of deployments worldwide with
production use cases of over billions of transactions. The non-blocking HTTP transport of WSO2 which is known as Pass-Through Transport is the key component behind all this success.

This series of articles gives an in-depth analysis of WSO2 ESB’s Pass-Through Transport architecture for any advanced user
who is interested in knowing about it in detail.

This is Part III of this article and the main objective here is to give a comprehensive understanding of Pass-Through Transport implementation and the message flow inside the ESB non-blocking HTTP transport layers.

Pass-Through Transport - deep dive

Pass-Through Transport is based on Apache HTTP Core-NIO and hence most of the concepts that we covered in Part I of this article can be directly applied here too. We will discuss in detail how the message processing is done in Pass-Through Transport in the following sections. The idea is to give a comprehensive understanding of the entire message flow within the Pass-Through Transport layer of the ESB (which also gives an overview of end-to-end message flow in WSO2 ESB)

Understanding the state machine

Pass-Through Transport is built on top of a state machine which inherits from the HTTP Core-NIO.

In the request receiving side, we should have a transport listener implementation in which we can listen for inbound HTTP requests. PassThroughHttpListener serves this purpose by implementing the transport listener interface.

The initialization of the PassThroughHttpListener takes care of creating the instance of the NHttpServerEventHandler implementation of Pass-Through Transport, which is known as ‘SourceHandler’.

The SourceHandler receives all the events associated with any particular connection between the client and the ESB.

The ServerIODispatch, which wraps the SourceHandler, is used to notify various IO events from the listener. This is explained below.

init() also creates the source side configuration of the transport which is known as SourceConfiguration. The transport listener side configurations are read and applied by SourceConfiguration through BaseConfiguration (i.e. passthru.properties)

Listening IO Reactor : The start method of the PassThroughHttpListener is responsible for creating the HTTP listener that can accept connection on a given port. This is known as DefaultListeningIOReactor. As the name implies, this is based on the reactor pattern that we discussed in Part I.

Finally, the DefaultListeningIOReactor will be executed on a separate thread by passing the above ‘ServerIODispatch instance. Which means, the reactor can notify the Source Handler via ServerIODispatch on various events related to specific inbound connections.

At this point, we are all set to accept any incoming request from the ESB’s HTTP interface (Pass-Through Transport).

Understanding the internal states

As we discussed in the previous section, multiple operations on SourceHandler and TargetHandler get triggered based on various events that occur in the HTTP Core layer. These different states need to be preserved internally for a given message flow (request/response). Therefore the state of the message flow needs to be kept on both the source and the target handler sides. The states are kept in SourceContext and TargetContext. The following state transition diagram gives a full overview of internal state transitions.

Receiving a request

connected

  • When we receive a request from a client, the SourceHandler gets notified about the incoming connection in its connected (NHttpServerConnection) method. At this point we add this inbound connection in to the SourceConnections, which keeps track of the connections coming in to the transport.
  • In addition, we create the SourceContext, which represents the information about the TCP connection at a given point in time. The newly created SourceContext is added in to the current HTTPContext that is associated with the current connection as ‘CONNECTION_INFORMATION’ attribute.
  • At this point, we have set the state of our internal state machine as ‘REQUEST_READY’. This means that the connection is at the initial stage and is ready to receive a request.

requestReceived

  • SourceHandler’s requestReceived is invoked when we have an incoming HTTP request.
  • We have all the HTTP Headers available at this point (but not the body of the request). Hence we update the state to REQUEST_HEAD. This means that the connection is reading the request headers. When we are updating the state, we simply retrieve the SourceContext that we have set as ‘CONNECTION_INFORMATION’ in the current connection’s HTTP context.
  • As we have the HTTP Request available at this point, we can create SourceRequest, which is the internal representation of the HTTP request (all required information such as headers is acquired from org.apache.http.HttpRequest and copied over to SourceRequest).
  • Then it invokes the start() method of the SourceRequest.
  • This is a very important phase of the message flow. This is where the ‘pipe’ gets created for inbound requests. This instance is known as the ‘source’ pipe and it comprises of input and output buffers (for content aware scenarios). We set that pipe as a ‘reader’ (as we are in the request reading path) into the SourceContext.
  • Then the request is handed over to a worker thread known as ‘ServerWorker’ (runnable). This is a worker thread for executing an incoming request into the transport. These threads are managed in the worker pool which we can configure with worker_pool_size_core and
    worker_pool_size_max in the passthru-http.properties file.
  • One other important thing that happens when we create the ServerWorker is the creation of messagecontext (org.apache.axis2.context.MessageContext) and populate that with all the required properties that are needed for further message processing.
  • Please note that the PASS_THROUGH_PIPE is also set as a message context property, which we will acquire and use while sending the message out.
  • It is important to keep in mind that Message Context is the main information carrier between the transport listener, Axis2, mediation engine (synapse) and the transport sender. So, it has all the information and references related to a given message.
  • Once we process the message, we can inject the message into the Axis2 via AxisEngine.receive(ctx). The message goes through the Axis2 layer and then to the mediation engine (via synapse and proxy service message receivers)
  • However, the entity body of the message is read after the IO event triggers the inputReady method of the source handler. Despite the availability of the content, we can proceed with the current request (with its headers etc.). Therefore the state of the internal state machine is set to REQUEST_HEAD.

inputReady

  • inputReady is called when the channel is ready to read a new portion of the request entity body.
  • Similar to previous methods, we update the state to REQUEST_BODY of the associated SourceContext, which is acquired from the connection's http context ('CONNECTION_INFORMATION').
  • SourceRequest’s read(..) is invoked in order to read the request into the input buffer from the wire.
  • Then we invoke the produce method of Pipe (source) with the associated content decoder(ContentDecoder). i.e. pipe.produce(decoder). The decoder reads bytes from the underlying stream into the input buffer residing in the Pipe.
  • Once the content is fully decoded (produce/read into the input buffer) we set the state of our state machine to ‘REQUEST_DONE’
  • REQUEST_DONE means the request is completely received

exception

  • This is triggered when an I/O error occurs while reading from or writing to the underlying channel or when an HTTP protocol violation occurs while receiving an HTTP request.

timeout

  • This is triggered when no input is detected on this connection over the maximum period of inactivity.

closed

  • This is triggered when the connection has been closed.

Sending a Request

  • When we are sending out the request from the ESB we should have a transport sender implementation in which we can send the outbound HTTP requests. PassThroughHttpSender serves this purpose by implementing the transport sender interface. 
  • The initialization of the PassThroughHttpSender takes care of creating the instance of the NHttpClientEventHandler implementation of Pass-Through Transport, which is known as ‘TargetHandler’. 
  • The TargetHandler receives all the events associated with any particular connection between the ESB and the back-end service.
  • As with the listener side, we have the TargetConfiguration which is created based on the passthru-http.properties and has all the configuration parameters for outbound connections. 
  • The initialization of  PassThroughHttpSender creates a DefaultConnectingIOReactor instance for outbound requests. In addition it wraps the TargetHandler instance with the ClientIODispatch. This allows us to receive IO events at the TargetHandler side. 
  • In addition, we create a DeliveryAgent instance which we use as a gateway for deferred delivery of the messages. When a message is to be delivered it is submitted to this class. If a connection is available to the target this, class will try to deliver the message immediately over that connection. If a connection is not available it will queue the message and request a connection from the pool. When a new connection is available a queued message will be sent through it. (This is used when creating the TargetHandler)  
  • The DefaultConnectingIOReactor is executed with the above ClientIODispatch on a separate thread. 
  • All the things we have explained so far happens during the initialization of the Transport Sender. 
  • When the mediation engine sends a message out, it invokes the invoke(.) method of the transport sender with the MessageContext. You can see how the message context acts as the message meta-model across the message flow (from listener to sender). 
  • The MessageContext is handed over to the DeliveryAgent via submit(.). The DeliveryAgent basically enqueues the message for delivery. 
  • Within the submit operation of the delivery agent we acquire the connection for the required bound request from the TargetConnections pool (TargetConnections#getConnection). There is no connection in the pool that exists for the given route (HTTPRoute : hostname, port etc), when we create a new connection by calling DefaultListeningIOReactor.connect(..). 
  • At this point, event triggering on the TargetHandler starts. 

connected

  • As we did in the SourceHandler side, in the connected method of the TargetHandler we create a TargetContext which is used as a holder for information required during the life-cycle of this outbound connection. We set the created TargetContext as an attribute in the current connection’s HTTP Context under name ‘CONNECTION_INFORMATION’. 
  • The initial state of the TargetContext is set to REQUEST_READY.
  • The DeliveryAgent’s connected method gets notified so that it can poll the queued message from Map<HttpRoute, Queue<MessageContext>> waitingMessages. 
  • At the end of the above flow, the submitRequest method of the DeliveryAgent gets called. 
  • This is one of the most important phases of the outbound request as we are creating TargetRequest from the current message context. Also, we extract the pass-through pipe (‘source’) from the message context and add a consumer to the pipe as well as linking the TargetRequest with the pipe. 

requestReady

  • For the new outgoing HTTP Request, the requestReady gets invoked as soon as the HTTP request headers are available. 
  • We can get the associated TargetRequest from the TargetContext for the given connection and then we simply invoke the start method of the TargetRequest. 
  • A new HTTPRequest is created and all the required HTTP headers are copied. 
  • Finally the crated HTTPRequest is submitted via NHTTPClientConnection#submitRequest(request) method. 
  • The state of the TargetContext is updated to REQUEST_HEAD. 

outputReady

  • outputReady is called when the channel is ready to write a new portion of the request entity. We get the TargetRequest associated with the connection and call the write method with the connection and the encoder. (request.write(conn, encoder))
  • Within the TargetRequest#write(..), it consumes the data from the pipe and writes it to the wire via pipe.consume(encoder). 
  • This is where the content of the input buffer of source pipe is consumed and written in to the wire by the ContentEncoder. 
  • When the encoder is completed, the state of the TargetContext for that particular connection is changed to REQUEST_DONE. 

exception

  • This is triggered when an I/O error occurs while reading from or writing to the underlying channel or when an HTTP protocol violation occurs while receiving an HTTP response.

timeout

  • This is triggered when no input is detected on this connection over the maximum period of inactivity.

closed

  • This is triggered when the connection has been closed.

Receiving a response

responseReceived

  • In the response path, when a new HTTP Response is coming in the TargetHandler’s responseReceived is invoked. At this point all the HTTP headers of the incoming response is available. 
  • A new TargetResponse instance is created and all the required headers are copied to the TargetResponse. 
  • We can correlate the request associated with the response that we get by simply obtaining the request message context from the TargetContext for the current connection (TargetContext.get(conn).getRequestMsgCtx()). 
  • One of the most important thing that happen in this phase is the creation of a new pipe for the response path. This happens when the TargetResponse#start(.) is invoked. A new instance of Pipe named “target” is created. This is exactly similar to the request path.  
  • Here we create Pipe from the incoming request and set that as a reader (because we are reading the response). 
  • Then we create a new worker known as ‘ClientWorker’(Runnable) and pass the TargetResponse along with the request message context. This worker pool configuration is similar to the ServerWorker. 
  • In the ClientWorker constructor we create a new instance of the message context and add all the information that is coming from the transport layer into the message context. 
  • The PASS_THROUGH_PIPE is set as a property in the response message context. 
  • The ClientWorker executes the invocation of the AxisEngine.receive with the response message context. 

inputReady

  • When the channel is ready to read a new portion of the response entity, the inputReady method of the targetHandler gets invoked. If the state of the TargetContext related to the connection is in the expected state, then the state is updated to RESPONSE_BODY. 
  • The relevant TargetResponse is acquired from the associated connection and the read(..) method of the TargetResponse is called with the connection and decoder as input parameters. 
  • This is where we produce the content of the incoming response message’s entity body to the ‘target’ pipe. So, we are reading/decoding the content from the wire into the buffer of our ‘target’ pipe. 
  • Once the decoder is completed, we update the status to ‘RESPONSE_DONE’. 

Sending a response

  • When we are sending back the response, in the PassThroughHttpSender, submitResponse is invoked with the message context and a SourceResponse instance is created out of that message context. All the required transport header and other properties are read from the message context and added into the 
  • One thing to note here is that we do have reference to the SourceReqest associated with the current connection. So, we can refer the initial request message we got for a given response message. In fact when we are creating the SourceResponse we are using the SourceRequest as well. 
  • At the same time we extract the PASS_THROUGH_PIPE from the message context in the submitResponse method of the transport sender. 
  • Then we set the pipe as a writer (as we have to write the response to the wire) at the SourceContext associated with the connection.

responseReceived

  • When a new HTTP Response is available (when the headers are available but not the entity body) responseReceived gets invoked and we start sending the response by calling start(.) method of the SourceResponse. A new HTTPResponse is created from the HTTPResponseFactory (HTTPCore) and all the required transport related values, which are taken from the message context, are set on the HTTPResponse.
  • Finally the connection’s submitResponse method is called with the new http response.
  • The state of the SourceContext associated with that connection is set to RESPONSE_HEAD.

outputReady

  • When the channel is ready for writing the next portion of the response entity the outputReady method is invoked. Prior to writing the content in to the wire, the state is changed to RESPONSE_BODY for that SourceContext.
  • Then SourceResponse#write method is called with the respective connection and the encoder. The encoder reads data from the buffer and writes it into the wire. The consume operation of the ‘target’ pipe is invoked with the encoder and this is where the data residing in the buffer is written into the wire.
  • Once the encoder is completed we should change the state to RESPONSE_DONE.

Summary

Part III of the article has provided you with a knowledgeable insight of Pass-Through Transport design and implementation. Refer to Part 1 to get an understanding of the required fundamentals related to the architecture and the implementation of Pass-Through Transport and Part II for an in-depth explanation of the Pass-Through Transport architecture.

References

  1. Reactor: an object behavioral pattern for concurrent event demultiplexing and event handler dispatching
  2. Reactor Pattern Introduction
  3. Open Source ESB Performance comparison
 

About Author

  • Kasun Indrasiri
  • Director - Integration Technologies
  • WSO2 Inc