Skip to Content
Technical Articles
Author's profile photo Carlos Roggan

SAP Cloud Application Programming Model and Enterprise Messaging [3]: Receive message

With other words:

How to create a
CAP Application
which receives messages from
Enterprise Messaging
in SAP Cloud Platform

 

Quicklinks:
Create CAP app
Deploy and Test
All Project Files

In the previous tutorial  we learned how to connect a CAP application to Enterprise Messaging and how to send messages
Before sending messages, we had to prepare the message broker:
We opened the Messaging Dashboard and created a queue and a subscription
We also used the test tab to consume the messages
However: that was just for learning purpose
We can now delete the subscription and queue

Upps….Why?

Continue reading to learn why…

Overview

In this tutorial we’re going to create a CAP application which connects to Enterprise Messaging to receive (consume) messages from there
Means, we subscribe to a “topic”
Whenever anybody publishes a message to that topic, the broker will forward it to our app.
In our app, to keep the code minimalistic, we just log the received message to the console
In productive scenarios, a receiving CAP application might do some calculations with the message content (payload) and update the database

Note:
The messaging-integration in CAP is currently in BETA-mode
The code described below is valid from version 3.30 onward
If you need to work with old version, see appendix for legacy api

Prerequisites
Create CAP application
Deploy and Test

Prerequisites

This tutorial is the continuation of previous blog, same prerequisites hold true here as well
Experienced CAPiers can skip the previous part

Create CAP Application

Since you are familiar with CAP projects, I’m only mentioning the few steps required to send messages to Enterprise Messaging with CAP.

Note that below description is valid for CAP version from 3.30 onward.
The old API can be viewed in the appendix

See the appendix section for full content of files

This example is written in Node.js, however, in Java it would be similar

manifest.yml

Like in previous tutorial, we need to bind our app to the same instance of messaging service

applications:
- name: capreceiver
  . . .
  services:
    - msgcustcare

package.json

