Technical Articles
Andy Yin

Build Log-based integration with SAP Data Intelligence

Log-based integration

Log-based integration is becoming popular for integrating different data systems. By capturing the changes in a database and continually applying the same changes to derived data systems, the data in the derived systems matches the data in the database as long as the ordering of the change stream is preserved. The derived data systems are consumers of the change stream, as illustrated in the figure below.

In practice, the derived data systems might be indexes, caches, analytics systems, and so on.

In order to build a log-based integration, we need two critical tools:

  • A CDC tool for capturing database changes.
  • A message broker for persisting the message stream while preserving message ordering.

In this tutorial, we explore how to achieve data integration between a database and various derived data systems using a log-based approach in SAP Data Intelligence.

CDC

Change data capture (CDC) is the process of observing all data changes written to a database and extracting them in a form in which they can be replicated to other systems.

SAP Data Intelligence 3.0 introduced the Table Replicator operator, which allows capturing delta changes for different databases. The Table Replicator operator effectively implements CDC using a trigger-based replication approach.

Kafka

Apache Kafka is a message broker which persists the messages it receives. It provides a total ordering of the messages inside each partition. SAP Data Intelligence has built-in Kafka operators that allow us to use a Kafka cluster.
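
To make the ordering guarantee concrete, here is a minimal sketch outside of Data Intelligence, assuming a local Kafka broker and the kafkajs Node.js client; the topic name and change object are hypothetical. Publishing change messages keyed by the source row's ID routes all changes for the same row to the same partition, which preserves their relative order:

// Minimal sketch, not part of the Data Intelligence graphs below.
// Assumes a local Kafka broker and the kafkajs client library.
const { Kafka } = require('kafkajs');

const kafka = new Kafka({ clientId: 'cdc-ordering-demo', brokers: ['localhost:9092'] });
const producer = kafka.producer();

async function publishChange(change) {
    // Using the row ID as the message key keeps every change for that row
    // in the same partition, so its changes stay in order.
    await producer.send({
        topic: 'hana-source-changes',   // hypothetical topic name
        messages: [{ key: String(change.id), value: JSON.stringify(change) }]
    });
}

async function main() {
    await producer.connect();
    await publishChange({ id: 1, temperature: 21 });   // hypothetical change record
    await producer.disconnect();
}

main().catch(console.error);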

Next, we will see how to leverage these two operators to implement a concrete integration scenario.

Preparation

We prepare a HANA database table; its schema is illustrated below.

The table content is illustrated below.

For simplicity, we use fictitious data for demonstration purposes. The table initially contains six records.

The scenario involves two tasks.

  1. An initial load of the data in the source table into a target file in the Data Intelligence local file system.
  2. Continuous capture of source table changes (delta extraction), publishing the change messages to a Kafka topic so that derived data systems can consume those changes.

We will create separate graphs for these two tasks.

Initial loading

The following figure illustrates the initial loading graph:


The Table Replicator operator configuration is illustrated below.

The important configuration parameters are marked with red boxes.

Note that deltaGraphMode is set to Manual. This ensures that the graph completes its execution as soon as the initial loading is finished. Otherwise, the graph would run indefinitely to track further delta changes.

Now run the graph, and verify that the target file has been generated once the graph completes.

Open the file and verify that the table content has been replicated successfully.

Note that each delta record carries a row type such as Insert (I), Update (U), Delete (D), Before Image (B), Unknown (X), and so on. In our initial loading case, the row type is I, which means an inserted record. The row type column in the message is named DI_OPERATION_TYPE.
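
As an illustration of how a derived system might react to these row types, below is a minimal sketch of a JS operator script (not part of the graphs built in this tutorial) that branches on DI_OPERATION_TYPE; it assumes the incoming message body is an object carrying that field, as produced by the parsing operator shown later:

$.setPortCallback("input", onInput);

function onInput(ctx, msg) {
    // msg.Body is assumed to be an object such as
    // { "ID": "7", ..., "DI_OPERATION_TYPE": "I" }
    var rowType = msg.Body["DI_OPERATION_TYPE"];

    switch (rowType) {
        case "I":   // insert record
        case "U":   // update record (after image): apply as an upsert
            break;
        case "B":   // before image of an update, often ignored
            break;
        case "D":   // delete record: remove it from the derived system
            break;
        default:    // unknown row type
            break;
    }
    $.output(msg);
}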

Our initial loading task has finished successfully. Next, let’s turn to the delta extraction task.

Delta extraction

The following figure illustrates the Delta extraction graph:

Let’s take an overview of the data flow sequence.

  • The Constant Generator operator (1) triggers the Table Replicator to begin CDC delta tracking.
  • The Table Replicator operator (2) replicates the database changes to a target file, as in the initial loading.
  • The JS operator (3) removes the ‘/vrep’ prefix from the target file path. The prefix is added by the Table Replicator operator and would prevent the downstream Read File operator from finding the file if we did not remove it.
  • The Read File operator (4) reads the target file content and sends it to the downstream JS operator.
  • The JS operator (5) parses the received file content and sends the parsed messages to the Kafka Producer operator.
  • The Kafka Producer operator (6) receives the incoming messages and publishes them to the specified topic on the Kafka cluster.
  • The Kafka Consumers (7, 8) receive the messages from the topic on the Kafka cluster.
  • The connected Wiretap operators (9, 10) act as the derived data systems that consume and apply the received messages.

Now let’s look at the configuration of some operators.

Table Replicator

Its configuration is illustrated below.

Note that deltaGraphMode is set to “Polling Interval” and maxPollingInterval is set to “60”. This ensures that the graph runs indefinitely to track delta changes, with the Table Replicator polling for delta changes within a one-minute interval.

The “Remove path prefix” JS operator

$.setPortCallback("input",onInput);

function onInput(ctx,s) {
    var msg = {};

    msg.Attributes = {};
    for (var key in s.Attributes) {
        msg.Attributes[key] = s.Attributes[key];
    }
    
    msg.Body = s.Body.replace(/^\/vrep/, '');
    $.output(msg);
}

It simply removes the “/vrep” prefix from the received message body, which contains the target file path.
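
For example, a hypothetical body of “/vrep/targets/HANA_SOURCE.csv” would be forwarded as “/targets/HANA_SOURCE.csv”; the actual target path depends on your Table Replicator configuration.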

The “Parse & send changes” JS operator

Its script code is shown below.

$.setPortCallback("input",onInput);

function isByteArray(data) {
    switch (Object.prototype.toString.call(data)) {
        case "[object Int8Array]":
        case "[object Uint8Array]":
            return true;
        case "[object Array]":
        case "[object GoArray]":
            return data.length > 0 && typeof data[0] === 'number';
    }
    return false;
}

function onInput(ctx,s) {
    var inbody = s.Body;
    var inattributes = s.Attributes;
    
    var msg = {};
    msg.Attributes = {};
    for (var key in inattributes) {
        msg.Attributes[key] = inattributes[key];
    }

    // convert the body into string if it is bytes
    if (isByteArray(inbody)) {
        inbody = String.fromCharCode.apply(null, inbody);
    }
    
    if (typeof inbody === 'string') {
        // if the body is a string (e.g., a plain text or json string),
        // split it into lines, handling both \r\n and \n line endings
        var lines = inbody.split(/\r?\n/);

        msg.Attributes["js.action"] = "parseFile";

        // the first line is the CSV header; the remaining lines are data rows
        var dataCols = lines[0].split(',');

        lines.slice(1).forEach(function(line) {
            if (line.length !== 0) {
                // build one object per data row, keyed by the header column names
                var o_inter = {};
                var fields = line.split(',');
                dataCols.forEach(function(c, i) {
                    o_inter[c] = fields[i];
                });
                msg.Body = o_inter;
                $.output(msg);
            }
        });
    }
    else {
        // if the body is an object (e.g., a json object),
        // forward the body and indicate it in attribute js.action
        msg.Body = inbody;
        msg.Attributes["js.action"] = "noop";
        $.output(msg);
    }
}

It parses the received file content line by line. Each data line becomes a message that is immediately sent to the downstream operator.
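
As a rough illustration (the real header line carries every column of the source table plus DI_OPERATION_TYPE, so the exact layout depends on your table), the transformation performed by this script looks like this:

// Illustrative input file content (CSV written by the Table Replicator):
//   ID,TEMPERATURE,DI_OPERATION_TYPE
//   7,40,I
//
// Message body emitted for the data line:
//   { "ID": "7", "TEMPERATURE": "40", "DI_OPERATION_TYPE": "I" }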

Kafka Producer

Its configuration is illustrated below.

Kafka Consumer 1

Its configuration is illustrated below.

Kafka Consumer 2

Note that the “Group ID” configurations are different for the two consumers. This is important because we want to achieve fan-out messaging; that is, a single Kafka partition is consumed by multiple consumers, each maintaining its own message offset.
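
Outside Data Intelligence, the same fan-out behavior can be sketched with a plain Kafka client (kafkajs is assumed here purely for illustration, and the topic name is hypothetical): two consumers subscribing to the same topic with different group IDs each receive every message and commit their own offsets independently.

// Minimal sketch, assuming a local Kafka broker and the kafkajs client library.
const { Kafka } = require('kafkajs');

const kafka = new Kafka({ clientId: 'fanout-demo', brokers: ['localhost:9092'] });

async function startConsumer(groupId) {
    // A distinct groupId gives each consumer its own committed offset,
    // so both consumers see the full change stream independently.
    const consumer = kafka.consumer({ groupId: groupId });
    await consumer.connect();
    await consumer.subscribe({ topics: ['hana-source-changes'], fromBeginning: true });
    await consumer.run({
        eachMessage: async ({ partition, message }) => {
            console.log(groupId, partition, message.offset, message.value.toString());
        }
    });
}

// Two consumer groups emulate the two Kafka Consumer operators in the graph.
startConsumer('change-consumer-1').catch(console.error);
startConsumer('change-consumer-2').catch(console.error);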

Now we can start the graph by clicking the run button. Next, we will write to the source table through insert, update, and delete operations and observe how the changes are replicated to the derived systems.

Insert

Let’s insert one record into the source table by running the following SQL statement.

INSERT INTO "TRIALEDITION1"."HANA_SOURCE" VALUES (7, 3, 40)

Go to the Data Intelligence local file system workspace and verify that the target file has been generated, as illustrated in the figure below.

Open that file and verify that the message for the inserted record has been replicated to the target file successfully, as illustrated in the figure below.

Finally, we open the change consumer Wiretap UI to check its output, as illustrated in the figure below.

Update

Let’s update one record in the source table by running the following SQL statement.

UPDATE "TRIALEDITION1"."HANA_SOURCE" SET TEMPERATURE = TEMPERATURE + 1 WHERE ID = 1

Verify that the target file has been generated, as illustrated in the figure below.

Verify that the messages for the updated record have been replicated to the target file successfully, as illustrated in the figure below.

Note that for an UPDATE statement, the change is composed of two messages, with row types Before Image (B) and Update (U) respectively.
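
With hypothetical values (the real file carries every column of the source table), the pair of records written to the target file for this update would look roughly like this:

ID,TEMPERATURE,DI_OPERATION_TYPE
1,20,B
1,21,U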

The change consumer Wiretap UI output is illustrated in the figure below.

Delete

Let’s delete one record in the source table by running the following SQL statement.

DELETE FROM "TRIALEDITION1"."HANA_SOURCE" WHERE ID = 7

Verify that the target file has been generated, as illustrated in the figure below.

Verify that the change message for the deleted record has been replicated to the target file successfully, as illustrated in the figure below.

The change consumer Wiretap UI output is illustrated in the figure below.

Summary

This tutorial shows how easy it is to build a data integration pipeline that keeps different data systems in sync using SAP Data Intelligence’s built-in operators. I hope it helps you get started on your own journey with SAP Data Intelligence.

      39 Comments

      Vikas Kumar Singh

      Hi Andy,

      Thank you for such a nice article. Would this work with table-to-table replication, i.e. if source and target are both HANA tables? Also, my requirement is to sync the data once a week, so the graph should run once a week and replicate the delta changes from source to target. Can you please suggest what changes I will have to make?

      Thank You.

      Andy Yin
      Blog Post Author

      Hi Vikas,

      Since both source and target are HANA tables in your case, using the "Table Replicator" operator by itself should be adequate. After your initial loading is complete, you can run your weekly delta change replication by setting the "Delta Graph Run Mode" field to "Manual" and scheduling the graph to run periodically. Please refer to the "Table Replicator" details in https://help.sap.com/viewer/f66004fabfae4cd4974b54b6dd41993d/3.1.2/en-US/79fcadb91f584f868a6662111b92f6e7.html?q=Table%20Replicator%20Task

      and the "Initial Load + Delta Extraction from AnyDB" sample graph in https://help.sap.com/viewer/f66004fabfae4cd4974b54b6dd41993d/3.1.2/en-US/099adc5b14184bf0abaab0ea4bd31ed8.html

      Hope this is useful.

      Indu Khurana

      Hello Andy,

      Thank you for a great article. It was a wonderful read and a great learning experience!

      I am working on a use case which involves reading a large number of files in a directory from an S3 bucket. The file events are stored in SQS (FIFO) and are processed by the Lambda compute service on AWS. These files are read into SAP Data Intelligence and are staged/upserted in HANA.

      Can Lambda service (being triggered by SQS, FIFO ) at source be mirrored in DI using operators?

      The events I am searching for are:

      Log of all the files arriving in S3 bucket

      Event parsing and processing

      Retrieving next batch of file events and process each file event

      I'm still trying to understand Lambda, so please understand if something doesn't make sense.

      Thanks,

      Indu.

       

      Andy Yin
      Blog Post Author

      Hi Indu,

      For your case, you may try the Monitor Files operator in SAP Data intelligence to monitor the specified file events in your S3.

      You can refer to the sample graph Ingest Files into HANA for continuously ingesting data from files into a HANA table.

      Repository Objects Reference for SAP Data Intelligence is a good place to check the information about the built-in operators and to learn about the sample graphs that SAP Data Intelligence provides for use in the SAP Data Intelligence Modeler. You can also create your own operator if no built-in operator satisfies your specific requirement.

      Regards,

      /Andy

      Indu Khurana

      Hello Andy,

       

      Thank you for the answer, I have been trying the same approach with monitor operator.

      I have few questions if you can please answer:

      1. With the 'monitor' operator, we could monitor four events (new, modified, delete, already exists). Now for all the events, the graph is triggered and I can see the event types in the wiretap, but since the mode in the 'Hana client' operator is insert (not upsert), the records in the HANA table are neither updated nor deleted (for an existing file), but are just added for a new file.
      2. For upsert mode, we need to set a primary key for the table but I do not see any provision for that in Hana Client operator. How can we achieve this?
      3. What if HANA is down, how can we track file loss?

      Thanks,

      Indu Khurana.

      Andy Yin
      Blog Post Author

      Hi Indu,

      I guess you are doing data ingestion from a source system to HANA. For data ingestion, we have the options to use batch ingestion or streaming ingestion. SAP Data Intelligence supports both.

      If your use case is streaming ingestion, and you have a way to get the CDC messages for your source system, then you can use the approach introduced in this blog to do your work.

      If your use case is batch ingestion with the ELT pattern, you are probably periodically doing incremental data extraction from your source system and uploading it into S3 storage. You can then choose an appropriate File operator to read the S3 file content and load it into HANA by simply inserting into the table. In this case, you do not need to consider upsert, since both the original record and the updated record will coexist in the table, and you can differentiate them with timestamp information (assuming your source system records have a field identifying the last updated timestamp). Of course, incremental batch ingestion cannot capture delete operations.

      If you really want to get rid of the deleted records and do not want to keep the original versions of the updated ones, you may change the incremental batch ingestion into a full batch ingestion. To do that, you need to do a full data extraction from the source system to S3, then read the S3 file and load the records into HANA by inserting into the table and changing the "Table initialization" configuration of the Hana Client operator to Truncate.

      After the data ingestion, you can transform the ingested data into the data model according to your specific needs.

      For question 1:  If you really want to replicate from source system into HANA , choose the streaming ingestion(CDC approach) or use the full batch ingestion.

      For question 2: If you really want to use the Hana Client operator to create a HANA table with a primary key, you simply need to pass the CREATE TABLE SQL statement into the sql port of the Hana Client operator. You can achieve this by filling in your SQL statement in a "Constant Generator" operator and connecting it to the sql port of the Hana Client operator.

      For question 3: The Ingest Files into HANA example illustrates a way to guarantee that each file is processed at least once if HANA is down temporarily, by rerunning the graph once HANA is restored.

      Indu Khurana

      Thank you Andy for the detailed answer.

      I really appreciate you taking out time and providing insights.

      Good Day to you!

      Indu.

      Carlos Mendez

      What is it about the last JS operator?

      Andy Yin
      Blog Post Author

      Hi Carlos,

      You can remove both the last JS operator and the "graph terminator" operator from the Delta extraction graph. They are NOT necessary, since delta extraction is a never-ending process and it will never reach those two operators. Thanks for pointing this out.

      Indu Khurana

      Hello Andy,

      I am trying to use the table replicator operator for reading tables from Azure SQL cloud into SAP HANA .

      The Initial and delta load are working fine but there are missing key and values ( max/no of batches and number of rows) in checkpoint table. Besides, shadow and shadow view are never populated with any value. Is that expected behavior or am I missing something?

      Also, the primary key details of the source table are missing in check point table and the target HANA table.The metadata of the source table is not moved along when replicated?

      Please suggest.

      Thanks,

      Indu.

      Andy Yin
      Blog Post Author

      Hi Indu,

      As long as your initial and delta loads are working fine, why bother checking the checkpoint table, etc.? Those tables are part of the internal implementation of the trigger-based replication used by the table replicator operator itself. I believe most of the time we do not need to check those tables unless there is a good reason, probably for troubleshooting something.

      For the primary keys: according to the operator specification, when HANA is the target, in addition to the "Track Change History" mode it supports an additional mode, "Apply Change Data". You can try running your pipeline again with the "Apply Change Data" mode and check it.

      Indu Khurana

      Thank you Andy for your response.

      I was hoping I could use the checkpoint/shadow tables for logs and exception management. (I was testing with HANA DB as source and target)

      Also, now I have started working with actual data in Azure SQL DB, I ran the graph with table replicator operator and it failed with error " CREATE TABLE permission denied at the database".

      I understand this is a pre-requisite for the table replicator operator, but how can I assess the changes being made by this operator at the source database? If I request this access from the DB team, would it be the right approach going forward in production as well?

      Please suggest.

      Regards,

      Indu Khurana

      Andy Yin
      Blog Post Author

      Hi Indu,

      You can refer to the documentation of the Table Replicator; it specifies which SQL privileges need to be granted to the connection user.

      The "Using a Custom Schema on Table Replicator" section in that documentation has a very detailed specification on what and how to grant the necessary privileges for various supported sources before you can run the Table Replicator operator. After your identify the grant SQL statements for your specific source, you can request the access from the DB admin.

      Due to the nature of trigger-based replication, it has overhead on the source. I recommend you do some tests on your use cases before taking it into production. You can also read some suggestions on Improving CDC Graph Generator Operator Performance.

      Indu Khurana

      Thank you very much Andy for the useful links.

      I can see with custom schema, we need only few privileges on Source schema, which looks like a good approach but creates a dependency between two schemas. Unfortunately, I can not test it until I apply for the required privileges on Azure SQL.

      I will try to test a similar approach on HANA as a source, to see if there are any challenges.

      Regards,

      Indu.

      Indu Khurana

      Hi Andy,

      I can't find configuration parameters for adding custom schema in table replicator operator.

      I have checked the config schema file properties as well.

      Can you suggest anything?

      Thanks,

      IK.

      Andy Yin
      Blog Post Author

      Hi Indu,

      Sorry for the inconvenience. It looks like your DI is not the latest version, so there is no configuration parameter yet.

      I read the Table Replicator documentation for an earlier version and found statements like the following:

      Note: The following source specific SQL Privileges are required. Please ensure the connection user has the following privileges.

      CREATE TABLE
      CREATE VIEW
      CREATE SEQUENCE
      CREATE PROCEDURE
      CREATE TRIGGER

      So, please find the required privileges in the Modeler documentation for the version you are currently using (you can find the doc by right-clicking the operator and choosing "Open Documentation" from the context menu).

      Indu Khurana

      Thanks Andy.

      So, with the current version I have to go with privileges on source schema only.

      But there is still one issue pending with the table replicator, which is primary keys. I tested with HANA as the source DB and the primary keys of the table were not replicated to the target tables.

      So, for all the tables, the primary key has to be enforced onto the target tables!!?

      (Also, there is no provision of unique key check in this version)

      Thanks,

      Indu.

      Andy Yin
      Blog Post Author

      Hi Indu,

      Some thoughts in my mind: CDC tracks the change history for all the records of a specific source table. The captured messages include INSERT, UPDATE and DELETE operation types. It is very likely that a record will go through insert, update, and even delete operations in the source table, so the target will have multiple CDC messages for a unique record in the source. This full history of a record is attractive for some analytic purposes. One popular usage for CDC messages is to translate and load them into a data warehouse for analytics.

      For this operator, when the target is HANA and you choose Apply Change Data instead of Track Change History, the operator will apply the changes to the target HANA table, which means you replicate the data from the source to the target table instead of a list of CDC messages. Please note that in this case the replication is a FULL table replication, which means that if you run the graph again, the full data in the source HANA will be replicated to the target HANA again. This has two implications depending on your settings:

      • If you did not create the target table with a primary key and the "Target write mode" is "append", then your target will end up with duplicate records. I guess this is what you are encountering now. Changing that setting to "overwrite" should fix your problem.
      • If you did create the target table with a primary key before running the graph and the "Target write mode" is "append", your graph will most likely fail due to a primary key constraint conflict. Changing that setting to "overwrite" should fix your problem as well.
      Indu Khurana

      Hello Andy,

      For the initial load, if I create my target table with a primary key before loading the data, it works the first time with append; the next time, it fails. But if I change it to overwrite, then the primary key is also removed in the target table, as the table schema is updated with the new one from the table producer/flowagent table producer.

      It is frustrating! 🙁

      Andy Yin
      Blog Post Author

      Hi Indu,

      The new version of this operator provides one more option, "truncate", for "Target Write Mode", which keeps the primary key setting and should eliminate your primary key concern.

      Indu Khurana

      Hi Andy,

      I will try this option, but right now, my approach is to create three tables with primary key before loading with Hana initializer, then mapped it to SQL consumer to run a query to extract 15 years of data from azure SQL DB, writing this result to a table producer : tab1 ( in append mode, to keep the primary key , in overwrite and truncate , primary key gets wiped out).( some DQ rules to be done here in this tab1) . Mapped further to Hana table consumer to read tab1 and flowagent table producer ( only keep PK with append, with truncate and overwrite , PK is removed) to move it to staging table in another schema.

      At the very first landing point of data from azure to Hana, I have to use append to keep the PK.

      P.S.: I have tried using alter table option to add PK later , but that fails with Hana SQL, flowagent SQL, SQL consumer.

       

      BR,IK.

      Indu Khurana

      Hello Andy,

       

      I tried the table replicator approach in overwrite (O) and truncate (T) mode, and I am getting the below error for the source shadow table view:

      Group messages: Group: default; Messages: Graph failure: tablereplicator1 failed with the following error: com.sap.hana.dp.adapter.sdk.AdapterException: Get metadata request could not be performed due to error: Base table or view not found;259 invalid table name: Could not find table/view DLTRANS_PK_24_SHV in schema S1: line 1 col 34 (at pos 33) Context: com.sap.hana.dp.adapter.sdk.AdapterException: Get metadata request could not be performed due to error: Base table or view not found;259 invalid table name: Could not find table/view DLTRANS_PK_24_SHV in schema S1 : line 1 col 34 (at pos 33) at dsadapter.util.AdapterExceptionHelper.getMDSAdapterException(AdapterExceptionHelper.java:135) at dsadapter.mds.RequestHandler.handleMessage(RequestHandler.java:72) at dsadapter.mds.MDSServer.importMetadata(MDSServer.java:352) at dsadapter.mds.MDSServer.importMetadata(MDSServer.java:340) at dsadapter.sdi.adapters.GenericDatabaseAdapterDataStore.importMetadata(GenericDatabaseAdapterDataStore.java:340) at dsadapter.vflow.VFlowHelper.getMetadata(VFlowHelper.java:202) at dsadapter.vflow.VFlowHelper.getMetadata(VFlowHelper.java:291) at dsadapter.vflow.VFlowHelper.getProperties(VFlowHelper.java:486) at dsadapter.vflow.jobs.DataservicesETLJob.getProperties(DataservicesETLJob.java:131) at dsadapter.vflow.jobs.DataservicesIngestionJob.compileIngestionGraph(DataservicesIngestionJob.jav

       

      Indu Khurana

      Sorted 🙂

      Truncate worked with flowagent producer.

      A big thank you for your support, Andy. If there was an award I could give to you, I would give you "The amazing support" award 🙂

      Andy Yin
      Blog Post Author

      Hi Indu,

      You are welcome. Glad to hear it is working well for you.

      Indu Khurana

      Thank you Andy for the reply, but I did not get the approval to get the privileges required for table replicator operator on source database.

      Hence, I am proceeding with standard way of SQL consumer, table producer for initial load. For Delta, we would be using audit trail from database itself.

      But primary key addition to target schema still remains an issue.

      Thanks,

      Indu.

      sachin rao

      Hi Andy, thanks for a really great blog. I need to replicate SAP HANA data to Snowflake. I was exploring the Flowagent SQL executor Snowflake connector for this with the Table Replicator. Please advise whether we can use Table Replicator --> local file system --> Read File --> Snowflake connector to replicate data to Snowflake.

      My idea is to use Micro batches (by setting higher polling interval in table replicator) of CDC records captured by replicator and

      Would this solution scale to handle high transactions in HANA?

      Thanks.

      Andy Yin
      Blog Post Author

      Hi Sachin,

      It is not encouraged to write to the DI local file system (this blog used that only for illustration purposes). You can write the Table Replicator output directly to a cloud storage like S3. Then you can read and parse the output file, probably interpret the messages into corresponding INSERT, UPDATE, and DELETE statements, and feed them to the downstream Flowagent SQL executor connector you mentioned.

      I am not sure whether the target Snowflake processing can keep up with the speed at which your source HANA sends data with your configuration. If not, backpressure flow control will be applied. So you may need to do some tests and configure your settings according to your use case.

      One suggestion is to send the parsed messages to Kafka as this blog did. This way, you do not need to consider whether the target processing speed matches the source delivery speed, since they are decoupled. Also, if some error happens in the target, you can replay the whole consumption.

      sachin rao

      Thanks for the quick reply Andy.

      Can the Kafka Consumer operator be connected to SQL executor directly i.e. can messages from Kafka Consumer operator be sent to SQL executor to write to snowflake?

       

      Thanks

      Andy Yin
      Blog Post Author

      Hi Sachin,

      I don't think you can connect the output of the Kafka consumer directly to the Flowagent SQL executor operator, since the output consists of CDC messages while the input port of the Flowagent SQL executor operator expects a SQL statement string. As I mentioned, you need to parse the CDC messages and convert them into appropriate SQL statements for the Flowagent SQL executor operator to execute.

      sachin rao

      Thanks Andy!

      Indu Khurana

      Hello Andy,

      I have a question on table replicator operator(TR op).

      With the TR op, the triggers are created on the source table (tab 1), and the delta is extracted and applied on the target tables.

      If there are multiple vendors who want to extract the delta of the same table (tab 1), how would they achieve it, given that we cannot create another target table from the same source table?

      Is it like we have to clean up after graph termination?

      Please advise.

      Thanks,

      Indu.

       

      Andy Yin
      Blog Post Author

      Hi Indu,

      I do not quite understand what you mean by multiple vendors. If you mean there are different people who want to extract the delta from the same source table, then they would most likely create separate pipelines, each with its own target setting in the TR configuration. Those pipelines are standalone and should not impact each other.

      Indu Khurana

      Thanks Andy

      Yes, different people trying to extract delta of same source table.

      I tried creating a new pipeline, with new instance of table replicator, on same source table, but it never creates a new target table.

      BR,

      Indu.

      Andy Yin
      Blog Post Author

      Indu,

      I have never tried to create separate pipelines for different targets by capturing from the same source, so maybe my previous understanding is not correct. Also, according to the spec of the Table Replicator, it is not recommended to run each TR operator in a separate graph unless you have a good reason, since it becomes costly: each graph is at least one additional K8s pod.

      Anyway, personally, I recommend implementing fan-out messaging as this blog did if you have multiple consumers tracking the changes for the same source.

      Indu Khurana

      Thanks Andy,

      If different users are not using SAP Data Intelligence but other platforms, I was wondering whether these triggers on the source table would affect their access/CDC?

      BR,

      Indu.

       

      Andy Yin
      Blog Post Author

      Indu,

      I do not think so since the triggers are created specifically by DI. Other platforms may have their own way of implementing the CDC, right?

      Indu Khurana

      Hello Andy,

      Yes, but I cannot say that with certainty, as I have not tested it. I became skeptical when I was not able to create another target table.

      But thank you for your time.

      BR,

      Indu.

      Indu Khurana

      Hi Andy,

      During deletion, I got the below error (I am using a table as the target, not a file):

      Group: default; Messages: Graph failure: error while stopping 'com.sap.dh.flowagent' sub-engine: Post "http://localhost:33461/service/stop": context deadline exceeded (Client.Timeout exceeded while awaiting headers)

      What is the meaning of this error? There are no details!

      Thanks,

      Indu.

      Rajesh PS

      Andy Yin

      Indu Khurana

      sachin rao

      Carlos Mendez

      My humble apologies. I'm really not sure what we are trying to do here.

       

      My use case is simple:

      1)Consume the latest offset from apache kafka triggered by user.

      2) Deserialize the avro encoded message

      3) Convert to table typed message and ingest to HANA DB (Create and Updates)

      4) In case of any error, retry only at the point of failure only 5 times (at that particular operator instead of reinitializing the whole graph again)

      5) Reprocess only the failed message /offset again and not all.

      6) If reprocessing fails, Alert/Notify the user with payload content, kafka offset number and error message in case if graph is dead.
