Saturday, January 3, 2015

Making my home smarter...

Has anyone else had the idea of building a cloud application for controlling home appliances over the web or mobile?  Well… here’s something that might interest you.

Looking back at my university dissertation from 15 years ago, which was about home automation using power line communication, I just wanted to build something useful to make my home a little bit smarter :-).  At a minimum, I wanted an application to control some of my home appliances (i.e. lighting systems, various switches, etc.) over web and mobile, and to run analytics on the collected data so that I could measure things like power consumption and fluctuations whilst away from home.

What about Hardware?

15 years ago, although it wasn't called the Internet of Things (IoT) yet, there was a bit of hype about integrating appliances into a common control platform.  At that time, X-10 was one such technology on the market for sending device control instructions over the electricity grid, so that you didn't have to wire up a separate control network for passing instructions.  What about WiFi?  Well, that wasn't even heard of in those days, so being able to use the same electricity grid for both power distribution and device control instructions was a pretty cool thought (reliability aside :-) ).  However, none of these technologies had a strong emphasis on leveraging the internet as a standard option for remote connectivity, simply because a dedicated Internet connection was a luxury home users didn't have.  Setting up an appliance control infrastructure over highly unreliable dial-up connectivity was not a feasible option; hence the peripherals were usually restricted to a closed solution that required a control device to be wired up to the serial port of a PC for sending instructions out to the power grid.

15 years later, it certainly doesn't look like X-10 has picked up much momentum in the retail appliance control space, but if you search on eBay you will surely find a number of alternatives that can interface with your home appliances through some sort of controller device.   By the way, don’t get too excited when I say ‘control your appliances’; at a minimum, what I mean is that these technologies can currently switch your appliances on/off, control the electricity flow, and measure power consumption too.
If you are really interested, take a look at [1] for the kind of hardware you can use here.  Again, my intention was to build a common API layer that is ‘interfaceable’ with various hardware devices, as long as they provide ways to a.] query topology (i.e. device registrations), b.] pass control instructions to each connected device separately, and c.] query a device’s current state as and when required.
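To make that contract concrete, here is a minimal Java sketch of such an API layer.  All names here are hypothetical (this is not any actual vendor SDK), and a real implementation would talk to the controller hardware instead of an in-memory map:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical abstraction of the three capabilities: topology, control, state.
interface ApplianceController {
    List<String> queryTopology();                      // a.] registered device IDs
    void sendInstruction(String deviceId, String cmd); // b.] e.g. "ON" / "OFF"
    String queryState(String deviceId);                // c.] current device state
}

// A trivial in-memory implementation, useful for testing the API shape.
class InMemoryController implements ApplianceController {
    private final Map<String, String> devices = new HashMap<>();

    InMemoryController(List<String> deviceIds) {
        for (String id : deviceIds) {
            devices.put(id, "OFF");
        }
    }

    public List<String> queryTopology() {
        return new ArrayList<>(devices.keySet());
    }

    public void sendInstruction(String deviceId, String cmd) {
        devices.put(deviceId, cmd);
    }

    public String queryState(String deviceId) {
        return devices.get(deviceId);
    }
}
```

Any real controller (an X-10 bridge, a WiFi relay board, etc.) would then only need its own implementation of this interface.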

Architecture


Following is a quick sketch of what I wanted to implement.   For client (a Java agent running on a Raspberry Pi) to server side interfacing, MQTT [2] was a fairly easy choice, given that it’s lightweight, has been designed with interoperability in mind, and fits nicely with low-bandwidth connectivity, specifically for IoT use cases.  I am still learning about this protocol by the way, but from what I have gathered so far, it has some nice features for persistence, connection recovery, etc.


The primary task of the agent is to collect various messages (topology, status) from the appliance controller and publish them to the broker, whilst collecting instructions from the server side and passing them on to the appliance controller.  Obviously both of these tasks require the Java agent to transform the in/out messages specific to each party.   So, if there is a new set of devices I need to interface with in the future, I will only have to implement the messaging interfaces I have defined (hopefully :-)… maybe I should test that out with a different set of hardware now).
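As a rough illustration of the agent’s outbound transformation, the sketch below maps raw controller readings onto per-device MQTT topic/payload pairs.  The topic scheme and JSON shape are my own assumptions for illustration, not a fixed protocol:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of the agent's outbound transformation: controller status readings
// are mapped onto per-device MQTT topics.  Topic scheme and field names are
// hypothetical; a real agent would follow whatever the server side expects.
class StatusPublisherSketch {

    // Turns raw controller readings (deviceId -> watts) into
    // topic -> payload pairs ready to hand to an MQTT client.
    static Map<String, String> toMqttMessages(String homeId, Map<String, Double> readings) {
        Map<String, String> messages = new LinkedHashMap<>();
        for (Map.Entry<String, Double> e : readings.entrySet()) {
            String topic = "home/" + homeId + "/" + e.getKey() + "/power";
            String payload = "{\"watts\":" + e.getValue() + "}";
            messages.put(topic, payload);
        }
        return messages;
    }
}
```

The inbound direction (instructions from the broker to the controller) is just the inverse mapping.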
Appliance statistics periodically collected by the Java agent and published to the broker are dumped into a big data store on the server side.  I used MongoDB [3] for this, not for any specific reason, but I like its JSON based querying API as well as its pretty fast aggregation framework [4].   At this point, I haven’t made use of any big data analytics product out there, but I could write some code to extract average power consumption, etc. and wire it up to a couple of cool jQuery/Highcharts graphs.
In case anyone’s interested in the technologies and frameworks I’ve used for this, here’s a quick summary.

Web App

Implemented using ASP.NET MVC.  Used Twitter Bootstrap + jQuery to make the UI responsive.  Highcharts provided the functionality I needed for the charting components (i.e. data range zooming, etc.).  Most of the UI functions are bound to Ajax calls, making the user experience much nicer.

Broker

Mosquitto [5] was my choice.  Actually, this is the first time I have used this MQTT broker, hence I may not be the ideal guy to answer all your technical queries related to it (at least not yet :-)).
Paho (which is an Eclipse project now) provided the client libraries I needed, for both the Java side (Raspberry Pi) and the .NET server side [6].

Agent running on Raspberry Pi

This is a lightweight Java application that coordinates messaging between the broker and the hardware.  It does involve quite a bit of message transformation, and for the hardware I used, I could use the Apache HttpClient libraries to send HTTP POST requests as well as query for status info, from which I could extract the topology info and the statistical data.
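As a self-contained sketch of that request/response exchange, here is the same idea using only the JDK’s HttpURLConnection (the original used Apache HttpClient; the endpoint path and payload format below are placeholders):

```java
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

// Minimal sketch of the agent's controller calls using only the JDK.
// The original used Apache HttpClient; the endpoint is hypothetical.
class ControllerHttpSketch {

    // POST an instruction payload and return the controller's response body.
    static String postInstruction(String endpoint, String body) throws Exception {
        HttpURLConnection conn = (HttpURLConnection) new URL(endpoint).openConnection();
        conn.setRequestMethod("POST");
        conn.setDoOutput(true);
        conn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
        try (OutputStream out = conn.getOutputStream()) {
            out.write(body.getBytes(StandardCharsets.UTF_8));
        }
        try (InputStream in = conn.getInputStream()) {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            byte[] chunk = new byte[1024];
            int n;
            while ((n = in.read(chunk)) > 0) {
                buf.write(chunk, 0, n);
            }
            return new String(buf.toByteArray(), StandardCharsets.UTF_8);
        } finally {
            conn.disconnect();
        }
    }
}
```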








References
  

[2] MQTT - http://mqtt.org/


[4] MongoDB Aggregation framework - http://docs.mongodb.org/v2.2/applications/aggregation/

[5] Mosquitto - http://mosquitto.org/

[6] MQTT client libraries - https://github.com/mqtt/mqtt.github.io/wiki/libraries

Sunday, February 23, 2014

What's inside the new WSO2 carbon throttle core?

So what’s coming up in the new throttle core we’ve been working on during the last few weeks?  Here’s a quick overview of the new design and the features that have been made available to the dependent applications.

Cluster transparency
Cluster synchronization of run-time throttle data has been made fully transparent to the caller applications.  For callers, it is as simple as calling a single method, passing basic parameters such as callerID and callerConfig references, that returns the throttle decision.  There is no cluster related information the caller has to deal with anymore.

ThrottleContext throttleContext = throttle.getThrottleContext(contextID);

boolean isAllowed = throttleContext.canAccess(callerID, callerConfig);


Persistence of run-time throttle data
A throttle policy with a long throttle window can now benefit from the in-built persistence engine, so that the last known caller contexts are restored during the re-initialization process. In other words, you may reboot the whole cluster, but the throttle core is now capable of restoring the last persisted set of caller contexts so that the cluster can service new requests from where it stopped.
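The save/restore idea can be illustrated with a few lines of plain Java (purely illustrative; the throttle core’s persistence engine is its own implementation and all names here are made up):

```java
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Illustration only: persisting per-caller request counters so they survive
// a restart.  The real throttle core has its own persistence engine; this
// just demonstrates the save/restore idea with java.util.Properties.
class CounterStoreSketch {

    static void save(Map<String, Long> counters, String path) throws Exception {
        Properties props = new Properties();
        for (Map.Entry<String, Long> e : counters.entrySet()) {
            props.setProperty(e.getKey(), Long.toString(e.getValue()));
        }
        try (FileOutputStream out = new FileOutputStream(path)) {
            props.store(out, "caller request counters");
        }
    }

    static Map<String, Long> load(String path) throws Exception {
        Properties props = new Properties();
        try (FileInputStream in = new FileInputStream(path)) {
            props.load(in);
        }
        Map<String, Long> counters = new HashMap<>();
        for (String key : props.stringPropertyNames()) {
            counters.put(key, Long.parseLong(props.getProperty(key)));
        }
        return counters;
    }
}
```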

Cleanup
Abandoned caller contexts are cleaned up at the end of a configurable tolerance window, using the callers' last access times.
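The cleanup logic amounts to something like the following sketch (the map shape and names are assumed for illustration; the real code lives inside the throttle core):

```java
import java.util.Iterator;
import java.util.Map;

// Illustration of the cleanup idea: drop caller entries whose last access
// time falls outside a tolerance window.
class CallerCleanupSketch {

    // Removes entries older than toleranceMillis; returns how many were dropped.
    static int cleanup(Map<String, Long> lastAccessTimes, long now, long toleranceMillis) {
        int removed = 0;
        Iterator<Map.Entry<String, Long>> it = lastAccessTimes.entrySet().iterator();
        while (it.hasNext()) {
            if (now - it.next().getValue() > toleranceMillis) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }
}
```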

Context groupings made easy
The throttle core can now be extended for various context grouping needs.  For example, say you want to group callers based on their subnet masks, or perhaps on reverse-looked-up domain names.  To achieve this, you can simply extend the CallerConfiguration and CallerContext classes and override the caller identification methods to satisfy the grouping criteria you wish to use, and that’s really it!  There is absolutely no need to modify the throttle engine!
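For instance, the subnet-based grouping mentioned above boils down to deriving a shared key from the caller’s address.  Here is a minimal sketch of that key derivation only (the actual extension point is overriding the identification methods on CallerConfiguration/CallerContext subclasses, not a standalone helper like this):

```java
// Illustration of a grouping key: callers identified by their /24 subnet
// rather than by exact IP address.
class SubnetGroupingSketch {

    // Derives a /24 grouping key from a dotted-quad IPv4 address.
    static String subnetKey(String ipv4) {
        int lastDot = ipv4.lastIndexOf('.');
        return ipv4.substring(0, lastDot) + ".0/24";
    }
}
```

All callers in 192.168.1.x would then share one caller context, and the throttle engine itself stays untouched.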

Minimum impact to the main thread

The throttle handler has been enhanced to ensure that persistence and cluster replication involve the main request thread as little as possible.



... and the Design

  • The new throttle core encapsulates two maps.  The first map keeps the caller configuration objects, which are usually created during the initialization phase.  These configuration objects are created from a set of predefined rules such as the throttling policy definition (e.g. WSO2 API Manager does exactly that through tiers.xml).
  • The second map contains the caller context objects, which get dynamically created based on caller attributes such as application or resource identifiers.  This is what we use to track run-time throttle data for the engine to decide on either granting or denying access to a resource.
  • The purpose of the Replicator Task is to absorb the cluster replication complexities from the throttle engine, so that the engine no longer needs to undertake the task of notifying context changes to the neighboring cluster members.  The replicator task is also responsible for synchronizing the global counters between cluster members within a configurable time interval.
  • The PersisterTask also runs on a separate thread, periodically persisting the active caller contexts to a configurable datastore.  We are planning to incorporate the cluster manager’s coordinator selection into this, so that only the cluster-elected coordinator will dynamically pick up the task of persisting and reloading the caller context list.


We are currently doing further integration tests with APIM 1.6 and expect to ship this as a Carbon core component with the next releases.  We are also planning to further enhance the throttle mediator based on this implementation, bringing in all the key considerations that have been addressed by this new design.

Wednesday, November 20, 2013

Building a data entitlements ecosystem with WSO2 IS, ESB, and DSS

Introduction 

Most enterprises have a tendency to consider data security as a function attached to, or supported by, the business logic layer.  A common practice is to use a key identifier, such as user roles or permissions, as part of the data filtering criteria, often combined with other business specific filtering.  Traditionally, this practice evolved from the comfort system designers took in knowing that business components and data components would reside within the same deployment and system administration boundaries.

Things have evolved.  With enterprises adopting more loosely coupled and SOA driven architectures, the need for federated security has become a point of discussion across all application layers, including data.  Commonly known as data entitlements, the idea is to apply enterprise-wide security models at the point of producing and storing data, in and out of the data providers, so that applications do not have to handle filtering based on entitlements.

So why is this important?  I think the answer can be found if you take a closer look at some typical issues seen in enterprises: data duplication and consolidation nightmares, overlapping security policies, and policy administration overheads.

Let me try to explain the problem through a common use case.

'SomeCo' is a premium advertising company with multiple business units handling a large client base.  It has a divided sales organization specialized for banking & financial, manufacturing & retail, and food & beverages.  The company maintains a pool of applications that need to comply with data entitlements policies across the enterprise.  The platform needs to make sure that sales information related to a particular domain is only made visible to the members of that particular sales group, as well as to anyone with an organizational role above senior manager.  This policy needs to be enforced across all applications that present sales information to users.




Let’s take a look at a typical ecosystem that WSO2 can provide for SomeCo’s enterprise data entitlements needs.  For this solution, I am going to leverage the WSO2 ESB, IS, and DSS.   I believe the purpose of using DSS should be quite familiar to most… and that is to expose SomeCo’s sales data as services.   We are going to use WSO2 IS’s attribute based authorization service, built on XACML, to define entitlements policies for our sample service call.   The idea is that, based on some user identifier (i.e. a UsernameToken), we will query the WSO2 IS for a set of claims through a XACML request and build our dynamic query based on the claims received as part of the response, before calling the DSS service.


Solution




- Our aim is to provide a unified interface for the consumer applications to access sales data, so that applications do not need to handle data filtering based on users' security attributes (such as user roles).

- Assume the scenario where the users of these different sales organizations need to query sales details through different applications (i.e. sales forecasting, revenue predictions, etc.).    To achieve this, our solution needs to expose a service API (let’s say an API called getSalesInfo), which typically is an ESB endpoint these applications can connect to.

- Each application needing to consume this API will send its request payload with the authentication headers (in this sample a UsernameToken) to the ESB endpoint.

- After the ESB authenticates the request (usually against the same user store the applications are connected to), the ESB flow needs to acquire the set of claims this user is entitled to.  WSO2 ESB provides an entitlements mediator for this purpose.  The primary role of the entitlements mediator is to create and send a XACML request to WSO2 IS, and branch the API flow based on the decision (i.e. Permit, Deny, Indeterminate, Not Applicable).  In addition to the decision, this mediator can also receive claims (in the form of advices) back from IS, which is really what we are interested in for building the dynamic DSS query.

- As an example, we can receive the user’s roles as advices inside the XACML response, so that we can build the DSS service’s filter criteria based on roles.

- As you might have noticed, the caller applications didn’t have to worry at all about filtering for data entitlements; it was all handled by the service API we created using the WSO2 components.


ESB Mediation Flow



The heart of the mediation flow is the part where we extract claims out of the XACML response to build our dynamic query.  In many instances, the filter we need to construct will be quite specific to transactional data; hence we may need to translate those claims into a filter query which the DSS service can successfully map to SQL.   We can think of many possible ways of doing this, but for simplicity I implemented a simple mediator to build the filter query.

Please refer to [R2] for the Java code I used to read the advices from the message context and then create the filter query for the DSS service.

Resources

[R1] Sample XACML policy with AdviceExpressions for returning claims to the caller (see the AdviceExpressions element below).
Note that the XACML policy defines that the user's role is returned with the decision. The entitlements mediator can grab this and pass it on to the ESB sequence, which can construct the dynamic query based on these claims.  In this example we are using the user's role, but it can be any attribute that is in the user store and retrieved by the Policy Information Point (PIP).

<Policy xmlns="urn:oasis:names:tc:xacml:3.0:core:schema:wd-17"  PolicyId="CustomerServiceSales" RuleCombiningAlgId="urn:oasis:names:tc:xacml:1.0:rule-combining-algorithm:first-applicable" Version="1.0">
   <Target></Target>
   <Rule Effect="Permit" RuleId="Rule1">
      <Target>
         <AnyOf>
            <AllOf>
               <Match MatchId="urn:oasis:names:tc:xacml:1.0:function:string-regexp-match">
                  <AttributeValue DataType="http://www.w3.org/2001/XMLSchema#string">customerproxy</AttributeValue>
                  <AttributeDesignator AttributeId="urn:oasis:names:tc:xacml:1.0:resource:resource-id" Category="urn:oasis:names:tc:xacml:3.0:attribute-category:resource" DataType="http://www.w3.org/2001/XMLSchema#string" MustBePresent="true"></AttributeDesignator>
               </Match>
               <Match MatchId="urn:oasis:names:tc:xacml:1.0:function:string-equal">
                  <AttributeValue DataType="http://www.w3.org/2001/XMLSchema#string">read</AttributeValue>
                  <AttributeDesignator AttributeId="urn:oasis:names:tc:xacml:1.0:action:action-id" Category="urn:oasis:names:tc:xacml:3.0:attribute-category:action" DataType="http://www.w3.org/2001/XMLSchema#string" MustBePresent="true"></AttributeDesignator>
               </Match>
               <Match MatchId="urn:oasis:names:tc:xacml:1.0:function:string-equal">
                  <AttributeValue DataType="http://www.w3.org/2001/XMLSchema#string">sales</AttributeValue>
                  <AttributeDesignator AttributeId="http://wso2.org/claims/role" Category="urn:oasis:names:tc:xacml:1.0:subject-category:access-subject" DataType="http://www.w3.org/2001/XMLSchema#string" MustBePresent="true"></AttributeDesignator>
               </Match>
            </AllOf>
         </AnyOf>
      </Target>
   </Rule>
   <AdviceExpressions>
      <AdviceExpression AdviceId="customerService" AppliesTo="Permit">
         <AttributeAssignmentExpression AttributeId="employee.role">
            <AttributeDesignator AttributeId="http://wso2.org/claims/role" Category="urn:oasis:names:tc:xacml:1.0:subject-category:access-subject" DataType="http://www.w3.org/2001/XMLSchema#string" MustBePresent="true"></AttributeDesignator>
         </AttributeAssignmentExpression>
      </AdviceExpression>
   </AdviceExpressions>
</Policy> 



[R2] Mediator code to build filter query

@Override
public boolean mediate(MessageContext ctx) {

    try {
        // Retrieve the advices from the message context
        String rawXML = ctx.getProperty("adviceXml").toString();

        OMElement advice = AXIOMUtil.stringToOM(rawXML);

        Iterator<OMElement> advicesIter = advice
                .getFirstChildWithName(new QName("", "Result"))
                .getFirstChildWithName(new QName("", "AssociatedAdvice"))
                .getFirstChildWithName(new QName("", "Advice"))
                .getChildren();

        String sqlFilter = " ";

        // Build the filter query.  In this sample we use the role names as they are.
        while (advicesIter.hasNext()) {

            OMElement elem = advicesIter.next();

            String filterColumnName = elem.getAttribute(
                    new QName("", "AttributeId")).getAttributeValue();
            sqlFilter += filterColumnName + "='" + elem.getText() + "'";

            if (advicesIter.hasNext()) {
                sqlFilter += " OR ";
            }
        }

        ctx.setProperty("DSSFilter", sqlFilter);

    } catch (XMLStreamException e) {
        // A real implementation should log this parse failure
        // rather than silently swallowing it.
    }

    return true;
}



[R3] ESB flow

<proxy name="getSalesInfo"
  transports="https http"
  startOnLoad="true"
  trace="disable">
  
  <description/>
  
  <target inSequence="InSeq" outSequence="OutSeq">
     <endpoint>
     <address uri="http://localhost:9765/services/customerservice"/>
     </endpoint>
   </target>
   </proxy>

   <sequence name="OutSeq">
      <send/>
      <log level="full"/>
   </sequence>
   
<sequence name="InSeq">
      <entitlementService remoteServiceUrl="https://localhost:9444/services/" remoteServiceUserName="admin" remoteServicePassword="enc:kuv2MubUUveMyv6GeHrXr9il59ajJIqUI4eoYHcgGKf/BBFOWn96NTjJQI+wYbWjKW6r79S7L7ZzgYeWx7DlGbff5X3pBN2Gh9yV0BHP1E93QtFqR7uTWi141Tr7V7ZwScwNqJbiNoV+vyLbsqKJE7T3nP8Ih9Y6omygbcLcHzg=">
         <onReject>
            <makefault version="soap12">
               <code xmlns:soap12Env="http://www.w3.org/2003/05/soap-envelope"
                     value="soap12Env:Receiver"/>
               <reason value="UNAUTHORIZED"/>
               <node/>
               <role/>
               <detail>XACML Authorization failed</detail>
            </makefault>
         </onReject>
         <onAccept>
            <send>
               <endpoint>
                  <address uri="http://localhost:9765/services/customerservice"/>
               </endpoint>
            </send>
         </onAccept>
         <obligations/>
         <advice/>
      </entitlementService>

   </sequence>

References

[a] Blog : Guide to write XACML policies in WSO2 Identity Server 2.0
http://blog.facilelogin.com/2009/06/guide-to-write-xacml-policies-in-wso2.html

[b] Managing Entitlement
http://docs.wso2.org/display/IS450/Managing+Entitlement

Friday, November 8, 2013

Integrating HornetQ with WSO2 ESB

Introduction

HornetQ is a JMS-compliant open source asynchronous messaging project from JBoss. WSO2 ESB provides a simplified configuration model for integrating any JMS-compliant messaging system.  Recently, I came across a few discussion threads looking for a sample configuration between HornetQ and WSO2 ESB; hence this short article to outline the steps.

Sample Scenario

In this example, I am going to expose an ESB proxy service which accepts sample SOAP messages and pushes them to a JMS queue configured in HornetQ.  This ESB proxy will only execute an outward operation and will not return anything to the caller.

Steps

1. First, create a sample queue by editing $HORNET_HOME/config/stand-alone/non-clustered/hornetq-jms.xml.

NOTE: There are multiple execution modes supported by HornetQ, but I am only going to use the stand-alone configuration for this example.

<queue name="wso2">
      <entry name="/queue/mySampleQueue"/>
</queue>


2. Add the following two connection factory entries, which will be required if the ESB needs to act as a JMS consumer.  However, this example will only cover the scenario of the WSO2 ESB acting as a producer, where the incoming payload is simply pushed to a queue.

<connection-factory name="QueueConnectionFactory">
      <xa>false</xa>
      <connectors>
         <connector-ref connector-name="netty"/>
      </connectors>
      <entries>
         <entry name="/QueueConnectionFactory"/>
      </entries>
</connection-factory>



<connection-factory name="TopicConnectionFactory">
      <xa>false</xa>
      <connectors>
         <connector-ref connector-name="netty"/>
      </connectors>
      <entries>
         <entry name="/TopicConnectionFactory"/>
      </entries>

</connection-factory>

3. Copy the HornetQ client JARs into $ESB_HOME/repository/components/lib.


NOTES :
[a.] There is a current limitation in the ESB: whenever multiple non-OSGi JARs containing the same package names are copied, the process that converts them into OSGi bundles may drop those packages from subsequent JARs.  Hence I had to assemble the multiple client-side JARs into a single JAR.  There are multiple ways to do that (i.e. the Maven JAR assembly plugin, etc.), and for convenience I am attaching the assembled JAR for HornetQ 2.3.0.  You just have to copy this hornet-all.jar into $ESB_HOME/repository/components/lib if you decide to use the pre-assembled JAR.

Download the jar from here


[b.] If you opt to pack the JARs yourself, please make sure to remove the javax.jms package from the assembled JAR, to prevent the Carbon runtime from picking this implementation of JMS over the bundled-in distribution.

4. Uncomment the following line to enable the JMS transportSender in the Axis2 core configuration at $ESB_HOME/repository/conf/axis2/axis2.xml.

<transportSender name="jms" class="org.apache.axis2.transport.jms.JMSSender"/>


5. Let’s configure a simple proxy service as below, which accepts a simple SOAP message and pushes the SOAP envelope into mySampleQueue.

<proxy xmlns="http://ws.apache.org/ns/synapse" name="toJMSProxy" transports="https,http" statistics="disable" trace="disable" startOnLoad="true">
   <target>
      <inSequence>
         <property name="Accept-Encoding" scope="transport" action="remove"/>
         <property name="Content-Length" scope="transport" action="remove"/>
         <property name="Content-Type" scope="transport" action="remove"/>
         <property name="User-Agent" scope="transport" action="remove"/>
         <property name="OUT_ONLY" value="true"/>
         <send>
            <endpoint>
               <address uri="jms:/queue/mySampleQueue?transport.jms.ConnectionFactoryJNDIName=QueueConnectionFactory&amp;java.naming.factory.initial=org.jnp.interfaces.NamingContextFactory&amp;java.naming.provider.url=jnp://localhost:1099&amp;transport.jms.DestinationType=queue"/>
            </endpoint>
         </send>
      </inSequence>
      <outSequence>
         <send/>
      </outSequence>
   </target>
   <publishWSDL uri="file:repository/samples/resources/proxy/sample_proxy_1.wsdl"/>
   <description>HornetQ-WSO2 ESB sample</description>
</proxy>


NOTES :
[a] We are creating a proxy for the SimpleStockQuoteService shipped with the ESB samples.  You don’t really need to have the actual service running for this example, as our plan is to push the incoming payload into the JMS queue (and not calling the actual back-end service). 

[b] Extra HTTP headers are removed to present only the XML content for validation. 

[c] This is only a one way operation pushing the payload into the queue, hence OUT_ONLY is set to true. 

[d] You may need to change the hostname, port etc. on the JMS endpoint string matching to your environment. 

6. Start the WSO2 ESB server and use a client application of your choice to call the proxy service endpoint.  Here I am using soapUI to create a request calling the placeOrder operation of the proxied SimpleStockQuoteService.




7. There are multiple ways to check whether the payload was successfully pushed to the JMS queue in HornetQ, but I used a simple Java client to pull the message out of mySampleQueue.

import java.util.Hashtable;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;

public class HornetQJMSTestClient {

    public static void main(String[] args) throws Throwable {

        // Step 1. Create an initial context to perform the JNDI lookup.
        Hashtable<String, String> env = new Hashtable<String, String>();
        env.put(Context.PROVIDER_URL, "jnp://localhost:1099");
        env.put(Context.INITIAL_CONTEXT_FACTORY,
                "org.jnp.interfaces.NamingContextFactory");
        env.put(Context.URL_PKG_PREFIXES,
                "org.jboss.naming:org.jnp.interfaces");
        Context ctx = new InitialContext(env);

        // Step 2. Look up the connection factory
        ConnectionFactory cf = (ConnectionFactory) ctx.lookup("/ConnectionFactory");

        // Step 3. Look up the JMS queue
        Queue queue = (Queue) ctx.lookup("/queue/mySampleQueue");

        // Step 4. Create the connection and session
        Connection connection = cf.createConnection();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // Step 5. Create a JMS message consumer to receive the message
        MessageConsumer messageConsumer = session.createConsumer(queue);

        // Step 6. Start the connection so that the consumer can receive messages
        connection.start();

        // Step 7. Receive the message (wait up to 5 seconds)
        TextMessage messageReceived = (TextMessage) messageConsumer.receive(5000);
        System.out.println("Received message: " + messageReceived.getText());

        // Clean up all the JMS resources
        connection.close();
    }
}

NOTE: If you need to configure the WSO2 ESB as a JMS consumer, you will need to enable the transportReceiver block in the axis2.xml file with HornetQ configuration parameters, as follows.


<transportReceiver name="jms" class="org.apache.axis2.transport.jms.JMSListener">
   <parameter name="myTopicConnectionFactory" locked="false">
      <parameter name="java.naming.factory.initial" locked="false">org.jnp.interfaces.NamingContextFactory</parameter>
      <parameter name="java.naming.factory.url.pkgs" locked="false">org.jboss.naming:org.jnp.interfaces</parameter>
      <parameter name="java.naming.provider.url" locked="false">jnp://localhost:1099</parameter>
      <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">TopicConnectionFactory</parameter>
      <parameter name="transport.jms.ConnectionFactoryType" locked="false">topic</parameter>
   </parameter>
   <parameter name="myQueueConnectionFactory" locked="false">
      <parameter name="java.naming.factory.initial" locked="false">org.jnp.interfaces.NamingContextFactory</parameter>
      <parameter name="java.naming.factory.url.pkgs" locked="false">org.jboss.naming:org.jnp.interfaces</parameter>
      <parameter name="java.naming.provider.url" locked="false">jnp://localhost:1099</parameter>
      <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">QueueConnectionFactory</parameter>
      <parameter name="transport.jms.ConnectionFactoryType" locked="false">queue</parameter>
   </parameter>
   <parameter name="default" locked="false">
      <parameter name="java.naming.factory.initial" locked="false">org.jnp.interfaces.NamingContextFactory</parameter>
      <parameter name="java.naming.factory.url.pkgs" locked="false">org.jboss.naming:org.jnp.interfaces</parameter>
      <parameter name="java.naming.provider.url" locked="false">jnp://localhost:1099</parameter>
      <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">QueueConnectionFactory</parameter>
      <parameter name="transport.jms.ConnectionFactoryType" locked="false">queue</parameter>
   </parameter>
</transportReceiver>

Thursday, August 15, 2013

ISO8583 with WSO2 ESB

Introduction

ISO8583 is a messaging standard commonly used by banks and financial sector institutions for transactions between devices such as ATMs and switches, as well as for card payments.  Many might call it more of a ‘device-to-device’ protocol, but recently I have come across situations where several financial institutions were looking for solutions that can provide applications with a simpler northbound API, capable of absorbing the complexity of a southbound ISO8583 message and transport.

With the message transformation and transport level flexibility provided by WSO2 ESB, I thought it might be useful for developers and system designers if I could demonstrate how this can be achieved with a few configuration artifacts and some Java code.

Prerequisites
  • Fair knowledge of WSO2 ESB API configuration
  • Previous experience with creating mediators/transports for WSO2 (well, even if you haven’t done this before, my attempt is to give you as much insight as possible on how to do it)

Products used

WSO2 ESB – Version 4.7.0


ISO8583 Java ports

Well, I really wasn’t looking to (re)implement a Java API for ISO8583 at this point, hence I did some research on the options available.  I wouldn’t say I spent enough time analyzing the pros and cons of each of the available options, but both jPOS [1] and jISO8583 [2] looked promising.  I opted to use jPOS for this PoC as it had the documentation I needed for the scenario I wanted to cover.  Also, jPOS seems to have adequately addressed the implementation of multiple delivery channels (i.e. BASE24, ASCII, etc.), making it adaptable to organization-specific message formats.

Sample Scenario

Let’s take the scenario of a financial application needing to make a credit transaction by sending an XML message, which needs to be converted to an ISO8583 byte stream before being passed on to the wire through a TCP channel.



Design

In the context of WSO2 ESB, there are multiple ways to do this.  We could write a mediator, which is probably the option many would choose, but I wanted to make this slightly more exciting for myself as well as for the readers.  With the idea of a one-to-one mapping from XML fields to ISO8583 fields, I am going to implement a Transport Sender which will create a jPOS ISOMsg object and serialize it into an ASCII channel; this will establish a TCP connection with an ISO8583 server port, deliver the payload, and close the socket connection. In this example, I am not going to implement code to receive anything back from the ISO8583 server, hence we will only configure an OUT_ONLY API. If you were to implement a full-blown adapter of this nature, you would typically need to implement the following components.

  • Message Builder – Typically for ESB southbound (request) where the northbound payload (XML in this case) is transformed to an ISO8583 payload.
  • Transport Sender – ESB outbound channel where the ISO8583 output stream is sent to the wire typically through a TCP channel.
  • Transport Listener – ESB inbound channel where the ISO8583 binary message is read from the wire typically through a TCP channel.
  • Message Formatter – Typically for ESB northbound (response) where the incoming ISO8583 payload out of the Transport Listener is transformed back to a northbound payload (XML in this case).


Implementation

First, we need to define our ISO8583 field definition.  This might be a bit confusing to some: if we are dealing with a specification, why do we need a field definition?  This is because the ISO8583 specification does not hard-bind any data elements or field ordering. It is entirely up to the application designer to define which field types/IDs need to be placed for their specific transactional requirements.

At a glance, the field definition file looks like the following.

<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<!DOCTYPE isopackager SYSTEM "genericpackager.dtd">
<isopackager>
  <isofield
      id="0"
      length="4"
      name="Message Type Indicator"
      class="org.jpos.iso.IFA_NUMERIC"/>
  <isofield
      id="1"
      length="16"
      name="Bitmap"
      class="org.jpos.iso.IFA_BITMAP"/>
  <isofield
      id="2"
      length="19"
      name="Primary Account number"
      class="org.jpos.iso.IFA_LLNUM"/>
  <isofield
      id="3"
      length="6"
      name="Processing Code"
      class="org.jpos.iso.IFA_NUMERIC"/>
</isopackager>

Please refer to [3] & [4] for a complete reference of ISO8583.   For now, let me just say that each field should have an ID, a length and a type specified in its definition.  I have only listed a snippet of the XML config here; you can find the full definition jposdef.xml inside the codebase.
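To make the id/length/type triple concrete, here is a stdlib-only sketch of how a fixed-length numeric field such as IFA_NUMERIC ends up zero-padded to its declared length when packed. The class and method names are mine for illustration; jPOS’s GenericPackager and field classes do the real work.

```java
// Illustrative only -- not jPOS code.
public class FieldDefDemo {

    // A numeric field (e.g. IFA_NUMERIC) declared with length="6" is
    // left-padded with zeros when packed, so "110" goes out as "000110".
    public static String packNumeric(String value, int declaredLength) {
        if (value.length() > declaredLength) {
            throw new IllegalArgumentException("Value exceeds declared field length");
        }
        StringBuilder sb = new StringBuilder();
        for (int i = value.length(); i < declaredLength; i++) {
            sb.append('0');
        }
        return sb.append(value).toString();
    }

    public static void main(String[] args) {
        // Field 3 (Processing Code) is declared with length="6" in jposdef.xml
        System.out.println(packNumeric("110", 6)); // prints 000110
    }
}
```

You will see exactly this padding in the mock server output later on.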
I have created a simple Maven project to implement this transport.  Make sure that you have included the jPOS dependencies in pom.xml as follows.
<dependency>
    <groupId>org.jpos</groupId>
    <artifactId>jpos</artifactId>
    <version>1.9.0</version>
</dependency>
<dependency>
    <groupId>com.sleepycat</groupId>
    <artifactId>je</artifactId>
    <version>4.0.92</version>
</dependency>


To implement the transport sender, you need to subclass the AbstractTransportSender and implement its sendMessage method as follows.
public class ISO8583TransportSender extends AbstractTransportSender {

    @Override
    public void sendMessage(MessageContext msgCtx, String targetEPR,
            OutTransportInfo outTransportInfo) throws AxisFault {

        try {
            ISOMsg isoMsg = toISO8583(msgCtx);

            URI isoURL = new URI(targetEPR);

            ISOPackager packager = new GenericPackager(this.getClass()
                    .getResourceAsStream("jposdef.xml"));

            ASCIIChannel chl = new ASCIIChannel(isoURL.getHost(),
                    isoURL.getPort(), packager);

            chl.connect();
            chl.send(isoMsg);
            chl.disconnect();

        } catch (Exception e) {
            // Wrap the cause so the original stack trace is not lost
            throw new AxisFault(
                    "An exception occurred in sending the ISO message", e);
        }
    }
}

As I mentioned before, in a full-blown implementation you would split the message builder/formatter from the transport sender and receiver logic.  Largely because I am using the jPOS library both for ISO message encapsulation and for streaming through the TCP channel, I am going to implement both concerns in the transport sender class itself.
A new API needs to be configured on WSO2 ESB to accept our application-friendly northbound XML payload.

   <api name="iso" context="/iso">
      <resource methods="POST">
         <inSequence>
            <property name="OUT_ONLY" value="true"/>
            <property name="FORCE_SC_ACCEPTED" value="true" scope="axis2"/>
            <send>
               <endpoint name="isoserver">
                  <address uri="iso8583://localhost:5000"/>
               </endpoint>
            </send>
            <drop/>
         </inSequence>
         <outSequence>
            <drop/>
         </outSequence>
      </resource>
   </api>

What we are doing here is exposing an API endpoint through which northbound applications can send an XML payload, which is then forwarded to an endpoint of type iso8583.  So how does WSO2 ESB know what iso8583:// is?  For this, we need to add the following entry to axis2.xml under $ESB_HOME/repository/conf/axis2, which instructs the ESB core to pick our transport sender for endpoints of the iso8583 type.

<transportSender name="iso8583" class="org.wso2.iso8583.transport.ISO8583TransportSender"/>

I am going to keep it simple and will be sending the following XML from the northbound API in this sample scenario, but this is by no means the message structure you would want to use in a real EAI scenario.
<iso8583message>
       <config>
              <mti>1800</mti>
       </config>
       <data>
              <field id="3">110</field>
              <field id="5">4200.00</field>
              <field id="48">Simple Credit Transaction</field>
              <field id="6">645.23</field>
              <field id="88">66377125</field>
       </data>
</iso8583message>

In simple terms, what I have here is a set of values defined against several of the field types specified in jposdef.xml.
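If you want to see the structure of this payload in isolation from the ESB, here is a stdlib-only sketch (using the JDK’s DOM API; the class name and the shortened payload are mine) that pulls out the MTI and the id/value pairs — the same mapping the transport sender performs with AXIOM:

```java
import java.io.ByteArrayInputStream;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

public class PayloadParseDemo {

    // A shortened version of the sample payload above
    public static final String SAMPLE =
            "<iso8583message>"
          + "<config><mti>1800</mti></config>"
          + "<data><field id=\"3\">110</field><field id=\"5\">4200.00</field></data>"
          + "</iso8583message>";

    public static Document parse(String xml) throws Exception {
        return DocumentBuilderFactory.newInstance().newDocumentBuilder()
                .parse(new ByteArrayInputStream(xml.getBytes("UTF-8")));
    }

    public static String extractMti(String xml) throws Exception {
        return parse(xml).getElementsByTagName("mti").item(0).getTextContent();
    }

    public static Map<Integer, String> extractFields(String xml) throws Exception {
        Map<Integer, String> fields = new LinkedHashMap<Integer, String>();
        NodeList nodes = parse(xml).getElementsByTagName("field");
        for (int i = 0; i < nodes.getLength(); i++) {
            Element f = (Element) nodes.item(i);
            fields.put(Integer.parseInt(f.getAttribute("id")), f.getTextContent());
        }
        return fields;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(extractMti(SAMPLE));    // prints 1800
        System.out.println(extractFields(SAMPLE)); // prints {3=110, 5=4200.00}
    }
}
```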
Going back to our code, the first thing we do is convert the XML payload to an ISOMsg object.

public ISOMsg toISO8583(MessageContext messageContext) throws AxisFault {
    SOAPEnvelope soapEnvelope = messageContext.getEnvelope();
    OMElement isoElements = soapEnvelope.getBody().getFirstElement();

    ISOMsg isoMsg = new ISOMsg();

    @SuppressWarnings("unchecked")
    Iterator<OMElement> fieldItr = isoElements.getFirstChildWithName(
            new QName(ISO8583Constant.TAG_DATA)).getChildrenWithLocalName(
            ISO8583Constant.TAG_FIELD);

    String mtiVal = isoElements
            .getFirstChildWithName(new QName(ISO8583Constant.TAG_CONFIG))
            .getFirstChildWithName(new QName(ISO8583Constant.TAG_MTI))
            .getText();

    try {
        isoMsg.setMTI(mtiVal);

        while (fieldItr.hasNext()) {
            OMElement isoElement = fieldItr.next();
            String isoValue = isoElement.getText();

            int isoTypeID = Integer.parseInt(isoElement.getAttribute(
                    new QName("id")).getAttributeValue());

            isoMsg.set(isoTypeID, isoValue);
        }

        return isoMsg;
    } catch (ISOException e) {
        throw new AxisFault("Error while constructing the ISO8583 message", e);
    }
}

If you want to get a deeper understanding of AXIOM, you should refer to [5].  In our scenario, what I am doing here is iterating through the list of fields coming from the northbound payload (encapsulated within an OMElement object) and populating the ISOMsg object.  Further, I am using the config section of my XML payload to encapsulate the request/response-level metadata for my simple transaction.  I am sure there is more to it if you read the full ISO8583 spec, but it is not my intention to cover all of that here.
Now that we have our ISOMsg object populated, the next step is to initialize a “Packager” instance that corresponds to our ISO8583 field definition.


ISOPackager packager = new GenericPackager(this.getClass().getResourceAsStream("jposdef.xml"));

OK, I guess we are all set to send it down the wire now.  As a matter of fact, the whole purpose of the Transport Sender is to do exactly that, so let’s use the jPOS API to do it for us.  Again, if you were writing your own socket-level code, this part would be a bit more than just 3-4 lines.

ASCIIChannel chl = new ASCIIChannel(isoURL.getHost(),
        isoURL.getPort(), packager);

chl.connect();
chl.send(isoMsg);
chl.disconnect();

So for this sample, I am using the ASCIIChannel to send my ISO payload to the server; please feel free to explore the other delivery channels provided by jPOS when you run this code.
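To appreciate what the channel abstracts away: as I understand it, ASCIIChannel frames each message with a four-character ASCII length header before the packed payload. A hand-rolled version of just that framing step might look like the sketch below (illustrative, not jPOS code; the class name is mine):

```java
import java.io.UnsupportedEncodingException;

public class FramingDemo {

    // Prefix the packed ISO message with a 4-character, zero-padded ASCII
    // length header -- the kind of low-level detail the jPOS channels hide.
    public static byte[] frame(byte[] packedMsg) throws UnsupportedEncodingException {
        if (packedMsg.length > 9999) {
            throw new IllegalArgumentException("Message too long for a 4-digit header");
        }
        String header = String.format("%04d", packedMsg.length);
        byte[] framed = new byte[4 + packedMsg.length];
        System.arraycopy(header.getBytes("US-ASCII"), 0, framed, 0, 4);
        System.arraycopy(packedMsg, 0, framed, 4, packedMsg.length);
        return framed;
    }

    public static void main(String[] args) throws Exception {
        byte[] framed = frame("1800...".getBytes("US-ASCII"));
        System.out.println(new String(framed, "US-ASCII")); // prints 00071800...
    }
}
```

And that is before you get to socket handling, timeouts and retries, which is exactly why delegating to the channel implementations makes sense.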
So… where is our ISO server then? Thankfully, jPOS provides a mock server implementation to test our scenario, and typically there should be no change to the code if you have the option of connecting to a real ISO8583 server endpoint.   Well… I am not at all suggesting you should try to make a credit card transaction in a real production environment whilst testing this code.

static final String hostname = "localhost";
static final int portNumber = 5000;

public static void main(String[] args) throws ISOException {
    ISOPackager packager = new GenericPackager("jposdef.xml");
    ServerChannel channel = new ASCIIChannel(hostname, portNumber, packager);
    ISOServer server = new ISOServer(portNumber, channel, null);

    server.addISORequestListener(new MockISO8583Server());

    System.out.println("ISO8583 server started...");
    new Thread(server).start();
}




Normally I use the Advanced REST Client on Chrome to send my XML payload, but you may use your favorite REST client to fire the request to the API endpoint we’ve configured on WSO2 ESB.  Just make sure that you have the MockISO8583Server running before you do this.
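If you would rather fire the request from code than from a browser plugin, a plain HttpURLConnection does the job. The sketch below only prepares the request (the URL assumes the ESB’s default HTTP port 8280 — adjust for your setup), with the actual send left commented out so it compiles and runs without a live ESB:

```java
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;

public class IsoApiClient {

    // Prepare a POST to the /iso API; nothing is sent until the body is written.
    public static HttpURLConnection prepare(String endpoint) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL(endpoint).openConnection();
        conn.setRequestMethod("POST");
        conn.setRequestProperty("Content-Type", "application/xml");
        conn.setDoOutput(true);
        return conn;
    }

    public static void main(String[] args) throws IOException {
        HttpURLConnection conn = prepare("http://localhost:8280/iso");

        // With the ESB and MockISO8583Server running, write the payload and send:
        // try (OutputStream out = conn.getOutputStream()) {
        //     out.write("<iso8583message>...</iso8583message>".getBytes("UTF-8"));
        // }
        // System.out.println(conn.getResponseCode()); // FORCE_SC_ACCEPTED should yield 202

        System.out.println(conn.getRequestMethod() + " " + conn.getURL());
    }
}
```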


If everything goes well, you should see the following output on your MockISO8583Server console.

ISO8583 incoming message on host [127.0.0.1]
ISO8583 Message received...
----ISO MESSAGE-----
  MTI : 1800
    Field-3 : 000110
    Field-5 : 000004200.00
    Field-6 : 000000645.23
    Field-48 : Simple Credit Transaction!!
    Field-88 : 0000000066377125
--------------------
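One thing this dump hides is the bitmap (field 1 in jposdef.xml), which tells the receiver exactly which data elements are present. For fields 2–64, the primary bitmap is a 64-bit value where bit n is set when field n is present; a field above 64 (like field 88 here) additionally sets bit 1 and requires a secondary bitmap. A stdlib-only sketch for the primary bitmap (class and method names are mine):

```java
public class BitmapDemo {

    // Compute the primary bitmap for fields in the 2..64 range.
    // Bit 1 is the most significant bit; bit n is set when field n is present.
    public static String primaryBitmap(int... presentFields) {
        long bits = 0L;
        for (int f : presentFields) {
            if (f < 2 || f > 64) {
                throw new IllegalArgumentException("Field " + f + " needs a secondary bitmap");
            }
            bits |= 1L << (64 - f);
        }
        return String.format("%016X", bits);
    }

    public static void main(String[] args) {
        // Fields 3, 5, 6 and 48 from the sample (field 88 omitted -- it would
        // set bit 1 and append a secondary bitmap)
        System.out.println(primaryBitmap(3, 5, 6, 48)); // prints 2C00000000010000
    }
}
```

jPOS computes and packs this automatically via the IFA_BITMAP field class, which is why we never set field 1 ourselves.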

You can download the codebase from here.

What’s next?

Well, technically speaking, the pattern we have used here should be adaptable to most banking/financial scenarios and messaging protocols. In my next post, I am planning to cover the implementation of a Transport Listener to handle responses coming back from the ISO8583 endpoint and present them as an XML payload to the northbound application.

References