Develop Dynamic Customisable Python Operators

In this article I would like to give you a step-by-step guide on how to make the configuration of your custom Python operator more flexible by letting the end user configure it dynamically. To do so, the end user puts placeholders into the operator’s configuration, and these are replaced at runtime with header values (attributes) of one of the input Messages. The placeholders have the form \${<varName>=<defaultValue>}, where “varName” is the name of the input Message attribute whose value should be inserted. If that attribute does not exist, the default value is used instead.
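
To make the placeholder syntax concrete, here is a minimal sketch in plain Python that runs without SAP Data Hub. The names PLACEHOLDER and parse_placeholders are made up for this illustration, and the sample string is assumed to have already lost its escaping backslash, as it would inside the operator.

```python
import re

# Hypothetical helper for illustration only; it is not part of any SAP API.
# Group 1 captures the variable name, group 2 the optional default value.
PLACEHOLDER = re.compile(r"\$\{([^}=]*)(?:=([^}]*))?\}")


def parse_placeholders(config_string):
    """Return one (var_name, default_or_None) tuple per ${...} placeholder."""
    return [(m.group(1), m.group(2)) for m in PLACEHOLDER.finditer(config_string)]


print(parse_placeholders("/my/main/path/${current_year}/${filename=foo}.csv"))
# prints [('current_year', None), ('filename', 'foo')]
```

A placeholder without “=” yields None as its default, which lets the caller tell “no default” apart from an empty default such as ${filename=}.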

Please note that this guide assumes some prior experience with SAP Data Hub Python operators. I therefore recommend that you first have a look at the series of tutorials written by my colleague Jens Rannacher.

Example Goal: Dynamic Path Formatting

You might know this feature already if you’ve worked with the standard “Write File” operator. This operator allows dynamic path formatting using schemes and Message headers. In this guide we will look at the Message header part. It allows the end user to dynamically write files into paths such as /my/main/path/\${current-year}/\${current-month}/\${filename}.csv, resulting in e.g. “/my/main/path/2019/10/retailer001.csv”. Below you can see a screenshot of the operator’s documentation about its dynamic path formatting feature.

Let’s put that into our Custom Python Operator

At the end of this guide you should be able to run a graph with a custom Python operator that replaces variable names in its path configuration with the Message headers of the input Message.

The Message generator will generate Messages with random current_month and current_year attribute values. The attribute filename will not be provided, so the operator has to fall back to the default value for this placeholder. We will set the operator’s “path” configuration to the string “/my/main/path/\${current_year}/\${current_month}/\${filename=foo}.csv”. The result will be written into the body of the output Message and shown in the second Wiretap for demonstration purposes. The result should look similar to this output:

[2019-11-04 08:14:00,000] {"current_month":"4","current_year":"2013","message.id":0}/my/main/path/2013/4/foo.csv
[2019-11-04 08:14:01,000] {"current_month":"4","current_year":"2014","message.id":1}/my/main/path/2014/4/foo.csv
[2019-11-04 08:14:01,000] {"current_month":"8","current_year":"2005","message.id":2}/my/main/path/2005/8/foo.csv
[...]

1. Create a new Graph with our Message Generator

First of all create a new graph, drag and drop the “Message Generator” and the “Wiretap” operators into your graph, connect them and then open the code editor of the “Message Generator” operator. Replace the code with the following snippet:

var counter = 0;

getRandomInt = function(min, max) {
    return Math.floor(Math.random() * (max - min + 1)) + min;
};

generateMessage = function() {
    var msg = {};
    msg.Attributes = {"message.id": counter, "current_month": getRandomInt(1, 12).toString(), "current_year": getRandomInt(2000, 2019).toString()};
    msg.Body = "";
    
    counter++;
    return msg;
}

$.addTimer("500ms",doTick);

function doTick(ctx) {
    $.output(generateMessage());
}

Save the graph, run the graph and have a look at the output of the first wiretap. You should see an output similar to this:

[2019-11-04 08:39:24,000] {"current_month":"4","current_year":"2018","message.id":35}
[2019-11-04 08:39:24,000] {"current_month":"10","current_year":"2011","message.id":36}
[2019-11-04 08:39:25,000] {"current_month":"11","current_year":"2007","message.id":37}
[...]
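
If you want to try out the Python side later without a running graph, the attribute payload can be imitated in plain Python. This is only a rough stand-in for the JavaScript generator above; the function name generate_attributes is made up for this sketch.

```python
import random


def generate_attributes(message_id):
    """Build an attribute dict shaped like the one the JavaScript generator emits."""
    return {
        "message.id": message_id,
        # random.randint includes both bounds, like getRandomInt in the generator script
        "current_month": str(random.randint(1, 12)),
        "current_year": str(random.randint(2000, 2019)),
    }


for message_id in range(3):
    print(generate_attributes(message_id))
```

As in the generator script, month and year are strings and “filename” is deliberately left out.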

2. Add the Custom Python Operator

Drag and drop a “Python 3” operator and a second “Wiretap” operator into your graph. Add the following two ports to your custom Python 3 operator:

  1. Name: “in”, Type: “message”, “Input”
  2. Name: “out”, Type: “message”, “Output”

Connect the operators as shown in the picture above and open the “Python 3” operator’s code editor. Replace the code with the following snippet and take a moment to read through it. Two functions turn your static configurations into dynamic ones: “dynamic_config_string” for configurations of type “string” and “dynamic_config_dict” for configurations of type “Object” (dicts). To replace the placeholders with Message attribute values, we use the regex library “re” with a search pattern that matches “${<any>}”.

import re
import json

# RegEx to search for ${<varName>} or ${<varName>=<defaultValue>}
# The backslash in the configuration only prevents the Modeler from interpreting the placeholder as a substitution parameter
# config: "\${current_month}" will look like this in the api.config variable: "${current_month}"
# A raw string avoids invalid escape sequences in the Python string literal
pattern = re.compile(r'\$\{[^}]*\}')

# Function that replaces all variables in "inConfigString" with attribute values of "inMsg"
# If no Message attribute value is provided the default value will be used
# If no default value is provided an Exception will be thrown
# This function returns the result configString
def dynamic_config_string(inConfigString,inMsg):
    # Find all variables in the inConfigString e.g. ['${current_month=10}','${current_year}','${filename}']
    variables = pattern.findall(inConfigString)
    for var in variables: # in var you will see e.g. ${current_month=10} or ${current_month}
        # Cut off the leading "${" and the trailing "}", then split at the first "=" only,
        # so that a default value may itself contain "="
        split=var[2:-1].split("=",1) # in split you will see e.g. ['current_month','10'] or ['current_month']
        # try replacing the variable with the provided inMsg attribute value
        try: 
            value=inMsg.attributes[split[0]]
            inConfigString=inConfigString.replace(var,str(value)) 
        # KeyError -> inMsg attribute is not provided
        except KeyError:
            # try replacing the variable with the provided default value
            try: 
                default=split[1]
                inConfigString=inConfigString.replace(var,default) 
            # IndexError -> Default value is not provided
            except IndexError: 
                raise Exception("Message header '"+split[0]+"' undefined and no default value provided")
    return inConfigString

# Wrapper function that replaces all variables in "inConfigDict" with attribute values of "inMsg" 
# If no Message attribute value is provided the default value will be used
# If no default value is provided an Exception will be thrown
# This function returns the result configDict
def dynamic_config_dict(inConfigDict,inMsg):
    # Serialize the dict, replace the variables in the string and parse the result again
    inConfigString=dynamic_config_string(json.dumps(inConfigDict),inMsg)
    return json.loads(inConfigString)

def on_input(msg):
    path = dynamic_config_string(api.config.path,msg)
    msg.body=path
    api.send("out",msg)

api.set_port_callback("in",on_input)

As you can see in the “on_input” function, we only need to pass the configuration and the input Message to “dynamic_config_string”. The result is then written into the output Message’s body.
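
To experiment with the replacement logic outside the Modeler, something like the following sketch may help. It is not the operator code from this guide but a compact variant that uses re.sub with a replacement function. Because the “api” object only exists inside the Data Hub runtime, the sketch uses a stand-in Message built with types.SimpleNamespace. The names resolve and fake_msg are assumptions made for this example.

```python
import re
from types import SimpleNamespace

# Group 1 captures the variable name, group 2 the optional default value.
PLACEHOLDER = re.compile(r"\$\{([^}=]*)(?:=([^}]*))?\}")


def resolve(config_string, msg):
    """Replace every ${name} or ${name=default} using msg.attributes."""
    def substitute(match):
        name, default = match.group(1), match.group(2)
        if name in msg.attributes:
            return str(msg.attributes[name])
        if default is not None:
            return default
        raise KeyError("Message header '" + name + "' undefined and no default value provided")
    return PLACEHOLDER.sub(substitute, config_string)


# Stand-in for an input Message; only the "attributes" dict is used here.
fake_msg = SimpleNamespace(attributes={"current_year": "2013", "current_month": "4"}, body="")

print(resolve("/my/main/path/${current_year}/${current_month}/${filename=foo}.csv", fake_msg))
# prints /my/main/path/2013/4/foo.csv

# A placeholder without a matching attribute and without a default raises an error.
try:
    resolve("${missing}", fake_msg)
except KeyError as err:
    print("error:", err)
```

With a replacement function, each placeholder is resolved exactly where it was matched, so a replaced value that happens to look like a placeholder is not processed a second time.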

3. Add the Dynamic Path Configuration

Now that our custom Python operator is able to handle dynamic configurations, we only need to add the “path” configuration to the operator. To do so, click on the operator’s configuration icon and add a new property to the operator’s configuration with the following values:

  • Name: “path” (we consume it in our operator’s code with api.config.path)
  • Type: “text”
  • Value: “/my/main/path/\${current_year}/\${current_month}/\${filename=foo}.csv”

Save your graph and run it. Now it should deliver the results we would expect in the second Wiretap:

[2019-11-04 08:14:00,000] {"current_month":"4","current_year":"2013","message.id":0}/my/main/path/2013/4/foo.csv
[2019-11-04 08:14:01,000] {"current_month":"4","current_year":"2014","message.id":1}/my/main/path/2014/4/foo.csv
[2019-11-04 08:14:01,000] {"current_month":"8","current_year":"2005","message.id":2}/my/main/path/2005/8/foo.csv
[...]

Congratulations, you have reached the end of this tutorial.

4. (Bonus) Dynamic JSON Dict Configurations

Remember our dynamic_config_dict function above that we did not make use of? It basically just converts your JSON dict into a string to replace the variables and then converts the resultString back into a resultDict. Let’s try it out by adjusting our custom Python operator’s on_input function like this:

def on_input(msg):
    path = dynamic_config_string(api.config.path,msg)
    msg.body=path
    # --> This section is new
    resultDict = dynamic_config_dict(json.loads(api.config.metadataObj),msg)
    msg.attributes["metadataObj"]=resultDict
    # <--
    api.send("out",msg)

 

Of course we will also need to add the new configuration “metadataObj” to our operator. Just follow the steps described in “3. Add the Dynamic Path Configuration” with the following values:

  • Name: “metadataObj”
  • Type: “json”
  • Value: {
    "year": "\${current_year}",
    "month": "\${current_month}",
    "filename": "\${filename=foo}.csv"
    }

The results of the second Wiretap should now look similar to this output:

{"current_month":"6","current_year":"2007","message.id":21,"metadataObj":{"filename":"foo.csv","month":"6","year":"2007"}}/my/main/path/2007/6/foo.csv

Note: For simplicity, we added the configuration with type “json” in the UI here. This configuration is nevertheless delivered to the operator as a JSON string. We therefore convert it into a dict first (json.loads()) in order to call the dynamic_config_dict function, even though we could have called the dynamic_config_string function directly. However, if you create your own operator that extends the Python 3 operator, you can add configuration attributes of type “Object”. These configurations arrive in the operator as dicts directly, and that is where you need the dynamic_config_dict function. Just keep that in mind and enjoy working with dynamic configurations!
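
One caveat of the JSON round trip: attribute values are spliced into the serialized JSON text, so a value that contains a double quote makes json.loads fail. A possible alternative, sketched below, walks the dict and resolves each string value separately. It assumes that placeholders only occur in values, not in keys. The names resolve_string and resolve_nested are made up for this sketch, and the attributes are passed as a plain dict (for example msg.attributes).

```python
import re

# Group 1 captures the variable name, group 2 the optional default value.
PLACEHOLDER = re.compile(r"\$\{([^}=]*)(?:=([^}]*))?\}")


def resolve_string(text, attributes):
    """Replace every ${name} or ${name=default} in a single string."""
    def substitute(match):
        name, default = match.group(1), match.group(2)
        if name in attributes:
            return str(attributes[name])
        if default is not None:
            return default
        raise KeyError("Message header '" + name + "' undefined and no default value provided")
    return PLACEHOLDER.sub(substitute, text)


def resolve_nested(value, attributes):
    """Resolve placeholders in all string values of nested dicts and lists."""
    if isinstance(value, str):
        return resolve_string(value, attributes)
    if isinstance(value, dict):
        return {key: resolve_nested(item, attributes) for key, item in value.items()}
    if isinstance(value, list):
        return [resolve_nested(item, attributes) for item in value]
    return value  # numbers, booleans and None are returned unchanged


config = {"year": "${current_year}", "filename": "${filename=foo}.csv", "comment": "${comment}"}
attributes = {"current_year": "2007", "comment": 'say "hi"'}
print(resolve_nested(config, attributes))
```

Because no JSON text is rebuilt, quotes and backslashes in attribute values need no escaping.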

Final Graph JSON (including Bonus):

For reference, the final graph.json is included below. Create a new graph, switch to the “JSON” view (top-right corner) and copy and paste this graph.json into your Modeler. Switch back to the “Diagram” view and you will see the full graph. Save and run the graph to try it out.

{
	"properties": {},
	"description": "Python Operator Dynamic Path Formatting",
	"processes": {
		"python3operator1": {
			"component": "com.sap.system.python3Operator",
			"metadata": {
				"label": "Dynamic Path Config",
				"x": 350,
				"y": 12,
				"height": 80,
				"width": 120,
				"extensible": true,
				"config": {
					"script": "import re\nimport json\n\n# RegEx to search for ${<varName>} --> The backslash will be escaped in the body string anyway\n# config: \"/my/main/path/\\${current_year}/\\${current_month}/\\${filename}.csv\" will look like this in the api.config variable: \"/my/main/path/${current_year}/${current_month}/${filename}.csv\"\n# If you find a way to read api.config as raw string let me know! \npattern = re.compile('\\$\\{[^\\}]*\\}') \n\n# Function that replaces all variables in \"inConfigString\" with attribute values of \"inMsg\"\n# If no Message attribute value is provided the default value will be used\n# If no default value is provided an Exception will be thrown\n# This function returns the result configString\ndef dynamic_config_string(inConfigString,inMsg):\n    global pattern\n    # Find all variables in the inConfigString e.g. ['\\${current_month=10}','\\${current_year}','\\${filename}']\n    variables = pattern.findall(inConfigString)\n    for var in variables: # in var you will see e.g. \\${current_month=10} or \\${current_month}\n        split=var.strip(\"$\\{}\").split(\"=\") # in key you will see e.g. ['current_month','10'] or ['current_month']\n        # try replacing the variable with the provided inMsg attribute value\n        try: \n            value=inMsg.attributes[split[0]]\n            inConfigString=inConfigString.replace(var,str(value)) \n        # KeyError -> inMsg attribute is not provided\n        except KeyError:\n            # try replacing the variable with the provided default value\n            try: \n                default=split[1]\n                inConfigString=inConfigString.replace(var,default) \n            # IndexError -> Default value is not provided\n            except IndexError: \n                raise Exception(\"Message header '\"+split[0]+\"' undefined and no default value provided\")\n    return inConfigString\n\n# Wrapper function that replaces all variables in \"inConfigDict\" with attribute values of \"inMsg\" \n# If no Message attribute value is provided the default value will be used\n# If no default value is provided an Exception will be thrown\n# This function returns the result configDict\ndef dynamic_config_dict(inConfigDict,inMsg):\n    try:\n        inConfigString=dynamic_config_string(json.dumps(inConfigDict),inMsg)\n        return json.loads(inConfigString)\n    except Exception as e:\n        raise e\n\ndef on_input(msg):\n    path = dynamic_config_string(api.config.path,msg)\n    msg.body=path\n    # --> This section is new\n    resultDict = dynamic_config_dict(json.loads(api.config.metadataObj),msg)\n    msg.attributes[\"metadataObj\"]=resultDict\n    # <--\n    api.send(\"out\",msg)\n\napi.set_port_callback(\"in\",on_input)",
					"path": "/my/main/path/\\${current_year}/\\${current_month}/\\${filename=foo}.csv",
					"metadataObj": "{\n    \"year\": \"\\${current_year}\",\n    \"month\": \"\\${current_month}\",\n    \"filename\": \"\\${filename=foo}.csv\"\n}"
				},
				"additionalinports": [
					{
						"name": "in",
						"type": "message"
					}
				],
				"additionaloutports": [
					{
						"name": "out",
						"type": "message"
					}
				]
			}
		},
		"messagegenerator1": {
			"component": "com.sap.util.dataMessageGenerator",
			"metadata": {
				"label": "Message Generator",
				"x": 12,
				"y": 12,
				"height": 80,
				"width": 120,
				"extensible": true,
				"config": {
					"script": "var counter = 0;\n\ngetRandomInt = function(min, max) {\n    return Math.floor(Math.random() * (max - min + 1)) + min;\n};\n\ngenerateMessage = function() {\n    var msg = {};\n    msg.Attributes = {\"message.id\": counter, \"current_month\": getRandomInt(1, 12).toString(), \"current_year\": getRandomInt(2000, 2019).toString()};\n    msg.Body = \"\";\n    \n    counter++;\n    return msg;\n}\n\n$.addTimer(\"500ms\",doTick);\n\nfunction doTick(ctx) {\n    $.output(generateMessage());\n}\n"
				}
			}
		},
		"wiretap1": {
			"component": "com.sap.util.wiretap",
			"metadata": {
				"label": "Wiretap",
				"x": 181,
				"y": 12,
				"height": 80,
				"width": 120,
				"ui": "dynpath",
				"config": {}
			}
		},
		"wiretap2": {
			"component": "com.sap.util.wiretap",
			"metadata": {
				"label": "Wiretap",
				"x": 519,
				"y": 12,
				"height": 80,
				"width": 120,
				"ui": "dynpath",
				"config": {}
			}
		}
	},
	"groups": [],
	"connections": [
		{
			"metadata": {
				"points": "136,52 176,52"
			},
			"src": {
				"port": "output",
				"process": "messagegenerator1"
			},
			"tgt": {
				"port": "in",
				"process": "wiretap1"
			}
		},
		{
			"metadata": {
				"points": "305,52 345,52"
			},
			"src": {
				"port": "out",
				"process": "wiretap1"
			},
			"tgt": {
				"port": "in",
				"process": "python3operator1"
			}
		},
		{
			"metadata": {
				"points": "474,52 514,52"
			},
			"src": {
				"port": "out",
				"process": "python3operator1"
			},
			"tgt": {
				"port": "in",
				"process": "wiretap2"
			}
		}
	],
	"inports": {},
	"outports": {}
}