Technology Blogs by SAP
Learn how to extend and personalize SAP applications. Follow the SAP technology blog for insights into SAP BTP, ABAP, SAP Analytics Cloud, SAP HANA, and more.
cancel
Showing results for 
Search instead for 
Did you mean: 
Domnic
Advisor
Advisor

Snowflake as a data warehouse, uses, Amazon Web Services, Google Cloud Platform or Microsoft Azure cloud infrastructure, allowing large data to be stored and compute while offering scalability. Enterprise supply chain planning often require data consolidation from multiple data sources. A data lake becomes handy in such situations. Snowflake is considered as a data lake by many SAP Customers. With SAP Integrated Business Planning as a de-facto industry standard tool for supply chain planning, customers request for integration feasibility between SAP IBP and Snowflake.

This blog exclusively describes how we can read forecast data from SAP IBP and write them to a data base table in Snowflake using SAP Cloud Integration as a middleware. To support large data volumes, we use SAP Remote Functions to extract data from SAP IBP into SAP Cloud Integration for ETL purposes. SAP offers an open connectors platform which hosts a marketplace for 3rd party vendors to resell their integration artifacts. Here we use a 3rd party developed, JDBC based open connector, between SAP Cloud Integration and Snowflake. This blog is split in 2 parts.

  1. Reading data from Snowflake and writing to SAP IBP
  2. Writing forecast data from SAP IBP into Snowflake Database.

Prerequisites

Let’s start with the following prerequisites in place:

  1. Have a forecast run on SAP IBP completed. Identify a keyfigure which stored the computed forecast. In my example I considered CONSDEMANDQTY as a sample keyfigure with its own set of root attributes.
  2. Make sure you have access to the Open connectors platform. This can be access from the welcome page of the SAP Cloud Integration when you click on the pad – Discover Connectors.
  3. Create a database, schema, role and assign this to a user on Snowflake if this is applicable for your scenario. In case you want to just try out – you can use the default account admin use on your Snowflake trial account.
  4. Writing large volumes of data to Snowflake is an Asynchronous process. First, we need a staging area for loading data. This data from the staging area is then copied into a table. There are two types of staging integrations supported in Snowflake – Internal and External. We use an external staging integration in this use case.
  5. To support external staging, we used the Object Store service which is available on SAP BTP to create an AWS S3 bucket. The credentials for this bucket are available when you create a service key on your sub account in BTP. Follow the above link to create your store for an AWS Bucket. You would not need to bind this service key to any application. Just the key details are needed to create a AWS S3 Open Connector instance. 
  6. SAP IBP with its 2402 release has come up with a set of helper iFlows in SAP Cloud Integration. These iFlows are for communicating with SAP IBP using web-socket RFC based interfaces – make sure you have these iFlows imported into your Cloud Integration instance.

Reading data from SAP IBP and writing to a Snowflake database

As a prerequisite, we need a Snowflake database user who has the right role and privilege to write data to  a table. This user can be also configured with RSA key pair or basic authentication can be used. Details on how to do this is well documented in this security guide from Snowflake.  

Staging integration in Snowflake

As mentioned before, we need an external or an internal Staging integration setup in Snowflake. You can use the Snowflake internal staging mechanism for loading your data. But, in this blog we tried to set up Amazon S3 as an external staging integration. Amazon S3 is optional. You can also use Google Cloud Platform or a Microsoft Azure as external staging integration. You can try the following steps to create an external stage using Amazon S3 bucket which we created as an Object store in one of our previous steps. 

  • 1. Sign into your Snowflake account. Use the navigation menu on the left to select Data -> Databases and select your database and schema. 
  • Use the Create -> Stage navigation on the topy right and select External Stage -> Amazon S3
  • Enter a Stage Name and URL of your Amazon S3 bucket. Example: s3://hcp-9XXXX-XXXX-XXXX-XXXX-XXXXXXXXX.
  • In the Authentication section, select Credentials and then enter the AWS Key ID and the AWS Secret Key. All of these details can be obtained from the Service key - Credentials JSON file which you can download from the BTP sub account once you have subscribed to the Object store service or you can bring your own credentials. 
  • Press Save. If the credentials are right, you would now be able to see the file structure of your S3 Bucket in your Snowflake external staging area which you just saved. If it is a fresh configuration the folder would be empty.

Snowflake connector instance

