Skip to Content

As we all know that Asynchronous messages are persisted in message queue and starts processing the pipeline steps. After executing each pipeline step the message is stored again in the message queue. This process is repeated for each pipeline step until the message is finally delivered to the receiver.

Diagram.png

Now I would like to go through a scenario. I have 100 different directories for each plant (ex: AU01, MK04 etc.) from where I need to poll files (QOS=EOIO) and my requirement is, message failure for one plant should not block the messages for other plants. For ex: a file for plant AU01 is picked and it fails, it should not block the message for other plants.

I can attain this requirement by dictating message for each plant to flow through its own queue. We have a setting in file sender communication channel for Quality of service EOIO, where we can specify a queue name. Hence I have two options left.

  1. Create 100 file sender communication channels (one for each plant) to poll files from corresponding directory paths. Each communication channel will have QOS = EOIO and queue name = plant name. This will make sure that message for a specific plant will get process using its own queue.
  2. Create a single file sender communication channel to poll files from 100 different plant locations and dynamically create a queue for each plant. When a file for a particular plant is picked, it’s placed in its respective queue and processed. Hence messages for various plants are processed in their own queues and failure for one plant will not affect messages for other plants.

First option is cumbersome because we have to create 100 file sender communication channels. If plants increase in future, then I need to create communication channel for newly added plant.  Hence I opted for second option.

Now my challenge is to dynamically create queue for each plant and process message for a particular plant in its own queue. I need to do this before the message enters Integration engine. Hence I have to do it in adapter level itself.

Hence I designed an adapter module which will solve my purpose. Before going through the adapter module, let’s see the directory structure and communication channel configuration:

Following is the directory path:

Directory Structure.png

Following is the communication channel configuration:

Comm_Channel_Config.png

Comm_Channel_Config_1.png

Comm_Channel_Config_2.png

The module “SetQueueID” will create queue dynamically according to plant name and put message in its own queue.

Following is the code for the module:

package com.sap.test;

import java.io.BufferedReader;

import java.io.ByteArrayInputStream;

import java.io.InputStream;

import java.io.InputStreamReader;

import java.rmi.RemoteException;

import java.util.Hashtable;

import javax.ejb.EJBException;

import javax.ejb.SessionBean;

import javax.ejb.SessionContext;

import javax.ejb.Timer;

import javax.xml.parsers.DocumentBuilder;

import javax.xml.parsers.DocumentBuilderFactory;

import org.w3c.dom.Document;

import org.w3c.dom.Element;

import org.w3c.dom.Node;

import org.w3c.dom.NodeList;

import com.sap.aii.af.lib.mp.module.Module;

import com.sap.aii.af.lib.mp.module.ModuleContext;

import com.sap.aii.af.lib.mp.module.ModuleData;

import com.sap.aii.af.lib.mp.module.ModuleException;

import com.sap.engine.interfaces.messaging.api.*;

import com.sap.engine.interfaces.messaging.api.auditlog.*;

import com.sap.tc.logging.*;

/**

* Developed by Rakesh Sharma

*

*/

public class SetQueueID implements SessionBean, Module {

    private SessionContext sessionContext;

    public static final String VERSION_ID = “$Id://com/sap/pi/aco/src/_adapters/java/user/module/SetQueueID.java#1 $”;

    static final long serialVersionUID = 43922673128345712L;

    public void ejbActivate() throws EJBException, RemoteException {

        // TODO Auto-generated method stub

    }

    /* (non-Javadoc)

     * @see javax.ejb.SessionBean#ejbPassivate()

     */

    public void ejbPassivate() throws EJBException, RemoteException {

        // TODO Auto-generated method stub

    }

    /* (non-Javadoc)

     * @see javax.ejb.SessionBean#ejbRemove()

     */

    public void ejbRemove() throws EJBException, RemoteException {

        // TODO Auto-generated method stub

    }

    /* (non-Javadoc)

     * @see javax.ejb.SessionBean#setSessionContext(javax.ejb.SessionContext)

     */

    public void setSessionContext(SessionContext sessionContext) throws EJBException,

    RemoteException {

        this.sessionContext = sessionContext;

    }

    public void ejbCreate() throws javax.ejb.CreateException {

    }

    public ModuleData process(ModuleContext moduleContext, ModuleData inputModuleData)

