Richard Thomas

SAP Data Intelligence – How to Create a Slowly Changing Dimension using Python/Pandas

Hi everyone. In my last blog post I went through the various approaches to creating a Slowly Changing Dimension (SCD) Type 2 in SAP Data Warehouse Cloud.

In this blog post I would like to continue with the same theme, but this time we will look at how the same result can be achieved with SAP Data Intelligence.

The graph starts with two “Read File” operators: one to read the incoming source file and another to read the existing dimension file. The structures of the two files should be identical, except that the dimension file also has the additional fields “DimKey”, “ValidFrom”, “ValidTo” and “IsCurrent”.

Once each Read File operator has been configured, add a “From File” operator to its output to convert the file to a message.
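
As a rough illustration of the two layouts, the sketch below builds a small source frame and derives a first dimension frame from it. The column names are the ones used by the script later in this post. The sample rows, the example.com URLs and the helper name `initial_dimension` are made up for illustration, and the 19000101/99991231 date keys simply follow the convention the script uses for new records.

```python
import pandas as pd

def initial_dimension(df_source):
    """Hypothetical helper: seed a dimension frame from a source extract."""
    df_dim = df_source.copy()
    # Surrogate key first, then the validity fields at the end
    df_dim.insert(0, "DimKey", list(range(1, len(df_dim) + 1)))
    df_dim["ValidFrom"] = 19000101
    df_dim["ValidTo"] = 99991231
    df_dim["IsCurrent"] = 1
    return df_dim

# Illustrative source rows only
df_source = pd.DataFrame({
    "circuitId": [1, 2],
    "circuitRef": ["albert_park", "sepang"],
    "name": ["Albert Park Grand Prix Circuit", "Sepang International Circuit"],
    "location": ["Melbourne", "Kuala Lumpur"],
    "country": ["Australia", "Malaysia"],
    "url": ["http://example.com/albert_park", "http://example.com/sepang"],
})

df_dim = initial_dimension(df_source)
print(df_dim.columns.tolist())
```

The dimension frame carries every source column plus the four extra fields, which is the shape the Python operator expects on its dimension input.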

I have decided to develop the SCD type 2 using the Python3 operator and the main library that will be utilised is Pandas.

Add the Python3 operator to the graph and add two input ports (sourceInput & dimInput) and an output port (dimOutput) all with a data type set to Basic Message.

Connect the outputs of the two From File operators to the input ports of the Python3 operator.

The code below does the remaining work. I have commented each step so that it is clear what is being done at each stage.

 

# Import libraries
from io import StringIO
import pandas as pd
import numpy as np

def on_input(sourceInput, dimInput):
    # Read the CSV message bodies using StringIO
    sourceData = StringIO(str(sourceInput.body, 'utf-8'))
    dimData = StringIO(str(dimInput.body, 'utf-8'))

    # Read the CSVs into dataframes
    df_sourceData = pd.read_csv(sourceData)
    df_dimData = pd.read_csv(dimData)

    # Get max DimKey
    maxDimKey = df_dimData['DimKey'].max()

    # Split the dimension into current and historical records
    df_Dim_Is_Current = df_dimData[df_dimData["IsCurrent"] == 1]
    df_Dim_History = df_dimData[df_dimData["IsCurrent"] != 1]

    # Left join the source to the current dimension records on the key field.
    # Shared columns get the suffix "_x" (source) or "_y" (dimension).
    # Note: current dimension records whose key is missing from the source are not carried forward.
    df_merge_col = pd.merge(df_sourceData, df_Dim_Is_Current, on='circuitId', how='left')

    # Fix datatypes (unmatched rows introduce nulls, which turn the integer columns into floats)
    df_merge_col['DimKey'] = df_merge_col['DimKey'].astype(pd.Int64Dtype())
    df_merge_col['ValidFrom'] = df_merge_col['ValidFrom'].astype(pd.Int64Dtype())
    df_merge_col['ValidTo'] = df_merge_col['ValidTo'].astype(pd.Int64Dtype())
    df_merge_col['IsCurrent'] = df_merge_col['IsCurrent'].astype(pd.Int16Dtype())

    # Identify new records by checking if DimKey is null
    new_records_filter = pd.isnull(df_merge_col["DimKey"])
    df_new_records = df_merge_col[new_records_filter]

    # Remove the new records
    df_excluding_new = df_merge_col[~new_records_filter]

    # Identify SCD Type 2 records by comparing the SCD2 fields in source and dimension
    scd2_filter = (
        (df_excluding_new["name_x"] != df_excluding_new["name_y"]) |
        (df_excluding_new["circuitRef_x"] != df_excluding_new["circuitRef_y"]) |
        (df_excluding_new["location_x"] != df_excluding_new["location_y"]) |
        (df_excluding_new["country_x"] != df_excluding_new["country_y"])
    )
    df_scd2_records = df_excluding_new[scd2_filter]

    # Remove the SCD2 records
    df_excluding_new_scd2 = df_excluding_new[~scd2_filter]

    # Identify SCD Type 1 records by comparing the SCD1 fields in source and dimension
    scd1_filter = df_excluding_new_scd2["url_x"] != df_excluding_new_scd2["url_y"]
    df_scd1_records = df_excluding_new_scd2[scd1_filter]

    # Remove the SCD1 records, leaving only the no-change records
    df_no_change_records = df_excluding_new_scd2[~scd1_filter]

    # Rename maps (source side and dimension side) and the output field list
    source_fields = {
    "circuitRef_x":"circuitRef","name_x":"name","location_x":"location","country_x":"country","url_x":"url"
    }
    dim_fields = {
    "circuitRef_y":"circuitRef","name_y":"name","location_y":"location","country_y":"country","url_y":"url"
    }
    output_fields = [
    'DimKey','circuitId','circuitRef','name','location','country','url','ValidFrom','ValidTo','IsCurrent'
    ]

    # Today's date as a YYYYMMDD integer, matching the integer ValidFrom/ValidTo keys
    todaysDate = int(pd.Timestamp('today').strftime("%Y%m%d"))

    # No change records: rename and select the required fields
    df_no_change_final = df_no_change_records.rename(columns=source_fields)[output_fields]

    # SCD1 records: take the source values, keep DimKey and validity
    df_scd1_final = df_scd1_records.rename(columns=source_fields)[output_fields]

    # SCD2 new records: take the source values, valid from today and current
    df_scd2New_rename = df_scd2_records.rename(columns=source_fields)
    df_scd2New_rename["ValidFrom"] = todaysDate
    df_scd2New_rename["IsCurrent"] = 1
    df_scd2new_final = df_scd2New_rename[output_fields]

    # SCD2 old records: keep the dimension values, close them off today
    df_scd2Old_rename = df_scd2_records.rename(columns=dim_fields)
    df_scd2Old_rename["ValidTo"] = todaysDate
    df_scd2Old_rename["IsCurrent"] = 0
    df_scd2old_final = df_scd2Old_rename[output_fields]

    # New records: take the source values with an open-ended validity
    df_New_rename = df_new_records.rename(columns=source_fields)
    df_New_rename["ValidFrom"] = 19000101
    df_New_rename["ValidTo"] = 99991231
    df_New_rename["IsCurrent"] = 1
    df_new_final = df_New_rename[output_fields]

    # Union SCD2 new and new records, then assign fresh DimKeys
    df_new_new_scd2_concat = pd.concat([df_scd2new_final, df_new_final])
    df_new_new_scd2_concat['DimKey'] = np.arange(len(df_new_new_scd2_concat)) + 1 + maxDimKey

    # Union all dataframes, including the untouched historical records
    df_allframes = [df_Dim_History[output_fields], df_scd2old_final, df_scd1_final, df_no_change_final, df_new_new_scd2_concat]
    df_allframes_concat = pd.concat(df_allframes)

    # Sort the data
    df_all_sort = df_allframes_concat.sort_values(["circuitId", "ValidFrom", "ValidTo"])

    # Write to CSV
    csv = df_all_sort.to_csv(index=False)

    # Send CSV to port dimOutput
    api.send("dimOutput", csv)

api.set_port_callback(["sourceInput", "dimInput"], on_input)
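
To try the classification logic outside of a graph, a minimal sketch along the following lines may help. It labels each source row as new, SCD2, SCD1 or unchanged using the same field split as the script (url as the SCD1 field, the other attributes as SCD2 fields). Note that it is not the operator code: it uses the `indicator` option of `merge` rather than a null DimKey check to spot new rows, and the function name `classify`, the suffixes and the sample rows are all assumptions made for illustration.

```python
import pandas as pd

SCD2_FIELDS = ["circuitRef", "name", "location", "country"]
SCD1_FIELDS = ["url"]

def classify(df_source, df_dim_current, key="circuitId"):
    """Label each source row as 'new', 'scd2', 'scd1' or 'unchanged'."""
    merged = df_source.merge(
        df_dim_current, on=key, how="left",
        suffixes=("_src", "_dim"), indicator=True,
    )
    # Rows without a dimension match are new
    is_new = merged["_merge"] == "left_only"

    # A row is SCD2/SCD1 if any field in the respective group differs
    scd2 = pd.Series(False, index=merged.index)
    for col in SCD2_FIELDS:
        scd2 |= merged[f"{col}_src"] != merged[f"{col}_dim"]
    scd1 = pd.Series(False, index=merged.index)
    for col in SCD1_FIELDS:
        scd1 |= merged[f"{col}_src"] != merged[f"{col}_dim"]

    # Later assignments win: SCD2 overrides SCD1, new overrides both
    merged["change"] = "unchanged"
    merged.loc[scd1, "change"] = "scd1"
    merged.loc[scd2, "change"] = "scd2"
    merged.loc[is_new, "change"] = "new"
    return merged[[key, "change"]]

# Illustrative data: three current dimension rows and four source rows
dim_current = pd.DataFrame({
    "DimKey": [1, 2, 3],
    "circuitId": [1, 2, 3],
    "circuitRef": ["a", "b", "c"],
    "name": ["Circuit A", "Circuit B", "Circuit C"],
    "location": ["Town A", "Town B", "Town C"],
    "country": ["X", "Y", "Z"],
    "url": ["http://example.com/a", "http://example.com/b", "http://example.com/c"],
    "ValidFrom": [19000101] * 3,
    "ValidTo": [99991231] * 3,
    "IsCurrent": [1] * 3,
})
source = pd.DataFrame({
    "circuitId": [1, 2, 3, 4],
    "circuitRef": ["a", "b", "c", "d"],
    "name": ["Circuit A", "Circuit B Renamed", "Circuit C", "Circuit D"],
    "location": ["Town A", "Town B", "Town C", "Town D"],
    "country": ["X", "Y", "Z", "W"],
    "url": ["http://example.com/a", "http://example.com/b",
            "http://example.com/c-new", "http://example.com/d"],
})

result = classify(source, dim_current)
print(result.to_string(index=False))
```

With these sample rows, circuitId 1 to 4 come out as unchanged, scd2, scd1 and new respectively. A row that changes both an SCD2 field and the url is treated as SCD2, since the new version picks up the new url anyway.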

 

Once the script has completed, you can write the new dimension file. You will need to add a “To File” operator before the “Write File” operator to convert the message back to a file format.

There you have it, an SCD Type 2 in SAP DI using a Python operator.

Please like this post if you found this to be useful and please get in contact if you have any questions.

 

 

 

 

      3 Comments

      Mike Hancock

      Great work, really useful ... keep them coming

      Werner Dähn

      Isn't there a better way in Data Intelligence than coding it? Even in SAP's own Data Services product it takes 30 seconds to build such and does not require any coding.


      https://blogs.sap.com/2013/07/05/scd-type-implementation-in-bods/

       

      Richard Thomas
      Blog Post Author

      I come from a BODS background, so I am fully aware of how simply the task can be completed with BODS.

      At the moment both SAP Data Warehouse Cloud and SAP Data Intelligence don't have a defined operator/transform that will allow you to do this.

      Data Intelligence allows you to build your own custom operators. This is v1; my plan for v2 is to tweak the code so that the same task can be achieved for any table structure with the use of a few parameters (like the BODS Table Comparison).