Chris Gruber

Generation 2 Graphs: Adding Snapshot support [Part 2 of 2]

Recap

In the first part of this blog series, a simple graph was created to dynamically produce a table with three columns of datatypes integer, string, and float.  The graph generates 10 rows of data every 5 seconds for the dynamic table and passes this information to an operator that receives the data, populates a database, and prints the data in a Wiretap operator.  This blog assumes that you completed the first part of the series and that this graph exists.

 

Considerations for Snapshots

In this blog, the discussion focuses on adding snapshot support to the previous graph.  To support snapshots, it is important to consider which values need to be saved and restored and how data needs to be replayed.  When running a graph with snapshots, snapshots occur periodically based on the user-specified interval.  When resuming from a snapshot, it is possible that data was only partially delivered to the target operator in the previous execution, so some data may need to be “re-played”.  In the case of the graph from the first part of the series, the graph may have inserted records 1 through 100 into the database but still resume with the counter at 80, which means the graph will re-insert rows 80 through 100.  A properly implemented graph supporting snapshots needs to be able to replay data without causing detrimental impact.  Additionally, you need to identify what data you need to restore.  In the graph being discussed, the value of counter drives the row generation, so this variable needs to be “snapshotted” as it is key to the state information.
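The replay window described above can be sketched in plain Python (this is an illustration, not SAP API code; the run function and the 20-row snapshot interval are made up for the example): the source emits rows and snapshots its counter periodically, so a restart resumes from the last snapshot and re-emits everything after it.

```python
import pickle

# Illustrative stand-in for the graph's source loop: emit rows and
# snapshot the counter every 20 rows.
def run(start_state, rows_to_emit, snapshot_interval=20):
    counter = pickle.loads(start_state)
    emitted = []
    last_snapshot = start_state
    for _ in range(rows_to_emit):
        emitted.append(counter)
        counter += 1
        if counter % snapshot_interval == 0:
            last_snapshot = pickle.dumps(counter)
    return emitted, last_snapshot

# First execution: 99 rows (0..98) are emitted, but the last snapshot
# captured the counter at 80.
first_run, snapshot = run(pickle.dumps(0), 99)

# Resumed execution: the counter is restored to 80, so rows 80..98 are
# emitted a second time -- the target must tolerate this replay.
second_run, _ = run(snapshot, 21)
```

The overlap between the two executions (rows 80 through 98 here) is exactly the data the target operator must be able to absorb twice without harm.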

Finally, it is important to call out that the key operators for snapshots are “sources” and “targets”.  In the graph being discussed, the “Python 3 Operator” is the source and the operator renamed “Target” is the target.  Markers, or epochs, are used internally when snapshots are implemented to map snapshots between operators.  Some SAP operators use these markers to manage their own “buffers”, particularly when releasing information from them.  For this reason, you should implement snapshots in the Target even if there is nothing to “save”, as your implementation may help the source operators understand what they can release based on the markers that the targets acknowledge as saved.

Note: In reality there may be many operators between the source and target operators in a graph, and data will be in transit through these operators, so this example graph probably has fewer transition points than many real implementations would have. Also, not all operators in the graph need to implement snapshot support.

For the graph we are working on, we need to save the counter within the Source operator's snapshot.  The implementation also needs to reconsider using “insert” into the database, since replayed rows will cause primary key violations in the Target operator.
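The primary key problem can be demonstrated with a small stand-alone sketch, using Python's built-in sqlite3 in place of HANA's hdbcli (the table and rows are illustrative; sqlite's "INSERT OR REPLACE" plays the role of HANA's "UPSERT ... WITH PRIMARY KEY"):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE MYTBL (c1 INTEGER PRIMARY KEY, c2 TEXT, c3 REAL)")
rows = [(i, "name%d" % i, i * 1.5) for i in range(80, 100)]

# First delivery of rows 80..99 succeeds with a plain insert.
conn.executemany("INSERT INTO MYTBL VALUES (?,?,?)", rows)
conn.commit()

# After resuming from a snapshot, the same rows arrive again:
# a plain insert now violates the primary key.
replay_failed = False
try:
    conn.executemany("INSERT INTO MYTBL VALUES (?,?,?)", rows)
except sqlite3.IntegrityError:
    replay_failed = True

# The upsert form is idempotent, so the replay is harmless.
conn.executemany("INSERT OR REPLACE INTO MYTBL VALUES (?,?,?)", rows)
conn.commit()
row_count = conn.execute("SELECT COUNT(*) FROM MYTBL").fetchone()[0]
```

The upsert leaves exactly 20 rows in the table no matter how many times the batch is replayed, which is the property the Target operator needs.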

Learning Point: Let's briefly consider a scenario where the graph did not generate data dynamically but rather pulled data from a database table.  To support snapshots where the database is the source, the code would need to query the database in a manner that guarantees order (typically via an ORDER BY clause) and keep track of which rows had already been sent (likely by using a counter variable).  The implementation would need to include an ORDER BY and likely a LIMIT/OFFSET clause in the query.  Similar to the example in the first part of the series, a counter is needed to keep track of the rows already sent, and this counter would also need to be saved within the snapshot.
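This learning point can be sketched with sqlite3 standing in for the source database (the SRC table, batch size, and fetch_next_batch helper are all illustrative, not part of the blog's graph). The offset counter is the only state a snapshot would need to save:

```python
import pickle
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE SRC (id INTEGER PRIMARY KEY, val TEXT)")
conn.executemany("INSERT INTO SRC VALUES (?,?)",
                 [(i, "v%d" % i) for i in range(50)])
conn.commit()

counter = 0  # rows already sent; this is what serialize() would pickle

def fetch_next_batch(batch_size=10):
    # ORDER BY guarantees a stable order; LIMIT/OFFSET skips the rows
    # that were already sent.
    global counter
    rows = conn.execute(
        "SELECT id, val FROM SRC ORDER BY id LIMIT ? OFFSET ?",
        (batch_size, counter)).fetchall()
    counter += len(rows)
    return rows

batch1 = fetch_next_batch()    # rows 0..9
state = pickle.dumps(counter)  # snapshot taken after the first batch
batch2 = fetch_next_batch()    # rows 10..19, sent but not yet snapshotted

# Restoring the snapshot rewinds the counter, so batch2 is re-sent.
counter = pickle.loads(state)
replayed = fetch_next_batch()
```

Because the ORDER BY makes the paging deterministic, restoring the counter reproduces the un-snapshotted batch exactly, which is what makes the replay safe to reconcile downstream.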

Change to the Graph from the Previous Section

 

For the purposes of learning and discussion, the target operator should not use “autocommit” for the connection.  Disabling autocommit helps illustrate the implementation considerations for snapshots, so you will change the code to disable autocommit and control commits manually.  Again, remember that the predefined HANA operators would be the recommended method for this implementation; custom code is used here so that the reader benefits from understanding how snapshots are implemented.

 

Modifying Source operator for Snapshots

As mentioned in the previous section, the snapshot for this graph needs to save the counter, as it is the variable that drives the data generation.

1. Open the script editor for the Python 3 Operator (source).

Click on the Script icon that appears when hovering over the Python 3 (source) operator.

Paste the following functions, which establish snapshot support by saving the state of the counter when a snapshot occurs.  Paste this code before the code you already have in the operator.

import pickle

##Section 2.1:  Identify that this operator is stateful and supports snapshots.
api.set_initial_snapshot_info(api.InitialProcessInfo(is_stateful=True))

##Section 2.2:  Create the serialization function to save the counter and create callback for the function
def serialize(epoch):
    global counter
    return pickle.dumps(counter)
 
api.set_serialize_callback(serialize)

##Section 2.3:  Create the serialization function to restore the counter and create callback for the function
def restore(epoch, state_bytes):
    global counter
    counter = pickle.loads(state_bytes)
 
api.set_restore_callback(restore)

 

Note: The “pickle” library is required for snapshots, and you need to import it.

Section 2.1:  Identifies that this operator is stateful and supports snapshots using the “set_initial_snapshot_info” method, setting the variable is_stateful to True.

Section 2.2:  Creates the serialization function to save the counter and registers a callback so that the serialize function is invoked when a snapshot is made.  The serialization function (“serialize”) returns the result of pickle.dumps to ensure proper serialization. The “set_serialize_callback” call sets the callback so that the “serialize” function is called when the snapshot interval is reached and the graph saves its state information.

Section 2.3:  Creates the restoration function to restore the counter from a previous state and registers a callback for it.  The restoration function (“restore”) assigns the result of pickle.loads, again ensuring proper deserialization. The “set_restore_callback” call sets the callback so that the “restore” function is called when the graph is resumed from a “paused” state.
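If the operator later holds more state than a single counter, the same serialize/restore pattern extends by pickling a dict (or tuple) of values. Here is a plain-Python sketch outside the SAP API; last_sent_ts is a hypothetical second state variable invented for the example:

```python
import pickle

counter = 80
last_sent_ts = 12345.5  # hypothetical second piece of state

def serialize(epoch):
    # Pack all state into one object; pickle serializes it in one call.
    return pickle.dumps({"counter": counter, "last_sent_ts": last_sent_ts})

def restore(epoch, state_bytes):
    global counter, last_sent_ts
    state = pickle.loads(state_bytes)
    counter = state["counter"]
    last_sent_ts = state["last_sent_ts"]

# Round trip: what serialize() returns is exactly what restore() receives.
blob = serialize(epoch=1)
counter, last_sent_ts = 0, 0.0  # simulate a fresh process after a pause
restore(epoch=1, state_bytes=blob)
```

The round trip restores both variables, which is why pickling a single composite object is a convenient convention when snapshotted state grows.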

 

 

Modifying Target operator for Snapshots

  1. Open the script editor for the Target operator.

Click on the Script icon that appears when hovering over the Target operator.

Paste the following code, which establishes the snapshot support, after the “import” statements in the code you already have for the Target operator.  Again, remember to change the credentials for your HANA database within the restore function.

import pickle
dummyvar = True

##Section 2.1:  Identify that this operator is stateful and supports snapshots.
api.set_initial_snapshot_info(api.InitialProcessInfo(is_stateful=True))


##Section 2.2:  Create the serialization function to save the counter and create callback for the function
def serialize(epoch):
    global dummyvar
    return pickle.dumps(dummyvar)

api.set_serialize_callback(serialize)

##Section 2.3: Ensure the database records are committed to the database
def complete_callback(epoch):
    global conn
    conn.commit()
    api.logger.info(f"epoch {epoch} is completed!!!")

api.set_epoch_complete_callback(complete_callback)

##Section 2.4:  Create the restoration function to restore the dummyvar value and the database connection, and create a callback for the function
def restore(epoch, state_bytes):
    global dummyvar
    global conn
    conn = dbapi.connect(
        address='<enteryourdbhosthere>',
        port=443,
        user='<enteryourusernamehere>',
        password='<enteryourpasswordhere>',
        encrypt='true',sslValidateCertificate='false', autocommit=False)
    dummyvar = pickle.loads(state_bytes)

api.set_restore_callback(restore)

Note: The “pickle” library is required for snapshots, and you need to import it as previously mentioned.

Section 2.1:  Identifies that this operator is stateful and supports snapshots using the “set_initial_snapshot_info” method, setting the variable is_stateful to True.

Section 2.2:  Creates the serialization function, which simply serializes the value True, and registers the callback for it.  The “set_serialize_callback” call sets the callback so that the “serialize” function is called when the snapshot interval is reached and the graph saves its state information.  Since this operator has no state information to save, a dummy variable (dummyvar) is used.

Section 2.3: Since the scenario is not using autocommit, the logic needs to ensure that the data is committed to the database.  The complete_callback function, registered via “set_epoch_complete_callback”, is called after the serialize callback and commits the open transaction.  Similarly, if you need to flush data to disk, this is where you would implement the code to ensure the data is persisted.
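The commit-at-epoch pattern can be sketched stand-alone with sqlite3 in place of the HANA connection (the on_input/complete_callback names mirror the operator's functions, but this is not SAP API code; the rollback is a stand-in for a crash before the epoch completes):

```python
import sqlite3

# isolation_level="DEFERRED" keeps a transaction open until commit(),
# mimicking the connection with autocommit=False.
conn = sqlite3.connect(":memory:", isolation_level="DEFERRED")
conn.execute("CREATE TABLE MYTBL (c1 INTEGER PRIMARY KEY)")
conn.commit()

def on_input(rows):
    # Rows land in the open transaction; they are not yet durable.
    conn.executemany("INSERT INTO MYTBL VALUES (?)", rows)

def complete_callback(epoch):
    # Called once the epoch's snapshot is safely stored: now commit.
    conn.commit()

on_input([(1,), (2,)])
conn.rollback()  # a "crash" before the epoch completes: the rows are lost
lost = conn.execute("SELECT COUNT(*) FROM MYTBL").fetchone()[0]

on_input([(1,), (2,)])
complete_callback(epoch=1)  # epoch complete: the rows are now durable
kept = conn.execute("SELECT COUNT(*) FROM MYTBL").fetchone()[0]
```

Deferring the commit to the epoch-complete callback keeps the database and the snapshot in step: rows only become durable once the snapshot that covers them has been saved.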

Section 2.4:  Creates the restore function to restore the dummyvar value and re-establish the database connection.  The “set_restore_callback” call sets the callback so that the “restore” function is called when the graph is resumed from a “paused” state.  Note that the restored connection does not use autocommit.

 

Tweak the Database code in the Target Operator

  1. Change the Connection code to use “autocommit=False”

Locate the connection assignment established at the outermost scope (i.e., not the “conn = dbapi.connect” inside the restore function).  Find the following code:

conn = dbapi.connect(
    address='<enteryourdbhosthere>',
    port=443,
    user='<enteryourHANAuser>',
    password='<enteryourpasswordhere>',
    encrypt='true',sslValidateCertificate='false')

Change the above code to the following (changing the credentials to match your HANA database) to control when records are committed to the database:

conn = dbapi.connect(
    address='<enteryourdbhosthere>',
    port=443,
    user='<enteryourHANAuser>',
    password='<enteryourpasswordhere>',
    encrypt='true',sslValidateCertificate='false' , autocommit=False)

2. Change the on_input function to commit the data to the database after inserting it, as the connection no longer uses autocommit.

Locate the on_input function and uncomment the last line of code in the function (conn.commit()) so that the function now appears as follows:

def on_input(msg_id, header, body):
    global conn
    ##Section 1: Read the table data
    table_reader = body.get_reader()
    tble = table_reader.read(-1)

    ##Section 2: Output the table data to the output port
    api.outputs.output.publish("df--"+str(tble))

    ##Section 3: Insert the records into the database
    cursor = conn.cursor()
    
    sql = "insert into MYTBL values (?,?,?)"
    #sql = "upsert MYTBL values (?,?,?) with primary key"
    cursor.executemany(sql, tble.body)
    conn.commit()

 

3. Change the on_input function to use UPSERT, to avoid primary key errors upon resuming the graph, by uncommenting the UPSERT statement and commenting out the INSERT statement above it.  The function should now appear as follows:

def on_input(msg_id, header, body):
    global conn
    ##Section 1: Read the table data
    table_reader = body.get_reader()
    tble = table_reader.read(-1)

    ##Section 2: Output the table data to the output port
    api.outputs.output.publish("df--"+str(tble))

    ##Section 3: Insert the records into the database
    cursor = conn.cursor()
    
    #sql = "insert into MYTBL values (?,?,?)"
    sql = "upsert MYTBL values (?,?,?) with primary key"
    cursor.executemany(sql, tble.body)
    conn.commit()

 

4. Save the changes to the graph.

Click on the Save button as shown below.

5. Run Graph with Snapshots

Select the drop down for the “Run” icon and select “Run As”.

Provide an arbitrary name for this execution of the graph; below, “testrun” is entered.  Select “Capture Snapshots” and specify an interval; below, 20 seconds is entered.  Press OK.

Let the graph run for a minute so that data is generated and at least one snapshot occurs.

6. Open the wiretap to see the data resulting from the graph.

Select the Wiretap operator and click the “Open UI” icon circled below.

In the newly opened tab, you should see something similar to the following:

Note the numbers starting at zero in the Wiretap.

7. Pause the graph.

Click on the “Pause” button found in the “Status” pane on the right circled below.

8. Resume the graph

Click on the “Resume” button found in the Status pane on the right circled below.

9. Once the graph resumes, open the wiretap to see the data resulting from the graph.

Select the Wiretap operator and click the “Open UI” icon circled below.

In the newly opened tab, you should see something similar to the following:

Note that the numbers no longer start at zero, but rather at the last snapshot; in the case above, the graph resumed at 80.

 

Congratulations, you just created a graph that supports Snapshots.  Enjoy!
