
Cloud Integration – Connecting to Messaging Systems using the AMQP Adapter

This blog describes options for configuring asynchronous message processing using the new AMQP adapter, which lets customers connect to messaging systems using the Advanced Message Queuing Protocol (AMQP). It covers the configuration, prerequisites and limits. The AMQP adapter is available for SAP Cloud Platform Integration customers with the 08-December-2019 release.

Connecting to Messaging Systems Using the AMQP Adapter

In many integration scenarios messages or events have to be exchanged between applications or systems via messaging systems. With the new AMQP adapter SAP Cloud Platform Integration can be used as provider or consumer of such messages or events. Cloud Integration can connect to messaging systems using AMQP (Advanced Message Queuing Protocol) version 1.0. It consumes messages or events using the AMQP sender adapter and stores messages or events in the message broker using the AMQP receiver adapter. Follow the steps described below to write to or consume from queues and/or topics in the message broker.

Prerequisite: Messaging System Setup

To be able to connect to queues or topics in the message broker, you have to create queues and/or topics in the message broker. This needs to be done on the messaging system with the configuration tools provided by the messaging system.

In some messaging systems you need to configure a Lock Duration to make sure the message is not consumed multiple times. This timeout must be longer than the expected processing time of the message, otherwise this would lead to duplicate messages.

Note that queues, topics and the messages in them can only be monitored with the tools provided by the messaging system provider. These monitors are not integrated into Cloud Integration. In SAP Cloud Platform Integration you can monitor the integration flows using the AMQP adapter and the messages sent to or consumed from the messaging system. You can find more details about those options in this blog in section ‘Monitoring’.

 

AMQP Receiver Adapter

The AMQP receiver adapter can be used to send messages from Cloud Integration to queues or topics in an external messaging system. A small sample scenario with an HTTP sender adapter and the AMQP receiver adapter is shown in the picture below. A message is received by the HTTP sender adapter, some headers are defined and conversions are done in a Groovy script, and the message is then sent to the messaging system via the AMQP adapter:

In the AMQP receiver channel the following configurations have to be made to connect to the messaging system. Note that these settings are very specific to the connected messaging system. Some sample settings are listed below in the messaging system specific chapters.

  • Protocol:
    • Transport Protocol: Select the protocol the messaging system supports.
      • TCP
      • WebSocket

  • Connection:
    • Host: Specify the hostname of the messaging system.
    • Port: Specify the port of the messaging system.
    • Proxy Type: Select if you want to connect via Cloud Connector (On-Premise) or directly via the Internet. This configuration option is available with version 1.1 of the adapter (available with the January 2020 update). See blog How to connect to an on-premise AMQP server via Cloud Connector for more details.
    • Path: For WebSocket specify the access path of the messaging system.
    • Connect with TLS: Select if TLS has to be used for the connection.
    • Authentication: Select the authentication method the messaging system supports.
      • SASL (Simple Authentication and Security Layer)
      • OAuth2 Client Credentials
      • None
    • Credential Name: Select the alias of the deployed credentials.

  • Processing:
    • Destination Type: Specify if messages are to be sent to queues or topics in the messaging system.
      • Topic
      • Queue
    • Destination Name: Specify the name of the queue or topic. This value can be dynamically defined using header: ${header.queueabc} or property: ${property.queueabc}.
    • Expiration Period: Specify the TTL (Time to Live) for the message. If nothing is specified, the setting for the queue or topic subscription in the messaging system applies. How the defined TTL is interpreted by the messaging system depends on the messaging system.
    • Delivery: Specify if the messaging system has to make sure that the message is not lost, even in case of unexpected terminations.
      • Persistent
      • Non-Persistent
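
The dynamic Destination Name mentioned above can be filled by a Script step placed before the AMQP receiver. The following is only a minimal sketch, assuming the channel's Destination Name is set to ${header.queueabc}. The routing rule, the orderType header and the queue names are hypothetical and only serve as illustration. The logic is kept in a plain function so it can be tried outside the tenant.

```groovy
// Hypothetical routing rule: pick the target queue from an 'orderType' header.
// Header name, queue names and the default are assumptions for illustration.
String chooseDestination(Map headers) {
    switch (headers.get('orderType')) {
        case 'EXPRESS': return 'orders.express'
        case 'BULK':    return 'orders.bulk'
        default:        return 'orders.standard'
    }
}

// In a Script step this would be wired up roughly as:
//   message.setHeader('queueabc', chooseDestination(message.getHeaders()))
def headers = [orderType: 'EXPRESS']
headers.queueabc = chooseDestination(headers)
println headers.queueabc
```

The queue or topic named in the header still has to exist in the messaging system, as described in the prerequisites.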

 

AMQP Sender Adapter

The AMQP sender adapter can be used to consume messages in Cloud Integration from queues or topic subscriptions in an external messaging system. A small sample scenario with the AMQP sender adapter and a SOAP receiver adapter is shown in the below picture. A message is consumed by the AMQP sender adapter from the messaging system, some conversions in a groovy script are executed, and then the message is sent to the receiver via the SOAP adapter:

In the AMQP sender channel the following configurations need to be made. Sample settings are listed below in the messaging system specific chapters:

  • Protocol:
    • Transport Protocol: Select the protocol the messaging system supports.
      • TCP
      • WebSocket

  • Connection:
    • Host: Specify the hostname of the messaging system.
    • Port: Specify the port of the messaging system.
    • Proxy Type: Select if you want to connect via Cloud Connector (On-Premise) or directly via the Internet. This configuration option is available with version 1.1 of the adapter (available with the January 2020 update). See blog How to connect to an on-premise AMQP server via Cloud Connector for more details.
    • Path: For WebSocket specify the access path of the messaging system.
    • Connect with TLS: Select if TLS has to be used for the connection.
    • Authentication: Select the authentication method the messaging system supports.
      • SASL (Simple Authentication and Security Layer)
      • OAuth2 Client Credentials
      • None
    • Credential Name: Select the alias of the deployed credentials.

  • Processing:
    • Queue Name: Specify the name of the queue or topic subscription to be consumed from.
    • Number of Concurrent Processes: Specify the number of processes used for parallel message processing. Note that the processes are started from each worker node.
  • Retry Details (available with the 10-May-2020 update):
    • Max. Number of Retries: Specify the maximum number of retries that shall be executed in case there is an error during processing. Afterwards a different delivery status will be sent to the message broker (see next field).
    • Delivery Status After Max. Retries: Select the delivery status that shall be sent to the message broker after the specified retries were executed and the message processing still fails. The following two values are available: REJECTED and MODIFIED_FAILED_UNDELIVERABLE. The message broker can then handle such messages differently.

Retry Configuration

If an error occurs during the processing of the consumed message in Cloud Platform Integration, the message is not removed from the messaging system, but is retried immediately until the maximum number of configured retries is reached. If the messaging system does not handle the message differently when a different delivery status is sent, the retry may even go on forever.

There are different options available to handle errors during processing and configure the retry for messages consumed via AMQP:

  • Use the retry configuration in the AMQP sender adapter as described above (available with the 10-May-2020 update)
  • Retry Configuration based on JMSRedelivered/JMSXDeliveryCount headers in Exception Sub-Process
  • Catch Error and configure alternate processing in Exception Sub-Process

If the messaging system in use supports dead-lettering and delayed processing based on the delivery status as described for the AMQP sender adapter, then this is the recommended configuration option. If the messaging system does not support dead-lettering or you want more specific handling, like sending a notification to an administrator after a certain number of retries, you can use the configuration based on the two headers JMSRedelivered and JMSXDeliveryCount. The third option is the only one left if the messaging system supports neither dead-lettering nor the headers.

Retry Configuration in AMQP Sender Adapter

If supported by the messaging system, use the retry configuration in the AMQP sender adapter as briefly mentioned above.

The Retry Details section in the AMQP sender adapter will be available with the 10th-May 2020 update.

  • Max. Number of Retries: Specify the maximum number of retries that shall be executed in case there is an error during processing. Afterwards a different delivery status will be sent to the message broker (see next field).
  • Delivery Status After Max. Retries: Select the delivery status that shall be sent to the message broker after the specified retries were executed and the message processing still fails. The following two values are available: REJECTED and MODIFIED_FAILED_UNDELIVERABLE. The message broker can then handle such messages differently.

Most message brokers put messages with delivery status REJECTED into a dead letter queue, but several message brokers also require a specific configuration in the broker to achieve this. Some message brokers do not react to the delivery status at all. Check the configuration options of the message broker you use. Some specifics are mentioned below in the section about the different tested message brokers.

Retry Configuration based on JMSRedelivered/JMSXDeliveryCount

There is no option to configure a delay in retry processing in the AMQP adapter because this is not supported by the AMQP protocol. One option to delay the retry in error cases is to configure this retry handling in the integration flow using dead letter queues in the messaging system (if this is supported by the messaging system). This can be configured using two headers set by most of the messaging systems:

  • JMSRedelivered: This header specifies whether the message was redelivered. Value ‘false’ means that this message is delivered for the first time, value ‘true’ means that delivery of this message was already attempted before.
  • JMSXDeliveryCount: This header specifies the number of processing attempts for this message. Value 1 means that it is the first attempt to process this message, value 2 means that it is the second attempt, that is, the first retry.

Note that these headers are not set or not reliably set by some messaging systems. See below in the messaging system specific section for more details.

Configure Delayed Retry Processing

The first step to configure a delayed retry would be to add a Router step and another Receiver to the integration flow consuming the message:

In the Router the route to the second receiver has to be configured in such a way that messages which are consumed for the second time are sent to another queue in the messaging system. The Condition in the Router can be defined by using one of the above-mentioned headers. In the described sample I use the ‘JMSXDeliveryCount’ header and define the Non-XML expression as ${header.JMSXDeliveryCount} > '1'.
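
The decision the Router takes here can be sketched in a few lines of Groovy. This is not how the Router evaluates the expression internally, only an illustration of the rule. The function name isRedelivery is made up, and treating a missing header as a first delivery is my assumption, since some messaging systems do not set the header at all.

```groovy
// Mirrors the Router condition ${header.JMSXDeliveryCount} > '1'.
// Assumption: a missing header is treated as the first delivery attempt.
boolean isRedelivery(Map headers) {
    def raw = headers.get('JMSXDeliveryCount')
    // The value may arrive as a number or as a String, so normalize it first
    int count = (raw == null) ? 1 : (raw as String).trim().toInteger()
    return count > 1
}

// First attempt stays in the main route, second attempt goes to the retry queue
[1, 2, '3'].each { value ->
    def route = isRedelivery([JMSXDeliveryCount: value]) ? 'retry queue' : 'normal processing'
    println "JMSXDeliveryCount=${value} -> ${route}"
}
```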

In the AMQP receiver channel a different queue is configured. This queue has to be created in the messaging system. No integration flow consumes messages from this queue; it is only used as storage until the next retry. In my sample I use the queue ‘retryqueue’. It is important not to define an Expiration Period for the message, because after this time the message can usually not be consumed anymore, not even from the dead letter queue. This would not fit the retry modeling we configure.

In the parameters for this queue it needs to be specified that messages are moved to a dead letter queue after a specific time interval (time to live). This configuration has to be made in the messaging system by using the tools provided by the messaging system.

From this dead letter queue we can then consume the message and put it back to the original processing queue. This is done in a separate integration process:

The AMQP sender adapter needs to be configured to consume messages from the dead letter queue of the retry queue. If Microsoft Azure Service Bus is used as messaging system, the dead letter queue can be accessed by using the following syntax: ‘<queue>/$deadletterqueue’. This may be different for other messaging systems.

The process writes the message back to the original processing queue via an AMQP receiver adapter so that it can be consumed from there again.

In addition to these configurations, you need to make sure the header JMSXDeliveryCount is retrieved by the integration flow. For this we configure the header as Allowed Header in the Runtime Configuration of the integration flow:

Now we can deploy the integration flow and test the processing:

As soon as a message is available in the processing queue in the messaging system, the message is consumed by Cloud Integration. If there is an error during processing, the processing fails and an immediate retry is executed. This retry stores the message in the separate retry queue, where the message is parked for the amount of time configured in the messaging system. As the message is not consumed from this retry queue it is moved to the dead letter queue after the configured time to live.

As soon as it is available in the dead letter queue the second process consumes the message and stores it to the original processing queue. From there it is consumed and processed again. The same retry procedure is triggered if an error occurs in processing again.

You see, with this we are able to define a kind of retry interval, the message would only be retried after a defined time interval. What is not possible with this configuration is to stop a message after a certain number of retries. How this can be done, we see in the next section.

Stop Retry Processing after x Retries

First, we need to introduce a counter that counts the number of executed retries. For this we cannot use the JMSXDeliveryCount header set by the messaging system because we move the messages between different queues and so the JMSXDeliveryCount is always reset. We define our own counter in the process that writes the message back to the processing queue. We do this in a small Groovy script:

In the script we set a header RetryCount.

Note that headers with prefix JMS are not allowed because such headers are not forwarded by the messaging systems.

import com.sap.gateway.ip.core.customdev.util.Message

def Message processData(Message message) {
    // Read the retry counter set by a previous pass (absent on the first retry)
    def headers = message.getHeaders()
    def previous = headers.get("RetryCount")

    // The header may come back as a String, so convert it before incrementing
    int retry = (previous == null) ? 1 : previous.toString().trim().toInteger() + 1

    message.setHeader("RetryCount", retry)
    return message
}

Furthermore, we extend the configuration in the main process that consumes messages from the processing queue by adding an additional Router step:

In this Router step we evaluate the RetryCount header, and if it is greater than or equal to 5 we do some alternative handling and stop the retries. In my sample I simply write the message to a data store, but any other configuration could be done as well.
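
To make the threshold concrete, here is a small Groovy sketch of the check this Router performs. It only illustrates the rule and is not the Router implementation. The function name retriesExhausted is made up, and a missing RetryCount header is assumed to mean that no retry has happened yet.

```groovy
// Returns true once the retry counter has reached the limit (5 in this sample).
// Assumption: a message without RetryCount has not been retried yet.
boolean retriesExhausted(Map headers, int maxRetries = 5) {
    def raw = headers.get('RetryCount')
    int retries = (raw == null) ? 0 : (raw as String).trim().toInteger()
    return retries >= maxRetries
}

// Walk through the counter values a failing message would carry
(0..5).each { count ->
    def headers = (count == 0) ? [:] : [RetryCount: count]
    def route = retriesExhausted(headers) ? 'data store (stop)' : 'normal processing'
    println "RetryCount=${count} -> ${route}"
}
```

With the limit set to 5, counter values 1 to 4 still go through normal processing, and the message carrying the value 5 is routed to the alternative handling.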

As a last step we need to add the RetryCount header in the Allowed Headers in the Runtime Configuration of the integration flow:

 

Now we can deploy the updated integration flow and test the processing:

As soon as a message is available in the processing queue in the messaging system, the message is consumed by Cloud Integration. If there is an error during processing, the processing fails and an immediate retry is executed. This retry stores the message in the separate retry queue, where the message is parked for the amount of time configured in the messaging system. As the message is not consumed from this retry queue it is moved to the dead letter queue after the configured time to live.

As soon as it is available in the dead letter queue the second process consumes the message, sets the RetryCount header and stores it to the original processing queue. From there it is consumed again and the number of executed retries is checked. The message is only retried 4 times; during the 5th retry, the message is written to the data store and the processing stops.

Configure Exception Handling for Messaging Systems not Supporting the Headers

As mentioned already, not all messaging systems support the two headers to configure explicit retry handling, for example Enterprise Messaging. In this case the only option to avoid endless retries, which lead to a huge number of message processing logs, is to configure exception handling for the error case:

Configure an Exception Subprocess in your integration flow to catch the error. Define an alternate processing for the message in the exception sub-process. In the above case the message is stored to a data store, and afterwards an email is sent to the administrator to analyze the error and process the message differently. If the message is stored to a data store, like in the above sample, a different integration flow could consume and process these messages. You can of course also define a different error handling, like storing the message to a different queue (a kind of dead letter queue) on the messaging system.

It is important that you end the Exception Subprocess with a Message End event and not with an Error End event to make sure the message is marked as Completed and is removed from the original queue on the message broker to prevent endless retries.

Monitoring

There are different options to monitor the integration flows, the connection to the messaging system and the messages processed.

AMQP Connection Test (available with the 16-Feb-2020 update)

The Connectivity Test is available in Operations View in Web UI, in section Manage Security Material. Selecting the Connectivity Test tile from Overview Page opens the test tool offering tests for different protocols. To test the communication to the messaging system, the AMQP option is to be selected.

 

Integration Content Monitor

After the integration flow is deployed it can be found in the Manage Integration Content monitor. There you can check the status of the integration flow.

Poll Status (available with the 16-Feb-2020 update)

In the Status Details area you may find the status and the details about the current consumption:

If there is an error when consuming messages via the AMQP sender adapter the error would be shown here for the respective integration flow. In the Polling Information the status of the consumption is shown as Failed:

Message Processing Monitor

Processed messages can be monitored in the Message Processing monitor:

In case of an error when sending the message to the messaging system using the AMQP receiver the errors would appear in this monitor:

Messaging System Monitor

Queues, topics and messages in the messaging system can be monitored using the monitoring tools of the messaging system.

Mapping of AMQP Headers, Properties and Annotations to Headers in Cloud Integration

When an AMQP message is transformed to a message in Cloud Platform Integration, or vice versa, the following AMQP headers, properties and annotations are mapped to headers in Cloud Integration:

AMQP Type    AMQP Name               Header Name in Cloud Integration
header       durable                 JMSDeliveryMode
header       priority                JMSPriority
header       delivery-count          JMSXDeliveryCount
annotation   x-opt-delivery-time     JMSDeliveryTime
property     message-id              JMSMessageID
property     user-id                 JMSXUserID
property     to                      JMSDestination
property     subject                 JMSType
property     reply-to                JMSReplyTo
property     correlation-id          JMSCorrelationID
property     absolute-expiry-time    JMSExpiration
property     creation-time           JMSTimestamp
property     group-id                JMSXGroupID
property     group-sequence          JMSXGroupSeq

AMQP application properties are transferred to Camel headers in CPI runtime with the same name and the other way round. It is important that the names of application properties do not start with JMS, otherwise they are not forwarded into a Camel header.
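
For use in scripts, the mapping above can be written down as a simple lookup. The following Groovy sketch just restates the table and the JMS prefix rule. The variable and function names as well as the sample property names are made up for illustration.

```groovy
// AMQP name -> header name in Cloud Integration, copied from the table above
def amqpToHeader = [
    'durable'             : 'JMSDeliveryMode',
    'priority'            : 'JMSPriority',
    'delivery-count'      : 'JMSXDeliveryCount',
    'x-opt-delivery-time' : 'JMSDeliveryTime',
    'message-id'          : 'JMSMessageID',
    'user-id'             : 'JMSXUserID',
    'to'                  : 'JMSDestination',
    'subject'             : 'JMSType',
    'reply-to'            : 'JMSReplyTo',
    'correlation-id'      : 'JMSCorrelationID',
    'absolute-expiry-time': 'JMSExpiration',
    'creation-time'       : 'JMSTimestamp',
    'group-id'            : 'JMSXGroupID',
    'group-sequence'      : 'JMSXGroupSeq'
]

// Application properties whose name starts with "JMS" are not forwarded
boolean isForwardedAsHeader(String applicationPropertyName) {
    return !applicationPropertyName.startsWith('JMS')
}

// Hypothetical application properties on an incoming message
def incoming = ['orderId', 'JMSCustomFlag', 'RetryCount']
def forwarded = incoming.findAll { isForwardedAsHeader(it) }
println "forwarded as Camel headers: ${forwarded}"
println "AMQP subject arrives as header: ${amqpToHeader['subject']}"
```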

 

Exclusive Processing

In general there are two ways to provide exclusive (in-order) processing using an AMQP broker:

  • Exclusive queues provide a capability that will let only one consumer consume from a given queue at a time. If that particular consumer is stopped or crashes one of the other consumers takes over consuming the messages. This means this also works if the Cloud Integration tenant has multiple worker nodes.
  • Message groups are an optional feature of the AMQP 1.0 protocol. Groups are realized by using the AMQP property group-id. If an AMQP producer sets the group-id property on a message and the broker supports message grouping for that queue, all messages from the same message group will be processed one after the other. In our case, if the producer is a Cloud Integration system, you can set the AMQP group-id property by setting the Camel header JMSXGroupID (see the table above).
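
As a sketch of the message group option, the following Groovy function derives a JMSXGroupID value from a business key so that all messages for one order end up in the same group. The function name, the order number and the 'order-' prefix are assumptions for illustration only. In a Script step the value would be set with message.setHeader('JMSXGroupID', ...) before the AMQP receiver.

```groovy
// Returns a copy of the headers with JMSXGroupID set from a hypothetical order number
Map withGroupId(Map headers, String orderId) {
    def result = new LinkedHashMap(headers)
    result.put('JMSXGroupID', 'order-' + orderId)
    return result
}

def headers = withGroupId([SourceSystem: 'shop'], '4711')
println headers
```

Whether the broker then actually serializes processing per group depends on the message group support of the messaging system.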

Note that not all messaging systems support exclusive queues and/or message groups, and these features may have to be activated explicitly for a queue in the messaging system configuration. See below in the messaging system specific section for more details.

 

Connections required for Processing

For the AMQP adapter you permanently require at least one connection per worker node for consuming messages from a queue because each worker node needs a connection to the message broker. For storing messages to the messaging system, the number of worker nodes is the maximum number of required connections.

In total this means the number of connections per queue is:

number of worker nodes + parallel requests storing messages to the message broker (maximum: number of worker nodes)

This means you would need a maximum of 4 connections for one queue for a cluster with two worker nodes.
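
The formula can be checked with a short Groovy calculation. This is only a sketch of the arithmetic described above; the function name is made up.

```groovy
// Upper bound of connections for one queue, following the formula above
int maxConnectionsPerQueue(int workerNodes) {
    int consuming = workerNodes   // one permanent connection per worker node
    int storing = workerNodes     // at most one parallel storing request per worker node
    return consuming + storing
}

[1, 2, 5].each { nodes ->
    println "${nodes} worker node(s): up to ${maxConnectionsPerQueue(nodes)} connections per queue"
}
```

For five worker nodes this gives ten connections, which is exactly the connection limit per service instance listed for SAP Enterprise Messaging further below.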

 

Supported External Messaging Systems

There are multiple providers of messaging system implementations available. Most of them support AMQP 1.0 and can be connected using the AMQP adapters. The most used messaging systems were tested by SAP Cloud Platform Integration. Note the required settings, configuration options and limitations below.

SAP Enterprise Messaging

The following configuration options are applicable for the SAP Enterprise Messaging:

  • Connectivity Options: WebSocket over TLS (Port 443 with Path: “/protocols/amqp10ws“)
  • Authentication Option: OAuth2 Client Credentials
  • Dead Letter Queue Handling based on Delivery Status: No dead letter queue configuration supported. This means there is an endless redelivery of the message in error case. You need to configure alternative processing using exception-subprocess (see chapter ‘Configure Exception Handling for Messaging Systems not Supporting the Headers’ above).
  • JMSRedelivered/JMSXDeliveryCount supported: No
  • Exclusive Queues supported: Yes
  • Message Groups supported: No
  • Queues supported: Yes, queues need to be accessed using the following syntax in AMQP adapter channel: “queue:<queue name>”
  • Topics supported: Yes, topics need to be accessed using the following syntax in AMQP adapter channel: “topic:<topic name>”
  • Topic subscriptions supported: Yes, queues can be subscribed to topics.

Size Limitations from SAP Enterprise Messaging

When using Enterprise Messaging consider the following important restrictions:

  • Maximum Number of Connections supported per service instance: 10
  • Maximum Number of Service Instances: 10

Taking the above connection requirements into consideration, this means that you should use one service instance per queue when using Enterprise Messaging with the AMQP adapter.

Note that for clusters with more than 5 worker nodes the AMQP adapter can currently not be recommended in combination with Enterprise Messaging.

For more details check out the documentation.

Microsoft Azure Service Bus

The following configuration options are applicable for the Microsoft Azure Service Bus:

  • Connectivity Options: TLS over TCP (Port 5671)
  • Authentication Option: SASL
  • Dead Letter Queue Handling based on Delivery Status: Supported, delivery status REJECTED puts the message to a dead letter queue, delivery status MODIFIED_FAILED_UNDELIVERABLE gives the message a deferred status in the messaging system (Message Deferral). Deferred messages stay in the queue, but are not consumed anymore.
  • JMSRedelivered/JMSXDeliveryCount supported: Yes
  • Exclusive Queues supported: No
  • Message Groups supported: No
  • Queues supported: Yes
  • Topics supported: Yes
  • Topic subscriptions supported: Yes, topic subscriptions can be accessed as separate, queue-like entities using the AMQP sender adapter: “[topic name]/subscriptions/[subscription name]”. Multiple subscriptions for a single topic can exist. It is not possible to send to topic subscriptions using the AMQP receiver adapter.
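
The Azure Service Bus address formats mentioned in this blog (dead letter queue and topic subscription) can be built with two small Groovy helpers. The function names and the sample entity names are made up. Note the single-quoted strings: in a double-quoted Groovy string the $ in $deadletterqueue would be treated as the start of an interpolated expression.

```groovy
// '<queue>/$deadletterqueue' as described for Azure Service Bus above
String deadLetterQueueOf(String queue) {
    return queue + '/$deadletterqueue'
}

// '[topic name]/subscriptions/[subscription name]' for the AMQP sender adapter
String subscriptionAddress(String topic, String subscription) {
    return topic + '/subscriptions/' + subscription
}

println deadLetterQueueOf('retryqueue')
println subscriptionAddress('orders', 'cpi')
```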

Solace PubSub+

The following configuration options are applicable for the Solace message broker:

  • Connectivity Options:
    • TCP (Port as defined in the Solace broker)
    • TLS over TCP (Port as defined in the Solace broker)
  • Authentication Option: SASL
  • Dead Letter Queue Handling based on Delivery Status: Supported, but the dead letter queue and the maximum redeliveries have to be configured in the message broker as well. Delivery status REJECTED puts the message to the dead letter queue. Delivery status MODIFIED_FAILED_UNDELIVERABLE must not be used as it is not supported and can lead to errors during processing.
  • JMSRedelivered/JMSXDeliveryCount supported: No
  • Exclusive Queues supported: Yes
  • Message Groups supported: No
  • Queues supported: Yes
  • Topics supported: Yes
  • Topic subscriptions supported: Yes, queues can be subscribed to topics.

Apache Qpid Broker-J

The following configuration options are applicable for the Apache Qpid Broker-J:

  • Connectivity Options:
    • TCP (Port as defined in the Qpid broker)
    • TLS over TCP (Port as defined in the Qpid broker)
    • WebSocket (Port as defined in the Qpid broker)
    • WebSocket over TLS (Port as defined in the Qpid broker)
  • Authentication Option: SASL
  • Dead Letter Queue Handling based on Delivery Status: Yes, but the dead letter queue and the maximum redeliveries have to be configured in the message broker as well. Delivery status REJECTED puts the message to the dead letter queue after the number of redelivery attempts configured in the messaging system. Delivery status MODIFIED_FAILED_UNDELIVERABLE takes the message out of the processing as long as the connection to the messaging system is established. After restart of the integration flow or the worker node the message would be tried again. After the number of redelivery attempts configured in the messaging system the message is moved to the dead letter queue.
  • JMSRedelivered/JMSXDeliveryCount supported: Yes
  • Exclusive Queues supported: Yes
  • Message Groups supported: Yes
  • Queues supported: Yes
  • Topics supported: Yes
  • Topic subscriptions supported: Yes, queues can be subscribed to topics.

Apache ActiveMQ 5 and Apache ActiveMQ Artemis

The following configuration options are applicable for the Apache ActiveMQ:

  • Connectivity Options:
    • TCP (Port as defined in the ActiveMQ broker)
    • TLS over TCP (Port as defined in the ActiveMQ broker)
  • Authentication Option: SASL
  • Dead Letter Queue Handling based on Delivery Status: Yes, the dead letter queue, the maximum redeliveries and a redelivery delay can be configured in the message broker as well. Delivery status REJECTED and MODIFIED_FAILED_UNDELIVERABLE put the message to the dead letter queue.
  • JMSRedelivered/JMSXDeliveryCount supported: Yes
  • Exclusive Queues supported: No
  • Message Groups supported: Yes
  • Queues supported: Yes
  • Topics supported: Yes
  • Topic subscriptions supported: Yes, queues can be subscribed to topics.

Note that Amazon MQ Service is based on Apache ActiveMQ, so the same configuration options apply. For details see blog How to connect to an Amazon MQ Service using the AMQP Adapter.

IBM MQ

The IBM MQ broker has only limited support for the AMQP protocol: it does not support addressing queues via AMQP (see IBM MQ documentation). Because of this limitation the AMQP receiver adapter can only be used to write to topics on the IBM MQ broker, and the AMQP sender adapter cannot consume from IBM MQ because only queues are supported for consumption in Cloud Integration.

The following configuration options are applicable for IBM MQ:

  • Connectivity Options:
    • TCP (Port as defined in the IBM MQ broker)
    • TLS over TCP (Port as defined in the IBM MQ broker)
  • Authentication Option: SASL
  • Dead Letter Queue Handling based on Delivery Status: Not supported, as AMQP sender is not supported with IBM MQ.
  • JMSRedelivered/JMSXDeliveryCount supported: Not supported, as AMQP sender is not supported with IBM MQ.
  • Exclusive Queues supported: No
  • Message Groups supported: No
  • Queues supported: No
  • Topics supported: Yes
  • Topic subscriptions supported: Yes, queues can be subscribed to topics to send messages to them.

RabbitMQ

In testing we found that RabbitMQ has problems with AMQP 1.0. The test was done with plugin version 3.8.2, which was the newest version available at that time. Only non-durable queues were processed by the AMQP adapter. For durable queues the processing hangs. This may change with newer plugin versions.

The following configuration options are applicable for RabbitMQ:

  • Connectivity Options:
    • TCP (Port as defined in the RabbitMQ broker)
    • TLS over TCP (Port as defined in the RabbitMQ broker)
  • Authentication Option: SASL
  • Dead Letter Queue Handling based on Delivery Status: Not supported, as only non-durable queues are supported.
  • JMSRedelivered/JMSXDeliveryCount supported: Not supported, as only non-durable queues are supported.
  • Exclusive Queues supported: No
  • Message Groups supported: No
  • Queues supported: Only non-durable queues.
  • Topics supported: Yes
  • Topic subscriptions supported: Yes, queues can be subscribed to topics to send messages to them.
36 Comments
  • Excellent write-up. Looking forward to a native adapter for this.

    How would we go about setting/reading message annotations and application properties on messages?

      • Hello Joachim,

        Camel headers in CPI runtime are transferred to AMQP application properties and the other way round on CPI inbound side.

        As for message annotations: we currently transfer the annotation x-opt-delivery-time into the header JMSDeliveryTime, other annotations are not supported. Could you maybe provide some more details on the scenarios you require AMQP message annotations? What kind of support would be required for the intended scenarios? Which annotations are required?

        Best regards,

        Mandy

        • Hi,

          Thanks for the reply.

          In our case, we need to be able to read/write AMQP standard properties, like subject (Label on Azure ServiceBus), as well as application properties (which show up as Custom Properties on Azure ServiceBus) and the message annotation x-opt-scheduled-enqueue-time.

          If everything except the message annotation is currently supported we can use the adapter for some things, so that’s good. But we do have some integrations where ScheduledEnqueueTime is used.

          Best regards,
          Joachim

          • Hello,

            AMQP property subject is transferred to the JMS property JMSType, which is transferred to a header in CPI. I will extend the blog with the complete list of properties we map shortly.

            x-opt-scheduled-enqueue-time we currently cannot transfer as this is not supported by our underlying open source components. We would have to check if and how we can provide this.

            Best regards,

            Mandy

          • see new chapter ‘Mapping of AMQP Headers, Properties and Annotations to Headers in Cloud Integration’ in the blog.

            Best regards,

            Mandy
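The mapping rules stated in this thread can be summarized in a small Python sketch. It is only a toy model of what the replies above describe, not the adapter's implementation; the function name `amqp_to_cpi_headers` and the dictionary-based message shape are assumptions made for illustration.

```python
def amqp_to_cpi_headers(properties, application_properties, annotations):
    """Toy model of the inbound mapping described in the replies above.

    All three arguments are plain dicts (a hypothetical message shape).
    """
    # Application properties are transferred to headers as they are.
    headers = dict(application_properties)

    # The AMQP property 'subject' is transferred to JMSType.
    if "subject" in properties:
        headers["JMSType"] = properties["subject"]

    # Only this one annotation is transferred; all others are dropped.
    if "x-opt-delivery-time" in annotations:
        headers["JMSDeliveryTime"] = annotations["x-opt-delivery-time"]

    return headers


if __name__ == "__main__":
    print(amqp_to_cpi_headers(
        {"subject": "order"},
        {"region": "EU"},
        {"x-opt-delivery-time": 1575800000000,
         "x-opt-scheduled-enqueue-time": 1575900000000},
    ))
```

In this model x-opt-scheduled-enqueue-time never reaches the headers, which matches the limitation mentioned above.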

    • No, absolutely not. The AMQP adapter can be used to connect to any messaging system via the AMQP protocol. The JMS adapter can only connect to the messaging system embedded in CPI.

      BR,

      Mandy

  • Hi Mandy Krimmel,

    I’m trying to follow your blog for an HTTPS to AMQP scenario, but I’m not sure which headers are required.

    Can you help me understand the header properties?

    I’m getting the error below: 

    com.sap.it.rt.adapter.http.api.exception.HttpResponseException: An internal server error occured: AMQP SASL header mismatch value 0, expecting 41. In state: HEADER0.

    Thanks

    • Hello,

      I do not really understand your question. Which headers are you referring to?

      If you want to send the message via AMQP to the message broker you do not need to set any headers, the message is sent as is. The headers in the table are only relevant if needed in your scenario. For some scenarios you need to fill specific headers to be transferred as JMS properties.

      For a simple scenario from any sender to AMQP receiver you do not need to care about headers.

      Best regards,

      Mandy

      • Thank you so much, Mandy Krimmel. I made the changes and I’m now facing a different error. I will double-check the Cloud Connector settings with the basis team.

        com.sap.it.rt.adapter.http.api.exception.HttpResponseException: An internal server error occured: socks5, password, localhost/127.0.0.1:20004 =>            :61616, status: FORBIDDEN(2)

    • Hi Leomilde!

      What AMQP provider do you try to connect to?

      I faced a similar issue recently while trying to send messages to RabbitMQ. The point was that RabbitMQ uses AMQP protocol version 0.9.1 by default, while the CPI AMQP adapter uses version 1.0.

      Enabling the plugin for AMQP protocol version 1.0 on the RabbitMQ server resolved the problem.

      Regards, Evgeniy.
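A protocol version mismatch like the one described here shows up in the very first bytes exchanged on the connection. The sketch below classifies an 8-byte protocol header using the layouts defined by AMQP 1.0 and AMQP 0-9-1. The function name `classify_greeting` is hypothetical, and reading the 41 in the error quoted earlier as 0x41 (ASCII 'A', the first byte of every AMQP header) is an assumption.

```python
# Protocol headers: the literal "AMQP" followed by four version bytes.
KNOWN_HEADERS = {
    b"AMQP\x00\x01\x00\x00": "AMQP 1.0",
    b"AMQP\x02\x01\x00\x00": "AMQP 1.0 (TLS layer)",
    b"AMQP\x03\x01\x00\x00": "AMQP 1.0 (SASL layer)",
    b"AMQP\x00\x00\x09\x01": "AMQP 0-9-1",
}


def classify_greeting(first_bytes: bytes) -> str:
    """Name the protocol announced by the first 8 bytes of a connection."""
    header = first_bytes[:8]
    if header in KNOWN_HEADERS:
        return KNOWN_HEADERS[header]
    if header[:4] == b"AMQP":
        return "unknown AMQP version"
    return "not an AMQP header"


if __name__ == "__main__":
    print(classify_greeting(b"AMQP\x03\x01\x00\x00"))
    print(classify_greeting(b"AMQP\x00\x00\x09\x01"))
    print(classify_greeting(b"\x00\x00\x01\x3c"))
```

If the peer answers with bytes that do not start with `AMQP` at all, it may not be speaking AMQP on that port, which is worth checking alongside the protocol version.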

      • Hi Evgeniy,

        I’m using OASIS AMQP 1.0

        I made some changes to my iflow (just sending the message as is, no transformations needed) per Mandy’s suggestions and am getting a different error now. It looks like a Cloud Connector issue.

        com.sap.it.rt.adapter.http.api.exception.HttpResponseException: An internal server error occured: socks5, password, localhost/127.0.0.1:20004 =>               :61616, status: FORBIDDEN(2)

        I will check again next week once the connectivity test for AMQP is released.

        Thanks

      • Hi Evgeniy,

        Glad to know that you were able to connect to the RabbitMQ server with AMQP 1.0. I’m trying to connect, but SAP’s AMQP adapter doesn’t have a field to provide the virtual host/remote host name, and my connection to the remote server is failing because the RabbitMQ server is not able to connect to the correct virtual host. Can you please tell me what you did to make it work?

        Thanks
        Santosh

  • Hi,

     

    I am trying to connect to Azure Service Bus using a shared key, and I deployed the shared key as a secure parameter. I get the error below.

    Exception while processing SASL init: Crededentials objects of kind secure_param are not supported for SASL authentication

    Can you please help? How should I deploy the credentials?

     

    Thanks,

    Vijaya Palla

  • Thanks Mandy.

    I deployed them as credentials and it worked. I am able to poll and push messages.

     

    Thanks,

    Vijaya Palla

  • Mandy Krimmel: I tried a pass-through from an Azure Service Bus queue named Q1 (holding 5000 source messages) to an Azure Service Bus queue named Q2. Surprisingly, I received more than 8000 messages in Q2. Why is this happening?

     

    Notes:

    1. Transaction handling was set to None.

    2. Concurrency was 10.

    3. I used both persistent and non-persistent.

    4. If I use JMS in between, the number is exact. Is this a known bug?

    • This sounds like there were issues in consuming and the message is then retried. Have you configured exception handling in the integration flow?

      Transaction handling is not relevant, because the AMQP adapter does not share a transaction. Usually, if a message is processed successfully, it is removed from the message broker because a successful acknowledgement is sent back. If there is an error, the message is consumed again.

      This needs to be checked in detail; it should not happen.

      Could you please open a ticket on LOD-HCI-PI-RT and add the integration flow and a clear error description?

      Thank you,

      Mandy
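How a target queue can end up with more messages than the source can be illustrated with a toy model of the retry behavior described above. It assumes (this is not confirmed for the reported case) that a message has already been written to Q2 when the acknowledgement towards Q1 fails, so the broker delivers it again. All names, including `forward` and `ack_succeeds`, are hypothetical.

```python
from collections import deque


def forward(source_ids, ack_succeeds, max_attempts=5):
    """Simulate forwarding from Q1 to Q2 with redelivery on a failed ack.

    ack_succeeds(msg_id, attempt) decides whether the acknowledgement
    for this delivery attempt reaches the source broker.
    """
    pending = deque((msg_id, 1) for msg_id in source_ids)
    target = []
    while pending:
        msg_id, attempt = pending.popleft()
        target.append(msg_id)  # the message is written to Q2 first
        if not ack_succeeds(msg_id, attempt) and attempt < max_attempts:
            # No acknowledgement arrived, so Q1 delivers the message again.
            pending.append((msg_id, attempt + 1))
    return target


if __name__ == "__main__":
    # Every even message id loses its first acknowledgement.
    result = forward(range(5), lambda m, attempt: m % 2 == 1 or attempt > 1)
    print(len(result), sorted(set(result)))
```

In this model every lost acknowledgement adds one duplicate to Q2 while the set of distinct messages stays the same, so comparing distinct message IDs in Q2 with the source count is one way to check whether redelivery explains the difference.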

    • A dedicated Kafka adapter is indeed on the roadmap. According to the current planning it is targeted for Q3/4 2020.

      BR,

      Mandy

  • Mandy Krimmel: This might be a silly question, but what is the syntax for pushing to a topic subscription’s dead letter queue (as a receiver from some other iflow, JMS to AMQP)?

     

    “In case Microsoft Azure Service Bus is used as messaging system, the dead letter queue can be accessed by using the following syntax: ‘<queue>/$deadletterqueue’”

    • Hi,

      This syntax makes sense if you want to consume from the dead letter queue, not for pushing to it. The AMQP sender adapter consumes messages from the queue, and this is what is described there.

      We have not tested how to push to a deadletter queue directly, is there really a use case for this? Usually moving to the deadletter queue is done by the messaging system directly.

      Have you tried with the above syntax? Did you get an error?

      BR

      Mandy
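For reading from a dead letter queue, the address can be put together as in the sketch below. The queue form is the one quoted from the blog above. The topic subscription form follows the entity path convention of Azure Service Bus and is an assumption here, since it was not tested with the adapter in this thread. The helper name `dead_letter_path` is hypothetical.

```python
from typing import Optional


def dead_letter_path(entity: str, subscription: Optional[str] = None) -> str:
    """Build the dead letter address for a queue or a topic subscription."""
    if subscription is None:
        # Queue form, as quoted from the blog: '<queue>/$deadletterqueue'
        return f"{entity}/$deadletterqueue"
    # Topic subscription form (assumed Azure Service Bus convention).
    return f"{entity}/Subscriptions/{subscription}/$deadletterqueue"


if __name__ == "__main__":
    print(dead_letter_path("orders"))
    print(dead_letter_path("events", "billing"))
```

Either value would go into the queue name field of the AMQP sender adapter, which consumes from that address.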