Skip to Content
Technical Articles

Cloud Integration – Connecting to Messaging Systems using the AMQP Adapter

This blog describes options for configuring asynchronous message processing using the new AMQP adapter, which is available for customers to connect to messaging systems using the AMQP (Advanced Message Queuing Protocol) protocol. It describes the configuration, prerequisites and limits. The AMQP adapter is available for SAP Cloud Platform Integration customers with the 08-December-2019 release.

Connecting to Messaging Systems Using the AMQP Adapter

In many integration scenarios messages or events have to be exchanged between applications or systems via messaging systems. With the new AMQP adapter SAP Cloud Platform Integration can be used as provider or consumer of such messages or events. Cloud Integration can connect to messaging systems using the AMQP (Advanced Message Queuing Protocol) protocol version 1.0 and consume messages or events using the AMQP sender adapter or store messages or events in the message broker using the AMQP receiver adapter. Follow the steps described below to write to or consume from queues and/or topics in the message broker.

Prerequisite: Messaging System Setup

To be able to connect to queues or topics in the message broker, you have to create queues and/or topics in the message broker. This needs to be done on the messaging system with the configuration tools provided by the messaging system.

In some messaging systems you need to configure a Lock Duration to make sure the message is not consumed multiple times. This timeout must be longer than the expected processing time of the message, otherwise this would lead to duplicate messages.

Note that the monitoring of queues, topics and messages in the queues or topics can only be executed by using tools provided by the messaging system provider. These monitors are not integrated into Cloud Integration. In SAP Cloud Platform Integration you can monitor the integration flows using the AMQP adapter and the messages send to or consumed from the messaging system. You can find more details about those options in this blog in section ‘Monitoring’.

 

AMQP Receiver Adapter

The AMQP receiver adapter can be used to send messages from Cloud Integration to queues or topics in an external messaging system. A small sample scenario with a HTTP sender adapter and the AMQP receiver adapter is shown in the picture below. A message is received by the HTTP sender adapter, some headers are defined and conversions in a groovy script are done, the message is then sent to the messaging system via the AMQP adapter:

In the AMQP receiver channel the following configurations have to be made to connect to the messaging system. Note that these settings are very specific to the connected messaging system, some sample settings are listed below in the messaging system specific chapters.

  • Protocol:
    • Transport Protocol: Select the protocol the messaging system supports.
      • TCP
      • WebSocket

  • Connection:
    • Host: Specify the hostname of the messaging system.
    • Port: Specify the port of the messaging system.
    • Proxy Type: Select if you want to connect via Cloud Connector (On-Premise) or directly via the Internet, this configuration option is available with version 1.1 of the adapter (available with the January 2020 update). See blog How to connect to an on-premise AMQP server via Cloud Connector for more details.
    • Path: For WebSocket specify the access path of the messaging system.
    • Connect with TLS: Select if TLS has to be used for the connection.
    • Authentication: Select the authentication method the messaging system supports.
      • SASL (Simple Authentication and Security Layer)
      • OAuth2 Client Credentials
      • None
    • Credential Name: Select the alias of the deployed credentials.

  • Processing:
    • Destination Type: Specify if messages are to be sent to queues or topics in the messaging system.
      • Topic
      • Queue
    • Destination Name: Specify the name of the queue or topic. This value can be dynamically defined using header: ${header.queueabc} or property: ${property.queueabc}.
    • Expiration Period: Specify the TTL (Time to Live) for the message. If nothing is specified, the setting for the queue or topic subscription in the messaging system applies. How the defined TTL is interpreted by the messaging system depends on the messaging system.
    • Delivery: Specify if the messaging system has to make sure that the message is not lost, even in case of unexpected terminations.
      • Persistent
      • Non-Persistent

 

AMQP Sender Adapter

The AMQP sender adapter can be used to consume messages in Cloud Integration from queues or topic subscriptions in an external messaging system. A small sample scenario with the AMQP sender adapter and a SOAP receiver adapter is shown in the below picture. A message is consumed by the AMQP sender adapter from the messaging system, some conversions in a groovy script are executed, and then the message is sent to the receiver via the SOAP adapter:

In the AMQP sender channel the following configurations need to be done, sample settings below in the messaging system specific chapters:

  • Protocol:
    • Transport Protocol: Select the protocol the messaging system supports.
      • TCP
      • WebSocket

  • Connection:
    • Host: Specify the hostname of the messaging system.
    • Port: Specify the port of the messaging system.
    • Proxy Type: Select if you want to connect via Cloud Connector (On-Premise) or directly via the Internet, this configuration option is available with version 1.1 of the adapter (available with the January 2020 update). See blog How to connect to an on-premise AMQP server via Cloud Connector for more details.
    • Path: For WebSocket specify the access path of the messaging system.
    • Connect with TLS: Select if TLS has to be used for the connection.
    • Authentication: Select the authentication method the messaging system supports.
      • SASL (Simple Authentication and Security Layer)
      • OAuth2 Client Credentials
      • None
    • Credential Name: Select the alias of the deployed credentials.

  • Processing:
    • Queue Name: Specify the name of the queue or topic subscription to be consumed from.
    • Number of Concurrent Processes: Specify the number of processes used for parallel message processing. Note, that the processes are started from each worker node.
    • Max. Number of Prefetched Messages (this configuration parameter will be available with the October 25th, 2020 update): Specify the maximum number of messages to be prefetched by a worker. The default value is set to 5, which means that if there are more than 5 messages in the queue, the first worker fetches 5 messages at once and processes them one after the other; the second worker fetches the next 5 and so on. The allowed value range is 1 to 100. The performance may be increased with this prefetch feature because it reduces the time for additional communications. Consider the following:
      • If the processing time of the integration flow is longer than only some seconds, consider a lower value for prefetched messages to allow an optimal distribution between the worker nodes. For very long running integration flows (longer than one minute) you may even consider 1 as value for prefetched messages.
      • If the processing time of the integration flow is only some milliseconds you may increase the number of prefetched messages to 10 or even higher. But keep the lock timeout in mind (next point).
      • Important: You need to keep the lock timeout of the messaging system in mind because the timeout starts with the processing of the first message in the prefetch package. If the package of the prefetched messages takes longer to process than what you’ve specified for the lock timeout, this could lead to duplicate messages!
      • For older versions of the AMQP channel this configuration option is not available. In this case, the default value of 5 messages is used.
  • The Retry Details section will be available with the 10th-May 2020 update.
    • Max. Number of Retries: Specify the maximum number of retries that shall be executed in case there is an error during processing. Afterwards a different delivery status will be sent to the message broker (see next field).
    • Delivery Status After Max. Retries: Select the delivery status that shall be sent to the message broker after the specified retries were executed and the message processing still fails. The following two values are available: REJECTED and MODIFIED_FAILED_UNDELIVERABLE. The message broker can then handle such messages differently.

