Send AMQP messages from CPI to Enterprise Messaging and Consume them in a Node.js AMQP Application
Hi Cloud Platform Integrator!
In this blog post we will look at how to send messages from CPI via AMQP to the Enterprise Messaging Service. Once the messages arrive in the Queue, they will be consumed by a Node.js application running on Cloud Foundry.
This is a blog series with the following topics in separate blog posts:
Cloud Foundry Enterprise Messaging Webhooks
Send AMQP messages from CPI to Enterprise Messaging and Consume them in a Node.js AMQP Application (this blog)
WebSocket’s in SAP UI5 Multi Target Applications consuming Enterprise Messaging
Introduction
To not reinvent the wheel and to credit earlier blogposts of fellow Community Members, I would like to point out the following blog posts:
Cloud Integration – Connecting to Messaging Systems using the AMQP Adapter
Cloud Integration – How to Connect to an On-Premise AMQP server via Cloud Connector
SAP CPI – AMQP Channel with Cloud Event Broker – SOLACE
https://blogs.sap.com/2020/02/08/sap-cpi-amqp-channel-with-cloud-messaging-solace/
These blog posts helped me to get started with AMQP Messaging Channels in Cloud Platform Integration. They are very detailed, well explained and worth having a look at.
In this blog post we will continue working with the configuration, setup and developments from the previous blog post “Cloud Foundry Enterprise Messaging Webhooks”.
The prerequisites for this blog are:
1. A configured Queue called “ErrorQueue” in the Enterprise Messaging Cockpit.
2. Stop the earlier created Node.js application in the SAP CF Space, by pressing the stop-icon.
Once this application has stopped, you will see the “Requested State” has switched to “Stopped”.
3. Remove the current and earlier created Webhook Subscription in the Enterprise Messaging Cockpit.
This is because a new Node.js app will be created to consume the AMQP messages from the “ErrorQueue”. If the Webhook were still subscribed, it would consume the messages from this Queue and none would be left for the new Node.js AMQP app.
Configuring the Cloud Platform Integration Flow
Since these blog posts are about demoing the capabilities of all these services in a combined end-to-end solution, I will only briefly present this CPI-Flow.
This flow runs only once, when it is deployed, because that is how the “Start Timer” control is configured. The flow continues with the “Trigger Error Script”, which holds the following code:
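import com.sap.gateway.ip.core.customdev.util.Message;
import java.util.HashMap;
def Message processData(Message message) {
    try {
        // Multiplying a number by a string is not supported, so this line throws.
        def result = 3 * "7";
    } catch (Exception e) {
        // Rethrow so that the flow fails and the exception subprocess takes over.
        throw e;
    }
    return message;
}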
This code tries to multiply the number 3 by the string "7", which fails and throws an exception. This triggers the “Exception Subprocess 1”, which catches the thrown error.
This “Exception Subprocess 1” holds a “Groovy Script” control with the following content:
import com.sap.gateway.ip.core.customdev.util.Message;
import java.util.HashMap;
def Message processData(Message message) {
    // Encode the JSON payload explicitly as UTF-8 bytes before setting it as the body.
    byte[] bytes = '''{"errorMessage": "Error Message From CPI Flow!"}'''.getBytes("UTF-8");
    message.setBody(bytes);
    return message;
}
It creates a byte array and sets these bytes as the body of the message to be sent.
If this JSON payload is not converted to a byte array, encoding inconsistencies will occur in the consuming Node.js app.
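To make the byte array point more concrete, here is a minimal sketch of what the consuming side sees. It only assumes that the body arrives as UTF-8 bytes in a Node.js Buffer; the helper name `decodeJsonPayload` is made up for this illustration and is not part of any SAP library.

```javascript
// Simulate the body set by the Groovy script: the JSON text as UTF-8 bytes.
const json = '{"errorMessage": "Error Message From CPI Flow!"}';
const payload = Buffer.from(json, 'utf8');

// Hypothetical helper: decode the bytes explicitly as UTF-8, then parse the JSON.
function decodeJsonPayload(buffer) {
  return JSON.parse(buffer.toString('utf8'));
}

const decoded = decodeJsonPayload(payload);
console.log(decoded.errorMessage);
```

Because both sides agree on bytes plus UTF-8, the consumer does not have to guess how the text was encoded.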
The outgoing connection to the “Receiver” uses the “Adapter Type” “AMQP” with the “Transport Protocol” “WebSocket”.
Configuration parameter | Value |
Adapter Type | AMQP |
Transport Protocol | WebSocket |
The AMQP “Connection” tab requires the following configuration:
Configuration parameter | Value |
Host | enterprise-messaging-messaging-gateway.cfapps.eu10.hana.ondemand.com |
Port | 443 |
Path | /protocols/amqp10ws |
Proxy Type | Internet |
Connect with TLS | Checked X |
Authentication | OAuth2 Client Credentials |
Credential Name | {your-credential-name} |
The “Host” and “Path” are dictated by the Enterprise Messaging Service rather than by your flow; note that the host shown above belongs to the eu10 region. Both values can be found under “Cloud Foundry Space > Service instances > {Your-EM-Service} > Service Keys > {Your-EMService-Key}”.
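As a rough illustration of how the “Connection” values fit together, the sketch below assembles them into a single WebSocket URI. The function `buildWebSocketUri` and the object layout are assumptions made for this example; CPI does this internally and the adapter does not expose such a function.

```javascript
// Values copied from the "Connection" tab; the field names are illustrative.
const connection = {
  host: 'enterprise-messaging-messaging-gateway.cfapps.eu10.hana.ondemand.com',
  port: 443,
  path: '/protocols/amqp10ws',
  tls: true
};

// Hypothetical helper: "Connect with TLS" selects wss:// instead of ws://.
function buildWebSocketUri({ host, port, path, tls }) {
  const scheme = tls ? 'wss' : 'ws';
  return `${scheme}://${host}:${port}${path}`;
}

const uri = buildWebSocketUri(connection);
console.log(uri);
```

The result should correspond to the AMQP over WebSocket endpoint listed in your own service key, so comparing the two is a quick sanity check of the adapter settings.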
The AMQP “Processing” tab requires the following configuration:
Configuration parameter | Value |
Destination Type | Queue |
Destination Name | queue:ErrorQueue |
Expiration Period (in s) | (optional) |
Delivery | Persistent |
More information about sending and receiving on Queues and Topics (Destination Name) can be found here:
The last step is providing the OAuth2 Client Credentials in the CPI Environment under the “Security Material” as new “OAuth2 Credentials”.
Configuration parameter | Value |
Name | Provided name in the CPI Flow as Credential name. |
Grant Type | Client Credentials |
Description | (optional) |
Token Service URL | The “tokenendpoint” property in the Enterprise Messaging Service Key configuration under the AMQP section. |
Client ID | The “clientid” property in the Enterprise Messaging Service Key configuration under the AMQP section. |
Client Secret | The “clientsecret” property in the Enterprise Messaging Service Key configuration under the AMQP section. |
Client Authentication | Send as Request Header |
Include Scope | Unchecked |
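For readers who want to see what these settings amount to, here is a simplified sketch of an OAuth2 client credentials token request with the client ID and secret sent in the request header and no scope. The function `buildTokenRequest`, the example URL and the credentials are placeholders invented for this illustration; CPI builds and sends the real request itself, and nothing is sent over the network here.

```javascript
// Hypothetical helper: describes the token request without sending it.
function buildTokenRequest(tokenEndpoint, clientId, clientSecret) {
  // "Send as Request Header": credentials go into a Basic Authorization header.
  const basic = Buffer.from(`${clientId}:${clientSecret}`, 'utf8').toString('base64');
  return {
    url: tokenEndpoint,
    method: 'POST',
    headers: {
      Authorization: `Basic ${basic}`,
      'Content-Type': 'application/x-www-form-urlencoded'
    },
    // "Include Scope" is unchecked, so only the grant type is sent.
    body: new URLSearchParams({ grant_type: 'client_credentials' }).toString()
  };
}

// Placeholder values; use the "tokenendpoint", "clientid" and "clientsecret" of your service key.
const request = buildTokenRequest('https://example.invalid/oauth/token', 'my-client-id', 'my-client-secret');
console.log(request.method, request.url, request.body);
```

Replaying such a request with an HTTP client of your choice can help to verify the service key values before storing them as “OAuth2 Credentials”.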
Deploying the Cloud Platform Integration Flow
This was a brief explanation of the CPI-Flow. Deploy this flow and watch the number of messages inside the “ErrorQueue” increase by 1 every time you deploy it. The flow fails on every run, and each time the “Exception Subprocess 1” creates a message inside the “ErrorQueue”.
Consume the AMQP Message in a Node.js App
A new Node.js application will be created to set up a WebSocket connection that listens for AMQP messages on the “ErrorQueue” queue.
The SAP HANA Academy already built such an application and shared it in a repository on GitHub. Clone this repository to your local IDE:
git clone https://github.com/saphanaacademy/em-consumer.git
The GitHub repository of this application from the SAP HANA Academy can be found here:
https://github.com/saphanaacademy/em-consumer
Inside the “mta.yaml” file, replace the values marked in green with the name of your “Enterprise Messaging Service Instance” and the values marked in blue with “queue:ErrorQueue”, because this is the queue to which CPI sends its messages.
The next changes take place inside the “server.js” file. Change the message handler so that it looks like this:
stream
  .on('data', (message) => {
    const payload = JSON.parse(message.payload.toString());
    console.log('Error: ' + payload.errorMessage);
    message.done();
  });
The “.toString()” call converts the received payload into a string, which is then parsed as JSON. The value of “errorMessage” is logged to the console, prefixed with “Error:”.
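The handler above assumes that every message contains valid JSON. As a hedged sketch, the variant below keeps the consumer alive when a payload cannot be parsed. The names `handleMessage` and `stubMessage` are invented for this example, and the stub only mimics the two members used in this blog (a Buffer in `payload` and a `done()` function), not the full message object of the library.

```javascript
// Hypothetical helper: turns one received message into a log line.
function handleMessage(message, log = console.log) {
  const text = message.payload.toString('utf8');
  try {
    const payload = JSON.parse(text);
    log('Error: ' + payload.errorMessage);
  } catch (err) {
    // Not valid JSON: log the raw text instead of crashing the consumer.
    log('Unparseable payload: ' + text);
  } finally {
    // Acknowledge in both cases so the message does not stay in the queue.
    message.done();
  }
}

// Stub that mimics the message shape used in this blog, for local testing only.
function stubMessage(text) {
  const msg = { payload: Buffer.from(text, 'utf8'), settled: false };
  msg.done = () => { msg.settled = true; };
  return msg;
}

const lines = [];
const good = stubMessage('{"errorMessage": "Error Message From CPI Flow!"}');
const bad = stubMessage('not json');
handleMessage(good, (line) => lines.push(line));
handleMessage(bad, (line) => lines.push(line));
console.log(lines);
```

Acknowledging unparseable messages is a design choice of this sketch; depending on your scenario you may prefer to keep them for later analysis.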
Remove the obsolete code in the file, since we do not require it for the moment.
The “server.js” file should look like this:
const express = require('express');
const cfenv = require('cfenv');
const { Client } = require('@sap/xb-msg-amqp-v100');

const app = express();

// Read the Enterprise Messaging credentials of the bound service instance.
const appEnv = cfenv.getAppEnv();
const emCreds = appEnv.getServiceCreds(process.env.EM_SERVICE);
// Keep only the AMQP 1.0 over WebSocket entry. The loose equality (==) is kept
// on purpose: it also matches when "protocol" is a single-element array.
const emCredsM = emCreds.messaging.filter(function (em) {
  return em.protocol == 'amqp10ws';
});

const options = {
  uri: emCredsM[0].uri,
  oa2: {
    endpoint: emCredsM[0].oa2.tokenendpoint,
    client: emCredsM[0].oa2.clientid,
    secret: emCredsM[0].oa2.clientsecret
  },
  data: {
    source: process.env.EM_SOURCE,
    payload: Buffer.allocUnsafe(20),
    maxCount: 100,
    logCount: 10
  }
};

const client = new Client(options);
// Attach a receiver to the source configured in EM_SOURCE (queue:ErrorQueue).
const stream = client.receiver('ErrorQueue').attach(options.data.source);

// Log every incoming error message and acknowledge it.
stream
  .on('data', (message) => {
    const payload = JSON.parse(message.payload.toString());
    console.log('Error: ' + payload.errorMessage);
    message.done();
  });

// Log the connection lifecycle of the messaging client.
client
  .on('connected', (destination, peerInfo) => {
    console.log('Connected!');
  })
  .on('assert', (error) => {
    console.log(error.message);
  })
  .on('error', (error) => {
    console.log(error);
  })
  .on('disconnected', (hadError, byBroker, statistics) => {
    console.log('Disconnected!');
  });

client.connect();

const port = process.env.PORT || 3000;
app.listen(port, function () {
  console.info("Listening on port: " + port);
});
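The credential lookup at the top of “server.js” can be tried out locally without Cloud Foundry. The following sketch uses a shortened, made-up credentials object; the assumption that “protocol” is an array of strings, the example URLs and the helper name `pickMessagingEntry` are illustrative and should be checked against your own service key.

```javascript
// Hypothetical, shortened credentials object; real service keys contain more fields.
const sampleCreds = {
  messaging: [
    { protocol: ['httprest'], uri: 'https://example.invalid/rest' },
    {
      protocol: ['amqp10ws'],
      uri: 'wss://example.invalid/protocols/amqp10ws',
      oa2: {
        tokenendpoint: 'https://example.invalid/oauth/token',
        clientid: 'my-client-id',
        clientsecret: 'my-client-secret'
      }
    }
  ]
};

// Hypothetical helper: find the entry for one protocol and fail loudly if it is missing.
function pickMessagingEntry(creds, protocol) {
  const entry = (creds.messaging || []).find(
    // [].concat() accepts both a single string and an array of strings.
    (em) => [].concat(em.protocol).includes(protocol)
  );
  if (!entry) {
    throw new Error(`No messaging entry for protocol "${protocol}"`);
  }
  return entry;
}

const amqpEntry = pickMessagingEntry(sampleCreds, 'amqp10ws');
console.log(amqpEntry.uri);
```

Failing with a clear error message is easier to debug than the “cannot read property of undefined” you would get from “emCredsM[0]” when no matching entry exists.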
Use the following command to push the application to the Cloud Foundry Space:
cf push {optional-name-of-your-app}
Redeploy the Cloud Platform Integration Flow and see the error message from the CPI-Flow appear in the logs of the Node.js application in the Cloud Foundry Space:
Resend a message via Postman to the Queue and the message will also be consumed by the app:
Wrap up
This blog described how to send messages via the Cloud Platform Integration AMQP Adapter with the WebSocket Transport Protocol. The error messages from the CPI-Flow arrive in the dedicated Queue and are consumed by a Node.js Application running on Cloud Foundry.
In the next blog, “WebSocket’s in SAP UI5 Multi Target Applications consuming Enterprise Messaging”, WebSockets in SAPUI5 and Node.js will be explained, along with their usage and how to consume Enterprise Messaging messages in UI5 applications in real time.
See you in the next blog!
Kind regards,
Dries
Hi Dries.
Great post!
Just wondering something as I have different parameters on the AMQP Receiver Adapter.
"Delivery" parameter is simply not available. I'm on CloudFoundry - Cloud Integration but I see the same in NEO CPI.
Have things changed in the meanwhile?
Thanks,
André
Hi Andre,
Thanks for your reaction!
I'm working on NEO and I just double checked in the current flow and also in a new one (created a new one).
I still see the "Delivery" parameter in the AMQP Receiver Adapter (Websocket) connection.
Maybe things changed indeed, but to be honest I have no idea what would have changed..
Best regards,
Dries
Hi Dries. I just realized that this was all confusion due to the CPI naming convention for the adapters.
AMQP Receiver Adapter is actually sending messages to a Queue and AMQP Sender Adapter is actually reading from a Queue.
All good and consistent between NEO, CF and Documentation.
Thanks for the reply,
André
Hi Andre,
Thanks for sharing!
Can be confusing sometimes indeed! Glad you discovered it!
Best regards,
Dries
Hi Dries Van Vaerenbergh
Great post and very nicely to follow step-by-step setup.
One question though: Using the xb-msg client you are using here (from the HANA Academy file), how would you go about controlling the flow of events? In your example there only occurs one error which is promptly consumed from the queue by your consumer app (basically upon occurrence). Say there are 100 events in the queue, how would you only consume 5, do something with them (here you console log them) and then consume the next 5?
Thanks in advance!
Best regards,
Dan
Hi Deniz,
Thanks a lot for your nice feedback! I do have to say you have some really interesting questions about the Enterprise Messaging topic.
I saw your previous question on the webhook blog as well, but I believe that Tobias provided an amazing and well explained answer to your question there. 🙂
I'm consuming the messages immediately indeed. It has been a while that I've played around with the Enterprise Messaging service, but if I remember correctly you could just use the rest-api to consume the messages in chronological order no?
In a very plain example that would mean 5 basic calls, to consume the 5 messages.
If I'm not mistaken, the consumption also takes place via a POST-request, which would mean that you cannot pass a query such as a skip or top.
So I would say, in a rest-api scenario that would mean 5 calls, and in the other scenarios I would have no idea myself at the moment.
If you find another and better performing solution than 5 requests, I would be glad to read it in the comments! 😃
Best regards,
Dries
Hi Dries Van Vaerenbergh
thanks for your feedback 🙂 I guess you caught me red-handed, lots of questions regarding Enterprise Messaging, yes.
The issue I see with doing it with REST API calls is that you lose the near real-time feature of EM, since you kind of start polling from the queue. Also I'd really want to utilize the AMQP protocol. The only thing that's missing for me is how to control the flow of events from the client side, because if you test around with 1 event all is good, but with 1,000 events things start getting a bit more challenging in terms of event processing, and once consumed they are gone from the queue.
I'll keep fiddling around and get back to you if I come across a good solution. 🙂
Thanks a lot!
Best regards,
Dan
Hi Deniz,
Most welcome!
If you would like to use the AMQP protocol to have some real-time processing, you could maybe combine it with CAP. This to store the required messages in a database to consume and query afterwards. But again that could be some overkill I guess.
DJ Adams also has a really nice series on SAP Developers about "Diving into messaging on SAP Cloud Platform". Maybe you can find some insights and solutions there.
Have fun and I'm looking forward to your experiences! 🙂
Best regards,
Dries
I am not sure whether I understand the full scenario you implemented, but I would like to comment on the messaging part.
The most simple solution to see messages from a queue in a Node.js application would be to use a messaging client, here based on AMQP 1.0
One option would be to use https://www.npmjs.com/package/@sap/xb-msg-amqp-v100 directly, not via `@sap/xb-msg`.
The client allows you to specify `options.amqp.incomingSessionWindow`, so to say the maximum number of unsettled incoming messages in flight. As soon as one of them is acknowledged via `msg.done()`, the broker sends another one.
You may set `incomingSessionWindow` to 5, but usually it is bigger, say 100 or more. It then runs faster. While first messages are reaching the client others are still on the way. The delay between sending the message to the client and receiving the corresponding acknowledge from client is compensated by allowing more messages to be in flight.
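To illustrate the window behaviour described above, here is a small, library-free simulation. It does not talk to a broker and does not use the SAP client; the function `simulateWindow` and its counters are invented purely to show that at most `windowSize` messages are unsettled at any time.

```javascript
// Hypothetical simulation of a window of unsettled messages.
function simulateWindow(totalMessages, windowSize) {
  let delivered = 0;   // messages handed to the client so far
  let inFlight = 0;    // delivered but not yet acknowledged
  let maxInFlight = 0; // highest number of unsettled messages observed
  let settled = 0;     // acknowledged messages
  const pending = [];

  // The "broker" delivers as long as the window has room.
  const deliver = () => {
    while (delivered < totalMessages && inFlight < windowSize) {
      delivered += 1;
      inFlight += 1;
      maxInFlight = Math.max(maxInFlight, inFlight);
      pending.push(delivered);
    }
  };

  deliver();
  while (pending.length > 0) {
    pending.shift(); // the "client" processes one message and acknowledges it
    inFlight -= 1;
    settled += 1;
    deliver();       // the acknowledgement frees one slot in the window
  }
  return { settled, maxInFlight };
}

console.log(simulateWindow(100, 5));
```

With 100 messages and a window of 5, all messages are eventually settled while never more than 5 are in flight, which matches the "consume 5, then the next ones" behaviour asked about earlier in this thread.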
Another option would be to use EMS rest gateway if you wish to implement a WebHook on your end instead of using an AMQP client.
Best regards
Christoph
Hi All,
Event Mesh prototype was working with Cloud Integration earlier.
But now I am getting the below error:
com.sap.it.rt.adapter.http.api.exception.HttpResponseException: An internal server error occured: target (topic/queue) forbidden (220056) [condition = amqp:not-allowed].
Any pointers would be really helpful
Thanks
Shyam
Hi Shyam, did you resolve this issue? I am facing the same
Regards,
Alan
Have raised ticket with SAP. Issue still persists.
See the comments in Blog: https://blogs.sap.com/2019/11/20/cloud-integration-connecting-to-external-messaging-systems-using-the-amqp-adapter/comment-page-1/#comment-623432
Do let me know in case you get any pointers.
Thanks,
Shyam
Hi,
I managed to find the cause of this issue; at least my queue and application had different namespaces and I got this error when I tried to access it. Once I bound a new queue to the application instance, everything worked fine.
Regards,
Alan
Hi Alan,
I created a new Queue under a fresh Message client but still it fails as I am trying to invoke the iflow from Postman.
Is there a way I can test it locally first. It used to work fine earlier
Thanks
Shyam
Hi Dries Van Vaerenbergh ,
Is it possible to consume the messages that are consumed from Event Mesh by an iFlow directly in a UI5 application, without the microservice application? If not, can you please explain what the use of CPI is for consuming messages from a messaging application like Event Mesh, with a microservice consuming them from CPI, when they can be consumed directly by a microservice?
Thank you
Regards,
Keerthana