Technical Articles
Using Structure Tables with Python Operator in Generation 1 Graphs
In a recent blog post, I discussed creating dynamic tables with Generation 2 graphs. A question came up about how one can use Structure Table operators like Data Transform or Table Producer together with a Kafka operator and a Python operator in Generation 1 graphs. This combination requires the graph to construct a table and use a "table-typed" port to pass the information to the Data Transform or Table Producer operators. In this blog, I will describe how to connect and populate a "table-typed" port from Python.
First, create a Generation 1 graph and press the "Configuration" button on the top right to bring up the configuration of the graph. The "Configuration" button is boxed in red on the top right of the image below.
Next, press the "+" button to add a local data type to the graph definition. This button is boxed in red on the bottom right of the image above. We use local data types because they provide a means to specify lengths for string data. You will be prompted with the following dialog.
Select "Table" and provide a name as shown above, then press the "OK" button. You will be prompted with a dialog that enables you to create a table definition. Provide a description and then press the "+" button circled in red below to add a column. To keep things simple, we will create a two-column table with the data types integer and string.
Provide the name of the column and select the data type "com.sap.core.int32" from the drop-down menu. Then press the "+" button again to create a second column.
Provide a name for the second column and select the radio button that maps to "Create" a data type. Give the data type a name; below I use "mystr50". Select "string" from the Template drop-down menu and provide a length for the string data type. Note: structured tables need to have a size boundary. What you are doing here is creating a local data type called mystr50 defined as a string with a size of 50.
Press the "Save" button. You will now see two new data types local to the graph, as shown below in the graph configuration.
Drag the Python3 Operator to the canvas. Open the script editor by hovering over the operator and clicking on the script icon which is bolded in the image below.
In the script add the following code.
def gen():
    body = [[1, 'Row1'], [2, 'Row2']]
    type_id = "mytbl"
    batch1 = api.Table(body, type_id, {"lastBatch": False})
    body = [[3, 'Row3'], [4, 'Row4']]
    batch2 = api.Table(body, type_id, {"lastBatch": True})
    api.send("output", batch1)
    api.send("output", batch2)

api.add_generator(gen)
This code uses the api.Table method to generate two batches. It is important to note that the "lastBatch" header is set for each batch: batch1 sets the header to False because more rows are expected, whereas batch2 sets it to True because it is the last batch. If you omit this header, you may not get the desired behaviour. The code also specifies the type_id as "mytbl" for completeness.
Next hover over the Python Operator and press the “Add Port” button which is bolded below.
Provide the name "output" for the port, select "Output Port", and select "Table" as the data type. If you do not name the port "output", the above code will not work, as it expects a port named "output".
Select the local “mytbl” Data Type ID from the dialog.
Press the “Select” button. Press the “OK” button.
Drag a "Table Producer" operator to the canvas and connect the Python3 operator to the "Table Producer" operator. You could also drag the "Data Transform" operator to the canvas, but for simplicity we will use the "Table Producer". Note that you are able to connect these two operators; if the data type is not set properly, they may not connect.
Open the "Table Producer" operator's configuration. Specify the target table by pressing the pencil icon in the "Table Producer" configuration, boxed in red below.
Select an appropriate table for your data and specify the column mapping.
Drag the “WireTap” operator to the canvas and connect the “Table Producer” to the “Wiretap” operator.
Save and run the graph. When you run the graph, the Wiretap operator should produce the following message, or something similar depending on your table name.
Hopefully you now have the basics of how to use and generate structured table data types.
Good Luck.
Chris Gruber Wow, thanks a lot, this is simply great. Thanks for looking into my blog post question.
But in this case we don't need data transformation? Source-to-target mappings, filters, lookups, and the target JSON structure build will happen in the Python script itself? Or do we still need the Data Transform operator to perform mappings and data operations?
In my case the input comes from a Kafka producer as a JSON string which needs to be inserted into a HANA database. The fields in Kafka and the HANA DB are totally different.
If you are doing the filters/mappings/projections in your Python script, then you would not need a Structured Table operator like Data Transform. In fact, you could simplify by using the SAP HANA Client operator. The port would then not need a Structured Table but a simple message.
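For illustration only, here is a minimal sketch of that simpler approach, assuming a message-typed input port, a message-typed output port named "sql" wired to the SAP HANA Client operator, and a JSON body per Kafka message; the schema, table, and field names are hypothetical:

import json

def on_input(msg):
    # Assumes one JSON object per message with "id" and "name" keys (placeholders).
    rec = json.loads(msg.body)
    # Build a plain SQL statement and send it as a simple message rather than a structured table.
    # Unparameterized SQL is used here only to keep the sketch short.
    stmt = "INSERT INTO MYSCHEMA.MYTABLE (ID, NAME) VALUES (%d, '%s')" % (int(rec["id"]), rec["name"])
    api.send("sql", api.Message(stmt))

api.set_port_callback("input", on_input)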
Chris Gruber Could you please briefly elucidate the lastBatch true and false settings in batch 1 and batch 2? I'm getting a little puzzled by it.
" It is important to note that the “lastBatch” header is set for each batch. Batch1 is setting the header to false as there are more rows expected whereas Batch2 is setting the header to True as this is the last batch. If you omit this header, you may not get the desired behaviour. The code also specifies the type_id as “mytbl” to be complete. "
Let's look at lastBatch in a more conventional setting where it is typically used. Consider the SQL Consumer and Table Producer being connected, followed by a "Graph Terminator". Also consider a scenario where the SQL Consumer is going to read 100 rows in 10-row chunks. The SQL Consumer will send 10 rows 9 times with the lastBatch header set to False. On the final chunk of rows, it will send the last 10 rows and set lastBatch to True to indicate to the following operators that there is no more data. The Table Producer will populate the rows and eventually commit them to the database. The lastBatch header will cause the Table Producer to commit the records, but it will also populate the output port, which indicates to the Graph Terminator to stop the graph.
I hope this description of a more conventional setting helps you understand the usage of lastBatch.
Thanks a lot for your valuable time and for the blog with the right expertise. I tried it but am getting the below error.
I need your guidance here, Chris Gruber. Thank you!
ERROR
Group messages:
Group: default; Messages: error building graph: error during init of process: component=com.sap.system.process.subengine.v2.operator process=subengineOperatorV20: subengine operator init error: Unexpected error while building graph: Error while initializing operator com.sap.system.python3Operator.v2:python3operator1: Error while executing Python Operator's user provided script: 'Api' object has no attribute 'add_generator' File "<custom script>", line 10, in <script body>
; the following error also happened during shutdown: expected status code 200 but got 500: ""
Process(es) terminated with error(s). restartOnFailure==false
GRAPH: The input JSON string is sent to the Python operator; an output data type is created and used as the output port of the Python operator, which is mapped to Data Transformation to perform data operations.
There are two problems in the graph, I am guessing.
1. The error indicates that the operator's API does not have an add_generator method. A timer callback can be registered instead with:
api.add_timer(callback)
2. In your graph you would not need a "generator" or timer, as it is receiving data from the "Vessel Schedules" operator. My example in the blog had no previous input, which is why I used a generator for the Generation 1 graph. You probably need to change the code I provided to use an "input" port and the method:
api.set_port_callback("input", on_input)
These are educated guesses, however, since I have limited information.
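As a rough illustration of that change (not the exact code for your graph), a callback-based version might look like the following, assuming a string-typed input port carrying one CSV row such as "1,Row1"; the parsing and port names are illustrative:

def on_input(data):
    # "data" arrives as a string on the input port in this sketch; adjust parsing to your format.
    row = data.split(',')
    body = [[int(row[0]), row[1]]]
    tbl = api.Table(body, "mytbl", {"lastBatch": True})
    api.send("output", tbl)

api.set_port_callback("input", on_input)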
Hello Chris Gruber
May I please request your kind support and expert guidance.
Below is the 'Script', but it is throwing an error. Your blog would be even more helpful if it covered inputs coming from source applications as a string data type and converting them to the table type; that would be really great indeed. I kindly request your added inputs here.
I really don't want to do filters/mappings/lookups/projections in the Python script, as that makes things really complex; instead I use the standard "Data Transformation" operator, and
for insert/update to a HANA table I use the "Table Producer" with a max batch size setting, which makes it simpler. lastBatch is an added feature and reliable functionality, but will it work with the Table Producer via Data Transformation, since we already have max batch size and user-defined batch size settings in the Table Producer?
Error:
Group: default; Messages: error building graph: error during init of process: component=com.sap.system.process.subengine.v2.operator process=subengineOperatorV20: subengine operator init error: Scalar 'com.sap.core.string' could not be found in local scope; the following error also happened during shutdown: expected status code 200 but got 500: ""
Hello
I think there are still at least two problems.
This code worked fine. It sends one row at a time however.
FINALLY we did that
Chris Gruber Rajesh PS
A couple of findings for which I would request your keen review and inputs.
2. If the above is enabled, do we still need the maximum batch size setting in the Table Producer?
3. In the HANA database, the SQL data type is automatically being set to "NCLOB" and varchar is not accepted. Also, the value is inserted like below, which is incorrect; it should have only the value 125486.
Successful Graph :
Script:
Wiretap: output of node base operator
Regarding your point in question 1), where one record is being updated when 50 records are going in, I am not sure what you are referring to there. Are you saying that 50 records come in a single input, or that 50 records come in and it only handles one record at a time without batching things up? If it is the latter, I would think you need to set up batching logic in the Python code. Consider the following code to batch up 50 records.
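(The original post showed this code as a screenshot; the following is only a sketch of the idea, assuming one CSV row per incoming message, with an illustrative buffer name and batch size.)

batch = []          # accumulates rows until the batch size is reached
BATCH_SIZE = 50     # illustrative batch size

def on_input(data):
    global batch
    row = data.split(',')                      # adjust parsing to your actual format
    batch.append([int(row[0]), row[1]])
    if len(batch) >= BATCH_SIZE:
        api.send("output", api.Table(batch, "mytbl", {"lastBatch": False}))
        batch = []

api.set_port_callback("input", on_input)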
If it is the former, then your logic is not separating the rows in the Python code. The batch size setting would not have any effect in this regard; you would need to add batching logic.
Regarding your third question, I do not understand or have the context to answer.
Batching is pretty good and helps things run smoothly. So let's say the source database has 500 records (for today); SDI consumes 50 records at a time, 10 times, as per the above script. This is a really cool feature.
1. Chris Gruber, but the Kafka Producer consumes based on Max Message (bytes), i.e. the maximum number of message bytes to fetch from the broker in a single request (value: 1000000 bytes), with Max Wait Time, i.e. the maximum amount of time in milliseconds that the broker waits for the consumer (value: 500 ms, the default), and with the newest offset.
Will there be discrepancy, data integrity issues, or data loss between the values defined above in the Kafka Producer and the batch value defined in the Python script? How can we confirm that whatever is consumed from Kafka is updated to the DB completely in batches?
2. When using "debug" I get the below error, so I changed it to "output"; then it started working and sends 50 records correctly.
Hurray !
Trying to send on unknown output port (debug). Known output ports: ['output']. [file '/vrep/vflow/subdevkits/pysix_subengine/pysix_subengine/base_operator.py', line 1077]
3. This is quite important. I am not sure why the database table inserts the value like below (as a JSON string), which is incorrect; it should have only the value 130001. Maybe it is coming from the node base operator as a JSON string? I request your inputs here, Chris Gruber.
4. In case we wanted to change the batch value from 50 to 100, would we need to edit the script? Why don't we make this a runtime setting, maintaining it in some value mapping table that can be changed easily without touching the graph?
With respect to question 1), I am not familiar with the Kafka operators to that level of depth. I suggest you open a support ticket for this type of question.
Regarding your question on 3), I have no view into the logic you are using to put the data together for insertion into the database. I don't think the Table Producer is changing your data, if that is what you are asking. I would consider two things to debug your logic:
a. Perhaps send the data to a debug port to validate what the data looks like at certain points within your script. You would likely need to "stringify" the data and send it to a Wiretap (i.e. api.send("debugport", str(mytbl))).
b. Truncate the table so that you are not looking at older data and older runs which may have had a problem that was fixed.
Regarding your question 4), you can set up a parameter in your Python operator by pressing the "+" button in the operator's configuration, boxed in red in the image below.
Then provide the parameter a name. As shown below, I will call it "getsize".
Notice the value "${sizeprompt}" I provide above. I will reference this later when I run the graph.
From this point we have a configuration parameter for the python operator that can be used in your python code as follows:
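(The code itself was shown as a screenshot in the original post; here is a one-line sketch of the idea, assuming the operator exposes configuration values via api.config and the parameter is named "getsize" as above.)

# Read the "getsize" configuration parameter into a Python variable for use as the batch size.
batch_size = int(api.config.getsize)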
This will set the value of your python variable to the setting used in the python configuration.
Save the Graph and Run it.
You will see a prompt for "sizeprompt" when running the graph. Because I used ${} around the value, this markup treats the Python parameter as a runtime parameter for the graph, and the number you specify in the run settings will be used in your Python code to set the batch size.
I hope this helps.
Thank you so much, Chris Gruber.
1. I have raised a support ticket (208366/2023) with SAP, but with Kafka/event-driven architecture it is better to let the Kafka producer, as the first operator in the graph, decide how many messages need to be consumed, rather than batching in the Python3 Operator. 🤔
2. I have tried to map the "debugport" to the Wiretap and I get the below error. But the problem is that a table-typed port cannot be mapped to the Wiretap.
Graph failure: operator.com.sap.system.python3Operator:python3operator1: Error while executing callback registered on port(s) ['input1']: name 'mytable' is not defined [line 13]
Process(es) terminated with error(s). restartOnFailure==false
My script (including the debugport and the getsize property): please review, and if there are any issues please let us know, Chris Gruber.
3. Regarding the input data, I found the issue: it is splitting based on ',' so the JSON field names are also written to the database, like below. Not sure how to update only the values correctly. 😳
inputdata = data.split(',')
Regarding your question 2), you do not have a variable called "mytable" in your code, so you are referencing an undefined variable. I used the variable "mytbl" as it was referenced in previous posts. In the code you provided, you are most likely interested in "batch1", so you would want to send batch1 to the "debugport". At least that is how I interpret it.
Regarding question 3), this is more of a general Python question. In my blog example, the data coming into the Python operator is CSV, so splitting on a comma was what was used. If the data is JSON, or a string representation of JSON, then you need to treat the data accordingly.
First it would be easiest to handle the data as a JSON object using the following code.
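(That code appeared as a screenshot in the original thread; a minimal sketch of the idea follows, assuming the incoming body arrives as a JSON string or bytes. The helper name is my own.)

import json

def to_json_object(raw):
    # "raw" is whatever arrives on the input port; decode bytes and parse strings as JSON.
    if isinstance(raw, (bytes, bytearray)):
        raw = raw.decode('utf-8')
    if isinstance(raw, str):
        raw = json.loads(raw)        # now a Python dict or list
    return raw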
I am not certain about the entire message as it could have multiple rows or a single row entry per message. So you as a reader have to apply your understanding of the data.
In the simplest case, where there is only one record per message, you could use the following code to format the data, assuming it is a JSON object.
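(Again a sketch rather than the original screenshot; the field names "id" and "name" are placeholders for your actual keys, and the port names are illustrative.)

import json

def on_input(raw):
    # One record per message; parse the string body and map its fields into a single table row.
    record = json.loads(raw) if isinstance(raw, str) else raw
    body = [[int(record["id"]), record["name"]]]
    api.send("output", api.Table(body, "mytbl", {"lastBatch": True}))

api.set_port_callback("input", on_input)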
If there are multiple rows, then this is more complicated as you will want to loop through the object with something similar to the following:
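(Sketch only; this variant assumes the parsed object is a JSON array of records with placeholder keys "id" and "name".)

import json

def on_input(raw):
    # Multiple records per message; loop over the array and build one table row per record.
    records = json.loads(raw) if isinstance(raw, str) else raw
    body = []
    for rec in records:
        body.append([int(rec["id"]), rec["name"]])
    api.send("output", api.Table(body, "mytbl", {"lastBatch": True}))

api.set_port_callback("input", on_input)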
Since I am only guessing at your data structure and desired output, this code may need to be changed for the actual format and desired output.
Thanks Chris Gruber
Graph failure: operator.com.sap.system.python3Operator:python3operator1: Error while executing Python Operator's user provided script: unexpected indent (<string>, line 7) [line 7, offset 6, text ' if type(data) is str:\n']
Process(es) terminated with error(s). restartOnFailure==false
This is multiple records in an array [ [ ], [ ] ]
Script :
Please review the input data (JSON string) so that only the values are inserted correctly. Also, batching can be removed since it creates a discrepancy with the Kafka maxMessageBytes.
Kindly help Chris Gruber
JSON string - Please review and kindly help
There are three problems with the code as I understand your intentions.
First, your indentations are not consistent, and this is the reason for the errors above. You started with a three-character indentation within your function, and when you get to your first if statement and for loop you have a seven-character indentation. With Python, indentation matters.
Second, your "datain" variable may not be set if the body variable is a JSON object and not a string as this variable is only set when it incoming data is a string. Your current indentation has the "datain" assignment set when the data variable is a string.
The third problem is that the data and your code are not aligned. Your looping does not take into account the arrays in your data, and the dictionary references are not likely to work once your indentation and array references are fixed.
Consider the following code corrections:
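(The corrections were posted as a screenshot; the sketch below reflects the points above, assuming the message body is a JSON string containing an array of row arrays such as [[130001, "A"], [130002, "B"]]. The port name "input1" is taken from the errors earlier in the thread and may differ in your graph.)

import json

def on_input(data):
    # Consistent four-space indentation throughout the function.
    if isinstance(data, (bytes, bytearray)):
        data = data.decode('utf-8')
    if isinstance(data, str):
        data = json.loads(data)          # parse only when the input really is a string

    body = []
    for rec in data:                     # each "rec" is already a row array, not a dict
        body.append([int(rec[0]), str(rec[1])])
    api.send("output", api.Table(body, "mytbl", {"lastBatch": True}))

api.set_port_callback("input1", on_input)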
Again the "standard disclaimer", this code is subject to my interpretation of the desired result and data structures and may not completely accurate.
As this seems to be more of a set of general Python questions, there are many Python resources on the web for general Python learning. You may want to have a look at the following sites for general Python questions like these:
OpenSAP
W3 Schools
Thanks, I will take this up, Chris Gruber. Ideally the Data Transformation operator should have options to treat the input as a JSON object and deliver it to the next pipeline. Here the mapping and transformation become very complicated and tough. 🙁
But coming to batching in the Python3 operator: ideally, whether messages come from an event, file, or API based source, we still need to check the total number of records coming from the source and then apply batching in Python3. Instead we are directly applying/enforcing the value '50' and setting lastBatch to true. SAP also has no solution currently regarding Kafka max message size versus the Python operator batch count, so SAP advised getting the total number of records coming from the source application.
Chris Gruber
For example, when the loop completes and total records = batch count × number of loops, then terminate the graph. A rough sketch of this idea follows the numbers below.
total records (coming from source kafka) = 10,000
batch count (specified in python operator) = 500
loop runs 20 times
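(This is only how I picture the intended logic; all names are illustrative, and the total record count would have to come from the source system, e.g. via a count query or a header.)

import json

TOTAL_RECORDS = 10000    # assumed to be known up front from the source
BATCH_COUNT = 500        # batch size, e.g. maintained as a graph parameter

batch = []
rows_seen = 0

def on_input(data):
    global batch, rows_seen
    for rec in json.loads(data):                     # assumes an array of row arrays per message
        batch.append(rec)
        rows_seen += 1
        if len(batch) >= BATCH_COUNT or rows_seen >= TOTAL_RECORDS:
            last = rows_seen >= TOTAL_RECORDS        # lastBatch True only on the final batch
            api.send("output", api.Table(batch, "mytbl", {"lastBatch": last}))
            batch = []

api.set_port_callback("input1", on_input)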
Could you please provide some brief inputs here? Thank you, Chris Gruber.