Once you are on the landing page of the SAP Cloud Integration section, navigate to Capabilities and then Extend Non-SAP Connectivity pad and you can see the Discover Connectors link in that pad. Click on this to log in at SAP Integration Suite - Open Connectors platform. 

SF Instance config.png

On the left menu section of your Open connectors landing page, you would see Instances. Click on it and you would see all the instances you have created.

  • On the top right corner, you can click on the Create Instance option to create a new instance. Search for the Snowflake connector instance and fill in the above details. When you are done, you can press the blue button – Create Instance.
  • This would use the credentials you had provided on the hostname and create a new JDBC client instance. It would also generate a set of APIs which you can use as REST end points from your iFlow or test using Postman.  
  • Once the instance is created, you can see the API Docs link in a new page. Open this link to see a list of table names which are in the database you had provided to create the instance. These tables are wrapped with HTTP methods in the API docs.
  • Use any GET call to read a table. To do this, press the “Try it” button on the top right. Then when you press the “Execute” button which is visible when you scroll down, this API call would generate a GET request and a sample cURL request can be seen in the screen.
  • In that request you would also see the User, Element and the Organization header parameters which you would need to create a new user credentials in the security material app in SAP Cloud Integration.

Amazon S3 connector instance

Using the object store instance in your BTP Sub account, you can create a service key for AWS S3. The credentials JSON file from the service key download is used for creating an instance of the Open Connector for AWS S3. Create an instance with this following details.

AWS S3 Instance config.png

After filling up the details, you can use the button "Create Instance" to generate the API Docs for your instance. You can try out an API call to get the User, Element and Organization details like the previous step. User, Element and Organization parameter values are used to create a new Security Material. For this,

  1. Log into the SAP Cloud Integration instance.
  2. Open the Security Materials pad from Monitor-> Integrations and APIS screen and click on Create -> User Credentials.
  3. Enter the name, description, select the type as OpenConnectors.
  4. The User, Organization and Element details come from the previous step when you created the Open Connector instance using the Snowflake Adapter.
  5. Deploy this user credentials.
  6. Repeat steps 2 to 5 for the Amazon S3 connector instance.

You have now created two Open Connector instances and configured this instance on SAP Cloud Integration to be consumed in an iFlow. We will need this user Credentials name and the URL of the Open Connector instance, for later usage.

Communication interfaces in SAP IBP.

The 2402 IBP release comes with a set of RFC interfaces which can be used with iFlows in SAP Cloud Integration for extracting key figure and master data. These are packed under the communication scenario SAP_COM_0931. Create a new arrangement, a system, and a user for it. These function modules are intended to be used by the SAP Cloud Integration only.

Destination on BTP

You can create a BTP destination with the communication credentials for the IBP instance in your BTP sub account where the SAP Cloud Integration runtime process is instantiated.

domnicsavio_benedict_1-1708528065442.png

Fig.1 Screen shot of BTP Destination service configuration.

Here is a screen shot of the destination configuration. In the above example, the destination name is called IBP_G5R_100. This name is then later used in the iFlows to make WS-RFC calls to SAP IBP instance which is configured in the jco.client.wshost property.

Once the above sections were done, then we take a closer look into the reading from IBP and writing to Snowflake. The diagram below would give you a high-level overview of the entire process.

READ_from_IBP.png

Fig.2 High level overview of components.

In the above picture, we want to read the CONSDEMANDQTY which was calculated during a forecast run in SAP IBP. This is read using a set of helper iFlows from Cloud Integration. This data is then stored in an external staging area in AWS S3. Once the file is stored in the S3 bucket, we copy the data from S3 to a table - IBPFORECAST in a Snowflake database using a SQL command via the Open Connector.

Reading from SAP IBP

Data is read from IBP in three steps. It is an asynchronous process. The helper iFlows are used for all of these steps. First, a select query is initialized. The helper iFlow is invoked via a process direct – SAP_IBP_READ_-_Initialize. The following payload is sent as a request.

 

<IBPReads>
<IBPRead Key="${header.IBPQueryKey1}"
              Destination="${header.IBPDestination}"
              PackageSizeInRows="${header.IBPPackageSizeInRows}"
              Select="${header.IBPQuerySelect}"
              OrderBy="${header.IBPQueryOrderBy}"
              Filter="${header.IBPQueryFilterString}"
              TypeOfData="${header.IBPQueryTypeOfData}"
              TimeAggregationLevel="2"/> 
