Monday, August 25, 2014

WLST Library: Work Manager configuration automation in weblogic 12c

Work Manager in WebLogic Server allows an application to optimize or control the execution of its work based on its requirements. Using a Work Manager we can set up a dedicated thread pool via constraints and dictate how the application's requests need to be executed.

WebLogic release 8 and earlier used different pre-configured Execute Queues, such as weblogic.kernel.Default (the default queue) and weblogic.admin.HTTP/RMI (protocol based), to perform different kinds of work. Later releases of WebLogic use a single self-tuning threading model to execute all kinds of work. WebLogic Server automatically prioritises the work based on the configuration and on runtime metrics.

Users can create additional Work Managers specific to their applications and provide constraints to dictate how their work needs to be prioritised. By default, the total shared capacity for all Work Managers on a server is 65536 requests, and this value can be configured.


Some of the more common scenarios for using Work Managers are to handle the following :
  • Throttling the number of requests to applications to avoid overloading them. Most legacy applications cannot handle a heavy load, so care should be taken to send the application only the number of requests that it can handle.
  • Throttling the number of requests to low-priority applications (Notification Service, Logging Service, etc.) to avoid over-servicing them. These applications can be throttled during peak time so that critical services are provided with more resources.
A work manager consists of the following components:
  • Request classes - They provide a scheduling guideline on how threads need to be allocated to the application. By defining request classes, we can dictate that high-priority applications need to be serviced before low-priority ones.
    • Fair-Share - Specifies relative thread usage share required to process requests. Default value is 50
    • Response-Time - The thread usage share required to process requests is determined by the response-time goal provided.
    • Context - Assigns request classes to requests based on context information, such as the current user or the current user's group.
  • Constraints - Dictate the minimum and maximum number of threads, and the total request capacity, allocated to a Work Manager to execute its requests.
    • Minimum Threads Constraint - The guaranteed number of threads allocated for the work manager.
    • Maximum Threads Constraint - The maximum number of threads that can be allocated for the work manager.
    • Capacity Constraint - Total capacity of requests that can be executed & queued for servicing. Any requests over the capacity will be rejected.
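
To make the interplay between the maximum threads constraint and the capacity constraint concrete, here is a small toy model in plain Python. It is only a sketch under simplifying assumptions (requests never finish, and the class name `ThrottleModel` and its method are invented for this illustration); it is not how WebLogic implements its scheduler.

```python
class ThrottleModel(object):
    """Toy model: up to max_threads requests run, the rest queue until capacity is reached."""

    def __init__(self, max_threads, capacity):
        self.max_threads = max_threads   # maximum threads constraint
        self.capacity = capacity         # capacity constraint (running + queued)
        self.running = 0
        self.queued = 0

    def submit(self):
        """Admit one request and report what happened to it."""
        # Capacity counts both executing and queued requests.
        if self.running + self.queued >= self.capacity:
            return "rejected"
        if self.running < self.max_threads:
            self.running += 1
            return "running"
        self.queued += 1
        return "queued"


model = ThrottleModel(max_threads=2, capacity=3)
outcomes = [model.submit() for _ in range(4)]
```

With two threads and a capacity of three, the third request waits in the queue and the fourth is turned away, which is the throttling behaviour described in the scenarios above.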
    In SOA 12c, the invoke/engine & system thread settings have been replaced by 'Work Manager Groups'. Each partition now needs to be associated with a Work Manager Group, which dictates how invoke/engine & system threads are allocated to it. The below figure shows the work manager allocation for the 'default' partition. I will write a detailed article about this feature at a later date.


    In this article I will show how to automate the creation of Work Managers with constraints through WLST. Let's reuse the library (WlstObject) that we created in our previous article.

    The below figure shows that a new package 'wm' has been created and it contains the module 'workManagerObject.py'


    The below script allows the creation of multiple Work Managers at the same time by providing the required constraints (Request Classes still need to be implemented). The script is idempotent in the sense that if the Work Manager & the constraints already exist, it will just update them; otherwise it will create them.
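
The create-or-update idea can be sketched without a WebLogic server. The snippet below is a minimal illustration in plain Python: `FakeSelfTuning` and `BeanAlreadyExists` are stand-ins invented for this example (they only mimic the "create fails if the bean exists" behaviour), and `ensure_min_threads_constraint` is a hypothetical helper name.

```python
class BeanAlreadyExists(Exception):
    """Stand-in for weblogic.descriptor.BeanAlreadyExistsException."""


class FakeSelfTuning(object):
    """In-memory stand-in for the SelfTuning MBean; not a WebLogic API."""

    def __init__(self):
        self.min_threads = {}

    def create_min_threads_constraint(self, name):
        if name in self.min_threads:
            raise BeanAlreadyExists(name)
        self.min_threads[name] = {"count": None}


def ensure_min_threads_constraint(self_tuning, name, count):
    """Create the constraint if it is missing, then set its count either way."""
    try:
        self_tuning.create_min_threads_constraint(name)
        created = True
    except BeanAlreadyExists:
        created = False
    # The update step runs for both new and existing beans.
    self_tuning.min_threads[name]["count"] = int(count)
    return created


tuning = FakeSelfTuning()
first = ensure_min_threads_constraint(tuning, "TestWorkManager1MinThreadsConstraint", "10")
second = ensure_min_threads_constraint(tuning, "TestWorkManager1MinThreadsConstraint", "15")
```

Running the helper twice leaves a single constraint whose count reflects the latest run, so the script can be re-executed safely from a build pipeline.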

    workManagerObject.py
    ###################################################
    #    Name -   workManagerObject.py                #
    #    Author - gkrishna                            #
    #    Version - 1.0                                #
    ###################################################
    
    
    import sys
    from java.io import FileInputStream
    from java.util import Properties 
    from weblogic.descriptor import BeanAlreadyExistsException
    import wlstModule
    import os
    print os.path.abspath("../../../")
    sys.path.append(os.path.abspath("../../../"))
    from base import WlstObject
    
    
    class WorkManagerObject(WlstObject):
        
        _workManagerProperties = Properties()
        _workManagers=None
        
        ###################################################
        #    Method  -__init__                            #
        #    __init__ method for WorkManagerObject        #
        #     class                                       #
        ###################################################
        def __init__(self,commonPropertiesFile,workManagerPropertiesFile):
            try:
                super(WorkManagerObject,self).__init__(commonPropertiesFile)
                self.defaultMBeanHierarchy='/SelfTuning/'+str(self.commonProperties.get("wls.domain.name"))
                self.print_message('__init__ method in WorkManagerObject invoked - Reading property file '+workManagerPropertiesFile+' for Work Manager configurations.')
                propertyInputStream = FileInputStream(workManagerPropertiesFile)
                self._workManagerProperties.load(propertyInputStream)
                
                self.connect()
                self.action()
                self.exit()
            except:
                self.print_message("Error executing __init__ method  - " +str(sys.exc_info()[0]) +" "+ str(sys.exc_info()[1]))
        
        ###################################################
        #    Method  - action                             #
        #    Overriden method to configure the Work       #
        #    Manager.                                     #
        ###################################################
        
        def action(self):
            try:
                workManagers=self._workManagerProperties.get("WorkManagers")
                if(workManagers is None or workManagers.strip()==''):
                    raise Exception('No WorkManagers available in the properties file to configure.')

                # Each work manager listed in the properties file is configured in its own edit session.
                for workManager in workManagers.split():
                    self.name=self._workManagerProperties.get(workManager+('.name'))
                    self.targets=self._workManagerProperties.get(workManager+('.targets'))
                    self.targetType=self._workManagerProperties.get(workManager+('.targetType'))
                    ignoreStuckThread=self._workManagerProperties.get(workManager+('.ignoreStuckThread'))

                    # Properties.get returns None for a missing key, so test for both None and ''.
                    if(not self.targets or not self.targetType):
                        raise Exception('Invalid Target configuration provided for the work manager - '+workManager)

                    # Reset the flags for every work manager so that an unset constraint is never referenced.
                    minThreadsConstraintExists=False
                    maxThreadsConstraintExists=False
                    capacityConstraintExists=False

                    self.start_edit()
                    ########## Minimum Thread Constraints ##########
                    minThreadsConstraintName=self._workManagerProperties.get(workManager+('.minThreadsConstraint.name'))
                    minThreadsConstraintCount=self._workManagerProperties.get(workManager+('.minThreadsConstraint.count'))
                    if(minThreadsConstraintName and minThreadsConstraintCount):
                        self.print_message('Creating MinThreadConstraint '+str(minThreadsConstraintName) + ' with count '+str(minThreadsConstraintCount))
                        minThreadsConstraint=MinThreadsContraint(minThreadsConstraintName,minThreadsConstraintCount,self.targetType,self.targets,self.defaultMBeanHierarchy)
                        minThreadsConstraint.action()
                        minThreadsConstraintExists=True

                    ########## Maximum Thread Constraints ##########
                    maxThreadsConstraintName=self._workManagerProperties.get(workManager+('.maxThreadsConstraint.name'))
                    maxThreadsConstraintCount=self._workManagerProperties.get(workManager+('.maxThreadsConstraint.count'))
                    if(maxThreadsConstraintName and maxThreadsConstraintCount):
                        self.print_message('Creating MaxThreadConstraint '+str(maxThreadsConstraintName) + ' with count '+str(maxThreadsConstraintCount))
                        maxThreadsConstraint=MaxThreadsContraint(maxThreadsConstraintName,maxThreadsConstraintCount,self.targetType,self.targets,self.defaultMBeanHierarchy)
                        maxThreadsConstraint.action()
                        maxThreadsConstraintExists=True

                    ########## Capacity Constraints ##########
                    capacityConstraintName=self._workManagerProperties.get(workManager+('.capacityConstraint.name'))
                    capacityConstraintCount=self._workManagerProperties.get(workManager+('.capacityConstraint.count'))
                    if(capacityConstraintName and capacityConstraintCount):
                        self.print_message('Creating CapacityConstraint '+str(capacityConstraintName) + ' with count '+str(capacityConstraintCount))
                        capacityConstraint=CapacityConstraint(capacityConstraintName,capacityConstraintCount,self.targetType,self.targets,self.defaultMBeanHierarchy)
                        capacityConstraint.action()
                        capacityConstraintExists=True

                    ########## Work Manager ##########
                    self.print_message('Creating WorkManager '+self.name)
                    wlstModule.cd(self.defaultMBeanHierarchy)

                    try:
                        wlstModule.cmo.createWorkManager(self.name)
                    except BeanAlreadyExistsException:
                        self.print_message(" Work Manager "+self.name+" already exists")

                    wlstModule.cd(self.defaultMBeanHierarchy+'/WorkManagers/'+self.name)
                    self.set_targets()
                    if(minThreadsConstraintExists):
                        wlstModule.cmo.setMinThreadsConstraint(wlstModule.getMBean(self.defaultMBeanHierarchy+'/MinThreadsConstraints/'+minThreadsConstraintName))
                    if(maxThreadsConstraintExists):
                        wlstModule.cmo.setMaxThreadsConstraint(wlstModule.getMBean(self.defaultMBeanHierarchy+'/MaxThreadsConstraints/'+maxThreadsConstraintName))
                    if(capacityConstraintExists):
                        wlstModule.cmo.setCapacity(wlstModule.getMBean(self.defaultMBeanHierarchy+'/Capacities/'+capacityConstraintName))
                    if(ignoreStuckThread):
                        # bool('False') is True, so compare the text instead of casting it.
                        wlstModule.cmo.setIgnoreStuckThreads(str(ignoreStuckThread).strip().lower()=='true')
                    self.save_and_activate()
        
            except:
                self.print_message("Error executing create_workmanager method - " +str(sys.exc_info()[0]) +" "+ str(sys.exc_info()[1]))
                self.cancel_and_exit()
        
       
    class MinThreadsContraint(WlstObject):
        _count=None    
        
        ###################################################
        #    Method  -__init__                            #
        #    __init__ method for MinThreadsContraint      #
        #     class                                       #
        ###################################################
        def __init__(self,name,count,targetType,targets,defaultMBeanHierarchy):
            self.name=name
            self._count=count
            self.targetType=targetType
            self.targets=targets
            self.defaultMBeanHierarchy=defaultMBeanHierarchy
        
        ###################################################
        #    Method  - action                             #
        #    Overriden method to configure                #
        #    MinThreadsContraint for the Work             #
            #    Manager                      #
            ###################################################
        def action(self):
            wlstModule.cd(self.defaultMBeanHierarchy)
            try:
                wlstModule.cmo.createMinThreadsConstraint(self.name)
            except BeanAlreadyExistsException:
                self.print_message(" MinThreadsConstraints "+self.name+" already exists")
                pass
    
            wlstModule.cd(self.defaultMBeanHierarchy+'/MinThreadsConstraints/'+self.name)
            wlstModule.cmo.setCount(int(self._count))
            self.set_targets()
            
    class MaxThreadsContraint(WlstObject):
        _count=None
        
        ###################################################
        #    Method  -__init__                #
        #    __init__ method for MaxThreadsContraint   #
        #     class                      #
        ###################################################
        def __init__(self,name,count,targetType,targets,defaultMBeanHierarchy):
            self.name=name
            self._count=count
            self.targetType=targetType
            self.targets=targets
            self.defaultMBeanHierarchy=defaultMBeanHierarchy
    
            ###################################################
            #    Method  - action              #
            #    Overriden method to configure               #
            #    MaxThreadsContraint for the Work      #
            #    Manager                      #
            ###################################################
        def action(self):
            wlstModule.cd(self.defaultMBeanHierarchy)
            try:
                wlstModule.cmo.createMaxThreadsConstraint(self.name)
            except BeanAlreadyExistsException:
                self.print_message(" MaxThreadsConstraints "+self.name+" already exists")
                pass
    
            wlstModule.cd(self.defaultMBeanHierarchy+'/MaxThreadsConstraints/'+self.name)
            wlstModule.cmo.setCount(int(self._count))
            self.set_targets()
            
    class CapacityConstraint(WlstObject):
        _count=None
        
        ###################################################
        #    Method  -__init__                #
        #    __init__ method for CapacityContraint     #
        #     class                      #
        ###################################################
        def __init__(self,name,count,targetType,targets,defaultMBeanHierarchy):
            self.name=name
            self._count=count
            self.targetType=targetType
            self.targets=targets
            self.defaultMBeanHierarchy=defaultMBeanHierarchy
        
        ###################################################
            #    Method  - action              #
            #    Overriden method to configure               #
            #    CapacityContraint for the Work            #
            #    Manager                      #
            ###################################################
        def action(self):
            wlstModule.cd(self.defaultMBeanHierarchy)
            try:
                wlstModule.cmo.createCapacity(self.name)
            except BeanAlreadyExistsException:
                self.print_message(" CapacityConstraint "+self.name+" already exists")
                pass
    
            wlstModule.cd(self.defaultMBeanHierarchy+'/Capacities/'+self.name)
            wlstModule.cmo.setCount(int(self._count))
            self.set_targets()
            
    if __name__=="main":
        w = WorkManagerObject(sys.argv[1],sys.argv[2])
        
    Now we will see how to automate the creation of the Work Managers through Maven. With 12c, the weblogic-maven-plugin has a wlst goal, which allows WLST scripts to be executed through Maven. In the case of 11g, we would need to use the exec-maven-plugin to execute the Python module using the weblogic.WLST class. Please note that the below code in the module has been added for simplicity to handle the base package import dependency. Ideally, while integrating with Maven, we would use the maven-dependency-plugin to bring the base package into the target folder. For now, a hard-coded relative path reference handles the dependency.
    print os.path.abspath("../../../")
    sys.path.append(os.path.abspath("../../../"))
    
    Below is the WebLogic 12c example to create the Work Managers through Maven.

    pom.xml
    
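    <project>
     <modelVersion>4.0.0</modelVersion>
     <parent>
      <groupId>techsamples.fmw</groupId>
      <artifactId>parent</artifactId>
      <version>1.0</version>
     </parent>
     <groupId>techsamples.fmw</groupId>
     <artifactId>wm</artifactId>
     <packaging>pom</packaging>
     <version>1.0</version>
     <name>wm</name>
     <properties>
      <common.propeties.file>/XXXXXXXX/wlst/base/dev_common.properties</common.propeties.file>
      <wm.properties.file>/XXXXXXXX/wlst/wm/dev_workmanager.properties</wm.properties.file>
     </properties>
     <build>
      <plugins>
       <plugin>
        <groupId>com.oracle.weblogic</groupId>
        <artifactId>weblogic-maven-plugin</artifactId>
        <version>12.1.3-0-0</version>
        <configuration>
         <middlewareHome>/XXXXXXXXX/Oracle/bpm12c</middlewareHome>
        </configuration>
        <executions>
         <execution>
          <id>wlst-execute</id>
          <phase>deploy</phase>
          <goals>
           <goal>wlst</goal>
          </goals>
          <configuration>
           <middlewareHome>/XXXXXXXX/Oracle/bpm12c</middlewareHome>
           <fileName>${project.basedir}/workManagerObject.py</fileName>
           <scriptArgs>${common.propeties.file} ${wm.properties.file}</scriptArgs>
           <!-- remaining goal options (values: true, false, true, wls); element names not shown -->
          </configuration>
         </execution>
        </executions>
       </plugin>
      </plugins>
     </build>
    </project>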
    
    The script takes two property files :

    • dev_common.properties - Contains the environment-related configuration and is used to execute the control/edit WLST commands against the WebLogic server.
    admin.target.server=localhost
    admin.target.port=7101
    admin.target.user=weblogic
    admin.target.password=weblogic1
    wls.domain.name=DefaultDomain
    
    • dev_workmanager.properties - Contains the work manager related configurations.
    ###################################################
    # Name -   dev_workmanager.properties   #
    # Author - gkrishna    #
    # Version - 1.0      #
    ###################################################
    
    WorkManagers=TestWorkManager1 TestWorkManager2
    
    ########## Work Manager ##########
    TestWorkManager1.name=TestWorkManager1
    TestWorkManager1.targets=DefaultServer
    TestWorkManager1.targetType=Server
    TestWorkManager1.ignoreStuckThread=True
    
    TestWorkManager2.name=TestWorkManager2
    TestWorkManager2.targets=DefaultServer
    TestWorkManager2.targetType=Server
    TestWorkManager2.ignoreStuckThread=True
    
    ########## Minimum Threads Constraint ##########
    TestWorkManager1.minThreadsConstraint.name=TestWorkManager1MinThreadsConstraint
    TestWorkManager1.minThreadsConstraint.count=10
    
    TestWorkManager2.minThreadsConstraint.name=TestWorkManager2MinThreadsConstraint
    TestWorkManager2.minThreadsConstraint.count=20
    
    ########## Maximum Threads Constraint ##########
    TestWorkManager1.maxThreadsConstraint.name=TestWorkManager1MaxThreadsConstraint
    TestWorkManager1.maxThreadsConstraint.count=20
    
    TestWorkManager2.maxThreadsConstraint.name=TestWorkManager2MaxThreadsConstraint
    TestWorkManager2.maxThreadsConstraint.count=30
    
    ########## Capacity Constraints ##########
    TestWorkManager1.capacityConstraint.name=TestWorkManager1CapacityConstraint
    TestWorkManager1.capacityConstraint.count=40
    
    TestWorkManager2.capacityConstraint.name=TestWorkManager2CapacityConstraint
    TestWorkManager2.capacityConstraint.count=40
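
To show how the `WorkManagers` list drives the per-work-manager key lookups, here is a small stand-alone sketch in plain Python. It uses a deliberately simplified parser rather than `java.util.Properties` (no escapes or `:` separators), and the helper names `parse_properties`, `to_bool` and `work_manager_settings` are made up for this illustration.

```python
def parse_properties(text):
    """Very small key=value parser; skips blank lines and '#' comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def to_bool(value):
    """Interpret 'True'/'False' text; note that bool('False') would be True."""
    return str(value).strip().lower() == "true"


def work_manager_settings(props):
    """Group '<workManager>.<setting>' keys under each listed work manager."""
    settings = {}
    for wm in props.get("WorkManagers", "").split():
        prefix = wm + "."
        settings[wm] = {}
        for key, value in props.items():
            if key.startswith(prefix):
                settings[wm][key[len(prefix):]] = value
    return settings


SAMPLE = """
WorkManagers=TestWorkManager1 TestWorkManager2
TestWorkManager1.name=TestWorkManager1
TestWorkManager1.ignoreStuckThread=True
TestWorkManager1.minThreadsConstraint.count=10
TestWorkManager2.name=TestWorkManager2
TestWorkManager2.maxThreadsConstraint.count=30
"""

settings = work_manager_settings(parse_properties(SAMPLE))
```

The `to_bool` helper is worth noting: casting the property text with `bool()` treats any non-empty string, including `False`, as true, so the text has to be compared explicitly.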
    
    
    Before we execute the pom file to create the Work Managers, let's take a snapshot of the existing Work Managers so that we can identify the difference after executing the script.
    Let's execute the pom file to create the Work Managers.
    mvn clean deploy
    


    Let's check in the console to verify the creation of the Work Managers:


    Sunday, August 24, 2014

    WLST Library : Creating a base library for the WLST scripts

    As part of continuous delivery, it is critical that we automate the deployment of every deployable unit within our application landscape, and managing the WebLogic resources used by our applications is no different.

    On the web there are a lot of arbitrary WLST script examples for creating WebLogic resources such as JDBC Data Sources, JMS Queues/Topics/Servers, etc., but I didn't come across any libraries which can be extended to bring reusability in a more object-oriented way. Though Python is not strongly object oriented (loose encapsulation), I still gave it a shot :)

    The IDE I prefer to use for writing WLST scripts is Eclipse with OEPE, though jEdit is equally good. Let's first create our base library with the WlstObject class, which provides us with the capability to manage the WebLogic Server environment. In later posts, I will show how the library can be extended and integrated with continuous delivery using Maven.

    Creating a new eclipse Faceted project called 'wlst'





    Add the python interpreter to load the default python libraries.



    Create a new folder 'base' and add an __init__.py file to it. The __init__.py acts as a marker that makes the 'base' directory the 'base' package in Python. So when we create any classes/modules within this directory, we can access them via the 'base' package. For now, just leave this file empty.

    Let's create the base module 'wlstObject.py' with the class WlstObject. I have tried to stick to the Python coding standards, so variable/method definitions may look a bit weird. There are a lot of good articles out there explaining how they need to be defined, so I am not going to talk about them in this article.


    wlstObject.py

    # ##################################################
    # Name -   wlstObject.py           #
    # Author - gkrishna          #
    # Version - 1.0           #
    ###################################################
    #
    import sys
    from java.io import FileInputStream
    
    from java.util import Properties
    import wlstModule
    from datetime import datetime
    
    
    class WlstObject(object):
        commonProperties = Properties()
        defaultMBeanHierarchy = None
        name = None
        targets = None
        targetType = None
    
        ###################################################
        # Method  -__init__             #
        # __init__ method for base class        #
        ###################################################
        def __init__(self, commonPropertiesFile):
            try:
                self.print_message(
                    '__init__ method in WlstObject invoked - Reading property file ' + commonPropertiesFile + ' for common environment configurations.')
                propertyInputStream = FileInputStream(commonPropertiesFile)
                self.commonProperties.load(propertyInputStream)
            except:
                self.print_message(
                    "Error while executing the __init_method - " + str(sys.exc_info()[0]) + " " + str(sys.exc_info()[1]))
                wlstModule.exit()
    
        ###################################################
        # Method  - connect             #
        # Establishes connection to the         #
        # WLS Environment            #
        ###################################################
        def connect(self):
            try:
                print self.commonProperties.getProperty("admin.target.server")
                adminUrl = "t3://" + self.commonProperties.getProperty(
                    "admin.target.server") + ":" + self.commonProperties.getProperty("admin.target.port")
                adminUser = self.commonProperties.getProperty("admin.target.user")
                self.print_message("Trying to connect to the admin server - " + adminUrl + " with user - " + adminUser)
                wlstModule.connect(adminUser, self.commonProperties.getProperty("admin.target.password"), adminUrl)
            except Exception, e:
                self.print_message("Error while connecting to the server - " + adminUrl + " - " + str(e))
    
        ###################################################
        # Method  - start_edit            #
        # Starts the edit session          #
        ###################################################
        def start_edit(self):
            try:
                wlstModule.edit()
                wlstModule.startEdit()
            except:
                self.print_message("Error while starting the edit session - " + " - " + str(sys.exc_info()[0]) + " " + str(
                    sys.exc_info()[1]))
                self.cancel_and_exit()
    
        ###################################################
        # Method  - save_and_activate        #
        # Saves & Activates the edit session        #
        ###################################################
        def save_and_activate(self):
            try:
                wlstModule.save()
                # wlstModule.activate()
            except:
                self.print_message(
                    "Error while activating the edit session - " + " - " + str(sys.exc_info()[0]) + " " + str(
                        sys.exc_info()[1]))
                self.cancel_and_exit()
    
        ###################################################
        # Method  - cancel_and_exit    #
        # Cancel the edit session & Exit       #
        ###################################################
        def cancel_and_exit(self):
            wlstModule.cancelEdit('y')
            self.exit()
    
        ###################################################
        # Method  - exit           #
        # Exit the WLST environment            #
        ###################################################
        def exit(self):
            wlstModule.disconnect();
            wlstModule.exit();
    
        ###################################################
        # Method  - get_time           #
        # Get the formatted timestamp          #
        ###################################################
        def get_time(self):
            return str(datetime.now())
    
        ###################################################
        # Method  - print_message          #
        # Utility method to print message         #
        ###################################################
        def print_message(self, msg):
            print self.get_time() + " - " + self.__class__.__name__ + " - " + msg
    
        ###################################################
        # Method  - set_targets          #
        # Utility method to set the targets       #
        ###################################################
        def set_targets(self):
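            # Build the full target list first so that every target is retained,
            # instead of overwriting 'Targets' once per loop iteration.
            targetNames = [wlstModule.ObjectName('com.bea:Name=' + target.strip() + ',Type=' + self.targetType)
                           for target in self.targets.split(',')]
            wlstModule.set('Targets', wlstModule.jarray.array(targetNames, wlstModule.ObjectName))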
    
        ########## Action Methods ##########
    
        ###################################################
        # Method  - action           #
        # Abstract method to intiate WLST actions       #
        # the respective weblogic MBEAN(s)       #
        ###################################################
        def action(self):
            raise NotImplementedError("Implement this method in the extended sub class")
    
    The WlstObject class contains common methods to execute control & editing WLST commands. All the specific resource-level classes (e.g. JDBC, JMS, Work Manager) will extend this base class.
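
The extension pattern can be tried out without WLST. The sketch below is a simplified stand-in in plain Python: `BaseResource`, `DataSourceResource` and the `run` method are hypothetical names for this illustration (the real subclasses call connect, action and exit from their own `__init__`), and the methods only record what would have been called.

```python
class BaseResource(object):
    """Stand-in for WlstObject: fixes the call order, leaves action() abstract."""

    def __init__(self):
        self.calls = []

    def connect(self):
        self.calls.append("connect")

    def exit(self):
        self.calls.append("exit")

    def action(self):
        raise NotImplementedError("Implement this method in the extended sub class")

    def run(self):
        """Connect, perform the resource-specific work, and always disconnect."""
        self.connect()
        try:
            self.action()
        finally:
            self.exit()


class DataSourceResource(BaseResource):
    """Hypothetical subclass; only the resource-specific step is overridden."""

    def action(self):
        self.calls.append("configure data source")


resource = DataSourceResource()
resource.run()
```

Each new resource type then only has to supply its own `action`, while connection handling and clean-up stay in one place.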

    One important point to note here is that, since we are executing the WLST commands from a library, they are not imported by default. Hence we need to explicitly import the WLST commands as a Jython module.

    A sample script is already available at WLS_HOME/common/wlst/modules/wlstModule.py and we will be using it in our modules. Otherwise, we can use the writeIniFile('<file name>') WLST command to create the module and reference it in our libraries.

    Now lets update the __init__.py file with the following:
    from wlstObject import WlstObject
    
    This allows all the referencing libraries to import the 'WlstObject' class from the 'base' package, i.e. from base import WlstObject.

    In the coming segments, we will extend from this base class to create the weblogic resources.

    Saturday, August 16, 2014

    Hidden Feature in Weblogic Admin Console - WLST recording

    WLST is at the core of what you can do in WebLogic, but it hasn't been a straightforward tool when it comes to learning the MBean structure and automating the management of all the services available in WebLogic.

    Luckily, the admin console provides a recording capability which allows you to record the commands that get executed behind the scenes while you perform actions in the console. This gives us an opportunity to reuse those scripts and understand the MBean structure so that we can parameterize them to automate the actions going forward.

    Lets see how we can record the WLST commands from the admin console.


    Now lets create a JDBC data source and see what happens behind the scenes.

    Now the data source has been created. Now lets end the recording and check the content of the generated jython script.




    The script contains the equivalent commands for all the actions that we did. Now we don't need to google to find how to automate WLST scripts... just record & learn :)


    OSB 12c Templates for Proxy Pipeline

    Oracle Service Bus mediates and manages the interactions between disparate systems through a common canonical model & messaging infrastructure. Though the product provides rich tools to orchestrate complex integration requirements, it is generally meant for virtualising the interactions between different applications and hence most of the time follows a predictable orchestration pattern in the message flow. In 11g, we had to model these repeatable message flows in every proxy. With 12c, we have Proxy Pipeline templates, which allow us to model these repeatable message flows in a template and generate the concrete Pipelines from it.

    As a first example, we will model a Pipeline Template for a WSDL based message interaction.







    Lets take a look at the design palette for the message flow. The highlighted section in the below image shows the placeholders that can be added to the template to provide overriding capability to the concrete pipeline. 



    Nodes - Placeholders for Pipeline pair, Branch & Route nodes.
    Stages - Placeholders for one or more Stages, which in turn can contain actions.
    Actions - Placeholders for one or more Actions.
    Route - Placeholder for Route node.
    Conditional  - Placeholder for a Conditional Branch
    Operational - Placeholder for a Operational Branch

    The example template acts as the core template for any WSDL based pipeline and includes the below capabilities :
    1) Mapping EBMHeader (e.g. AIA) information to shared variables to make reporting more meaningful and to handle message correlation, i.e. the keys can be derived from the information available in the header.
    2) Reporting the Request & Response payloads.
    3) A placeholder for an Operational Branch, with a placeholder route action for a valid business/proxy service in the available branch.
    4) A common error handling pattern.

    The final template would look like below :


    The shared variables ebmID & entity key will hold the EBMID (from the AIA Header) & the entity-specific id from the business entity respectively. They will act as the keys for the reporting. Another feature of templates is that you can lock a particular action from being changed in the concrete pipeline.


    Let's generate a concrete pipeline:







    On clicking the finish button, the generated concrete pipeline/proxy will look like below :





    The placeholders have now turned concrete and we can add more components to the pipeline as required.

    If for some reason we need to break the template link, we can go to Pipeline -> Configuration and click on the 'Break Template Link' button as shown below:


    Breaking the template link will retain only the concrete elements in the pipeline. We need to be careful, as breaking the template link automatically saves the changes (though a warning popup is shown to confirm them), and as far as I could see there is no going back.

    Friday, August 15, 2014

    Unwritten Rule with Oracle BPM Parallel Gateway with Multi-Instance processing

    There are numerous scenarios that require the usage of the multi-instance sub-process in BPM. Let's consider an example of an Order entity with multiple OrderLines, each carrying a status. Each status requires different processing, and once all the processing is complete we need to merge the final outcome into the Order entity.

    The implementation pattern we will follow is to use a parallel gateway connecting different multi-instance sub-processes, each filtering its respective order lines and processing them. Once everything is done, they converge at the outgoing parallel gateway.

    The below xsd represents a simple schema for the Order entity:

    <?xml version= '1.0' encoding= 'UTF-8' ?>
    <schema xmlns="http://www.w3.org/2001/XMLSchema" xmlns:xsd="http://www.w3.org/2001/XMLSchema"
         xmlns:qt="http://www.mycompany.com/ns/order" targetNamespace="http://www.mycompany.com/ns/order"
         elementFormDefault="qualified">
        <annotation>
            <documentation xml:lang="en">Order</documentation>
        </annotation>
        <element name="order" type="qt:OrderType"/>
        <element name="orderLine" type="qt:OrderLineType"/>
        <complexType name="OrderType">
            <sequence>
                <element name="orderID" type="string"/>
                <element name="customerName" type="string"/>
                <element name="customerAddress" type="string"/>
                <element name="orderLine" maxOccurs="unbounded" minOccurs="0" type="qt:OrderLineType"/>
            </sequence>
        </complexType>
        <xsd:complexType name="OrderLineType">
            <xsd:sequence>
                <xsd:element name="orderLineID" maxOccurs="1" type="xsd:string"/>
                <xsd:element name="itemId" maxOccurs="1" type="xsd:string"/>
                <xsd:element name="quantity" maxOccurs="1" type="xsd:int"/>
                <xsd:element name="status" maxOccurs="1" type="xsd:string"/>
            </xsd:sequence>
        </xsd:complexType>
    </schema>

    Let's create the business process for the scenario mentioned.


     Add 'order' element as the input to the bpmn process


    Associate the process input with the 'order' data object created.


    A draft process could look like below:


    Let's make it interesting and configure each sub-process to filter only its respective order lines. First, for pending orders:


    The XPath expression to filter the pending order lines is shown below (we need to put the expression in both the loop data input & the loop data output) -
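The expression itself is not reproduced in the text here, so the following is only a sketch of what such a status filter amounts to, written as standalone Java with the JDK's XPath API rather than as a BPM data association. The class name OrderLineFilter, the prefix o and the trimmed sample payload are assumptions made for the illustration; only the namespace and the element names come from the schema above.

```java
import java.io.StringReader;
import java.util.Collections;
import java.util.Iterator;
import javax.xml.XMLConstants;
import javax.xml.namespace.NamespaceContext;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathFactory;
import org.w3c.dom.Document;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

// Hypothetical helper: selects the order lines that carry a given status.
final class OrderLineFilter {
    static final String ORDER_NS = "http://www.mycompany.com/ns/order";

    // Trimmed sample order (not schema-complete): two APPROVED lines, one PENDING line.
    static final String SAMPLE_ORDER =
        "<o:order xmlns:o='" + ORDER_NS + "'>"
        + "<o:orderID>1</o:orderID>"
        + "<o:orderLine><o:orderLineID>100</o:orderLineID><o:status>APPROVED</o:status></o:orderLine>"
        + "<o:orderLine><o:orderLineID>101</o:orderLineID><o:status>APPROVED</o:status></o:orderLine>"
        + "<o:orderLine><o:orderLineID>102</o:orderLineID><o:status>PENDING</o:status></o:orderLine>"
        + "</o:order>";

    // Returns how many order lines match the status, i.e. how many
    // sub-process instances the multi-instance loop would have to run.
    static int countByStatus(String orderXml, String status) {
        try {
            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
            factory.setNamespaceAware(true); // required for prefixed XPath steps
            Document doc = factory.newDocumentBuilder()
                    .parse(new InputSource(new StringReader(orderXml)));

            XPath xpath = XPathFactory.newInstance().newXPath();
            xpath.setNamespaceContext(new NamespaceContext() {
                @Override public String getNamespaceURI(String prefix) {
                    return "o".equals(prefix) ? ORDER_NS : XMLConstants.NULL_NS_URI;
                }
                @Override public String getPrefix(String uri) {
                    return ORDER_NS.equals(uri) ? "o" : null;
                }
                @Override public Iterator<String> getPrefixes(String uri) {
                    return ORDER_NS.equals(uri)
                            ? Collections.singletonList("o").iterator()
                            : Collections.<String>emptyIterator();
                }
            });
            // Bind $status through a variable resolver instead of string concatenation.
            xpath.setXPathVariableResolver(name ->
                    "status".equals(name.getLocalPart()) ? status : null);

            NodeList lines = (NodeList) xpath.evaluate(
                    "/o:order/o:orderLine[o:status = $status]", doc, XPathConstants.NODESET);
            return lines.getLength();
        } catch (Exception e) {
            throw new IllegalStateException("Could not evaluate the status filter", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("PENDING lines:  " + countByStatus(SAMPLE_ORDER, "PENDING"));
        System.out.println("APPROVED lines: " + countByStatus(SAMPLE_ORDER, "APPROVED"));
    }
}
```

The predicate on the status element is the part that matters; in the BPM editor the equivalent predicate would be applied to the order data object rather than to a parsed string.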


    Similarly, do the same for approved orders. The final process would look like:


    Now let's deploy the BPM process and test it with the below input:


    <soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/">
     <soap:Body>
      <ns1:start
       xmlns:ns1="http://xmlns.oracle.com/bpmn/bpmnProcess/MultiInstanceProcess"
       xmlns:ns2="http://www.mycompany.com/ns/order">
       <ns2:order>
        <ns2:orderID>1</ns2:orderID>
        <ns2:customerName>Ganesh</ns2:customerName>
        <ns2:customerAddress>Melbourne</ns2:customerAddress>
        <ns2:orderLine>
         <ns2:orderLineID>100</ns2:orderLineID>
         <ns2:itemId>10</ns2:itemId>
         <ns2:quantity>2</ns2:quantity>
         <ns2:status>APPROVED</ns2:status>
        </ns2:orderLine>
        <ns2:orderLine>
         <ns2:orderLineID>101</ns2:orderLineID>
         <ns2:itemId>33</ns2:itemId>
         <ns2:quantity>1</ns2:quantity>
         <ns2:status>APPROVED</ns2:status>
        </ns2:orderLine>
        <ns2:orderLine>
         <ns2:orderLineID>102</ns2:orderLineID>
         <ns2:itemId>34</ns2:itemId>
         <ns2:quantity>10</ns2:quantity>
         <ns2:status>PENDING</ns2:status>
        </ns2:orderLine>
       </ns2:order>
      </ns1:start>
     </soap:Body>
    </soap:Envelope>
    

    The audit trail for the above test is shown below. There are two threads processing the 'APPROVED' order lines and one thread for the 'PENDING' order line.


    So far so good. Now let's MIX THE STATUSES in the input and test the process again. The input used is:


    <soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/">
     <soap:Body>
      <ns1:start
       xmlns:ns1="http://xmlns.oracle.com/bpmn/bpmnProcess/MultiInstanceProcess"
       xmlns:ns2="http://www.mycompany.com/ns/order">
       <ns2:order>
        <ns2:orderID>1</ns2:orderID>
        <ns2:customerName>Ganesh</ns2:customerName>
        <ns2:customerAddress>Melbourne</ns2:customerAddress>
        <ns2:orderLine>
         <ns2:orderLineID>101</ns2:orderLineID>
         <ns2:itemId>33</ns2:itemId>
         <ns2:quantity>1</ns2:quantity>
         <ns2:status>APPROVED</ns2:status>
        </ns2:orderLine>
        <ns2:orderLine>
         <ns2:orderLineID>102</ns2:orderLineID>
         <ns2:itemId>34</ns2:itemId>
         <ns2:quantity>10</ns2:quantity>
         <ns2:status>PENDING</ns2:status>
        </ns2:orderLine>
        <ns2:orderLine>
         <ns2:orderLineID>100</ns2:orderLineID>
         <ns2:itemId>10</ns2:itemId>
         <ns2:quantity>2</ns2:quantity>
         <ns2:status>APPROVED</ns2:status>
        </ns2:orderLine>
       </ns2:order>
      </ns1:start>
     </soap:Body>
    </soap:Envelope>
    



    The process fails with the error :- 

    Error Message: {http://docs.oasis-open.org/wsbpel/2.0/process/executable}mismatchedAssignmentFailure
    default/MultiInstanceProcess!1.0*soa_00de7c84-1a46-4324-a7f2-2071ef7a680f/MultiInstanceProcess/830003-ACT10651136503013MultiInstanceBlock_ACT10651136503013_End-ACT10651136503013MultiInstanceBlock_ACT10651136503013.3-2
    Aug 15, 2014 10:49:09 PM
    Non Recoverable System Fault :
    <bpelFault><faultType>0</faultType><mismatchedAssignmentFailure xmlns="http://docs.oasis-open.org/wsbpel/2.0/process/executable"></mismatchedAssignmentFailure></bpelFault>


    Why is the mismatchedAssignmentFailure error thrown when the order statuses are mixed? This is because BPM internally uses a function similar to bpelx:copyList to copy the nodes, and unless the nodes selected by the status filter are contiguous in the list, the above pattern will not work.

    From documentation -
    http://docs.oracle.com/cd/E23943_01/dev.1111/e10224/bp_manipdoc.htm#CIHICJGH

    6.14.6.1 bpelx:copyList in BPEL 1.1

    Example 6-65 provides an example of bpelx:copyList in a BPEL project that supports BPEL version 1.1.

    Example 6-65 bpelx:copyList Extension in BPEL 1.1
    <bpel:assign> 
        <bpelx:copyList>
           <bpelx:from ... />
           <bpelx:to ... /> 
        </bpelx:copyList>
    </bpel:assign>
    
    The from-spec query can yield a list of either all attribute nodes or all element nodes. The to-spec query can yield a list of L-value nodes: either all attribute nodes or all element nodes.
    All the element nodes returned by the to-spec query must have the same parent element. If the to-spec query returns a list of element nodes, all element nodes must be contiguous.
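To make the contiguity rule concrete, here is a small standalone sketch in plain Java. It does not call any BPM or BPEL API; the class and method names are made up for the illustration. It simply checks whether all entries with a given status sit next to each other, which is the property that distinguishes the two test inputs above.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: models the order lines as a list of status values
// and checks whether the lines selected by one status form a contiguous run.
final class ContiguityCheck {

    static boolean isContiguous(List<String> statuses, String wanted) {
        int first = statuses.indexOf(wanted);
        int last = statuses.lastIndexOf(wanted);
        if (first < 0) {
            return true; // nothing selected, so nothing can be split apart
        }
        // Every entry between the first and last match must also match.
        for (int i = first; i <= last; i++) {
            if (!wanted.equals(statuses.get(i))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Status order of the first test input (works).
        List<String> firstInput = Arrays.asList("APPROVED", "APPROVED", "PENDING");
        // Status order of the second test input (fails with mismatchedAssignmentFailure).
        List<String> secondInput = Arrays.asList("APPROVED", "PENDING", "APPROVED");

        System.out.println("first input, APPROVED contiguous:  "
                + isContiguous(firstInput, "APPROVED"));
        System.out.println("second input, APPROVED contiguous: "
                + isContiguous(secondInput, "APPROVED"));
    }
}
```

In the second input the PENDING line sits between the two APPROVED lines, so the APPROVED selection is no longer a contiguous run of sibling elements.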


    MTOM/XOP example using JAX-WS / SOA Suite & OSB - 2

    Oracle SOA Suite

    Below we show how to consume an MTOM enabled service in Oracle SOA Suite. We implement it by attaching the MTOM client WS-Policy to the reference binding as shown below:




    Oracle Service Bus

    Oracle Service Bus supports parsing inbound messages & sending outbound messages in MTOM/XOP format. This capability is available for any XML based (or) WSDL based services.

    The below image shows how to enable MTOM support as part of the message handling tab for the proxy services.

    Include Binary data by Reference - can be used for pass-through scenarios where we don't really need to parse the attachment, or to pass the attachment directly to MFL or a Java callout. The attachment will be available as part of the $body/ctx:binary-content variable.

    Include Binary data by Value - can be used when you want to validate the message against the XML schema, or in scenarios where the outbound service doesn't support MTOM. In this case, the xop:Include element will be replaced with the actual base64 binary content.

    One important point to note is that if MTOM is enabled on an OSB service, we cannot attach any other OWSM policy to it (not mentioned in any documentation as far as I know).


    MTOM/XOP example using JAX-WS / SOA Suite & OSB - 1

    Binary data sent using SOAP across the wire needs to be base64 encoded to convert it to text. This increases the size of the data by about 33%. To optimize the transmission of XML data of type xs:base64Binary and xs:hexBinary in SOAP messages, the W3C recommends the usage of MTOM/XOP (Message Transmission Optimization Mechanism / XML-binary Optimized Packaging). When the transport protocol is HTTP, MIME attachments are used to carry that data, while at the same time allowing both the sender and the receiver direct access to the XML data in the SOAP message without having to be aware that any MIME artefacts were used to marshal the base64Binary or hexBinary data. MTOM primarily provides the following:
    • Encoding of the binary data
    • Removal of the binary data from the SOAP Envelope
    • Compression of the Binary data
    • Attach the binary data to the MIME package
    • Pointer to the MIME package in the SOAP Envelope
    JAX-WS supports MTOM/XOP through annotations.
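The size increase from base64 mentioned above is easy to verify with the JDK's own encoder. The sketch below is only an illustration of that arithmetic (the class name is made up): every 3 raw bytes become 4 encoded characters, so the payload grows by roughly a third when it is carried inline instead of as an attachment.

```java
import java.util.Base64;

// Hypothetical helper: measures how much a binary payload grows when base64 encoded.
final class Base64Overhead {

    // Length in bytes of the base64 text produced for rawBytes bytes of input.
    static int encodedLength(int rawBytes) {
        return Base64.getEncoder().encode(new byte[rawBytes]).length;
    }

    // Growth relative to the raw size, in percent.
    static double overheadPercent(int rawBytes) {
        return 100.0 * (encodedLength(rawBytes) - rawBytes) / rawBytes;
    }

    public static void main(String[] args) {
        int[] sizes = {1024, 3000, 3_000_000};
        for (int size : sizes) {
            System.out.printf("%d raw bytes -> %d base64 bytes (+%.1f%%)%n",
                    size, encodedLength(size), overheadPercent(size));
        }
    }
}
```

For inputs whose length is not a multiple of 3 the encoder pads the last group, which is why small payloads show slightly more than 33% growth.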

    JAVA - XML Type conversion for MTOM

    The Java types javax.activation.DataHandler, java.awt.Image and javax.xml.transform.Source are by default converted to the xs:base64Binary or xs:hexBinary XML types, while the XML types xs:base64Binary and xs:hexBinary are converted to byte[] by default.

    One of the highlights of the MTOM specification is that it doesn't mandate that the data is always sent as an attachment. It allows us to specify a threshold size above which the data needs to be sent as an attachment. This way smaller payloads need not be passed as attachments, which avoids unnecessary overhead.

    @MTOM(threshold=<Threshold Value>)

    Steps to Enable MTOM
    • Annotate the datatype that needs to be sent as attachment
    • Enable MTOM on both the Web Service & its client
    Example (Start with Java)

    Let's create the class 'DocumentManager' and define a method to archive the incoming document to disk. The document sent across the wire will use MTOM, providing an optimized transmission.


    import java.io.File;
    import java.io.FileOutputStream;
    import java.io.IOException;

    import javax.activation.DataHandler;

    public class DocumentManager {
        public DocumentManager() {
            super();
        }

        public void archiveDocument(DataHandler _pDocumentHandler) {
            // Hardcoding the file name here; the content type could be used to set the extension dynamically.
            File fileToArchive = new File("/tmp/" + "test.pdf");
            // try-with-resources closes the stream even if the write fails.
            try (FileOutputStream outputStream = new FileOutputStream(fileToArchive)) {
                _pDocumentHandler.writeTo(outputStream); // Writes the document content to disk.
                outputStream.flush();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
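The comment in the class above notes that the file extension could be derived from the content type instead of being hardcoded. A minimal sketch of that idea follows, assuming the content type string is obtained from DataHandler.getContentType(). The helper class, the handful of mapped types and the ".bin" fallback are all choices made for this illustration, not part of the original sample.

```java
import java.util.Locale;

// Hypothetical helper: maps a MIME content type to a file extension.
final class ContentTypeExtensions {

    static String extensionFor(String contentType) {
        if (contentType == null) {
            return ".bin"; // assumed fallback when no content type is available
        }
        // Drop parameters such as "; charset=UTF-8" and normalise the case.
        int semicolon = contentType.indexOf(';');
        String base = (semicolon >= 0 ? contentType.substring(0, semicolon) : contentType)
                .trim()
                .toLowerCase(Locale.ROOT);
        switch (base) {
            case "application/pdf":
                return ".pdf";
            case "text/xml":
            case "application/xml":
                return ".xml";
            case "text/plain":
                return ".txt";
            case "image/png":
                return ".png";
            case "image/jpeg":
                return ".jpg";
            default:
                return ".bin";
        }
    }

    public static void main(String[] args) {
        System.out.println("/tmp/test" + extensionFor("application/pdf"));
        System.out.println("/tmp/test" + extensionFor("text/xml; charset=UTF-8"));
        System.out.println("/tmp/test" + extensionFor(null));
    }
}
```

Inside archiveDocument the file name could then be built as "/tmp/test" + ContentTypeExtensions.extensionFor(_pDocumentHandler.getContentType()), though what content type actually arrives depends on what the client sends.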

    Now let's expose the class as a JAX-WS web service and enable MTOM for the same:

     









    The generated service would look something like below:
    package fmw.techsamples.java.ws.mtom;
    
    import java.io.File;
    import java.io.FileOutputStream;
    
    import javax.jws.WebService;
    
    import javax.xml.ws.soap.MTOM;
    
    @WebService(endpointInterface = "fmw.techsamples.java.ws.mtom.DocumentManagerPortType")
    @MTOM(enabled=true,threshold=1024)
    public class DocumentManager {
        public DocumentManager() {
            super();
        }
        
        public void archiveDocument(javax.activation.DataHandler _pDocumentHandler) {
            try
            {
                //you can get the content type to set the extension dynamically....
                File fileToArchive = new File("/tmp/"+"test.pdf"); 
                
                FileOutputStream outputStream = new FileOutputStream(fileToArchive);
                _pDocumentHandler.writeTo(outputStream);
                outputStream.flush();
                outputStream.close();
                
            }
            catch(Exception e) {
                e.printStackTrace();
            }
            
        }
    }
    

    I have set the threshold to 1024 bytes, which means MTOM will not be used for binary data below that size. The image below shows the WSDL & XSD definition generated after deploying the web service.
    The WSDL contains the WS-Policy definition for MTOM/XOP. Also, note that the XSD definition shows the input as 'base64Binary'.



    OK, now to testing. We will use SoapUI to do the testing, but to trap the HTTP request that is sent over the wire to the server (to show that the request has been optimized through MTOM), we will make use of the HttpAnalyzer in JDeveloper. First we start the HttpAnalyzer listening on port 8099 and then set the same as the proxy in SoapUI (via preferences as shown below).

    This means that all requests flow through the HttpAnalyzer and we can trap the request sent across the wire. In SoapUI, we can use the 'Insert file as Base64' option to convert a PDF to base64Binary (or you could use the openssl or base64 command to do that easily on a Mac). Also, we need to set 'Enable MTOM' to true (equivalent to passing new MTOMFeature() in a Java client).



    After executing the request, let's look at the request captured by the HttpAnalyzer. The SOAP request below shows the xop:Include element holding the reference to the binary data, which is sent separately as a MIME attachment.
    Also, the resulting PDF is shown written to the /tmp directory on the server.
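As the captured request is not reproduced in the text here, the sketch below shows how the reference could be recognised programmatically. It uses the JDK's DOM parser to look for an xop:Include element (namespace http://www.w3.org/2004/08/xop/include) and returns its href, which points at the MIME part by content id. The class name, the sample payload and the content id are hypothetical; a real request will carry whatever id the client generated.

```java
import java.io.StringReader;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

// Hypothetical helper: detects whether a payload references its binary data via XOP.
final class XopIncludeFinder {
    static final String XOP_NS = "http://www.w3.org/2004/08/xop/include";

    // Made-up example of an optimized payload; element names and content id are illustrative.
    static final String OPTIMIZED_SAMPLE =
        "<ns:archiveDocument xmlns:ns='http://example.org/hypothetical'>"
        + "<arg0><xop:Include xmlns:xop='" + XOP_NS + "' href='cid:example-part@example.org'/></arg0>"
        + "</ns:archiveDocument>";

    // Returns the href of the first xop:Include element, or null when the
    // binary data is carried inline (no xop:Include present).
    static String findAttachmentRef(String xml) {
        try {
            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
            factory.setNamespaceAware(true); // needed for getElementsByTagNameNS
            Document doc = factory.newDocumentBuilder()
                    .parse(new InputSource(new StringReader(xml)));
            NodeList includes = doc.getElementsByTagNameNS(XOP_NS, "Include");
            if (includes.getLength() == 0) {
                return null;
            }
            return ((Element) includes.item(0)).getAttribute("href");
        } catch (Exception e) {
            throw new IllegalStateException("Could not parse the payload", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("optimized: " + findAttachmentRef(OPTIMIZED_SAMPLE));
        System.out.println("inline:    " + findAttachmentRef("<arg0>JVBERi0=</arg0>"));
    }
}
```

When the payload is below the MTOM threshold, or MTOM is disabled, the element would hold the base64 text directly and the lookup finds no reference.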


    In the next instalment I will show a quick sample on how to access the MTOM enabled service through Oracle SOA Suite / OSB.