Custom Parallelization of Hana Views from Apache Spark
In two recent blogs I demonstrated how easy it is to call HANA views from Apache Spark and to push more complex SQL logic down to HANA:
Calling HANA Views from Apache Spark | SCN
Optimising HANA Query push-down from Apache Spark
As data volumes and the complexity of the underlying HANA view increase, it may not be possible to execute a single query on HANA. It may overload your HANA data source, causing the typical ‘Out of Memory’ (OOM) errors you see when working with highly complex calculation views. You could increase your user’s OOM limit, revisit the design of the view, or run the query in smaller, more manageable chunks and join the results back together again.
NOTE: This isn’t just an issue for Spark accessing complex HANA views; it may also occur with Vora accessing HANA.
If you take the simple test dataset from the first blog in this series, and assume granular information is needed in Spark for subsequent joins or processing, then it may not be practical to bring all the results over in one go.
In this simple example you could break it into many sub-queries such as:
select * from "_SYS_BIC"."test/CA_RDATA" WHERE RUP2 >= 'AA' and RUP2 <= 'AZ'
select * from "_SYS_BIC"."test/CA_RDATA" WHERE RUP2 >= 'BA' and RUP2 <= 'BZ'
Etc.
Then union the results in Spark.
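As a rough sketch of that manual approach (illustrative only; the connection details are placeholders in the style of the earlier blogs, and the RUP2 ranges are hard-coded here):

// Read one slice of the HANA view as its own Dataframe (Spark 1.x style, as in the earlier blogs)
def readChunk(whereClause: String): org.apache.spark.sql.DataFrame =
  sqlContext.read.format("jdbc")
    .option("driver", "com.sap.db.jdbc.Driver")
    .option("url", "jdbc:sap://<HANA HOST>:<PORT>")
    .option("user", "<USERID>")
    .option("password", "<PASSWORD>")
    .option("dbtable", s"""(select * from "_SYS_BIC"."test/CA_RDATA" where $whereClause) as tmp""")
    .load()

// Two slices, unioned back into a single Dataframe (use union() instead of unionAll() on Spark 2.x+)
val manualUnionDF = readChunk(""""RUP2" >= 'AA' and "RUP2" <= 'AZ'""")
  .unionAll(readChunk(""""RUP2" >= 'BA' and "RUP2" <= 'BZ'"""))

This quickly becomes tedious as the number of chunks grows, which is what the script later in this blog automates.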
You may have noticed in the first blog that Spark does have some basic partitioning logic to handle this (sketched below). The downside is that it does not work on many column types, and it expects the data source to have a nice sequential row number as part of the key to split the load into multiple sub-queries. Unless you have designed your data or view in HANA this way, this is unlikely to work for you.
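For reference, here is a minimal sketch of that built-in partitioning; the numeric column ROW_ID and the bounds are hypothetical placeholders, since the mechanism only works with a numeric, roughly sequential column whose range you already know:

// Spark's built-in JDBC partitioning: splits the read into numPartitions range queries
// on a numeric column. ROW_ID, the bounds and the connection details are placeholders.
val partitionedDF = sqlContext.read.format("jdbc")
  .option("driver", "com.sap.db.jdbc.Driver")
  .option("url", "jdbc:sap://<HANA HOST>:<PORT>")
  .option("user", "<USERID>")
  .option("password", "<PASSWORD>")
  .option("dbtable", "\"_SYS_BIC\".\"test/CA_RDATA\"")
  .option("partitionColumn", "ROW_ID")   // must be numeric
  .option("lowerBound", "1")
  .option("upperBound", "1000000")
  .option("numPartitions", "10")
  .load()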
In this blog I’ve created a small Scala script to help you automate the split, build the SQL, run the queries in parallel, and finally union the results.
It has a few input parameters:
- the Table, View or Query
- the column to be used to split the query by
- the number of Splits (separate calls to HANA)
Depending on these parameters it will auto-generate the splits for you, such as:
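The exact boundary values depend on the distinct values in your data, so the statements below only illustrate the shape of the generated sub-queries (using the SYSTEM.RDATA table and RUP2 column from the code further down):
(select * from SYSTEM.RDATA where "RUP2" <= 'AZ') as tmp
(select * from SYSTEM.RDATA where "RUP2" > 'AZ' and "RUP2" <= 'BZ') as tmp
(select * from SYSTEM.RDATA where "RUP2" > 'BZ' and "RUP2" <= 'CZ') as tmp
Etc.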
The SQL statements are unioned together into a single Spark Dataframe, which can then be queried:
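For example, a minimal sketch, assuming the unioned Dataframe has been registered as the temp table RDATA_PARA (as the code below does) and using the RUP2 column from the test dataset:

// Illustrative query against the registered temp table (Spark 1.x sqlContext style)
sqlContext.sql("select RUP2, count(*) as CNT from RDATA_PARA group by RUP2 order by CNT desc").show()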
This Dataframe then pushes the split logic down to HANA when it is executed:
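One simple way to check this from the Spark side (a sketch, not part of the original script) is to print the physical plan of the unioned Dataframe; each JDBC branch of the union should show its own generated ‘(select * from ... where ...) as tmp’ statement, which is what actually gets executed in HANA:

// Print the physical plan; each union branch scans a JDBCRelation built from one generated sub-query
unionResultsDF.explain()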
The basic logic of the code below is to:
- Find the distinct values for the specified column and assign a row number, using SQL similar to:
select to_int(ROW_NUMBER() over ()) as "ROWID", "RUP2" from (select distinct "RUP2" from SYSTEM.RDATA order by "RUP2")
- Based on the number of distinct column values and the number of splits requested, the ‘interval’ (the number of column values in each split group) is determined, rounding up (a worked example follows this list):
Math.round(Max Row Number / Number of Splits + 0.5)
- The Mod function (Row Number % interval) is used to identify which column value is used as the end point of each split
- Records with Mod 0 (plus the final row) are identified, and the LAG function is used to determine the start point of each range
- A dynamic WHERE and SELECT clause are created
- Finally the individual SELECT clauses are created as individual Spark Dataframes which are then unioned together as a single Spark Dataframe
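As a quick worked example (illustrative numbers only): suppose the split column has 26 distinct values and 5 splits are requested. The interval is Math.round(26/5 + 0.5) = 6, so the rows where ROWID % 6 = 0 (rows 6, 12, 18 and 24), plus the last row (ROWID 26, the MAXROWID), become the end points of the ranges. LAG then supplies each range’s start point from the previous end point, producing 5 WHERE clauses of the form "RUP2" > '<previous end value>' and "RUP2" <= '<end value>' (the first range has no lower bound).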
Here is the code I created; I hope you find it useful.
Scala Code:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._   // needed for udf, max, lag, lit and concat
import sqlContext.implicits._             // needed for the $"column" syntax (pre-imported in spark-shell/Zeppelin)

/* Function returns a Dataframe for a HANA sourced table or view */
def selectHana(hanaView: String): org.apache.spark.sql.DataFrame = {
  val driver   = "com.sap.db.jdbc.Driver"
  val url      = "jdbc:sap://<HANA HOST>:<PORT>"   // Different port for MDC
  val database = "<DATABASE NAME>"                 // Needed for MDC
  val username = "<USERID>"
  val password = "<PASSWORD>"
  var resultsDF: DataFrame = null

  resultsDF = sqlContext.read.format("jdbc")
    .option("driver", driver)
    .option("url", url)
    .option("databaseName", database)
    .option("user", username)
    .option("password", password)
    .option("dbtable", hanaView)
    .load()

  return resultsDF
}

/* Function which generates a 'WHERE' clause based on inputs */
def buildWhereClause(columnId: String, columnFr: String, columnTo: String): String = {
  var whereStr = """where """"
  if (columnFr == null) {
    // First row has no lower bound
    whereStr = whereStr + columnId + """" <= '""" + columnTo + "'"
  } else {
    whereStr = whereStr + columnId + """" > '""" + columnFr + """' and """" + columnId + """" <= '""" + columnTo + "'"
  }
  return whereStr
}

// Define UDF function, used in a Dataframe, to build the where clause
val buildWhereClause_udf = udf(buildWhereClause _)

/* Function which splits a HANA view, based on an input column, into multiple sub-queries based on the splits requested */
def splitHanaFilter(hanaView: String, splitColumn: String, numSplits: Int): org.apache.spark.sql.DataFrame = {

  val wSpec = Window.orderBy("ROWID")

  // Return a Dataframe with the distinct splitColumn values and an associated row id, e.g.
  //   select to_int(ROW_NUMBER() over ()) as "ROWID", "RUP2" from (select distinct "RUP2" from SYSTEM.RDATA order by "RUP2")
  val rowIdSql = """(select to_int(ROW_NUMBER() over ()) as "ROWID" , """" + splitColumn + """" from (select distinct """" + splitColumn + """" from """ + hanaView + """ order by """" + splitColumn + """" )) as tmp"""
  val RowIdDF = selectHana(rowIdSql)

  // Get max row number and set the interval
  val maxRowDF = RowIdDF.agg(max($"ROWID").alias("MAXROWID"))
  val maxRowId: java.lang.Integer = maxRowDF.head().getInt(0)
  val interval = Math.round(maxRowId / numSplits + 0.5) // Rounds up to keep the number of splits aligned, otherwise you may get 1 extra split for a few records

  // Add a column representing the Mod of the 'interval', then return filtered records where MOD is zero (plus the last row)
  val filterRowIdDF = RowIdDF.join(maxRowDF)
    .withColumn("MOD", RowIdDF("ROWID") % interval)
    .filter("MOD = 0 OR ROWID = MAXROWID")
    .select("ROWID", splitColumn, "MAXROWID")
    .withColumn("PREVKEY", lag(RowIdDF(splitColumn), 1).over(wSpec))
    .withColumnRenamed(splitColumn, "KEY")
    .withColumn("WHERE", buildWhereClause_udf(lit(splitColumn), $"PREVKEY", $"KEY"))
    .withColumn("SELECT", concat(lit("(select * from " + hanaView), lit(" "), $"WHERE", lit(") as tmp")))

  return filterRowIdDF
}

/* Main execution of the functions */
val hanaTableView = "SYSTEM.RDATA"
//val table_view = """(select "$rowid$" as "rowid", "RUP2" from SYSTEM.RDATA group by "RUP2") as tmp"""
val splitColumn = "RUP2"
var numSplits: Integer = 5
var unionResultsDF: DataFrame = null

var selectArray = splitHanaFilter(hanaTableView, splitColumn, numSplits).select("SELECT").collect

/* Loop through the sub-queries, create individual Dataframes, then union the results */
var iteration = 1
selectArray.foreach( row => {
  var rowSELECT = row.getString(0)
  if (unionResultsDF != null) {
    unionResultsDF = unionResultsDF.unionAll(selectHana(rowSELECT))
  } else {
    unionResultsDF = selectHana(rowSELECT)
  }
  println("Iteration " + iteration + ": " + rowSELECT)
  iteration = iteration + 1
})

unionResultsDF.show()   // Only the first SQL is called here
println("Count " + unionResultsDF.count())
unionResultsDF.registerTempTable("RDATA_PARA")
When you execute the Scala code (to create the final Spark Dataframe and register it as a Spark table), each generated sub-query is printed per iteration, followed by the sample output of show() and the total record count.
While this code was written with only Apache Spark in mind, with a little tweaking it can also be applied to Vora queries against HANA.
I hope you’ve found this useful. If this helped you, or you needed to make other tweaks to optimise it for your environment, then please add a comment.