Wednesday, November 2, 2011

Correlate one event into multiple instances of the same composite in SOA 11g

Inspired by the blog post written by Lucas on this topic, I thought of extending it by providing an implementation (with a slightly different approach).

The use case is as follows:


We have a SOA Composite Application for the Order process. Whenever a customer places an order, a new instance of this process kicks off. At any one time we will potentially have multiple instances for orders from the same customer. These instances are uniquely identified by the order id.

It happens that the F&A department (because of, for example, financial difficulties with a certain customer or government regulations regarding certain countries) publishes an event: “Terminate Customer”. This event should result in all running Orders for that Customer being terminated.


In simple terms, we need to fire an event that abruptly terminates all the running instances of the Order Process for a specific customer.


The Approach:

Customers place an order through the 'process' operation of the OrderProcess BPEL component, which is exposed as a SOAP service to external users. The OrderProcess component has a second operation, 'terminate', which terminates the order instance whose OrderId (the correlation key) matches the orderId passed in the payload. Because 'terminate' uses the same correlation set that was defined earlier in the process, the message reaches the right instance.

When the organisation processing the orders wants to terminate the running orders of a specific customer (identified by customerId), it calls another BPEL process with the customerId as its payload. This BPEL process in turn calls a Java API (through a Spring component), passing the customerId as input. The API returns the list of all OrderIds for that customerId. For each of these OrderIds an AbortEvent is published.

The AbortEvent is subscribed to by another BPEL process, which in turn calls the 'terminate' operation of the OrderProcess BPEL component, passing the OrderId as input.
Back inside the OrderProcess, a mid-process receive that is waiting on the 'terminate' operation wakes up and terminates the instance (matched on the OrderId correlation key) after doing any necessary processing.
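To make the fan-out easier to follow, here is a minimal plain-Java sketch of the same flow. It is only an in-memory stand-in: the class TerminateCustomerSketch, its map of running orders and the list of published events are hypothetical names for illustration, not SOA Suite APIs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** In-memory stand-in for the flow: customerId -> orderIds -> one abort event per order. */
class TerminateCustomerSketch {

    /** Hypothetical registry of running order instances, keyed by customerId. */
    private final Map<String, List<String>> runningOrders = new HashMap<>();

    /** Order ids for which an abort event has been "published". */
    private final List<String> publishedAbortEvents = new ArrayList<>();

    /** Plays the role of the 'process' operation: a new order instance starts. */
    void placeOrder(String customerId, String orderId) {
        runningOrders.computeIfAbsent(customerId, k -> new ArrayList<>()).add(orderId);
    }

    /** Plays the role of the Java API behind the Spring component. */
    List<String> findOrderIds(String customerId) {
        return new ArrayList<>(runningOrders.getOrDefault(customerId, new ArrayList<>()));
    }

    /** Plays the role of the BPEL process that publishes one abort event per order id. */
    List<String> terminateCustomer(String customerId) {
        List<String> orderIds = findOrderIds(customerId);
        for (String orderId : orderIds) {
            publishedAbortEvents.add(orderId);              // publish AbortEvent(orderId)
            runningOrders.get(customerId).remove(orderId);  // subscriber calls 'terminate'
        }
        return orderIds;
    }

    List<String> publishedAbortEvents() {
        return publishedAbortEvents;
    }

    public static void main(String[] args) {
        TerminateCustomerSketch sketch = new TerminateCustomerSketch();
        sketch.placeOrder("CUST-1", "ORD-1");
        sketch.placeOrder("CUST-1", "ORD-2");
        sketch.placeOrder("CUST-2", "ORD-3");
        System.out.println("Aborted: " + sketch.terminateCustomer("CUST-1"));
    }
}
```

The point of the sketch is that the event itself never carries the correlation key of a single instance; the customerId is first expanded into OrderIds, and each OrderId is then correlated on its own.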

The process is depicted below:

a) The Composite



b) OrderProcess BPEL Component:




c) BPEL Component that calls the Java API (using the Spring Context component) to get the OrderIds, and publishes an Abort Order event for each of them.





d) Spring Component :

The following entry in the SpringContext.xml file is added :

Contents of "MyInterface.java":

import java.util.List;

public interface MyInterface
{
    // Returns the OrderIds of the running instances for the given customer
    List getInstancesWithCompositeSensorFilter(String customer);
}

Contents of "MyClass.java":

package com.sridhar;

import java.util.ArrayList;
import java.util.Hashtable;
import java.util.List;


import javax.naming.Context;



import oracle.soa.management.facade.bpel.BPELInstance;
import oracle.soa.management.facade.ComponentInstance;
import oracle.soa.management.facade.Composite;
import oracle.soa.management.facade.CompositeInstance;
import oracle.soa.management.facade.Locator;
import oracle.soa.management.facade.LocatorFactory;
import oracle.soa.management.facade.Sensor;



import oracle.soa.management.util.ComponentInstanceFilter;
import oracle.soa.management.util.CompositeInstanceFilter;
import oracle.soa.management.util.Operator;
import oracle.soa.management.util.SensorFilter;


public class MyClass {
    public MyClass() {
        super();
    }

    public List getInstancesWithCompositeSensorFilter(String customer) {
        System.out.println("Customer Name ---------> " + customer);

        // OrderIds of the instances found for this customer
        List<Object> instaceList = new ArrayList<Object>();
        Locator locator = null;
        // Connection details of the SOA server (hard-coded here for the demo)
        Hashtable jndiProps = new Hashtable();
        jndiProps.put(Context.PROVIDER_URL, "t3://localhost:8001/soa-infra");
        jndiProps.put(Context.INITIAL_CONTEXT_FACTORY,
                      "weblogic.jndi.WLInitialContextFactory");
        jndiProps.put(Context.SECURITY_PRINCIPAL, "weblogic");
        jndiProps.put(Context.SECURITY_CREDENTIALS, "weblogic123");
        jndiProps.put("dedicated.connection", "true");

        try {
            locator = LocatorFactory.createLocator(jndiProps);

            Composite composite =
                (Composite) locator.lookupComposite("default/SpringProjectTrials!1.0");

            // Select only the composite instances whose sensor value equals the customer
            CompositeInstanceFilter compInstFilter = new CompositeInstanceFilter();
            List<SensorFilter> sFilterList = new ArrayList<SensorFilter>();
            SensorFilter sFilter =
                new SensorFilter("customerIdCompositeSensor" /* sensorname */,
                                 Sensor.SensorDataType.STRING /* type */,
                                 Operator.EQUALS /* operator for comparison */,
                                 customer);
            sFilterList.add(sFilter);

            compInstFilter.setSensorFilter(sFilterList);
            compInstFilter.setCompositeDN(composite.getDN());

            List<CompositeInstance> compInstances =
                composite.getInstances(compInstFilter);

            if (compInstances != null && compInstances.size() > 0) {
                for (CompositeInstance instance : compInstances) {
                    // setup a component filter
                    ComponentInstanceFilter cInstanceFilter = new ComponentInstanceFilter();
                    // get child component instances ..
                    List<ComponentInstance> childComponentInstances =
                        instance.getChildComponentInstances(cInstanceFilter);
                    // for each child component instance (e.g. a bpel process)
                    for (ComponentInstance cInstance : childComponentInstances) {
                        BPELInstance bpelInstance = (BPELInstance) cInstance;
                        // The OrderId, which is the correlation key of the Order Process BPEL, is set as Index 1
                        instaceList.add(bpelInstance.getIndex(1));
                    }
                }
            } else {
                System.out.println("====InstaceList -->No Elements Found ===");
            }
            return instaceList;
        } catch (Exception e) {
            // Log the failure instead of silently returning an empty list
            e.printStackTrace();
            return instaceList;
        }
    }
}

e) The BPEL process subscribed (listening) to the AbortOrder event, which in turn calls the 'terminate' operation on the Order Process.



That's it about implementing this.


Some Clarifications/ Open Questions :

1) You may wonder why I created another BPEL process (listed in section 'e' above) that subscribes to the Abort event and routes it to the 'terminate' operation of the Order Process, instead of having the mid-process receive activity in the Order Process listen to the Abort event directly.

Reason: Having a mid-process receive activity of event type, with the same correlation set as defined on the initial receive, gives me the error SCA-50012. I did not find a straightforward description of the error in the logs, but by trial and error I concluded that this is probably not supported in this release.

2) In the Spring component, while retrieving the OrderIds, I was not sure how to read the value of the OrderId field from the input payload. (In 10g there is a getField() method on the IInstanceHandle interface; the same is not supported in 11g.) As a workaround I created an index (here the first index) on that field in the Order BPEL process and used the following API to retrieve the index value, and thus the required OrderIds.

BPELInstance bpelInstance = (BPELInstance)cInstance;
instaceList.add(bpelInstance.getIndex(1));

There should be a better way to do this by reading the field's value directly. I would appreciate input from readers on how to do this, as I spent a fair amount of time looking for a way without success.

Comments are welcome...

Friday, July 15, 2011

Using WS-Addressing to call back to another process

Scenario :
1. Service-A.
2. Service-B.
3. Service-C ("Asynchronous request-response" service )

Service-C is a legacy service that takes input from a client, processes it and then, by default, responds back to the same client. However, your business requires that the response go not to the initial caller (Service-A) but to another process (Service-B). This is depicted below:



For this, you need to tweak the SOAP headers / WS-Addressing properties by setting the WSA ReplyTo to Service-B's URI in the call from Service-A to Service-C.


i.e., the input to Service-C looks like this:

<soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/">
  <soap:Header xmlns:ns1="http://schemas.xmlsoap.org/ws/2003/03/addressing">
    <ns1:MessageID>ws:uniqueAddress</ns1:MessageID>
    <ns1:ReplyTo>
      <ns1:Address>http://localhost:8001/soa-infra/services/default/SOATestProject/ServiceB?WSDL</ns1:Address>
    </ns1:ReplyTo>
  </soap:Header>
  <soap:Body xmlns:ns1="http://xmlns.oracle.com/TestApplication_jws/SOATestProject/MyAsynchronousService">
    <ns1:process>
      <ns1:input>Order123</ns1:input>
    </ns1:process>
  </soap:Body>
</soap:Envelope>

As you can see, we give Service-B's WSDL URL (http://localhost:8001/soa-infra/services/default/SOATestProject/ServiceB?WSDL) in the ReplyTo tag.

However, take care when creating Service-B that it uses the same port type and message types that Service-C uses for its callback. Otherwise you may end up with the exception: javax.xml.ws.soap.SOAPFaultException: oracle.fabric.common.FabricException: Unable to find operation: null

Conclusion:


Without changing your legacy asynchronous request/response service, you can use WS-Addressing to route the callback response to a service other than the caller.
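
If the request is built in code rather than in a test tool, the addressing headers can be assembled with the JDK's own DOM API. The following is only a sketch under assumptions: the class and method names (ReplyToHeaderSketch, buildEnvelope, replyToAddress) are made up for illustration, the body is left empty, and the namespaces are the ones used in the sample message of this post.

```java
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import org.w3c.dom.Document;
import org.w3c.dom.Element;

/** Builds a SOAP envelope whose WS-Addressing ReplyTo points at another service. */
class ReplyToHeaderSketch {
    static final String SOAP_NS = "http://schemas.xmlsoap.org/soap/envelope/";
    static final String WSA_NS = "http://schemas.xmlsoap.org/ws/2003/03/addressing";

    static Document buildEnvelope(String messageId, String replyToAddress) {
        Document doc;
        try {
            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
            factory.setNamespaceAware(true);
            doc = factory.newDocumentBuilder().newDocument();
        } catch (ParserConfigurationException e) {
            throw new IllegalStateException("No XML parser available", e);
        }

        Element envelope = doc.createElementNS(SOAP_NS, "soap:Envelope");
        doc.appendChild(envelope);
        Element header = doc.createElementNS(SOAP_NS, "soap:Header");
        envelope.appendChild(header);

        // MessageID lets the callback be correlated with the request
        Element id = doc.createElementNS(WSA_NS, "wsa:MessageID");
        id.setTextContent(messageId);
        header.appendChild(id);

        // ReplyTo/Address is where the asynchronous service sends its callback
        Element replyTo = doc.createElementNS(WSA_NS, "wsa:ReplyTo");
        Element address = doc.createElementNS(WSA_NS, "wsa:Address");
        address.setTextContent(replyToAddress);
        replyTo.appendChild(address);
        header.appendChild(replyTo);

        // The body payload is omitted in this sketch
        envelope.appendChild(doc.createElementNS(SOAP_NS, "soap:Body"));
        return doc;
    }

    /** Reads the ReplyTo address back out of an envelope built above. */
    static String replyToAddress(Document envelope) {
        return envelope.getElementsByTagNameNS(WSA_NS, "Address").item(0).getTextContent();
    }
}
```

Calling buildEnvelope("ws:uniqueAddress", "http://localhost:8001/soa-infra/services/default/SOATestProject/ServiceB?WSDL") yields the same header structure as the sample message, with Service-B as the callback target.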

Monday, July 11, 2011

FTP Adapter - Reading File Name that changes for each instance

We often end up with business requirements where we need to read or write a file whose name is decided by the message of the instance.

Consider an example where an order process calls another application asynchronously to validate the order. If the validation is successful, that application places a file on an FTP share with a name ending in _Success.xml, otherwise _Failed.xml. The order process instance then needs to read the file from that FTP share. So the requirement is to read a file whose name changes for every order (i.e., for every instance).
This means we need to set the "file name" of the FTP Adapter configuration dynamically.

Providing FileName Dynamically :

1. Create an outbound header variable (let's say outHeaderVar) in the scope. It is of messageType {http://xmlns.oracle.com/pcbpel/adapter/ftp/}OutboundHeader_msg. Alternatively, select the message type -> click Browse -> Type Explorer -> Project WSDL Files -> ftpAdapterOutboundHeader.wsdl -> Message Types -> OutboundHeader_msg.

2. Drag in an Assign activity and use a from-spec query such as: concat(bpws:getVariableData('orderNumber'),'_Success.xml'). Note that the getVariableData call must not be wrapped in quotes, otherwise it is treated as a literal string. In the to-spec query select
outHeaderVar/outboundHeader Query = "/ns2:OutboundFTPHeaderType/ns2:fileName"

This assigns the file name dynamically at run time rather than fixing it at design time.
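
For readers more at home in Java than in XPath, the concat expression in step 2 amounts to the small helper below. This is just an illustrative sketch: the class FtpFileNameSketch and the method fileNameFor are hypothetical, and the assumption that the order number is the prefix comes from the concat expression above.

```java
/** Mirrors the XPath concat() used in the Assign activity. */
class FtpFileNameSketch {

    /** Builds the per-instance file name from the order number and the validation result. */
    static String fileNameFor(String orderNumber, boolean validationSucceeded) {
        if (orderNumber == null || orderNumber.isEmpty()) {
            throw new IllegalArgumentException("orderNumber is required");
        }
        return orderNumber + (validationSucceeded ? "_Success.xml" : "_Failed.xml");
    }

    public static void main(String[] args) {
        System.out.println(fileNameFor("Order123", true));
    }
}
```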

Tuesday, July 5, 2011

MQ Adapters with Opaque message type - Different ways to send a message.

MQ Adapters define two kinds of message payloads:

1. Opaque Schema
2. Schema file ( xsd) that contains a specified schema element.

Choosing the opaque schema means you do not need to specify a message schema. By default the file/message is passed through in base-64 encoding. However, if the message is already base-64 encoded, the MQ Adapter decodes the message before placing it on the queue.


This is very useful in certain situations:
i) when the destination service that dequeues from MQ expects the message in already-decoded format;
ii) as in (i), and when the destination is not confined to one message type, so the sender is forced to use the opaque format;
iii) when the destination listens for a message schema whose targetNamespace is null/empty.

In case iii), the sender could have used the message XSD that the destination expects instead of an opaque schema, if that schema had a proper (not null or empty) namespace defined. Because the MQ Adapter wizard does not accept an XSD with an empty target namespace, we are forced to use the opaque schema.
Although it is not good design for the destination application to have bad namespaces defined, we often end up with this kind of service in real scenarios.

So in these cases the sender can encode the message to base-64 before sending it to MQ, so that the destination gets a decoded, non-base-64 message.

Given below is sample code that encodes a message using a Java Embedding activity in a BPEL process:

try
{
    // Read the message as a string and base-64 encode it before it goes to the MQ Adapter
    String input = (String)getVariableData("invServiceRequestStr");
    String encoded;
    com.collaxa.common.util.Base64Encoder Encoder = new com.collaxa.common.util.Base64Encoder();
    encoded = Encoder.encode(input);
    setVariableData("encodedMessageVar", encoded);
}
catch(Exception e)
{
    e.printStackTrace();
}

Caveats:
1. If you are using 11g then you must use "oracle.soa.common.util.Base64Encoder" instead of com.collaxa.common.util.Base64Encoder.
In your .bpel you may need this :

FYI - This is present at - $JDEVHOME\soa\modules\oracle.soa.fabric_11.1.1\fabric-runtime.jar.

2. If the input to be converted is not a plain string but an element, then use the following:
String base64 = ((oracle.xml.parser.v2.XMLElement)getVariableData("MyElement")).getFirstChild().getNodeValue();
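
To see the round trip outside the server, here is a small standalone sketch using the JDK's java.util.Base64 (Java 8 and later, so it is not what runs inside the 10g/11g Java Embedding). The class OpaquePayloadSketch and its two methods are hypothetical names; decodeAsAdapterWould only imitates the decoding step described above and is not the adapter's actual code.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

/** Sender encodes; the decode step stands in for what the adapter does to an opaque payload. */
class OpaquePayloadSketch {

    /** What the sender does before handing the message to the MQ Adapter. */
    static String encodeForOpaque(String payload) {
        return Base64.getEncoder().encodeToString(payload.getBytes(StandardCharsets.UTF_8));
    }

    /** Imitates the adapter decoding the opaque payload before it reaches the queue. */
    static String decodeAsAdapterWould(String base64) {
        return new String(Base64.getDecoder().decode(base64), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String message = "<Order>Order123</Order>";
        String encoded = encodeForOpaque(message);
        // The destination sees the original, non-encoded text again
        System.out.println(decodeAsAdapterWould(encoded).equals(message));
    }
}
```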

Summary :
The sender of the MQ message can encode the message to base-64; if the schema used is opaque, the MQ Adapter framework decodes it internally before placing it on the queue.

Friday, May 6, 2011

Batching and Throttling

In an enterprise application, some processes are called frequently, once per message: for example an order status update, or logging and auditing services.
Plain asynchronous communication would overload the service and push more load onto any underlying data service than it can take in a given time. This approach does not scale. Instead we can use batching to group messages into batches and introduce a delay between batches. The batch size is generally determined by load testing the underlying data services to find out how many calls they can take at one time. Let's look at an example:

The client sends an ORDER_STATUS message that has multiple orders in it. Every order needs a call to the updateStatus() web service to update its status in the database. There can be thousands of ORDERs in one ORDER_STATUS message, and the web service can't handle inserting them all at once. Let's say the web service can scale up to 50 orders per 5 seconds. The best way is to send one batch at a time with a delay of 4s between batches.
The following snippet shows how we divide the message into multiple batches:

BatchOrderStatusMessage.xsl :

.....After Imports ...
  <xsl:param name="partitionCount"/>
  <xsl:template match="/">
    <xsl:variable name="orderStatusCount"
                  select="count(/os:ORDER_STATUS/ORDERS/ORDER)"/>
    <tns:OrderStatusCollection>
      <xsl:call-template name="groupOrderStatus">
        <xsl:with-param name="statusMessages"
                        select="/os:ORDER_STATUS/ORDERS/ORDER"/>
        <xsl:with-param name="fields" select="/os:ORDER_STATUS/*[not(local-name()='ORDERS')]"/>
        <xsl:with-param name="groupSize" select="number($partitionCount)"/>
        <xsl:with-param name="counter" select='1'/>
      </xsl:call-template>
    </tns:OrderStatusCollection>
  </xsl:template>

  <xsl:template name="groupOrderStatus">
    <xsl:param name="statusMessages"/>
    <xsl:param name="fields"/>
    <xsl:param name="groupSize"/>
    <xsl:param name="counter"/>
    <os:ORDER_STATUS>
      <xsl:copy-of select="$fields"/>
      <ORDERS>
        <xsl:for-each select="$statusMessages[(position() &lt;= number(($counter)*$groupSize))]">
          <xsl:copy-of select="."/>
        </xsl:for-each>
      </ORDERS>
    </os:ORDER_STATUS>
    <xsl:variable name="nextStatusMessages"
                  select="$statusMessages[position() &gt; number(($counter)*$groupSize)]"/>
    <xsl:if test="count($nextStatusMessages) &gt; 0">
      <xsl:call-template name="groupOrderStatus">
        <xsl:with-param name="statusMessages" select="$nextStatusMessages"/>
        <xsl:with-param name="fields" select="$fields"/>
        <xsl:with-param name="groupSize" select="number($groupSize)"/>
        <xsl:with-param name="counter" select="number($counter)"/>
      </xsl:call-template>
    </xsl:if>
  </xsl:template>
</xsl:stylesheet>
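
The recursive template slices the ORDER list into groups of groupSize. For comparison, here is roughly the same slicing, plus the delay between batches, as a plain-Java sketch. Everything in it (the class BatchingSketch, the BatchSender interface, the method names) is hypothetical and only illustrates the technique; it is not part of the BPEL flow.

```java
import java.util.ArrayList;
import java.util.List;

/** Splits a list into fixed-size batches and sends them with a pause in between. */
class BatchingSketch {

    /** Hypothetical callback standing in for the call to the updateStatus() service. */
    interface BatchSender<T> {
        void send(List<T> batch);
    }

    /** Cuts items into consecutive batches of at most batchSize elements. */
    static <T> List<List<T>> partition(List<T> items, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < items.size(); start += batchSize) {
            int end = Math.min(start + batchSize, items.size());
            batches.add(new ArrayList<>(items.subList(start, end)));
        }
        return batches;
    }

    /** Sends one batch at a time, sleeping delayMillis between batches; returns batches sent. */
    static <T> int sendThrottled(List<T> items, int batchSize, long delayMillis,
                                 BatchSender<T> sender) {
        int sent = 0;
        for (List<T> batch : partition(items, batchSize)) {
            if (sent > 0 && delayMillis > 0) {
                try {
                    Thread.sleep(delayMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // stop early, keep the interrupt flag
                    break;
                }
            }
            sender.send(batch);
            sent++;
        }
        return sent;
    }
}
```

With the numbers from the example, sendThrottled(orders, 50, 4000L, sender) would hand the service 50 orders at a time with a 4 second pause between batches.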

In the stylesheet, partitionCount is the batch size, which is passed in through the parameters of the call to the XSLT, i.e.,

ora:processXSLT('BatchOrderStatusMessage.xsl',bpws:getVariableData('processOrderStatusInputVar','payload'),bpws:getVariableData('paramsVar'))

where paramsVar is of the element type "parameters" defined in the XSD below:

<xsd:element name="parameters">
<xsd:complexType>
<xsd:sequence>
<xsd:element name="item" minOccurs="1" maxOccurs="unbounded">
<xsd:complexType>
<xsd:sequence>
<xsd:element name="name" type="xsd:string"/>
<xsd:element name="value" type="xsd:string"/>
</xsd:sequence>
</xsd:complexType>
</xsd:element>
</xsd:sequence>
</xsd:complexType>
</xsd:element>


Summary: Batching is helpful when the payload is very large and needs to be split into multiple batches that are processed further down the flow.