SAP Data Intelligence – Extract Salesforce data with a Custom Operator
SAP Data Intelligence has capabilities to extract data from many sources, such as SAP ABAP-based systems, traditional and NoSQL databases, streaming tools, APIs and even 3rd party SaaS solutions. However, if you need to extract data from Salesforce, there is no standard operator to achieve this requirement. Because of this, in this article we will see how to create a new SAP Data Intelligence Python-based operator to extract Salesforce data.
Salesforce Data Extraction Method and Prerequisites
One way to extract data from Salesforce is to use the REST API, which consumes data using the SOQL language. SOQL is similar to the SELECT statement in the Structured Query Language (SQL) but is designed specifically for Salesforce data.
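To illustrate the similarity, below are a few SOQL statements using objects and fields from the standard Salesforce data model. Note two differences from SQL: SOQL has no SELECT *, and it uses relationship paths instead of JOINs:

```python
# Illustrative SOQL statements (standard Salesforce objects)
soql_examples = [
    # Simple field selection with a row limit
    "SELECT Id, Name FROM Account LIMIT 10",
    # Filtering and ordering, as in SQL
    "SELECT Id, Name FROM Account WHERE BillingCountry = 'Brazil' ORDER BY Name",
    # Child-to-parent relationship path instead of a JOIN
    "SELECT Id, Name, Account.Name FROM Contact LIMIT 10",
]
for stmt in soql_examples:
    print(stmt)
```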
With this method, you only need a Salesforce user with a username, password and security token (you can enable the security token as described in this article). If your Salesforce environment is a sandbox, you will also need the login URL of the organization.
To consume the Salesforce REST API, you can use the standard endpoints and code the requests yourself, or use a 3rd party library to do the hard work. The most used libraries for this are JsForce for NodeJS and Simple_Salesforce for Python. In this article we will use the Python library.
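As a sketch of what the Python library returns: Simple_Salesforce's query_all() yields a dict with "totalSize", "done" and a "records" list, where each record carries an extra "attributes" metadata entry. The result below is mocked for illustration; the real call would be sf.query_all(...) on an authenticated Salesforce instance:

```python
import json

# Mocked shape of a simple_salesforce query_all() result (for illustration only;
# the real call would be sf.query_all("SELECT Id, Name FROM Account LIMIT 10"))
sample_result = {
    "totalSize": 2,
    "done": True,
    "records": [
        {"attributes": {"type": "Account", "url": "/services/data/vXX.X/sobjects/Account/001xx1"},
         "Id": "001xx1", "Name": "Acme"},
        {"attributes": {"type": "Account", "url": "/services/data/vXX.X/sobjects/Account/001xx2"},
         "Id": "001xx2", "Name": "Globex"},
    ],
}

def records_as_json(result):
    """Serialize each record to a JSON string, one per record."""
    return [json.dumps(rec, indent=4) for rec in result["records"]]

for rec in records_as_json(sample_result):
    print(rec)
```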
SAP Data Intelligence Docker Image
Every operator in SAP Data Intelligence pipelines runs in a Docker container, and because of this, we will need a new Docker image containing the Simple_Salesforce library.
To do that, in the Repository tab of the SAP Data Intelligence Modeler, right-click the "dockerfiles" folder and select the option "Create Docker File":
Enter "Salesforce" as the name of the new Docker image:
Paste the code below into the new file, called "Salesforce", that opens in the editor. This code installs the Simple_Salesforce library using pip, the standard Python package manager:
FROM python:3.6
RUN pip install simple_salesforce
RUN pip install tornado==5.0.2
RUN groupadd -g 1972 vflow && useradd -g 1972 -u 1972 -m vflow
USER 1972:1972
WORKDIR /home/vflow
ENV HOME=/home/vflow
After that, click the Configure icon at the top right of the editor and add the following tags to the Docker image. Lastly, save, click the Run icon at the top right of the editor and wait for the build of the Docker image:
SAP Data Intelligence Custom Operator
SAP Data Intelligence pipelines work with operators to perform tasks such as connecting to data sources, processing data, debugging, and storing data in targets.
You can use processing operators to code and perform tasks as you like, and even create new custom operators based on processing operators. This capability lets you decouple the code from the pipeline, improving the maintainability of the operator and its reuse in other pipelines.
To create a new operator, click the New icon in the Operators tab of the SAP Data Intelligence Modeler:
Fill in the fields on the next screen as shown below. Ensure that the Base Operator is "Python3 Operator":
After clicking the "OK" button, the editor will open with the new operator. Add an SVG icon to the operator, and in the "Ports" tab add two new output ports of type message:
- output: this port will send each record queried from the Salesforce data
- finish: this port will send data after all records have been sent to the output port. It can be used to finish the pipeline, as we will see in the tests
In the "Tags" tab, fill in the same tags you configured for the SAP Data Intelligence Docker image in the last step. This ensures that the operator will use the Docker container previously built:
In the "Configuration" tab, create the parameters below, all of type "string":
- username: Salesforce user username
- password: Salesforce user password
- security_token: Salesforce user security token
- login_url: if using a sandbox Salesforce environment, this will replace the standard login URL
- soql_statement: the SOQL statement that will be queried in Salesforce
In the "Script" tab, paste the following code.
from simple_salesforce import Salesforce
import json

def gen():
    sf = Salesforce(instance=api.config.login_url,
                    username=api.config.username,
                    password=api.config.password,
                    security_token=api.config.security_token)
    data = sf.query_all(api.config.soql_statement)
    for dic in data["records"]:
        record = json.dumps(dic, indent=4)
        api.send("output", api.Message(body=record))
    api.send("finish", api.Message(body=""))

api.add_generator(gen)
This code uses Simple_Salesforce to connect to the Salesforce environment, queries the SOQL statement from the parameter and sends each record in JSON format through the "output" port. It also triggers the "finish" port after all records have been sent to the "output" port.
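A downstream operator connected to the "output" port receives one pretty-printed JSON string per record in the message body. A minimal sketch of turning it back into a dict (dropping the Salesforce "attributes" metadata entry is an optional cleanup, not something the operator above does):

```python
import json

def parse_record(body):
    # The custom operator sends each record as a pretty-printed JSON string
    rec = json.loads(body)
    rec.pop("attributes", None)  # optional: drop Salesforce record metadata
    return rec

# In a downstream Python3 Operator, the parser would be wired to an input port:
# def on_input(msg):
#     record = parse_record(msg.body)
#     ...
# api.set_port_callback("input", on_input)
```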
Save your operator after all these steps.
Create an SAP Data Intelligence pipeline to test our Custom Operator
Create a new SAP Data Intelligence pipeline, dropping in our new custom operator, a Wiretap operator connected to the "output" port and a Terminate Graph operator connected to the "finish" port:
Configure the custom operator, filling in the parameters with your Salesforce environment values. An example of a SOQL statement is "SELECT Id, Name FROM Account LIMIT 10".
Run the pipeline, and when the pipeline status is "Running", click on it in the status area, then right-click the Wiretap operator and select the option "Open UI".
A new window will appear with the JSON records queried from Salesforce.
If your query runs too fast, the "finish" port will be triggered, finishing the pipeline, and you will not be able to access the Wiretap content. In this case, remove the "Terminate Graph" operator, save and run the pipeline again.
SAP Data Intelligence proves once again that it is a robust and dynamic tool to extract data from 3rd party platforms, giving developers full freedom when they don't have a standard connector.
If you have questions or insights about this article, feel free to write a comment. Also, if you want to learn more about SAP Data Intelligence, follow me and like this article.
Great way of creating a generic operator for your use case, especially when you want to move bulk data. It is definitely handy to run an SQL-like statement to get data. If you have a chance, take a look at the blog series by Thorsten Hapke, who wrote about how to use CouchDB in Data Intelligence.
Alternatively, you could just use the existing SAP Open Connectors capabilities on SAP BTP and Data Intelligence. There are already integrations that you can configure and use.
SAP Open Connectors - Salesforce
I tried the steps as mentioned in the blog, but I am facing the below error:
Group messages: Group: default; Messages: error building graph: error during init of process: component=com.sap.system.process.subengineOperator process=subengineOperator0: engine com.sap.python36 failed with error: exit status 1:
Traceback (most recent call last):
  File "/vrep/vflow/subengines/com/sap/python36/subengine.py", line 11, in <module>
    import pysix_subengine.subengine
  File "/vrep/vflow/subdevkits/pysix_subengine/pysix_subengine/subengine.py", line 6, in <module>
    from . import pre_import_all
  File "/vrep/vflow/subdevkits/pysix_subengine/pysix_subengine/pre_import_all.py", line 41, in <module>
    import tornado.websocket
ModuleNotFoundError: No module named 'tornado'
Process(es) terminated with error(s). restartOnFailure==false
Could you please suggest what the issue is here? Do we need to import the simple_salesforce file from GitHub? I am unable to figure out the issue.