
Build Log-based integration with SAP Data Intelligence

Log-based integration

Log-based integration is becoming popular for integrating different data systems. By capturing the changes in a database and continually applying the same changes to derived data systems, the data in the derived systems stays consistent with the data in the database, as long as the ordering of the change stream is preserved. The derived data systems are consumers of the change stream, as illustrated in the figure below.

The derived data systems in practice might be indexes, caches, analytics systems, etc.
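
To make the idea concrete, here is a minimal, self-contained JavaScript sketch (not part of this tutorial's graphs; the record fields and values are invented for illustration) that applies an ordered change stream to a derived key-value cache:

// A derived system (here, a simple in-memory cache) consumes an ordered
// stream of change records. Field names and values are hypothetical.
var changeStream = [
    { type: "I", key: 1, value: { temperature: 20 } }, // insert
    { type: "U", key: 1, value: { temperature: 21 } }, // update
    { type: "D", key: 1 }                              // delete
];

var derivedCache = {};

// As long as the changes are applied in the order they were produced,
// the cache converges to the same state as the source database.
changeStream.forEach(function(change) {
    if (change.type === "I" || change.type === "U") {
        derivedCache[change.key] = change.value;
    } else if (change.type === "D") {
        delete derivedCache[change.key];
    }
});

console.log(derivedCache); // {} -- the record was inserted, updated, then deleted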

In order to build a log-based integration, we need two critical tools:

  • A CDC tool for capturing database changes.
  • A message broker for persisting the message stream while preserving message ordering.

In this tutorial, we explore how to achieve data integration between a database and various derived data systems using a log-based approach in SAP Data Intelligence.

CDC

Change data capture (CDC) is the process of observing all data changes written to a database and extracting them in a form in which they can be replicated to other systems.

SAP Data Intelligence 3.0 introduced the Table Replicator operator, which allows capturing delta changes from different databases. The Table Replicator operator effectively implements CDC using a trigger-based replication approach.

Kafka

Apache Kafka is a message broker which persists the messages it receives. It provides a total ordering of the messages within each partition. SAP Data Intelligence has built-in Kafka operators which allow us to use a Kafka cluster.

Next, we will see how to leverage these two operators to implement a concrete integration scenario.

Preparation

We prepare an SAP HANA database table, whose schema is illustrated below.

The table content is illustrated below.

For simplicity, we only use fictitious data for demonstration purposes. The table initially contains 6 records.

The scenario involves two tasks.

  1. An initial load of the data in the source table to a target file in the Data Intelligence local file system.
  2. Continuous capture of source table changes (delta extraction), publishing the change messages to a Kafka topic and letting the derived systems consume those changes.

We will create separate graphs for these two tasks.

Initial loading

The following figure illustrates the initial loading graph:


The Table Replicator operator configuration is illustrated below.

The important configuration parameters are marked with red boxes.

Note that deltaGraphMode is set to "Manual". This ensures the graph completes its execution as soon as the initial loading is finished. Otherwise, the graph would run indefinitely to track further delta changes.

Now run the graph, and verify that the target file has been generated once the graph run completes.

Open the file and verify the table content has been replicated successfully.

Note that each delta record carries a row type such as Insert (I), Update (U), Delete (D), Before Image (B), Unknown (X), and so on. In our initial loading case, the row type is I, which means an inserted record. The row type appears in the message under the name DI_OPERATION_TYPE.

Our initial loading task has finished successfully. Next, let's turn to the delta extraction task.

Delta extraction

The following figure illustrates the Delta extraction graph:

Let’s take an overview of the dataflow sequence.

  • The Constant Generator operator (1) triggers the Table Replicator to begin CDC delta tracking.
  • The Table Replicator operator (2) replicates the database changes to a target file, just as in the initial loading.
  • The JS operator (3) removes the '/vrep' prefix from the target file path. The prefix is added by the Table Replicator operator and would prevent the downstream Read File operator from finding the file if it were not removed.
  • The Read File operator (4) reads the target file content and sends it to the downstream JS operator.
  • The JS operator (5) parses the received file content and sends the parsed messages to the Kafka Producer operator.
  • The Kafka Producer operator (6) receives the incoming messages and publishes them to the specified topic on the Kafka cluster.
  • The Kafka Consumers (7, 8) receive the messages from the topic on the Kafka cluster.
  • The connected Wiretap operators (9, 10) act as the derived data systems, consuming and applying the received messages.

Now let’s look at the configuration of some operators.

Table Replicator

Its configuration is illustrated below.

Note that deltaGraphMode is set to "Polling Interval" and maxPollingInterval is set to "60". This ensures the graph runs indefinitely to track delta changes, with the Table Replicator polling for delta changes within one minute.

The “Remove path prefix” JS operator

$.setPortCallback("input",onInput);

function onInput(ctx,s) {
    var msg = {};

    msg.Attributes = {};
    for (var key in s.Attributes) {
        msg.Attributes[key] = s.Attributes[key];
    }
    
    msg.Body = s.Body.replace(/^\/vrep/, '');
    $.output(msg);
}

It simply removes the “/vrep” prefix from the received message body which contains the target file path.
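For example, assuming a hypothetical target path (the real path depends on the Table Replicator configuration), the transformation looks like this:

// Hypothetical input and output of the operator above; the actual path
// emitted by the Table Replicator depends on its configuration.
var body = "/vrep/vflow/target/HANA_SOURCE_delta.csv";  // example only
var cleaned = body.replace(/^\/vrep/, '');
// cleaned === "/vflow/target/HANA_SOURCE_delta.csv"    // path the Read File operator can resolve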

The "Parse & send changes" JS operator

Its script code is shown below.

$.setPortCallback("input",onInput);

function isByteArray(data) {
    switch (Object.prototype.toString.call(data)) {
        case "[object Int8Array]":
        case "[object Uint8Array]":
            return true;
        case "[object Array]":
        case "[object GoArray]":
            return data.length > 0 && typeof data[0] === 'number';
    }
    return false;
}

function onInput(ctx,s) {
    var inbody = s.Body;
    var inattributes = s.Attributes;

    var msg = {};
    msg.Attributes = {};
    for (var key in inattributes) {
        msg.Attributes[key] = inattributes[key];
    }

    // convert the body into a string if it is bytes
    if (isByteArray(inbody)) {
        inbody = String.fromCharCode.apply(null, inbody);
    }

    if (typeof inbody === 'string') {
        // the body is a string (e.g., plain text or a JSON string):
        // parse it as CSV and emit one message per data row
        msg.Attributes["js.action"] = "parseFile";

        // split into lines (handles both CRLF and LF line endings)
        var lines = inbody.split(/\r?\n/);
        var dataCols = lines[0].split(',');

        lines.slice(1).forEach(function(line) {
            if (line.length !== 0) {
                var fields = line.split(',');
                var record = {};
                dataCols.forEach(function(c, i) {
                    record[c] = fields[i];
                });
                msg.Body = record;
                $.output(msg);
            }
        });
    }
    else {
        // if the body is an object (e.g., a json object),
        // forward the body and indicate it in attribute js.action
        msg.Body = inbody;
        msg.Attributes["js.action"] = "noop";
        $.output(msg);
    }
}

It parses the received file content line by line. Each line represents a change message and is sent immediately to the downstream operator.
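
As a rough standalone illustration (runnable with plain Node.js; the CSV content is hypothetical, and only the ID, TEMPERATURE and DI_OPERATION_TYPE column names appear elsewhere in this tutorial), the same parsing idea turns a small file body into one message per data row:

// Standalone sketch of the parsing logic above; the CSV content is invented.
var body = "ID,TEMPERATURE,DI_OPERATION_TYPE\r\n" +
           "7,40,I\r\n";

var lines = body.split(/\r?\n/);
var dataCols = lines[0].split(',');      // header row gives the column names

lines.slice(1).forEach(function(line) {
    if (line.length !== 0) {
        var record = {};
        line.split(',').forEach(function(field, i) {
            record[dataCols[i]] = field;
        });
        console.log(record);             // { ID: '7', TEMPERATURE: '40', DI_OPERATION_TYPE: 'I' }
    }
});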

Kafka Producer

Its configuration is illustrated below.

Kafka Consumer 1

Its configuration is illustrated below.

Kafka Consumer 2

Note that the "Group ID" configurations are different for the two consumers. This is important because we want to achieve fan-out messaging: a single Kafka partition is consumed by multiple consumers, each maintaining its own message offset.
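
The effect of separate group IDs can be sketched conceptually as follows (plain JavaScript, no real Kafka involved): each consumer group keeps its own offset into the same ordered partition, so every group sees every message.

// Conceptual sketch of fan-out messaging: one partition, two consumer groups,
// each tracking its own committed offset. Names and messages are illustrative.
var partition = ["change-1", "change-2", "change-3"];   // messages in arrival order
var groupOffsets = { "group-1": 0, "group-2": 0 };

function poll(groupId) {
    var messages = partition.slice(groupOffsets[groupId]); // read from the group's own offset
    groupOffsets[groupId] = partition.length;              // commit the new offset
    return messages;
}

console.log(poll("group-1")); // ["change-1", "change-2", "change-3"]
console.log(poll("group-2")); // ["change-1", "change-2", "change-3"]  (both groups receive everything)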

Now we can start the graph by clicking the run button. Next, we will write to the source table through insert, update and delete operations and observe how the changes are replicated into the derived systems.

Insert

Let’s insert one record into the source table by running the below SQL statement.

INSERT INTO "TRIALEDITION1"."HANA_SOURCE" VALUES (7, 3, 40)

Go to the Data Intelligence local file system workspace and verify that the target file has been generated, as illustrated in the figure below.

Open that file and verify that the message for the inserted record has been replicated to the target file successfully, as illustrated in the figure below.

Finally, we open the change consumer Wiretap UI to check its output, as illustrated in the figure below.

Update

Let’s update one record in the source table by running the below SQL statement.

UPDATE "TRIALEDITION1"."HANA_SOURCE" SET TEMPERATURE = TEMPERATURE + 1 WHERE ID = 1

Check and verify that the target file has been generated, as illustrated in the figure below.

Verify that the messages for the updated record have been replicated to the target file successfully, as illustrated in the figure below.

Note that for an UPDATE statement, the change is composed of two messages, with row types Before Image (B) and Update (U) respectively.
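
After the "Parse & send changes" operator, such an update therefore appears on the topic roughly as two messages like the following (field values are invented; only the DI_OPERATION_TYPE values and the +1 temperature change follow from the example):

// Illustrative shape of the two messages produced for one UPDATE;
// the concrete values shown here are assumptions.
var beforeImage = { ID: "1", TEMPERATURE: "20", DI_OPERATION_TYPE: "B" }; // row as it was
var update      = { ID: "1", TEMPERATURE: "21", DI_OPERATION_TYPE: "U" }; // row after the update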

The change consumer Wiretap UI output is illustrated in the figure below.

Delete

Let’s delete one record in the source table by running the below SQL statement.

DELETE FROM "TRIALEDITION1"."HANA_SOURCE" WHERE ID = 7

Check and verify that the target file has been generated, as illustrated in the figure below.

Verify that the change message for the deleted record has been replicated to the target file successfully, as illustrated in the figure below.

The change consumer Wiretap UI output is illustrated in the figure below.

Summary

This tutorial shows how easy it is to build a data integration pipeline that keeps different data systems in sync using SAP Data Intelligence's built-in operators. I hope it helps you get started on your own journey with SAP Data Intelligence.

      12 Comments
      Vikas Kumar Singh

      Hi Andy,

      Thank you for such a nice article. Would this work with table-to-table replication, i.e. if source and target are both HANA tables? Also, my requirement is to sync the data once a week, so the graph should run once a week and replicate the delta changes from source to target. Can you please suggest what changes I will have to make?

      Thank You.

      Andy Yin
      Blog Post Author

      Hi Vikas,

      Since both source and target are HANA tables in your case, the "Table Replicator" operator itself should be adequate. After your initial loading is complete, you can run your weekly delta change replication by setting the "Delta Graph Run Mode" field to "Manual" and scheduling the graph to run periodically. Please refer to the "Table Replicator" details in https://help.sap.com/viewer/f66004fabfae4cd4974b54b6dd41993d/3.1.2/en-US/79fcadb91f584f868a6662111b92f6e7.html?q=Table%20Replicator%20Task

      and the "Initial Load + Delta Extraction from AnyDB" sample graph in https://help.sap.com/viewer/f66004fabfae4cd4974b54b6dd41993d/3.1.2/en-US/099adc5b14184bf0abaab0ea4bd31ed8.html

      Hope this is useful.

      Indu Khurana

      Hello Andy,

      Thank you for a great article. It was a wonderful read and a great learning experience!

      I am working on a use case which involves reading a large number of files in a directory from an S3 bucket. The file events are stored in SQS (FIFO) and are processed by the Lambda compute service on AWS. These files are read into SAP Data Intelligence and are staged/upserted in HANA.

      Can the Lambda service (triggered by SQS FIFO) at the source be mirrored in DI using operators?

      The events I am searching for are:

      Log of all the files arriving in S3 bucket

      Event parsing and processing

      Retrieving the next batch of file events and processing each file event

      I'm still trying to understand Lambda, so please understand if something doesn't make sense.

      Thanks,

      Indu.


      Andy Yin
      Blog Post Author

      Hi Indu,

      For your case, you may try the Monitor Files operator in SAP Data Intelligence to monitor the specified file events in your S3 bucket.

      You can refer to the sample graph Ingest Files into HANA for continuously ingesting data from files into a HANA table.

      Repository Objects Reference for SAP Data Intelligence is a good place to check information about the built-in operators and to learn about the sample graphs that SAP Data Intelligence provides for use in the SAP Data Intelligence Modeler. You can also create your own operator if no built-in operator satisfies your specific requirement.

      Regards,

      /Andy

      Indu Khurana

      Hello Andy,


      Thank you for the answer, I have been trying the same approach with monitor operator.

      I have a few questions, if you can please answer:

      1. With the 'monitor' operator, we can monitor four events (new, modified, deleted, already exists). Now for all the events the graph is triggered and I can see the event types in the wiretap, but since the mode in the 'HANA Client' operator is insert (not upsert), the records in the HANA table are neither updated nor deleted (for an existing file); they are just added for a new file.
      2. For upsert mode, we need to set a primary key for the table, but I do not see any provision for that in the HANA Client operator. How can we achieve this?
      3. What if HANA is down? How can we track file loss?

      Thanks,

      Indu Khurana.

      Andy Yin
      Blog Post Author

      Hi Indu,

      I guess you are doing data ingestion from a source system to HANA. For data ingestion, we have the option to use either batch ingestion or streaming ingestion; SAP Data Intelligence supports both.

      If your use case is streaming ingestion and you have a way to get the CDC messages from your source system, then you can use the approach introduced in this blog to do your work.

      If your use case is batch ingestion with the ELT pattern, you are probably periodically doing incremental data extraction from your source system and uploading it into S3 storage. You can then choose an appropriate file operator to read the S3 file content and load it into HANA by simply inserting into the table. In this case, you do not need to consider upsert, since both the original record and the updated record will coexist in the table, and you can differentiate them with timestamp information (supposing your source system records have a field identifying the last-updated timestamp). Of course, incremental batch ingestion cannot capture delete operations.

      If you really want to get rid of the deleted records and do not want to keep the original versions of the updated ones, you may change the incremental batch ingestion into a full batch ingestion. To do that, you need to do a full data extraction from the source system to S3, then read the S3 file and load the records into HANA by inserting into the table and changing the "Table initialization" configuration of the HANA Client operator to Truncate.

      After the data ingestion, you can transform the ingested data into the data model according to your specific needs.

      For question 1: If you really want to replicate from the source system into HANA, choose streaming ingestion (the CDC approach) or use full batch ingestion.

      For question 2: If you really want to use the HANA Client operator to create a HANA table with a primary key, you simply need to pass the CREATE TABLE SQL statement to the sql port of the HANA Client operator. You can achieve this by filling in your SQL statement in a "Constant Generator" operator and connecting it to the sql port of the HANA Client operator.

      For question 3: The Ingest Files into HANA example illustrates a way to guarantee that each file is processed at least once if HANA is down temporarily, by rerunning the graph once HANA is restored.

      Indu Khurana

      Thank you Andy for the detailed answer.

      I really appreciate you taking out time and providing insights.

      Good Day to you!

      Indu.

      Carlos Mendez

      What is the last JS operator about?

      Andy Yin
      Blog Post Author

      Hi Carlos,

      You can remove both the last JS operator and the "Graph Terminator" operator from the Delta extraction graph. They are NOT necessary, since delta extraction is a never-ending process and the graph will never reach those two operators. Thanks for pointing this out.

      Indu Khurana

      Hello Andy,

      I am trying to use the Table Replicator operator to read tables from Azure SQL in the cloud into SAP HANA.

      The initial and delta loads are working fine, but there are missing keys and values (max/no. of batches and number of rows) in the checkpoint table. Besides, the shadow table and shadow view are never populated with any value. Is that expected behavior, or am I missing something?

      Also, the primary key details of the source table are missing in the checkpoint table and the target HANA table. Is the metadata of the source table not carried along when replicated?

      Please suggest.

      Thanks,

      Indu.

      Andy Yin
      Blog Post Author

      Hi Indu,

      As long as your initial and delta loads are working fine, why bother checking the checkpoint table, etc.? Those tables are part of the internal implementation of the trigger-based replication used by the Table Replicator operator itself. I believe that most of the time we do not need to check those tables unless there is a good reason, probably troubleshooting something.

      For the primary keys: according to the operator specification, when HANA is the target, it supports an additional mode, "Apply Change Data", besides the "Track Change History" mode. You can try to run your pipeline again with the "Apply Change Data" mode and check it.

      Indu Khurana

      Thank you Andy for your response.

      I was hoping I could use the checkpoint/shadow tables for logs and exception management. (I was testing with HANA DB as both source and target.)

      Also, now that I have started working with actual data in Azure SQL DB, I ran the graph with the Table Replicator operator and it failed with the error "CREATE TABLE permission denied at the database".

      I understand this is a prerequisite for the Table Replicator operator, but how can I assess the changes being made by this operator at the source database? If I request this access from the DB team, would it be the right approach going forward in production as well?

      Please suggest.

      Regards,

      Indu Khurana