Product Information
SAP Data Intelligence Generation 2 Graphs: Dynamic Tables [Part 1 of 2]
An introduction to Generation 2 Operators
In November 2021, SAP Data Intelligence Cloud introduced Generation 2 (Gen2) operators and graphs. This next generation provides some very interesting features:
- A new data type paradigm which includes support for dynamic tables.
- Graph Snapshot capabilities that allow saving the state of a graph at a user-specified interval.
- While the ability to pause and resume graphs applies to all graphs, coupling pause/resume functionality with Gen2 snapshots provides very interesting value in this release.
- While restart features exist for all graphs, the new snapshot support also lets you enable graphs for automatic recovery that resumes from the saved state.
- Python 3.9 support (upgraded from Python 3.6) is the default in Gen2 graphs. Note: Gen1 graphs may see this change in the future.
Because of the fundamental changes in Gen2 and the need for backward compatibility, the classification of Gen1 and Gen2 graphs has been introduced; Gen1 operators cannot work with Gen2 operators in the same graph.
In this blog series I would like to walk you through creating a graph that takes advantage of the Dynamic Table feature. Part 2 of this blog series discusses how to add snapshot support to the graph for pause/resume functionality.
Prerequisites
This blog series requires the following:
- A HANA Cloud database with a table created as follows:
CREATE COLUMN TABLE MYTBL ( ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR(100), MEAN FLOAT);
Note: you can use a non-HANA Cloud database, but the connection parameters may need to be further tweaked.
- A Docker image created with the following definitions. There are several great blogs on how to create and use a custom operator with a Dockerfile.
Dockerfile:
FROM $com.sap.sles.base
RUN python3.9 -m pip install --upgrade pip --user
RUN python3.9 -m pip install --upgrade Pillow --user
RUN python3.9 -m pip install hana_ml==2.11.22010700 --user
Tags.json:
{
    "python": "3.9",
    "hana_ml": "2.11.22010700",
    "pyarrow": "4.0.1",
    "u-msgpack-python": "2.7.1",
    "tornado": "6.1.0"
}
After adding this file, your Tags configuration for the Docker image should appear as follows:
Graph Scenario
There may be cases where you need a dynamic approach to passing data in a table format, and often you may not know the structure of the table beforehand. The simple graph you will create in this blog dynamically produces a table with three columns of datatypes integer, string and float. The graph will generate 10 records every 5 seconds for the dynamic table and pass this information to an operator that receives the data and populates a database.
While this graph could benefit from using pre-defined operators like “Write HANA Table” or “Table Producer”, the author felt that this approach will help the reader understand enabling snapshots for an operator, which is discussed in the second part of the blog series.
Steps for the Scenario
1. Create a Gen2 Graph
Select the “Graphs” vertical tab, click the “+” drop-down and select “Use Generation 2 Operators”.
Generation 2 Graph
2. Add a Python operator which will act as the data generator (source).
Any references to a source operator in this discussion refer to this operator. At this point the vertical tabs should have changed to make “Operators” active. Add a Python operator by scrolling down to the Processing category and dragging the Python 3 Operator to the canvas.
3. Open the script editor to add data generation to produce the table.
Paste the following code into the script editor.
counter = 0

def gen():
    ## Section 1: Create and populate the table
    global counter
    # Establish a list of lists [[1,'11111111111',1.1],[2,'22222222222',2.2],...]
    tbl = [[counter + i, str(counter + i * 11111111111), round(counter + i * 1.1, 1)] for i in range(1, 11)]
    table = api.Table(tbl)
    ## Section 2: Set the column types for the table.
    columns_definition = {
        "id": api.DataTypeReference("scalar", "com.sap.core.int64"),
        "name": api.DataTypeReference("scalar", "com.sap.core.string"),
        "mean": api.DataTypeReference("scalar", "com.sap.core.float64")
    }
    ## Section 3: Create and set the type context for the output port
    table_type_ref = api.type_context.create_new_table(columns=columns_definition, keys=["id"])
    api.outputs.myDynamicPort.set_dynamic_type(table_type_ref)
    ## Section 4: Output the table and adjust the counter for the next group of rows
    _, writer = api.outputs.myDynamicPort.with_writer()
    writer.write(table)
    writer.close()
    counter += 10
    ## Returning 5 waits five seconds and then reruns the "gen" function, similar to a timer.
    return 5

api.add_timer(gen)
As you can see in the comments above, there are four main sections for the dynamic table.
Section 1: Creates the table, using a range loop to create a list of lists where the columns are integer, string and float respectively. Once the list named “tbl” is populated, the api.Table() method is applied to create the table. The comment gives you an idea of what the list will look like.
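If you want to see exactly what the comprehension produces, you can run it on its own outside the graph; for the first timer call (counter is 0) it yields:
counter = 0
# Same comprehension as Section 1 above, run standalone
rows = [[counter + i, str(counter + i * 11111111111), round(counter + i * 1.1, 1)]
        for i in range(1, 11)]
print(rows[0])   # [1, '11111111111', 1.1]
print(rows[1])   # [2, '22222222222', 2.2]
print(rows[-1])  # [10, '111111111110', 11.0]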
Section 2: Establishes the column data types for the table using a JSON structure. Datatypes such as “com.sap.core.int64” are type definitions that can be found in the documentation; there is a large list of options, and the documentation also includes the mapping from Python types to these values.
Section 3: Establishes the type context using the column definitions from Section 2. The type context provides access to methods that interact with the type system, such as checking if a type exists or creating a new dynamic type. The second line of code in the section references an output port named “myDynamicPort”, which you will create shortly, and assigns the type context to this port.
Note: Section 2 is provided to be thorough and explicit with respect to the typing for the columns. You can simplify this code by inferring the datatypes and assigning the result to the type reference. To do this, replace Section 2 and the first line of Section 3 with the following line of code:
table_type_ref = table.infer_dynamic_type()
Section 4: Establishes a writer for the output port, writes the table to the output port and closes the writer. Finally, the counter is updated for subsequent row generation; this counter controls which numbers are used for the data sent to the target operator.
The last line creates a timer callback for the function “gen”.
4. Close the script editor.
5. Create the output port
Right click on the Python 3 operator and select “Add Port”
Provide the name “myDynamicPort”, which is referenced by the code above. Specify that the port is an output port, that the structure is a “Table” and that the datatype ID is “*” as shown below. The asterisk represents a dynamic table.
6. Add a Python operator which will receive the dynamic table and populate a database (we will reference this operator as the “Target” operator).
At this point the “Operators” tab should still be active on the vertical tab bar. Add a Python operator by scrolling down to the Processing category and dragging the Python 3 Operator to the canvas.
7. Create the input port on the recently added Python 3 operator
Right click on the freshly added Python 3 operator and select “Add Port”
Provide the name “myDynPort”. Specify that the port is an input port, that the structure is a “Table” and that the datatype ID is “*” as shown below.
8. Open the script editor to add the connection code.
Hover over the Python 3 operator and click its Script icon.
Paste the following code into the script editor and change the hardcoded credentials to your own HANA database credentials. Note: copying and pasting can sometimes change the quotes around the credentials, which may cause errors; ensure single quotes are not converted to characters that cause syntax errors.
from hdbcli import dbapi
import pandas as pd

#conn = dbapi.connect(
#    address=api.config.hanaConnection['connectionProperties']['host'],
#    port=api.config.hanaConnection['connectionProperties']['port'],
#    user=api.config.hanaConnection['connectionProperties']['user'],
#    password=api.config.hanaConnection['connectionProperties']['password'],
#    encrypt='true', sslValidateCertificate='false', autocommit=False)
conn = dbapi.connect(
    address='<enteryourdbhosthere>',
    port=443,
    user='<enteryourHANAuser>',
    password='<enteryourpasswordhere>',
    encrypt='true', sslValidateCertificate='false')
In the code above we import the needed libraries and establish a connection. You will note that the code shows both configuration setting values (commented out) and hardcoded values. For simplicity the author uses hardcoded values, but it is recommended to use configuration setting values for HANA connections and the like to avoid exposing credentials. This link references a blog on leveraging Managed Connections, which will help you use the recommended way of handling connections within your operator.
The author is using the hdbcli library (rather than hana_ml) as it is more useful for illustration purposes when implementing the snapshots in the second part of the blog series.
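For comparison, if you prefer hana_ml (which the prerequisite Docker image installs), the connection could be established along these lines; this is a hedged sketch, not code from the blog, using the same placeholders as above:
# Sketch: connect with hana_ml instead of hdbcli (hana_ml is installed
# by the prerequisite Docker image); placeholders are the same as above.
from hana_ml import dataframe

cc = dataframe.ConnectionContext(
    address='<enteryourdbhosthere>',
    port=443,
    user='<enteryourHANAuser>',
    password='<enteryourpasswordhere>',
    encrypt='true', sslValidateCertificate='false')
print(cc.hana_version())   # quick connectivity check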
9. Add the code to configure the input function to read and manage the data from the source operator and populate the database.
Paste the following code after the code above:
def on_input(msg_id, header, body):
    global conn
    ## Section 1: Read the table data
    table_reader = body.get_reader()
    tble = table_reader.read(-1)
    ## Section 2: Output the table data to the output port
    api.outputs.output.publish("df--" + str(tble))
    ## Section 3: Use the list of lists to insert the records
    ## into the database
    cursor = conn.cursor()
    sql = "insert into MYTBL values (?,?,?)"
    #sql = "upsert MYTBL values (?,?,?) with primary key"
    cursor.executemany(sql, tble.body)
    #conn.commit()

api.set_port_callback("myDynPort", on_input)
As you can see in the comments above, there are three main sections for the dynamic table ingestion.
Section 1: Establishes a reader for the incoming input and reads to the end of it; the value of negative one tells the reader to read to the end of the input.
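As an aside, a reader does not have to consume everything at once. A hypothetical sketch of chunked reading, assuming read(n) returns a table whose body is empty once the input is exhausted:
# Hypothetical sketch: read the incoming table in chunks of up to 100 rows
# instead of all at once; assumes read(n) returns a table whose body is
# empty once the input is exhausted.
table_reader = body.get_reader()
while True:
    chunk = table_reader.read(100)
    if not chunk.body:
        break
    # process chunk.body (a list of lists) here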
Section 2: Writes the table data in string format to the output port named “output”, which has not yet been created. This is done so the reader can see the data being received and validate what is happening during graph execution.
Section 3: Takes the data from the input, which is a list of lists, and uses the hdbcli library to insert the rows into the database.
The last line creates the callback for the function “on_input”.
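If you open the connection with autocommit=False (as in the commented-out configuration-based connection), Section 3 can also be made transactional. A minimal sketch, with insert_rows as a hypothetical helper that is not part of the blog's code:
from hdbcli import dbapi

def insert_rows(conn, rows):
    # Insert a batch of rows into MYTBL and commit as one transaction;
    # roll back the partial batch if any row fails.
    cursor = conn.cursor()
    try:
        cursor.executemany("insert into MYTBL values (?,?,?)", rows)
        conn.commit()
    except dbapi.Error:
        conn.rollback()
        raise
    finally:
        cursor.close()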
10. Close the script editor.
11. Add the “output” port for the recently added operator (the target operator).
The name of the output port is “output”, as referenced by the code. Select “Scalar” for the structure and set the datatype ID to “com.sap.core.string”.
12. Rename the “Python 3” operator to “Target”.
On the second “Python 3” operator (the target operator), double-click on the text “Python 3 Operator” and type “Target”.
13. Add the Wiretap operator
Ensure the vertical tab is still set on “Operators” and scroll down to find the Wiretap operator. Drag the “Wiretap” operator to the right side of the “Target” operator that was recently renamed.
14. Link the “Target” Operator output port with the input port of the “Wiretap” operator.
Select the output port of the “Target” operator and drag it onto the input port of the “Wiretap” operator.
15. Link the “Python 3” Operator output port with the input port of the “Target” operator.
Select the output port of the “Python 3” operator and drag it onto the input port of the “Target” operator.
16. Add a Group to the “Target” operator and configure it to use the Docker image that you created in the prerequisites section.
Right click on the “Target” operator and select “Group”.
Click on the Group and select the “Configuration” icon (circled in red below).
Using the “+” button (circled in red below), add the tags so that this operator uses your custom Docker image.
17. Save the graph with the name “DynamicTypes”.
Click on the Save button as shown below.
18. Run the graph.
Select the drop-down for the “Run” icon and select “Run”.
Summary
You have just created a graph that uses dynamic tables as a means to pass data, a list of lists, from one operator to another. Congratulations!
A suggested exercise for the reader is to use the Wiretap operator to review what is being sent to the target operator during operation, and to validate that the data was inserted into the table.
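For the validation part, a quick sanity check can be run from any Python console; a sketch reusing the hdbcli connection code above:
# Count the rows that landed in MYTBL; the expected values depend on how
# long the graph ran (e.g. (20, 20) after two timer cycles).
cursor = conn.cursor()
cursor.execute("select count(*), max(ID) from MYTBL")
print(cursor.fetchone())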
The next step is to add “Snapshot” capabilities; click here to find out how.
By the way, I recently found that the target operator can acquire column name information via the following function calls.
def on_input(msg_id, header, body):
    table = body.get()
    table_def = api.type_context.get_vtype(table.get_type_reference())
    column_names = list(table_def.columns.keys())
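Building on that, the column names can be combined with the row data to produce a labeled pandas DataFrame; a sketch, assuming table.body is the list of lists as elsewhere in this blog:
import pandas as pd
# Sketch: combine the column names above with the row data (table.body is
# a list of lists, as elsewhere in this blog) to get a labeled DataFrame.
df = pd.DataFrame(table.body, columns=column_names)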
I hope this helps!
Hi Chris,
Great blog!
Getting this error when running the code:
Graph failure: Error happened during port callback: 'Outputs' object has no attribute 'output' File "<custom script>", line 199, in on_input
| error with process "python3operator1": Error happened during port callback: 'Outputs' object has no attribute 'output' File "<custom script>", line 199, in on_input
Corresponds to this line of the code:
Any ideas?
Do you have an output port with the name "output" for this operator?
Chris Gruber: I'm using a Data Transform after the Python 3 operator to perform data operations, but the problem is that it is not compatible with the table port data type.
https://answers.sap.com/questions/13811772/kafka-to-hana-table-build-error.html
Consume avro messages - (kafka producer)
decode avro encoded message to json - (node-based operator using nodejs)
to convert the string to the table data type port - (python 3 operator) stuck here totally
perform data operations - (data transform)
to insert/update records to HANA table - (table producer)
Thanks in advance Chris Gruber
Hello Rajesh, the blog on Using Structured Tables with Python in Generation 1 graphs may help.
Hi,
I have extracted a HANA table using the "Read HANA Table" operator and I am able to read the table into a Python operator. But I am not able to convert it into JSON or a pandas DataFrame to do further processing on it.
Please advise.
Thanks,
Jatin
There is a helper function for the DataFrame.
Also, in the code above, tble.body is a list of lists.
import pandas as pd
import json

def on_input(data):
    #df = pd.read_json(data)
    df = pd.json_normalize(json.loads(data))
    api.send("output", df.to_json(orient="records"))

api.set_port_callback("input1", on_input)