
Loading Data Into Snowflake Database through SAP DI Custom Operator

While data is a critical asset for modern businesses, the ability of technology to scale has resulted in a flood of big data. Data management and storage have evolved into necessary components of modern operational processes.

Snowflake, a cloud data warehouse lauded for its ability to support multi-cloud infrastructure environments, is one of the most popular data platforms. Snowflake runs on top of cloud infrastructure such as Amazon Web Services or Microsoft Azure, allowing storage and compute to scale independently.

Integrating Snowflake Database with SAP Data Intelligence

Both the SAP DI on-premises and cloud versions provide connectivity to the Snowflake database.

However, up to the SAP DI 3.2 on-premises version, there is no built-in operator available in the DI Modeler to support loading data into the Snowflake database.

The SAP DI cloud version offers the Generation 2 Cloud Table Producer operator, which supports loading data into the Snowflake database.

There are certain limitations of the Cloud Table Producer operator, listed below:

  1. To load data into Snowflake, we must use one of the staging locations (Amazon S3 or WASB). You cannot use this operator if your current landscape does not include either of these cloud storage services.
  2. We cannot customize this operator to our requirements; for example, if we want to perform an UPSERT in Snowflake, this operator does not support that mode.
  3. This operator is only compatible with Gen2 operators, so we have to design the pipelines accordingly.
  4. There is no built-in way to prevent record loss due to graph failure. Assume that while loading data into Snowflake, the graph/pipeline fails with an error and some records are not loaded. How will you deal with the failed records?
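As an illustration of point 2: in custom code, an UPSERT can be expressed with a Snowflake MERGE statement. Below is a minimal sketch of a helper that builds such a statement; the table and column names (SALES, ORDER_ID, etc.) are hypothetical examples, not taken from this blog.

```python
def build_merge_sql(target, staging, key_cols, update_cols):
    """Build a Snowflake MERGE (UPSERT) statement that merges a staging
    table into a target table. Identifiers are assumed pre-validated."""
    on = " AND ".join("t.{c} = s.{c}".format(c=c) for c in key_cols)
    sets = ", ".join("t.{c} = s.{c}".format(c=c) for c in update_cols)
    all_cols = key_cols + update_cols
    cols = ", ".join(all_cols)
    vals = ", ".join("s." + c for c in all_cols)
    return ("MERGE INTO {t} t USING {s} s ON {on} "
            "WHEN MATCHED THEN UPDATE SET {sets} "
            "WHEN NOT MATCHED THEN INSERT ({cols}) VALUES ({vals})").format(
                t=target, s=staging, on=on, sets=sets, cols=cols, vals=vals)

# Hypothetical usage: load new data into a staging table first, then merge.
sql = build_merge_sql("SALES", "SALES_STAGE",
                      key_cols=["ORDER_ID"],
                      update_cols=["AMOUNT", "LAST_UPDATE_DATE"])
# Inside the operator script this could then be run as:
# conn_sf.cursor().execute(sql)
```

In a custom operator, write_pandas would load the batch into the staging table and the MERGE would reconcile it into the target, which is exactly the kind of logic the standard operator cannot express.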

To overcome these challenges and requirements, SAP Data Intelligence offers the option to build a custom Snowflake operator for loading data.

Building a Custom Snowflake Operator in SAP Data Intelligence

To build the custom operator in SAP DI, we must first complete a few steps.

1. Build a Docker file in SAP DI to install the Snowflake Python library.

Go to the Repository tab and click Create Docker File.

Enter the commands as shown in the screenshot, save the file, and click Build.
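The screenshot is not reproduced here, but in essence the Docker file needs a Python base image with the Snowflake connector (and its pandas extras) installed. A minimal sketch follows; the base image and package versions are illustrative assumptions, and the exact contents depend on your DI version. The tags for the Docker file (for example a custom `snowflake` tag) are maintained in its configuration panel and must later match the group tag used in the graph.

```dockerfile
# Illustrative sketch - adapt base image and versions to your DI landscape
FROM python:3.9-slim
RUN pip install pandas "snowflake-connector-python[pandas]"
```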

2. Create a new custom operator.

Go to the Operators tab and select Generation 2 operators. Click the plus sign to create a new operator, and select Python3 (Generation 2) as the base operator.

3. Go to the Configuration tab and define the parameters.

You can create parameters by clicking the Edit button and entering the details.
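The parameter names must match what the operator script reads from `api.config` later on. As a sketch, the set of parameters used by the script in this blog could be captured as follows (values shown are placeholders, and `Truncate` acts as a Yes/No flag):

```json
{
  "Account": "<snowflake-account-identifier>",
  "Username": "<user>",
  "Password": "<password>",
  "Warehouse": "<warehouse>",
  "Database": "<database>",
  "Schema": "<schema>",
  "Snowflake_Table_Name": "<target-table>",
  "Truncate": "No"
}
```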

4. Select an operator icon image and save the operator. The custom operator is now visible under the Generation 2 operators tab.

Create a Graph to Load Data from S/4HANA CDS Views into the Snowflake Database

We will create a graph to extract data from an S/4HANA CDS view and load it into a Snowflake table using the custom Snowflake operator.

1. Create a Gen2 graph and drag the "Read Data from SAP" operator into the graph. Here we assume that an ABAP RFC connection type has already been created to connect to the S/4HANA CDS view.

Select the Connection, Object Name and Transfer Mode in operator configuration.

2. Drag the Data Transform operator into the graph and connect it to the Read Data from SAP operator, then define the output column mappings in the transform operator.

Here in the target, I have added a new column, LAST_UPDATE_DATE, to populate the timestamp while loading data.

3. Now drag in the custom Snowflake operator that we created. Create one input port with data type TABLE.

4. Connect the output port of the Data Transform operator to the input port of the custom Snowflake operator, and finally add a Graph Terminator (terminal operator) to the output of the Snowflake operator. Also, group the Snowflake operator with the Docker file that we created for Snowflake.

The complete graph will now look like this.

5. Open the configuration of the Snowflake Gen2 operator and fill in the required parameter details.

6. Click the Script option in the Snowflake operator and write the Python code to transform and load the data from the CDS view into the Snowflake table.

import pandas as pd
import snowflake.connector
from snowflake.connector.pandas_tools import write_pandas
from datetime import datetime

# Use qmark-style bind parameters for cursor.execute
snowflake.connector.paramstyle = 'qmark'

# Open the connection once, using the operator parameters
conn_sf = snowflake.connector.connect(
    account=api.config.Account,
    user=api.config.Username,
    password=api.config.Password,
    warehouse=api.config.Warehouse,
    database=api.config.Database,
    schema=api.config.Schema)

# Optionally empty the target table before the load
if api.config.Truncate == "Yes":
    table = [api.config.Snowflake_Table_Name]
    conn_sf.cursor().execute("delete from IDENTIFIER(?)", table)


def on_input(msg_id, header, data):

    tbl = data.get()
    batch = header['com.sap.headers.batch']
    # batch[1] is the last-batch indicator: False while data is still
    # arriving; True signals that the graph can terminate.
    if batch[1] == False:
        api.outputs.log.publish("Graph Started")
        tbl_info = api.type_context.get_vtype(tbl.type_ref)
        col_names = list(tbl_info.columns.keys())
        df = pd.DataFrame(tbl.body, columns=col_names, dtype=str)
        table = api.config.Snowflake_Table_Name
        api.outputs.log.publish("Load Started")

        try:
            inserted_rows = write_pandas(conn=conn_sf, df=df, table_name=table,
                                         compression="snappy", parallel=90,
                                         quote_identifiers=False)
            api.outputs.log.publish(str(inserted_rows))
            api.outputs.log.publish("Load Completed")
            api.logger.info(str(inserted_rows))
        except Exception as e:
            # Persist the failed batch to a CSV file so no records are lost
            api.outputs.log.publish('Error while loading data')
            date = datetime.now().strftime("%Y_%m_%d-%I:%M:%S")
            df.to_csv(r'/vrep/vflow/Error_File_Name.csv_{}'.format(date), index=False)
            api.outputs.log.publish('Failed data pushed to error file in path /vrep/vflow/')
            api.outputs.log.publish(str(e))
            api.logger.info(str(e))
            api.propagate_exception(e)

    else:
        api.outputs.log2.publish('no data')

Snowflake Connector Write Pandas

In this custom operator, we have used the standard Snowflake write_pandas function. The connector provides API methods for writing data to a Snowflake database from a pandas DataFrame.

One of the main disadvantages of this connector is that the target table must already exist in the Snowflake database; only then will it load the data into the table. The connector does not create the table automatically.

Once the target table is defined, we use the write_pandas function to append the data, which performs some SQL tricks behind the scenes: it first uploads the data to a temporary stage, then uses COPY INTO to move the data from that stage into the table.

Here, we do not have to specify an external storage location, as we must with the Cloud Table Producer operator.
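The internal-staging mechanism described above can be sketched as the rough sequence of SQL statements below. This is an illustration of the concept only, not the connector's actual implementation; the stage and file names are hypothetical.

```python
def internal_stage_load_sql(table_name, local_file):
    """Sketch of the statements write_pandas conceptually issues:
    create a temporary internal stage, PUT the local data file into it,
    then COPY INTO the target table from that stage."""
    stage = "TEMP_STAGE_{0}".format(table_name)  # illustrative stage name
    return [
        "CREATE TEMPORARY STAGE {0}".format(stage),
        "PUT file://{0} @{1}".format(local_file, stage),
        "COPY INTO {0} FROM @{1} "
        "FILE_FORMAT = (TYPE = PARQUET COMPRESSION = SNAPPY)".format(
            table_name, stage),
    ]

stmts = internal_stage_load_sql("MY_TABLE", "/tmp/chunk_0.parquet")
```

Because the stage is internal to Snowflake, no S3 or WASB bucket is needed, which is the key difference from the Cloud Table Producer operator.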

Once you execute the graph, the data will be extracted from the CDS view and loaded into the Snowflake table with a timestamp. Below is a snapshot of the data loaded into the target table.

Limitations of this approach

1. As mentioned above, the target table must be created in Snowflake before executing the graph. However, there is also an approach to create the table from the DI graph, using the SQLAlchemy package; we will discuss that in a future blog.

2. It requires good knowledge of Python scripting to define the logic and flow.
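Regarding limitation 1: without bringing in SQLAlchemy, one simple workaround is to generate and execute a `CREATE TABLE IF NOT EXISTS` statement in the operator script before calling write_pandas. A minimal sketch, with hypothetical table and column names:

```python
def build_create_table_sql(table, columns):
    """Build a CREATE TABLE IF NOT EXISTS statement from a
    {column_name: snowflake_type} mapping. Identifiers are assumed
    pre-validated; this is an illustrative sketch only."""
    cols = ", ".join("{0} {1}".format(name, sf_type)
                     for name, sf_type in columns.items())
    return "CREATE TABLE IF NOT EXISTS {0} ({1})".format(table, cols)

# Hypothetical usage matching the LAST_UPDATE_DATE column added earlier:
ddl = build_create_table_sql("SALES", {
    "ORDER_ID": "NUMBER",
    "AMOUNT": "NUMBER(18,2)",
    "LAST_UPDATE_DATE": "TIMESTAMP_NTZ",
})
# Inside the operator this could run once before the load:
# conn_sf.cursor().execute(ddl)
```

This keeps the graph self-contained, at the cost of maintaining the column-to-type mapping in the script.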

In conclusion, the custom Snowflake operator is a good approach if you do not have an external staging location such as S3 or WASB to store files and want to load data into a Snowflake table using internal staging. Also, if you need additional functionality such as an UPSERT mechanism, which is not available in the Cloud Table Producer operator, you can leverage this option and write custom code inside the operator. It also provides the flexibility to implement exception handling according to your requirements.

Please share your feedback or thoughts in a comment.

For more information on SAP Data Intelligence, you can refer to and follow the pages below:

 

 

3 Comments

      Armaan Singla

      Hi Ankit,

       

      This is a very informative blog; thanks for sharing the knowledge.

       

      I have a question related to 'Delta' handling using a Gen2 operator while reading data from an S/4HANA source. What are the best possible options to schedule the graph and pause/resume it automatically if we don't want to run the graph 24/7?

      Kindly share your perspective on handling this scenario.

       

      Regards,

      Armaan

      ANKIT SHARMA (Blog Post Author)

      Hi Armaan,

       

      Regarding delta handling using Gen2 operators, below is one custom solution approach:

      1. In order to pause/resume graphs automatically, we have to use the SAP Data Intelligence APIs available in the SAP API Business Hub. Please check the discussion below:

      https://answers.sap.com/questions/13750352/sap-data-intelligence-starting-graphs-by-rest-api.html

      2. We can create one graph with a Python operator and, within it, execute the APIs to pause/resume multiple graphs. Each graph has a unique restart ID which can be used in the API URL.

      3. This is the sample request you have to write for resume/pause: requests.post('https://*******.net/app/pipeline-modeler/service/v1/runtime/restartgroups/restartid', verify = False, headers = header).

       

      We can schedule this graph, which in turn will execute or pause the actual Gen2 graphs. But when should we pause a graph in delta mode, given there is no last-batch indicator? It is still not straightforward; we have to use an add_timer-like function in the Python script and build the solution around it.

      Please let me know if you have any other approaches to handle the delta load.

       

      Thanks,

      Ankit

       

       

      Poshan Reddy

      Hi Ankit,

      Thanks for the wonderful article. You made the abilities of the Cloud Table Producer operator very clear.

      Practically, we will not be able to use this operator for delta replications, because without UPSERT functionality there is no point in considering this method/operator.

      As for the other options suggested above: for large data volumes, such as 1 TB of data hourly, handling delta is not a viable option.

      Hoping for a better solution from SAP.

       

      Thanks

      Poshan