Retry Configuration

If an error occurs during the processing of the consumed message in Cloud Platform Integration, the message is not removed from the messaging system, but is retried again immediately as long as the maximum number of configured retries is reached. In case the messaging system does not support a different handling of the message when a different delivery status is sent, the retry may even go on forever.

There are different options available to handle errors during processing and configure the retry for messages consumed via AMQP:

  • Use Retry configuration AMQP sender adapter as described above (available with 10th May 2020 update)
  • Retry Configuration based on JMSRedelivered/JMSXDeliveryCount headers in Exception Sub-Process
  • Catch Error and configure alternate processing in Exception Sub-Process

If the used messaging system supports dead-lettering and delayed processing based on the delivery status as described for the AMQP sender adapter, then this is the recommended configuration option. If the messaging system does not support dead-lettering or you want to do a more specific handling, like sending a notification to an administrator after a certain number of retries you can use the configuration based on the two headers JMSRedelivered and JMSXDeliveryCount. The third option is the only option if the messaging system does neither support dead-lettering nor the headers.

Retry Configuration in AMQP Sender Adapter

If supported by the messaging system use the retry configuration in the AMQP sender adapter as shortly mentioned above.

The Retry Details section in the AMQP sender adapter will be available with the 10th-May 2020 update.

  • Max. Number of Retries: Specify the maximum number of retries that shall be executed in case there is an error during processing. Afterwards a different delivery status will be sent to the message broker (see next field).
  • Delivery Status After Max. Retries: Select the delivery status that shall be sent to the message broker after the specified retries were executed and the message processing still fails. The following two values are available: REJECTED and MODIFIED_FAILED_UNDELIVERABLE. The message broker can then handle such messages differently.

Most message brokers put messages with delivery status REJECTED into a dead letter queue, but for several message brokers a specific configuration in the message broker is required as well to achieve this. Some message brokers do not react on the delivery status at all. Check out the configuration options of the used message broker. Some specifics are mentioned below in the section about different tested message brokers.

Retry Configuration based on JMSRedelivered/JMSXDeliveryCount

There is no option to configure a delay in retry processing in the AMQP adapter because this is not supported by the AMQP protocol. One option to delay the retry in error cases is to configure this retry handling in the integration flow using dead letter queues in the messaging system (if this is supported by the messaging system). This can be configured using two headers set by most of the messaging systems:

  • JMSRedelivered: This header specifies, if the message was redelivered. Value ‘false’ means that this message was delivered for the first time, value ‘true’ tells you that this message was already tried to be delivered before.
  • JMSXDeliveryCount: This header specifies the number of processings for this message. Value 1 means that it is the first attempt to process this message, value 2 means that it is the second attempt, meaning it is the first retry.

Note that these headers are not set or not reliably set by some messaging systems. See below in the messaging system specific section for more details.

Configure Delayed Retry Processing

The first step to configure a delayed retry would be to add a Router step and another Receiver to the integration flow consuming the message:

In the Router the route to the second receiver has to be configured in such a way that messages which are consumed for the second time are sent to another queue in the messaging system. The Condition in the Router can be defined by using one of the above-mentioned parameters. In the described sample I use the ‘JMSXDeliveryCount’ parameter and define the Non-XML expression as ${header.JMSXDeliveryCount} > ‘1’.

In the AMQP receiver channel a different queue is configured. This queue has to be created in the messaging system. From this queue no integration flow consumes messages, it is only used as storage for the time to the next retry. In my sample I use the queue ‘retryqueue’. Important is that you should not define an Expiration Period for the message because after this time the message can usually not be consumed anymore, also not from the dead letter queue. This would not fit to the retry modeling we configure.

In the parameters for this queue it needs to be specified that messages are moved to a dead letter queue after a specific time interval (time to live). This configuration has to be made in the messaging system by using the tools provided by the messaging system.

From this dead letter queue we can then consume the message and put it back to the original processing queue. This is done in a separate integration process:

The AMQP sender adapter needs to be configured to consume messages from the dead letter queue of the retry queue. In case Microsoft Azure Service Bus is used as messaging system, the dead letter queue can be accessed by using the following syntax: ‘<queue>/$deadletterqueue’: This may be different for other messaging systems.

The process writes the message back to the original processing queue via an AMQP receiver adapter so that it can be consumed from there again.

In addition to these configurations, you need to make sure the header JMSXDeliveryCount is retrieved by the integration flow. For this we configure the header as Allowed Header in the Runtime Configuration of the integration flow:

Now we can deploy the integration flow and test the processing:

As soon as a message is available in the processing queue in the messaging system, the message is consumed by Cloud Integration. If there is an error during processing, the processing fails and an immediate retry is executed. This retry stores the message in the separate retry queue, where the message is parked for the amount of time configured in the messaging system. As the message is not consumed from this retry queue it is moved to the dead letter queue after the configured time to live.

