Build Log-based integration with SAP Data Intelligence
Log-based integration
Log-based integration is becoming popular for integrating different data systems. By capturing the changes in a database and continually applying the same changes to derived data systems, the data in the derived systems stays in sync with the data in the database, as long as the message ordering of the change data stream is preserved. The derived data systems are consumers of the change data stream, as illustrated in the figure below.
The derived data systems in practice might be indexes, caches, analytics systems, etc.
In order to build a log-based integration, we need two critical tools:
- A CDC tool for capturing database changes.
- A message broker for persisting the message stream while preserving the message ordering.
In this tutorial, we explore how to achieve data integration between a database and various derived data systems using a log-based approach in SAP Data Intelligence.
CDC
Change data capture (CDC) is the process of observing all data changes written to a database and extracting them in a form in which they can be replicated to other systems.
SAP Data Intelligence 3.0 introduced the Table Replicator operator, which allows capturing delta changes for different databases. The Table Replicator operator effectively implements CDC using a trigger-based replication approach.
Kafka
Apache Kafka is a message broker which persists the messages it receives. It provides a total ordering of the messages within each partition. SAP Data Intelligence has built-in Kafka operators which allow us to use a Kafka cluster.
Next, we will see how to leverage these two operators to implement a concrete integration scenario.
Preparation
The source table content is illustrated below.
The scenario involves two tasks.
- An initial loading of the data in the source table to a target file in the Data Intelligence local file system.
- A continuous capture of source table changes (delta extraction), publishing the change messages to a Kafka topic and letting derived systems consume those changes.
We will create separate graphs for these two tasks.
Initial loading
The following figure illustrates the initial loading graph:
The Table Replicator operator configuration is illustrated below.
The important configuration parameters are marked with red boxes.
Note that the deltaGraphMode is set to Manual. This ensures the graph completes its execution as soon as the initial loading is finished. Otherwise, the graph would run indefinitely to track further delta changes.
Open the file and verify the table content has been replicated successfully.
Note that each delta record carries a row type such as Insert (I), Update (U), Delete (D), Before Image (B), Unknown (X), and so on. In our case of initial loading, the row type is I, which means an inserted record. The row type field in the message is named DI_OPERATION_TYPE.
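To make the row types concrete, here is a minimal sketch (not part of the graph, purely illustrative) of how a derived data system could branch on DI_OPERATION_TYPE when applying a change message to a simple in-memory store; the ID and TEMPERATURE column names come from the sample table used in this tutorial, everything else is an assumption:

// Sketch of a change consumer applying CDC messages to an in-memory store.
// The message Body is assumed to hold column/value pairs plus DI_OPERATION_TYPE,
// as produced by the parsing operator later in this tutorial.
var store = {};   // keyed by the source table's primary key (ID)

function applyChange(message) {
    var row = message.Body;
    switch (row.DI_OPERATION_TYPE) {
        case "I":   // insert record
        case "U":   // update record (after image)
            store[row.ID] = row;
            break;
        case "D":   // delete record
            delete store[row.ID];
            break;
        case "B":   // before image of an update, nothing to apply here
        default:    // unknown (X) and other row types are ignored in this sketch
            break;
    }
}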
Our initial loading task has finished successfully. Next, let's turn to the delta extraction task.
Delta extraction
The following figure illustrates the Delta extraction graph:
Let’s take an overview of the dataflow sequence.
- The Constant Generator operator (1) triggers the Table Replicator to begin CDC delta tracking.
- The Table Replicator operator (2) replicates the database changes to a target file, just as in the initial loading.
- The JS operator (3) removes the '/vrep' prefix from the target file path. The prefix is added by the Table Replicator operator and would prevent the downstream Read File operator from finding the file if we did not remove it.
- The Read File operator (4) reads the target file content and sends it to the downstream JS operator.
- The JS operator (5) parses the received file content and sends the parsed messages to a Kafka topic.
- The Kafka Producer operator (6) receives the incoming messages and publishes them to the specified topic on the Kafka cluster.
- The Kafka Consumer operators (7, 8) receive the messages from the topic on the Kafka cluster.
- The connected Wiretap operators (9, 10) act as the derived data systems that consume and apply the received messages.
Now let’s look at the configuration of some operators.
Table Replicator
Its configuration is illustrated below.
Note that the deltaGraphMode is set to “Polling Interval”, and the maxPollingInterval is set to “60”. This ensures the graph runs indefinitely to track delta changes, with the Table Replicator polling for delta changes at least once per minute.
The “Remove path prefix” JS operator
$.setPortCallback("input", onInput);

function onInput(ctx, s) {
    var msg = {};
    msg.Attributes = {};
    // copy the incoming message attributes unchanged
    for (var key in s.Attributes) {
        msg.Attributes[key] = s.Attributes[key];
    }
    // the body contains the target file path; strip the leading '/vrep' prefix
    msg.Body = s.Body.replace(/^\/vrep/, '');
    $.output(msg);
}
It simply removes the “/vrep” prefix from the received message body which contains the target file path.
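For example, assuming a hypothetical target path:

// Hypothetical path; the actual value depends on the Table Replicator target configuration.
var body = "/vrep/delta/HANA_SOURCE.csv";
body = body.replace(/^\/vrep/, '');   // -> "/delta/HANA_SOURCE.csv"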
The “Parse & send changes” JS operator
Its script code is shown below.
$.setPortCallback("input", onInput);

// Returns true if the given data is a byte array (the Read File operator may
// deliver the file content as bytes instead of a string).
function isByteArray(data) {
    switch (Object.prototype.toString.call(data)) {
        case "[object Int8Array]":
        case "[object Uint8Array]":
            return true;
        case "[object Array]":
        case "[object GoArray]":
            return data.length > 0 && typeof data[0] === 'number';
    }
    return false;
}

function onInput(ctx, s) {
    var inbody = s.Body;
    var inattributes = s.Attributes;

    var msg = {};
    msg.Attributes = {};
    for (var key in inattributes) {
        msg.Attributes[key] = inattributes[key];
    }

    // convert the body into a string if it is bytes
    if (isByteArray(inbody)) {
        inbody = String.fromCharCode.apply(null, inbody);
    }

    if (typeof inbody === 'string') {
        // the body is a string (e.g., plain text or a JSON string):
        // parse it as CSV and emit one message per data line
        msg.Attributes["js.action"] = "parseFile";

        var lines = inbody.split(/\r\n/);
        var dataCols = lines[0].split(',');   // the first line holds the column names
        lines.slice(1).forEach(function(line) {
            if (line.length !== 0) {
                var fields = line.split(',');
                var o_inter = {};
                dataCols.forEach(function(c, i) {
                    o_inter[c] = fields[i];
                });
                msg.Body = o_inter;
                $.output(msg);
            }
        });
    }
    else {
        // the body is an object (e.g., a JSON object):
        // forward the body and indicate it in the attribute js.action
        msg.Body = inbody;
        msg.Attributes["js.action"] = "noop";
        $.output(msg);
    }
}
It parses the received file content line by line. Each line represents a change message and is immediately sent to the downstream operator.
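To illustrate the parsing, here is a standalone sketch of the same logic applied to a hypothetical two-line change file (the header and values are assumptions, not the actual target file content):

// Hypothetical file content: a CSV header followed by one change record.
var content = "ID,TEMPERATURE,DI_OPERATION_TYPE\r\n7,40,I\r\n";
var lines = content.split(/\r\n/);
var dataCols = lines[0].split(',');
lines.slice(1).forEach(function(line) {
    if (line.length !== 0) {
        var fields = line.split(',');
        var record = {};
        dataCols.forEach(function(c, i) {
            record[c] = fields[i];
        });
        // record -> { ID: "7", TEMPERATURE: "40", DI_OPERATION_TYPE: "I" }
    }
});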
Kafka Producer
Its configuration is illustrated below.
Kafka Consumer 1
Its configuration is illustrated below.
Kafka Consumer 2
Note that the “Group ID” configurations are different for the two consumers. This is important because we want to achieve fan-out messaging: a single Kafka partition is consumed by multiple consumers, each maintaining its own message offset.
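Outside of Data Intelligence, the same fan-out behavior can be reproduced with any Kafka client. Below is a minimal sketch using the Node.js kafkajs library; the broker address, topic name, and group IDs are assumptions:

// Two consumers with different group IDs each receive every message of the topic (fan-out),
// because Kafka tracks a separate offset per consumer group.
const { Kafka } = require('kafkajs');

const kafka = new Kafka({ clientId: 'fanout-demo', brokers: ['localhost:9092'] });

async function startConsumer(groupId) {
    const consumer = kafka.consumer({ groupId: groupId });
    await consumer.connect();
    await consumer.subscribe({ topic: 'hana-source-changes', fromBeginning: true });
    await consumer.run({
        eachMessage: async ({ topic, partition, message }) => {
            console.log(groupId, 'partition', partition, 'offset', message.offset,
                        message.value.toString());
        },
    });
}

// Both groups consume the same partition independently, each with its own offset.
startConsumer('change-consumer').catch(console.error);
startConsumer('analytics-consumer').catch(console.error);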
Now we can start the graph by clicking the run button. Next, we will write to the source table through insert, update, and delete operations and observe how the changes are replicated to the derived systems.
Insert
Let’s insert one record into the source table by running the below SQL statement.
INSERT INTO "TRIALEDITION1"."HANA_SOURCE" VALUES (7, 3, 40)
Go to the Data Intelligence local file system workspace, then check and verify that the target file has been generated, as the figure below illustrates.
Open that file and verify the message for the inserted record has been replicated to the target file successfully, as the figure below illustrates.
Finally, we open the change consumer wiretap UI to check its output, as the figure below illustrates.
Update
Let’s update one record in the source table by running the below SQL statement.
UPDATE "TRIALEDITION1"."HANA_SOURCE" SET TEMPERATURE = TEMPERATURE+ 1 WHERE ID = 1
Check and verify the target file has been generated, as the figure below illustrates.
Verify the messages for the updated record have been replicated to the target file successfully, as the figure below illustrates.
Note that for an UPDATE statement, the change is composed of two messages. Their row types are Before Image (B) and Update (U), respectively.
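To see why the Before Image matters, consider a derived index keyed by a value that the update itself changes. A minimal, purely illustrative sketch (column names assumed from the sample table):

// Apply a Before Image (B) / Update (U) pair to an index keyed by TEMPERATURE.
// The before image identifies the stale index entry that must be removed.
var indexByTemperature = {};
var pendingBeforeImage = null;

function applyChange(row) {
    if (row.DI_OPERATION_TYPE === "B") {
        pendingBeforeImage = row;                       // remember the old row
    } else if (row.DI_OPERATION_TYPE === "U") {
        if (pendingBeforeImage !== null) {
            delete indexByTemperature[pendingBeforeImage.TEMPERATURE];
            pendingBeforeImage = null;
        }
        indexByTemperature[row.TEMPERATURE] = row;      // index the new row
    }
}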
The change consumer wiretap UI output is illustrated in the figure below.
Delete
Let’s delete one record in the source table by running the below SQL statement.
DELETE FROM "TRIALEDITION1"."HANA_SOURCE" WHERE ID = 7
Check and verify the target file has been generated, as the figure below illustrates.
Verify the change message for the deleted record has been replicated to the target file successfully, as the figure below illustrates.
The change consumer wiretap UI output is illustrated in the figure below.
Summary
This tutorial shows how easy it is to build a data integration pipeline that keeps different data systems in sync using SAP Data Intelligence’s built-in operators. I hope this helps you get started on your own journey with SAP Data Intelligence.
Hi Andy,
Thank you for such a nice article. Would this work with table-to-table replication, i.e. if the source and target are both HANA tables? Also, my requirement is to sync the data once a week, so the graph should run once a week and replicate the delta changes from source to target. Can you please suggest what changes I will have to make?
Thank You.
Hi Vikas,
For your case, where both source and target are HANA tables, using the "Table Replicator" operator itself should be adequate. After your initial loading is complete, you can run your weekly delta change replication by setting the "Delta Graph Run Mode" field to "Manual" and scheduling the graph to run periodically. Please refer to the "Table Replicator" details in https://help.sap.com/viewer/f66004fabfae4cd4974b54b6dd41993d/3.1.2/en-US/79fcadb91f584f868a6662111b92f6e7.html?q=Table%20Replicator%20Task
and the "Initial Load + Delta Extraction from AnyDB" sample graph in https://help.sap.com/viewer/f66004fabfae4cd4974b54b6dd41993d/3.1.2/en-US/099adc5b14184bf0abaab0ea4bd31ed8.html
Hope this is useful.
Hello Andy,
Thank you for a great article. It was a wonderful read and learning experience!
I am working on a use case which involves reading a large number of files in a directory from an S3 bucket. The file events are stored in SQS (FIFO) and processed by the Lambda compute service on AWS. These files are read into SAP Data Intelligence and are staged/upserted in HANA.
Can the Lambda service (being triggered by SQS, FIFO) at the source be mirrored in DI using operators?
The events I am searching for are:
A log of all the files arriving in the S3 bucket
Event parsing and processing
Retrieving the next batch of file events and processing each file event
I'm still trying to understand Lambda, so please understand if something doesn't make sense.
Thanks,
Indu.
Hi Indu,
For your case, you may try the Monitor Files operator in SAP Data Intelligence to monitor the specified file events in your S3 bucket.
You can refer to the sample graph Ingest Files into HANA for continuously ingesting data from files into a HANA table.
The Repository Objects Reference for SAP Data Intelligence is a good place to check the information about the built-in operators and to learn about the sample graphs that SAP Data Intelligence provides for use in the SAP Data Intelligence Modeler. You can also create your own operator if no built-in operator satisfies your specific requirement.
Regards,
/Andy
Hello Andy,
Thank you for the answer; I have been trying the same approach with the monitor operator.
I have a few questions, if you can please answer:
Thanks,
Indu Khurana.
Hi Indu,
I guess you are doing data ingestion from a source system to HANA. For data ingestion, we have the options of batch ingestion or streaming ingestion. SAP Data Intelligence supports both.
If your use case is streaming ingestion, and you have a way to get the CDC messages for your source system, then you can use the approach introduced in this blog to do your work.
If your use case is batch ingestion using the ELT pattern, you are probably periodically doing incremental data extraction from your source system and uploading it to S3 storage. In that case, you can choose an appropriate File operator to read the S3 file content and load it into HANA by simply inserting into the table. You do not need to consider upsert, since both the original record and the updated record will coexist in the table, and you can differentiate them with the timestamp information (assuming your source system records have a field identifying the last updated timestamp); see the small sketch below. Of course, incremental batch ingestion cannot capture delete operations.
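A minimal sketch of such timestamp-based differentiation, selecting the latest version of each record (the ID key and LAST_UPDATED field names are assumptions):

// Pick the latest version of each record from a set of ingested rows,
// assuming each row carries an ID key and a LAST_UPDATED timestamp field.
function latestVersions(rows) {
    var latest = {};
    rows.forEach(function(row) {
        var current = latest[row.ID];
        if (!current || row.LAST_UPDATED > current.LAST_UPDATED) {
            latest[row.ID] = row;
        }
    });
    return Object.keys(latest).map(function(id) { return latest[id]; });
}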
If you really want to get rid of the deleted records and do not want to keep the original versions of the updated ones, you may change the incremental batch ingestion into a full batch ingestion. To do that, do a full data extraction from the source system to S3, then read the S3 file and load the records into HANA by inserting into the table and changing the "Table initialization" configuration of the HANA Client operator to Truncate.
After the data ingestion, you can transform the ingested data into the data model according to your specific needs.
For question 1: If you really want to replicate from the source system into HANA, choose streaming ingestion (the CDC approach) or use full batch ingestion.
For question 2: If you really want to use the HANA Client operator to create a HANA table with a primary key, you simply need to pass the SQL CREATE TABLE statement into the sql port of the HANA Client operator. You can achieve this by filling in your SQL statement in a "Constant Generator" operator and connecting it to the sql port of the HANA Client operator.
For question 3: The Ingest Files into HANA example illustrates a way to guarantee each file is processed at least once if HANA is down temporarily, by rerunning the graph once HANA is restored.
Thank you Andy for the detailed answer.
I really appreciate you taking the time to provide insights.
Good Day to you!
Indu.
What is it about the last JS operator?
Hi Carlos,
You can remove both the last JS operator and the "Graph Terminator" operator from the delta extraction graph. They are NOT necessary, since delta extraction is a never-ending process and it will never reach those two operators. Thanks for pointing this out.
Hello Andy,
I am trying to use the Table Replicator operator for reading tables from Azure SQL cloud into SAP HANA.
The initial and delta load are working fine, but there are missing keys and values (max/no of batches and number of rows) in the checkpoint table. Besides, the shadow table and shadow view are never populated with any value. Is that expected behavior or am I missing something?
Also, the primary key details of the source table are missing in the checkpoint table and the target HANA table. Is the metadata of the source table not moved along when replicated?
Please suggest.
Thanks,
Indu.
Hi Indu,
As long as your initial and delta loads are working fine, why bother checking the checkpoint table, etc.? Those tables are part of the internal implementation of the trigger-based replication used by the Table Replicator operator itself. I believe most of the time we do not need to check those tables, unless you have a good reason, probably for troubleshooting something.
For the primary keys: according to the operator specification, when HANA is the target, it supports an additional mode, "Apply Change Data", besides "Track Change History". You can try to run your pipeline again with the "Apply Change Data" mode and check it.
Thank you Andy for your response.
I was hoping I could use the checkpoint/shadow tables for logs and exception management. (I was testing with HANA DB as source and target.)
Also, now that I have started working with actual data in Azure SQL DB, I ran the graph with the Table Replicator operator and it failed with the error "CREATE TABLE permission denied at the database".
I understand this is a prerequisite for the Table Replicator operator, but how can I assess the changes being made by this operator at the source database? If I request this access from the DB team, would it be the right approach going forward in production as well?
Please suggest.
Regards,
Indu Khurana
Hi Indu,
You can refer to the documentation of the Table Replicator; it specifies which SQL privileges need to be granted to the connection user.
The "Using a Custom Schema on Table Replicator" section in that documentation has a very detailed specification of what privileges to grant, and how, for the various supported sources before you can run the Table Replicator operator. After you identify the GRANT SQL statements for your specific source, you can request the access from the DB admin.
Due to the nature of trigger-based replication, it puts overhead on the source. I recommend you do some tests on your use cases before taking it into production. You can also read some suggestions in Improving CDC Graph Generator Operator Performance.
Thank you very much Andy for the useful links.
I can see that with a custom schema we need only a few privileges on the source schema, which looks like a good approach but creates a dependency between the two schemas. Unfortunately, I cannot test it until I apply for the required privileges on Azure SQL.
I will try to test a similar approach on HANA as a source, to see if there are any challenges.
Regards,
Indu.
Hi Andy,
I can't find the configuration parameters for adding a custom schema in the Table Replicator operator.
I have checked the config schema file properties as well.
Can you suggest anything?
Thanks,
IK.
Hi Indu,
Sorry for the inconvenience. It looks like your DI is not the latest version, so that configuration parameter is not available yet.
I read the Table Replicator documentation of an earlier version and found statements like the below:
Note: The following source specific SQL Privileges are required. Please ensure the connection user has the following privileges.
CREATE TABLE
CREATE VIEW
CREATE SEQUENCE
CREATE PROCEDURE
CREATE TRIGGER
So, please find the required privileges as per the Modeler documentation you are currently using (you can find the doc by right-clicking the operator and choosing "Open Documentation" from the context menu).
Thanks Andy.
So, with the current version I have to go with privileges on the source schema only.
But there is still one issue pending with the Table Replicator, which is primary keys. I tested with HANA as the source DB and the primary keys of the table were not replicated to the target tables.
So, for all the tables, the primary key has to be enforced on the target tables!?
(Also, there is no provision for a unique key check in this version.)
Thanks,
Indu.
Hi Indu,
Some thoughts in my mind: CDC tracks the change history for all the records of a specific source table. The captured messages include the INSERT, UPDATE, and DELETE operation types. It is very likely that a record goes through insert, update, and even delete operations in the source table, so the target would have multiple CDC messages for a unique record in the source. This full history of a record is attractive for some analytic purposes. One popular usage of CDC messages is to translate and load them into a data warehouse for analytics.
For this operator, when the target is HANA and you choose Apply Change Data instead of Track Change History, the operator will apply the changes to the target HANA table, which means you replicate the data from source to target table instead of a list of CDC messages. Please note that in this case the replication is a FULL table replication, which means that if you run the graph again, the full data in the source HANA will be replicated to the target HANA again. This has two implications depending on your settings:
Hello Andy,
For the initial load, if I create my target table with a primary key before loading the data, it works the first time with append; the next time, it fails. But if I change it to overwrite, then the primary key is also removed in the target table, as the table schema is updated with a new one from the Table Producer/Flowagent Table Producer.
It is frustrating! 🙁
Hi Indu,
The new version of this operator provides one more option, "truncate", for "Target Write Mode", which keeps the primary key setting and should eliminate your primary key concern.
Hi Andy,
I will try this option, but right now my approach is to create three tables with primary keys before loading with the HANA initializer, then map them to a SQL Consumer to run a query extracting 15 years of data from the Azure SQL DB, writing the result to a Table Producer: tab1 (in append mode, to keep the primary key; in overwrite and truncate, the primary key gets wiped out) (some DQ rules are to be applied here on tab1). This is mapped further to a HANA table consumer to read tab1 and a Flowagent Table Producer (which only keeps the PK with append; with truncate and overwrite, the PK is removed) to move it to a staging table in another schema.
At the very first landing point of data from Azure to HANA, I have to use append to keep the PK.
P.S.: I have tried using the ALTER TABLE option to add the PK later, but that fails with HANA SQL, Flowagent SQL, and SQL Consumer.
BR, IK.
Hello Andy,
I tried the Table Replicator approach in overwrite (O) and truncate (T) mode and am getting the below error for the source shadow table view:
Group messages: Group: default; Messages: Graph failure: tablereplicator1 failed with the following error: com.sap.hana.dp.adapter.sdk.AdapterException: Get metadata request could not be performed due to error: Base table or view not found;259 invalid table name: Could not find table/view DLTRANS_PK_24_SHV in schema S1: line 1 col 34 (at pos 33) Context: com.sap.hana.dp.adapter.sdk.AdapterException: Get metadata request could not be performed due to error: Base table or view not found;259 invalid table name: Could not find table/view DLTRANS_PK_24_SHV in schema S1 : line 1 col 34 (at pos 33) at dsadapter.util.AdapterExceptionHelper.getMDSAdapterException(AdapterExceptionHelper.java:135) at dsadapter.mds.RequestHandler.handleMessage(RequestHandler.java:72) at dsadapter.mds.MDSServer.importMetadata(MDSServer.java:352) at dsadapter.mds.MDSServer.importMetadata(MDSServer.java:340) at dsadapter.sdi.adapters.GenericDatabaseAdapterDataStore.importMetadata(GenericDatabaseAdapterDataStore.java:340) at dsadapter.vflow.VFlowHelper.getMetadata(VFlowHelper.java:202) at dsadapter.vflow.VFlowHelper.getMetadata(VFlowHelper.java:291) at dsadapter.vflow.VFlowHelper.getProperties(VFlowHelper.java:486) at dsadapter.vflow.jobs.DataservicesETLJob.getProperties(DataservicesETLJob.java:131) at dsadapter.vflow.jobs.DataservicesIngestionJob.compileIngestionGraph(DataservicesIngestionJob.jav
Sorted 🙂
Truncate worked with the Flowagent producer.
A big thank you for your support, Andy. If there was an award I could give you, I would give you "The Amazing Support" award 🙂
Hi Indu,
You are welcome. Glad to hear it is working well for you.
Thank you Andy for the reply, but I did not get approval for the privileges required by the Table Replicator operator on the source database.
Hence, I am proceeding with the standard way of SQL Consumer and Table Producer for the initial load. For the delta, we will be using the audit trail from the database itself.
But primary key addition to the target schema still remains an issue.
Thanks,
Indu.
Hi Andy, thanks for a really great blog. I need to replicate SAP HANA data to Snowflake. I was exploring the Flowagent SQL Executor Snowflake connector for this together with the Table Replicator. Please advise whether we can use Table Replicator --> local file system --> Read File --> Snowflake connector to replicate data to Snowflake.
My idea is to use micro-batches (by setting a higher polling interval in the Table Replicator) of the CDC records captured by the replicator.
Would this solution scale to handle high transaction volumes in HANA?
Thanks.
Hi Sachin,
It is not encouraged to write to the DI local file system (this blog used it only for illustration purposes). You can write the Table Replicator output directly to cloud storage like S3. Then you can read and parse the output file, probably interpreting the messages into corresponding INSERT, UPDATE, and DELETE statements, and feed them to the downstream Flowagent SQL Executor connector you mentioned.
I am not sure whether the target Snowflake processing can keep up with the speed at which your source HANA sends data with your configuration. If not, back-pressure flow control will be applied. So you may need to do some tests and configure your settings accordingly for your use case.
One suggestion is to send the parsed messages to Kafka like this blog did. This way, you do not need to consider whether the target processing speed matches the source delivery speed, since they are decoupled. Also, if some error happens in the target, you can replay the whole consumption.
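For the messages-to-SQL translation mentioned above, a minimal sketch could look like the following; the table name, the ID key column, and the value formatting are assumptions, and real code would need proper quoting and escaping:

// Translate a parsed CDC message (column/value pairs plus DI_OPERATION_TYPE)
// into a SQL statement string for a downstream SQL executor.
// Assumes the ID key column is not changed by updates; purely illustrative.
function toSqlStatement(row, tableName) {
    var type = row.DI_OPERATION_TYPE;
    var cols = Object.keys(row).filter(function(c) { return c !== "DI_OPERATION_TYPE"; });
    var values = cols.map(function(c) { return "'" + row[c] + "'"; });

    if (type === "I") {
        return "INSERT INTO " + tableName + " (" + cols.join(",") + ") VALUES (" + values.join(",") + ")";
    }
    if (type === "U") {
        var sets = cols.map(function(c) { return c + " = '" + row[c] + "'"; });
        return "UPDATE " + tableName + " SET " + sets.join(",") + " WHERE ID = '" + row.ID + "'";
    }
    if (type === "D") {
        return "DELETE FROM " + tableName + " WHERE ID = '" + row.ID + "'";
    }
    return null;   // Before Image (B) and other row types produce no statement here
}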
Thanks for the quick reply Andy.
Can the Kafka Consumer operator be connected to the SQL Executor directly, i.e. can messages from the Kafka Consumer operator be sent to the SQL Executor to write to Snowflake?
Thanks
Hi Sachin,
I don't think you can connect the output of the Kafka Consumer directly to the Flowagent SQL Executor operator, since the output consists of CDC messages while the input port of the Flowagent SQL Executor operator expects a SQL statement string. As I mentioned, you need to parse the CDC messages and convert them into appropriate SQL statements for the Flowagent SQL Executor operator to execute.
Thanks Andy!
Hello Andy,
I have a question on the Table Replicator operator (TR op).
With the TR op, triggers are created on the source table (tab1), and the delta is extracted and applied to the target tables.
If there are multiple vendors who want to extract the delta of the same table (tab1), how would they achieve that, since we cannot create another target table from the same source table?
Is it that we have to clean up after graph termination?
Please advise.
Thanks,
Indu.
Hi Indu,
I do not quite understand what you mean by multiple vendors. If you mean that different people want to extract the delta from the same source table, then they would most likely each create a separate pipeline with its own target setting in the TR configuration. Those pipelines are standalone and should not impact each other.
Thanks Andy
Yes, different people trying to extract the delta of the same source table.
I tried creating a new pipeline, with a new instance of the Table Replicator, on the same source table, but it never creates a new target table.
BR,
Indu.
Indu,
I have never tried to create separate pipelines for different targets capturing from the same source, so maybe my previous understanding is not correct. Also, according to the spec of the Table Replicator, it is not recommended to run each TR operator in a separate graph unless you have a good reason, since it becomes costly: each graph requires at least one additional K8s pod.
Anyway, personally, I recommend implementing fan-out messaging like this blog did if you have multiple consumers tracking the changes of the same source.
Thanks Andy,
If different users are not using SAP Data Intelligence but other platforms, I was wondering whether these triggers on the source table would affect their access/CDC?
BR,
Indu.
Indu,
I do not think so, since the triggers are created specifically by DI. Other platforms may have their own way of implementing CDC, right?
Hello Andy,
Yes, but I cannot say that with certainty as I have not tested it; I became skeptical when I was not able to create another target table.
But thank you for your time.
BR,
Indu.
Hi Andy,
During a deletion, I got the below error (I am using a table as the target, not a file):
Group: default; Messages: Graph failure: error while stopping 'com.sap.dh.flowagent' sub-engine: Post "http://localhost:33461/service/stop": context deadline exceeded (Client.Timeout exceeded while awaiting headers)
What is the meaning of this error? There are no details...!
Thanks,
Indu.
My humble apologies. I'm really not sure what we are trying to do here.
My use case is simple:
1) Consume the latest offset from Apache Kafka, triggered by the user.
2) Deserialize the Avro-encoded message.
3) Convert it to a table-typed message and ingest it into HANA DB (creates and updates).
4) In case of any error, retry only at the point of failure, only 5 times (at that particular operator instead of reinitializing the whole graph again).
5) Reprocess only the failed message/offset again, not all of them.
6) If reprocessing fails, alert/notify the user with the payload content, Kafka offset number, and error message in case the graph is dead.