Skip to Content
Technical Articles
Author's profile photo Antonio Maradiaga

Manage your integration flows via the SAP Cloud Integration OData APIs

In this blog post, I cover how the SAP Cloud Integration OData APIs can be used to interact with the integration flows deployed in your SAP Cloud Integration instance. The OData APIs 

⚠️ Disclaimer: The code shared here is provided as-is. Use it at your own risk.

Lately, I’ve been working on a project which involved creating a few integration flows, which in essence where running the same flow but required different parameters to run on a schedule. Updating something in the “main” integration flow meant that the same change had to be reflected in all other integration flows. As you can imagine, the task can be quite repetitive and prone to error because of the large number of integration flows that required updating. Also, it was incredibly time consuming. Automating this simplified and speed up my testing of the new developments.

You can find details of the SAP Cloud Integration OData APIs in the SAP API Business Hub – https://api.sap.com/api/IntegrationContent/resource

SAP Cloud Integration OData APIs to the rescue ⚡️

The SAP Cloud Integration OData APIs allow us to interact with the integration content available in an SAP Cloud Integration tenant. It is possible for us to carry the below actions on an integration flow:

  • Get configuration (externalised parameters)
  • Update configuration
  • Create an integration flow
  • Delete an integration flow
  • Save a new version
  • Update a resource, e.g. Groovy script
  • Deploy an integration flow
  • Undeploy an integration flow
class SAPCloudIntegration:

    def __init__(self, session, api_endpoint, base64AuthString): 
        self.sess = session
        self.api_endpoint = api_endpoint
        self.base64AuthString = base64AuthString
        
        self.CSRF_Token = self.get_csrf_token()

        print(f"Token {self.CSRF_Token}")

    def get_csrf_token(self):
        headers = {
            'Authorization': f"Basic {self.base64AuthString}",
            'Content-Type': 'application/json',
            'Accept': 'application/json',
            'X-CSRF-Token': 'Fetch'
        }

        response = self.sess.get(self.api_endpoint, headers=headers)
        token = response.headers['X-CSRF-Token']

        return token

    def get_authentication_headers(self):
        return {
            'Authorization': f"Basic {self.base64AuthString}",
            'Content-Type': 'application/json',
            'Accept': 'application/json',
            'X-CSRF-token': self.CSRF_Token
        }

    def create_iflow(self, name, iflow_id, package_id, zip_file_path):
        headers = self.get_authentication_headers()

        encodedStr = ""
        with open(zip_file_path, "rb") as zip_file:
            data = zip_file.read()
            encodedStr = base64.b64encode(data).decode('utf-8')

        payload = {
            "Name": name,
            "Id": iflow_id,
            "PackageId": package_id,
            "ArtifactContent": encodedStr
        }

        url = self.api_endpoint + "/IntegrationDesigntimeArtifacts"

        response = sess.post(url, headers=headers, data=json.dumps(payload))
        
        print(f"create_iflow -> {response.status_code}")

        return response


    def delete_iflow(self, iflow_id, version):

        headers = self.get_authentication_headers()

        url = self.api_endpoint + f"/IntegrationDesigntimeArtifacts(Id='{iflow_id}',Version='{version}')"

        response = sess.delete(url, headers=headers)

        print(f"delete_iflow -> {response.status_code}")

        return response


    def get_iflow_configuration(self, iflow_id, version):
        url = self.api_endpoint + f"/IntegrationDesigntimeArtifacts(Id='{iflow_id}',Version='{version}')/Configurations"

        response = self.sess.get(url, headers=self.get_authentication_headers())

        print(f"get_iflow_configuration -> {response.status_code}")

        return response


    def update_iflow_configuration(self, iflow_config, new_config, iflow_id, version):
        batch_id = str(uuid.uuid4())
        change_set_id = str(uuid.uuid4())

        headers = self.get_authentication_headers()

        headers['Content-Type'] = f"multipart/mixed; boundary=batch_{batch_id}"

        # print(f"Content-Type: {headers['Content-Type']}\n\n")

        # Escaping all characters in the payload with \r\n as only using \n will return a 400 error
        payload = f"--batch_{batch_id}\r\n"
        payload += f"Content-Type: multipart/mixed; boundary=changeset_{change_set_id}\r\n\r\n"

        for param in iflow_config:
            
            key = param['ParameterKey']

            param_value_json = {
                "ParameterKey" : key,
                "ParameterValue" : f"{new_config[key]}",
                "DataType" : param['DataType']
            }

            # Construct payload for parameter change set
            payload += f"--changeset_{change_set_id}\r\n"
            payload += f"Content-Type: application/http\r\n"
            payload += f"Content-Transfer-Encoding: binary\r\n\r\n"
            
            payload += f"PUT IntegrationDesigntimeArtifacts(Id='{iflow_id}',Version='{version}')/$links/Configurations('{key}') HTTP/1.1\r\n"
            payload += f"Accept: application/json\r\n"
            payload += f"Content-Type: application/json\r\n\r\n"
            
            payload += json.dumps(param_value_json, indent=0) + "\r\n\r\n"
            
        # End change set and batch
        payload += f"--changeset_{change_set_id}--\r\n"
        payload += f"--batch_{batch_id}--\r\n"

        # print(payload)

        url = self.api_endpoint + f"/$batch"

        response = self.sess.post(url, headers=headers, data=payload)

        print(f"update_iflow_configuration -> {response.status_code}")

        return response

    
    def deploy_iflow(self, iflow_id, version):

        url = self.api_endpoint + f"/DeployIntegrationDesigntimeArtifact?Id='{iflow_id}'&Version='{version}'"

        response = sess.post(url, headers=self.get_authentication_headers())

        print(f"deploy_iflow -> {response.status_code}")

        return response


    def save_version_iflow(self, iflow_id, new_version):

        url = self.api_endpoint + f"/IntegrationDesigntimeArtifactSaveAsVersion?SaveAsVersion='{new_version}'&Id='{iflow_id}'"

        response = sess.post(url, headers=self.get_authentication_headers())

        print(f"save_version_iflow -> {response.status_code}")

        return response


    def undeploy_iflow(self, iflow_id):

        url = self.api_endpoint + f"/IntegrationRuntimeArtifacts('{iflow_id}')"

        response = sess.delete(url, headers=self.get_authentication_headers())

        print(f"undeploy_iflow -> {response.status_code}")

        return response


    def update_resource_in_iflow(self, iflow_id, version, resource, resource_type, file_path):

        encodedStr = ""
        with open(file_path, "rb") as file:
            data = file.read()
            encodedStr = base64.b64encode(data).decode('utf-8')

        payload = {
            "ResourceContent": encodedStr
        }
        
        url = self.api_endpoint + f"/IntegrationDesigntimeArtifacts(Id='{iflow_id}',Version='{version}')/$links/Resources(Name='{resource}',ResourceType='{resource_type}')"

        response = sess.put(url, headers=self.get_authentication_headers(), data=json.dumps(payload))

        print(f"update_resource_in_iflow -> {response.status_code}")

        return response