Like in previous tutorial, we need to specify dependency and configuration for messaging.

  "dependencies": {
    "@sap/xb-msg-amqp-v100": "latest"
    . . .
  },
  "cds": {
    "requires": {
      "messaging": {
        "kind": "enterprise-messaging"

receiver-service.cds

We don’t need anything here, but since the exposed OData service cannot be completely empty, we need to add a dummy entity definition

service SenderService {
    entity DummyEntity {
        key dummyID : Integer;
    };     
}

receiver-service.js

Now again the interesting part of this blog:
What do we need to code in order to receive messages?

First:
As usual, define an event handler:

srv.on('...

Second:
Specify event name for the handler.
Here we need special attention:
We have to enter the fully qualified event name, more precisely: “topic” name.
Means: it must start with the namespace, just like we see it in the dashboard

  srv.on('company/customer/care/demo/customer/created', async (msg) => {  

How do I know the topic to write here?

In terms of messaging, we subscribe to a “topic”.
To use a trivial example:
We could be interested in soccer match results, so we subscribe to topic like soccer/results/premierleague

What I want to say: the topic usually already exists and we know it and we subscribe to it because we want to get notifications whenever any news arrive in that topic (are published to that topic)

In our example, we take the topic to which our senderapp sends messages
Other example could be: use the topic to which S4HANA sends its business events

So, to answer the question:
Either the “publisher” tells us the topic, or we take it from the Messaging Dashboard

What do we do with it?

The sender publishes little portions of information in the messages.
We receive that information in the messaging handler function parameter

Implementing a messaging handler means to read the data out of the provided msg instance
The structure is predefined:

msg.event
This property contains the name of the event which was fired. It is the same to which we subscribed
msg.headers
This property contains metadata about events, e.g. when fired in a backend like S4HANA
msg.data
This property contains the actual payload

In our sample implementation we just write the payload to the console:

srv.on('company/customer/care/demo/customer/created', async (msg) => {  
   const messagePayload = JSON.stringify(msg.data)
   console.log('===> Received message : ' + messagePayload)

Note;
Have you noticed?
The required code and configuration is (once more) REALLY VERY small and convenient…!
That’s CAP…

Deploy and Test

After deployment: we don’t open the app
The app doesn’t have anything to show.
Instead, we open the Messaging Dashboard

First:
Check Messaging Dashboard

We can see – not without a little surprise – that a new queue is there:

It has been generated by CAP.
CAP knows that the queue name has to start with the namespace and CAP knows the namespace because it is contained in the environment variables of our deployed application.
To make the queue name unique, CAP appends some hash + the name of the app + the name of the Service.

Once more I’m convinced that it is helpful to use silly names.
Like now: we can immediately understand how the queue name is composed

Next, let’s check the subscription:
It has been nicely generated, just what would have done it manually:

CAP  takes the topic name which we used in the handler implementation

Note:
If the topic doesn’t contain segments, then the topic name needs prefix:
srv.on(‘ topic:customercreated’

Note:
If you deploy the app before having defined a messaging handler in the code, then CAP doesn’t generate any queue subscription

Note:
If you expect the magically generated queues to magically disappear once you don’t need them anymore:
Forget it.
We have to manually delete the queues which were generated by CAP

OK.
We’ve learned that we should carefully choose:
– the name of the connection in messaging configuration (package.json)
– the name of the topic
– the name of the application

Second:
Now we want to see the messaging in action

We open the URL of the sender-app, which we created in the previous tutorial
We use it now to send messages to the Enterprise Messaging broker
In my example:
https://capsender.cfapps.eu10.hana.ondemand.com/sender/send()

We send message by executing that URL (function import)
Then we quickly go to dashboard and press refresh: the number has increased to 1
Refresh again: the number is back to 0

Reason: the message in the queue has been consumed by our receiver app
It disappears from the queue because it isn’t needed anymore

In case you don’t see the increased number: you need to be faster

Third:
Now we want to see what our app writes to the log.
In command prompt, we run the command to stream the Cloud Foundry logs:

cf logs capreceiver

We execute the send()-URL again and observe the logs:
Every time we execute the function import, an entry is immediately written to the log:

Fourth:
Optional Test

Let’s disable the receiver app to avoid that messages are immediately consumed
In Cloud Foundry, we can stop an application with the following command:
cf stop capreceiver

Then we send messages with the sender app
We observe how the number of messages increases
Then we start the receiver app again:
cf start capreceiver

Then we check the log and the dashboard:
All accumulated messages are consumed, as soon as the starting app connects to the messaging broker

Fifth:
Optional Test

Let’s create more queues and see what happens
If you’ve followed the previous blog you might have already noticed:
All queues that are subscribed to our topic are receiving the messages
Our receiver app only consumes the messages that are waiting in the queue generated by the app

Summary

In this tutorial we’ve learned how to receive messages from Enterprise Messaging in a CAP application.
Briefly:
package.json: add dependency and “requires” section for messaging
service.js: connect to messaging and register messaging handler for desired topic

Troubleshooting

If you have headache because messages aren’t arriving in Messaging Queue, you should check this guide

Links

Previous Blog: see here
Intro Blog: here
Extra Blog: local development
CAP managed messaging using keyword “event” in CDS model

CAP documentation : https://cap.cloud.sap
CAP messaging: https://cap.cloud.sap/docs/guides/consuming-services

SAP Help Portal: What is Enterprise Messaging
SAP Help Portal: Enterprise Messaging: Naming Syntax

Appendix 1: All Project Files

Again, our little sample app doesn’t contain a data model.
Only a service model file, to be placed in the “srv”-folder

manifest.yml

---
applications:
- name: capreceiver
  host: capreceiver
  path: .
  memory: 128M
  buildpacks:
    - nodejs_buildpack
  services:
    - msgcustcare

package.json

{
  "dependencies": {
    "@sap/cds": "^3",
    "express": "^4",
    "@sap/xb-msg-amqp-v100": "latest"
  },
  "cds": {
    "requires": {
      "messaging": {
        "kind": "enterprise-messaging"
      }
    }
  },
  "scripts": {
    "start": "npx cds run"
  }
}

receiver-service.cds

service ReceiverService {
    entity DummyEntity {
        key dummyID : Integer;
    }; 
}

receiver-service.js

const cds = require ('@sap/cds')

module.exports = cds.service.impl ((srv) => {
  
  srv.on('company/customer/care/demo/customer/created', async (msg) => {  
    const messagePayload = JSON.stringify(msg.data)
    console.log('===> Received message : ' + messagePayload)
  })
})

 

Appendix 2: Legacy API

package.json

{
    "dependencies": {
        "@sap/cds": "latest",
        "@sap/xb-msg-amqp-v100": "latest",
        "express": "^4.17.1"
    },
    "cds": {
        "requires": {
            "myMessagingReceiver": {
                "kind": "enterprise-messaging"
            }
        }
    },
    "scripts": {
        "start": "cds run",
        "build": "cds build/all --clean",
        "deploy": "cds deploy"
    }
}

receiver-service.js

const cds = require ('@sap/cds')

module.exports = cds.service.impl ((srv) => {

  const myMessaging = cds.connect.to("myMessagingReceiver")
  
  myMessaging.on('company/customer/care/demo/customer/created', async (msg) => {  
    const messagePayload = JSON.stringify(msg.data)
    console.log('===> Received message : ' + messagePayload)
  })
})

Assigned Tags

      12 Comments
      You must be Logged on to comment or reply to a post.
      Author's profile photo Jon Viter
      Jon Viter

      Carlos Roggan I love your last 3 blogs on enterprise messaging.

      I am trying to write some unit tests usings JEST (or something else) and am having some issues getting the srv.on(topic) handler in my jest test to fire. I want to verify messages are getting put onto the bus. I also want to test another service that does the real async processing and in my tests put some messages on a bus, and verify the processing.

      Have you had any luck testing message consumers and message producers?

       

      Jon Viter

      Author's profile photo Mikhail Ponomarev
      Mikhail Ponomarev

      Hi Carlos Roggan,

      Thanks you! The post is absolutely must-have to be in the CAP documentation.

       

      But there are some rookie questions:

      1. Who initiates reading the queue? Is it the consumption service we defined? if not then what will happen with the message if the consumption service is down? I don't want to lose it.
      2. What will happen if the consumer will raise an error? E.g my HANA is down now by some reason. Again, I don't want to lose my message. I'm completely OK to process the whole queue later when it will be restored. So let's say we just add next row to our handler (or better to say worker)
        throw new Error('I cannot process the message right now');​

        What will happen with the message on a Mesh side? Will it be erased from the queue or not?

      3. What if I have multiple consumers for the same topic? Will each message be available for each consumer?
      4.  What should I know in addition if my consumer has multitenancy? E.g it subscribed to sales order creation. So messages should be exchanged only within the tenant.

      I have to be honor: I'm working on SAP internal project now so the answers are critical for us.

      Would appreciate if you reply that.

      Thank you!

      Best regards, Mikhail.

      Author's profile photo Minh Tri Le
      Minh Tri Le

      Hi Carlos Roggan

      Thanks for great info.

      I have a questions: what do we do with failed messages? For example, when processing messages, some errors may happen. In that case, how do we handle it?

      Thanks.

      Minh

      Author's profile photo Pradeep Panda
      Pradeep Panda

      Hi Minh,

      You have mentioned here about some issue while processing the message. I assume processing the message will come after consuming it from the queue. Once it is read from the queue, the message will be deleted. So any error while processing, have to be taken care by the business.

      If you are talking about some other kind of error/failure, can you please elaborate little more?

       

      regards

      Pradeep

      Author's profile photo Maximiliano Colman
      Maximiliano Colman

      Hello, is there any way to consume text and not json messages from the queue?, it seems that is always expecting json messages the cap amqp client.

      Author's profile photo Carlos Roggan
      Carlos Roggan
      Blog Post Author

      Hello Maximiliano Colman ,
      When I wrote the blog post, CAP expected the message to be in JSON, otherwise it wouldn't be consumed.
      Not sure if that behavior has changed in the meantime.
      If you have such requirement, you could think about posting it to CAP github issues.
      The CAP integration for msg is a wrapper around the amqp client lib.
      As a workaround, you can directly use the amqp client in your app.
      KInd Regards,
      Carlos

      Author's profile photo Jait Purohit
      Jait Purohit

      Is there a way to consume messages from Kafka within CAP?

      Author's profile photo Keerthana Jayathran
      Keerthana Jayathran

      Hi Carlos Roggan ,

      While receiving the message in CAP application,i am not able to see the received messages in logs even if i am logging the messages.Could you please help me with this.

      Regards,
      Keerthana

      Author's profile photo max jessi
      max jessi

      Hi @Carlos Roggan ,

      Do you know if it is possible to have several listeners to the same queue & process the messages in round-robin in a concurrent way?,for example:

       

      Queue name "messages": MSG_1,MSG_2;MSG_3

      Listeners:"LISTENER_1","LISTENER_2", "LISTENER_3"

       

      And the messaging is expected to be like:

      LISTENER_1->Process "MSG_1"

      LISTENER_2->Process "MSG_2"

      LISTENER_3->Process "MSG_3"

       

      As far I tested I only got:

      LISTENER_1->Process "MSG_1"

      LISTENER_2->Process "MSG_1"

      LISTENER_3->Process "MSG_1"

      Author's profile photo Carlos Roggan
      Carlos Roggan
      Blog Post Author

      Hi,
      Unfortunately, I'm not expert here. It sounds like you're trying to implement a kind of load-balancer, which is something a low-level middleware would handle, like messagebrokers probably are doing.
      In terms of asynchronous eventdriven scenarios, the concept probably doesn't guarantee am ordered sequence of msg processing. Just my superficial understanding.
      Kind Regards,
      Carlos

      Author's profile photo max jessi
      max jessi

      Hi Carlos,

      let me put it in another way, imagine that you have 100 messages in a queue and you have a consumer process that it take 1 minute to process 1 message, the processing time to consume all the messages will be 100 minutes.

      But imagine that it’s possible to consume in a concurrent parallel way 10 messages from the same queue, then the processing time to consume all the messages will be only 10 minutes.

       

      I hope that it’s more clear now, any idea if SAP CAP support this?.

      Author's profile photo max jessi
      max jessi

      Hi Carlos,

      I found the solution/way to do it in SAP CAP:

      "messaging_test": {
          "kind": "enterprise-messaging-shared",
          "queue": {
              "name": "test"
          },
          "amqp": {
              "outgoingSessionWindow": 100,
              "incomingSessionWindow": 100
          }
      },
      It seems that it is full of parameters that amqp section 🙂
      Kind Regards.
      Max.