As soon as it is available in the dead letter queue the second process consumes the message and stores it to the original processing queue. Form there it is consumed and tried to be processed again. The same retry procedure would be triggered if again an error in processing occurs.

You see, with this we are able to define a kind of retry interval, the message would only be retried after a defined time interval. What is not possible with this configuration is to stop a message after a certain number of retries. How this can be done, we see in the next section.

Stop Retry Processing after x Retries

First, we need to introduce a counter that counts the numbers of executed retries. For this we cannot use the JMSXDeliveryCount header set by the messaging system because we move the messages between different queues and so the JMSXDeliveryCount is always reset. We define our own counter in the process that writes the message back to the processing queue. We do this in a small groovy script:

In the script we set a header RetryCount.

Note that headers with prefix JMS are not allowed, such headers are not forwarded by the messaging systems.

import com.sap.gateway.ip.core.customdev.util.Message;
import java.util.HashMap;

def Message processData(Message message) {
    // Get Headers
    map = message.getHeaders();
    retry = map.get("RetryCount");
    if (retry == null) {
          retry = 1;
       }
       else {
          retry = retry + 1; 
       }   
    message.setHeader("RetryCount", retry);
   return message;

}

Furthermore, we extend the configuration in the main process that consumes messages from the processing queue by adding an additional Router step:

In this Router step we evaluate the RetryCount header, and if it is greater or equal 5 we do some alternative handling and stop the retries. In my sample I simply write the message to a data store, but any other configuration could be done as well.

As a last step we need to add the RetryCount header in the Allowed Headers in the Runtime Configuration of the integration flow:

 

Now we can deploy the updated integration flow and test the processing:

As soon as a message is available in the processing queue in the messaging system, the message is consumed by Cloud Integration. If there is an error during processing, the processing fails and an immediate retry is executed. This retry stores the message in the separate retry queue, where the message is parked for the amount of time configured in the messaging system. As the message is not consumed from this retry queue it is moved to the dead letter queue after the configured time to live.

As soon as it is available in the dead letter queue the second process consumes the message, sets the RetryCount header and stores it to the original processing queue. From there it is consumed and as the numbers of executed retries are checked. The message is only retried 4 times; during the 5th retry, the message is written to the data store and the processing stops.

Configure Exception Handling for Messaging Systems not Supporting the Headers

As mentioned already, not all messaging systems support the two headers to configure explicit retry handling, for example Enterprise Messaging. In this case the only option to avoid endless retries that lead to a huge amount of message processing log, is to configure an exception handling in case of an error:

Configure an Exception Subprocess in your integration flow to catch the error. Define an alternate processing for the message in the exception sub-process. In the above case the message is stored to a data store, and afterwards an email is sent to the administrator to analyze the error and process the message differently. In case the message is stored to a data store, like in the above sample, a different integration flow could consume and process these messages. You can for sure also define a different error handling, like storing the message to a different queue (kind of dead letter queue) on the messaging system.

It is important that you end the Exception Subprocess with a Message End event and not with an Error End event to make sure the message is marked as Completed and is removed from the original queue on the message broker to prevent endless retries.

Monitoring

There are different options to monitor the integration flows, the connection to the messaging system and the messages processed.

AMQP Connection Test (available with the 16-Feb-2020 update)

The Connectivity Test is available in Operations View in Web UI, in section Manage Security Material. Selecting the Connectivity Test tile from Overview Page opens the test tool offering tests for different protocols. To test the communication to the messaging system, the AMQP option is to be selected.

 

Integration Content Monitor

After the integration flow is deployed it can be found in the Manage Integration Content monitor. There you can check the status of the integration flow.

Poll Status (available with the 16-Feb-2020 update)

In the Status Details area you may find the status and the details about the current consumption:

If there is an error when consuming messages via the AMQP sender adapter the error would be shown here for the respective integration flow. In the Polling Information the status of the consumption is shown as Failed:

Message Processing Monitor

Processed messages can be monitored in the Message Processing monitor:

In case of an error when sending the message to the messaging system using the AMQP receiver the errors would appear in this monitor:

Messaging System Monitor

Queues, topics and messages in the messaging system can be monitored using the monitoring tools of the messaging system.

Mapping of AMQP Headers, Properties and Annotations to Headers in Cloud Integration

During transforming a AMQP message to a message in Cloud Platform Integration, or vise versa, the following AMQP headers, properties and annotations are mapped to headers in Cloud Integration:

AMQP Type        AMQP Name                   Header Name in Cloud Integration

header                  durable                              JMSDeliveryMode

header                  priority                               JMSPriority

header                  delivery-count                  JMSXDeliveryCount

annotation            x-opt-delivery-time       JMSDeliveryTime

property                message-id                      JMSMessageID

property                user-id                               JMSXUserID

property                to                                         JMSDestination

property               subject                               JMSType

property               reply-to                              JMSReplyTo

property                correlation-id                  JMSCorrellationID

property                absolute-expiry-time    JMSExpiration

property                creation-time                  JMSTimestamp

property                group-id                            JMSXGroupID

property                group-sequence             JMSXGroupSeq

AMQP application properties are transferred to Camel headers in CPI runtime with the same name and the other way round. Important is that the application properties are not allowed to start with JMS, else it would not be forwarded into a Camel header.

 

Exclusive Processing

In general there are two ways to provide exclusive (in-order) processing using an AMQP broker:

  • Exclusive queues provide a capability that will let only one consumer consume from a given queue at a time. If that particular consumer is stopped or crashes one of the other consumers takes over consuming the messages. This means this also works if the Cloud Integration tenant has multiple worker nodes.
  • Message groups are an optional feature of the AMQP 1.0 protocol. Groups are realized by using the AMQP property group-id. If an AMQP producer sets the group-id property at a message and the broker supports message grouping for that queue, all messages from the same message group will be processed one after the other. In our case, if the producer is a Cloud Integration system, you can set the AMQP group-id property by setting the camel header JMSXGroupID (see in the table above).

