Andy Yin

Add third party database support in SAP Data Intelligence

People often ask how to load data into target databases that have no built-in support in SAP Data Intelligence. Since SAP Data Intelligence is an open and extensible data platform, we can write a custom operator or custom code to support a specific database. In this blog, I will take the PostgreSQL database as an example and show how to support it as a source or target.

1. Create a Dockerfile

Psycopg2 is a PostgreSQL database adapter library for Python; we can use it to access a PostgreSQL database. To use it in SAP Data Intelligence, we need a custom Docker image that provides Python together with that library. You can refer to creating dockerfiles for the detailed steps of creating a Dockerfile in SAP Data Intelligence. The Dockerfile is shown below.

FROM python:3.6.4-slim-stretch

# libpq headers and gcc are needed to build the psycopg2 package
RUN apt-get update && apt-get install -y libpq-dev gcc
RUN pip3 install psycopg2==2.8.5
RUN pip3 install tornado==5.0.2
RUN apt-get autoremove -y gcc

# Add the vflow user and group to prevent the error
# "container has runAsNonRoot and image will run as root"
RUN groupadd -g 1972 vflow && useradd -g 1972 -u 1972 -m vflow
USER 1972:1972
WORKDIR /home/vflow
ENV HOME=/home/vflow

Add tags as illustrated in the figure below, for example tags such as python36, tornado, and psycopg2 that describe what the image provides.

After successfully building the Docker image, we can use it in graphs in the SAP Data Intelligence Modeler.

2. Write custom code to access PostgreSQL database

For ease of use and maximum reusability by others, we could wrap this functionality in a custom operator in SAP Data Intelligence based on that Docker image. For the sake of simplicity in this demo, however, we will simply write a custom Python script to show how easy it is to use PostgreSQL in SAP Data Intelligence.

First, we create a graph in the SAP Data Intelligence Modeler like the one below.

For the Python3 operator, we add a string-type input port named “sql” which accepts the SQL statements issued from the upstream Terminal operator. A string-type output port named “output” emits the result of executing a SQL query; it is connected to a downstream Wiretap operator to show the result. Another output port named “debug” is connected back to the Terminal operator to show debug information. Note that the Python3 operator is placed in a group that uses the tags we mentioned earlier, which helps the pipeline engine locate the Docker image we built above.

Here is the script we wrote for the Python3 operator.

import re
import psycopg2

dbname = 'DB_Name'
user = 'User_Name'
host = 'Host_name'
port = 5432 # port number, default 5432
password = 'Password'

delimiter = ','  # Delimiter to separate Postgres columns in the output
outInbatch = False  # If True, fetch and send the result set in batches
outbatchsize = 3  # Number of rows per batch

def handle_query(query):
    connection = None
    try:
        # Connect to an existing database
        connect_str = "dbname=" + dbname + " " + \
            "user=" + user + " " + \
            "host=" + host + " " + \
            "port=" + str(port) + " " + \
            "password=" + password
        connection = psycopg2.connect(connect_str)
        
        # Open a cursor to perform database operations
        cursor = connection.cursor()
        cursor.execute(query)

        outStr = ""
        rows = []
        if not outInbatch:
            rows = cursor.fetchall()
            for r in rows:
                for i, c in enumerate(r):
                    outStr += str(c) + \
                        ('' if i==len(r)-1 else delimiter)
                outStr += "\n"
            api.send("output", outStr)
        else:
            rows = cursor.fetchmany(outbatchsize)
            while rows:
                for r in rows:
                    for i, c in enumerate(r):
                        outStr += str(c) + \
                            ('' if i==len(r)-1 else delimiter)
                    outStr += "\n"
                api.send("output", outStr)
                outStr = ""
                rows = cursor.fetchmany(outbatchsize)
    except (Exception, psycopg2.Error) as error:
        api.send("debug", f"Oops! An exception has occurred: {error}")
    finally:
        # closing database connection.
        if connection:
            cursor.close()
            connection.close()
            api.send("debug", "PostgreSQL connection is closed")

def handle_modify(query):
    connection = None
    try:
        connect_str = "dbname=" + dbname + " " + \
            "user=" + user + " " + \
            "host=" + host + " " + \
            "port=" + str(port) + " " + \
            "password=" + password
        connection = psycopg2.connect(connect_str)
        cursor = connection.cursor()
        cursor.execute(query)
        connection.commit()
    except (Exception, psycopg2.Error) as error:
        api.send("debug", f"Oops! An exception has occurred: {error}")
    finally:
        if connection:
            cursor.close()
            connection.close()
            api.send("debug", "PostgreSQL connection is closed")

def on_sql(query):
    if query:
        m = re.match(r'\s*select', query, flags=re.IGNORECASE)
        if m:
            handle_query(query)
        else:
            handle_modify(query)
    else:
        api.send("debug", "Input is empty.")

api.set_port_callback("sql", on_sql)

Change the configuration values in the top few lines to match your environment and run the graph. You can then enter queries in the Terminal UI as shown below, for example an INSERT statement or a SELECT such as select * from <your table>.

Summary

SAP Data Intelligence supports various languages for writing custom code, including ABAP, C++, Node.js, and Python. You can add support for other databases in the same way as shown above, simply by using your preferred language together with the corresponding database adapter library of your choice.

      1 Comment

      Werner Dähn

      I am not sure I would propose to actually use this approach due to multiple potential time bombs.

      You use the comma as column separator, which results in total havoc if you read e.g. an address line column with a comma in it. Escape it!

      1st North Str, Building 5
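
      For instance, Python's csv module in the standard library already handles the quoting. A minimal sketch (the values are made up):

      import csv
      import io

      def rows_to_csv(rows, delimiter=','):
          # Serialize rows with proper quoting instead of raw string concatenation
          buf = io.StringIO()
          writer = csv.writer(buf, delimiter=delimiter, lineterminator='\n')
          for r in rows:
              writer.writerow(r)  # values containing the delimiter are quoted
          return buf.getvalue()

      # rows as returned by cursor.fetchall()
      print(rows_to_csv([(1, '1st North Str, Building 5', 'Smith')]))
      # -> 1,"1st North Str, Building 5",Smith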

      The performance will suck. Reading with Python, all payload is converted to strings, the strings are concatenated, and the downstream operator must do very expensive CSV parsing to break them back up into columns. Since this runs in a Docker image of its own, the data transfer goes through the network stack, so lots of memcopies at the least. Not to mention it uses only a single connection, with no parallelism.

      Furthermore, all information about data types is lost. How will the database return a date? Will the subsequent process deal with it correctly? It would be best if the query itself added all the data type conversions to strings, to be certain.
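
      If the conversion is done on the Python side instead, it should at least be explicit, roughly like this (an illustrative sketch; the to_text helper is not part of the post):

      import datetime
      import decimal

      def to_text(value):
          # Convert a fetched column value to an unambiguous string
          if value is None:
              return ''  # represent NULL as an empty string
          if isinstance(value, (datetime.date, datetime.datetime)):
              return value.isoformat()  # e.g. 2020-07-01 or 2020-07-01T12:00:00
          if isinstance(value, decimal.Decimal):
              return format(value, 'f')  # avoid scientific notation
          return str(value)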

      So much coding just to interface with a standard database! https://www.cdata.com/kb/tech/postgresql-jdbc-talend.rst