Technical Articles
SAP Data Hub – Develop a custom Pipeline Operator with own Dockerfile (Part 3)
In the previous parts of this series of tutorials, you learned how to develop your own Pipeline Operator without the need to bring your own Docker environment. In this tutorial, I explain how to integrate a custom Dockerfile into the SAP Data Hub and how to use this Dockerfile in a custom operator.
This is the third article of a series of tutorials:
- SAP Data Hub – Develop a custom Pipeline Operator from a Base Operator (Part 1)
- SAP Data Hub – Develop, Run, Monitor and Trace a Data Pipeline (Part 2)
- SAP Data Hub – Develop a custom Pipeline Operator with own Dockerfile (Part 3)
Create an Operator with own Docker File
In the following, we create a custom Python operator “Stock Price Reader” which reads stock prices from a public API.
Python is a programming language that is natively supported by the pre-shipped Python base operators. This means that the SAP Data Hub pipeline engine is capable of running Python scripts natively without you having to provide your own Python environment, and there is an API available that allows you to integrate your own Python code, read configuration parameters, and connect to ports without having to deal with process execution and argument handling. The Stock Price Reader requires a specific Python library that is not included in the Python standard library and therefore needs a custom Docker image that provides Python together with that library.
It is worth mentioning that there is a pre-shipped HTTP Client Operator that could be used to achieve the same result. Nevertheless, we use this simple example to demonstrate the Docker-based extensibility concept of the SAP Data Hub.
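The port-and-callback pattern that this API exposes can be illustrated with a small, self-contained sketch. Note that the api class below is only a hypothetical mock of the engine-provided object (the real api object is injected at runtime, as the full operator script later in this tutorial shows):

```python
sent = []  # records (port, data) pairs so we can see what was "sent"

class api:
    """Illustrative stand-in for the engine-provided api object."""
    def send(port, data):
        sent.append((port, data))
    def set_port_callback(port, callback):
        # the real engine calls the callback whenever a message arrives
        # at the named port; here we deliver a single hard-coded message
        callback("sap")

def on_input(symbol):
    api.send("output", symbol.upper())

api.set_port_callback("input", on_input)
print(sent)  # [('output', 'SAP')]
```

The operator code only registers callbacks and sends results to named ports; the engine (or a mock, when testing locally) drives the execution.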
1. Create a Dockerfile
A Docker image is described by a Dockerfile (https://docs.docker.com/engine/reference/builder/), a text document that contains all the OS-level commands required to assemble the image. In the SAP Data Hub, the pre-shipped as well as the custom Dockerfiles are stored in a repository together with the operators and pipelines (graphs). In the following, you learn how to create a Dockerfile in the SAP Data Hub Pipeline Modeler, which is later used in our custom operator.
To separate your Dockerfiles from the pre-shipped ones, create your own root folder in the Docker Files section:
- Open the Repository tab in the SAP Data Hub Pipeline Modeler, navigate to the Docker Files section, right-click and click on Create Folder:
- Type in a Name for the folder, in our case we choose “acme” and click OK:
Next, create a subfolder in the root folder for categorizing your Dockerfile:
- Right-click on the previously created “acme” folder and click again Create Folder:
- Type in a Name for the folder (in our case we type “python” as we want to structure all Dockerfiles with Python in the same folder). Then click OK:
Now, we create a Dockerfile that will be used when running your custom operator.
- Right-click on the folder “python” and click on Create Docker File:
- Type in a name for the Docker File, in our case we type “requests” and click OK:
A new tab opens where you can describe the details of the Dockerfile.
- In the Code Editor, paste the following Dockerfile instructions:
# Use an official Python 3.6 image as a parent image
FROM python:3.6.4-slim-stretch
# Install python library "requests"
RUN pip install requests
# Install python library "tornado" (Only required with SAP Data Hub version >= 2.5)
RUN pip install tornado==5.0.2
- The FROM instruction initializes a new build stage and sets Python in version 3.6 as the base image for subsequent instructions.
- The RUN instruction installs the Python library requests with the Python package manager pip. This library is later used in our custom operator.
Next, provide tags for the Docker image to describe its properties:
- Open the Docker File Configuration Pane by clicking on the icon in the upper right corner:
- Add new Tags by clicking on the “+”-icon:
- Add the Tag “python36”: We use this tag to declare that our Docker image includes Python version 3.6. We could also choose a different name and add the version to the corresponding version field on the right side. However, this tag is used by the pre-shipped Python operator, which is why we follow the same naming convention.
- Add the Tag “python_requests”: We use this tag to declare that the Python library requests is available in the Docker image.
- When you are using SAP Data Hub version >= 2.5, you also need to add the Tag “tornado” with version 5.0.2 as this is required by the updated Python Subengine.
- Save the Dockerfile by pressing [CTRL]+[S] or click on Save in the upper right corner:
- Build the Docker Image by clicking on the Build icon in the upper right side:
You can monitor the status of the Docker build process from the Log tab in the bottom pane:
Once finished, the SAP Data Hub Pipeline Modeler pushes the image to the local Docker registry that was configured during installation of the SAP Data Hub Distributed Runtime.
2. Create a custom Operator using the Docker Image
2.1. Create the Operator
- Expand the Operators section in the Repository
- Right-click the folder “acme” that you have created in the previous tutorials and choose the Create Operator menu option:
- In the Name text field, provide the name “stock_price_reader” for the new operator.
- In the Display Name text field, provide the display name “Stock Price Reader” for the operator.
- In the Base Operator dropdown list, select the “Python3Operator”:
The Python3Operator allows us to run inlined Python code or an attached Python script.
- Choose OK.
The tool opens the form-based Operator Editor Window:
2.2. Define the Input and Output Ports
- Add an Input Port with the name “input” of type “string”:
This port will be used later to pass a stock symbol to the operator and to trigger a request for the stock price.
- Add two Output Ports with the name “output” and “debug”, both of the same type “string”:
The “output” port will be used later to send the requested stock price and the “debug” port will be used to send debug messages, such as error messages.
2.3. Define Tags
The Tags describe the runtime requirements of the operator and force its execution in a specific Docker image instance whose Dockerfile was annotated with the same Tag and Version.
In our case, we require Python version 3.6 and the Python library “requests” which is not included in the Python standard library. Both are provided by the Dockerfile which we have created before.
- In the Tags section, choose + (Add tag) and choose the tag “python_requests” and the tag “python36”. Since there are no different versions available, we do not need to choose any version:
If the Python standard library were sufficient, we could skip adding additional Tags to our operator: Python 3.6 is already provided by the pre-shipped Python36 Docker image that is used by the Python3 Base Operator from which we derived our custom operator.
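Conceptually, the matching between operator Tags and Docker image Tags can be thought of as a subset check: an image qualifies for an operator when it declares every tag the operator requires. A rough sketch of the idea (illustrative only, not the actual scheduling logic):

```python
def image_satisfies(required_tags, provided_tags):
    # an image qualifies when it provides every tag the operator requires
    return required_tags.issubset(provided_tags)

# tags declared on our custom Dockerfile
image_tags = {"python36", "python_requests", "tornado"}

print(image_satisfies({"python36", "python_requests"}, image_tags))  # True
print(image_satisfies({"python36", "numpy"}, image_tags))            # False
```

This is why the operator's tags must use exactly the same names (and versions) as the Dockerfile annotations.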
2.4. Provide the Operator Configuration
In the Operator Configuration section, you can already find one Parameter, “codelanguage”, that was inherited from the Python3Operator. Inherited Parameters cannot be removed, but you can change their default values.
- Add three additional Parameters that we will later use to control the behavior of the operator during runtime:
Name | Type | Default Value | Description
--- | --- | --- | ---
connection_timeout | String | 5 | Controls the connection timeout in seconds
request_interval | String | 2 | Controls how often the stock price is requested
stock_symbol | String | sap | Controls which stock price is requested
You can generate (Auto Propose) a Type from the Parameters, which allows adding additional semantics on top, such as validation of the parameter values, UI helpers, the definition of Enums, and re-use in other operators. However, we do not make use of this feature in this tutorial.
2.5. Define the Operator Script
In the Script section, you can provide your own Python script in two different ways:
- Inline Editor: The code can be written directly into the Script Editor which stores the code together (inline) with the operator definition in the repository. This is the preferred way in cases where you only want to write small scripts that do not require external testing.
- Upload File: You can upload a Python script as a file which is then stored side-by-side with the Operator in the Repository and referenced in the Operator definition. This is the preferred way in cases where you plan to integrate more complex application logic in Python that shall also be testable externally, e.g. by accessing the Python script on disk.
In this example, we go for option (2) and upload a file containing a Python script.
- Click on the Inline Editor in the Script section and then click on Uploaded File in the drop-down menu:
This will by default reference and create a file script.py in the Repository as shown on the right side of the Script editor. All the code written into the Script Editor will end up in this script which can be accessed from the Repository or disk.
In our case, we create our own Python script locally and upload it to the Repository via the Script Editor.
- Open an Editor of your choice, e.g. Notepad, paste the following Python code and save it as a file called “stock_price_reader.py”:
import requests

'''
Retrieve latest stock price from public api
'''
def request_stock_price(stock_symbol, connection_timeout):
    url = "https://api.iextrading.com/1.0/stock/%s/quote" % (stock_symbol)
    r = requests.get(url, timeout=connection_timeout)
    if r.status_code == 200:
        quote = r.json()
        return quote["symbol"] + "," + str(quote["latestPrice"]) + "," + str(quote["latestUpdate"])
    else:
        raise ValueError(r.content)

'''
Mock pipeline engine api to allow testing outside pipeline engine
'''
try:
    api
except NameError:
    class api:
        def send(port, data):
            print("Send data \"" + str(data) + "\" to port \"" + port + "\"")
        def set_port_callback(port, callback):
            print("Call \"" + callback.__name__ + "\" to simulate behavior when messages arrive at port \"" + port + "\"..")
            callback()
        def add_timer(interval, callback):
            print("Call \"" + callback.__name__ + "\" to simulate behavior when timer calls the callback.")
            callback()
        class config:
            stock_symbol = "sap"
            connection_timeout = "5"
            request_interval = "2"

'''
Interface for integrating the request_stock_price() function into the pipeline engine
'''
def interface(stock_symbol=None):
    connection_timeout = float(api.config.connection_timeout)
    try:
        result = request_stock_price(stock_symbol, connection_timeout)
        api.send("output", result)
    except Exception as inst:
        api.send("debug", str(inst))

def timer_callback():
    stock_symbol = api.config.stock_symbol
    interface(stock_symbol)

# Triggers the request for every message (the message provides the stock_symbol)
api.set_port_callback("input", interface)

# Triggers the request autonomously every x seconds (the stock_symbol is read from the configuration)
if float(api.config.request_interval) != 0:
    api.add_timer(str(api.config.request_interval) + "s", timer_callback)
The script mocks the Python pipeline API, which allows you to use and test the Python script outside the pipeline engine. The wrapped function request_stock_price does not contain any pipeline dependencies and represents our actual custom code.
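The mocking hinges on the try/except NameError probe at the top of the script: when the pipeline engine has already injected api, the probe succeeds and the mock definition is skipped; on your laptop the name lookup raises NameError and the mock takes its place. The trick in isolation (simplified sketch):

```python
try:
    api  # injected by the pipeline engine at runtime
except NameError:
    class api:
        # stand-in defined only when running outside the engine
        is_mock = True

print(api.is_mock)  # True when executed outside the engine
```

The same file therefore runs unchanged both inside the pipeline engine and in a local Python interpreter.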
- Click the upload button on the right side of the Script Editor:
- Choose the file “stock_price_reader.py” from your local disk where you have stored it, e.g.:
The Pipeline Modeler uploads the file to the Repository, references the script in the Operator definition and shows the content in the Script Editor:
When you now change the code in the Script Editor, it will be changed in the stock_price_reader.py file, although it appears like an inline script.
2.6. Modify the Operator Display Icon
A default operator display icon is used when you create a custom operator. You can change the icon within the tool or upload your own icon in Scalable Vector Graphics (SVG) format.
- In the Operator editor, click the operator’s default icon:
- In the Icon dropdown list, select the desired icon; in our case we choose “line-chart”:
- Click OK.
The tool uses the new icon for operators when it displays the operator in the Pipeline editor:
2.7. Maintain Documentation for the Operator
- In the operator editor toolbar, click the documentation icon:
- The documentation can be written in Markdown language, e.g.:
Stock Price Reader
===========
This operator reads the latest stock price of a given stock from a public API (https://iextrading.com/apps/stocks/#/)
Configuration parameters
------------
* **connection_timeout** (type int): The connection timeout in seconds
* **request_interval** (type int): Interval which defines how often the stock price is requested
* **stock_symbol** (type string): The symbol of the stock that shall be returned
Input
------------
* **input** (type string): Every message sent to the input port triggers a request for the given stock
Output
------------
* **output** (type string): The stock price in format <symbol>,<stock_price>,<last_update>
* **debug** (type string): Debug messages
- Click on Save to store the text.
2.8. Save the Operator:
- In the editor toolbar, click the Save-icon to save the operator:
3. Explore the Repository Content
Open the System Management application (vsystem) in the browser.
- You can find the host and the TCP port by discovering the vsystem service among the Kubernetes services, e.g. via kubectl:
kubectl get services -n <namespace> | grep vsystem
- Open the File Management by clicking on the corresponding icon on the left side:
- In the View User Files tab, type the Name of the previously created operator “stock_price_reader” into the Search field:
The result shows all files that are stored together with the operator in the Repository.
From the same UI, you can download the corresponding files as .tgz-file via Export Files and import the .tgz-file data via Import Files into another Data Hub instance.
4. Use the Operator in a Pipeline
In the previous tutorials, you have already learned how to create a pipeline by adding and customizing existing operators. In the following steps, you will learn how to define a pipeline by copying a pipeline JSON definition.
- In the navigation pane on the left side, choose the Graphs tab and click on the + icon (Create Graph) to create a new Pipeline:
- Open the JSON view of the Pipeline by clicking on the JSON button on the right side:
- Copy and paste the following JSON definition into the JSON editor:
{
  "properties": {},
  "description": "",
  "processes": {
    "terminal1": {
      "component": "com.sap.util.terminal",
      "metadata": {
        "label": "Stock Price Terminal",
        "x": 615,
        "y": 182,
        "height": 80,
        "width": 120,
        "ui": "dynpath",
        "config": {}
      }
    },
    "constantgenerator1": {
      "component": "com.sap.util.constantGenerator",
      "metadata": {
        "label": "Request StockPrice",
        "x": 209,
        "y": 281,
        "height": 80,
        "width": 120,
        "extensible": true,
        "config": {
          "mode": "pulse",
          "content": "AAPL",
          "duration": "2s"
        }
      }
    },
    "terminal2": {
      "component": "com.sap.util.terminal",
      "metadata": {
        "label": "Debug Terminal",
        "x": 611,
        "y": 368,
        "height": 80,
        "width": 120,
        "ui": "dynpath",
        "config": {}
      }
    },
    "stockpricereader1": {
      "component": "acme.stock_price_reader",
      "metadata": {
        "label": "StockPrice Reader",
        "x": 419,
        "y": 281,
        "height": 80,
        "width": 120,
        "extensible": true,
        "config": {}
      }
    }
  },
  "groups": [],
  "connections": [
    {
      "metadata": {
        "points": "333,321 415,321"
      },
      "src": {
        "port": "out",
        "process": "constantgenerator1"
      },
      "tgt": {
        "port": "input",
        "process": "stockpricereader1"
      }
    },
    {
      "metadata": {
        "points": "543,312 577,312 577,222 611,222"
      },
      "src": {
        "port": "output",
        "process": "stockpricereader1"
      },
      "tgt": {
        "port": "in1",
        "process": "terminal1"
      }
    },
    {
      "metadata": {
        "points": "543,330 575,330 575,408 607,408"
      },
      "src": {
        "port": "debug",
        "process": "stockpricereader1"
      },
      "tgt": {
        "port": "in1",
        "process": "terminal2"
      }
    }
  ],
  "inports": {},
  "outports": {}
}
- Switch back to the Diagram view to see the rendered definition of the Pipeline that you just copied:
The pipeline uses the Stock Price Reader operator that you have created previously.
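Read as a dataflow, the graph pulses the constant "AAPL" into the reader's input port every two seconds, and the reader forwards its result to the terminals. An offline sketch of that flow, with a hypothetical stand-in for the HTTP request that only mimics the documented <symbol>,<stock_price>,<last_update> output format:

```python
def constant_generator():
    # stands in for com.sap.util.constantGenerator in "pulse" mode
    yield "AAPL"

def stock_price_reader(symbol):
    # stand-in for request_stock_price(); price and timestamp are made up
    return "%s,100.0,1546300800000" % symbol

for message in constant_generator():
    print(stock_price_reader(message))  # AAPL,100.0,1546300800000
```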
- Press [CTRL] + [S] or click on the disk icon to save the Pipeline:
Congratulations, you have reached the end of this tutorial.
Why not execute the pipeline?
Thanks for posting Jens - A great tutorial.
@Jens Rannacher
I have followed your instructions in the blog to add "flask" for Python to the Dockerfile and used it in a Python3Operator via the Group option (right-click the operator), then associated the flask tag + version in the configuration panel. Now when I run my graph, I keep getting the following error:
“.Python3Operator: operator.com.sap.system.python3Operator.inst_id=python3operator1: An error happened while executing com.sap.system.python3Operator:python3operator1: Error while executing script in Python Operator: No root path can be found for the provided module "builtins". This can happen because the module came from an import hook that does not provide file name information or because it's a namespace package. In this case the root path needs to be explicitly provided"
I was able to run my code on my laptop without any issues.
Let me know if you have encountered such an issue before and have any suggestions to resolve it?
Thanks,
Rajendra
Hi Rajendra,
I haven't seen this error before; maybe it is caused by a different Python or Flask version being used locally on your laptop than within the pipeline engine.
Best regards
Jens
@Jens Rannacher
I am trying to create a Docker File in the SAP Data Hub Pipeline Modeler. I have created the folder and gave my Docker File a name. The Docker File build starts, but I receive an error that the Docker image cannot be built because "Cannot connect to the Docker daemon at unix:///var/run/docker.sock".
Do you have a suggestion to resolve this issue?
Thanks,
Tatiana
Great blog thanks Jens! - I'm using DH 2.5 and the above steps work well, the only additional thing I had to do was add tornado to the docker file and tags.
Hi,
I am using version 2.5 of datahub.
Even after adding the tornado tags to the Docker file and operator, I get
Error building graph: error during init of process: component=com.sap.system.process.subengineOperator process=subengineOperator0: subengine operator init error: failed to start sub-engine: main: Unexpected error while running PythonEngine: No module named ‘tornado’; subengine shutdown error: error stopping graph: expected completed and got ; Post http://localhost:43347/service/stop: dial tcp 127.0.0.1:43347: connect: connection refused
Any help appreciated.
Hi Marcus,
next to the tag, you also need to add the library to the Dockerfile as follows:
I have updated the blog accordingly.
Thanks for the heads-up.
Best regards
Jens
Or, instead of the python:3.6.4-slim-stretch, use the proper SAP-provided Python base images. Instead of
FROM python:3.6.4-slim-stretch
you could do
FROM $com.sap.python36
or, from 2.5+ (since the one above was deprecated),
FROM $com.sap.opensuse.python36
This way, you guarantee that all libs pre-required by SAP (requests, tornado or any future one) are already there, and you just add the custom libs you need for your custom operator.
Best regards,
Henrique.
Is the $com.sap.opensuse.python36 an actual opensuse leap image? I'm having trouble doing basic zypper commands on it. I'd like to see what's actually in this opensuse python36 image. When I go into the repository and find its dockerfile, I get no clues. This is the only thing in the dockerfile for com.sap.opensuse.python36:
FROM §/com.sap.datahub.linuxx86_64/vflow-python36:2.5.29
It’s prebuilt by SAP and delivered as an image in the Docker repo (e.g. ECR in the case of AWS), not as a Dockerfile. But it doesn’t have any zypper repos defined. This was working in 2.5 and 2.6:
But I heard they’re removing zypper from the opensuse images due to some open source license requirement, so I wouldn’t rely on it too much… And long term I believe the plan is to switch from opensuse to SLES, so it shouldn’t be a problem for too long.
On DH 2.5, in my Dockerfile, if I do..
FROM $com.sap.opensuse.python36
COPY hdbcli-2.3.144.tar.gz /tmp/SAP_HANA_CLIENT
RUN pip install /tmp/SAP_HANA_CLIENT/hdbcli-2.3.144.tar.gz
I get the following error…
The command ‘/bin/sh -c pip install /tmp/SAP_HANA_CLIENT/hana_ml-1.0.5.tar.gz’ returned a non-zero code: 1
But this combination works without errors..
FROM $com.sap.python36
COPY hdbcli-2.3.144.tar.gz /tmp/SAP_HANA_CLIENT
RUN pip install /tmp/SAP_HANA_CLIENT/hdbcli-2.3.144.tar.gz
And I used the tornado 5.0.2 tag explicitly to get my operator/pipeline to work
The $com.sap.opensuse.python36 package may not be fully tested yet for all scenarios
Ravi,
pip is not directly available in the opensuse image. And even for com.sap.python36, it should be pip3 (or else you might be installing into Python 2.7).
On the opensuse image, you can do this:
Hi,
on SAP Data Intelligence, when using any of the prebuilt SAP Python images, I get this error when running the zypper command:
“ERROR: zypper was removed due to licensing reasons.
It depends on ‘rpm’ module, which is licensed under sleepycat”
However, it seems that the installation of e.g. C compilers etc. needs the zypper command.
Is there a way to resolve this issue ?
Any help appreciated.
I am having the same problem