Hana Smart Data Integration – Inside Realtime streams
Most Hana Adapters support realtime push of changes. This is not limited to databases as sources; it works for everything, e.g. Twitter or adapters you wrote yourself. While the documentation contains all the information needed from an Adapter developer and user perspective, I'd like to show the internals, which might be helpful.
As shown in the blog entry about adapters and their architecture (see Hana Smart Data Integration – Adapters) and the Adapter SDK manual (SAP HANA Data Provisioning Adapter SDK – SAP Library), Adapters provide an interface to interact with the Hana database that revolves around remote sources (=connection to the remote source system) and virtual tables (=the structure of remote information).
For realtime the remote subscription is the central Hana object.
Hana remote subscriptions
The syntax for this command is quite self-explanatory:
create remote subscription <subscriptionname> using (select * from <virtual_table_name> where …) target [table | task | procedure] <target_name>;
Hana sends the SQL select of that command to the Smart Data Access layer and, depending on the capabilities of the adapter, as much of it as possible is pushed down to the Adapter. The Adapter can do whatever it takes to get the changes and send them to Hana, where the change rows are put into either a target table, a target task (=transformation) or a target stored procedure.
The remote subscription object contains all of this information, as a query on the catalog object shows:
select * from remote_subscriptions;
As seen from the Adapter, the above create remote subscription command does nothing. All Hana does at this point are basic logical validations: does the virtual table exist, is the SQL simple enough to be pushed to the adapter, does the target object exist, and do the selected columns match the target structure. All of these checks are performed on metadata Hana has already.
Activating a remote subscription
Replicating a table consists of two parts, the initial load to get all current data into the target tables and then applying the changes onward. But in what order?
The usual answer is to set the source system to read only, then perform the initial load, then activate the change data processing and allow users to modify data again afterwards. As the initial load can take hours, such downtime is not appreciated. Therefore we designed the remote subscription activation to support two phases.
First phase is initiated with the command
alter remote subscription <subscriptionname> queue;
With this command the Adapter is notified to start capturing changes in the source. The adapter gets all the information required for that: a Hana connection to use and the SQL select, so it knows the table, columns and potential filters. The only thing the adapter is required to do is to add a BeginMarker row into the stream of rows being sent. This is a single line of code in the Adapter and it tells the Hana receiver that at this point the Adapter started to produce changes for this table.
Assume the adapter replicates CUSTOMER already and the above alter remote subscription ... queue command is now issued for a subscription using the remote table REGION. The stream of changes might then look like
13:01:55.0000 | CUSTOMER | insert | insert into customer(key, name) values (100, 'John');
13:02:56.0000 | CUSTOMER | update | update customer set name = 'Franck' where key = 7;
13:02:56.0000 | CUSTOMER | update | update customer set name = 'Frank' where key = 7;
13:47:33.0000 | REGION | BeginMarker | BeginMarker for table REGION
13:55:10.0000 | REGION | insert | insert into region(region, isocode) values ('US', 'US');
The Hana server takes all incoming change rows and processes them normally. Only rows for subscriptions that are in queue mode, that is, a BeginMarker was found in the stream but no EndMarker yet, are queued on the Hana server instead. In the above example, the CUSTOMER rows end up in their target table as usual, while the target table for REGION remains empty for now.
Therefore the initial load can be started and it does not have to worry about changes that happen in the meantime. As seen from the initial load, the target table is empty and not a single change row is applied while it runs.
Once the initial load is finished, the command
alter remote subscription <subscriptionname> distribute;
should be executed. This tells the Adapter to add an EndMarker into the stream of data.
When the Hana server finds such an EndMarker row, it starts to empty the queue and apply the changes to the target table. All rows between the BeginMarker and the EndMarker of the given table are loaded carefully, as it is unknown whether they had been covered by the initial load already or not. Technically that is quite simple: the insert/update rows are loaded with an upsert command, hence they are either inserted if the initial load did not find them or updated if already present. Rows of the Change Type Delete are deleted, of course.
All rows after the EndMarker are processed normally.
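To make the two-phase activation more tangible, here is a minimal Python sketch of the receiver logic described above. It is a toy model, not Hana code: the Receiver class, its method names and the dict-based target tables are all made up for illustration, and the non-careful update path is simplified.

```python
from collections import defaultdict


class Receiver:
    """Toy model of the Hana-side receiver (hypothetical, for illustration only)."""

    def __init__(self):
        self.targets = defaultdict(dict)  # table -> {primary key: row}
        self.queues = {}                  # table -> rows held while in queue mode

    def initial_load(self, table, rows):
        # Bulk copy of the current source data, run while changes are queued.
        for key, row in rows.items():
            self.targets[table][key] = row

    def receive(self, table, change_type, key=None, row=None):
        if change_type == "BeginMarker":
            self.queues[table] = []       # subscription enters queue mode
        elif change_type == "EndMarker":
            # Drain the queue "carefully": inserts and updates become upserts.
            for q_type, q_key, q_row in self.queues.pop(table):
                self._apply(table, q_type, q_key, q_row, careful=True)
        elif table in self.queues:
            self.queues[table].append((change_type, key, row))
        else:
            self._apply(table, change_type, key, row, careful=False)

    def _apply(self, table, change_type, key, row, careful):
        target = self.targets[table]
        if change_type == "delete":
            target.pop(key, None)
        elif careful or change_type == "update":
            target[key] = row             # upsert semantics
        elif change_type == "insert":
            if key in target:
                raise KeyError(f"primary key violation: {key!r}")
            target[key] = row


receiver = Receiver()
receiver.receive("CUSTOMER", "insert", 100, {"name": "John"})
receiver.receive("REGION", "BeginMarker")                      # ... queue
receiver.receive("REGION", "insert", "US", {"isocode": "US"})  # held back
region_before_load = dict(receiver.targets["REGION"])          # still empty
receiver.initial_load("REGION", {"US": {"isocode": "US"}, "DE": {"isocode": "DE"}})
receiver.receive("REGION", "EndMarker")                        # ... distribute
```

The REGION insert arrives before the initial load but is only applied after the EndMarker, as an upsert. That is why it does not collide with the row the initial load copied already.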
During operation various errors can happen. The Adapter has a problem with the source and raises an exception. The Adapter or the Agent itself dies. The network connection between Hana and the Agent is interrupted. Hana was shut down.
In all these cases the issue is logged as exception in a Hana catalog table.
select * from remote_subscription_exceptions;
Take the case where the connection between the Agent and Hana was interrupted. The adapter then gets a close() call and should clean up everything, so that it ends up in a state where nothing is active anymore. On the Hana side a remote subscription exception is shown with an EXCEPTION_OID. Using this unique row number, the exception can be cleared and the connection re-established, using the command
process remote subscription exception 42 retry;
This command will reestablish the connection with the Agent and its Adapter, send all remote subscription definitions to the adapter again plus the information where the adapter should start again. The adapter then has to start reading the changes from this point onward.
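The close-and-retry cycle can be pictured with a small Python sketch. Everything in it is an assumption for illustration: the ReplayingAdapter class, the numeric positions and the rule that the restart position itself is not re-sent are not taken from the Adapter SDK.

```python
class ReplayingAdapter:
    """Hypothetical adapter that can resume reading a source change log."""

    def __init__(self, source_log):
        self.source_log = source_log  # list of (position, change) in source order
        self.active = False

    def close(self):
        # Called when the connection is lost: stop everything.
        self.active = False

    def start(self, restart_position):
        # Called on (re)start; returns all changes after the given position.
        self.active = True
        return [(pos, change) for pos, change in self.source_log
                if pos > restart_position]


log = [(1, "insert 100"), (2, "update 7"), (3, "insert US")]
adapter = ReplayingAdapter(log)

first_run = adapter.start(0)   # initial start reads everything
last_applied = 2               # position that was safely applied before the failure
adapter.close()                # connection between Agent and Hana is interrupted

resumed = adapter.start(last_applied)  # what a retry would trigger
```

The point of the sketch is only that the adapter itself keeps no memory across the interruption; the position to continue from is handed to it on restart.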
Pausing realtime propagation
Another requirement might be to pause either the capture of changes in the source or the applying of changes to the target objects. This cannot be done on the remote subscription level, only for the entire source system, using the commands
alter remote source <name> suspend capture;
alter remote source <name> suspend distribution;
and the reverse operation
alter remote source <name> resume capture;
alter remote source <name> resume distribution;
The magic of Change Types
Whenever an adapter creates realtime changes, these CDC rows have a RowType, in the UI called Change Type, which is insert, update, delete, or something else. This Change Type information is used when loading a target table or inside a task to process the data correctly.
For simple 1:1 replications the handling of the Change Type is quite straightforward: the Change Type is used as the loading command, so an insert row is inserted, a delete row is deleted etc.
Therefore it is important that the adapter sends useful Change Types. Take the RSS Adapter with its virtual table RSSFEED. The adapter polls the URL, gets the latest news headlines, and these should be loaded. The primary key of the virtual table is the URI of the news headline, and the replicated target table has the same key.
If the adapter sent all rows with Change Type = Insert, the first realtime transaction would insert the headlines and the second iteration would fail with a primary key violation. An RSS feed simply does not know what has changed or what had been received already. Not even the Adapter knows that for sure, as it might have been restarted; as seen from its perspective this is the first read, and it has no idea what happened before it was stopped.
One solution would be to send two rows, a Delete row plus an Insert row. That would certainly work but causes a huge overhead in the database, as twice as many rows are sent, and deleting and re-inserting rows, even unchanged ones, is expensive as well.
The solution was to add more Change Types to simplify adapter development. In case of above RSS Adapter, the RowType Upsert was used.
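The difference between Insert and Upsert for the RSS case can be shown with a few lines of Python. This is a sketch under assumptions: the apply_change function, the dict used as target table and the example URI are made up and only mimic how a Change Type maps to a load operation.

```python
def apply_change(target, change_type, key, row=None):
    """Apply one change row to a dict-based target table keyed by primary key."""
    if change_type == "insert":
        if key in target:
            raise KeyError(f"primary key violation: {key!r}")
        target[key] = row
    elif change_type == "update":
        if key in target:
            target[key] = row
    elif change_type == "upsert":
        target[key] = row          # insert if missing, overwrite if present
    elif change_type == "delete":
        target.pop(key, None)
    else:
        raise ValueError(f"unknown change type: {change_type}")


# Hypothetical feed content; every poll returns the same headline again.
feed = {"http://example.com/news/1": "Headline one"}
target = {}

for uri, title in feed.items():            # first poll, sent as inserts
    apply_change(target, "insert", uri, title)

try:                                       # second poll, sent as inserts again
    for uri, title in feed.items():
        apply_change(target, "insert", uri, title)
    second_insert_failed = False
except KeyError:
    second_insert_failed = True

for uri, title in feed.items():            # second poll, sent as upserts
    apply_change(target, "upsert", uri, title)
```

With Upsert the adapter does not need to remember what it has sent before, which is exactly the information an RSS feed cannot provide.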
Another special Change Type is the eXterminate value. Imagine a subscription using the SQL "select * from twitter where region = 'US'" and let's assume this filter cannot be passed to the Adapter but is executed in the Hana Federation layer.
So Twitter sends rows from all regions to Hana, in Hana the filter region = 'US' is applied and only the matching rows are loaded. No problem, except for Delete messages from Twitter, because Twitter does not provide all values, only the TweetID of the Tweet to be removed. So the adapter sends a row with the TweetID column filled and all other columns null, including the region column. Hence this delete row does not pass the filter region = 'US' and will never make it to the target table. Therefore, instead of sending such a row as Delete, the Twitter adapter sends it as an eXterminate row.
This tells the applier process that only the primary key is filled, and the applier does not evaluate the filter condition on those rows.
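A short Python sketch of that behaviour follows. The distribute function, the column names tweet_id and region and the "exterminate" spelling are assumptions for illustration and do not reflect the actual applier code.

```python
def distribute(target, rows, row_filter):
    """Apply change rows to a dict keyed by tweet_id, filtering on the Hana side."""
    for change_type, row in rows:
        if change_type == "exterminate":
            # Only the primary key is filled, so the filter is not evaluated.
            target.pop(row["tweet_id"], None)
            continue
        if not row_filter(row):
            continue                      # row is dropped by the subscription filter
        if change_type == "delete":
            target.pop(row["tweet_id"], None)
        else:
            target[row["tweet_id"]] = row


def is_us(row):
    return row["region"] == "US"


target = {}
distribute(target, [
    ("upsert", {"tweet_id": 1, "region": "US"}),
    ("upsert", {"tweet_id": 2, "region": "DE"}),     # filtered out
    ("delete", {"tweet_id": 1, "region": None}),     # filtered out as well!
], is_us)
after_delete = dict(target)

distribute(target, [("exterminate", {"tweet_id": 1, "region": None})], is_us)
after_exterminate = dict(target)
```

The plain delete row is lost because its region column is null, whereas the eXterminate row removes the tweet from the target.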
Another Change Type is Truncate. With it the Adapter can request the deletion of many rows at once. An obvious example: in a database source somebody emptied the source table using a truncate table command. Instead of deleting every single row, the adapter can send one truncate row with all columns being NULL. But with the Truncate Change Type subsets of data can be deleted as well. All the Adapter has to do is send a truncate row with some columns having a value. For example, an Adapter might send a truncate row with region = 'US' to delete all rows where the region is 'US'. That might sound like a weird example, but imagine a source database command like "alter table drop partition".
Another use case of the Truncate Change Type goes together with Replace rows. Imagine an Adapter that does not know which row has changed, only that something changed within a dataset. Let's say it is a file adapter and whenever a file appears in a directory, the entire file is sent into the target table. It might happen that a file contained wrong data and hence is put into the directory again under the same name as before. None of the above Change Types can deal with that situation. Insert would result in a primary key violation; upsert would work, but what if the file contains fewer rows because one got deleted?
The solution is to first send a truncate row with the file name column set, hence the command "delete from target where filename = ?" is executed, and then all rows of the file can be inserted again. For those rows, however, use the Change Type Replace instead of Insert. Internally it does the same thing, all replace rows are inserted, but it makes clear that these Replace rows belong to the preceding Truncate row, so additional optimizations and validations can be done.
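The Truncate and Replace pairing can be sketched in Python as follows. The function names, the list-of-dicts target and the file names are hypothetical; the sketch only mirrors the rule that the non-NULL columns of a truncate row form the delete condition.

```python
def apply_truncate(target, truncate_row):
    """Delete all rows matching the non-NULL columns of the truncate row."""
    criteria = {col: val for col, val in truncate_row.items() if val is not None}
    # A row survives only if it differs in at least one criteria column;
    # with no criteria at all (all columns NULL) every row is deleted.
    target[:] = [row for row in target
                 if any(row.get(col) != val for col, val in criteria.items())]


def apply_rows(target, rows):
    for change_type, row in rows:
        if change_type == "truncate":
            apply_truncate(target, row)
        elif change_type == "replace":
            target.append(row)            # loaded like an insert
        else:
            raise ValueError(f"unsupported change type: {change_type}")


target = [
    {"filename": "a.csv", "line": 1, "text": "one"},
    {"filename": "a.csv", "line": 2, "text": "two"},
    {"filename": "a.csv", "line": 3, "text": "wrong"},
    {"filename": "b.csv", "line": 1, "text": "other"},
]

# a.csv is delivered again, corrected and one line shorter.
apply_rows(target, [
    ("truncate", {"filename": "a.csv", "line": None, "text": None}),
    ("replace", {"filename": "a.csv", "line": 1, "text": "one"}),
    ("replace", {"filename": "a.csv", "line": 2, "text": "two"}),
])
a_rows = [row for row in target if row["filename"] == "a.csv"]
b_rows = [row for row in target if row["filename"] == "b.csv"]

emptied = list(target)
apply_truncate(emptied, {"filename": None, "line": None, "text": None})
```

Unlike an upsert, this approach also gets rid of line 3, which no longer exists in the corrected file.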
All of the above Change Types work with Tasks as target as well. Understanding what each transform has to do for each row was hard, very hard in fact. But the advantage we get is that complete dataflows do not only work in batch but can transform realtime streams of data as well. No delta loads are needed; the initial load dataflow can be turned into a realtime task receiving the changes. As of SPS09 this works for single tables only, but how to deal with joins in realtime is the next big thing.