Blog 8: Message Aggregation Pattern in Integration Flows
Hello Integrators!
In this blog, we shall look into the Aggregation pattern supported in SAP Cloud Platform Integration (f.k.a. HCI). Aggregation is the first stateful pattern supported by SAP Cloud Platform Integration. Stateful means it keeps track of the “state” of the messages and clears that state only when a completion condition is met. This shall become clearer as you read through the blog.
What is the Aggregation Pattern?
An aggregator flow step collects and stores individual messages until a complete set of related messages has been received. On successful receipt of the messages, the aggregator publishes a single message (aggregated on some principle). We shall look into the details using an example.
Let’s say we want to aggregate all incoming messages of the same format into a new message. The messages are of the following format.
Quite specifically, let’s say we receive three separate messages into the Integration Flow.
Now, we want to aggregate the messages according to predefined conditions, so let us define them. Each aggregator is configured with a correlation expression and an aggregation strategy. Both are set in the properties of the Aggregator flow step.
The correlation expression defines which messages should be aggregated together. For every incoming message, a correlation id is derived by evaluating the correlation expression against it. All messages with the same id are aggregated together.
The aggregation strategy provides the logic for combining all messages into a single aggregated message. In our example, we want to combine all the messages strictly in sequence. In this case, the sequence number must be provided in the incoming message; here it is provided in the field /Mobile/MCode. Further, we mark the last message with the field value /Mobile/LastItem = true. This means that as soon as this message is received by the aggregator, the messages are grouped and sent as a single message.
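To make the two settings concrete, here is a minimal Python sketch of what an aggregator does conceptually. It is not how SAP Cloud Platform Integration implements the step. The class name Aggregator, the correlation field Brand, and the wrapper element Aggregated are hypothetical and chosen only for illustration; only MCode and LastItem come from the example above. The sketch also assumes that MCode holds an integer.

```python
import xml.etree.ElementTree as ET
from collections import defaultdict


class Aggregator:
    """Toy in-memory aggregator (illustration only, not the SAP implementation)."""

    def __init__(self, correlation_path, sequence_path, last_path):
        # Paths are relative to the message root, e.g. "MCode" for /Mobile/MCode.
        self.correlation_path = correlation_path
        self.sequence_path = sequence_path
        self.last_path = last_path
        self.pending = defaultdict(list)  # correlation id -> stored messages

    def receive(self, xml_text):
        """Store one message; return the aggregated XML once the last one arrives."""
        root = ET.fromstring(xml_text)
        corr_id = root.findtext(self.correlation_path)
        self.pending[corr_id].append(root)
        if root.findtext(self.last_path) == "true":
            return self._complete(corr_id)
        return None

    def _complete(self, corr_id):
        # Remove the group from the store and combine it in sequence order.
        messages = self.pending.pop(corr_id)
        messages.sort(key=lambda m: int(m.findtext(self.sequence_path)))
        wrapper = ET.Element("Aggregated")  # hypothetical wrapper element
        wrapper.extend(messages)
        return ET.tostring(wrapper, encoding="unicode")
```

Calling receive() returns None while the group is still open and returns the combined XML once a message with LastItem = true arrives for that correlation id. Messages with a different correlation value are kept in a separate group.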
Further, I shall set a timeout period of one minute. That means the maximum waiting time between two messages is one minute. If that period elapses, the aggregator combines all the messages received so far and sends them as a single message. Check our settings below.
So, finally, when the three messages arrive, they are aggregated into one message and sent to the receiver. This is what our final output looks like:
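The timeout rule can be sketched in the same spirit. The snippet below is a conceptual Python illustration under the assumption that the one-minute window restarts with every message of a group; the class TimeoutTracker and its methods are hypothetical names, not part of any SAP API. Time is passed in explicitly so the behaviour is easy to follow.

```python
class TimeoutTracker:
    """Tracks, per correlation id, when the last message was seen (illustration only)."""

    def __init__(self, timeout_seconds=60.0):
        self.timeout_seconds = timeout_seconds
        self.last_seen = {}  # correlation id -> time of the most recent message

    def touch(self, corr_id, now):
        # Every new message of a group restarts that group's waiting period.
        self.last_seen[corr_id] = now

    def expired(self, now):
        # Return the groups that waited at least the timeout since their last
        # message, and stop tracking them (they would now be aggregated and sent).
        done = [cid for cid, seen in self.last_seen.items()
                if now - seen >= self.timeout_seconds]
        for cid in done:
            del self.last_seen[cid]
        return done
```

A group whose last message arrived at second 50 is therefore still open at second 100, and is released once second 110 is reached.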
Two points to note:
1. In the message monitoring tool, the logs appear in pairs: one for receiving the message into the aggregator and the other for confirming it in the data store.
2. After the aggregation step, you may want to know whether the aggregation completed because the completion expression was met or because a timeout occurred. This information is available in the integration flow from the header ${header.CamelAggregatedCompletedBy}.
The value is either timeout or predicate. You can evaluate it in an exclusive gateway step.
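The decision an exclusive gateway would make on this header can be sketched as follows. This is only a Python illustration of the branching logic; the function name and the branch labels "complete" and "incomplete" are hypothetical, while the header name and its two values are the ones mentioned above.

```python
def route_after_aggregation(headers):
    """Pick a branch based on how the aggregation finished (illustration only)."""
    completed_by = headers.get("CamelAggregatedCompletedBy")
    if completed_by == "predicate":
        # The last message arrived, so the group is complete.
        return "complete"
    if completed_by == "timeout":
        # The waiting period elapsed, so some messages may be missing.
        return "incomplete"
    raise ValueError(f"Unexpected completion value: {completed_by!r}")
```

In a real integration flow, the timeout branch is a natural place for alerting or error handling, since the aggregated message may not contain the full set.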
Further enhancements to the aggregation step are planned. So, keep a lookout for them!
Best Regards,
Sujit
Hi Sujit,
I am trying to implement the aggregator. In the message monitoring tool, I see two logs, one completed and one failed. In the trace of the completed message, I can see the aggregated message after the aggregator (it has the same structure as in your example). But in the failed log, I can't see the message trace after the Aggregator. The detailed properties of the failed message show "WstxUnexpectedCharException:Unexpected character 'b'". Can you help me?
This is a cool feature that can be added as a single step instead of having to design and develop it. You could have done the same in ccBPM, but that involves a lot of design and development. Are there any plans to make it available on premise?
Thanks,
Rahul.
You mean it doesn't run when deployed to PO?
Hi Sujit,
Very descriptive explanation. I tried this feature in an iFlow which pushes data to OData. Initially we tried sending via the mail adapter, but got the error: cause: "com.sybase.jdbc4.jdbc.SybSQLException: Data exception - string data right truncated".
> Message sequence expression XPath: /target. But we have two child nodes in the main target node. So how can we define the expression, given that there is no unique value across these two child nodes?