Spark Data Wrangling in SAP Analytics Cloud
——— This blog is outdated. For more updated information, please refer to the Guided Playlist on Smart Wrangling in SAP Analytics Cloud.———
Back in July 2015, we started a project code-named “Sparca” to break the data wrangling functionality out of our monolithic architecture into a separate microservice. It was a great learning experience that I decided to document and share with you, now that it has been released (back in December 2016).
The Sparca Data Wrangler helps with the pruning, transforming, and formatting of your data so that you can create an SAP Analytics Cloud Model that will eventually be consumed by Charts and Tables. Data wrangling sits between the Data Acquisition and Modelling workflows. The Data Acquisition phase involves retrieving raw data from the different cloud or on-premise data sources such as Concur, SuccessFactors, BW, HANA, BI Universes, etc. When you initially acquire raw data it may need to be cleaned, and it can also be quite large in terms of the number of rows and columns. During data wrangling you are often still trying to understand your data and make sense of it, so data profiling also plays a part in model design. Many of these columns and rows might not even be relevant to your charts, and the data wrangling design process itself can be quite time consuming.
In the previous monolithic architecture, does it make sense to use up HANA’s resources on large, unrealized data that has not yet been mined for information? Does it make sense to tie up HANA’s memory with long-running sessions while a single user is still discovering what they want to create? We believed there was an alternative for these pre-analytics steps: moving towards a microservice architecture.
Sparca does not use HANA to implement data wrangling, which relieves the memory and computing pressure on the monolith. HANA was designed for grouping and aggregation, which suits analytics, but it is not the ideal fit for large datasets of unrealized raw values. Data that is still unprepared can be stored on a cheaper platform like Hadoop, which does not require expensive hardware and lets us wrangle very large data sources that would otherwise not fit into a traditional database. Users love looking at our Digital Boardroom with all the fancy charts and tables. However, creating a relevant model that is consumable by those charts is very challenging. One of the goals of our team is to make model creation easier and learnable so that you can move on to the more fun parts of the application, like chart and story creation. We try to guide the user by automatically performing smart conversions, suggesting actions based on data profiling, and giving feedback when they are designing an invalid model. It is an imperfect solution that always generates much usability discussion and controversy!
It was very interesting working on the Sparca project because it introduced us to many new technologies such as Hadoop, Spark, Akka, and Spray. We were also able to express our algorithms in Scala, which challenged us to write our code in an immutable, functional way without side effects. The Sparca Service is also different from your traditional web application in that it doesn’t have a database at all. Everything is in memory! This led to some interesting solutions for workflows that we previously took for granted when a database was present. All of the technology we took advantage of is open source, so we were able to learn a lot by reading source code and Googling for problems that others had encountered.
Back End Architecture
One constant pattern that runs up and down our stack is layering. We use layering to abstract complexity and help decide how features should be implemented and tested. It is very hard to make code changes on an agile team that wants to deploy every two weeks if you don’t understand the ramifications of the changes in your commits. Here are some benefits we discovered in our layered service:
- we can advise on, and share a common understanding of, where code should be changed
- we can understand the consequences of a code change without intimately knowing the other layers
- we have greater flexibility to swap out layers without affecting the entire code base
- we can devise testing strategies at each layer that harden the code over time
The foundation of our stack is Apache Spark, which is used to support big data profiling, consumption, and modification. The next layer up is our “core processing” layer, where all services are implemented on top of the Spark platform. These services are consumed in the next layer, the Actor layer, where workflows are stitched together. Note that the “core” layer knows nothing about Akka technology. The next layer up is the web layer, which contains all the HTTP-specific functionality like request routing, JSON parsing, resource serving, etc. Again, note that the Actor layer does not depend on being executed in an HTTP request. This facilitated writing isolated Akka tests that just involve sending and receiving messages to Actors. The top layer is the “SCP server” layer, which contains all the SCP-, Orca-, and J2EE-specific logic necessary for integration. Another strength of this layering design is that if you work on the core functionality of Sparca (in a lower layer), you don’t need to run or be aware of Orca or SCP. SCP (SAP Cloud Platform) is the PaaS solution that we currently use to deploy our microservice.
I will start from the bottom up because that is how this innovation project evolved.
Spark is an open source in-memory data engine that we use to execute all our transforms and run all our queries. Spark has a dataset abstraction called an RDD, which can hold a large amount of data spread across many different machines in your cluster. Since the application programmer is only aware of the RDD abstraction, it is quite nice to design your algorithms logically against the RDD and let the Spark engine take care of the distributed execution. Distributed programming is a bit different in that you have to be mindful of how much data you transfer from node to node or cluster to client. The application developer also faces the challenge of their code (and its dependencies) running on different machines, so there is no “stepping through code” or single, systematic log output.
There is a higher-level abstraction called a Spark DataFrame that allows us to execute SQL-like queries against the RDDs, or you can change the RDD directly through its programmatic API. A functional programming language supports parallel execution of code by having clearly defined inputs and outputs on immutable data. Work on an immutable RDD can naturally be broken up into multiple partitions, where each partition can be executed on a separate node within your cluster. So Scala was the language of choice for driving the Spark API. Most Spark examples on the internet are written in Scala as well, which was convenient. Scala code runs on a JVM and integrates with Java quite seamlessly; once you start coding in Scala it is hard to go back to Java, which is so verbose by comparison. Currently we are unable to execute our code in a distributed fashion because Hadoop is not yet supported on SCP. So, in order to initially ship this technology, we are running all of our Spark algorithms in “local mode”, in the same process as our application server. This will serve us well for the time being, since the dataset sizes we currently must support are quite small (100s of MB). We are currently porting our application over to SAP Cloud Foundry, which will make the S3 Object Store available to us for larger datasets. You can find more information about Spark here: http://spark.apache.org/
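The RDD API deliberately mirrors Scala’s own collection API, so the shape of a wrangling transform can be sketched without a Spark cluster. The snippet below is a dependency-free stand-in using plain Scala collections (the column names and values are invented for illustration); on a real RDD the same `filter`/`map` chain would be evaluated lazily and executed partition-by-partition across the cluster, but the logical design is identical.

```scala
// Dependency-free sketch: a plain Scala collection stands in for an RDD.
// On a real RDD the same filter/map chain runs distributed across nodes.
object WrangleSketch {
  // Hypothetical raw rows: (country, revenue-as-string)
  val rawRows: Seq[(String, String)] = Seq(
    ("CA", "100.5"), ("US", "n/a"), ("CA", "42.0"), ("DE", "7.25")
  )

  // Prune rows whose measure fails to parse, then convert the type --
  // the same prune/transform steps the wrangler performs on raw data.
  val cleaned: Seq[(String, Double)] =
    rawRows
      .filter { case (_, v) => v.matches("""-?\d+(\.\d+)?""") }
      .map    { case (c, v) => (c, v.toDouble) }

  def main(args: Array[String]): Unit =
    cleaned.foreach(println)
}
```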
Our core processing layer is where all our services are implemented. It is hard to be agile and deploy every two weeks, making significant changes in the code, if you do not know the side effects of your changes. Not only is the Sparca stack separated by layers, but we also strive to write cohesive modules where functionality is isolated and easy to reason about. We also try to write decoupled modules that are each independently tested before they interact with other services. The processing layer contains services with some complex logic, like relationship detection and model validation, and we run a gauntlet of tests that gives us the confidence that they are functionally ready to be consumed higher up the stack.
The Data Inferencing module allows us to discover both the types of values and their semantic types. For example, it can suggest that “01/05/2012 11:00:11” is of type date, or that the string “2ab340-2359-as8d” is semantically a unique id. We can then use this information for type conversion or modelling suggestions.
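A minimal sketch of this kind of inferencing can be written with pattern matching over regular expressions. The real module is certainly more sophisticated; the patterns and names below are illustrative only:

```scala
// Simplified type/semantic inferencing via regex pattern matching.
// The actual Sparca module is more elaborate; patterns are illustrative.
object InferSketch {
  private val DatePattern = """\d{2}/\d{2}/\d{4}( \d{2}:\d{2}:\d{2})?""".r
  private val IdPattern   = """[0-9a-f]+(-[0-9a-z]+)+""".r
  private val NumPattern  = """-?\d+(\.\d+)?""".r

  // Scala regex extractors match the entire input string.
  def infer(value: String): String = value match {
    case DatePattern(_*) => "date"
    case IdPattern(_*)   => "unique id"
    case NumPattern(_*)  => "number"
    case _               => "string"
  }
}
```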
The Data Profiling modules derive statistics about a column of data and also relationships between columns. This information is used to populate our histogram charts and quality bars. Column relationship information can be used to suggest how you can create relevant dimensions in your model.
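As a rough sketch, the per-column statistics behind a histogram or quality bar boil down to counts over the column’s values. The names and shape below are invented for illustration, not the real Sparca API:

```scala
// Sketch of per-column profiling: the kind of statistics that feed
// histogram charts and quality bars. Names are illustrative.
object ProfileSketch {
  case class ColumnProfile(rows: Int, nulls: Int, distinct: Int,
                           histogram: Map[String, Int])

  def profile(column: Seq[Option[String]]): ColumnProfile = {
    val present = column.flatten                  // drop the nulls
    ColumnProfile(
      rows      = column.size,
      nulls     = column.count(_.isEmpty),
      distinct  = present.distinct.size,
      histogram = present.groupBy(identity).map { case (v, vs) => v -> vs.size }
    )
  }
}
```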
The Data Validation module is responsible for running various model and data validation rules against the dataset, providing feedback on whether the model can be successfully created or how it can be fixed.
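One natural way to structure such a module is as a list of independent rules, each returning human-readable feedback on failure. This is a hedged sketch of that shape; the rule names and `Dataset` type here are invented, not Sparca’s actual code:

```scala
// Sketch of rule-based model validation: each rule inspects the dataset
// and returns feedback on failure. An empty result means the model can
// be created. Rules and types are illustrative.
object ValidateSketch {
  case class Dataset(columns: Map[String, Seq[String]], measures: Set[String])

  type Rule = Dataset => Option[String]

  val hasMeasure: Rule = ds =>
    if (ds.measures.isEmpty) Some("A model needs at least one measure column.")
    else None

  val measuresNumeric: Rule = ds => {
    val bad = ds.measures.filter { m =>
      ds.columns.getOrElse(m, Nil).exists(!_.matches("""-?\d+(\.\d+)?"""))
    }
    if (bad.nonEmpty) Some(s"Non-numeric values in measure(s): ${bad.mkString(", ")}")
    else None
  }

  def validate(ds: Dataset): Seq[String] =
    Seq(hasMeasure, measuresNumeric).flatMap(rule => rule(ds))
}
```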
I group the Query, Transforms, and Persistence services as part of our core Dataset module. The Dataset is the abstraction that you see in the table UI control, and it is consumed by services like Type Inferencing, Data Validation, Data Profiling, etc. The Persistence service is responsible for all the serialization, deserialization, versioning, and compatibility of the state of the wrangling session. This state is used after the model has been created and the user wants to schedule it within Orca.
The Actor layer is where we introduce Akka Actors. Akka is great for doing asynchronous work, and it is a framework that is naturally designed to “scale up” and “scale out”. For example, every time you execute a transform action we asynchronously kick off work to incrementally profile the column data or discover new column relationships. The transform execution can return immediately while, in a separate thread, we execute the expensive work concurrently. So when we want to “scale up” by adding more cores or memory, more work can be processed in parallel because of this inherent concurrency. Akka can be configured to run not only within the same process, but also in a separate process on the same machine or on a different machine entirely. We do not yet take advantage of this feature, but we are excited to experiment with scaling out across many nodes just by changing the configuration of the Actors. Actors are also useful for logically building workflows by piecing together the services; I think of this as machines working on an assembly line. For example, the loader actor will call the loading service, which will then call the type inferencing service, which will then call the convert type transform, which will then call the data profiling service. Personally, I find actors easy to test as well, since they take in messages and produce messages, so there is a clean separation between actors and clear boundaries of responsibility when testing. You can find more information about Akka actors here: http://doc.akka.io/docs/akka/2.4/scala/actors.html
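The fire-and-forget property described above can be sketched with nothing but the standard library’s Futures (so no Akka dependency is needed here): the transform returns immediately while profiling runs on another thread. Actors add supervision, mailboxes, and location transparency on top of this basic shape. The names below are invented for illustration:

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Stdlib sketch of the fire-and-forget pattern the actors give us:
// applying a transform returns at once, while profiling of the new
// column runs concurrently on another thread.
object AsyncSketch {
  val profiled = Promise[Int]()   // completed when background profiling ends

  def applyTransform(column: Seq[String]): Seq[Int] = {
    val converted = column.map(_.toInt)                     // cheap, synchronous
    Future { profiled.trySuccess(converted.distinct.size) } // expensive, async
    converted                                               // caller returns now
  }

  def main(args: Array[String]): Unit = {
    println(applyTransform(Seq("1", "2", "2")))
    println(Await.result(profiled.future, 5.seconds))
  }
}
```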
Another aspect of Akka that we are excited about is its Finite State Machine functionality. When we create an Orca Model, there are many different logical states to transition through (load, transform, validate, model creation, save wrangling session), so we are implementing an FSM workflow that will make our code more readable and testable.
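The core of such an FSM can be sketched in pure Scala as a sealed state type plus a transition function; Akka’s FSM support layers event queues, timeouts, and supervision on top of this basic shape. The state names below mirror the workflow listed above, but the code itself is an illustrative sketch, not Sparca’s implementation:

```scala
// Pure-Scala sketch of the model-creation state machine. The states
// mirror the workflow steps named in the text; Akka FSM adds event
// handling and timeouts on top of this shape.
object FsmSketch {
  sealed trait State
  case object Load        extends State
  case object Transform   extends State
  case object Validate    extends State
  case object CreateModel extends State
  case object SaveSession extends State
  case object Done        extends State

  // Legal transitions; anything else is a workflow bug.
  def next(s: State): State = s match {
    case Load        => Transform
    case Transform   => Validate
    case Validate    => CreateModel
    case CreateModel => SaveSession
    case SaveSession => Done
    case Done        => Done
  }

  // Walk the happy path from a starting state to completion.
  def run(from: State): List[State] =
    if (from == Done) List(Done) else from :: run(next(from))
}
```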
One internal secret on our team is that some team members do not even install HANA/Orca, and know very little about SCP. SCP (SAP Cloud Platform) is the deprecated PaaS that we initially deployed on. This is possible on the Sparca team because our service can also run in a “standalone mode” outside of SAP technology. We are able to run a lightweight “spray-can” server that can handle HTTP requests and serve our JS resources. We find this a strength of our team, since it reduces the conceptual weight of all the different technologies (Orca, HANA, SCP) needed to develop and execute generic wrangling workflows. You can find more details about the Spray server here: http://spray.io/
The Web layer’s responsibility is to route each HTTP request to the appropriate actor to carry it out. This routing is implemented in the Spray routing DSL, which also has a complementary JSON library to help with all the tedious JSON parsing and creation. Since the Sparca UI is served from the Sparca service, we can deploy Sparca independently of Orca when delivering features. Another nice feature of the Web layer is that it provides an Actor Factory that can create different actors based on the environment it is started in. For example, for our unit tests we can create a Test Loader actor instead of the real Loader actor, or when the server is started in the SCP environment we can change the behavior of the Loader actor to accommodate SCP-specific workflows, like using the Document Service instead of HDFS.
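The environment-aware factory idea reduces to a simple dispatch: the workflow asks the factory for a loader, and the environment decides which concrete implementation it gets. This is a hedged sketch of that pattern with invented names, not the real Sparca factory:

```scala
// Sketch of the environment-aware factory: the same workflow code asks
// for a "loader", and the environment decides whether it talks to HDFS,
// the SCP Document Service, or a test double. Names are illustrative.
object FactorySketch {
  trait Loader { def load(path: String): String }

  class HdfsLoader   extends Loader { def load(p: String) = s"hdfs:$p" }
  class ScpDocLoader extends Loader { def load(p: String) = s"scp-doc:$p" }
  class TestLoader   extends Loader { def load(p: String) = s"test:$p" }

  def loaderFor(env: String): Loader = env match {
    case "scp"  => new ScpDocLoader   // SCP workflows use the Document Service
    case "test" => new TestLoader     // unit tests get a stand-in
    case _      => new HdfsLoader     // default big-data storage
  }
}
```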
SCP Sparca Server layer
The SCP Sparca Server layer is necessary for Sparca to be deployed as an SCP Java service. It contains all the SCP-specific functionality so that it does not pollute our core code base in the underlying layers. It holds all our integration points with JApp (Data Acquisition), the Orca Publishing API (Modeling), and the Orca Wrangling Session API (Saved State). It implements a connector bridge from the J2EE servlet world to our Spray server, and it provides all the JMX monitoring we need to understand the health of our application in the SCP cockpit. We also reuse all the tried and true J2EE servlet functionality like session management, filters, authentication, etc. This is the project we use to build the WAR file that is deployed to SCP.
Client Side Architecture
The Sparca UI is hosted in a single UI5 control for which the developer can specify a DOM element to insert into. Currently a simple API is exposed: start, stop, and load. Our UI is served from the Sparca Service and is not dependent on Orca.
When we designed our JavaScript architecture, we stole a design idea that says something like: “the key to building large-scale JavaScript applications is to never build one; build your application up from small pieces”. For all of our controls, views, and controllers we built small, decoupled modules that can be independently tested. In our project the Table Control, Side Panel, Card View, etc. should all be able to run standalone from each other. In the early days of the project I would run tests like turning off controls and making sure the existing workflows still functioned. Obvious code smells would be a module talking directly to another module via callbacks, function calls, or direct event handling. We prefer that all modules are decoupled and that changes happen indirectly through our managers. We feel that this gives us more flexibility for future changes to the application and also makes our controls more reusable within our code.
Another design consideration for the Sparca control is that we wanted predictable behavior and a unidirectional flow of execution. Initially we also had some anti-patterns around modifying data while it was being shared across modules, so we needed a solution for that. When many developers work on the same team, it is important that everyone understands where to make changes and how to share data. For any change that happens in our modules, we send an event to a data manager. This data manager stores the truth about the state of the data and is responsible for synchronizing that truth with the backend Sparca service. Once the truth is updated, it publishes a subsequent event for any interested listeners. So one change request goes into the data manager, and multiple events may be published; the decoupled modules listen for the specific updates they are interested in. This pattern has grown from one data manager to other managers that depend on the original data manager.
On the Sparca team we have a tribal term: “Big M” vs “Little M”. Big M represents the data that is the truth, held by a manager; it is read only and can only be referenced. If a control wants to update the data model, it must either clone the data or send an event to the data manager. If the control clones and modifies the data, it owns that data, and we call this Little M. If it sends an event to the data manager to modify the truth, then all the listeners are updated when the modification happens.
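The ownership rule itself is language-neutral, even though our client code is JavaScript. As a hedged Scala sketch (all names invented): Big M is a read-only snapshot held by the manager, Little M is a caller’s private clone, and the only way to change the truth is to send the manager an update, after which every subscribed listener is notified.

```scala
// Language-neutral sketch of "Big M vs Little M". The real client code
// is JavaScript; names here are invented for illustration.
object DataManagerSketch {
  private var bigM: Map[String, String] = Map("col1" -> "Revenue")
  private var listeners: List[Map[String, String] => Unit] = Nil

  def truth: Map[String, String] = bigM             // Big M: read-only view

  def cloneForEdit: collection.mutable.Map[String, String] =
    collection.mutable.Map(bigM.toSeq: _*)          // Little M: caller owns it

  def subscribe(f: Map[String, String] => Unit): Unit = listeners ::= f

  // The only way to change the truth: send an update to the manager,
  // which applies it and then publishes to every interested listener.
  def update(key: String, value: String): Unit = {
    bigM += key -> value
    listeners.foreach(listen => listen(bigM))
  }
}
```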