huijiezhang
Advisor

Introduction


The SAP BTP Event Mesh service allows applications to communicate through asynchronous events. Some representative usage scenarios of this service include:

  1. Achieve loose coupling of requestors and processors

  2. Implement asynchronous request and processing to improve overall performance

  3. Handle temporary unavailability of backend systems

  4. Achieve message broadcasting


There are a few ways to implement a message consumer. This blog post focuses on consuming messages from a Message Queue and presents an example of how to implement it. But first, let's look at the typical ways of implementing a message consumer.

1. Using the service APIs to poll messages

You can create a microservice that calls the message consumption API to poll the queue for messages. While this is a quick and easy way to mock up a message consumer for testing purposes, I would not recommend it for production use: the consumption APIs are intended for testing only, and retrieving a message through them involves a significant delay.

2. Installing a Web Hook

You can create a microservice that provides an API endpoint and register it as a Web Hook on the Message Queue. This is a good approach for streaming scenarios where it is acceptable to lose some messages while the service is offline. It does not fit if every message must be processed, or if messages must be processed in the order in which they were produced.

3. Using an Integration Flow to consume messages

With SAP Integration Suite, Cloud Integration, you can create an Integration Flow that consumes messages using the AMQP adapter. The Integration Flow can then process the messages or forward them to the target service.

4. Creating an AMQP microservice to process the messages

The SAP BTP Event Mesh service provides a secure Web Socket that allows a message consumer to bind and consume messages using the AMQP protocol. If a Message Queue is configured with QoS = 1, the messages in the queue are guaranteed to be delivered to the Web Socket client in the order in which they were placed into the queue. This blog post illustrates how to implement a message consuming service using Node.js and the npm package provided by SAP. The implementation is not constrained to run in SAP BTP only.

Use Case Scenario


As a message consumer, the microservice needs to connect to the AMQP Web Socket, receive the messages in sequence, and process them one by one. When a message is processed successfully, the consumer acknowledges it so that it is removed from the queue. If processing fails, the consumer stops processing messages, and the failed message stays in the queue.
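To make the intended behavior concrete before any Event Mesh code appears, here is a minimal sketch that simulates it with a plain array standing in for the queue. The function name `drainQueue` and the sample messages are made up for illustration; nothing here talks to Event Mesh.

```javascript
// In-memory stand-in for a queue; the real queue lives in Event Mesh.
function drainQueue(queue, processMessage) {
  const processed = [];
  while (queue.length > 0) {
    const message = queue[0];      // Look at the head without removing it.
    try {
      processMessage(message);
    } catch (err) {
      // Processing failed: leave the message at the head and stop.
      return { processed, stoppedAt: message, error: err };
    }
    queue.shift();                 // "Acknowledge": remove the processed message.
    processed.push(message);
  }
  return { processed, stoppedAt: null, error: null };
}

const queue = ["order-1", "order-2", "bad-order", "order-4"];
const result = drainQueue(queue, (m) => {
  if (m.startsWith("bad")) throw new Error("cannot process " + m);
});

console.log(result.processed); // [ 'order-1', 'order-2' ]
console.log(queue);            // [ 'bad-order', 'order-4' ]
```

The failed message is still at the head of the queue after the run, so nothing behind it is processed until someone deals with it.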



Obtaining the Connection Information


Connection information should be obtained from the SAP BTP Event Mesh administrator.

The following information is needed to connect:

  1. OAuth Token URL

  2. Client ID

  3. Client Secret

  4. Web Socket URL

  5. Fully-qualified Queue Name (name space and queue name)


In BTP, each Messaging Client has a service instance. The connection information is found in the service key of the service instance. 

For each message consumer, the administrator creates a new Service Key for the targeted Messaging Client. The Service Key contains all the information listed above.

Node.js – Dependent Package


With Node.js, the only package we need is @sap/xb-msg:

const msg = require('@sap/xb-msg');

 

Preparing Connection Options


The Connection Options object contains all the information needed to connect to the Web Socket and listen for messages in the targeted queue:

const options = {
    "destinations": [{
        "name": "TheEventMeshSvcForMyProject",       // Give any name
        "type": "amqp-v100",                         // Use this value or what's found in the Service Key
        "peer": {
            "type": "sapmgw"                         // Use this value
        },
        "oa2": {
            "endpoint": "https://...../oauth/token", // The provided OAuth Token URL
            "client": "……",                          // The provided OAuth Client ID
            "secret": "….."                          // The provided OAuth Client Secret
        },
        "uri": "wss://…../protocols/amqp10ws",       // The provided Web Socket URL
        "istreams": {
            "MyIStream": {                           // Specify the name of the input stream
                "sndSettleMode": 0,
                "rcvSettleMode": 0,
                "source": {
                    "address": "queue:OrN/ClN/TESTQ", // The fully-qualified queue name
                    "expires": "connection-close",
                    "timeout": 0,
                    "durable": 0
                },
                "target": {
                    "expires": "connection-close",
                    "timeout": 0,
                    "durable": 0
                }
            }
        },
        "ostreams": {},
        "amqp": {}
    }]
};
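Instead of copying values by hand, the options could be derived from the Service Key JSON. The sketch below assumes a particular Service Key layout: a `namespace` field and a `messaging` array whose AMQP entry has `protocol: ["amqp10ws"]`, a `uri`, and an `oa2` object with `tokenendpoint`, `clientid` and `clientsecret`. Verify these field names against your own key before relying on it. The helper name `buildOptions` and all sample values are hypothetical.

```javascript
// Builds the connection options from a parsed Service Key.
// ASSUMPTION: the field names read from serviceKey below reflect the
// layout described in the lead-in; check them against your own key.
function buildOptions(serviceKey, queueName) {
  const amqp = (serviceKey.messaging || []).find(
    (entry) => Array.isArray(entry.protocol) && entry.protocol.includes("amqp10ws")
  );
  if (!amqp) {
    throw new Error("No amqp10ws entry found in the service key");
  }
  const settings = { expires: "connection-close", timeout: 0, durable: 0 };
  return {
    destinations: [{
      name: "TheEventMeshSvcForMyProject",
      type: "amqp-v100",
      peer: { type: "sapmgw" },
      oa2: {
        endpoint: amqp.oa2.tokenendpoint,
        client: amqp.oa2.clientid,
        secret: amqp.oa2.clientsecret
      },
      uri: amqp.uri,
      istreams: {
        MyIStream: {
          sndSettleMode: 0,
          rcvSettleMode: 0,
          // Fully-qualified name = namespace + "/" + queue name
          source: { address: "queue:" + serviceKey.namespace + "/" + queueName, ...settings },
          target: { ...settings }
        }
      },
      ostreams: {},
      amqp: {}
    }]
  };
}

// Made-up sample key, for illustration only.
const sampleKey = {
  namespace: "OrN/ClN",
  messaging: [
    { protocol: ["httprest"], uri: "https://rest.example.invalid" },
    {
      protocol: ["amqp10ws"],
      uri: "wss://mesh.example.invalid/protocols/amqp10ws",
      oa2: {
        tokenendpoint: "https://auth.example.invalid/oauth/token",
        clientid: "my-client",
        clientsecret: "my-secret"
      }
    }
  ]
};

const builtOptions = buildOptions(sampleKey, "TESTQ");
console.log(builtOptions.destinations[0].istreams.MyIStream.source.address);
// queue:OrN/ClN/TESTQ
```

In a real deployment the Service Key would typically be read from a secret store or environment variable rather than embedded in source code.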

 

Connecting to the Event Mesh


// Start messaging client
const client = new msg.Client(options);

client
.on("connected", () => {
    console.log("connected to SAP Event Mesh.");
})
.on("error", (err) => {
    console.log("Error when connecting to the SAP Event Mesh: " + err);
})
.on("disconnected", (hadError) => {
    console.log("Connection to SAP Event Mesh was lost, trying to reconnect in 60 seconds");
    // Automatically re-connect when disconnected.
    setTimeout(() => client.connect(), 60000);
});

 

Subscribe to the Queue and Process Messages

client.istream("MyIStream")        // The name here has to match what's in the "options" object.
.on("subscribed", () => {
    console.log("Successfully subscribed to the target queue.");
})
.on("ready", () => {
    console.log("Stream is ready");
})
.on("data", (message) => {         // This is the message handler
    console.log("Message received: " + message.payload.toString());
    let processSuccess = false;
    try {
        // Process the message here, inside a try-catch-finally block.
        // ….
        processSuccess = true;     // Set to false instead if processing did not succeed.
    }
    catch (err) {
        processSuccess = false;    // An exception means processing failed.
    }
    finally {
        if (processSuccess) {
            message.done();        // De-queue the message
        }
        else {
            message.failed();      // Let the message stay in the queue
            console.log("Failed to process the message. Reason:………. Exiting. ");
            process.exit(1);       // Can't continue. Exiting.
        }
    }
});

 

// All handlers are ready and now connect:

client.connect();
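Because the handler only relies on `message.payload`, `message.done()` and `message.failed()`, the processing logic can be exercised without a connection to Event Mesh. The following is a minimal sketch of that idea: `createMessageHandler` and `fakeMessage` are hypothetical helpers, not part of @sap/xb-msg, and the fatal-error action is passed in as a callback so that a test does not terminate the process. The handler is asynchronous here, which the original handler is not; the sketch does not address whether further messages may arrive while an asynchronous step is still pending.

```javascript
// Hypothetical helper: wraps processing logic in a "data" handler.
function createMessageHandler(processMessage, onFatal) {
  return async function handleMessage(message) {
    try {
      await processMessage(message.payload);
      message.done();      // De-queue the message.
    } catch (err) {
      message.failed();    // Let the message stay in the queue.
      onFatal(err);        // For example: log the reason and call process.exit(1).
    }
  };
}

// Stub that mimics only the parts of a message the handler uses.
function fakeMessage(text, log) {
  return {
    payload: Buffer.from(text),
    done: () => log.push("done:" + text),
    failed: () => log.push("failed:" + text)
  };
}

const log = [];
const handler = createMessageHandler(
  async (payload) => {
    if (payload.toString() === "bad") throw new Error("unprocessable");
  },
  (err) => log.push("fatal:" + err.message)
);

const finished = handler(fakeMessage("good", log))
  .then(() => handler(fakeMessage("bad", log)))
  .then(() => console.log(log));
// [ 'done:good', 'failed:bad', 'fatal:unprocessable' ]
```

With the real client, the same handler would be registered as `client.istream("MyIStream").on("data", handler)`.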

 

Skipping a Message that Caused Error


If a message in the queue caused the message consumer to fail and, after inspection, we want to ignore that message and continue with the next one, there are multiple ways to do so.

One way, in case you don't have access to the Event Mesh Cockpit, is to call the message consuming APIs (for example from Postman) to acknowledge the message so that it is de-queued:

  1. Get the message from the queue

  2. Using the message ID, de-queue the message.


To do so, you'll need the Event Mesh Admin to provide the REST API endpoint as well as the API Client ID and Client Secret.

The following code segment demonstrates the process:

const axios = require("axios");    // Additional dependency, used only for this helper.

async function dequeueLastMessage() {
    const oAuthToken = "….";       // Obtain an OAuth token first.
    const messagesUrl = "https://<The REST End Point>/messagingrest/v1/queues/<URL-Encoded Fully Qualified Queue Name>/messages";
    const headers = { "content-type": "application/json", "x-qos": "1", "Authorization": "Bearer " + oAuthToken };

    // Get the message from the queue.
    const resp = await axios({ method: "POST", url: messagesUrl + "/consumption", headers: headers });
    if (resp.status !== 200) {
        return resp;               // No message was retrieved; nothing to acknowledge.
    }

    // Message successfully retrieved: get its ID.
    const messageId = resp.headers["x-message-id"];

    // Acknowledge the message so that it is removed from the queue.
    return axios({ method: "POST", url: messagesUrl + "/" + messageId + "/acknowledgement", headers: headers });
}

Note: Be sure to run this procedure only once. Running it again would de-queue the next message, which may be a good one.
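The queue name in the two URLs must be URL-encoded, because a fully-qualified name such as OrN/ClN/TESTQ contains slashes. A small sketch of building both URLs with the standard `encodeURIComponent` function follows. The helper name `buildQueueUrls` and the host `rest.example.invalid` are made up for illustration.

```javascript
// Hypothetical helper: builds the consumption and acknowledgement URLs
// for a queue, URL-encoding the fully-qualified queue name.
function buildQueueUrls(restEndpoint, queueName) {
  const base = restEndpoint.replace(/\/+$/, "") +        // Drop trailing slashes.
    "/messagingrest/v1/queues/" + encodeURIComponent(queueName) + "/messages";
  return {
    consumption: base + "/consumption",
    acknowledgement: (messageId) =>
      base + "/" + encodeURIComponent(messageId) + "/acknowledgement"
  };
}

const urls = buildQueueUrls("https://rest.example.invalid/", "OrN/ClN/TESTQ");
console.log(urls.consumption);
// https://rest.example.invalid/messagingrest/v1/queues/OrN%2FClN%2FTESTQ/messages/consumption
console.log(urls.acknowledgement("abc-123"));
// https://rest.example.invalid/messagingrest/v1/queues/OrN%2FClN%2FTESTQ/messages/abc-123/acknowledgement
```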

Summary


This blog post provides a sample of how to connect to SAP BTP Event Mesh using the AMQP protocol and consume the messages in a message queue using Node.js. Implementers can choose a different programming language and process the messages differently. For example, an implementer can choose to download the messages into the consumer's own data store first and then process them from there. This can give the message consumer more flexibility when handling errors.
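The store-first alternative mentioned above could look roughly like the following sketch. It uses an in-memory array as a stand-in for the consumer's data store, which is an assumption made purely for illustration; a real consumer would need a durable store and should acknowledge a message only after it has been written successfully. The names `createStoringHandler` and `processPending` are hypothetical.

```javascript
// Step 1: the "data" handler only stores the message and acknowledges it.
function createStoringHandler(store) {
  return function handleMessage(message) {
    store.push({ payload: message.payload.toString(), status: "pending", attempts: 0 });
    message.done();   // Safe to de-queue: the consumer now owns the message.
  };
}

// Step 2: a separate pass processes stored messages and records failures
// instead of stopping; a message is given up on after maxAttempts tries.
function processPending(store, processMessage, maxAttempts) {
  for (const record of store) {
    if (record.status !== "pending") continue;
    try {
      processMessage(record.payload);
      record.status = "processed";
    } catch (err) {
      record.attempts += 1;
      if (record.attempts >= maxAttempts) record.status = "failed";
    }
  }
}

// Simulated run with stub messages (no Event Mesh connection involved).
const store = [];
let acknowledged = 0;
const storeMessage = createStoringHandler(store);
for (const text of ["a", "bad", "c"]) {
  storeMessage({ payload: Buffer.from(text), done: () => { acknowledged += 1; } });
}

const work = (payload) => {
  if (payload === "bad") throw new Error("unprocessable");
};
processPending(store, work, 2);   // "bad" fails once and stays pending.
processPending(store, work, 2);   // "bad" fails again and is marked failed.

console.log(store.map((r) => r.status)); // [ 'processed', 'failed', 'processed' ]
```

Note the trade-off: in this sketch, later messages are processed even though an earlier one failed, so the consumer takes over responsibility for any ordering requirements.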