
Data Intelligence & SQL Like Operations with Pandas – Part 2

Introduction

This blog is a continuation of my previous blog, where we discussed some SQL-like operations using Pandas in SAP Data Intelligence. In case you haven't checked it out yet, please refer to the blog here.

In this blog we will look at some more interesting cases, such as selecting time ranges and performing Insert, Update and Delete operations on our dataset in DI. Since this is a purely technical write-up and an enhancement of my previous blog, I will jump directly to these operations.

SQL Like Operations with Pandas

In case you would like to refer to the dataset used in this blog, please check it out in this GitHub repository.

The SAP DI graphs folder in the repository contains JSON files that can be directly uploaded into the SAP DI Modeler for execution.

In my previous blog, all the examples and graphs were based on the approach of reading the file using the HDFS library. SAP DI provides an inbuilt operator, which is usually the most suitable approach; however, there can be scenarios where using the inbuilt operator is completely out of scope.

There can be scenarios where the file needs to be read dynamically based on conditions generated within the Python operator, and in such cases the HDFS library is very helpful.
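To illustrate, here is a minimal sketch of such a dynamic read using the InsecureClient from the hdfs library (the same client used later in this blog). The file name below is a hypothetical example; only the endpoint and the SDL folder are taken from the graphs of this blog.

import pandas as pd
from hdfs import InsecureClient

# WebHDFS endpoint of the DI Data Lake (adjust to your landscape)
client = InsecureClient('http://datalake:50070')

def read_sdl_file(hdfs_path):
    # Read the CSV file from the SDL directly into a Pandas DataFrame
    with client.read(hdfs_path, encoding='utf-8') as reader:
        return pd.read_csv(reader, sep=",")

# Hypothetical path, e.g. derived from a condition computed in the operator
df = read_sdl_file('/shared/SLT/SFLIGHT/SFLIGHT.csv')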

Mentioned below is a more traditional and suitable way of selecting all the records from the SDL and writing the data to a Pandas DataFrame.

The code for the Python operator would be as below:

import pandas as pd
from io import StringIO

def read_file(data):
    # Convert the incoming string payload into a CSV-readable buffer
    data = StringIO(data)
    df = pd.read_csv(data, sep=",")
    api.send("out2", str(df))

api.set_port_callback("input", read_file)

 

Please check out the graph JSON here for more details. This graph will be the premise of the whole blog going forward; however, it is entirely up to your scenario whether you can go ahead with an existing operator or need a custom approach to read data via the HDFS library.

Select Subset with Where conditions based on Time Range

In the previous blog we discussed Select with Where conditions based on fixed values, as well as Like operations combined with logical operators such as And / Or.

Now let's look at an example of a Select based on a time range. If we look at the data in our SFLIGHT table, there is a field called FLDATE which contains flight dates for the years 2002 and 1995. Let's say we want all the records from the SFLIGHT table where the year is 1995. A Select statement for this would look like:

SELECT * FROM SFLIGHT WHERE FLDATE >= '1995-01-01' AND FLDATE <= '1995-12-31'

Now let's do this in Pandas.

Before we perform any operation, it is important to make sure we operate on the right data type. Pandas often reads a column with a mix of strings and numbers as the data type 'object'.

To get the data type we can use the dtypes attribute as below:

api.send("out2", str(df['FLDATE'].dtypes))

The above statement would output the data type of the column FLDATE. The df.info() method is a great way to get an overall overview of the DataFrame, but since it prints to standard output rather than returning a string, its result is not visible via the pipeline's output port.
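If you still want that overview inside the pipeline, one small workaround (a sketch, assuming the same "out2" port as in the graphs of this blog) is to let df.info() write into a string buffer and send the buffer contents:

from io import StringIO

buf = StringIO()
df.info(buf=buf)                 # info() writes its summary into the buffer instead of stdout
api.send("out2", buf.getvalue())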

To convert the data to a datetime data type we can use the astype method as below:

df['FLDATE'] = df['FLDATE'].astype('datetime64')

Below is the output of the data type as read by the Pandas DataFrame initially, and after the conversion using the astype method.
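As a side note, an equivalent conversion (just a sketch of an alternative, not used in the graphs of this blog) is pd.to_datetime, which additionally lets you handle unparseable values gracefully:

# Alternative conversion; invalid dates become NaT instead of raising an error
df['FLDATE'] = pd.to_datetime(df['FLDATE'], errors='coerce')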

Now that the data type issue is tackled, let's look at how we can filter the whole dataset based on a time range. We need to import the datetime module (and use pd.Timestamp from Pandas) for the code below.

import datetime

# select all values on or after 01.01.1995
is_after_start = df['FLDATE'] >= pd.Timestamp(datetime.date(1995, 1, 1))
# select all values on or before 31.12.1995
is_before_end = df['FLDATE'] <= pd.Timestamp(datetime.date(1995, 12, 31))
df = df[is_after_start & is_before_end]

The result of this operation lists all the records from the year 1995. For more details on the graph, please check the JSON file here. Below is the output of the graph.
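Since FLDATE is now a proper datetime column, the same filter can also be expressed more compactly via the .dt accessor (a minimal alternative sketch, not part of the uploaded graph):

# Equivalent filter using the datetime accessor
df = df[df['FLDATE'].dt.year == 1995]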

For more details on the type conversions, please check the Python documentation.

Insert Operations

Before we go ahead, let's take a brief look at our data. The data has a column called IUUC_OPERATION, which is basically an additional field for handling the record type.

The IUUC_OPERATION field can have the below values:

'' – (Blank) : This basically means it is a new record loaded by the SLT initial load (Init) operation

‘D’ – This value indicates that the record is marked for Deletion

‘U’ – This value indicates that the record is marked for Update

‘I’ – This value indicates that the record is marked for an Insert
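As an illustration of how these flags are consumed later in this blog, the delta records can be split into per-operation DataFrames with simple boolean masks (a sketch, assuming a DataFrame df_delta read from the delta file):

# Split the delta DataFrame by operation flag
df_insert = df_delta[df_delta['IUUC_OPERATION'] == 'I']
df_update = df_delta[df_delta['IUUC_OPERATION'] == 'U']
df_delete = df_delta[df_delta['IUUC_OPERATION'] == 'D']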

Now let's take another look at the data quality. If we look closely at the records with CARRID SQ, there are records where the Currency is blank. This is somewhat of a data quality issue.

There are two solutions that I can quickly think of: either remove all such records, or update the Currency of this CARRID + CONNID combination based on the value from any other record.

To begin with, let's skip these records, and once we have looked at the Insert / Update / Delete operations we can tackle scenario 2 as well.

So let's look at the Insert operation. For reference we need a delta data file, which you can find here; it will be used throughout the rest of this blog for the delta-specific operations.

Below is a screenshot of the data; the field IUUC_OPERATION indicates that we have one record for each case.

Let's now focus first on fixing the quality of the data and saving the file back into the SDL after removing all the records where the Currency is blank (with a different name, so we can try to solve it again with our second scenario).

We need to remove all the records where Currency = Blank, which is the same as selecting all the records where Currency != Blank. This can be considered another way of deleting data based on a select statement. The commands used for this scenario are below, followed by a screenshot of the output where the three extra records with Currency = Blank are removed from the DataFrame.

df = pd.read_csv(data, sep=",")
df.fillna('', inplace=True)
df = df[df.CURRENCY != '']
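An equivalent, slightly more direct variant (just a sketch; it skips the fillna step and filters on the missing values directly) would be:

# Keep only the rows where CURRENCY is present (not NaN)
df = df[df['CURRENCY'].notna()]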

Once the data has been deleted we can store the file under a new name in the SDL. We can achieve this either with HDFS or with the prebuilt write operator from SAP. I have no recommendation regarding performance, but I will use the HDFS library for the sheer purpose of showing another flexible way of dealing with the requirement; in complex scenarios this could be a fallback or even the primary approach (based on your own decision).

The code for fixing the data quality and storing the result under a different name is as below:

import pandas as pd
import datetime
from io import StringIO
from hdfs import InsecureClient, HdfsError

client = InsecureClient('http://datalake:50070')

def read_file(data):
    # Convert the incoming string payload into a CSV-readable buffer
    data = StringIO(data)
    df = pd.read_csv(data, sep=",")
    df.fillna('', inplace=True)
    df = df[df.CURRENCY != '']

    # Write the cleaned DataFrame to a local file and upload it to the SDL
    sdl_path = '/shared/SLT/SFLIGHT/'
    fname = '/tmp/data_refined.csv'
    df.to_csv(fname, sep=',', encoding='utf-8', index=False, header=True)
    try:
        client.upload(sdl_path, fname)
        res_out = 'File uploaded to SDL with file name ' + fname
        api.send("out2", res_out)
    except HdfsError:
        api.send("out2", 'Error in uploading file to SDL')

    api.send("out2", str(df))

api.set_port_callback("input", read_file)

The above code stores a new file named data_refined.csv with the updated DataFrame, and we can now read this in another pipeline to further process the data. Below is a screenshot of the SDL after the execution of the graph:

The JSON for the graph can be found here.

Now that the data quality is improved, let's look at the Insert scenario. We now load two files in our graph from the DI Data Lake. The pipeline looks like the one below, and the configuration has two files:

File 1 : data_refined.csv – This is the whole dataset with the enhanced data quality.

File 2 : delta.csv – This is the delta data from the SDL.

 

The Insert operation is the simplest of all and can be performed with the append statement as below:

df_data = df_data.append(df_Insert)
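Note that DataFrame.append was deprecated and has been removed in newer Pandas releases (2.0 onwards); if your Python operator runs a newer version, the equivalent statement (a small sketch with the same variable names as above) uses pd.concat:

# Equivalent to the append above on newer Pandas versions
df_data = pd.concat([df_data, df_Insert])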

The whole code for appending the insert-relevant records from the delta file to the data read from data_refined.csv, and storing the output in a new file data_deltamerge.csv, is as below.

The JSON for the graph can be downloaded from here.

 

import pandas as pd
import datetime
from io import StringIO
from hdfs import InsecureClient, HdfsError

client = InsecureClient('http://datalake:50070')
df_delta = ''
lock = 'Y'

def read_delta(data):
    global df_delta
    global lock
    data = StringIO(data)
    df_delta = pd.read_csv(data, sep=",")
    lock = 'N'

def read_file(data):
    global df_delta
    global lock
    # Convert the incoming string payload into a CSV-readable buffer
    data = StringIO(data)
    df_data = pd.read_csv(data, sep=",")
    # Simple busy-wait until the delta port callback has filled df_delta
    while lock != 'N':
        print('Waiting to read the delta data')

    df_data.fillna('', inplace=True)
    df_delta.fillna('', inplace=True)
    api.send("out2", str(df_delta))

    # Select the records marked for Insert
    Rec_insert = df_delta['IUUC_OPERATION'] == 'I'
    api.send("out2", 'Records to be inserted')
    df_Insert = df_delta[Rec_insert]
    api.send("out2", str(df_Insert))

    # Perform the Insert operation by appending the records to the main DataFrame
    if len(df_Insert.index) > 0:
        api.send("out2", 'Inserting Data')
        df_data = df_data.append(df_Insert)
        api.send("out2", 'Insert Completed')

    # Write the merged DataFrame to a local file and upload it to the SDL
    sdl_path = '/shared/SLT/SFLIGHT/'
    fname = '/tmp/data_deltamerge.csv'
    df_data.to_csv(fname, sep=',', encoding='utf-8', index=False, header=True)
    try:
        client.upload(sdl_path, fname)
        res_out = 'File uploaded to SDL with file name ' + fname
        api.send("out2", res_out)
    except HdfsError:
        api.send("out2", 'Error in uploading file to SDL')

    api.send("out2", str(df_data))

api.set_port_callback("input", read_file)
api.set_port_callback("in2", read_delta)

Below is the output of the code.

The file data_deltamerge.csv is created as an output from the graph.

As a side note, I find it funny that the kernel time does not match the time of the SDL. The kernel is running one hour behind; however, the minutes and seconds at least seem to be in sync.

Update and Delete Operations

The objective of a technical solution is to be as optimal as possible. If we keep creating a new file for each scenario, we end up consuming a lot of space. So far in this blog I have been trying to segregate the scenarios to keep things simple. For the Update and Delete operations, let's read the data from the file data_deltamerge.csv and also write the updated data directly back into data_deltamerge.csv.

Before we proceed with the Update and Delete operations, let's touch on a topic which we haven't covered so far.

In case you guessed it already: yes, Primary Keys!

Of course the core concept of DB operations is based on primary keys, and this is certainly important for us as well for maintaining the integrity of our data.

The primary keys of the SFLIGHT table are as below:

CARRID, CONNID, FLDATE, MANDT

To set the keys of a DataFrame we use the set_index method, as below:

df.set_index(list_of_keys, inplace=True)

So let's check what happens when we execute the below statements:

lt_keys = ['CARRID', 'CONNID', 'FLDATE', 'MANDT']
df_data.set_index(lt_keys, inplace=True)

Here is the output of the data

As you can see, the data is now arranged on the basis of the key value combination in the DataFrame, and the output shows the keys segregated from the non-key / data part.
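Once the keys are set as the index, individual records can be addressed by their full key combination, which is what makes the update and delete logic below possible. A tiny hypothetical example (the key values are made up for illustration):

# Look up a single record by its full key combination (illustrative values)
row = df_data.loc[('AA', 17, '1995-02-28', 100)]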

So our first step is completed. Now let's take a look at the code which performs the Update and Delete operations and overwrites the data.

import pandas as pd
import datetime
from io import StringIO
from hdfs import InsecureClient, HdfsError

client = InsecureClient('http://datalake:50070')
df_delta = ''
lock = 'Y'

def read_delta(data):
    global df_delta
    global lock
    data = StringIO(data)
    df_delta = pd.read_csv(data, sep=",")
    lock = 'N'

def read_file(data):
    global df_delta
    global lock
    # Convert the incoming string payload into a CSV-readable buffer
    data = StringIO(data)
    df_data = pd.read_csv(data, sep=",")
    # Simple busy-wait until the delta port callback has filled df_delta
    while lock != 'N':
        print('Waiting to read the delta data')

    df_data.fillna('', inplace=True)
    lt_keys = ['CARRID', 'CONNID', 'FLDATE', 'MANDT']
    df_delta.fillna('', inplace=True)

    api.send("out2", 'Before Update')
    api.send("out2", str(df_data))

    Rec_update = df_delta['IUUC_OPERATION'] == 'U'
    Rec_delete = df_delta['IUUC_OPERATION'] == 'D'

    # Create a combined DataFrame with the records marked for deletion and update
    df_comb = df_delta[Rec_delete | Rec_update].copy()

    # Align the data type of the date column (which is also a key) in both DataFrames
    df_comb['FLDATE'] = df_comb['FLDATE'].astype('datetime64')
    df_data['FLDATE'] = df_data['FLDATE'].astype('datetime64')

    if len(df_comb.index) > 0:
        # Union both DataFrames and keep only the latest record per key (the update)
        df_data = pd.concat([df_data, df_comb]).drop_duplicates(lt_keys, keep='last')
        df_data.set_index(lt_keys, inplace=True)
        # Delete all entries marked with 'D'
        is_delete_index = df_data[df_data['IUUC_OPERATION'] == 'D'].index
        df_data.drop(is_delete_index, inplace=True)
        # Bring the key columns back as regular columns before writing the CSV
        df_data.reset_index(inplace=True)

    # Overwrite the existing file in the SDL with the updated DataFrame
    sdl_path = '/shared/SLT/SFLIGHT/'
    sdl_fname = sdl_path + 'data_deltamerge.csv'
    fname = '/tmp/data_deltamerge.csv'
    df_data.to_csv(fname, sep=',', encoding='utf-8', index=False, header=True)
    client.delete(sdl_fname)
    try:
        client.upload(sdl_path, fname)
        res_out = 'File uploaded to SDL with file name ' + fname
        api.send("out2", res_out)
    except HdfsError:
        api.send("out2", 'Error in uploading file to SDL')

    api.send("out2", 'After Update')
    api.send("out2", str(df_data))

api.set_port_callback("input", read_file)
api.set_port_callback("in2", read_delta)

Our files have the below situation in the Data Lake before the execution of our graph:

In the file delta.csv we have two records, one for Update and one for Delete. When these are merged into the records from data_deltamerge.csv, they would update one record and delete one record from the DataFrame, which means we should receive 23 records as a result of this operation.

Below are the records which are marked for Update and Delete in the delta.csv file:

In case you have noticed, the data which we inserted from delta.csv in our Insert scenario already existed, and hence we have two entries for the same key combination. In the Insert scenario we did not use the primary key concept, so we ended up with this duplicate entry. When we fix this, the latest entry for this key combination (the inserted record) is retained, and as a result we finally have 22 records saved in our file.

Let's try to break this code down.

 

This is the part where we create a new DataFrame to combine the data records from the delta file for the Delete and Update scenarios. Since we are dealing with operations relevant to the primary keys, and FLDATE is one of the keys, we need to ensure the data type is the same in both DataFrames. Hence the astype function is used.

This part of the code deals with concatenating the two DataFrames (one with the whole dataset and the other with the delta operations).

Once concatenated, we drop the adjacent duplicate records based on the keys and keep the last record. This way we retain the latest record and delete all the records which are prior to it. This is one way of performing the update.

To delete the data we then simply remove the records which have 'D' as the operation flag. The rest of the code mainly deals with uploading the file to the SDL.
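To see the update/delete core in isolation, here is a minimal, self-contained sketch with a tiny hypothetical dataset (the column values below are made up purely for illustration):

import pandas as pd

lt_keys = ['CARRID', 'CONNID', 'FLDATE', 'MANDT']

df_data = pd.DataFrame({
    'CARRID': ['AA', 'LH'], 'CONNID': [17, 400],
    'FLDATE': ['1995-02-28', '1995-06-06'], 'MANDT': [100, 100],
    'PRICE': [422.94, 666.00], 'IUUC_OPERATION': ['', '']})

df_comb = pd.DataFrame({
    'CARRID': ['AA', 'LH'], 'CONNID': [17, 400],
    'FLDATE': ['1995-02-28', '1995-06-06'], 'MANDT': [100, 100],
    'PRICE': [499.00, 666.00], 'IUUC_OPERATION': ['U', 'D']})

# Union both DataFrames and keep only the latest record per key (the update) ...
df_data = pd.concat([df_data, df_comb]).drop_duplicates(lt_keys, keep='last')
# ... then drop every record flagged for deletion
df_data = df_data[df_data['IUUC_OPERATION'] != 'D']
print(df_data)   # the updated AA record remains, the LH record is gone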

Below is the output of the full graph code, with a meaningful breakdown of each step and the number of records in the DataFrame after the various steps.

 

 

I hope this helps. The JSON for the graph can be downloaded from here.

In case you noticed, we also covered the Union and Delete-adjacent-duplicates part in the Update and Delete section. How cool is that :D!

JOIN

Now let's get back to scenario 2 of our data cleansing solution: update the Currency of records where it is blank, based on the value from any other record with the same CARRID + CONNID combination.
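While the full graph for this will follow, one possible approach (just a sketch of the idea, not the final solution of this blog) is to treat the blanks as missing values and fill them from another record of the same CARRID + CONNID group:

# Treat blank currencies as missing values ...
df['CURRENCY'] = df['CURRENCY'].replace('', pd.NA)
# ... and fill them from another record of the same CARRID + CONNID group
df['CURRENCY'] = df.groupby(['CARRID', 'CONNID'])['CURRENCY'].transform(
    lambda s: s.ffill().bfill())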

I promise I will complete this section before the end of the week and update this blog soon.

If I had your attention until now, then I am sure you had fun going through this blog. I tried to make it as simple and basic as I could. In case you have questions, please feel free to reach out to me via the comments here or via LinkedIn.

Hope this blog could be of some use to someone, somewhere, someday 🙂 ! Have a nice time ahead.

Cheers,

Pranav
