Technical Articles
CAP – Specifying the queues to be consumed from SAP Event Mesh
Cloud Application Programming (CAP) is an excellent development framework if you are working with the SAP Business Technology Platform (SAP BTP), it provides you an easy way to create your data domain models and expose them as OData Services, which will use Hana Database as the persistence layer. All of this is done in an easy way, which requires minimal configuration.
But CAP is more than that, it has the goal to provide a framework that will allow you to consume other SAP BTP services in a seamless way, and providing you the best practices when using these services.
One of the additional capabilities of SAP CAP is to easily integrate with SAP Event Mesh, there are lots of blogs that show how to configure to be able to emit to and receive messages from SAP Event Mesh, it requires simple configuration and a bind between the SAP Event Mesh service and your application.
But this post has the goal to present when you cannot use the standard implementation from CAP to consume messages from SAP Event Mesh.
CAP by convention creates a single queue, which is in the format of CAP/0000 or {Application_name}/{4 digits of application_id}, to consume the topic you inform in the handler, handlers don’t handle queues directly, you need to specify a topic on it.
messaging.on('my/em/namespace/topic', async (msg) => { ... }
If you start specifying multiple handlers to consume multiple topics, it will consume the queues as expected by subscribing all these topics to the queue CAP/0000, CAP can distinguish when it receives a message in a queue, what is the topic associated with the message, and then it delivers to the right handler.
The “problem” is when you already have a queue to be consumed, for any reason you had to create it manually, for example, to configure a dead queue or any other configuration not supported by CAP for a queue.
In the configuration file (package.json) you can set which queue must be used by each messaging client, in this format:
{
cds: {
requires: {
messaging1: {
kind: 'enterprise-messaging-shared',
queue: {
name: 'my/em/namespace/my/queue/one'
}
},
messaging2: {
kind: 'enterprise-messaging-shared',
queue: {
name: 'my/em/namespace/my/queue/two'
}
}
}
}
}
with this configuration, when you create your client, it knows what queue name it must use.
const messaging = await cds.connect.to('messaging1')
The implementation above knows it must use the already created queue my/em/namespace/my/queue/one.
Easy solution, right? Well, be cautious with that!
CAP always expects a message in a queue to have a topic associated with it, in case you start consuming queues that contain messages not associated with topics, the message will be consumed by CAP, but it will not be delivered to any handler, once handlers ALWAYS expect a topic.
A good example of these scenarios are the dead queues, once a message reaches the max retry and is sent to a dead queue, this message will not be associated with a topic in the dead queue, and by consequence, CAP will not be able to consume it. So for dead queues, expects to use the @sap/xb-msg-amqp-v100 manually to consume it (in case of node.js).
Hope this helps!
Hi Rafael Schardosin
Thanks for this nice write up! I hope we can implement the feature to consume all messages from the queue, maybe with
messaging.on('*', msg => {...})
, but I'll have to think about that a bit more.Regarding the queue name: It's
{application_name}/{4 digits of application_id}
which defaults toCAP/0000
if the variables are not provided. This makes sure that every application gets its own queue (shared amongst multiple instances).Best regards,
David
Hi David Kunz,
thank you for your continued support. I fixed the {4 digits of application_id} in the content, thank you for calling it out.
One good thing would be to have a way to configure CAP to create the queues with the configuration for max retrial, dead queue, and max time to live, then the manual creation of the queues wouldn't be needed.
Just sharing some thoughts, thanks again for your support! 🙂
Hi Rafael Schardosin,
Thanks for fixing it!
You can already pass the configuration options to the queue object in the configuration, e.g.
All options specified here are allowed.
Best regards,
David
Nice David!
So basically we can define the queues and dead queues in the configuration file, establish the connection between them, and only the consumption from the dead queue would need manual consumption (until CAP introduces the feature to consume from queues or another way of consuming message with no topic associated to it)
I'll update the post accordingly, this is really useful information.
Thank you,
Rafael
Yes, exactly!
Hi,
We are trying this and have a few issues. First, the queuing service is only connecting if we keep the exact name "messaging" in the configuration.
The service implementation is like this:
We also notice that the dead message queue does not get created. A cosmetic issue (if we create it first it gets properly connected). But with that sorted we are still not getting messages to pass over to the dead message queue. We are throwing a simple error from the handler function and the result of that is that the message keeps getting repeated (but never goes into the dmq).
Do you have any input on what we are doing wrong?
Thanks in advance!
//Carl
Hi Carl Önnheim ,
The name of your service in
cds.requires
must be the same as the one you connect to in your custom handler.That means:
in combination with
should work.
Note: The dead msg queue is solely a feature of Event Mesh, this must be created manually before, CAP doesn't do that automatically, only for own queues.
Maybe Tobias Griebe knows why the message doesn't get pushed into the dead queue after many failed attempts? (CAP delegates the queue configuration to Even Mesh).
Thanks and best regards,
David
Hi,
Thanks for the quick reply. This is what we have been trying, sorry I was not clear on that aspect in the first comment. With "messaging" in "cds.requires" and in "conect.to" we can see in the output that it connects, like this:
If we have "messaging1" in both places, the log stops after "terminate with ^C" and the messaging does not work.
I tried to isolate it a bit further (on version 5.4.3). I think it is related to this.
The "connect.to("messaging")" in "server.js" causes the "subscribedTopics" of the AMQPWebhookMessaging to be populated before the "listening" event occurs. If we use a different name they occur in the wrong order (subscribedTopics is not populated when the startListening method is called and nothing is connected).
Adding this to a custom server.js makes the other-named messaging instance connect early and then it works properly.
Hope this helps finding the root cause.
Thanks!
//Carl
Hi Carl,
Thanks for this nice writeup, yes, that explains the problem.
You're connecting to 'messaging1' too late.
I just saw that you didn't await the promise when registering the messaging handlers, this will lead to a race condition. So you need to write the following:
Otherwise, when we cannot await the
srv => {...}
function.Best regards,
David
Perfect, thanks! That did the trick!
Hi David Kunz & Carl Önnheim,
unfortunately the maxRedeliveryCount -> DMQ configuration is not supported when you are using WebHooks. To my knowledge the DMQ for WebHooks only evaluates TTL.
maxRedeliveryCount -> DMQ works with AMQP, but the behaviour is unpredictable with webhooks.
Best regards,
Tobias
Hi Tobias Griebe ,
Thanks for clarification, this is good to know!
Best regards,
David
Hi Rafael Schardosin ,
Thanks for a great blog.
I have a question.
How do you set max retry?
Regards,
Minh
Hey Rafael Schardosin,
Nice article!
One thing you can add is is that you can set the format to CloudEvents spec in the config too:
It will produce something like by filling in all the header fields for you:
Cheers
Lochner
Hello, is there any way to consume text and not json messages from the queue?, it seems that is always expecting json messages the cap amqp client.