Note that not all messaging systems support exclusive queues and/or message groups and these feature may have to be activated explicitly for a queue in the messaging system configuration. See below in the messaging system specific section for more details.

 

Connections required for Processing

For the AMQP adapter you require a permanent minimum of one connections per worker node for consuming messages from a queue because each worker node needs a connection to the message broker. For storing a message to the messaging system the numbers of worker nodes is the maximum of required connections.

In total this means the numbers of connections per queue is:

number of worker nodes + parallel requests storing messages to the message broker (maximum: number of worker nodes)

This means you would need a maximum of 4 connections for one queue for a cluster with two worker nodes.

 

Supported External Messaging Systems

There are multiple providers of messaging system implementations available. Most of them support AMQP 1.0 and can be connected using the AMQP adapters. The most used messaging systems were tested by SAP Cloud Platform Integration. Note the required settings, configuration options and limitations below.

SAP Enterprise Messaging

The following configuration options are applicable for the SAP Enterprise Messaging:

  • Connectivity Options: WebSocket over TLS (Port 443 with Path: “/protocols/amqp10ws“)
  • Authentication Option: OAuth2 Client Credentials
  • Dead Letter Queue Handling based on Delivery Status: No dead letter queue configuration supported. This means there is an endless redelivery of the message in error case. You need to configure alternative processing using exception-subprocess (see chapter ‘Configure Exception Handling for Messaging Systems not Supporting the Headers’ above).
  • JMSRedeliverd/JMSXDeliveryCount supported: No
  • Exclusive Queues supported: Yes
  • Message Groups supported: No
  • Queues supported: Yes, queues  need to be accessed using the following syntax in AMQP adapter channel: “queue:<queue name>”
  • Topics supported: Yes, topics need to be accessed using the following syntax in AMQP adapter channel: “topic:<topic name>”
  • Topic subscriptions supported: Yes, queues can be subscribed to topics.

Size Limitations from SAP Enterprise Messaging

When using Enterprise Messaging consider the following important restrictions:

  • Maximum Number of Connections supported per service instance: 10
  • Maximum Numbers of Service Instances: 10

Taking the above connection requirements in consideration, when using Enterprise Messaging  with the AMQP adapter this means that you should use one service instance per queue.

Note, that for clusters with more than 5 worker nodes the AMQP adapter can currently not be recommended in combination with Enterprise Messaging.

For more details check out the documentation.

Microsoft Azure Service Bus

The following configuration options are applicable for the Microsoft Azure Service Bus:

  • Connectivity Options: TLS over TCP (Port 5671)
  • Authentication Option: SASL
  • Dead Letter Queue Handling based on Delivery Status: Supported, delivery status REJECTED puts the message to a dead letter queue, delivery status MODIFIED_FAILED_UNDELIVERABLE gives the message a deferred status in the messaging system (Message Deferral). Deferred messages stay in the queue, but are not consumed anymore.
  • JMSRedeliverd/JMSXDeliveryCount supported: Yes
  • Exclusive Queues supported: No
  • Message Groups supported: No
  • Queues supported: Yes
  • Topics supported: Yes
  • Topic subscriptions supported: Yes, topic subscriptions can be accessed as separate, queue-like entities using the AMQP sender adapter: “[topic name]/subscriptions/[subscription name]”. Multiple subscriptions for a single topic can exist. It is not possible to send to topic subscriptions using the AMQP receiver adapter.

Solace PubSub+

The following configuration options are applicable for the Solace message broker:

  • Connectivity Options:
    • TCP (Port as defined in the Solace broker)
    • TLS over TCP (Port as defined in the Solace broker)
  • Authentication Option: SASL
  • Dead Letter Queue Handling based on Delivery Status: Supported, but the dead letter queue and the maximum redeliveries have to be configured in the message broker as well. Delivery status REJECTED puts the message to the dead letter queue. Delivery status MODIFIED_FAILED_UNDELIVERABLE must not be used as it is not supported and can lead to errors during processing.
  • JMSRedeliverd/JMSXDeliveryCount supported: No
  • Exclusive Queues supported: Yes
  • Message Groups supported: No
  • Queues supported: Yes
  • Topics supported: Yes
  • Topic subscriptions supported: Yes, queues can be subscribed to topics.

Apache Qpid Broker-J

The following configuration options are applicable for the Apache Qpid Broker-J:

  • Connectivity Options:
    • TCP (Port as defined in the Qpid broker)
    • TLS over TCP (Port as defined in the Qpid broker)
    • WebSocket (Port as defined in the Qpid broker)
    • WebSocket over TLS (Port as defined in the Qpid broker)
  • Authentication Option: SASL
  • Dead Letter Queue Handling based on Delivery Status: Yes, but the dead letter queue and the maximum redeliveries have to be configured in the message broker as well. Delivery status REJECTED puts the message to the dead letter queue after the number of redelivery attempts configured in the messaging system. Delivery status MODIFIED_FAILED_UNDELIVERABLE takes the message out of the processing as long as the connection to the messaging system is established. After restart of the integration flow or the worker node the message would be tried again. After the number of redelivery attempts configured in the messaging system the message is moved to the dead letter queue.
  • JMSRedeliverd/JMSXDeliveryCount supported: Yes
  • Exclusive Queues supported: Yes
  • Message Groups supported: Yes
  • Queues supported: Yes
  • Topics supported: Yes
  • Topic subscriptions supported: Yes, queues can be subscribed to topics.

Apache ActiveMQ 5 and Apache ActiveMQ Artemis

