Zen and the Art of SAP Data Intelligence. Episode 4: a few things you should know before grouping operators in your pipeline
In the early days of Data Intelligence, I worked with a young intern to test a prototype of the product on a large amount of astronomical data. My student was fascinated by the scenario that involved the ML tagging of pulsar signals collected by some of the world most powerful radio telescopes and was eager to process all the data and get some real science out of them. But the ML tagging was computational intensive: it took several seconds to score a single candidate and we had tens of millions to process. I suggested to find a way to parallelize the pipeline to speed up the whole thing. My suggestion was immediately applied: the student decided to group the ML operators and set the multiplicity to 1000.
Two minutes later, our little prototype cluster became unresponsive and my very worried student did not look so well either. Fortunately, they were both pretty resilient and I managed to revive them shortly after. But the event taught us a very important lesson: never underestimate the power of the Schwartz!
In this blog post I will dig deep into the concept of creating operator groups in a SAP Data Intelligence pipeline and I will give you a few useful hints to make sure you can best exploit this powerful feature. If you have very little time to read, feel free to jump Step 1 and Step 2, but make sure you do not miss Step 3 and 4: they contain the most important takeaways!
- The steps of this post have been performed with Data Hub 2.6.1, but should be valid for any Data Hub ≥ 2.6 and any Data Intelligence ≥ 1910 versions.
- Recommended browser: Chrome.
- A basic knowledge of the SAP Data Intelligence Modeler is recommended.
Step 1: how to group operators and why
Let’s start with what the official SAP Data Intelligence Modelling Guide says about operator groups.
“…The most common use case for using groups is to distribute work among many compute nodes either through partitioning the graph into many groups, adding multiplicity larger than 1 for a group, or both. This can lead to better graph throughput and cluster utilization. A user may also want to have different restart policies for different groups. For example, in a group one may want the container to be redeployed when it fails, while in a second group the user may want the graph to terminate if this group fails. Finally, one can also create a group if currently there is no dockerfile satisfying the requirements of a graph. In this case, the user needs to partition the graph into groups in such a way that for each of them exists at least one dockerfile which satisfies its requirements…”
In summary, there are three reasons for grouping operators:
- Increase throughput by parallelizing and distributing the workload among the worker nodes.
- Differentiate the restart strategy across different parts of the same pipeline.
- Use different dockerfiles for different parts of the pipeline.
These are very useful traits for a feature that is super-simple to use. All you need to do is to select the operators to group by clicking on them while holding the SHIFT key down and then select GROUP from the context menu.
Unfortunately, you cannot group operators by dragging the mouse pointer to create a selection area: the click and drag will shift the whole canvas around instead. Configure, extend, and remove a group is equally simple as explained in the official manual.
Step 2: what happens when you group operators
When you start a pipeline without groups, the Modeler will first try to find one suitable docker image to host and execute all the operators in the pipeline. If such a docker image exists, then the Modeler will ask Kubernetes to start this image in a new Pod and execute the pipeline there.
1 pipeline without groups = 1 pod.
If your pipeline contains one group of operators, then two docker images are used and two pods are started: one pod for the operators inside the group, and one pod for all the others (this is called default group). Increase the number of groups and their multiplicity and you get more and more pods accordingly.
Total number of pods in a pipeline with groups = 1 + sum of multiplicity factors of all groups.
When you execute a pipeline with groups, you can check the execution status of all the replicas of the different groups by navigating to the “Group” tab of the graph status summary. In Step 1 we saw a picture of the RClient example where the R operator is grouped with multiplicity 5. In the image below you can see the execution status of the same graph.
The group tab contains a table with one row per group instance. In this case we have 6 rows, one for the default group, i.e. the pod containing all the un-grouped operators, plus the five instances of group 1, i.e. the group containing the RClient operator with multiplicity 5.
Step 3: be careful with multiplicity
I started this blog post with sharing the story of how we temporarily knocked a cluster down by choosing a too high multiplicity factor for an operator group. The possibility to change the multiplicity of an operator group is a very useful feature: if we identify one or more bottleneck operators in our pipeline, we can try to increase the global throughput by grouping those operators and increase the multiplicity to parallelize the processing. But be careful: we already saw that in this case the Modeler will ask Kubernetes to schedule one pod for each instance of the group. This means that in the example above my student triggered the creation of 1000 pods. Kubernetes will try its best, but that’s obviously mission impossible, unless you have a huge cluster: the many starting pods will quickly saturate all the worker nodes CPU resources. As a consequence, all Data Intelligence pods will start to suffer and get slower and slower to the point that the whole architecture will become unresponsive.
Though the situation is temporary (eventually the creation of 1000 pods will fail, the request will be cancelled, and all the vital pods will come back to normal), it could be a dangerous and expensive event in a productive environment. The recommendation is therefore to carefully estimate how much memory and CPU will each instance of a group consume before deciding the multiplicity factor.
The latest version of SAP Data Intelligence offers also the possibility to define the resource requirements for each group in a graph: this can be extremely helpful in order to avoid resource saturation. Hopefully, Data Intelligence will soon offer the possibility to define Kuberentes node selector labels, affinity and anti-affinity rules, data locality criteria and so on… All these features are already available on Kubernetes and would be extremely helpful for all Data Intelligence users and administrators.
Step 4: mind the group boundaries and the data exchange across them
I bet you will not find much info in the available documentation on this topic, but I consider it to be one of the most critical things to know before grouping operators in a pipeline. How does the system manage the data exchange between the different groups of a single pipeline? In other words, how are messages delivered from one pod to another within the same pipeline?
Let’s start with a small experiment.
In the picture above there are two versions of the same graph: an operator generates a random message every 10 ms and another operator consumes and processes those messages with the processing taking 1 second per message. Between the two main operators there are two wiretap operators for us to monitor the data flux. The only difference between version A and version B of the pipeline is that in the latter half of the operators are grouped together.
We first run the pipeline A and compare the output of the two wiretap operators.
As you could easily expect, they are almost identical: they both show that, except for the very first few messages, the throughput is 1 message per second, i.e. the time it takes for a message to be processed. This means the random generator cannot go as fast as its configuration requires, but must wait for one message to be processed before being able to send the following one. This effect is called backpressure. We can conclude the wire connecting two operators can only store one message a time and the upcoming message must wait for the previous one to be consumed before being allowed through.
Let’s now repeat the same experiment with pipeline B.
As you can see from the picture above, the logs of the two wiretap operators are very different now. The snapshots were taken after 100 minutes of running. The first wiretap, the one outside the group, logged 600,000 messages with the expected 100Hz rate. The second wiretap though, the one inside the group, logged at the slower 1Hz rate. This means hundreds of thousands of messages have left the first wiretap, but have not yet reached the second one. Where did they go?
The reason for this surprising behavior is rooted in the architecture of Data Intelligence. When operators are executed in the same pod because they belong to the same group the messages are directly handed from one operator to the other within the same process. The data exchange is virtualized to minimize data copy, and only a reference to the same message is shared across different operators to avoid memory waste. One operator cannot send the next message before the previous one is consumed.
But the data exchange between operators located in two different pods behaves very differently. In fact, SAP Data Intelligence has a dedicated messaging system running in a separate pod. Operators sending messages across a group boundary are publishing those messages in a dedicated queue. The operators on the other side of the group boundary subscribe to the queue to receive those messages. The queue behaves as a FIFO and stores messages as much as it can, within the memory limits of the pod. In the picture below, you see the memory of the messaging pod growing with time: our 600,000 messages occupied almost 350MB in less than two hours.
This is a crucial thing to consider when designing a graph with groups. In fact, if pipeline A is stopped or crashes at any point in time, you would lose just a few pending messages, say one per interconnecting line. Those messages have already been produced by the first operator, but never processed by the last operator and will simply disappear. If this sounds already quite critical to you, consider what happens in pipeline B: a stop or a crash after 100 minutes and you will lose 600,000 messages!
IMPORTANT DISCLAIMER the experiment above was conducted with Data Hub 2.6. Starting from Data Hub 2.7 and Data Intelligence 19**, the messaging system has a limit on the number of messages. This limit is around 1600 messages.
In this blog post we deep-dived into one of the most useful features of the SAP Data Intelligence Modeler application: the possibility to group operators in a pipeline. We quickly reviewed what the official documentation say about groups and then we discovered a few important implications of their use. Our analysis can be summarized in the following takeaways:
- Check your resources before exceeding with the multiplicity factor.
- Be aware of the capacious FIFO across the group boundaries.
Keep in mind the two points above the next time you design a pipeline and consider all alternatives before rushing into grouping. If my student knew what we have just learned he would have started with a much lower multiplicity or even decided to start several copies of the same pipeline without groups to achieve the same degree of parallelization.
I hoped you enjoyed this new episode of Zen and the Art of SAP Data Intelligence. If you want me to deep dive into a specific scenario or you have questions whose answers cannot be found in the official documentation, do not hesitate do drop me a comment below: I’ll be more than happy to help.
Ciao for now.
For the philomaths
In step 3, I used the word “backpressure” to describe the fact that a bottleneck in a data pipeline will make the whole pipeline clock at the rate of the slowest operator. This is mentioned in the Execution Model chapter of the SAP Data Intelligence Modelling Guide. A quick Google search would also return you a lot of articles and posts about backpressure in data handling. A few of the topmost search results are https://medium.com/@jayphelps/backpressure-explained-the-flow-of-data-through-software-2350b3e77ce7 and https://nodejs.org/es/docs/guides/backpressuring-in-streams/.
In the latest SAP Data Intelligence version at the time of writing, the messaging system used by the Modeler to handle the data exchange across group boundaries is based on NATS. Future implementations might be based on a different technology.
The last picture in Step 4 comes from the Grafana system monitoring application. It is a snapshot of the memory consumptions of the NATS pod obtained from Kubernetes. Grafana is delivered as part of Data Hub or Data Intelligence and documented in the here.
Finally, if your were captivated by the story of the radioastronomy data and would like to know more, just drop me a comment: I might consider writing a full blog post about it in the near future.