
Send AMQP messages from CPI to Enterprise Messaging and Consume them in a Node.js AMQP Application 📨

Hi Cloud Platform Integrator!

In this blog post we will look at how to send messages from CPI via AMQP to the Enterprise Messaging Service. Once the messages arrive in the Queue, they will be consumed by a Node.js application running on Cloud Foundry.

This is a Blog series with the following topics in separate Blog posts:

Cloud Foundry Enterprise Messaging Webhooks 📨
Send AMQP messages from CPI to Enterprise Messaging and Consume them in a Node.js AMQP Application 📨 (this blog)
WebSocket’s in SAP UI5 Multi Target Applications consuming Enterprise Messaging 📨

Introduction

To not reinvent the wheel and to credit earlier blogposts of fellow Community Members, I would like to point out the following blog posts:

Cloud Integration – Connecting to Messaging Systems using the AMQP Adapter

https://blogs.sap.com/2019/11/20/cloud-integration-connecting-to-external-messaging-systems-using-the-amqp-adapter/

Cloud Integration – How to Connect to an On-Premise AMQP server via Cloud Connector

https://blogs.sap.com/2020/01/17/cloud-integration-how-to-connect-to-an-on-premise-amqp-server-via-cloud-connector/

SAP CPI – AMQP Channel with Cloud Event Broker – SOLACE

https://blogs.sap.com/2020/02/08/sap-cpi-amqp-channel-with-cloud-messaging-solace/

I mention them because these blog posts helped me get started with AMQP Messaging Channels in Cloud Platform Integration. They are very detailed, well explained and worth a look.

In this blog post we will continue working on our earlier configurations, setup and developments from the previous blog post “Cloud Foundry Enterprise Messaging Webhooks 📨”.

The prerequisites for this blog are:

1. A configured Queue called “ErrorQueue” in the Enterprise Messaging Cockpit.

2. Stop the earlier created Node.js application in the SAP CF Space by pressing the stop icon.

Once this application has stopped, you will see that the “Requested State” has switched to “Stopped”.

3. Remove the earlier created Webhook Subscription in the Enterprise Messaging Cockpit.

This is necessary because a new Node.js app will be created to consume the AMQP messages from the “ErrorQueue”. If the Webhook were still subscribed, it would consume the messages from this Queue and no messages would be left for the new Node.js AMQP app.

 

Configuring the Cloud Platform Integration Flow

Since these blog posts are about demoing the capabilities of all these services in a combined End-To-End solution, I will briefly present this CPI-Flow.

This flow will only run once when it is deployed, because it is configured that way in the “Start Timer” control. The flow continues with the “Trigger Error Script”, which holds the following code:

import com.sap.gateway.ip.core.customdev.util.Message;
import java.util.HashMap;
def Message processData(Message message) {
    try {
        def result = 3 * "7";
    } catch (Exception e) {
        throw e;
    }
    return message;
}

This code tries to multiply 3 by the string "7", which fails and throws an exception. This triggers the “Exception Subprocess 1”, which catches the thrown error.

This “Exception Subprocess 1” holds a “Groovy Script” control with the following content:

import com.sap.gateway.ip.core.customdev.util.Message;
import java.util.HashMap;
def Message processData(Message message) {
    byte[] bytes = '''{"errorMessage": "Error Message From CPI Flow!"}''';
    message.setBody(bytes);
    return message;
}

It creates a byte array and sets these bytes as the body of the message to be sent.

If this JSON payload is not converted to a byte array, encoding inconsistencies will occur in the consuming Node.js app.
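To illustrate what the consuming side does with these bytes, here is a minimal sketch in plain Node.js. It assumes the bytes are UTF-8 encoded JSON; the helper name decodeJsonPayload and the simulated payload are only there for illustration and are not part of the CPI flow or the consumer app.

```javascript
// Simulate the bytes that arrive on the queue (assumption: UTF-8 encoded JSON).
const rawPayload = Buffer.from('{"errorMessage": "Error Message From CPI Flow!"}', 'utf8');

// Hypothetical helper: decode the bytes explicitly as UTF-8, then parse the JSON text.
function decodeJsonPayload(buffer) {
  return JSON.parse(buffer.toString('utf8'));
}

const decoded = decodeJsonPayload(rawPayload);
console.log(decoded.errorMessage);
```

Because the sender delivers raw bytes, the consumer can choose the decoding itself instead of relying on an implicit conversion somewhere in between.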

The outgoing connection to the “Receiver” uses the “AMQP” Adapter Type with “WebSocket” as the Transport Protocol.

Configuration parameter     Value
Adapter Type                AMQP
Transport Protocol          WebSocket

The AMQP “Connection” tab requires the following configuration:

Configuration parameter     Value
Host                        enterprise-messaging-messaging-gateway.cfapps.eu10.hana.ondemand.com
Port                        443
Path                        /protocols/amqp10ws
Proxy Type                  Internet
Connect with TLS            Checked
Authentication              OAuth2 Client Credentials
Credential Name             {your-credential-name}

The “Host” and “Path” will be the same for every implementation, because this is how SAP has set up the Enterprise Messaging Service. This information can be found in “Cloud Foundry Space > Service instances > {Your-EM-Service} > Service Keys > {Your-EMService-Key}”.
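If you prefer to derive these values from the Service Key instead of copying them by hand, the following sketch shows one way to do it. The shape of the serviceKey object is an assumption based on the fields used in the “server.js” file of this blog (a “messaging” array with “protocol” and “uri”); the “wss://” scheme, the second entry and the function name amqpAdapterSettings are hypothetical.

```javascript
// Hypothetical excerpt of a Service Key; a real key contains more fields.
const serviceKey = {
  messaging: [
    {
      protocol: ['amqp10ws'],
      uri: 'wss://enterprise-messaging-messaging-gateway.cfapps.eu10.hana.ondemand.com/protocols/amqp10ws'
    },
    { protocol: ['httprest'], uri: 'https://example.invalid' }
  ]
};

// Pick the amqp10ws entry and split its URI into the adapter fields.
function amqpAdapterSettings(key) {
  // [].concat(...) accepts "protocol" as either a string or an array of strings.
  const entry = key.messaging.find((m) => [].concat(m.protocol).includes('amqp10ws'));
  if (!entry) throw new Error('No amqp10ws entry found in the service key');
  const url = new URL(entry.uri);
  // The URL parser leaves the port empty when it is the default for the scheme (443 for wss).
  return { host: url.hostname, port: url.port || '443', path: url.pathname };
}

const settings = amqpAdapterSettings(serviceKey);
console.log(settings);
```

The three returned values correspond to the “Host”, “Port” and “Path” fields of the “Connection” tab.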

The AMQP “Processing” tab requires the following configuration:

Configuration parameter     Value
Destination Type            Queue
Destination Name            queue:ErrorQueue
Expiration Period (in s)    (optional)
Delivery                    Persistent

More information about sending and receiving on Queues and Topics (Destination Name) can be found here:

https://help.sap.com/viewer/bf82e6b26456494cbdd197057c09979f/Cloud/en-US/72ac1fad2dd34c4886d672e66b22b54b.html

The last step is providing the OAuth2 Client Credentials in the CPI Environment under “Security Material” as new “OAuth2 Credentials”.

Configuration parameter     Value
Name                        The name provided in the CPI Flow as Credential Name.
Grant Type                  Client Credentials
Description                 (optional)
Token Service URL           The “tokenendpoint” property in the Enterprise Messaging Service Key configuration under the AMQP section.
Client ID                   The “clientid” property in the Enterprise Messaging Service Key configuration under the AMQP section.
Client Secret               The “clientsecret” property in the Enterprise Messaging Service Key configuration under the AMQP section.
Client Authentication       Send as Request Header
Include Scope               Unchecked
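To give an idea of what these settings amount to, here is a rough sketch of a client credentials token request with the credentials sent in a request header. It only builds the request and does not send it. I am assuming that “Send as Request Header” corresponds to an HTTP Basic “Authorization” header; the function name buildTokenRequest and the example values are hypothetical, and the exact request that CPI sends may differ.

```javascript
// Build (but do not send) a client credentials token request.
// "oa2" mirrors the tokenendpoint / clientid / clientsecret properties of the Service Key.
function buildTokenRequest(oa2) {
  // Client ID and secret travel Base64 encoded in a Basic Authorization header.
  const basic = Buffer.from(`${oa2.clientid}:${oa2.clientsecret}`, 'utf8').toString('base64');
  return {
    url: oa2.tokenendpoint,
    method: 'POST',
    headers: {
      Authorization: `Basic ${basic}`,
      'Content-Type': 'application/x-www-form-urlencoded'
    },
    body: 'grant_type=client_credentials'
  };
}

// Example values, not real credentials.
const request = buildTokenRequest({
  tokenendpoint: 'https://example.invalid/oauth/token',
  clientid: 'my-client',
  clientsecret: 'my-secret'
});
console.log(request.method, request.url);
```

No scope is added to the body, which matches the unchecked “Include Scope” option.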

 

Deploying the Cloud Platform Integration Flow

This was a brief explanation of the CPI-Flow. Deploy this flow and watch the number of messages inside the “ErrorQueue” increase by 1 every time you deploy the flow. The flow will fail, and the “Exception Subprocess 1” will create a message inside the “ErrorQueue” every time it fails.

 

Consume the AMQP Message in a Node.js App

A new Node.js application will be created to set up a WebSocket connection that listens for AMQP messages on the “ErrorQueue” Queue.

The SAP HANA Academy already built such an application and shared it in a repository on GitHub. Clone this repository to your local IDE:

git clone https://github.com/saphanaacademy/em-consumer.git

The GitHub repository of this application from the SAP HANA Academy can be found here:

https://github.com/saphanaacademy/em-consumer

More information about this consuming application can be found here:

Inside the “mta.yaml” file, replace the values marked in green with the name of your “Enterprise Messaging Service Instance” and the ones marked in blue with “queue:ErrorQueue”, because this is the Queue that receives the messages from CPI.

The next changes take place inside the “server.js” file. Change the following piece of code:

The code looks like this:

stream
  .on('data', (message) => {
    const payload = JSON.parse(message.payload.toString());
    console.log('Error: ' + payload.errorMessage);
    message.done();  
});

The “.toString()” function converts the message payload from bytes to a string, which is then parsed as JSON, and the value is logged to the console starting with “Error:”.
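The handler above assumes that every message on the Queue contains valid JSON. As a possible variation, the sketch below wraps the parsing in a try/catch so that a malformed payload is logged instead of crashing the app. The function name handleMessage, the fake message objects and the decision to acknowledge a message even when parsing fails are my own assumptions for illustration; only “payload” and “done()” are taken from the code above.

```javascript
// Hypothetical handler: parse the payload defensively and always acknowledge the message.
function handleMessage(message, log = console.log) {
  try {
    const payload = JSON.parse(message.payload.toString('utf8'));
    log('Error: ' + payload.errorMessage);
  } catch (err) {
    log('Could not parse payload: ' + err.message);
  } finally {
    // Acknowledge in both cases so a malformed message does not stay in the queue.
    message.done();
  }
}

// Stand-ins for received messages, so the sketch runs without a broker connection.
const logged = [];
let acknowledged = 0;
const fakeMessage = (text) => ({
  payload: Buffer.from(text, 'utf8'),
  done: () => { acknowledged += 1; }
});

handleMessage(fakeMessage('{"errorMessage": "Error Message From CPI Flow!"}'), (line) => logged.push(line));
handleMessage(fakeMessage('not json'), (line) => logged.push(line));
console.log(logged, acknowledged);
```

In the app itself, such a handler could be registered with stream.on('data', (message) => handleMessage(message)).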

Remove the obsolete code in the file, since we do not require it for the moment.

The “server.js” file should look like this:

const express = require('express');
const app = express();

var cfenv = require('cfenv');
var appEnv = cfenv.getAppEnv();
var emCreds = appEnv.getServiceCreds(process.env.EM_SERVICE);
var emCredsM = emCreds.messaging.filter(function (em) {
  return em.protocol == 'amqp10ws'
});

const options = {
  uri: emCredsM[0].uri,
  oa2: {
    endpoint: emCredsM[0].oa2.tokenendpoint,
    client: emCredsM[0].oa2.clientid,
    secret: emCredsM[0].oa2.clientsecret
  },
  data: {
    source: process.env.EM_SOURCE,
    payload: Buffer.allocUnsafe(20),
    maxCount: 100,
    logCount: 10
  }
};

const {
  Client
} = require('@sap/xb-msg-amqp-v100');
const client = new Client(options);
const stream = client.receiver('ErrorQueue').attach(options.data.source);

stream
  .on('data', (message) => {
    const payload = JSON.parse(message.payload.toString());
    console.log('Error: ' + payload.errorMessage);
    message.done();
  });

client
  .on('connected', (destination, peerInfo) => {
    console.log('Connected!');
  })
  .on('assert', (error) => {
    console.log(error.message);
  })
  .on('error', (error) => {
    console.log(error);
  })
  .on('disconnected', (hadError, byBroker, statistics) => {
    console.log('Disconnected!');
  });

client.connect();


const port = process.env.PORT || 3000;
app.listen(port, function () {
  console.info("Listening on port: " + port);
});

Use the following command to push the application to the Cloud Foundry Space:

cf push {optional-name-of-your-app}

Redeploy the Cloud Platform Integration Flow and see the error message from the CPI-Flow appear in the logs of the Node.js application in the Cloud Foundry Space:

Resend a message via Postman to the Queue and the message will also be consumed by the app:

 

Wrap up 🎁

This blog described how to send messages via the Cloud Platform Integration AMQP Adapter with the WebSocket Transport Protocol. The error messages from the CPI-Flow arrive in the dedicated Queue and are consumed by a Node.js Application running on Cloud Foundry.

In the next blog, “WebSocket’s in SAP UI5 Multi Target Applications consuming Enterprise Messaging 📨”, WebSockets in SAPUI5 and Node.js will be explained, along with their usage and how to consume Enterprise Messaging messages in UI5 applications in real-time.

 

See you in the next blog!

Kind regards,

Dries