The following configuration options are applicable for the Apache ActiveMQ:

  • Connectivity Options:
    • TCP (Port as defined in the ActiveMQ broker)
    • TLS over TCP (Port as defined in the ActiveMQ broker)
  • Authentication Option: SASL
  • Dead Letter Queue Handling based on Delivery Status: Yes, the dead letter queue, the maximum redeliveries and a redelivery delay can be configured in the message broker as well. Delivery status REJECTED and MODIFIED_FAILED_UNDELIVERABLE put the message to the dead letter queue.
  • JMSRedeliverd/JMSXDeliveryCount supported: Yes
  • Exclusive Queues supported: No
  • Message Groups supported: Yes
  • Queues supported: Yes
  • Topics supported: Yes
  • Topic subscriptions supported: Yes, queues can be subscribed to topics.

Note that Amazon MQ Service is based on Apache Active MQ and so, the same configuration options apply. For details see blog How to connect to an Amazon MQ Service using the AMQP Adapter.

IBM MQ

The IBM MQ broker has only limited support for the AMQP protocol, it does not support addressing queues via AMQP protocol, see IBM MQ documentation. Because of this limitation the AMQP receiver adapter can only be used to write to topics on the IBM MQ broker, but the AMQP sender adapter cannot consume from IBM MQ because only queues are supported for consumption in Cloud Integration.

The following configuration options are applicable for IBM MQ:

  • Connectivity Options:
    • TCP (Port as defined in the IBM MQ broker)
    • TLS over TCP (Port as defined in the IBM MQ broker)
  • Authentication Option: SASL
  • Dead Letter Queue Handling based on Delivery Status: Not supported, as AMQP sender is not supported with IBM MQ.
  • JMSRedeliverd/JMSXDeliveryCount supported: Not supported, as AMQP sender is not supported with IBM MQ.
  • Exclusive Queues supported: No
  • Message Groups supported: No
  • Queues supported: No
  • Topics supported: Yes
  • Topic subscriptions supported: Yes, queues can be subscribed to topics to send messages to them.

RabbitMQ

In testing we found that RabbitMQ has problems with AMQP 1.0. Test was done with plugin version 3.8.2, which was the newest version available at this time. Only non-durable queues were processed by the AMQP adapter. For durable queues the processing hangs. This may change with newer plugin versions.

The following configuration options are applicable for RabbitMQ:

  • Connectivity Options:
    • TCP (Port as defined in the RabbitMQ broker)
    • TLS over TCP (Port as defined in the RabbitMQ broker)
  • Authentication Option: SASL
  • Dead Letter Queue Handling based on Delivery Status: Not supported, as only non-durable queues are supported.
  • JMSRedeliverd/JMSXDeliveryCount supported: Not supported,as only non-durable queues are supported.
  • Exclusive Queues supported: No
  • Message Groups supported: No
  • Queues supported: Only non-durable queues.
  • Topics supported: Yes
  • Topic subscriptions supported: Yes, queues can be subscribed to topics to send messages to them.
