How to stop messages in CPI manually

A couple of days ago I read the following question in the SAP Answers section of the SAP Community: “CPI Message Processing Stuck”. Alice van Ommen asked whether it is possible to stop an SAP CPI message that is stuck in the “Processing” state. The answer pointed to SAP Note 2399949, which says that it isn’t possible and that one has to wait up to 30 days until a clean-up job runs. I think the SAP Note is a bit misleading, because it doesn’t apply to every case in which a message is stuck in the “Processing” state. But let me show you why…

Note: If you’re not interested in the “how”, but only want a quick solution, scroll down to the “ready-to-use solution” paragraph at the end of this article.

Table of contents

  • Can messages (exchanges) on CPI be stopped?
  • How to stop messages – step by step
  • How to stop messages – a ready-to-use solution
  • Feedback and discussion

Can messages (exchanges) on CPI be stopped?

The answer is yes. Messages (the CPI term; Apache Camel calls them exchanges) can be stopped, at least while they are running. This may sound trivial, but let me explain why I stress the “while they are running”. There can be two reasons why a message is shown in the “Processing” state in SAP CPI’s monitoring.

  1. The message is still running
  2. The message has already stopped, but the monitor didn’t get the information

Case 1 may occur if you managed to write an infinite loop in one of your scripts, if the network is slow, when dealing with huge payloads, and so on. Case 2 might occur when the message processing was “hard-interrupted” by a crash of the runtime node or by undeploying the IFlow during message processing.

If your message is shown as “Processing” because of “case 2”, the only way to get the monitoring entry corrected is to wait (up to) 30 days for the monitoring clean-up job. (But hey, after all it’s only a display error and the message doesn’t consume any resources on your tenant.) But if you are in a “case 1” situation, there is a good chance that you can cancel the message. Let’s have a look at the necessary code in the next section.

How to stop messages – step by step

The short version: Get a handle to the Exchange object which should be stopped and stop it.

The long version: First, define the id of the IFlow and the id of the message you want to stop. Then we use the FrameworkUtil class from OSGi to get the bundle that contains the class “com.sap.gateway.ip.core.customdev.util.Message” (because that class exists on every runtime node) and obtain the overall BundleContext via the bundle’s getBundleContext-function.

The BundleContext itself gives us access to the service references. (Service references represent, among other things, the instances of our deployed/running IFlows.) Via the getServiceReferences-function and by passing our IFlow’s id, we can retrieve the ServiceReference object which links to the IFlow that runs the message we want to stop. In the next step, we resolve the ServiceReference via the getService-function, cast the result to a CamelContext and save it in a variable called “camelContext”.

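import org.apache.camel.CamelContext
import org.osgi.framework.FrameworkUtil
import org.osgi.framework.ServiceReference

def iFlowID = "THE_ID_OF_YOUR_IFLOW_ARTIFACT"
def messageID = "17gbd329bd3j90eqh3hq093eh3q" //The SAP CPI message you want to stop

//Get the general bundle context via a class that exists on every runtime node
def bundleCtx = FrameworkUtil.getBundle(Class.forName("com.sap.gateway.ip.core.customdev.util.Message")).getBundleContext()
//Get service references for IFlow (null if no matching CamelContext is registered)
ServiceReference[] srs = bundleCtx.getServiceReferences(CamelContext.class.getName(), "(camel.context.name=${iFlowID})")
if (!srs) throw new Exception("No CamelContext found for IFlow id ${iFlowID}.")
//Get CamelContext from service reference
CamelContext camelContext = (CamelContext)bundleCtx.getService(srs[0])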
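
The second argument of getServiceReferences is an OSGi filter string. In that syntax the characters \ * ( ) have a special meaning inside a value, so an IFlow id containing one of them would have to be escaped. The following is a minimal sketch of how the filter could be built defensively; both helper names are hypothetical and not part of the script in this article.

```groovy
// Hypothetical helper (an assumption, not part of the article's script):
// escapes the characters that are special in OSGi filter values: \ * ( )
String escapeFilterValue(String value) {
    value.replaceAll(/([\\*()])/, '\\\\$1')
}

// Hypothetical helper: builds the filter used to look up the CamelContext of one IFlow
String camelContextFilter(String iFlowId) {
    "(camel.context.name=${escapeFilterValue(iFlowId)})"
}

println camelContextFilter("THE_ID_OF_YOUR_IFLOW_ARTIFACT")
```

For typical IFlow ids (letters, digits, underscores) the result is identical to the plain string interpolation used above, so this only matters for unusual ids.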

Imagine the CamelContext as an instance of our target IFlow. Via the CamelContext’s getInflightRepository-function and its browse-function we can get a list of all Exchanges (=messages) which are currently being processed by the IFlow. (The InflightRepository tracks the exchanges during the processing steps.)
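
To get a feeling for what the browse-function returns, it can help to list all in-flight entries before stopping anything. The sketch below assumes that each entry offers getExchange() and getDuration() (as the InflightExchange interface of recent Camel 2.x releases does; the Camel version on your tenant may differ). The function name listInflight is made up for this example.

```groovy
// Sketch under the assumptions named above; listInflight is a hypothetical helper.
// Returns one map per exchange that is currently in flight in the given CamelContext.
def listInflight(camelContext) {
    camelContext.getInflightRepository().browse().collect { entry ->
        [
            // Exchange property this article uses to identify a CPI message
            messageId : entry.getExchange().getProperty("SAP_MessageProcessingLogID"),
            // Milliseconds the exchange has been in flight
            durationMs: entry.getDuration()
        ]
    }
}
```

Called as listInflight(camelContext), it yields a list such as [[messageId: ..., durationMs: ...], ...], which makes it easy to spot the long-running message and copy its id.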

First we check whether the InflightRepository holds at least one message (size() >= 1). If so, everything is fine. If not, we stop at this point, because the message we want to stop has already completed. If there are messages in the InflightRepository, we call the browse-function in combination with the find-function, passing our message id, to get the repository entry which represents the message we are looking for. We save the repository entry in a variable called “repoEntry”.

//Text shown in the monitoring for the stopped message
def exceptionText = "Exchange stopped manually"
//Check whether any messages are currently in processing (needs: import org.apache.camel.Exchange)
if (camelContext.getInflightRepository().browse().size() < 1){
	throw new Exception("No messages in InflightRepository, thus nothing to stop.")
} else {
	//Get repository entry for given message id
	def repoEntry = camelContext.getInflightRepository().browse().find{ it.getExchange().getProperty("SAP_MessageProcessingLogID") == messageID}
	if (repoEntry != null){
		//Get exchange, set Exception and stop it
		def exchange = repoEntry.getExchange()
		exchange.setException(new InterruptedException(exceptionText))
		exchange.setProperty(Exchange.ROUTE_STOP, Boolean.TRUE)
	} else {
		throw new Exception("No message found for given metadata.")
	}
}

If “repoEntry” isn’t null, which means we got a valid reference, we call the getExchange-function on it to get an instance of type Exchange. That’s the holy grail we were looking for. With two more lines of code we are now able to stop the message. By calling the setProperty-function with the parameters “Exchange.ROUTE_STOP” and “Boolean.TRUE” we mark the message to be stopped as soon as possible.

Note (1): The setException-call is optional. If you call this function, the message will show up as “FAILED” in SAP CPI’s monitoring. The exception text, passed to the setException-function, will also be shown in CPI’s monitoring. If you don’t call the setException-function, but only set the “Exchange.ROUTE_STOP” property, then the message will be stopped, but shown as “COMPLETED” in CPI’s monitoring.

Note (2): By setting the “Exchange.ROUTE_STOP” property, you don’t kill an Exchange immediately, but let the Camel framework know, that it should stop the Exchange at the next possible time (=usually the next processing step in the IFlow).
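
The behaviour described in Note (2) can be pictured with a toy model. The following is plain Groovy and deliberately not Camel code: a “route” is just a list of steps, and the stop flag is only looked at between two steps. All names (runRoute, routeStop) are invented for this illustration.

```groovy
// Toy model, NOT Camel code: the stop flag is evaluated before each step starts,
// never while a step is running.
def runRoute(List<Closure> steps, Map exchange) {
    def executed = []
    for (step in steps) {
        if (exchange.routeStop) break
        executed << step(exchange)
    }
    executed
}

def steps = [
    { ex -> "step1" },
    { ex -> ex.routeStop = true; "step2" },  // stop is requested while step2 is running
    { ex -> "step3" }
]
println runRoute(steps, [routeStop: false])  // prints [step1, step2]
```

In this model the step that is running when the stop is requested still finishes, and only the following steps are skipped. That matches the “next possible time” wording above.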

How to stop messages – a ready-to-use solution

Now that we have looked into the technical details, we take a look at the practical implementation. You can use the method above, by building a simple IFlow which looks as follows.

Create a new IFlow with one sender and no receiver. Connect the Sender element via an HTTPS sender channel. In the Sender, set an endpoint address of your choice. After that, add a script element of type Groovy Script and paste the following script. (The script is based on the theoretical part above, but extended by some lines of error and input handling.)

//internal
import com.sap.gateway.ip.core.customdev.util.Message
import groovy.json.JsonOutput
import org.apache.camel.*
import org.osgi.framework.*

Message processData(Message message) {
	
	//Get url parameters (an empty or missing query string yields an empty map)
	def params = [:]
    def query = message.getHeaders().CamelHttpQuery ?: ""
    (query =~ /(\w+)=?([^&]+)?/).each{
        params[it[1]] = it[2]
    }
    
	//Stop message
	def result = stopExchange(params.messageId, params.iFlowId)
	
	//Return result
	def body = JsonOutput.toJson(result)
    message.setBody(body)
    message.setHeader('Content-Type', 'application/json')
	return message
}

private stopExchange(messageId, iFlow, exceptionText = "Exchange stopped manually"){
	def result = [successful:true, text:""]
	
	try {
		//Get general bundle context
		def bundleCtx = FrameworkUtil.getBundle(Class.forName("com.sap.gateway.ip.core.customdev.util.Message")).getBundleContext()
		
		//Get service references for IFlow (null if no matching CamelContext is registered)
		ServiceReference[] srs = bundleCtx.getServiceReferences(CamelContext.class.getName(), "(camel.context.name=${iFlow})");
		if (!srs) throw new Exception("No CamelContext found for IFlow id ${iFlow}.")
		//Get CamelContext from service reference
		CamelContext camelContext = (CamelContext)bundleCtx.getService(srs[0])
	
	
		//Check whether any messages are currently in processing
		if (camelContext.getInflightRepository().browse().size() < 1){
			result.successful = false
			result.text = "No messages in InflightRepository, thus nothing to stop."
		} else {
			//Get repository entry for given message id
			def repoEntry = camelContext.getInflightRepository().browse().find{ it.getExchange().getProperty("SAP_MessageProcessingLogID") == messageId}
			if (repoEntry != null){
				//Get exchange, set Exception and stop it
				def exchange = repoEntry.getExchange()
				exchange.setException(new InterruptedException(exceptionText))
				exchange.setProperty(Exchange.ROUTE_STOP, Boolean.TRUE);
			} else {
				result.successful = false
				result.text = "No message found for given metadata."
			}
		}
	}
	catch(Exception ex){
		result.successful = false
		result.text = ex.getMessage()
	}
	return result
}

That’s it. Save and deploy the IFlow. After deployment, you can call your IFlow in any web browser. By adding the URL parameters “messageId” and “iFlowId” you can now stop messages with ease.

Example call:

https://x0815-iflmap.hcisbp.eu1.hana.ondemand.com/http/CancelMessage?messageId=AF3Cuw5OXjZYte4cEwEDEaHISQt6&iFlowId=Webbrowser_to_CPI_LongrunnerInterface
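
The regular expression in the script takes the parameter values as they arrive, without URL-decoding them. If ids with special characters are expected, a small parser along the following lines could be used instead. This is only a sketch; the helper name parseQuery is hypothetical.

```groovy
// Hypothetical helper: splits a raw query string (e.g. the CamelHttpQuery header)
// into a map and URL-decodes parameter names and values.
Map<String, String> parseQuery(String query) {
    def params = [:]
    if (!query) return params                  // null or empty query string
    query.split('&').each { pair ->
        def parts = pair.split('=', 2)         // split at the first '=' only
        def name = URLDecoder.decode(parts[0], 'UTF-8')
        def value = parts.size() > 1 ? URLDecoder.decode(parts[1], 'UTF-8') : null
        if (name) params[name] = value
    }
    params
}

def params = parseQuery("messageId=AF3Cuw5OXjZYte4cEwEDEaHISQt6&iFlowId=Webbrowser_to_CPI_LongrunnerInterface")
println params.messageId
println params.iFlowId
```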

Because the IFlow responds with an easy-to-parse JSON document, including status- and error information, it is also ideal for interfacing with a third-party application or monitoring tool of your choice.
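
As an illustration of the client side, the response body could be evaluated like this. The sketch only covers the parsing part and assumes the two fields “successful” and “text” produced by the script above; the function name describeResult is invented for this example, and the HTTP call itself is left out.

```groovy
import groovy.json.JsonSlurper

// Hypothetical client-side helper: turns the JSON response of the IFlow
// into a short status line.
String describeResult(String responseBody) {
    def result = new JsonSlurper().parseText(responseBody)
    result.successful ? "Message stopped." : "Stop request failed: ${result.text}"
}

println describeResult('{"successful":false,"text":"No messages in InflightRepository, thus nothing to stop."}')
```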

Feedback and discussion

It was a fun journey. Sneaking through the classes and features of CPI / Apache Camel and finding out what’s possible is always fun. Even though it should be clear, let me say it again in plain words: The concept shown above is a proof of concept. It is not an official method of SAP itself, nor is it recommended by SAP. Please handle this knowledge responsibly.

Enough of the harsh words now. Please let me know what you think about the methods and ideas shared in the article above. Did you know them before? Have you ever faced the problem of stuck messages, and how did you solve the situation? I’m looking forward to your comments.

21 Comments
  • Great blog. I encountered the same issue this year: problematic input data left an iFlow unable to reach the Completed state.

    This blog shows great insight into CPI. I wonder how you did your investigation while figuring out the code in your blog. If it was all done in the CPI tenant, then you are brilliant!

    • Hi Nick,

      for testing/figuring out what is possible, I used the method described here, which allows us to run scripts immediately without deploying: https://blogs.sap.com/2019/06/17/rapid-groovy-scripting-in-cpi-dynamic-loading-and-execution-of-script-code-at-runtime/ This accelerated the development decisively.

      As entry point for my investigation I used Eng Swee’s hint in this article, which shows us how to access the Exchange-object of a message: https://blogs.sap.com/2018/04/05/using-camels-simple-in-cpi-groovy-scripts/

      With access to the Exchange itself, we have access to an Apache Camel class/object. Thus I tried to “break” out of the CPI environment and see what is possible in Apache Camel. By searching Apache Camel boards, documentation and Stack Overflow I tried to figure out how Exchanges can be stopped in Camel. With the ideas from my research, I then tried to turn the collected information into code in the CPI environment.

      Hopefully this answers your question.

      • Thanks for sharing your journey on how to make it work.

        Your sample code also shows a very lean style and a thorough understanding of the Groovy language.

      • Getting into the Camel bits behind CPI is indeed fun and a whole new world to explore 🙂 And indeed it is achieved much faster/easier with dynamic script execution 🙂

        I love the way you expanded on the knowledge that Vadim and myself shared, and brought this to another level. It shows how a vibrant community of passionate individuals can spur each other towards greater heights by sharing knowledge and continually learning from one another. Kudos again for another fantastic, simply fantastic work.

  • Hi Raffael,

    thanks a lot for your blog, great solution!

    The only hurdle I have to overcome is that none of my iFlows can be deployed since the message got stuck…

     

    Any tips;-)

     

    All the best, regards Alice

  • Raffael, thank you for such mindful insight. Message cancellation is truly a highly demanded feature, and an elegant way to address it that you illustrated!

  • Raffael, I think you might be able to get access to the Camel Context without the bundle and service reference.

    def Message processData(Message message) {
    	Exchange exchange = message.exchange
    	CamelContext context = exchange.getContext()

    I admit it takes advantage of Groovy’s direct access to private attributes and may seem “hacky”, but we are indeed hacking into the system anyway 🙂

     

    • Hi Eng Swee Yeoh ,

      You’re right. It’s an easier way to get the CamelContext belonging to an exchange. But it doesn’t help for this task. If I’m not wrong, each IFlow/OSGi package has its own CamelContext with an independent InflightRepository. So using the CamelContext from the Exchange that was triggered to stop a specific message would only work if the code runs in the same IFlow which also holds the message to be stopped.

      If there is a way to get access to something like a global InflightRepository this would not be necessary.

      Regards

  • Hi Raffael,

     

    I tried the same to stop my iFlow messages in Processing status. But every time it gives me the message “No messages in InflightRepository, thus nothing to stop.” It does not find any messages in the InflightRepository, though I can see them in message monitoring. Can you please advise on this?

     

    Thanks,

    Sonali

    • Hi Sonali,

       

      if there are no more messages in the InflightRepository, then you’ve either chosen a wrong IFlow id or the messages are already processed and the CPI monitor just didn’t get the information. The second option is case 2 as described in the article: “Case 2 might occur, when the message processing was “hard-interrupted” by crashing the runtime node or undeploy the IFlow during the message processing.” In this case you have to wait for the official clean-up job.

       

    • Hi Vijay,

      I haven’t tried to stop “retry” messages. But if you’re talking about the standard retry mechanism (with JMS adapter + message queues), then I would guess it should work with the code shown above if you make one little adjustment: Remove the following line from the code above:

      exchange.setException(new InterruptedException(exceptionText))

      Without this line, messages are stopped with state “Completed” instead of “Failed”. I think the retry in a scenario with the JMS adapter in retry mode is only triggered if a message goes into an erroneous state. Maybe you can “bypass” this by simply setting the message to “Completed” state as described above.