Using the SAP Data Intelligence Pipeline Python Operator
The example below should be straightforward to adapt to many Python use cases. There are really only a few steps: create a docker image (if you need additional Python libraries), configure the Python operator, write the code, and define the inputs and outputs.
Building a docker
There is a great existing blog that describes how to create a simple docker, so I won’t repeat that here. Below you can see my docker definition.
```
# Use an official Python 3.6 image as a parent image
FROM python:3.6.4-slim-stretch

# Install python libraries
RUN pip install pandas
RUN pip install tornado==5.0.2

# Add vflow user and vflow group to prevent error
# "container has runAsNonRoot and image will run as root"
RUN groupadd -g 1972 vflow && useradd -g 1972 -u 1972 -m vflow
USER 1972:1972
WORKDIR /home/vflow
ENV HOME=/home/vflow
```
I found it easiest to acquire the data into Python as a blob; I had previously run into character-encoding issues, and using the blob data type avoided them. The HTTP Client operator provides a blob output, which we will connect to.
We want the output of the Python operator to be a message so that we can stop the pipeline from running, as before.
Now we have a Python operator with our input and output defined.

```python
import pandas as pd
from io import BytesIO

def on_input(data):
    # Acquire data as bytes
    dataio = BytesIO(data)
    # Load data into a pandas DataFrame, skipping 5 rows
    df = pd.read_table(dataio, sep=',', skiprows=5, encoding='latin1',
                       names=['ER_DATE', 'EXCHANGE_RATE'])
    # Replace the "-" characters with NaN (null)
    df['EXCHANGE_RATE'].replace('-', float('nan'), inplace=True)
    df = df.to_csv(index=False, header=False)
    # Create a DH Message - Data Hub api.Message
    attr = dict()
    attr["message.commit.token"] = "stop-token"
    messageout = api.Message(body=df, attributes=attr)
    api.send("outmsg", messageout)

api.set_port_callback("input", on_input)
```

Note that `replace('-', None)` does not set the values to null in pandas (a scalar `to_replace` with `value=None` triggers fill-based replacement), so an explicit NaN is used instead.
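The parsing logic can be tested outside Data Intelligence by stubbing the injected `api` object. The stub class, the sample payload, and its column values below are my own assumptions for illustration; only `Message`, `send` and `set_port_callback` are mimicked:

```python
import pandas as pd
from io import BytesIO

# Minimal stand-in for the api object that Data Intelligence injects
# into the Python operator (assumption: the operator code only uses
# Message, send and set_port_callback)
class FakeApi:
    class Message:
        def __init__(self, body=None, attributes=None):
            self.body = body
            self.attributes = attributes or {}

    def __init__(self):
        self.sent = []

    def send(self, port, message):
        self.sent.append((port, message))

    def set_port_callback(self, port, callback):
        self.callback = callback

api = FakeApi()

def on_input(data):
    dataio = BytesIO(data)
    df = pd.read_table(dataio, sep=',', skiprows=5, encoding='latin1',
                       names=['ER_DATE', 'EXCHANGE_RATE'])
    # Assignment form avoids chained-assignment pitfalls in newer pandas
    df['EXCHANGE_RATE'] = df['EXCHANGE_RATE'].replace('-', float('nan'))
    body = df.to_csv(index=False, header=False)
    attrs = {"message.commit.token": "stop-token"}
    api.send("outmsg", api.Message(body=body, attributes=attrs))

api.set_port_callback("input", on_input)

# Simulated blob payload: 5 header lines to skip, then two data rows,
# one containing the "-" placeholder (values are made up)
payload = b"h1\nh2\nh3\nh4\nh5\n2020-01-01,1.1045\n2020-01-02,-\n"
api.callback(payload)

port, msg = api.sent[0]
print(port)  # outmsg
print(msg.body)
```

The "-" row comes out with an empty exchange-rate field, confirming the null replacement worked before the CSV is sent onwards.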
The easiest way I found to specify that my Python3Operator should use the pandas docker image was the "Group" feature. We can then tag the group with the same tags as the docker to link the two together. Just right-click on the Python operator and choose Group. Now we can see the tags.
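For reference, in my version the tags for a docker definition live in a Tags.json file alongside the Dockerfile in the repository. The tag names below are illustrative assumptions based on the libraries installed above; yours must match whatever tags you assigned to your docker and group:

```json
{
  "python36": "",
  "pandas": "",
  "tornado": "5.0.2"
}
```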
With that, the pipeline is complete; we can save it (with a new name) and run it.
All being well, the pipeline should complete and we will see the same data as before.
Here are a couple of links you may want to refer to.
Develop a custom Pipeline Operator with own Dockerfile
Automating Web Data Acquisition With SAP Data Intelligence
Hope it was useful for someone. 🙂
Nice post Ian. Thanks for sharing.
Nice post Ian!
How do groups in SDH work, and can they also be used to influence node placement? Any references to documentation would be appreciated.
Hello Ian Henry,
I followed the tutorial on how to use a Python operator in an SAP Data Hub pipeline (with the developer edition). During the execution of my graph I get this issue: "error while starting subengine: exit status 127". The Python3Operator process is dead.
I would like to mention that I receive this error regardless of which graph I use the operator in.
I hope to find a solution to my problem.
I would first try creating a new simple Python docker and check whether it builds and runs with the appropriate tags.
Then try creating a new operator with the correct Python version and associate it with your docker.