Author: Rafael Schardosin

CAP – Specifying the queues to be consumed from SAP Event Mesh

The SAP Cloud Application Programming Model (CAP) is an excellent development framework if you are working with the SAP Business Technology Platform (SAP BTP). It gives you an easy way to create your data domain models and expose them as OData services, using the SAP HANA database as the persistence layer. All of this requires minimal configuration.

But CAP is more than that: its goal is to let you consume other SAP BTP services seamlessly while following the best practices for those services.

One of these additional capabilities is easy integration with SAP Event Mesh. Plenty of blogs show how to configure CAP to emit messages to and receive messages from SAP Event Mesh; it takes only simple configuration and a binding between the SAP Event Mesh service and your application.

This post, however, covers the cases where you cannot use CAP's standard implementation to consume messages from SAP Event Mesh.

By convention, CAP creates a single queue, named CAP/0000 or {application_name}/{4 digits of application_id}, to consume the topics you specify in your handlers. Handlers don't address queues directly; you must register them on a topic.

messaging.on('my/em/namespace/topic', async (msg) => { ... })

If you register multiple handlers for multiple topics, CAP subscribes all of these topics to the queue CAP/0000 and consumes it as expected. When a message arrives in the queue, CAP determines which topic it is associated with and delivers it to the matching handler.

The “problem” arises when you already have a queue to consume, one you had to create manually for some reason, for example to configure a dead message queue or any other queue setting not supported by CAP.

In the configuration file (package.json) you can set which queue must be used by each messaging client, in this format:

{
  "cds": {
    "requires": {
      "messaging1": {
        "kind": "enterprise-messaging-shared",
        "queue": {
          "name": "my/em/namespace/my/queue/one"
        }
      },
      "messaging2": {
        "kind": "enterprise-messaging-shared",
        "queue": {
          "name": "my/em/namespace/my/queue/two"
        }
      }
    }
  }
}

With this configuration, each client knows which queue name to use when you create it.

const messaging = await cds.connect.to('messaging1')

The implementation above knows it must use the already created queue my/em/namespace/my/queue/one.
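
As a minimal sketch of how the two configured clients might be used side by side, the helper below connects to both and registers one topic handler on each. It takes the cds facade as a parameter (in an application you would pass require('@sap/cds')). The helper name registerQueueHandlers and the two topic names are hypothetical and only serve the illustration.

```javascript
// Sketch only: `registerQueueHandlers` and the topic names are assumptions
// made for illustration. `cds` is expected to be the @sap/cds facade.
async function registerQueueHandlers(cds, log = console.log) {
  // Each name must match a key under cds.requires in package.json.
  const messaging1 = await cds.connect.to('messaging1');
  const messaging2 = await cds.connect.to('messaging2');

  // Handlers are still registered per topic, not per queue.
  messaging1.on('my/em/namespace/topic/one', async (msg) => {
    log('queue one received: ' + JSON.stringify(msg.data));
  });
  messaging2.on('my/em/namespace/topic/two', async (msg) => {
    log('queue two received: ' + JSON.stringify(msg.data));
  });

  return { messaging1, messaging2 };
}
```

Passing cds in as an argument keeps the sketch easy to try out with a stand-in object before wiring it into a real service.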

Easy solution, right? Well, be cautious with that!

CAP always expects a message in a queue to have a topic associated with it. If you start consuming queues that contain messages not associated with a topic, CAP will consume those messages but will not deliver them to any handler, because handlers ALWAYS expect a topic.

Dead message queues are a good example of this scenario: once a message reaches the maximum number of retries and is moved to a dead message queue, it is no longer associated with a topic there, and consequently CAP will not be able to deliver it to a handler. So for dead message queues, expect to consume the messages manually using @sap/xb-msg-amqp-v100 (in the case of Node.js).
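
To picture the routing behaviour described above, here is a small stand-alone simulation. It is not CAP's implementation; the class TopicRouter, its dispatch method, and the topic names are made up for this sketch. It only shows the idea that one queue feeds several topic handlers and that a message without a topic reaches none of them.

```javascript
// Stand-alone simulation, NOT CAP's implementation: it only mimics the
// routing behaviour described in the post. All names are illustrative.
class TopicRouter {
  constructor() {
    this.handlers = new Map(); // topic -> handler
  }

  // Mirrors the shape of messaging.on(topic, handler).
  on(topic, handler) {
    this.handlers.set(topic, handler);
  }

  // Called for every message taken from the single queue.
  // Returns true if a handler received the message, false otherwise.
  async dispatch(message) {
    const handler = message.topic ? this.handlers.get(message.topic) : undefined;
    if (!handler) return false; // taken from the queue, but nobody handles it
    await handler(message);
    return true;
  }
}

async function demo() {
  const router = new TopicRouter();
  const seen = [];
  router.on('my/em/namespace/topic/a', async (msg) => seen.push('a:' + msg.data));
  router.on('my/em/namespace/topic/b', async (msg) => seen.push('b:' + msg.data));

  const results = [
    await router.dispatch({ topic: 'my/em/namespace/topic/a', data: 1 }),
    await router.dispatch({ topic: 'my/em/namespace/topic/b', data: 2 }),
    // A message as it might sit in a dead message queue: no topic attached.
    await router.dispatch({ data: 3 }),
  ];
  return { seen, results };
}
```

Calling demo() resolves to seen = ['a:1', 'b:2'] and results = [true, true, false]: the third message is taken from the queue but never reaches a handler.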

Hope this helps!

      15 Comments
      David Kunz

      Hi Rafael Schardosin

      Thanks for this nice write up! I hope we can implement the feature to consume all messages from the queue, maybe with messaging.on('*', msg => {...}), but I'll have to think about that a bit more.

      Regarding the queue name: It's {application_name}/{4 digits of application_id} which defaults to CAP/0000 if the variables are not provided. This makes sure that every application gets its own queue (shared amongst multiple instances).

      Best regards,
      David

      Rafael Schardosin
      Blog Post Author

      Hi David Kunz,

      thank you for your continued support. I fixed the {4 digits of application_id} in the content, thank you for calling it out.

      One good thing would be a way to configure CAP to create the queues with settings for the maximum redelivery count, the dead message queue, and the maximum time to live; then the manual creation of the queues wouldn't be needed.

      Just sharing some thoughts, thanks again for your support! 🙂

      David Kunz

      Hi Rafael Schardosin,

      Thanks for fixing it!

      You can already pass the configuration options to the queue object in the configuration, e.g.

      {
        cds: {
          requires: {
            messaging: {
              kind: "enterprise-messaging-shared",
              queue: {
                name: "my/awesome/super/queue",
                maxRedeliveryCount: 100,
                deadMsgQueue: "my/awesome/super/dead/queue",
                maxTtl: 100,
                respectTtl: true
              }
            }
          }
        }
      }

      All options specified here are allowed.

      Best regards,
      David

      Rafael Schardosin
      Blog Post Author

      Nice David!

      So basically we can define the queues and dead message queues in the configuration file and establish the connection between them, and only the dead message queue would need to be consumed manually (until CAP introduces a feature to consume from queues directly, or another way of consuming messages with no topic associated with them).

      I'll update the post accordingly, this is really useful information.

      Thank you,

      Rafael

      David Kunz

      Yes, exactly!

      Carl Önnheim

      Hi,

      We are trying this and have a few issues. First, the queuing service is only connecting if we keep the exact name "messaging" in the configuration.

      ... this works
          "cds": {
              "requires": {
                  "messaging": {
                      "kind": "enterprise-messaging-shared",
                      "queue": {
                          "name": "default/my/queue",
                          "maxDeliveredUnackedMsgsPerFlow": 5,
                          "maxRedeliveryCount": 10,
                          "deadMsgQueue": "default/my/dmq",
                          "maxTtl": 0,
                          "respectTtl": true
                      }
                  }
      ... but just changing "messaging" to "messaging1" makes the queue not connect
                  "messaging1": {
                      "kind": "enterprise-messaging-shared",
                      "queue": {
                          "name": "default/my/queue",
                          "maxDeliveredUnackedMsgsPerFlow": 5,
                          "maxRedeliveryCount": 10,
                          "deadMsgQueue": "default/my/dmq",
                          "maxTtl": 0,
                          "respectTtl": true
                      }
                  }

      The service implementation is like this:

      module.exports = srv => {
          cds.connect.to("messaging").then(messaging => {
              messaging.on("default/my/queue", async (msg) => {
                  const messagePayload = JSON.stringify(msg)
                  console.log('===> Received message : ' + messagePayload)
              });
          });
      }

      We also notice that the dead message queue does not get created. A cosmetic issue (if we create it first it gets properly connected). But with that sorted we are still not getting messages to pass over to the dead message queue. We are throwing a simple error from the handler function and the result of that is that the message keeps getting repeated (but never goes into the dmq).

      Do you have any input on what we are doing wrong?

      Thanks in advance!

      //Carl

      David Kunz

      Hi Carl Önnheim,

      The name of your service in cds.requires must be the same as the one you connect to in your custom handler.

      That means:

      "cds": {
          "requires": {
              "messaging1": {...}
          }
      }
      

      in combination with

      module.exports = srv => {
          cds.connect.to("messaging1").then(messaging => {
              messaging.on("default/my/queue", async (msg) => {
                  const messagePayload = JSON.stringify(msg)
                  console.log('===> Received message : ' + messagePayload)
              });
          });
      }

      should work.

      Note: The dead message queue is solely a feature of Event Mesh and must be created manually beforehand. CAP doesn't create it automatically; it only creates its own queues.

      Maybe Tobias Griebe knows why the message doesn't get pushed into the dead message queue after many failed attempts? (CAP delegates the queue configuration to Event Mesh.)

      Thanks and best regards,
      David

      Carl Önnheim

      Hi,

      Thanks for the quick reply. This is what we have been trying, sorry I was not clear on that aspect in the first comment. With "messaging" in "cds.requires" and in "connect.to" we can see in the output that it connects, like this:

      [cds] - server listening on { url: 'http://localhost:4004' }
      [cds] - launched in: 6.741s
      [ terminate with ^C ]
      
      [messaging] - Create messaging artifacts 
      [messaging] - Create queue { queue: 'default/my/queue' }
      [messaging] - Get subscriptions { queue: 'default/my/queue' }
      [messaging] - Unchanged subscriptions [ 'default/my/queue' ] 

      If we have "messaging1" in both places, the log stops after "terminate with ^C" and the messaging does not work.

      I tried to isolate it a bit further (on version 5.4.3). I think it is related to this.

      // in @sap/cds/server.js, line 47, there is a dedicated connect to 'messaging' if specified
      
        // connect to essential framework services if required
        const _init = o.in_memory && (db => cds.deploy(csn).to(db,o))
        if (cds.requires.db) cds.db =  await cds.connect.to ('db') .then (_init)
        if (cds.requires.messaging)    await cds.connect.to ('messaging')             //<<<<<<<<<<<<<<<<<<<<<<
        if (cds.requires.multitenancy) await cds.mtx.in (app)
      
        // serve all services declared in models
        await cds.serve (o.service,o).in (app)
        await cds.emit ('served', cds.services)               //> hook for listeners
      
      // in @sap/cds/libx/_runtime/messaging/AMQPWebhookMessaging.js, line 17 there is a "startListening" hook which does the connection
      
          cds.once('listening', () => {
            this.startListening()
          })

      The "connect.to("messaging")" in "server.js" causes the "subscribedTopics" of the AMQPWebhookMessaging to be populated before the "listening" event occurs. If we use a different name they occur in the wrong order (subscribedTopics is not populated when the startListening method is called and nothing is connected).

      Adding this to a custom server.js makes the other-named messaging instance connect early and then it works properly.

      const cds = require('@sap/cds')
      module.exports = async (o) => {
          // Workaround to connect messaging services with a different name than "messaging"
          await cds.connect.to("messaging1");
          // Delegate to default server
          return cds.server(o);
      }

      Hope this helps finding the root cause.

      Thanks!

      //Carl

      David Kunz

      Hi Carl,

      Thanks for this nice writeup, yes, that explains the problem.

      You're connecting to 'messaging1' too late.

      I just saw that you didn't await the promise when registering the messaging handlers, this will lead to a race condition. So you need to write the following:

      module.exports = srv => {
          return cds.connect.to("messaging1").then(messaging => { // <--- return here
              messaging.on("default/my/queue", async (msg) => {
                  const messagePayload = JSON.stringify(msg)
                  console.log('===> Received message : ' + messagePayload)
              });
          });
      }

      Otherwise, we cannot await the srv => {...} function.

      Best regards,
      David
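
The race described in this comment can be reproduced without CAP. In the simulation below, loadService stands in for the framework awaiting the srv => {...} function before it starts listening. The names connect, loadService, withoutReturn, and withReturn are hypothetical and exist only for this sketch.

```javascript
// Stand-alone simulation of the ordering problem, not CAP code.
// `connect` stands in for cds.connect.to(...): it resolves asynchronously.
const connect = () => new Promise((resolve) => setTimeout(() => resolve({}), 10));

// Stands in for the framework: it awaits whatever the service function
// returns and only then starts listening.
async function loadService(impl, events) {
  await impl(events);
  events.push('start listening');
  // Give any un-awaited work time to finish so the order becomes visible.
  await new Promise((resolve) => setTimeout(resolve, 50));
  return events;
}

// Promise is NOT returned: the framework cannot wait for the handler.
const withoutReturn = (events) => {
  connect().then(() => events.push('handler registered'));
};

// Promise IS returned: the handler is registered before listening starts.
const withReturn = (events) => {
  return connect().then(() => events.push('handler registered'));
};
```

Here loadService(withoutReturn, []) resolves to ['start listening', 'handler registered'], while loadService(withReturn, []) resolves to ['handler registered', 'start listening'].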

      Carl Önnheim

      Perfect, thanks! That did the trick!

      Tobias Griebe

      Hi David Kunz & Carl Önnheim,

      unfortunately the maxRedeliveryCount -> DMQ configuration is not supported when you are using WebHooks. To my knowledge the DMQ for WebHooks only evaluates TTL.

      maxRedeliveryCount -> DMQ works with AMQP, but the behaviour is unpredictable with webhooks.

      Best regards,
      Tobias

      David Kunz

      Hi Tobias Griebe ,

      Thanks for the clarification, this is good to know!

      Best regards,
      David

      Minh Tri Le

      Hi Rafael Schardosin ,

      Thanks for a great blog.

      I have a question.

      A good example of these scenarios are the dead queues, once a message reaches the max retry...

      How do you set max retry?

      Regards,

      Minh

      Lochner Louw

      Hey Rafael Schardosin,

      Nice article!

      One thing you can add is that you can set the format to the CloudEvents spec in the config too:

      {
        cds: {
          requires: {
            messaging: {
              kind: 'enterprise-messaging-shared',
              format: 'cloudevents'
            }
          }
        }
      }

      It will produce something like the following, filling in all the header fields for you:

      {
        "type": "sap.cap.salesorder.v1.SalesOrder.Created.v1",
        "specversion": "1.0",
        "source": "/default/sap.cap/CAPCLNT001",
        "id": "0894ef45-7741-1eea-b7be-ce30f48e9a1d",
        "time": "2021-08-14T06:21:52Z",
        "datacontenttype": "application/json",
        "data": {
          "SalesOrder":"123456789"
        }
      }

       

      Cheers

      Lochner
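
To make the envelope above a little more concrete, here is a hedged sketch of a helper that wraps a payload in the same fields. It is not how CAP builds the message internally; the function name toCloudEvent and its parameters are assumptions for illustration. CloudEvents 1.0 requires id, source, specversion, and type, while time and datacontenttype are optional.

```javascript
const { randomUUID } = require('crypto');

// Illustrative helper, not CAP's implementation: wraps a payload in the
// four attributes CloudEvents 1.0 requires (id, source, specversion, type)
// plus the optional time and datacontenttype attributes.
function toCloudEvent(type, source, data, now = new Date()) {
  return {
    type,
    specversion: '1.0',
    source,
    id: randomUUID(),
    time: now.toISOString(),
    datacontenttype: 'application/json',
    data,
  };
}
```

For example, toCloudEvent('sap.cap.salesorder.v1.SalesOrder.Created.v1', '/default/sap.cap/CAPCLNT001', { SalesOrder: '123456789' }) returns an object with the same shape as the message shown in the comment, with a freshly generated id and the current time.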

      Maximiliano Colman

      Hello, is there any way to consume plain-text rather than JSON messages from the queue? It seems that the CAP AMQP client always expects JSON messages.