</IBPReads>

 

In this example, the IBPQueryKey1 is the CONSDEMANDQTY – the key figure which we wanted to read, IBPDestination is the destination service definition on the BTP sub account for the IBP backend. IBPPackageSizeInRows defines the number of rows you want to read from this call. IBPQuerySelect is the actual select parameters. For example: - PRDID, CUSTID, LOCID, CONSDEMANDQTY, PERIOD_LEVEL_3, UOMID. In this example, all names are root attributes for the key figure CONSDEMANDQTY key figure defined in the planning area in IBP. IBPQueryTypeOfData is KeyFigures – type of data we want to read (Key figures vs Master data). You can also use the IBPQueryFilterString and the IBPQueryOrderBy to define the select filter and order. For example:- CONSENSUSDEMANDQTY gt 0 AND UOMTOID eq ‘EQ’ can be the query select string.

In the second step, we check the status of the attribute Status inside the IBPQuery element which is returned from the helper iFlow. This status could be either a PRE-INIT or FETCH_DATA. If the helper iFlow returned with the PRE-INIT, then there was an exception thrown during the query initialization stage. If the status was FETCH_DATA, then the initialization was successful and now we can fetch the data. The helper iFlow waits in a loop till it gets the status of the query init call. The response if success also has a UUID which we can use to fetch the data. The following payload is used in making the fetch data call to the helper iFlows. It is called using the Process Direct SAP_IBP_Read_-_Fetch_Data.

 

<IBPRead>
			<xsl:attribute name="Key">
            			<xsl:value-of select="$IBPQueryKey1"/>
            		</xsl:attribute>
			<xsl:attribute name="Offset">
            			<xsl:value-of select="$IBPQueryOffset1"/>
            		</xsl:attribute>
			<xsl:attribute name="PackageSizeInRows">
            			<xsl:value-of select="$IBPPackageSizeInRows"/>
            		</xsl:attribute>
			<xsl:attribute name="ParallelThread">
            			<xsl:value-of select="$IBPQueryParallelThread"/>
            		</xsl:attribute> 
        </IBPRead>

 

You can use this call to read data in batches. Using the package size in rows you can read multiple batches. In this example, the IBPQueryOffset1 is the offset number which you can use to skip the number of rows you have already read. The result of this call would return the key figure data using the helper iFlows.

Storing data in Staging

The XML response from SAP IBP WS-RFC calls contain forecast data for the key figure. It is then converted into JSON using the XML to JSON converter. A groovy script is used to map this data to the column names in target table in Snowflake. A JSONBuilder is used to collect all the rows from the JSON array. You can add your own custom logic or extend your existing processes to adapt to these activities in your own iFlow like I do it here. In addition, there are a few attributes needed for the subsequent calls – for example, S3 file name, the name you want to store and the destination name for Snowflake and Amazon S3. We are using a separate process directs for these calls. It is ideal to store these values as JSON key value pairs and pass them as a payload. Forecast data from IBP is stored in S3 bucket as a JSON file. This file is generated automatically during runtime but never stored in the SAP Cloud Integration. To do this, the following groovy function is used,

 

// Data payload from the previous step
JSONArray valueJSONArray = input.get("AWSPayload");
def prettyBody = valueJSONArray.toString() as String;
def bytes = prettyBody.getBytes(); 
// New Multipart MINE envelope
MimeBodyPart bodyPart = new MimeBodyPart()

// Data source envelope with input as bytes    
ByteArrayDataSource dataSource = new ByteArrayDataSource(bytes, 'application/json')
// Data handler envelop with the data source
DataHandler byteDataHandler = new DataHandler(dataSource)
// Miltipart MIME body with the data 
bodyPart.setDataHandler(byteDataHandler)
// File name for the data       
String fileName = input.AWSFileName        
bodyPart.setFileName(fileName)
// File type as JSON
String fileType = input.AWSFileType
// Second Body part definition
bodyPart.setDisposition('form-data; name="file"')
bodyPart.setHeader("Content-Type",fileType)
        
MimeMultipart multipart = new MimeMultipart()
// Add body parts to the Multipart      
multipart.addBodyPart(bodyPart)
// Convert the body parts to byte array output stream       
ByteArrayOutputStream outputStream = new ByteArrayOutputStream()
multipart.writeTo(outputStream)
message.setBody(outputStream)