67 Comments
You must be Logged on to comment or reply to a post.
  • Excellent write-up. Looking forward to a native adapter for this.

    How would we go about setting/reading message annotations and application properties on messages?

      • Hello Joachim,

        Camel headers in CPI runtime are transferred to AMQP application properties and the other way round on CPI inbound side.

        As for message annotations: we currently transfer the annotation x-opt-delivery-time into the header JMSDeliveryTime, other annotations are not supported. Could you maybe provide some more details on the scenarios you require AMQP message annotations? What kind of support would be required for the intended scenarios? Which annotations are required?

        Best regards,

        Mandy

         

         

        • Hi,

          Thanks for the reply.

          In our case, we need to be able to read/write AMQP standard properties, like subject (Label on Azure ServiceBus), as well as application properties (which show up as Custom Properties on Azure ServiceBus) and the message annotation x-opt-scheduled-enqueue-time.

          If everything except the message annotation is currently supported we can use the adapter for some things, so that’s good. But we do have some integrations where ScheduledEnqueueTime is used.

          Best regards,
          Joachim

          • Hello,

            AMQP property subject is transferred to the JMS property JMSType, which is transferred to a header in CPI. I will extend the blog with the complete list of properties we map shortly.

            x-opt-scheduled-enqueue-time we currently cannot transfer as this is not supported by our underlying open source components. We would have to check if and how we can provide this.

            Best regards,

            Mandy

          • see new chapter ‘Mapping of AMQP Headers, Properties and Annotations to Headers in Cloud Integration’ in the blog.

            Best regards,

            Mandy

    • No, absolutely not. AMQP adapter can be used to connect to any messaging system via AMQP protocol. JMS adapter can only connect to the internal in CPI embedded messaging.

      BR,

      Mandy

  • Hi Mandy Krimmel,

    I’m trying to follow up your blog using HTTPS to AMQP but I’m not sure which headers are required

    Can you help me understand the headers properties?

    I’m getting the error below: 

    com.sap.it.rt.adapter.http.api.exception.HttpResponseException: An internal server error occured: AMQP SASL header mismatch value 0, expecting 41. In state: HEADER0.

    Thanks

    • Hello,

      I do not really understand your question. Which headers are you referring to?

      If you want to send the message via AMQP to the message broker you do not need to set any headers, the message is sent as is. The headers in the table are only relevant if needed in your scenario. For some scenarios you need to fill specific headers to be transferred as JMS properties.

      For a simple scenario from any sender to AMQP receiver you do not need to care about headers.

      Best regards,

      Mandy

      • Thank you so much Mandy Krimmel I did the changes and I’m facing a different error I will double check cloud connector settings with basis team

        com.sap.it.rt.adapter.http.api.exception.HttpResponseException: An internal server error occured: socks5, password, localhost/127.0.0.1:20004 =>            :61616, status: FORBIDDEN(2)

    • Hi Leomilde!

      What AMQP provider do you try to connect to?

      I’ve faced the similar issue recently while tried to send messages to RabbitMQ. The point was that RabbitMQ used AMQP protocol version 0.9.1 by default and CPI AMQP adapter used protocol of version 1.0.

      Enabling plugin for 1.0 version of AMQP protocol on RabbitMQ server resolved the problem.

      Regards, Evgeniy.

      • Hi Evgeniy,

        I’m using OASIS AMQP 1.0

        I did some changes to my iflow (just sending the message as is, no transformations needed) per Mandy suggestions and getting different error now, looks like is a cloud connector issue

        com.sap.it.rt.adapter.http.api.exception.HttpResponseException: An internal server error occured: socks5, password, localhost/127.0.0.1:20004 =>               :61616, status: FORBIDDEN(2)

        I will check agan next week once test connectivity for AMQP is released

        Thanks

      • Hi Evgeniy,

        Glad to know that you were able to connect to RabbitMQ server with AMQP1.0. I’m trying to connect but SAP’s AMQP Adapter doesn’t have a field to provide virtual host/remote host name and my connection to the remote server is failing as the Rabbit MQ server is not able to connect to the correct virtual host.  Can you please tell me what you did to make it work?

        Thanks
        Santosh

         

         

  • Hi,

     

    I am trying to talk to azure service bus using shared key and i deployed shared key as security parameter. I get the below error.

    Exception while processing SASL init: Crededentials objects of kind secure_param are not supported for SASL authentication

    Can you please help, what should i deploy the credentials like.

     

    Thanks,

    Vijaya Palla

  • Thanks Mandy.

    I deployed them as credentials and it worked. I am able to poll and push messages.

     

    Thanks,

    Vijaya Palla

     

     

    Thanks,

    Vijaya Palla

  • Mandy Krimmel  : I tried using pass through from Azure service bus Queue named Q1 (with source 5000 message) to Azure Queue named Q2 . But shockingly i received 8000+ messages in Q2. Why is this happening ?

     

    Note: 1. Transaction handling was none

    2. Concurrency was 10

    3. I used both persistent and non persistent

    4. But if i use JMS in between, the number is exact ! Is this known bug ?

    • This sounds like there were issues in consuming and the message is then retried. Have you configured exception handling in the integration flow?

      Transaction Handling is not relevant, AMQP adapter does not share a transaction. Usually if a message is successfully processed the message is removed in the message broker because a successfull acknowledgement is sent back. If there is an error, the message is tried to be consumed again.

      This needs to be checked in detail, this should not happen.

      Could you please open a ticket on LOD-HCI-PI-RT and add the integration flow and a clear error description?

      Thank you,

      Mandy

    • A dedicated Kafka adapter is indeed on the roadmap. According to the current planning it is targeted for Q1 2021.

      BR,

      Mandy

  • Mandy Krimmel  Might be silly but what is the syntax for pushing to Topic subscription’s deadletter (as a receiver from some other iflow (JMS to AMQP)) ?

     

    “In case Microsoft Azure Service Bus is used as messaging system, the dead letter queue can be accessed by using the following syntax: ‘<queue>/$deadletterqueue’”

    • Hi,

      this syntax makes sense if you want to consume from the deadletter queue, not for pushing towards it. The AMQP sender adapter consumes messages from the queue, and this is what is described there.

      We have not tested how to push to a deadletter queue directly, is there really a use case for this? Usually moving to the deadletter queue is done by the messaging system directly.

      Have you tried with the above syntax? Did you get an error?

      BR

      Mandy

  • HI Mandy,

    Thank you for the detailed blog.

    I am trying to configure AMQP Sender Adapter, but after deploying my iFlow i amg etting below error.

    The Integration Flow is not operational.
    Error Details

    [CAMEL][IFLOW][POLL_FAILED] : Polling from amqpwss://enterprise-messaging-messaging-gateway.cfapps.eu10.hana.ondemand.com:443/protocols/amqp10ws/vor1/emsbxvorwerk/1sb/createOrderQueue failed.
    [CAMEL][IFLOW][EXCEPTION] : javax.jms.JMSException: Invalid handshake response getStatus: 404 Not Found
    [CAMEL][IFLOW][CAUSE] : Cause: org.apache.qpid.jms.provider.exceptions.ProviderIOException: Invalid handshake response getStatus: 404 Not Found
    [CAMEL][IFLOW][CAUSE] : Cause: java.io.IOException: Invalid handshake response getStatus: 404 Not Found
    [CAMEL][IFLOW][CAUSE] : Cause: io.netty.handler.codec.http.websocketx.WebSocketHandshakeException: Invalid handshake response getStatus: 404 Not Found
    Polling Information
    Adapter URI:
    Continuous Consumption
    amqpwss://enterprise-messaging-messaging-gateway.cfapps.eu10.hana.ondemand.com:443/protocols/amqp10ws/vor1/emsbxvorwerk/1sb/createOrderQueue
    Consumption Status:
    Failed
    Invalid handshake response getStatus: 404 Not Found

    Can you please help me figuring out if i am missing any configuration?

    AMQP Sender Adapter Configuration: –

    Host: enterprise-messaging-messaging-gateway.cfapps.eu10.hana.ondemand.com

    Port: – 443

    Path: – /protocols/amqp10ws/vor1/emsbxvorwerk/1sb

    Proxy Type: – Internet

    Queue Name :- createOrderQueue

     

    Note: – Test Conenctivity with AMQP works in SCPI with SAP Enterprise Messaging.

    Thanks,
    Varun

     

    • Hello Varun,

      the Path sounds strange to me, it /protocols/amqp10ws/vor1/emsbxvorwerk/1sb the path that you also see in the service key of enterprise messaging for the AMQP connection? Usually it is only /protocols/amqp10ws.

      404 means that under the given address no service can be found.

      Best regards,

      Mandy

  • Hi Mandy,

    Thank you for your reply.

    Yes in the Service Keys i can only see this /protocols/amqp10ws. But i added the namepace from the Queue which is “vor1/emsbxvorwerk/1sb/createOrderQueue”. Is this not needed to be done?

    Well i also tried with another AMQP adapter configuration and now i am getting this error.

    The Integration Flow is not operational.
    Error Details
    [CAMEL][IFLOW][POLL_FAILED] : Polling from amqpwss://enterprise-messaging-messaging-gateway.cfapps.eu10.hana.ondemand.com:443/protocols/amqp10ws/createOrderQueue failed.   [CAMEL][IFLOW][EXCEPTION] : org.apache.qpid.jms.JmsConnectionRemotelyClosedException: invalid sender address (220034) [condition = amqp:internal-error]     [CAMEL][IFLOW][CAUSE] : Cause: org.apache.qpid.jms.provider.exceptions.ProviderConnectionRemotelyClosedException: invalid sender address (220034) [condition = amqp:internal-error]

    AMQP Sender Adapter Configuration: –

    Host: enterprise-messaging-messaging-gateway.cfapps.eu10.hana.ondemand.com

    Port: – 443

    Path: – /protocols/amqp10ws

    Proxy Type: – Internet

    Queue Name :- createOrderQueue

     

    Regards,

    Varun

    • Hi Mandy,

      It works now. 🙂

      I changed the queue name to “queue:vor1/emsbxvorwerk/1sb/createOrderQueue”

       

      Continuous Consumption

      amqpwss://enterprise-messaging-messaging-gateway.cfapps.eu10.hana.ondemand.com:443/protocols/amqp10ws//queue:vor1/emsbxvorwerk/1sb/createOrderQueue
      Consumption Status: Successful
      Appreciate you support anyway!!
      Regards,
      Varun
        • Hello Mandy,

          In the blog above you have mentioned that SAP Enteprise messaging does not have the capability of Dead Letter Queue Handling.

          So what i am trying to do is to achieve this behavior: –

          1. I have an Sender AMQP Iflow in SCPI with createOrderQueue.
          2. If an exception occur to deliver this message, i have added an Exception Sub process to put this message in an createOrderRetryQueue.
          3. Now i want to poll a message from this createOrderRetryQueue, but after an internal of 5 mins.

          So what i am trying to do is, creating another iFlow with a Start timer event, which runs every 5 mins and look for messages in the createOrderRetryQueue.

          Can you please let me know how can i use Start Timer event with an combination of Sender AMQP adapter (which can poll the message from the retry queue)?

           

          Regards,

          Varun

  • Hello Mandy Krimmel,

    Thank you for the detailed blog.

    I am trying to configure AMQP Receiver adapter. I am using the ActiveMq local in my computer. Here is my Iflow

    Iflow

    Then i have this error when raise outbound IDOC in SAP ECC:

     

    This is about the header setting. Could you give me an solution about this?

    Thanks and best regards,

    Nghia

    • Hi,

      this means there is an error when calling the ActiveMQ server.

      Have you tried the AMQP connection test? Can the server be reached at all?

      Are the credentials correct?

      You should check in the logs of the active mq server, maybe you find some more details there.

      BR

      Mandy

  • Hi Mandy,

    I am trying to use REST services of Enterprise Messaging using http adapter in SCPI.

    I can successfully get the access token, but when i make a call for consumption API i am getting below error.

    URL being used in SCPI HTTP adapter: –

    https://enterprise-messaging-pubsub.cfapps.eu10.hana.ondemand.com/messagingrest/v1/queues/vor1%2Femsbxvorwerk%2F1sb%2FcreateOrderErrorQueue/messages/consumption

    Error Details
    org.apache.camel.component.ahc.AhcOperationFailedException: HTTP operation failed invoking https://enterprise-messaging-pubsub.cfapps.eu10.hana.ondemand.com/messagingrest/v1/queues/vor1/emsbxvorwerk/1sb/createOrderErrorQueue/messages/consumption with statusCode: 404

    I have set the header property for Authorization: Bearer <accessToken>

    Can you please let me know if the URL for consumption API used is in a correct format or else what is needed to be changed?

    Regards,
    Varun

  • Hi Mandy

    I’m trying to connect to Rabbit MQ hosted on Cloud foundry as a service. But unfortunately the connection is not established from SAP CPI (AMQP) – Connection test to Rabbit MQ.

    My below query is

    1. How to get the host name of Rabbti MQ  ( this is being used as a service in Cloud foundry) – I tried checking the environment variables of the Rabbit MQ service instance. But the Host name assigned in the environmental variable is not going through. The error is – No route found for the host mentioned.
    2. The SCPI is hosted in neo system – Is it possible to connect to Rabbit MQ hosted on Cloud foundry in different region ?

    Your help is highly appreciated

     

    • Hello,

      yes, the AMQP adapter can connect to any messaging system that can be reached via the internet or via the Cloud Connector in an on-premise landscape. Also from the Neo landscape.

      How to address the Rabbit MQ in CF and if this is possible at all:

      • Check that AMQP version 1.0 is supported at all by this message broker.
      • Check the documentation how to address the broker via AMQP
      • Maybe open a support question on Rabbit MQ on CF.

      Please update this comment when you were able to solve your problem.

      Best regards,

      Mandy

       

       

      •  
  • Hi Mandy,

    I have send the message from SAP to activeMQ using the AMQP adapter and  the message is send. But the body has some text come from the message header:

    Do you know how to remove these text?

    Many thanks,

    Nghia

     

    /
    • Hello,

      I don’t completely understand your question. You get the message in activeMQ, but there the message does not only contain the payload, but also some headers from SAP within the message body? Is this correct? We did never encounter something like this, that is actually very strange. Or maybe its only an issue on the UI showing the message? So that also headers are shown along with the message?

      Best regards,

      Mandy

  • Hello Mandy,

    first – thanks for the great blog! I have configured the AMQP sender including retry-handling (3 retries). So far everything works as expected, but there’s one confusing detail. If a message fails at first try, it will be retried. In monitoring the message switches to status “Retry”. If it now gets delivery successfully in the first retry, the retry mode “stops” as expected, but the message is still shown as status “Retry” in monitoring. Do I have to set/delete any headers, etc. just before running to the (successful) message end event, to make the monitor aware, that the message’s status shall switch from “Retry” to “Success”?

    Even if the flow acts as expected, it’s confusing if messages stay in “Retry” status (after they have been “solved”/delivered successfully.)

    Best regards,
    Raffael

    • Hello Raffael,

      glad you like the blog 🙂

      The status should definitely not stay in Retry mode, it should be updated to completed if the message processing gets completed in the retry.

      Some questions:

      • what is the messaging system you are using?
      • Are you using also a CPI integration flow to send the message to the messaging system or are you using an external tool/application?

      best would be if you could open a ticket for this issue on LOD-HCI-PI-RT.

      Thank you,

      Mandy

      • Hi Mandy,

        I use AMQP adapter in combination with SAP Enterprise Messaging. The initial queue is filled via an external NodeJS application (with QOS = 1).

        The interesting part of the flow looks like this:

        Messages are read from initial queue. Then in the router ${header.JMSXDeliveryCount} > ‘1’ is checked. If its 1 or smaller the message is passed via ProcessDirect to the handler flow. If it fails over there the AMQP sender recognizes the error, re-sends the message immediately (but with  JMSXDeliveryCount > 1) an then the message is passed to another queue (SapEms2 receiver).

        Since the message arrives in the second queue and the retry mechanism stops after the first retry it looks fine for me. But nevertheless the initial message (which got status “Retry”) is stuck in “Retry” status even if the message already arrived in the second queue.

        /
        • Hi Raffael,

          As written in the blog the header JMSXDeliveryCount is not set reliably by the SAP Enterprise Messaging system, we saw in our tests that they often set it even in case it is the first processing. As stated in the blog you should not configure retry behavior for Enterprise Messaging based on this header as this does not work as expected in all executions. Enterprise Messaging is kind of special here.

          But still, the mpl status is strange. Is the message really deleted from the original queue? did you check in the EMS queue monitor?

          Best regards

          Mandy

           

          • Hi Mandy,

            yes, the messages were successfully deleted as shown in the screenshot below. That’s why I’m wondering about the message status.

            And yes, I had seen that you wrote JMSXDeliveryCount is not supported by SAP EMS, but since the headers occured in the trace and from my test runs behaved somehow reliable, I though this information may be outdated. But “good” (or should I say sad) to hear, that I just misinterpreted it. 😉

            I now switched to triggering the retry logic via Exception Subprocess and deactivated AMQP’s retry mechanism completely. Thus the “Retry” status problem is bypassed also. (Nevertheless the error still triggers me, because I simply like to understand why things happen like they happen…)

            At the end I must say that SAP EMS is somehow a big disappointment with all the things missing right now. No reliable retry headers, no dead-letter queues, only 99,5% uptime via SLA, … (By the way – using JMS + Enterprise Messaging from Enterprise Edition is no option right now, because we use the EMS to talk to some NodeJS microservices in the SAP CF environment. As par my understanding JMS+EMS can – sadly – only be used for CPI internal purposes.)

          • Hello,

            to really understand the mpl behavior and why the status is in retry we would need the textual mpl with log level DEBUG so that we could check what really happend. Maybe some internal JMS headers caused this.

            Was there only this one message processing log? Or was there maybe another one with Completed status?

            It would be good if you could open a ticket on LOD-HCI-PI-RT so that we could check this is more detail. Please attach the textual mpl with DEBUG log level and the integration flow project.

            Thank you,

            Mandy

          • Hi Mandy,

            thanks for your help and input. I’ll discuss with the client to open a ticket.

            Regarding your MPL question. There are two MPLs.

            • Message comes initially via AMQP (=First MPL)
            • Initial message is passed via ProcessDirect to another IP in the same IFlow (=Second MPL)
            • Message in second IP in same flow fails. AMQP in first IP starts retry. Retried message is written to another queue. Process ends. (=No MPL. All these steps are not part of one of the both forementioned MPLs. A third/another MPL isn’t created. Even in trace mode, there’s no clue about the retry run. But it ran,otherwise the message would not been transfered to the second queue.)

            Note: I understand that raising a ticket is the correct way to go and I don’t expect you to answer on the above. I just wanted to point these things out, because you asked for it.

            Best regards,
            Raffael

          • Hi Raffael,

            thank you for the additional details. The experts for our mpl Logik need to have a look, it could be a combination of AMQP retry and process direct within same integration flow.

            Best regards

            Mandy

  • Hi, Are there any plans to make JMS Adapter or AMQP Adapter to connect to fully JMS compatible JMS providers like like IBM MQ,SonicMQ,Active MQ? What JMS providers are currently supported ? Any plans to enhance JMS Adapter similar to what we have in SAP PI/PO stack?

    • Hello,

      there are currently no such plans in near future. AMQP adapter is the adapter to connect to external messaging systems using AMQP protocol.

      Best regards,

      Mandy

  • Hi Mandy Krimmel, thanks for this really detailed blog post.

    I am trying to connect the AMQP sender adapter to a cloud platform enterprise messaging instance but cannot work out the adapter parameters.

     

    Is there any documentation on what to enter for:

    • host
    • port
    • path
    • authentication
    • credential name?

     

    I assume I can get these from the enterprise messaging instance service key – but which bits?

     

    Regards,

    Jason

    • Hi,

      yes, those are the details from the Enterprise Messaging service instance/key.

      You need the bit for the AMQP access, section amqp10ws in the service key.

      BR

      Mandy