While data is a critical asset for modern businesses, the scale at which technology now operates has produced a flood of big data. Data management and storage have evolved into necessary components of modern operational processes.

Snowflake, a cloud data warehouse lauded for its support of multi-cloud infrastructure environments, is one of the most popular data platforms. It runs on top of Amazon Web Services or Microsoft Azure cloud infrastructure and allows storage and compute to scale independently.

Integrating Snowflake Database with SAP Data Intelligence

Both the SAP DI on-premises and cloud versions provide connectivity to the Snowflake database.


However, up to the SAP DI 3.2 on-premises version, there is no built-in operator available in the DI Modeler to load data into a Snowflake database.

The SAP DI cloud version provides a Generation 2 Cloud Table Producer operator that supports loading data into a Snowflake database.


The Cloud Table Producer operator has certain limitations:

  1. To load data into Snowflake, we must use one of the supported staging locations (Amazon S3 or WASB). You cannot use this operator if your current landscape does not include either of these cloud storage services.

  2. We cannot customize this operator to meet specific requirements; for example, it does not support an UPSERT mode into Snowflake.

  3. This operator is only compatible with Generation 2 operators, so pipelines have to be designed accordingly.

  4. Preventing record loss due to graph failure. Assume that while loading data into Snowflake, the graph/pipeline fails due to an error and some records are not loaded. How do you deal with the failed records?


To overcome these challenges and requirements, SAP Data Intelligence offers the option to build a custom Snowflake operator for loading data.

Building a Custom Snowflake Operator in SAP Data Intelligence

To build a custom operator in SAP DI, we must first complete a few steps.

1. Build a Docker file in SAP DI to install the Snowflake Python library.


Go to the Repository tab and click Create Docker File.

Write the commands as shown in the screenshot, save the file, and click Build.
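For reference, the Docker file essentially installs the Snowflake Python connector (with its pandas extras) on top of an SAP-delivered base image. The exact base image placeholder and Python version depend on your DI release, so treat the following as a rough sketch rather than the exact screenshot content:

# Sketch of a Docker file for the custom Snowflake operator.
# Base image placeholder and Python version depend on your DI release.
FROM $com.sap.sles.base
RUN python3.9 -m pip --no-cache-dir install --user "snowflake-connector-python[pandas]"

In the Docker file's configuration panel, also add a tag (for example, a custom tag such as snowflake); the group created around the custom operator later should carry the same tag so that the operator runs in this image.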

2. Create a new custom operator

Go to the Operators tab and select Generation 2 operators. Click the plus sign to create a new operator and select Python3 (Generation 2) as the base operator.


 

3. Go to the Configuration tab and define the parameters.



You can create parameters by clicking the Edit button and defining the details.
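The script shown later reads these values through api.config, so the parameter names defined here must match the names used in the code. For this example, the following parameter names are assumed:

# Parameters assumed by the script below (defined in the operator's Configuration tab):
#   Account, Username, Password, Warehouse, Database, Schema,
#   Snowflake_Table_Name, Truncate ("Yes" / "No")
# Inside the script they are accessed like this, for example:
account = api.config.Account
table_name = api.config.Snowflake_Table_Name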


4. Select an operator icon image and save the operator. The custom operator is now visible under the Generation 2 operators tab.


 

Create a Graph to Load Data from S/4HANA CDS Views into the Snowflake Database

We will create a graph to extract data from an S/4HANA CDS view and load it into a Snowflake table using the custom Snowflake operator.

1. Create a Gen2 graph and drag the “Read Data from SAP” operator into the graph. Here we assume that an ABAP RFC connection type has already been created to connect to the S/4HANA CDS view.


Select the Connection, Object Name, and Transfer Mode in the operator configuration.

2. Drag the Data Transform operator into the graph, connect it to the Read Data from SAP operator, and define the output column mapping in the transform operator.


Here, in the target, I have added a new column, LAST_UPDATE_DATE, to populate the timestamp while loading the data.
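Alternatively, the same column could be filled inside the custom operator's script instead of the Data Transform operator; a one-line sketch, assuming the pandas DataFrame built in the script is named df:

# Sketch: populate the load timestamp in the operator script rather than in the Data Transform
df["LAST_UPDATE_DATE"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")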


3. Now drag in the custom Snowflake operator that we created and create one input port with the data type TABLE.


4. Connect the output port of the Data Transform operator to the input port of the custom Snowflake operator, and finally add the Graph Terminator and Terminal operators to the outputs of the Snowflake operator. Also, group the Snowflake operator with the Docker file that we created for Snowflake, so that the operator runs in the image with the Snowflake library installed.


The complete graph now looks like this: Read Data from SAP → Data Transform → custom Snowflake operator → Terminal/Graph Terminator.


5. Open the configuration of the custom Snowflake Gen2 operator and fill in the required parameter values.


6. Click the Script option in the Snowflake operator and write the Python code to transform and load the data from the CDS view into the Snowflake table.
import pandas as pd
import snowflake.connector
from snowflake.connector.pandas_tools import write_pandas
from datetime import datetime

snowflake.connector.paramstyle = 'qmark'

# Open one Snowflake connection using the operator's configuration parameters.
conn_sf = snowflake.connector.connect(
    account=api.config.Account,
    user=api.config.Username,
    warehouse=api.config.Warehouse,
    password=api.config.Password,
    database=api.config.Database,
    schema=api.config.Schema)

# Optionally empty the target table before loading.
if api.config.Truncate == "Yes":
    table = [api.config.Snowflake_Table_Name]
    conn_sf.cursor().execute("delete from IDENTIFIER(?)", table)


def on_input(msg_id, header, data):
    tbl = data.get()
    batch = header['com.sap.headers.batch']
    if batch[1] == False:  # not the last batch: load the records
        api.outputs.log.publish(str("Graph Started"))
        tbl_info = api.type_context.get_vtype(tbl.type_ref)
        col_names = list(tbl_info.columns.keys())
        df = pd.DataFrame(tbl.body, columns=col_names, dtype=str)
        table = api.config.Snowflake_Table_Name
        api.outputs.log.publish(str("Load Started"))
        try:
            inserted_rows = write_pandas(conn=conn_sf, df=df, table_name=table,
                                         compression="snappy", parallel=90,
                                         quote_identifiers=False)
            api.outputs.log.publish(str(inserted_rows))
            api.outputs.log.publish(str("Load Completed"))
            api.logger.info(str(inserted_rows))
        except Exception as e:
            # Dump the failed batch to a CSV file so that no records are lost.
            api.outputs.log.publish(str('Error while loading data'))
            date = datetime.now().strftime("%Y_%m_%d-%I:%M:%S")
            df.to_csv(r'/vrep/vflow/Error_File_Name.csv_{}'.format(date), index=False)
            api.outputs.log.publish(str('Failed data pushed to error file in path /vrep/vflow/'))
            api.outputs.log.publish(str(e))
            api.logger.info(str(e))
            api.propagate_exception(e)
    else:
        # Last (empty) batch received: publish on the second output so the graph can terminate.
        api.outputs.log2.publish(str('no data'))


# Register the callback; "input" must match the name of the TABLE input port created earlier.
api.set_port_callback("input", on_input)

Snowflake Connector write_pandas

In this custom operator, we have used the standard Snowflake write_pandas function. The connector provides API methods for writing data from a pandas DataFrame to a Snowflake database.

One of the main disadvantages of this connector is that the target table must already exist in the Snowflake database; only then will it load the data into the table. The connector does not create the table automatically.
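For example, the target table could be created once up front with a statement along the following lines; the columns here are illustrative (only LAST_UPDATE_DATE comes from the mapping above) and should match your CDS view output:

# Sketch: create the target table once before running the graph.
# MY_TABLE, ID and NAME are illustrative names.
conn_sf.cursor().execute("""
    CREATE TABLE IF NOT EXISTS MY_TABLE (
        ID               NUMBER,
        NAME             VARCHAR,
        LAST_UPDATE_DATE TIMESTAMP_NTZ
    )
""")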

Once the target table is defined, we use the write_pandas function to append the data, which performs some SQL tricks behind the scenes: it first uploads the data to a temporary internal stage and then uses COPY INTO to move the data from that stage into the table.

Here, we do not have to specify an external storage location as we do with the Cloud Table Producer operator.

Once you execute the graph, the data is extracted from the CDS view and loaded into the Snowflake table with the timestamp populated. Below is a snapshot of the data loaded into the target table.


Limitations of this approach

1. As mentioned above, the target table needs to be created in Snowflake before executing the graph. However, there is also an approach to create the table from the DI graph using the SQLAlchemy package (see the brief sketch after this list); we will discuss it in more detail in a future blog.

2. It requires good knowledge of Python scripting to define the logic and flow.
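As a quick preview of the SQLAlchemy idea mentioned in point 1, a minimal sketch could look like this (it assumes the snowflake-sqlalchemy package is also installed via the Docker file and that df is the DataFrame built in the operator script; pandas.DataFrame.to_sql creates the table if it does not exist):

# Sketch only: to_sql with if_exists="append" creates the table when it is missing.
from sqlalchemy import create_engine
from snowflake.sqlalchemy import URL

engine = create_engine(URL(
    account=api.config.Account, user=api.config.Username,
    password=api.config.Password, warehouse=api.config.Warehouse,
    database=api.config.Database, schema=api.config.Schema))
df.to_sql(api.config.Snowflake_Table_Name, con=engine, index=False, if_exists="append")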

In conclusion, the custom Snowflake operator is a good approach if you do not have an external staging location such as S3 or WASB to store files and want to load data into a Snowflake table using internal staging. Also, if you want to implement additional requirements such as an UPSERT mechanism, which is not available in the Cloud Table Producer operator, you can leverage this option and write custom code inside the operator. It also provides the flexibility to implement exception handling according to your requirements.
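For illustration, an UPSERT inside the custom operator could be sketched as follows: load the batch into a staging table with write_pandas and then merge it into the target. The table, key, and column names here are assumptions:

# Sketch of an UPSERT: stage the batch, then MERGE it into the target table.
# MY_TABLE, MY_TABLE_STG, ID, NAME and LAST_UPDATE_DATE are illustrative names.
write_pandas(conn=conn_sf, df=df, table_name="MY_TABLE_STG", quote_identifiers=False)
conn_sf.cursor().execute("""
    MERGE INTO MY_TABLE t
    USING MY_TABLE_STG s
      ON t.ID = s.ID
    WHEN MATCHED THEN UPDATE SET
      t.NAME = s.NAME,
      t.LAST_UPDATE_DATE = s.LAST_UPDATE_DATE
    WHEN NOT MATCHED THEN INSERT (ID, NAME, LAST_UPDATE_DATE)
      VALUES (s.ID, s.NAME, s.LAST_UPDATE_DATE)
""")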

Please share your feedback or thoughts in a comment.

For more information on SAP Data Intelligence, you can refer to and follow the SAP Data Intelligence community pages.

 

 