SAP Cloud Integration API – Class

To interact with the OData APIs, I created a Python script which handles the authentication and communication with the different methods. To run the examples below, you will need to install the following dependencies. Save the content in a file, e.g. requirements.txt, and run pip install -r requirements.txt from command line.

certifi==2020.12.5
chardet==4.0.0
idna==2.10
numpy==1.20.0
pandas==1.2.1
python-dateutil==2.8.1
pytz==2021.1
requests==2.25.1
six==1.15.0
urllib3==1.26.3

As mentioned at the beginning, I had to update multiple integration flows at a time. For this, I was storing integration flow configuration in a CSV file and the script in charge of the changes will iterate through the records in the CSV file and perform the required actions.

Examples on how to use the SAP Cloud Integration class

Read the comments inline as it explains how to instantiate the SAP Cloud Integration API class and perform different actions.

import base64
import json
import requests
import pandas as pd
import uuid
from urllib.parse import quote
from SAPCloudIntegration import SAPCloudIntegration as SCI


BASE_URL = "https://mytenant-tmn.hci.sa1.hana.ondemand.com/api/v1"

# Username:Password encoded in Base64
# $ echo -n "user@company.com:MyPassword#123" | base64
BASE64_AUTH = "dXNlckBjb21wYW55LmNvbTpNeVBhc3N3b3JkIzEyMw=="

# Request session is created to preserve the CSRF token
# between API calls
sess = requests.Session()

# Instantiate the SAP Cloud Integration API
sci = SCI(sess, BASE_URL, BASE64_AUTH)

column_types = {
    "Version": str,
    "New_Version": str,
    "Ariba_DateInterval": str,
    "Startup_Delay": str
}

config_df = pd.read_csv("FlowConfiguration.csv", sep=";", keep_default_na=False, dtype=column_types)

iflow_config = json.load(open("configuration_sample.json"))["d"]["results"]

for ix, row in config_df.iterrows():
    if row['Action'] == "X":
        print("=====================")
        print(f"Process row {ix} - {row['Id']}")

        iflow_id = row['Id']
        iflow_name = row['Name']
        version = row['Version']
        new_version = row['New_Version']

        ##########################################
        # Update script, config, and deploy
        ##########################################
        
        response = sci.update_resource_in_iflow(
            iflow_id=iflow_id, version=version, 
            resource="script5.groovy", resource_type="groovy", 
            file_path="../scripts/MyScript.groovy"
        )
        
        response = sci.save_version_iflow(
            iflow_id=iflow_id, new_version=new_version
        )
        
        response = sci.update_iflow_configuration(iflow_config, row.to_dict(), iflow_id, version)

        if response.status_code < 300:
            response = sci.deploy_iflow(iflow_id=iflow_id, version=new_version)
        else:
            print(response.text)

I imagine you want MORE examples…. here you go 👇

###################
# Get configuration
###################

response = sci.get_iflow_configuration(iflow_id, version)

print(json.dumps(response.json()))

##########################################
# Undeploy, create, configure, and deploy
##########################################

response = sci.undeploy_iflow(iflow_id)

if response.status_code > 300:
    print("iFlow not deployed")

response = sci.delete_iflow(iflow_id, version)

response = sci.create_iflow(
    name=iflow_name, iflow_id=iflow_id, 
    package_id="MyPackageId", 
    zip_file_path="IntegrationFlow.zip")

if response.status_code == 201:

    response = sci.update_resource_in_iflow(
        iflow_id=iflow_id, version=new_version, 
        resource="script5.groovy", resource_type="groovy", 
        file_path="../scripts/MyScript.groovy"
    )

    sci.save_version_iflow(
        iflow_id=iflow_id, new_version=new_version
    )

    response = sci.update_iflow_configuration(iflow_config, row.to_dict(), iflow_id, new_version)

    if response.status_code > 200:
        sci.deploy_iflow(iflow_id=iflow_id, version=version)

I hope that the code shared in this blog post helps you leverage the SAP Cloud Integration OData APIs and enables you to automate/simplify your development process in SAP Cloud Integration.

Assigned Tags

      Be the first to leave a comment
      You must be Logged on to comment or reply to a post.