The example below should be straightforward to modify for many Python use cases. There are really only a few steps: create a Docker image (if you need additional Python libraries), configure the Python operator, write the code, and define the inputs and outputs.
Building a Docker image
There is a great existing blog post that describes how to create a simple Docker image, so I won’t repeat that here. Below is my Dockerfile.
```dockerfile
# Use an official Python 3.6 image as a parent image
FROM python:3.6.4-slim-stretch

# Install python libraries
RUN pip install pandas
RUN pip install tornado==5.0.2

# Add vflow user and vflow group to prevent the error:
# "container has runAsNonRoot and image will run as root"
RUN groupadd -g 1972 vflow && useradd -g 1972 -u 1972 -m vflow
USER 1972:1972
WORKDIR /home/vflow
ENV HOME=/home/vflow
```
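To confirm the image was built as intended, you could run a small check inside the container. The sketch below is an assumption on my part, not something the pipeline requires: it reports whether each package imports (and its `__version__`, where the module defines one) along with the effective user id. Run inside this image, you would expect both packages to be present and the uid to be 1972.

```python
import importlib
import os


def environment_report(packages=("pandas", "tornado")):
    """Return each package's version (None if missing) plus the effective user id."""
    report = {}
    for name in packages:
        try:
            module = importlib.import_module(name)
            # Not every module defines __version__, so fall back to "unknown"
            report[name] = getattr(module, "__version__", "unknown")
        except ImportError:
            # Package is not installed in this environment
            report[name] = None
    # os.getuid() is Unix-only, which matches the Linux-based image
    report["uid"] = os.getuid()
    return report


if __name__ == "__main__":
    print(environment_report())
```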
I found it easiest to acquire the data into Python as a blob: I had run into character encoding issues, and using the blob data type avoided them. The HTTP Client provides a blob output, which we will connect to.
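The benefit of a blob is that the operator receives raw bytes and decides the encoding itself, rather than having text decoded upstream. The sketch below illustrates this with a hypothetical payload containing a Latin-1 “£” (byte 0xA3), which is not valid UTF-8: decoding it as UTF-8 fails, while handing the bytes to pandas with `encoding="latin1"` reads it cleanly. The helper name `bytes_to_frame` and the sample data are illustrative only.

```python
from io import BytesIO

import pandas as pd


def bytes_to_frame(blob: bytes, encoding: str = "latin1") -> pd.DataFrame:
    # Wrap the raw bytes in a file-like object and let pandas decode them
    return pd.read_csv(BytesIO(blob), encoding=encoding)


# Hypothetical payload: the '£' becomes byte 0xA3 in Latin-1, which is invalid UTF-8
payload = "currency,rate\n£,1.13\n".encode("latin1")

try:
    payload.decode("utf-8")
except UnicodeDecodeError as exc:
    print("UTF-8 decode fails:", exc.reason)

df = bytes_to_frame(payload)
print(df)
```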
We want the output of the Python operator to be a message so that, as before, we can stop the pipeline once it has finished.
Now we have a Python operator with its input and output ports defined, and we can add the code.
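```python
from io import BytesIO

import numpy as np
import pandas as pd

# Note: "api" is provided by the Data Hub Python3Operator runtime; it is not imported.


def on_input(data):
    # Acquire data as bytes (the HTTP Client delivers a blob)
    dataio = BytesIO(data)
    # Load data into a pandas DataFrame, skipping the first 5 rows
    df = pd.read_csv(dataio, sep=',', skiprows=5, encoding='latin1',
                     names=['ER_DATE', 'EXCHANGE_RATE'])
    # Replace the "-" characters with null (NaN is written as an empty field).
    # Assigning back avoids replace('-', None), which older pandas treats as a forward fill.
    df['EXCHANGE_RATE'] = df['EXCHANGE_RATE'].replace('-', np.nan)
    csv_body = df.to_csv(index=False, header=False)
    # Create a Data Hub message; the commit token lets us stop the pipeline as before
    attr = {"message.commit.token": "stop-token"}
    messageout = api.Message(body=csv_body, attributes=attr)
    api.send("outmsg", messageout)


api.set_port_callback("input", on_input)
```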
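Because `api` only exists inside the Data Hub runtime, the operator script cannot run as-is on a local machine. One way to exercise the same logic locally is to stub the three `api` members it uses. The stub (`_Message`, `sent`, `callbacks`) and the sample payload, including its five header lines, are hypothetical; the real `api.Message` may behave differently from this minimal class.

```python
import types
from io import BytesIO

import numpy as np
import pandas as pd


# --- Hypothetical stand-in for the Data Hub "api" object ---
class _Message:
    def __init__(self, body=None, attributes=None):
        self.body = body
        self.attributes = attributes or {}


sent = []       # (port, message) pairs the operator would have emitted
callbacks = {}  # port name -> registered callback


def _send(port, message):
    sent.append((port, message))


def _set_port_callback(port, callback):
    callbacks[port] = callback


api = types.SimpleNamespace(Message=_Message, send=_send,
                            set_port_callback=_set_port_callback)


# --- Operator logic, unchanged apart from running against the stub ---
def on_input(data):
    df = pd.read_csv(BytesIO(data), sep=",", skiprows=5, encoding="latin1",
                     names=["ER_DATE", "EXCHANGE_RATE"])
    df["EXCHANGE_RATE"] = df["EXCHANGE_RATE"].replace("-", np.nan)
    body = df.to_csv(index=False, header=False)
    attr = {"message.commit.token": "stop-token"}
    api.send("outmsg", api.Message(body=body, attributes=attr))


api.set_port_callback("input", on_input)

# --- Simulate the HTTP Client delivering a blob (hypothetical data) ---
sample = (
    "header 1\nheader 2\nheader 3\nheader 4\nheader 5\n"
    "2018-05-01,1.2079\n2018-05-02,-\n2018-05-03,1.1992\n"
).encode("latin1")

callbacks["input"](sample)
port, msg = sent[0]
print(port)
print(msg.body)
```

The "-" row comes out with an empty second field, which is how the CSV represents the null value.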
The easiest way I found to specify that my Python3Operator should use the pandas Docker image was to use the “Group” feature. We can then tag the group with the same tags as my Docker image to link the two together. Just right-click the Python operator and choose Group. Now we can see the tags.
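The link works because the group’s tags have to be satisfied by the tags of a Docker image. As a rough mental model only (this is not Data Hub’s actual scheduling code, and the image names and tag values are hypothetical), you can think of it as a subset match, where a pinned version must also agree:

```python
from typing import Dict, List, Optional

# Hypothetical images and their tags (tag -> version, "" meaning unversioned)
IMAGES: Dict[str, Dict[str, str]] = {
    "python36_pandas": {"python36": "", "tornado": "5.0.2", "pandas": ""},
    "python36_plain": {"python36": "", "tornado": "5.0.2"},
}


def image_satisfies(image_tags: Dict[str, str], required: Dict[str, Optional[str]]) -> bool:
    """Simplified model: every required tag must exist on the image and,
    if the group pins a version, the image must carry exactly that version."""
    for tag, version in required.items():
        if tag not in image_tags:
            return False
        if version and image_tags[tag] != version:
            return False
    return True


def matching_images(images: Dict[str, Dict[str, str]],
                    required: Dict[str, Optional[str]]) -> List[str]:
    """Return the names of all images whose tags satisfy the group's requirements."""
    return [name for name, tags in images.items() if image_satisfies(tags, required)]


# Hypothetical group tags: needs pandas (any version) and tornado 5.0.2
group_tags = {"pandas": None, "tornado": "5.0.2"}
print(matching_images(IMAGES, group_tags))
```

Under this model, only an image carrying the pandas tag qualifies, which is why the group needs the same tags as the pandas image.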
With that, the pipeline is complete. We can save it (under a new name) and run it.
All being well, the pipeline should complete and we will see the same data as before.
Here are a couple of links you may want to refer to.
Hope it was useful for someone. 🙂