Generation 2 Graphs: Adding Snapshot support [Part 2 of 2]
Recap
In the first part of this blog series, a simple graph was created to dynamically produce a table with three columns of data types integer, string, and float. The graph generates 10 rows of data every 5 seconds for the dynamic table and passes this information to an operator that receives the data, populates a database, and prints the data in a Wiretap operator. This blog assumes that you have completed the first part of the series and that this graph exists.
Considerations for Snapshots
In this blog, the discussion focuses on adding snapshot support to the previous graph. It is important to consider which values need to be saved and restored and how data needs to be replayed for snapshot support. When running a graph with snapshots, snapshots occur periodically based on the user-specified interval. When resuming from a snapshot, it is possible that data was only partially sent to the target operator in the previous execution, so some data may need to be “re-played”. In the case of the graph from the first part of the blog series, the graph may have inserted records 1 through 100 into the database but still resume with the counter at 80, which means the graph will re-insert rows 80 through 100. A properly implemented graph supporting snapshots needs to be able to replay data without causing detrimental impact. Additionally, you need to identify what data you need to restore. In the graph being discussed, the value of the counter drives the row generation, so this variable needs to be “snapshotted” as it is key to the state information.
Finally, it is important to call out that the key operators for snapshots are “sources” and “targets”. In the graph being discussed, the “Python 3 Operator” is the source and the operator renamed “Target” is the target operator. Markers, or epochs, are used internally when snapshots are implemented to map snapshots between operators. Some SAP operators use these markers to manage their own buffers, particularly for releasing information from the buffers. For this reason, you should implement snapshots in the target even if there is nothing to “save”, as your implementation may help the source operators understand what they are able to release based on the markers that the targets report as saved.
Note: In reality there may be many operators between the source and target operators in a graph, and data will be in transit through these operators, so this example graph has fewer transition points than many real implementations would have. Also, not all operators in the graph need to implement snapshot support.
For the graph we are working with, we need to consider saving the counter within the source operator in the snapshot. The implementation also needs to reconsider using “insert” into the database, since this will cause primary key issues on the Target operator.
Learning Point: Let's briefly consider a scenario where the graph did not generate data dynamically but rather pulled data from a database table. To support snapshots where the database is the source, the code would need to query the database in a manner that guarantees order (typically via an ORDER BY clause) and keep track of the rows already sent, likely with a counter variable and a LIMIT/OFFSET clause in the query. Similar to the example in the first part of the blog series, this counter would also need to be saved within the snapshot. A sketch of this pattern follows.
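The sketch below illustrates this pattern. It is only an illustration: the table SRC_TBL, its columns, and the connection variable conn are hypothetical, and it assumes the same hdbcli dbapi connection style used elsewhere in this series. The counter is the only value that would need to be pickled in the snapshot.

import pickle

BATCH_SIZE = 10   # rows to fetch per call
counter = 0       # rows already sent; save this in serialize() and reload it in restore()

def read_next_batch(conn):
    global counter
    cursor = conn.cursor()
    # ORDER BY gives a stable row order; LIMIT/OFFSET resumes from the saved position
    cursor.execute(
        "SELECT ID, NAME, AMOUNT FROM SRC_TBL ORDER BY ID "
        "LIMIT %d OFFSET %d" % (BATCH_SIZE, counter))
    rows = cursor.fetchall()
    counter += len(rows)
    return rows

def serialize(epoch):
    global counter
    return pickle.dumps(counter)   # same serialize/restore pattern used in the rest of this blog

def restore(epoch, state_bytes):
    global counter
    counter = pickle.loads(state_bytes)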
Change to the Graph from the Previous Section
For the purposes of learning and discussion, the target operator will not use “autocommit” for the connection; disabling autocommit helps illustrate the implementation considerations for snapshots. You will change the code to disable autocommit and manually control commits. Again, remember that the predefined operators for HANA would be the recommended method for this implementation, but we are using custom code so that the reader benefits from understanding how snapshots are implemented.
Modifying Source operator for Snapshots
As mentioned in the previous section, the snapshot for this graph needs to save the counter, as it is the variable that drives the data generation.
1. Open the script editor for the Python 3 Operator (source).
Click on the Script icon that appears when hovering over the Python 3 Operator (source).
Paste the following code to establish snapshot support, which will save the state of the counter when a snapshot occurs. Paste this code before the code that you already have in the operator.
import pickle

##Section 2.1: Identify that this operator is stateful and supports snapshots.
api.set_initial_snapshot_info(api.InitialProcessInfo(is_stateful=True))

##Section 2.2: Create the serialization function to save the counter and register the callback
def serialize(epoch):
    global counter
    return pickle.dumps(counter)

api.set_serialize_callback(serialize)

##Section 2.3: Create the restore function to restore the counter and register the callback
def restore(epoch, state_bytes):
    global counter
    counter = pickle.loads(state_bytes)

api.set_restore_callback(restore)
Note: The “pickle” library is required for Snapshots and you need to import this library.
Section 2.1: Identifies that this operator is stateful and supports snapshots using the “set_initial_snapshot_info” method, setting the is_stateful variable to True.
Section 2.2: Creates the serialization function to save the counter and registers the callback for the function so that the serialize function is invoked when a snapshot is made. The serialization function (“serialize”) returns the result of pickle.dumps to ensure proper serialization. The “set_serialize_callback” sets the callback to call the “serialize” function when the snapshot interval is reached and the graph saves its state information.
Section 2.3: Creates the restoration function to restore the counter from a previous state and registers the callback for this function. The restoration function (“restore”) uses pickle.loads to deserialize the saved state. The “set_restore_callback” sets the callback to call the “restore” function when the graph is resumed from a “paused” state.
Modifying Target operator for Snapshots
1. Open the script editor for the Target operator.
Click on the Script icon that appears when hovering over the Target operator.
Paste the following code to establish snapshot support after the “import” statements in the code that you already have for the Target operator. Again, remember to change the credentials for your HANA database within the restore function.
import pickle

dummyvar = True

##Section 2.1: Identify that this operator is stateful and supports snapshots.
api.set_initial_snapshot_info(api.InitialProcessInfo(is_stateful=True))

##Section 2.2: Create the serialization function to save the dummy state and register the callback
def serialize(epoch):
    global dummyvar
    return pickle.dumps(dummyvar)

api.set_serialize_callback(serialize)

##Section 2.3: Ensure the database records are committed to the database
def complete_callback(epoch):
    global conn
    conn.commit()
    api.logger.info(f"epoch {epoch} is completed!!!")

api.set_epoch_complete_callback(complete_callback)

##Section 2.4: Create the restore function to re-establish the database connection and restore the dummy state
def restore(epoch, state_bytes):
    global dummyvar
    global conn
    conn = dbapi.connect(
        address='<enteryourdbhosthere>',
        port=443,
        user='<enteryourusernamehere>',
        password='<enteryourpasswordhere>',
        encrypt='true', sslValidateCertificate='false', autocommit=False)
    dummyvar = pickle.loads(state_bytes)

api.set_restore_callback(restore)
Note: The “pickle” library is required for Snapshots and you need to import this library as previously mentioned.
Section 2.1: Identifies that this operator is stateful and supports snapshots using the “set_initial_snapshot_info” method, setting the is_stateful variable to True.
Section 2.2: Creates the serialization function, which simply serializes the value “True”, and registers the callback for the function. The “set_serialize_callback” sets the callback to call the “serialize” function when the snapshot interval is reached and the graph saves its state information. Since there is no state information that needs to be saved, a dummy variable (dummyvar) is used.
Section 2.3: Since the scenario is not using AUTOCOMMIT, the logic needs to ensure that the data is committed to the database. This is done in the complete_callback function, which is called after the serialize callback. Similarly, if you need to flush data to disk, this is where you would implement the code to ensure the data is persisted; see the sketch after these notes.
Section 2.4: Creates the restore function to restore the dummyvar value and re-establish the database connection. The “set_restore_callback” sets the callback to call the “restore” function when the graph is resumed from a “paused” state. Note that the restored connection does not use autocommit.
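As an aside, if the target wrote to a local file instead of HANA, the epoch-complete callback would be the natural place to force buffered data to disk. The following is only a sketch of that idea; the file object outfile is hypothetical and would be opened elsewhere in the operator.

import os

def complete_callback(epoch):
    global outfile
    outfile.flush()               # push Python's internal buffer to the operating system
    os.fsync(outfile.fileno())    # ask the OS to persist the bytes to disk
    api.logger.info(f"epoch {epoch} flushed to disk")

api.set_epoch_complete_callback(complete_callback)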
Tweak the Database code in the Target Operator
1. Change the connection code to use “autocommit=False”.
Locate the connection assignment that is established at the outermost scope (i.e., not the “conn = dbapi.connect” inside the restore function). Find the following code:
conn = dbapi.connect(
    address='<enteryourdbhosthere>',
    port=443,
    user='<enteryourHANAuser>',
    password='<enteryourpasswordhere>',
    encrypt='true', sslValidateCertificate='false')
Change the above code to the following (of course change the credentials to match your HANA database) to control when the records are committed to the database:
conn = dbapi.connect(
    address='<enteryourdbhosthere>',
    port=443,
    user='<enteryourHANAuser>',
    password='<enteryourpasswordhere>',
    encrypt='true', sslValidateCertificate='false', autocommit=False)
2. Change the on_input function to commit the data to the database after inserting it, as the graph no longer uses autocommit.
Locate the on_input function and uncomment the last line of code in the function (conn.commit()) so that the function now appears as follows:
def on_input(msg_id, header, body):
    global conn
    ##Section 1: Read the table data
    table_reader = body.get_reader()
    tble = table_reader.read(-1)
    ##Section 2: Output the table data to the output port
    api.outputs.output.publish("df--"+str(tble))
    ##Section 3: Insert the records into the database
    cursor = conn.cursor()
    sql = "insert into MYTBL values (?,?,?)"
    #sql = "upsert MYTBL values (?,?,?) with primary key"
    cursor.executemany(sql, tble.body)
    conn.commit()
3. Change the on_input function to use UPSERT to avoid primary key errors upon resuming the graph. Uncomment the UPSERT statement and comment out the INSERT statement so that the function now appears as follows:
def on_input(msg_id, header, body):
    global conn
    ##Section 1: Read the table data
    table_reader = body.get_reader()
    tble = table_reader.read(-1)
    ##Section 2: Output the table data to the output port
    api.outputs.output.publish("df--"+str(tble))
    ##Section 3: Insert the records into the database
    cursor = conn.cursor()
    #sql = "insert into MYTBL values (?,?,?)"
    sql = "upsert MYTBL values (?,?,?) with primary key"
    cursor.executemany(sql, tble.body)
    conn.commit()
4. Save the changes to the graph.
Click on the Save button as shown below.
5. Run Graph with Snapshots
Select the drop down for the “Run” icon and select “Run As”.
Provide an arbitrary name for this execution of the graph; below, “testrun” is entered. Select “Capture Snapshots” and specify an interval; below, 20 seconds is entered. Press OK.
Let the graph run for a minute so that data is generated and at least one snapshot occurs.
6. Open the wiretap to see the data resulting from the graph.
Select the Wiretap operator and select the “Open UI” icon circled below.
In the new tab opened, you should see something similar to the following:
Note the numbers starting at zero in the Wiretap.
7. Pause the graph.
Click on the “Pause” button found in the “Status” pane on the right circled below.
8. Resume the graph
Click on the “Resume” button found in the Status pane on the right circled below.
9. Once the graph resumes, open the wiretap to see the data resulting from the graph.
Select the Wiretap operator and select the “Open UI” icon circled below.
In the new tab opened, you should see something similar to the following:
Note that the numbers no longer start at zero but rather at the last snapshot; in the case above, the graph resumed at 80.
Congratulations, you just created a graph that supports Snapshots. Enjoy!
Hello Chris Gruber
Can you explain how the snapshot feature works in scenarios of extracting data from an ABAP-based backend system? Imagine there is a CDS view with 1000 records, the graph has extracted 900 records, and then it fails for some reason. If I want to resume the graph later, would it resume from record 901, or does it also bring some duplicates of already extracted data into the target?
Regards,
Sandesh
If a snapshot was captured at the 900th record, resuming the graph would start at 901. That being said, the pipeline should accommodate the possibility of duplicate records, as this can happen, so you need to ensure that duplicates are addressed.
Chris Gruber
Good day!
May I please request your valuable inputs on the below.
2. Also converting one value to another (key-value) based on the incoming message. For example
Is there any std operator?
I am not sure of your use case requirements as this description is fairly high level. There is a Data Transform operator that supports "CASE" which may be useful. Alternatively, you could use database lookups/joins or, depending on the data, a dataframe/dict lookup in Python.
Chris Gruber
I'm using SDI on-premise 3.2; some of the functions are not supported in this version, and the graph looks like the below.
From Kafka there is a JSON message produced, and one of the field names is "country". SDI should apply a filter to consume only specific countries and send them to SAP S4. Where can we maintain those country lists to do the filtering and consume only the refined requests? I tried to do this in Data Transform -> Projection -> Filter, but the problem is that although all country lists can be maintained there, they should ideally be maintained at runtime.
2. Also, converting one value to another (key-value) based on the incoming message. For example:
From the JSON message there is a field named "service code" which is the key, and in SDI we need to maintain a value mapping to convert one value to another. Again, this needs to be maintained somewhere at runtime.
Kindly share your precious inputs. Thanks!
If you need to have these configurations done at runtime (i.e., the values need to change at runtime), then I would look at using a database table or CSV file to filter the values. For example, the Data Transform could "inner join" the values with your lookup table/file, and this by definition would filter the values; a rough sketch of the equivalent logic follows.
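As a sketch of the join-as-filter idea (the table and column names below are made up for the example), the allowed countries live in a lookup table that can be changed at runtime, and the inner join drops every message whose country has no match:

#Sketch only: INCOMING_MSGS and ALLOWED_COUNTRIES are hypothetical tables.
#Changing the rows in ALLOWED_COUNTRIES changes the filter without touching the graph.
sql = ("SELECT m.* "
       "FROM INCOMING_MSGS m "
       "INNER JOIN ALLOWED_COUNTRIES c "
       "ON m.COUNTRY = c.COUNTRY")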
I do not understand the second point/question that you are making.
Regards
Chris
Perfect Chris Gruber Thanks a lot for your kind reply.
Wow 😊 I actually implemented the same yesterday and completed the POC Successfully. I maintained the entries in DB and used joins and projection rightly in data transform to filter the records and send only refined ones.
-> Chris Gruber Let me please make the second point very clear: in the incoming JSON there is a field named "service Code" which has the value 'BEANT00', which needs to be converted to 'MSG000' in SAP DI and then sent to the target application.
Do we have a functionality/operator to maintain a list of entries and convert the values?
Thankyou!
SAP Data Intelligence does not have a lookup operator that uses a runtime definition for the lookups. That being said, to do something that is determined at runtime, you would need an external file/service or database to do the lookup, and a Python operator and/or the HANA SQL Client could be used.
For a file-based approach you could use a Python operator and generate a dictionary for the lookups.
Consider your incoming message as follows:
import csv

myobj = {"id":10, "servicecode":'BEANT000', "other":'value'}

#you may want to use an external file system for best practices.
with open('/vrep/foo.csv', mode='r') as infile:
    reader = csv.reader(infile)
    mydict = {rows[0]: rows[1] for rows in reader}

myobj['servicecode'] = mydict[myobj['servicecode']]
For a database approach you can use a Python operator or the HANA SQL Client. The HANA SQL Client can take in a SQL string, which can be used to do the lookup and replacement while retaining the other values of the object. Consider the following Python that generates a SQL SELECT statement to do the lookup and retain the values of the other parts of the object, where the table named "dblookuptable" holds the mappings, the column "servicecode" has the original value, and the column "lookupdbvalue" has the mapped value.
sql = "select '{\"id\":\""+str(myobj['id'])+"\",\"servicecode\":\"'||lookupdbvalue|| '\", \"other\":\""+myobj['other']+"\"}' as replacedobj from dblookuptable where servicecode ='"+myobj['servicecode']+"'"
Or you can retain the values in separate columns in a more standard way and serialize the values after the lookup, which may be easier to maintain; a sketch of that variant follows. I am sure that there are other approaches as well.
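For that variant, a minimal sketch (reusing the hypothetical dblookuptable and myobj from the snippets above, and assuming an hdbcli connection named conn) could query only the mapped value and then serialize the whole object in Python:

import json

cursor = conn.cursor()
# Look up just the mapped value for the incoming service code
cursor.execute(
    "SELECT lookupdbvalue FROM dblookuptable WHERE servicecode = ?",
    (myobj['servicecode'],))
row = cursor.fetchone()
if row is not None:
    myobj['servicecode'] = row[0]    # replace the original code with the mapped value
replacedobj = json.dumps(myobj)      # serialize the object after the lookup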
I hope this helps.
Regards
Chris
Chris Gruber There are 8-10 value mapping conversions. Should I really need to create 8-10 tables and achieve it using the database approach with the HANA SQL Client operator or Run HANA SQL? 🙂 Or should I just proceed with the file-based approach? For the file-based approach, which operator needs to be used, and is any script involved to perform the value mapping? Please elucidate.
sql = "select '{\"id\":\""+str(myobj['id'])+"\",\"servicecode\":\"'||lookupdbvalue|| '\", \"other\":\""+myobj['other']+"\"}' as replacedobj from dblookuptable where servicecode ='"+myobj['servicecode']+"'"
Table:dblookuptable
Many Thanks Chris Gruber
Chris Gruber
If graphs are built on Gen 1, is there any way to set up the snapshot feature with a custom build? Let's say, while fetching 100 records from HANA DB or while writing records to HANA DB, it has extracted/written 50 records and the remaining 50 failed due to some technical or business error and then the graph is dead. Will it automatically restart and resume from record 51, or does it restart the complete extract?
Hello Rajesh, Generation 1 graphs do not support Snapshots. Generation 2 graphs are required for snapshots. Note that Kafka is available in Generation 2 graphs. I think you made a comment in a different blog referencing a Kafka requirement. I hope this helps.
Chris Gruber In SDI on-premise 3.2 there is no Kafka operator, Avro decoder, Node-based operator, converters, or OpenAPI operators 🙁 so Gen 1 is used to build graphs, but the snapshot feature is missing, which is a really important factor for error handling and reprocessing. Do we have any custom feature to fulfill this as part of Gen 1?
Hello
According to the documentation, there are Generation 2 Kafka operators. For Avro, there are Python libraries for Avro encoding, and for the OpenAPI Client the Python "requests" module can be used. Could your Node.js code be ported to Python?
Chris Gruber It would be good if we had snapshots in Generation 1 because most of the standard operators are ready for use, and scripting for each data orchestration will make it fuzzy. The Node-based operator is not available in SDI 3.2 under Gen 2 operators.