Technology Blogs by Members
RichT
Explorer
Hi everyone. In my last blog post I went through the various approaches to creating a slowly changing dimension (SCD) type 2 in SAP Data Warehouse Cloud.

In this blog post I would like to continue with the same theme but this time we will look at how this can be achieved with SAP Data Intelligence.

The graph starts with two "Read File" operators, one to read the incoming source file and another to read the existing Dimension file. The two files should have identical structures, except that the Dimension file also carries the additional fields "DimKey", "ValidFrom", "ValidTo" and "IsCurrent".
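To make the two file layouts concrete, here is a minimal sketch using made-up circuit rows. The sample values, and the assumption that both files are CSV keyed on circuitId, are illustrative only and inferred from the script later in this post. It shows that the Dimension file carries the source columns plus the four tracking fields.

```python
from io import StringIO
import pandas as pd

# Hypothetical sample rows; the real files will have their own values.
source_csv = """circuitId,circuitRef,name,location,country,url
1,albert_park,Albert Park Grand Prix Circuit,Melbourne,Australia,http://example.com/albert_park
2,sepang,Sepang International Circuit,Kuala Lumpur,Malaysia,http://example.com/sepang
"""

dim_csv = """DimKey,circuitId,circuitRef,name,location,country,url,ValidFrom,ValidTo,IsCurrent
1,1,albert_park,Albert Park Grand Prix Circuit,Melbourne,Australia,http://example.com/albert_park,19000101,99991231,1
"""

df_source = pd.read_csv(StringIO(source_csv))
df_dim = pd.read_csv(StringIO(dim_csv))

# Columns that exist only in the Dimension file
extra_cols = [c for c in df_dim.columns if c not in df_source.columns]
print(extra_cols)
```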


Once both Read File operators have been configured, add a "From File" operator to each output to convert the file into a message.


I decided to build the SCD type 2 logic with the Python3 operator, using pandas as the main library.


Add the Python3 operator to the graph and give it two input ports (sourceInput and dimInput) and one output port (dimOutput), all with the data type set to Basic Message.

Connect the outputs of the two From File operators to the input ports of the Python3 operator.
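Inside the operator, the script registers a single callback for both input ports, and the callback receives one message per port. The sketch below imitates that wiring outside SAP Data Intelligence so it can be tried locally. `FakeApi` and `FakeMessage` are hypothetical stand-ins written for this example and are not part of the product; in the real operator the `api` object is supplied by the runtime.

```python
from io import StringIO
import pandas as pd

class FakeMessage:
    """Hypothetical stand-in for a message whose body holds the file bytes."""
    def __init__(self, body):
        self.body = body

class FakeApi:
    """Hypothetical stand-in for the `api` object; it only records calls."""
    def __init__(self):
        self.callback = None
        self.sent = []

    def set_port_callback(self, ports, callback):
        self.callback = callback

    def send(self, port, data):
        self.sent.append((port, data))

api = FakeApi()

def on_input(sourceInput, dimInput):
    # Decode each message body and parse it as CSV
    df_source = pd.read_csv(StringIO(str(sourceInput.body, 'utf-8')))
    df_dim = pd.read_csv(StringIO(str(dimInput.body, 'utf-8')))
    # Report the row counts instead of doing the real SCD work
    api.send("dimOutput", f"{len(df_source)},{len(df_dim)}")

api.set_port_callback(["sourceInput", "dimInput"], on_input)

# Simulate one message arriving on each port
api.callback(FakeMessage(b"circuitId,name\n1,A\n2,B\n"),
             FakeMessage(b"DimKey,circuitId,name\n1,1,A\n"))
print(api.sent)
```

Running the callback against small in-memory files like this is a quick way to test the pandas logic before pasting it into the operator.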


The code below does the remaining work. I have commented each step so that it is clear what is being done at each stage.


 
#Import libraries
from io import StringIO
import pandas as pd
import numpy as np

def on_input(sourceInput, dimInput):
    # Read CSVs using StringIO
    sourceData = StringIO(str(sourceInput.body, 'utf-8'))
    dimData = StringIO(str(dimInput.body, 'utf-8'))

    # Read CSVs into dataframes
    df_sourceData = pd.read_csv(sourceData)
    df_dimData = pd.read_csv(dimData)

    # Columns written to the new Dimension file
    final_cols = ['DimKey', 'circuitId', 'circuitRef', 'name', 'location', 'country', 'url',
                  'ValidFrom', 'ValidTo', 'IsCurrent']

    # Today's date as an integer in YYYYMMDD format, matching ValidFrom/ValidTo
    TodaysDate = int(pd.Timestamp('today').strftime("%Y%m%d"))

    # Get max DimKey
    maxDimKey = df_dimData['DimKey'].max()

    # Split the Dimension into current records and already closed (historical) records
    df_Dim_Is_Current = df_dimData[df_dimData["IsCurrent"] == 1]
    df_Dim_History = df_dimData[df_dimData["IsCurrent"] != 1][final_cols]

    # Left join the source to the current Dimension records on the key field
    df_merge_col = pd.merge(df_sourceData, df_Dim_Is_Current, on='circuitId', how='left')

    # Fix datatypes (the left join turns these into floats when there are new records)
    df_merge_col['DimKey'] = df_merge_col['DimKey'].astype(pd.Int64Dtype())
    df_merge_col['ValidFrom'] = df_merge_col['ValidFrom'].astype(pd.Int64Dtype())
    df_merge_col['ValidTo'] = df_merge_col['ValidTo'].astype(pd.Int64Dtype())
    df_merge_col['IsCurrent'] = df_merge_col['IsCurrent'].astype(pd.Int16Dtype())

    # Identify new records by checking if DimKey is null
    new_records_filter = pd.isnull(df_merge_col["DimKey"])

    # Create dataframe for new records
    df_new_records = df_merge_col[new_records_filter]

    # Remaining records already exist in the Dimension
    df_excluding_new = df_merge_col[~new_records_filter]

    # Identify SCD Type 2 records by comparing SCD2 fields in source and dimension
    scd2_filter = (
        (df_excluding_new["name_x"] != df_excluding_new["name_y"]) |
        (df_excluding_new["circuitRef_x"] != df_excluding_new["circuitRef_y"]) |
        (df_excluding_new["location_x"] != df_excluding_new["location_y"]) |
        (df_excluding_new["country_x"] != df_excluding_new["country_y"])
    )
    df_scd2_records = df_excluding_new[scd2_filter]

    # Remove SCD2 records
    df_excluding_new_scd2 = df_excluding_new[~scd2_filter]

    # Identify SCD Type 1 records by comparing SCD1 fields in source and dimension
    scd1_filter = df_excluding_new_scd2["url_x"] != df_excluding_new_scd2["url_y"]
    df_scd1_records = df_excluding_new_scd2[scd1_filter]

    # Remove SCD1 records (only no-change records remain)
    df_no_change_records = df_excluding_new_scd2[~scd1_filter]

    # Renames that keep the incoming source values (_x) or the existing Dimension values (_y)
    source_rename = {"circuitRef_x": "circuitRef", "name_x": "name", "location_x": "location",
                     "country_x": "country", "url_x": "url"}
    dim_rename = {"circuitRef_y": "circuitRef", "name_y": "name", "location_y": "location",
                  "country_y": "country", "url_y": "url"}

    # No-change records: rename and select the required fields
    df_no_change_final = df_no_change_records.rename(columns=source_rename)[final_cols]

    # SCD1 records: overwrite in place with the source values
    df_scd1_final = df_scd1_records.rename(columns=source_rename)[final_cols]

    # SCD2 new version: source values, valid from today and flagged as current
    df_scd2New_rename = df_scd2_records.rename(columns=source_rename)
    df_scd2New_rename["ValidFrom"] = TodaysDate
    df_scd2New_rename["ValidTo"] = 99991231
    df_scd2New_rename["IsCurrent"] = 1
    df_scd2new_final = df_scd2New_rename[final_cols]

    # SCD2 old version: existing Dimension values, closed as of today
    df_scd2Old_rename = df_scd2_records.rename(columns=dim_rename)
    df_scd2Old_rename["ValidTo"] = TodaysDate
    df_scd2Old_rename["IsCurrent"] = 0
    df_scd2old_final = df_scd2Old_rename[final_cols]

    # New records: open-ended validity and flagged as current
    df_New_rename = df_new_records.rename(columns=source_rename)
    df_New_rename["ValidFrom"] = 19000101
    df_New_rename["ValidTo"] = 99991231
    df_New_rename["IsCurrent"] = 1
    df_new_final = df_New_rename[final_cols]

    # Union SCD2 new versions and new records, then assign the next DimKeys
    df_new_new_scd2_concat = pd.concat([df_scd2new_final, df_new_final])
    df_new_new_scd2_concat['DimKey'] = np.arange(len(df_new_new_scd2_concat)) + 1 + maxDimKey

    # Union all dataframes (historical records are passed through unchanged)
    df_allframes = [df_Dim_History, df_scd2old_final, df_scd1_final,
                    df_no_change_final, df_new_new_scd2_concat]
    df_allframes_concat = pd.concat(df_allframes)

    # Sort the data
    df_all_sort = df_allframes_concat.sort_values(["circuitId", "ValidFrom", "ValidTo"])

    # Write to CSV
    csv = df_all_sort.to_csv(index=False)

    # Send csv to port dimOutput
    api.send("dimOutput", csv)

api.set_port_callback(["sourceInput","dimInput"], on_input)
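To see the classification logic in isolation, the following is a minimal sketch on made-up data that sorts incoming rows into new, SCD type 2, SCD type 1 and unchanged. It is an alternative formulation rather than the script above: it uses the `indicator` option of `pd.merge` to spot new rows, tracks only `name` as the type 2 field and `url` as the type 1 field, and both the sample rows and the `classify` helper are hypothetical.

```python
import pandas as pd

# Hypothetical source rows and current dimension rows
df_source = pd.DataFrame({
    "circuitId": [1, 2, 3, 4],
    "name": ["Albert Park", "Sepang Intl", "Sakhir", "Catalunya"],
    "url": ["u1", "u2", "u3-new", "u4"],
})
df_dim_current = pd.DataFrame({
    "DimKey": [10, 11, 12],
    "circuitId": [1, 2, 3],
    "name": ["Albert Park", "Sepang", "Sakhir"],
    "url": ["u1", "u2", "u3"],
})

# indicator=True adds a _merge column saying where each row was found
merged = df_source.merge(df_dim_current, on="circuitId", how="left",
                         indicator=True)

def classify(row):
    # No matching dimension row: the key has never been seen
    if row["_merge"] == "left_only":
        return "new"
    # A tracked attribute changed: history must be kept
    if row["name_x"] != row["name_y"]:
        return "scd2"
    # Only an overwrite-in-place attribute changed
    if row["url_x"] != row["url_y"]:
        return "scd1"
    return "no_change"

merged["change_type"] = merged.apply(classify, axis=1)
result = merged.set_index("circuitId")["change_type"].to_dict()
print(result)
```

The order of the checks matters: a row whose type 2 field and type 1 field both changed is treated as SCD type 2, so it is never counted twice.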

 

Once the script has run, you can write the new Dimension file. Add a "To File" operator before the "Write File" operator to convert the message back into a file format.
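Before the file is written, it can be worth checking that the result still looks like a valid type 2 dimension. The sketch below is one possible check, with a hypothetical helper name and made-up rows: every DimKey should be unique and every circuitId should have exactly one current row.

```python
import pandas as pd

def check_dimension(df):
    """Return a list of problems found in a type 2 dimension dataframe."""
    problems = []
    # Surrogate keys must never repeat
    if df["DimKey"].duplicated().any():
        problems.append("duplicate DimKey")
    # Each business key needs exactly one row flagged as current
    current_per_key = df.groupby("circuitId")["IsCurrent"].sum()
    if (current_per_key != 1).any():
        problems.append("circuitId without exactly one current row")
    return problems

# Hypothetical output: circuit 1 has one closed and one current version
df_ok = pd.DataFrame({
    "DimKey": [1, 2, 3],
    "circuitId": [1, 1, 2],
    "ValidFrom": [19000101, 20240101, 19000101],
    "ValidTo": [20240101, 99991231, 99991231],
    "IsCurrent": [0, 1, 1],
})
print(check_dimension(df_ok))
```

A helper like this could be called on the final dataframe inside the Python3 operator, just before the CSV is sent to the output port.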



There you have it, an SCD Type 2 in SAP DI using a Python operator.

Please like this post if you found this to be useful and please get in contact if you have any questions.

 

 

 

 