    throws ModuleException {

        AuditAccess audit = null;

        Object obj = null;

        Message msg = null;

        MessageKey key = null;

        String sourceFileName = null;  

        String sourceDirectory = null;

        String sourceDirectoryFileName = null;

        String targetQueue = null;

        String fileName = null;

        try {

            obj = inputModuleData.getPrincipalData();

            msg = (Message) obj;

            key = new MessageKey(msg.getMessageId(),msg.getMessageDirection());

            audit = PublicAPIAccessFactory.getPublicAPIAccess().getAuditAccess();

            audit.addAuditLogEntry(key, AuditLogStatus.SUCCESS, “SetQueueID Module called”);

          }

          catch (Exception e) {

            ModuleException me = new ModuleException(e);

            throw me;

          }

        try

        {

              Hashtable mp = (Hashtable) inputModuleData.getSupplementalData(“module.parameters”);// Get the Supplemental data which is available as hash table

              if (mp != null)

              {

                    sourceDirectoryFileName = (String)mp.get(“FileName”); // Get the file name which is being processed

                    audit.addAuditLogEntry(key, AuditLogStatus.SUCCESS, ” sourceDirectoryFileName : “+sourceDirectoryFileName);

                    sourceFileName = sourceDirectoryFileName.substring(sourceDirectoryFileName.lastIndexOf(“\\”)+1);

                    audit.addAuditLogEntry(key, AuditLogStatus.SUCCESS, ” sourceFileName : “+sourceFileName);

                    sourceDirectory = sourceDirectoryFileName.substring(0,sourceDirectoryFileName.lastIndexOf(“\\”)); // Get the directory which is being processed

                    audit.addAuditLogEntry(key, AuditLogStatus.SUCCESS, ” sourceDirectory : “+sourceDirectory);

              }

//         

                     targetQueue = sourceDirectory.substring(sourceDirectory.lastIndexOf(“\\”)+1);

                       targetQueue = “Q_” + targetQueue.toUpperCase();

              audit.addAuditLogEntry(key, AuditLogStatus.SUCCESS, ” targetQueue : “+targetQueue);

              msg.setSequenceId(targetQueue);

              audit.addAuditLogEntry(key, AuditLogStatus.SUCCESS, ” after msg.setSequenceID .”);

              audit.addAuditLogEntry(key, AuditLogStatus.SUCCESS, ” SetQueueID : Custome Queue is set to the SequenceID .”);

        }

        catch(Exception e)

        {

              audit.addAuditLogEntry(key, AuditLogStatus.ERROR, ” Cannot assign the Custom Queue to the SequenceID”);

        }

        return inputModuleData;

}

}

In the above code, in the process method I have created objects for Object, Message, MessageKey, AuditAccess. In following lines of code I have set the sequenceID for the message:

Hashtable mp = (Hashtable) inputModuleData.getSupplementalData(“module.parameters”);// Get the Supplemental data which is available as hash table

if (mp != null)

{

   sourceDirectoryFileName = (String)mp.get(“FileName”); // Get the file name which is being processed

   audit.addAuditLogEntry(key, AuditLogStatus.SUCCESS, ” sourceDirectoryFileName : “+sourceDirectoryFileName);

sourceFileName=sourceDirectoryFileName.substring(sourceDirectoryFileName.lastIndexOf(“\\”)+1);

audit.addAuditLogEntry(key, AuditLogStatus.SUCCESS, ” sourceFileName : “+sourceFileName);

sourceDirectory =   sourceDirectoryFileName.substring(0,sourceDirectoryFileName.lastIndexOf(“\\”)); // Get the directory which is being processed

audit.addAuditLogEntry(key, AuditLogStatus.SUCCESS, ” sourceDirectory : “+sourceDirectory);

}

        targetQueue = sourceDirectory.substring(sourceDirectory.lastIndexOf(“\\”)+1);

       targetQueue = “Q_” + targetQueue.toUpperCase();  // Q_<Plant_Name>

      audit.addAuditLogEntry(key, AuditLogStatus.SUCCESS, ” targetQueue : “+targetQueue);

       msg.setSequenceId(targetQueue); // This method will set the queue ID to plant name

       audit.addAuditLogEntry(key, AuditLogStatus.SUCCESS, ” after msg.setSequenceID .”);

       audit.addAuditLogEntry(key, AuditLogStatus.SUCCESS, ” SetQueueID : Custome Queue is set to the SequenceID .”);

Hence message corresponding to a file picked from directory \\testserver\plants\MK04, the sequenceID will be set to Q_MK04. Hence the corresponding queue name will be in form XBQIQ_MK04 (you can find queue with this name in SMQ2. The string after XBQI is our sequenceID set in adapter module. This is a qRFC (EOIO) inbound queue).

Please find the screenshot of the communication channel audit log, reliable messaging tag of  SOAP:Header and integration engine trace log of the message below:

ScreenShot1.png

ScreenShot2.png

ScreenShot3.png

This is my first blog in SCN. Hence please let me know your feedback and suggestions. Thanks a lot for going through my blog.. 🙂

To report this post you need to login first.

11 Comments

You must be Logged on to comment or reply to a post.

    1. Rakesh Sharma Post author

      Hi Michal,

      Thanks for bringing it to my notice. I explained the problem and solution which I faced.  Setting queue ID coincided with James blog. I was unaware of it. I will link it to James blog to have full view on the topic.

      Thanks

      Rakesh

      (0) 

Leave a Reply