Leveraging SAP Data Hub Managed Connections within Custom Operators

Contents

1. Introduction
2. Create Custom Operator
2.1. Custom Operator – Configuration
2.2. Custom Operator – Script
3. Create Pipeline
4. Conclusion

Introduction

In SAP Data Hub, data management and processing activities, such as reading and writing files in a data lake, running SQL statements in a database, or running a Python script to train a machine learning model, are carried out by so-called operators. Operators are the basic unit of work in SAP Data Hub’s pipeline runtime and are typically executed in a chain within what we call a graph (or pipeline). SAP Data Hub ships with hundreds of standard operators that cover a wide variety of tasks out of the box.


Figure 1 – A typical SAP Data Hub pipeline

However, one might still want to execute a task that the existing standard operators cannot readily achieve. A common example is using the HANA Python Client API for Machine Learning (aka hana_ml) to run ML commands on HANA without having to write complex SQL wrapper procedures for the PAL & APL algorithms. In SAP Data Hub, one can achieve that by writing custom Python code in a Python operator. This was described in a recent blog by a colleague of mine, Stojan Maleschlijski:

https://blogs.sap.com/2019/09/27/installing-python-packages-from-tarballzip-files-into-sap-data-intelligence-an-example-with-hana_ml/

When writing custom code in Data Hub, there are basically two options. If it’s a standalone operation that you don’t expect to repeat, you can write the custom code directly in the base operator (e.g. a Python3 Operator) in a graph, similar to what Stojan Maleschlijski did in his blog. Alternatively, if you expect to reuse that code and/or want to share it with other users, it is recommended to create a custom operator. Custom operators are typically based on a base engine (e.g. Python, JavaScript, Go) and developed by writing a script in the programming language supported by that engine. In addition to the script, a custom operator offers two important capabilities compared to writing code directly in a scripted operator: the ability to add custom parameters and the ability to define custom tags (i.e. to define which docker image should be used as the runtime for the operator). Tagging for custom operators has been extensively explored in other blogs, so I won’t revisit it here. For more details, read this blog by Jens Rannacher:

https://blogs.sap.com/2018/01/23/sap-data-hub-develop-a-custom-pipeline-operator-with-own-dockerfile-part-3/

In this blog, Jens Rannacher also explores how to define custom parameters for the custom operator. In particular, he adds three simple text parameters that are then used in the Python script, and shows how to read, at graph execution time, the parameter values that were set in the graph configuration. For example, if the custom operator has a custom parameter called customParam, you can read its value at runtime by simply referring to api.config.customParam in the Python script. The vflow api is very powerful and, in particular, lets us read any configured parameter through the api.config object.
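To make the api.config access pattern concrete, here is a minimal sketch that can be run outside SAP Data Hub. The api object below is only a stand-in built with SimpleNamespace (inside an operator, the Pipeline runtime provides the real one), and the read_param helper and the otherParam name are hypothetical, added purely for illustration.

```python
from types import SimpleNamespace

# Stand-in for the `api` object that the Pipeline runtime injects into the
# script; this mock exists only so the snippet runs outside SAP Data Hub.
api = SimpleNamespace(config=SimpleNamespace(customParam="hello"))


def read_param(config, name, default=None):
    """Return a configured parameter, or `default` if it is not set (hypothetical helper)."""
    return getattr(config, name, default)


# Direct attribute access, as described in the text.
value = api.config.customParam

# Defensive access for a parameter that may not exist on the operator.
missing = read_param(api.config, "otherParam", "n/a")

print(value, missing)
```

The defensive variant is just one possible design choice; for parameters that are always defined in the operator configuration, the direct attribute access is enough.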

However, what if we want to create a more complex parameter that is not just a simple string? Looking at several of the standard operators in SAP Data Hub, we can observe more complex types of parameters with radio buttons, drop-down boxes and even more complex ones, for example, the type of parameter used to select a connection from the connection manager (or enter the connection details manually). Below is a screenshot of this connection parameter.

Figure 2 – Connection Selection screen

This type of parameter is particularly handy when custom code needs the connection details of a system that is already maintained as a managed connection in SAP Data Hub’s Connection Manager. It spares the developer from hard-coding the connection details, including user and password, in plain sight for anyone with access to the code, as in the example in Stojan Maleschlijski’s blog mentioned above. It even spares the developer from having to know the connection details at all.

In the next sections, we will explore how to create a custom Python operator with the Connection type and how to consume the connection details in the Python script.

Create Custom Operator

For the purpose of this exercise, we will create a custom operator based on the Python3 engine, similar to the one described in Jens Rannacher’s blog mentioned above, to apply a previously trained Decision Tree model (saved to a table) in HANA PAL. It is also assumed that a custom dockerfile with the hana_ml Python API has already been created, as explained in Stojan Maleschlijski’s blog.

To do so, in the Navigation pane (left side Palette) of the Pipeline Modeler, go to the Operators tab and click the “+” button on the top menu. Enter any name and description you like, choose the “Python3 Operator” as the base operator and either enter a category or leave it empty. If you leave the category empty, the new operator will be listed under the “Others” category in the Navigation pane. Click OK.

Figure 3 – Create Operator screen

Tip: if you want to include your operator in a subfolder structure, you can enter the desired structure as a namespace in the Operator name (similar to a package in a Java class), e.g. an operator with the name com.mycompany.myoperator would be created with the subfolder structure /com/mycompany/myoperator/ under the /operators/ folder.
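The mapping from operator name to folder described in this tip can be sketched in a few lines. The function name operator_folder is hypothetical; it simply mirrors the com.mycompany.myoperator example above and is not part of any SAP API.

```python
def operator_folder(qualified_name, root="/operators/"):
    """Translate a dotted operator name into its repository subfolder."""
    # Each dot-separated namespace segment becomes one folder level.
    return root + qualified_name.replace(".", "/") + "/"


path = operator_folder("com.mycompany.myoperator")
print(path)
```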

After it is created, the new Custom Operator opens in the Editor view, and you can also find it by searching for its name in the Operators tab of the Navigation pane.

If you wish, add a custom icon (use a .svg file). In my case, I used the SAP Logo as my custom icon.

In the first tab (“Ports”), create the necessary ports for your operator. In my case, I created just an Output port named “output” of type “string”.

Figure 4 – Custom Operator – Ports

Tip: if your graph will process data (e.g. acquire data from a source and push it into a HANA table) prior to the ML model apply, add an input port – this input port will serve as the trigger for the ML model apply after the data has been loaded to the appropriate location expected by the custom operator. If the custom operator will just apply the model on top of an already existing HANA table, you don’t need any input ports, just an output one. In case you might have both scenarios, leave the input port open – you can always add an input port later, during the graph design time.

In the “Tags” tab, add the tag that was used to refer to the hana_ml custom dockerfile. In my case, that tag was “hana_ml:1.0.7”. The other prerequisite tags in the dockerfile (opensuse, python36 and tornado:5.0.2) are inherited from the Python3 operator and don’t need to be explicitly defined in the custom operator.


Figure 5 – Custom Operator – Tags

Custom Operator – Configuration

 

The “Configuration” tab is where we can define the custom parameters that can modify the operator’s runtime behavior. Defining custom parameters whenever possible allows for more reusability of the custom operator by different users and teams.

There are several types of custom parameters, ranging from simple types such as String, Integer and Boolean, to arrays, to complex types, that can be a collection of other parameters (“Object” type) or predefined types based on pre-existing definitions (“Custom” types).

There are also different ways to define custom parameters. The most straightforward one is to click the “+” button next to the “Parameters” list and enter the parameter name and type. Notice that this button is only available when no “configuration schema” is selected. Also, with this method only a few types can be selected (String, Boolean and Object). For anything else, it is necessary to use a Config Schema.

Config Schemas are basically configuration presets that are saved to a local definition file in the SAP Data Hub repository (/vrep/). When creating a new custom operator, one can either import an existing Config Schema or create a new one from scratch. In our case, we will create a new Config Schema. To do so, click on the “Generate Config Schema” button in the “Config Schema” section of the “Configuration” tab (above the “Parameters” list). A new config schema will be created with the same fully qualified name as the operator (namespace + name).


Figure 6 – Custom Operator – Configuration

Once a Config Schema has been created, click on the pencil icon in the Parameters section to edit the schema and add parameters. In the Schema definition screen, you’ll see a list of Properties on the left (these are the Custom Parameters in the Config Schema) and their definition on the right. We will add the following properties:

  • hanaConnection:
    • Title/Description: HANA Connection
    • Data Type: Object
    • Visibility: Visible
  • hanaSchema:
    • Title/Description: HANA Schema
    • Data Type: String
    • Visibility: Visible
  • hanaModelTable:
    • Title: HANA Model Table
    • Description: HANA Table including trained model
    • Data Type: String
    • Visibility: Visible
  • hanaApplyTable:
    • Title: HANA Apply Table
    • Description: HANA Table with data for apply
    • Data Type: String
    • Visibility: Visible
  • hanaPredictTable:
    • Title: HANA Predict Table
    • Description: Target HANA Table for predicted data
    • Data Type: String
    • Visibility: Visible
  • hanaSequence:
    • Title: HANA Sequence
    • Description: HANA Sequence (for APPLY_ID)
    • Data Type: String
    • Visibility: Visible

The next two properties should already be there (you can change their order by selecting them and clicking on the up and down arrows on the top of the screen):

  • codelanguage
    • Data Type: String
  • script:
    • Data Type: String

Your properties should look like this now:

Figure 7 – Config Schema properties

Now that we have defined the first level properties, we will enter the details for the hanaConnection property. Given it has a type “Object”, it can hold additional properties of its own. Click on the arrow to the right of the hanaConnection property, and it will open a new set of properties under hanaConnection. Add the following properties:

  • configurationType
    • Title/Description: Configuration Type
    • Data Type: String
    • Value Help: Pre-defined Values
    • Values (without quotes):
      • ” “
      • “Configuration Manager”
      • “Manual”
    • Visibility: Visible
  • connectionID
    • Title/Description: Connection ID
    • Data Type: String
    • Format: com.sap.dh.connection.id
    • Value Help: Value from Service
    • Url: /app/datahub-app-connection/connections?connectionTypes=HANA_DB
    • Value path: id
    • Display Mode: Autocomplete
    • Visibility: Conditional
    • Conditions:
      • Property name: configurationType
      • Value: Configuration Manager
  • connectionProperties
    • Title/Description: Connection Properties
    • Data Type: Custom Type
    • Type: com.sap.dh.connections.hana_db
    • Visibility: Conditional
    • Conditions:
      • Property name: configurationType
      • Value: Manual

The hanaConnection properties should look like this:

Figure 8 – hanaConnection properties

Tip: if you are having difficulties entering a blank space in the list of Values for the configurationType property, go into the JSON view (top right menu) and edit the “enum” attribute directly.

Tip: to speed things up, just copy the code below and replace the existing content of the JSON view. If you chose a different name for your Custom Operator, copy and paste only the content of the “properties” attribute below (leaving the “$schema”, “$id” and “type” attributes unaltered). When you switch back to the Form view, you should see all properties properly filled in.

{
	"$schema": "http://json-schema.org/draft-06/schema#",
	"$id": "http://sap.com/vflow/gcoe.predict_mortgage_default.configSchema.json",
	"type": "object",
	"properties": {
		"hanaConnection": {
			"title": "HANA Connection",
			"description": "HANA Connection",
			"type": "object",
			"properties": {
				"configurationType": {
					"title": "Configuration Type",
					"description": "Configuration Type",
					"type": "string",
					"enum": [
						" ",
						"Configuration Manager",
						"Manual"
					]
				},
				"connectionID": {
					"title": "Connection ID",
					"description": "Connection ID",
					"type": "string",
					"format": "com.sap.dh.connection.id",
					"sap_vflow_valuehelp": {
						"url": "/app/datahub-app-connection/connections?connectionTypes=HANA_DB",
						"valuepath": "id",
						"displayStyle": "autocomplete"
					},
					"sap_vflow_constraints": {
						"ui_visibility": [
							{
								"name": "configurationType",
								"value": "Configuration Manager"
							}
						]
					}
				},
				"connectionProperties": {
					"title": "Connection Properties",
					"description": "Connection Properties",
					"$ref": "http://sap.com/vflow/com.sap.dh.connections.hana_db.schema.json",
					"sap_vflow_constraints": {
						"ui_visibility": [
							{
								"name": "configurationType",
								"value": "Manual"
							}
						]
					}
				}
			}
		},
		"hanaSchema": {
			"title": "HANA Schema",
			"description": "HANA Schema",
			"type": "string"
		},
		"hanaModelTable": {
			"title": "HANA Model Table",
			"description": "HANA Table including trained model",
			"type": "string"
		},
		"hanaApplyTable": {
			"title": "HANA Apply Table",
			"description": "HANA Table with data for apply",
			"type": "string"
		},
		"hanaPredictTable": {
			"title": "HANA Predict Table",
			"description": "Target HANA Table for predicted data",
			"type": "string"
		},
		"hanaSequence": {
			"title": "HANA Sequence",
			"description": "HANA Sequence (for APPLY_ID)",
			"type": "string"
		},
		"codelanguage": {
			"type": "string"
		},
		"script": {
			"type": "string"
		}
	}
}

 

After you are done filling in the properties, either manually or via the JSON view, click on OK. That is it for the “Configuration” tab.
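To illustrate how the conditional visibility defined above plays out, the following sketch evaluates the “ui_visibility” conditions of a trimmed copy of the hanaConnection schema for a given configurationType. This is only my reading of the schema, not the Pipeline Modeler’s actual implementation: the function visible_properties is hypothetical, and requiring all listed conditions to match is an assumption (the schema above only ever uses a single condition per property).

```python
# Trimmed copy of the hanaConnection sub-properties from the schema above.
HANA_CONNECTION_PROPS = {
    "configurationType": {
        "type": "string",
        "enum": [" ", "Configuration Manager", "Manual"],
    },
    "connectionID": {
        "type": "string",
        "sap_vflow_constraints": {
            "ui_visibility": [
                {"name": "configurationType", "value": "Configuration Manager"}
            ]
        },
    },
    "connectionProperties": {
        "sap_vflow_constraints": {
            "ui_visibility": [{"name": "configurationType", "value": "Manual"}]
        },
    },
}


def visible_properties(props, values):
    """List the properties whose visibility conditions match `values`.

    Assumption: a property without conditions is always visible, and a
    property with conditions is visible only if all of them match.
    """
    visible = []
    for name, definition in props.items():
        constraints = definition.get("sap_vflow_constraints", {})
        conditions = constraints.get("ui_visibility", [])
        if all(values.get(c["name"]) == c["value"] for c in conditions):
            visible.append(name)
    return visible


manual = visible_properties(HANA_CONNECTION_PROPS, {"configurationType": "Manual"})
managed = visible_properties(
    HANA_CONNECTION_PROPS, {"configurationType": "Configuration Manager"}
)
print(manual)
print(managed)
```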

Custom Operator – Script

 

Before we go into the script that will retrieve the connection details, it is helpful to describe what happens with these properties at runtime.

During graph modeling, the user needs to configure all the properties required for the proper execution of the Custom operator, including the hanaConnection property. The value help will list all connections of the type HANA_DB currently maintained in the Connection Manager, so that we can choose which HANA system the operator should connect to in order to apply the ML model. Even though the connectionProperties property is left empty at design time, the Pipeline (vflow) runtime fills in these values implicitly during graph execution, so that they can be used by the executed graph.

The way we access these properties in the Python script is no different from the way we would access any other property, i.e., via the api.config object. For example, reading the api.config.hanaSchema value retrieves the hanaSchema property that was maintained during the graph configuration. For the hanaConnection property in particular, given that its nested properties are themselves JSON, they can be accessed as simple Python dictionaries. For example, the HANA Connection hostname can be accessed by reading the api.config.hanaConnection['connectionProperties']['host'] value.
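The dictionary access described above can be tried out with a mock. In the sketch below, the api object is a stand-in built with SimpleNamespace, all connection values are made up, and the connection_args helper is hypothetical; it only adds an early check that the keys used later by the script are present.

```python
from types import SimpleNamespace

# Mock of the runtime-provided `api`; every value below is made up and only
# imitates what the runtime would fill in during graph execution.
api = SimpleNamespace(config=SimpleNamespace(
    hanaSchema="TEST",
    hanaConnection={
        "configurationType": "Configuration Manager",
        "connectionID": "MY_HANA",
        "connectionProperties": {
            "host": "hana.example.com",
            "port": 30015,
            "user": "ML_USER",
            "password": "secret",
        },
    },
))


def connection_args(hana_connection):
    """Return (host, port, user, password), failing early on missing keys."""
    props = hana_connection.get("connectionProperties") or {}
    required = ("host", "port", "user", "password")
    missing = [key for key in required if key not in props]
    if missing:
        raise ValueError("connectionProperties is missing: " + ", ".join(missing))
    return tuple(props[key] for key in required)


host, port, user, password = connection_args(api.config.hanaConnection)
print(host, port, api.config.hanaSchema)
```

Failing early with a clear message is a design choice for this sketch; the script further down reads the keys directly instead.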

The script below retrieves all necessary connection details from the chosen connection and applies a previously trained PAL decision tree regressor model (saved in the hanaModelTable) to the data in the defined hanaApplyTable. The outputs are saved in a temporary uniquely named table, joined with the APPLY_ID value retrieved from a sequence (used to control different apply runs) and then saved to the desired output table (hanaPredictTable). The temporary table is then dropped for clean up. These commands are created as sequential SQL statements sent to a downstream HANA Client operator to be executed. Copy and paste it in the Script tab of the Custom Operator.

import hana_ml as hana_ml
from hana_ml.algorithms.pal import trees
from datetime import datetime

hanaConn = api.config.hanaConnection['connectionProperties']
conn = hana_ml.dataframe.ConnectionContext(hanaConn['host'], hanaConn['port'], hanaConn['user'], hanaConn['password'])

def apply():
    
    # Retrieve model
    df_model_saved = hana_ml.dataframe.DataFrame(conn, 'select * from ' + api.config.hanaSchema + '.' + api.config.hanaModelTable)
    tree_reg_saved = trees.DecisionTreeRegressor(conn, algorithm='cart')
    tree_reg_saved.model_ = df_model_saved.select('ROW_INDEX', 'MODEL_CONTENT')

    # Create HANA dataframe for the table that holds the data for prediction
    df_apply = conn.table(api.config.hanaApplyTable, schema=api.config.hanaSchema)

    # Predict the probability of default
    features = ['INCOME', "BOCREDITSCOR", "COBOCREDITSCOR", "LTV", "TERM", "RATE", "BOAGE", "COAGE"]
    df_predict = tree_reg_saved.predict(df_apply, features=features, key="ASSIGNED_ID").select("ASSIGNED_ID", "SCORE").filter("SCORE > 0.5")

    # Save dataframe to HANA table
    table_name = api.config.hanaPredictTable + '_' + datetime.now().strftime('%Y%m%d%H%M%S')
    df_predict.save((api.config.hanaSchema, table_name))

    # Create SQL command for subsequent processing
    sqlStmt = ('SELECT ' + api.config.hanaSchema + '.' + api.config.hanaSequence + '.nextval from DUMMY;' +
        ' INSERT INTO ' + api.config.hanaSchema + '.' + api.config.hanaPredictTable + 
        ' SELECT ' + api.config.hanaSchema + '.' + api.config.hanaSequence + '.currval, * FROM ' + api.config.hanaSchema + '.' + table_name +
        '; DROP TABLE ' + api.config.hanaSchema + '.' + table_name + ';')
    api.send("output", sqlStmt)

api.add_generator(apply)


def shutdown():
     conn.close()

api.add_shutdown_handler(shutdown)
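The string handling in the script above (the uniquely named temporary table and the SQL statements sent to the HANA Client) can also be isolated into two small functions, which makes it easy to check outside the Pipeline runtime. This is a sketch, not a replacement for the script: the function names temp_table_name and build_apply_sql are hypothetical, and the sample values are taken from the example configuration used later in this blog.

```python
from datetime import datetime


def temp_table_name(predict_table, now=None):
    """Build the timestamp-suffixed name of the temporary table."""
    now = now or datetime.now()
    return predict_table + "_" + now.strftime("%Y%m%d%H%M%S")


def build_apply_sql(schema, sequence, predict_table, temp_table):
    """Assemble the same three SQL statements as the operator script."""
    seq = f"{schema}.{sequence}"
    return (
        f"SELECT {seq}.nextval from DUMMY;"
        f" INSERT INTO {schema}.{predict_table}"
        f" SELECT {seq}.currval, * FROM {schema}.{temp_table}"
        f"; DROP TABLE {schema}.{temp_table};"
    )


# Fixed timestamp so the result is reproducible.
temp = temp_table_name("DEFAULT_MORTGAGES", datetime(2019, 10, 1, 12, 30, 45))
sql = build_apply_sql("TEST", "SEQ_APPLY_ID", "DEFAULT_MORTGAGES", temp)
print(sql)
```

Note that, as in the original script, the identifiers are concatenated into the SQL text as-is, so they should only come from trusted operator configuration.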

 

You can now save the operator (Save button on the Editor toolbar on the top).

Create Pipeline

We can now create a pipeline that executes the operator we have just defined. Create a new graph and save it. Drag and drop the following operators:

  • the newly created Custom Operator;
  • a toMessage converter;
  • a HANA Client; and
  • a Graph Terminator.

Connect them as described below:

  • Custom Operator’s output port to the toMessage’s inbody port;
  • toMessage’s out port to the HANA Client’s sql port;
  • HANA Client’s result port to the Graph Terminator’s stop port.

The graph should look like this now:


Figure 9 – Predict Mortgage Default Graph

Next, select the custom operator and start filling the parameters. First, click on the pencil on the HANA Connection parameter and a screen for the connection selection will pop up. Pick your desired HANA connection.

Figure 10 – Connection Selection screen

Continue filling the other necessary parameters:


Figure 11 – Custom Operator Configuration view

Tip: the .fit() and .predict() methods of the PAL and APL classes of the hana_ml Python API accept both tables and views as the input dataframe. In our case, I have used a view as the input dataframe for the apply.

The only other parameter that needs to be filled is the Connection in the HANA Client (it needs to be the same as the one chosen for the Custom Operator). Since we will be executing SQL and not loading data, all the other HANA Client parameters can be left empty. Just in case, make sure the “Terminate on Error” HANA Client parameter is set to True.

Tip: instead of creating the graph manually, you can also switch to the graph JSON view, copy and paste the JSON definition below and switch back to the Diagram view. Notice that you might need to edit the technical name of the Custom Operator, in case you named it something else (or had a different subfolder structure).

{
	"properties": {},
	"icon": "",
	"iconsrc": "",
	"description": "Predict Mortgage Default",
	"processes": {
		"graphterminator1": {
			"component": "com.sap.util.graphTerminator",
			"metadata": {
				"label": "Graph Terminator",
				"x": 480.99999809265137,
				"y": 12,
				"height": 80,
				"width": 120,
				"config": {}
			}
		},
		"saphanaclient1": {
			"component": "com.sap.hana.client2",
			"metadata": {
				"label": "SAP HANA Client",
				"x": 311.99999809265137,
				"y": 12,
				"height": 80,
				"width": 120,
				"config": {
					"connection": {
						"connectionProperties": {
							"additionalHosts": [],
							"host": "host",
							"password": "",
							"port": 9000,
							"useTLS": false,
							"user": ""
						},
						"configurationType": "Configuration Manager",
						"connectionID": "AWS_HANA24"
					}
				}
			}
		},
		"tomessageconverter1": {
			"component": "com.sap.util.toMessageConverter",
			"metadata": {
				"label": "ToMessage Converter",
				"x": 196.99999904632568,
				"y": 27,
				"height": 50,
				"width": 50,
				"config": {}
			}
		},
		"predictmortgagedefault1": {
			"component": "gcoe.predict_mortgage_default",
			"metadata": {
				"label": "Predict Mortgage Default",
				"x": 12,
				"y": 12,
				"height": 80,
				"width": 120,
				"extensible": true,
				"config": {
					"script": "import hana_ml as hana_ml\nfrom hana_ml.algorithms.pal import trees\nfrom datetime import datetime\n\nhanaConn = api.config.hanaConnection['connectionProperties']\nconn = hana_ml.dataframe.ConnectionContext(hanaConn['host'], hanaConn['port'], hanaConn['user'], hanaConn['password'])\n\ndef apply():\n    \n    # Retrieve model\n    df_model_saved = hana_ml.dataframe.DataFrame(conn, 'select * from ' + api.config.hanaSchema + '.' + api.config.hanaModelTable)\n    tree_reg_saved = trees.DecisionTreeRegressor(conn, algorithm='cart')\n    tree_reg_saved.model_ = df_model_saved.select('ROW_INDEX', 'MODEL_CONTENT')\n\n    # Create HANA dataframe for the table that holds the data for prediction\n    df_apply = conn.table(api.config.hanaApplyTable, schema=api.config.hanaSchema)\n\n    # Predict the probability of default\n    features = ['INCOME', \"BOCREDITSCOR\", \"COBOCREDITSCOR\", \"LTV\", \"TERM\", \"RATE\", \"BOAGE\", \"COAGE\"]\n    df_predict = tree_reg_saved.predict(df_apply, features=features, key=\"ASSIGNED_ID\").select(\"ASSIGNED_ID\", \"SCORE\").filter(\"SCORE > 0.5\")\n\n    # Save dataframe to HANA table\n    table_name = api.config.hanaPredictTable + '_' + datetime.now().strftime('%Y%m%d%H%M%S')\n    df_predict.save((api.config.hanaSchema, table_name))\n\n    # Create SQL command for subsequent processing\n    sqlStmt = ('SELECT ' + api.config.hanaSchema + '.' + api.config.hanaSequence + '.nextval from DUMMY;' +\n        ' INSERT INTO ' + api.config.hanaSchema + '.' + api.config.hanaPredictTable + \n        ' SELECT ' + api.config.hanaSchema + '.' + api.config.hanaSequence + '.currval, * FROM ' + api.config.hanaSchema + '.' + table_name +\n        '; DROP TABLE ' + api.config.hanaSchema + '.' + table_name + ';')\n    api.send(\"output\", sqlStmt)\n\napi.add_generator(apply)\n\n\ndef shutdown():\n     conn.close()\n\napi.add_shutdown_handler(shutdown)",
					"hanaConnection": {
						"configurationType": "Configuration Manager",
						"connectionID": "AWS_HANA24",
						"connectionProperties": {}
					},
					"hanaPredictTable": "DEFAULT_MORTGAGES",
					"hanaSequence": "SEQ_APPLY_ID",
					"hanaModelTable": "DEFAULT_LOAN_MODEL_REGTREE",
					"hanaSchema": "TEST",
					"hanaApplyTable": "V_APPLY_MORTGAGES",
					"codelanguage": "python"
				}
			}
		}
	},
	"groups": [],
	"connections": [
		{
			"metadata": {
				"points": "250.99999904632568,52 278.9999985694885,52 278.9999985694885,43 306.99999809265137,43"
			},
			"src": {
				"port": "out",
				"process": "tomessageconverter1"
			},
			"tgt": {
				"port": "sql",
				"process": "saphanaclient1"
			}
		},
		{
			"metadata": {
				"points": "435.99999809265137,52 475.99999809265137,52"
			},
			"src": {
				"port": "result",
				"process": "saphanaclient1"
			},
			"tgt": {
				"port": "stop",
				"process": "graphterminator1"
			}
		},
		{
			"metadata": {
				"points": "136,52 164,52 164,43 191.99999904632568,43"
			},
			"src": {
				"port": "output",
				"process": "predictmortgagedefault1"
			},
			"tgt": {
				"port": "inbody",
				"process": "tomessageconverter1"
			}
		}
	],
	"inports": {},
	"outports": {}
}
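If you paste the JSON definition and want to double-check the wiring before running the graph, the “connections” array can be walked with a few lines of Python. The sketch below embeds only a trimmed copy of that array (layout metadata removed); the execution_chain function is hypothetical and assumes a strictly linear graph like this one.

```python
import json

# Trimmed copy of the "connections" array from the graph definition above.
GRAPH_JSON = """
{
  "connections": [
    {"src": {"port": "out", "process": "tomessageconverter1"},
     "tgt": {"port": "sql", "process": "saphanaclient1"}},
    {"src": {"port": "result", "process": "saphanaclient1"},
     "tgt": {"port": "stop", "process": "graphterminator1"}},
    {"src": {"port": "output", "process": "predictmortgagedefault1"},
     "tgt": {"port": "inbody", "process": "tomessageconverter1"}}
  ]
}
"""


def execution_chain(graph):
    """Order the processes of a linear graph from source to sink."""
    next_of = {c["src"]["process"]: c["tgt"]["process"] for c in graph["connections"]}
    targets = set(next_of.values())
    # The start node is the only process that never appears as a target.
    current = next(p for p in next_of if p not in targets)
    chain = [current]
    while current in next_of:
        current = next_of[current]
        chain.append(current)
    return chain


chain = execution_chain(json.loads(GRAPH_JSON))
print(" -> ".join(chain))
```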

Save and run your graph. It should finish with a success message, and your predicted data should be in the target table maintained in the HANA Predict Table parameter. You can then proceed with any analyses you wish on the target (HANA) side, for instance, creating a simple calc view to see the % of predicted defaults. If this were historical data that had already been labeled, you could even create a confusion matrix.


Figure 12 – Prediction Results in HANA
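As a rough idea of what such an analysis computes, here is a plain Python sketch of the share of predicted defaults and a confusion matrix. The labels below are made up purely for illustration (they are not results from this scenario), and the confusion_matrix function is hypothetical; in practice you would do this on the HANA side as described above.

```python
def confusion_matrix(actual, predicted):
    """Count TP/FP/TN/FN for two equally long lists of booleans."""
    counts = {"tp": 0, "fp": 0, "tn": 0, "fn": 0}
    for a, p in zip(actual, predicted):
        if p:
            counts["tp" if a else "fp"] += 1
        else:
            counts["fn" if a else "tn"] += 1
    return counts


# Made-up labels: True means "default".
actual = [True, False, True, False, False, True]
predicted = [True, False, False, False, True, True]

matrix = confusion_matrix(actual, predicted)
share_predicted = 100.0 * sum(predicted) / len(predicted)

print(matrix)
print(f"{share_predicted:.1f}% predicted defaults")
```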

Conclusion

The use of complex properties in Custom Operators enables several advanced uses of the Pipeline runtime. In this blog, we have seen how to use complex properties to retrieve connections from the SAP Data Hub Connection Manager. This has several advantages compared to hard-coding the connection details in an operator, especially with regard to security and maintainability.

We have explored the consumption of such complex properties in a Python-based operator, but they are also available for other engines. For instance, to consume a connection’s properties in a JavaScript-based operator, it is possible to do:

var connectionObj = $.config.getObject("connection").connectionProperties;

In a future blog, we will explore the construction of the “Predict Mortgage Default” scenario that was depicted here, particularly the Jupyter Notebooks that are used to prepare the data and train the model.
