Using the SAP Data Intelligence Pipeline Python Operator

Ian Henry

The example below should be straightforward to adapt to many Python use cases. There are only a few steps: build a Docker image (if you need additional Python libraries), configure the Python operator's input and output ports, and write the code.

Building a Docker image

There is a great existing blog post that describes how to create a simple Docker image, so I won't repeat that here. My Dockerfile is shown below.

# Use an official Python 3.6 image as a parent image
FROM python:3.6.4-slim-stretch

# Install python libraries
RUN pip install pandas
RUN pip install tornado==5.0.2

# Add the vflow user and vflow group to prevent the error
# "container has runAsNonRoot and image will run as root"
RUN groupadd -g 1972 vflow && useradd -g 1972 -u 1972 -m vflow
USER 1972:1972
WORKDIR /home/vflow
ENV HOME=/home/vflow
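
Before tagging the image, it can help to confirm that the libraries the operator needs are importable with the image's Python interpreter. The following is a minimal sketch, not part of any SAP tooling: the helper name `missing_modules` is hypothetical, and the module list simply mirrors the two `pip install` lines in the Dockerfile above.

```python
import importlib.util


def missing_modules(names):
    """Return the top-level module names from `names` that cannot be found for import."""
    return [name for name in names if importlib.util.find_spec(name) is None]


if __name__ == "__main__":
    # pandas and tornado are the two libraries installed by the Dockerfile above
    missing = missing_modules(["pandas", "tornado"])
    if missing:
        print("Missing libraries:", ", ".join(missing))
    else:
        print("All required libraries are importable")
```

Running a check like this inside the container is a cheap way to separate image problems from pipeline problems.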

Let's take the pipeline that we developed previously, but this time we will swap the JavaScript operator for Python.

Placing the Python3Operator on the canvas shows that it has no inputs and no outputs; for most pipelines you will want to change this. The JavaScript operator above has an input called input (message) and an output called output (message), so we need something similar for Python.

I found that acquiring the data into Python as a blob was the easiest approach. I had experienced character encoding issues, and using the blob data type avoided them. The HTTP Client provides a blob output, which we will connect to.
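
The encoding problem can be illustrated without the pipeline. This is a small sketch using made-up sample bytes (not the actual download), and `decode_blob` is a hypothetical helper. Bytes that are valid Latin-1 may fail to decode as UTF-8, which is why receiving the raw blob and choosing the encoding explicitly is the safer route.

```python
# Hypothetical sample payload: a header line containing the Latin-1 byte 0xA3 (pound sign)
raw = b"Rate in \xa3\n2019-01-02,1.1397\n"


def decode_blob(blob, encoding="latin1"):
    """Decode a bytes payload using an explicitly chosen encoding."""
    return blob.decode(encoding)


try:
    raw.decode("utf-8")
    utf8_ok = True
except UnicodeDecodeError:
    # A lone 0xA3 byte is not a valid UTF-8 sequence
    utf8_ok = False

text = decode_blob(raw)
print(utf8_ok)
print(text.splitlines()[0])
```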

We want the output of the Python operator to be a message so that we can stop the pipeline as before.

Now we have a Python operator with our input and output defined.

Here's the Python 3 code that I used within the operator. It is equivalent to the JavaScript example I shared previously.

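import numpy as np
import pandas as pd
from io import BytesIO

def on_input(data):
    # Wrap the incoming blob (bytes) in a file-like object
    dataio = BytesIO(data)
    # Load the data into a pandas DataFrame, skipping the first 5 rows
    df = pd.read_csv(dataio, sep=',', skiprows=5, encoding='latin1', names=['ER_DATE', 'EXCHANGE_RATE'])
    # Replace the "-" placeholders with nulls (NaN); assigning the result back
    # avoids relying on inplace=True with a None replacement value
    df['EXCHANGE_RATE'] = df['EXCHANGE_RATE'].replace('-', np.nan)
    # Serialise to CSV text without index or header; NaN is written as an empty field
    csv_out = df.to_csv(index=False, header=False)

    # Create a DH Message - Data Hub api.Message
    attr = dict()
    attr["message.commit.token"] = "stop-token"
    messageout = api.Message(body=csv_out, attributes=attr)
    api.send("outmsg", messageout)

# The api object is provided by the operator at runtime
api.set_port_callback("input", on_input)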
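
Inside the pipeline the `api` object is supplied by the operator, so the script above cannot be run as-is on a laptop. One way to exercise a callback locally is a small stand-in for `api`. The sketch below is only an illustration under stated assumptions: `FakeApi` and its `feed` method are hypothetical test helpers that mimic just the three calls used above (`Message`, `send`, `set_port_callback`), the sample bytes are made up, and the callback is a simplified standard-library version of the transformation so that it runs without pandas.

```python
import csv
from io import StringIO


class FakeApi:
    """Hypothetical stand-in for the runtime-provided `api` object (testing only)."""

    class Message:
        def __init__(self, body=None, attributes=None):
            self.body = body
            self.attributes = attributes or {}

    def __init__(self):
        self.callbacks = {}  # port name -> callback
        self.sent = []       # (port name, payload) tuples, in send order

    def set_port_callback(self, port, callback):
        self.callbacks[port] = callback

    def send(self, port, payload):
        self.sent.append((port, payload))

    def feed(self, port, data):
        """Test helper: deliver `data` to the callback registered on `port`."""
        self.callbacks[port](data)


api = FakeApi()


def on_input(data):
    # Decode the blob, skip the 5 header rows, blank out the "-" placeholders
    rows = list(csv.reader(StringIO(data.decode("latin1"))))[5:]
    out = StringIO()
    writer = csv.writer(out, lineterminator="\n")
    for er_date, rate in rows:
        writer.writerow([er_date, "" if rate == "-" else rate])
    attr = {"message.commit.token": "stop-token"}
    api.send("outmsg", api.Message(body=out.getvalue(), attributes=attr))


api.set_port_callback("input", on_input)

# Made-up sample: five header lines followed by two data rows
sample = b"h1\nh2\nh3\nh4\nh5\n2019-01-01,-\n2019-01-02,1.1397\n"
api.feed("input", sample)
port, message = api.sent[0]
print(port)
print(message.body)
```

The same stand-in can be used with the pandas version of `on_input` when pandas is installed locally; only the callback body changes.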

The easiest way I found to make the Python3Operator use the pandas Docker image was the "Group" feature. Right-click the Python operator and choose Group. The group's tags are then visible, and we can give the group the same tags as the Docker image to link the two together.

With that, the pipeline is complete. We can save it (under a new name) and run it.
All being well, the pipeline should complete and we will see the same data as before.

Here are a couple of links you may want to refer to.

Develop a custom Pipeline Operator with own Dockerfile

Automating Web Data Acquisition With SAP Data Intelligence

Hope it was useful for someone. 🙂

Thanks, Ian.

      4 Comments
      David Pugh

      Nice post Ian. Thanks for sharing.

      Regards
      Dave

      Henning Kropp

      Nice post Ian!

      How do groups in SDH work and can they also be used to influence node placement? Any references to documentation would be appreciated.

      Thanks,
      Henning

      Tatiana Signe Nguelok

      Hello Ian Henry,

      I followed the tutorial on how to use a Python operator in an SAP Data Hub pipeline (with the developer edition). During the execution of my graph I get this issue: "error while starting subengine: exit Status 127". The Python3Operator process is dead.

      I would like to mention that I receive this error regardless of the use of this Operator in any graph.

      I hope to have a solution to my Problem.

      Thanks
      Tatiana

      Ian Henry
      Blog Post Author

      I would first try creating a new, simple Python Docker image and check whether it works with the appropriate tags.

      Then try creating a new operator with the correct Python version and associate it with your Docker image.