// Boundries for the body part
String boundary = (new ContentType(multipart.contentType)).getParameter('boundary')
message.setHeader('Content-Type', "multipart/form-data; boundary=${boundary}")
// Return the message with header set - body length and body type
message = setHeader(message); 
return message;

 

Now the data is POST-ed to the Open connectors directly using a HTTP adapter. We use the setHeaders method to create Authentication parameters using the User, organization and element in the following way:-

 

def Message setHeader(Message message) {
  def properties = message.getProperties();
  
  String user    = properties.get("OCUser") as String;
  String org     = properties.get("OCOrg") as String;
  String element = properties.get("OCElement") as String;

  message.setHeader("Authorization", "User " + user + ", Organization " + org + ", Element "  + element); 

  return message;
}

 

Loading data to Snowflake Database

Data from the staging Amazon S3 bucket has to be moved to the snowflake database table  - IBPFORECAST. This is done my calling an SQL command on the Snowflake warehouse. We created a separate local flow which is making this call using the Open Connector Snowflake instance and passed on the following SQL command.

 

COPY INTO "TEST_DB"."PUBLIC"."IBPFORECAST"
FROM '@"TEST_DB"."PUBLIC"."INTSTAGE"'
FILES = ('forecast.json.gz')
FILE_FORMAT = (
    TYPE=JSON,
    STRIP_OUTER_ARRAY=TRUE,
    REPLACE_INVALID_CHARACTERS=TRUE,
    DATE_FORMAT=AUTO,
    TIME_FORMAT=AUTO,
    TIMESTAMP_FORMAT=AUTO
)
MATCH_BY_COLUMN_NAME=CASE_INSENSITIVE
ON_ERROR=CONTINUE

 

It is also possible to store multiple files from a folder and then issue one COPY TO command for the folder. This would copy all the files from that folder path into the Snowflake database table. There is also a default delta load comparison in that integration step on Snowflake. Only new files or updated files are loaded in the COPY TO process. The SQL command is prepared in a Groovy script like the following function and then passed as a payload to the Open connector adapter.

 

def Message handleRequestToLoadSnowflake(Message message) {
    def body = message.getBody(java.io.Reader);
    def input = new JsonSlurper().parse(body);
    
    // Open connectors
    message.setProperty("OCURL", input.OCURL);  
    // Snowflake
    message.setProperty("SFTarget", input.SFTarget);
    message.setProperty("SFStaging", input.SFStaging);
    message.setProperty("SFDestination", input.SFDestination); 
    
    String sqlScript = "COPY INTO " + input.SFTarget + " FROM " + input.SFStaging + " FILE_FORMAT = ( TYPE=JSON, STRIP_OUTER_ARRAY=TRUE,REPLACE_INVALID_CHARACTERS=TRUE, DATE_FORMAT=AUTO, TIME_FORMAT=AUTO, TIMESTAMP_FORMAT=AUTO ) MATCH_BY_COLUMN_NAME=CASE_INSENSITIVE ON_ERROR=CONTINUE";
    
    JSONObject requestObject = new JSONObject();
    requestObject.put("script", sqlScript);
    
    def prettyBody = requestObject.toString() as String;
    
    message.setBody(prettyBody);
       
    return message;
}

 

Once the data is copied into the Snowflake database, the following response is received form Snowflake.

 

[
    {
        "rows_loaded": 10000,
        "errors_seen": 0,
        "file": "s3://< S3 bucket name >/data/sap/forecast.json",
        "error_limit": 10000,
        "rows_parsed": 10000,
        "status": "LOADED"
    }
]

 

The above JSON response shows that 10000 rows of data were inserted into the Snowflake database table from the Amazon S3 bucket using the external staging integration. 

Conclusion

In my previous blog we saw how data can be read from Snowflake and written to IBP.  this blog, I shared a few details on how I,

  1. Configured the Snowflake and Amazon S3 adapter instances using the SAP Open Connectors platform.
  2. Reading data from SAP IBP using WS-RFC adapter – which is wrapped in the helper iFlows which are part of the 2402 release of SAP IBP.
  3. Writing data into an external staging integration and 
  4. Loading data form the external staging area into the Snowflake database table using the COPY TO function.

Although we used an external staging integration in this exercise, it is also possible to store data in an internal staging environment in Snowflake or even Streaming data. But I leave this to the reader to experiment that approach or more advanced trials.