Andy Yin

Add third party database support in SAP Data Intelligence

People often ask how to load data into target databases that have no built-in support in SAP Data Intelligence. Since SAP Data Intelligence is an open and extensible data platform, we can write a custom operator or custom code to support a specific database. In this blog, I will take the PostgreSQL database as an example and show how to support it as a source or target.

1. Create a Dockerfile

Psycopg2 is a PostgreSQL database adapter library for Python; we can use it to access a PostgreSQL database. To use it in SAP Data Intelligence, we need a custom Docker image that provides Python together with that library. You can refer to Creating Dockerfiles for the detailed steps of creating a Dockerfile in SAP Data Intelligence. The Dockerfile is shown below.

FROM python:3.6.4-slim-stretch

# libpq-dev and gcc are build-time dependencies of psycopg2
RUN apt-get update && apt-get install -y libpq-dev gcc
RUN pip3 install psycopg2==2.8.5
RUN pip3 install tornado==5.0.2
# gcc is only needed to build psycopg2, so remove it to slim the image
RUN apt-get autoremove -y gcc

# Add vflow user and vflow group to prevent error
# container has runAsNonRoot and image will run as root
RUN groupadd -g 1972 vflow && useradd -g 1972 -u 1972 -m vflow
USER 1972:1972
WORKDIR /home/vflow
ENV HOME=/home/vflow

Add tags as illustrated in the figure below.
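In some versions, the Modeler stores these tags in a Tags.json file next to the Dockerfile, as key-value pairs of tag name and version. The tag names and versions below are illustrative assumptions matching the libraries installed above; whatever you choose here must match the tags you later place on the operator group:

```json
{
  "python36": "",
  "tornado": "5.0.2",
  "psycopg2": "2.8.5"
}
```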

After successfully building the Docker image, we can use it in an SAP Data Intelligence Modeler graph.

2. Write custom code to access PostgreSQL database

We could write a custom operator in SAP Data Intelligence with that Docker image for ease of use and maximum reusability by others. But for the sake of simplicity in this demo, we will simply write a custom Python script to show how easy it is to use PostgreSQL in SAP Data Intelligence.

First, we create a graph in the SAP Data Intelligence Modeler as shown below.

For the Python3 operator, we add a string-type input port named “sql”, which accepts SQL statements issued from the upstream Terminal operator. A string-type output port named “output” emits the result of executing a SQL query; it is connected to a downstream Wiretap operator to show the result. Another output port named “debug” is connected back to the Terminal operator to show debug information. Note that the Python3 operator is placed in a group annotated with the tags we mentioned earlier, which helps the pipeline engine locate the Docker image we built above.

Here is the script code we write for the Python3 operator.

import re
import psycopg2

dbname = 'DB_Name'
user = 'User_Name'
host = 'Host_name'
port = 5432 # port number, default 5432
password = 'Password'

delimiter = ','  # Delimiter to separate Postgres columns in the output
outInbatch = False  # If True, send results in batches instead of all at once
outbatchsize = 3  # Number of rows per batch when outInbatch is True

def handle_query(query):
    connection = None
    try:
        # Connect to an existing database
        connect_str = "dbname=" + dbname + " " + \
            "user=" + user + " " + \
            "host=" + host + " " + \
            "port=" + str(port) + " " + \
            "password=" + password
        connection = psycopg2.connect(connect_str)
        
        # Open a cursor to perform database operations
        cursor = connection.cursor()
        cursor.execute(query)

        outStr = ""
        rows = []
        if not outInbatch:
            rows = cursor.fetchall()
            for r in rows:
                for i, c in enumerate(r):
                    outStr += str(c) + \
                        ('' if i==len(r)-1 else delimiter)
                outStr += "\n"
            api.send("output", outStr)
        else:
            rows = cursor.fetchmany(outbatchsize)
            while(rows):
                for r in rows:
                    for i, c in enumerate(r):
                        outStr += str(c) + \
                            ('' if i==len(r)-1 else delimiter)
                    outStr += "\n"
                api.send("output", outStr)
                outStr = ""
                rows = cursor.fetchmany(outbatchsize)
    except (Exception, psycopg2.Error) as error:
        api.send("debug", f"Oops! An exception has occurred: {error}")
    finally:
        # Closing the connection also releases any open cursors.
        if connection:
            connection.close()
            api.send("debug", "PostgreSQL connection is closed")

def handle_modify(query):
    connection = None
    try:
        connect_str = "dbname=" + dbname + " " + \
            "user=" + user + " " + \
            "host=" + host + " " + \
            "port=" + str(port) + " " + \
            "password=" + password
        connection = psycopg2.connect(connect_str)
        cursor = connection.cursor()
        cursor.execute(query)
        connection.commit()
    except (Exception, psycopg2.Error) as error:
        api.send("debug", f"Oops! An exception has occurred: {error}")
    finally:
        if connection:
            connection.close()
            api.send("debug", "PostgreSQL connection is closed")

def on_sql(query):
    if query:
        m = re.match(r'\s*select', query, flags=re.IGNORECASE)
        if m:
            handle_query(query)
        else:
            handle_modify(query)
    else:
        api.send("debug", "Input is empty.")

api.set_port_callback("sql", on_sql)
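The routing in on_sql — statements that start with SELECT go to handle_query, everything else to handle_modify — can be exercised in isolation, outside the pipeline. A minimal sketch of that check:

```python
import re

def is_select(query):
    # Mirrors the dispatch in on_sql: a statement that starts with
    # SELECT (ignoring leading whitespace and case) is treated as a
    # query; anything else is treated as a modifying statement.
    return bool(re.match(r'\s*select', query, flags=re.IGNORECASE))

print(is_select("  SELECT * FROM customers"))         # True
print(is_select("insert into customers values (1)"))  # False
print(is_select("UPDATE customers SET name = 'x'"))   # False
```

Note that this simple check would route statements such as a WITH … SELECT common table expression to handle_modify; extend the pattern if you need those.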

Change the configuration in the top few lines to match your environment and run the graph. Now you can enter queries like the ones below in the Terminal UI.
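For example, assuming a hypothetical table named customers, you could issue the following statements one at a time from the Terminal:

```sql
CREATE TABLE customers (id INTEGER PRIMARY KEY, name VARCHAR(50));
INSERT INTO customers VALUES (1, 'Alice');
INSERT INTO customers VALUES (2, 'Bob');
SELECT * FROM customers;
```

The first three statements are routed to handle_modify and committed; the final SELECT is routed to handle_query, and the Wiretap operator shows the comma-delimited rows.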

Summary

SAP Data Intelligence supports writing custom code in various languages, including ABAP, C++, Node.js, and Python. You can add support for a database as shown above simply by using your preferred language with the corresponding database adapter library of your choice.
