DanielKraemer
Associate

This blog is part of a blog series from SAP Datasphere product management focusing on the Replication Flow capabilities in SAP Datasphere:

Replication Flow Blog Series Part 1 – Overview | SAP Blogs 

Replication Flow Blog Series Part 2 – Premium Outbound Integration | SAP Blogs 

Replication Flows Blog Series Part 3 – Integration with Kafka 

Replication Flows Blog Series Part 4 – Sizing   

Replication Flows Blog Series Part 5 – Integration between SAP Datasphere and Databricks 

Replication Flows Blog Series Part 6 – Confluent as a Replication Target

Data Integration is an essential topic in a Business Data Fabric like SAP Datasphere. Replication Flow is the cornerstone for feeding SAP Datasphere with data, especially from SAP ABAP sources. There is also a strong need to move enriched data from SAP Datasphere into external environments to enable certain use cases.

In this part of the Replication Flow blog series, we focus on the usage of Confluent as a target in a Replication Flow. We explain in detail the new capabilities that have been introduced with SAP Datasphere release 2024.08. The content of this blog is structured as follows:

  1. Introduction
  2. Confluent as a new Connection Type in SAP Datasphere
  3. Configuration options for Replication Flows
  4. Details on Kafka Message and Schema creation
    1. Schema & Message creation
    2. Data Type mappings
  5. Scenarios or what happens if...

 

1. Introduction

In addition to the generic Kafka integration that has been available since the end of last year, the features described in this blog provide tailor-made integration with Confluent Cloud and Confluent Platform, the managed Kafka offerings of our dedicated partner Confluent.

The Confluent integration described in this blog is only usable in SAP Datasphere Replication Flows.

For the examples and step-by-step instructions in the blog, we assume that a properly configured SAP Datasphere tenant and a Confluent Cloud cluster are available. In addition, we assume that the reader is familiar with the basic concepts around Replication Flows and Connection Management in SAP Datasphere as well as with the Kafka capabilities of Confluent Cloud.

 

2. Confluent as a new Connection Type in SAP Datasphere

At the beginning of this year, a new connection type for Apache Kafka was introduced to offer basic integration with Apache Kafka-based sinks. With the 2024.08 release of SAP Datasphere, a new dedicated connection type tailored to Confluent is introduced to support Confluent-specific capabilities such as Confluent's built-in schema registry.

Figure 1 - New connection type for Confluent

Here are the different configuration options for Confluent.

Connection Details

  • System Type
    Possibility to choose between Confluent Cloud and Confluent Platform
  • Kafka Brokers
    A comma-separated list of Kafka brokers in the format <host>:<port>.

Cloud Connector (only available if System Type Confluent Platform is chosen)

  • Use Cloud Connector
    This setting configures whether a Cloud Connector is used

Authentication

The following SASL-based authentication methods are supported.

For Confluent Cloud:

Authentication Type | SASL Authentication Type | Properties
No Authentication | n/a |
API Key and Secret | PLAIN | API Key*, API Secret*

*mandatory

For Confluent Platform:

Authentication Type | SASL Authentication Type | Properties
No Authentication | n/a |
User Name and Password | PLAIN | Kafka SASL User Name*, Kafka SASL Password*
Salted Challenge Response Authentication Mechanism (256) | SCRAM256 | Kafka SASL User Name*, Kafka SASL Password*
Salted Challenge Response Authentication Mechanism (512) | SCRAM512 | Kafka SASL User Name*, Kafka SASL Password*
Kerberos with User Name and Password | GSSAPI | Kafka Kerberos Service Name*, Kafka Kerberos Realm*, Kafka Kerberos Config*, User Name*, Password*
Kerberos with Keytab File | GSSAPI | Kafka Kerberos Service Name*, Kafka Kerberos Realm*, Kafka Kerberos Config*, User Name*, Keytab File*

*mandatory
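For readers who want to relate these authentication options to standard Kafka client settings, the following minimal sketch shows how, for example, the SCRAM512 option could correspond to librdkafka-style configuration properties (as used by the confluent-kafka Python client). The property values are placeholders, and this is not the configuration code SAP Datasphere uses internally.

# Illustrative mapping of the SASL options above to common Kafka client settings.
# SCRAM256 corresponds to SCRAM-SHA-256, SCRAM512 to SCRAM-SHA-512, PLAIN to PLAIN.
scram_config = {
    "bootstrap.servers": "<host1>:<port>,<host2>:<port>",  # "Kafka Brokers" property
    "security.protocol": "SASL_SSL",                       # TLS enabled (see "Security" below)
    "sasl.mechanisms": "SCRAM-SHA-512",
    "sasl.username": "<Kafka SASL User Name>",
    "sasl.password": "<Kafka SASL Password>",
}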

Security

Transport Layer Security (TLS) settings for encryption as well as server certificate validation are supported for system type Confluent Cloud. In addition, Confluent Platform offers client certificate validation via mTLS.

Figure 2 - TLS configuration for Confluent Cloud and Confluent Platform

Schema Registry

Additional configuration options to specify schema registry endpoint and credentials.

  • URL
    Endpoint of the schema registry in the format <host>:<port>
  • Authentication Type
    Authentication mechanism that is supposed to be used for schema registry. User Name and Password and No Authentication are supported.
  • User Name + Password
    User Name and Password in case the corresponding authentication type is chosen.

Remark: A schema registry configuration is mandatory when creating connections of type Confluent.

Connections of connection type Confluent can currently only be used as targets in Replication Flows.

Example: Creating a connection to Confluent Cloud in SAP Datasphere

We now showcase how to create a connection to Confluent Cloud, including the schema registry configuration. The Confluent Platform case is similar, assuming a Cloud Connector has been configured properly.

Use the Bootstrap Server URL(s) of your Confluent cluster as an entry in the Kafka Brokers property.

Figure 3 - Specify Kafka Brokers in the SAP Datasphere connection creation wizard.

Choose User Name And Password as authentication type and use an API key and secret pair as Kafka SASL User Name and Kafka SASL Password.

Figure 4 - API Key and Secret as authentication credentials

Remark: It is assumed that the owner of the API key has sufficient rights to create/delete and access topics.

Use the Stream Governance API Endpoint as the Schema Registry URL.

Figure 5 - Specify schema registry URL in the SAP Datasphere connection creation wizard.

Choose User Name And Password as the authentication type for the Schema Registry and use an API key and secret pair as User Name and Password, respectively.

Figure 6 - Specify API Key Secret pair for Schema registry access.

Remark: It is assumed that the owner of the API credentials has sufficient rights to create and update schemas.
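If you want to sanity-check the values from Figures 3 to 6 before creating the connection, you can do so, for example, with the confluent-kafka Python client outside of SAP Datasphere. This is an optional, hedged sketch; the endpoints and credentials are placeholders.

from confluent_kafka.admin import AdminClient
from confluent_kafka.schema_registry import SchemaRegistryClient

# Kafka cluster check with the bootstrap server and the API key/secret pair (Figures 3 and 4).
admin = AdminClient({
    "bootstrap.servers": "<bootstrap-server>:9092",
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms": "PLAIN",
    "sasl.username": "<cluster API key>",
    "sasl.password": "<cluster API secret>",
})
print(list(admin.list_topics(timeout=10).topics))  # lists the cluster's topics on success

# Schema registry check with the Stream Governance endpoint and its API key/secret (Figures 5 and 6).
sr = SchemaRegistryClient({
    "url": "https://<schema-registry-endpoint>",
    "basic.auth.user.info": "<schema registry API key>:<schema registry API secret>",
})
print(sr.get_subjects())  # lists the registered subjects on success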

 

3. Configuration Options for Replication Flows

For the remainder of this blog post, we will describe the capabilities of the Confluent integration with Replication Flows using the following source data set, which is assumed to be stored in a local table (Business Name: Demo Table) in SAP Datasphere.

Figure 7 - Example source dataset Demo Table

Let’s assume we want to replicate this table into a Confluent Cloud instance. We create a corresponding Replication Flow design-time artifact, select SAP Datasphere and the local table Demo Table, respectively, as the source, and choose the connection CONFLUENT_DEMO (see section 2) as the sink for the Replication Flow.

The following screenshot highlights the configuration options for the Confluent sink.

Figure 8 - Replication Flow configuration options for Confluent

Compared to the Apache Kafka connection type, there are the following additional configuration options: Use Schema Registry, Record Name, Subject Name Strategy, Compatibility Type, and Clamp Decimal Floating Point Data Types. They are described in the following table together with the other properties.

Setting | Value | Explanation
Replication Thread Limit | Number | The number of parallel replication threads that can be executed during the replication process. Only available in the global configuration.
Number of Partitions | Number | The number of Kafka partitions for the target Kafka topic. Only used for new topics that do not yet exist in the Kafka cluster; otherwise the setting is ignored.
Replication Factor | Number | The Kafka replication factor for the Kafka topic. Only used for new topics that do not yet exist in the Kafka cluster; otherwise the setting is ignored.
Message Encoder | AVRO or JSON | The message format for the Kafka topic.
Message Compression | No Compression, Gzip, Snappy, LZ4, Zstandard | The compression method for the Kafka messages that are sent to a Kafka topic.
Use Schema Registry | True or False | True: the schema registry is used. False: the schema registry is not used. A schema registry is mandatory in case Message Encoder AVRO is chosen.
Topic Name | string | The name of the Kafka topic to be used as the target. The topic name is always based on the target object name and can only be changed by renaming the target object.
Record Name | string | The record name that is used for the schema registry entry when the subject name strategy is applied. It is also referenced in the schema definition itself.
Subject Name Strategy | Topic, Record, Topic-Record | The subject name strategy for the schema. Only available if Use Schema Registry is true.
Compatibility Type | Default, Backward, Backward Transitive, Forward, Forward Transitive, Full, Full Transitive, None | The compatibility type for the schema registry subject. Only available if Use Schema Registry is true.
Clamp Decimal Floating Point Data Types | True | Immutable setting (always True). Decimal values that do not fit into the target type (see section 4.2) are automatically clamped.
Overwrite Target Settings at Object Level | True or False | True: the global configuration overwrites the configurations made at task level. Only available in the global configuration.

The settings on Replication Task level take precedence over the settings on Replication Flow level unless the Overwrite Target Settings at Object Level checkbox is checked in the settings on Replication Flow level.

After the Replication Flow has been deployed and run, the Kafka topic and a corresponding schema registry entry are created (see section 4 for details on schema and message creation).

Figure 9 - Example of Target Kafka Topic

Remark:
As with the generic Kafka integration, a Truncate flag is supported for Confluent. If it is set and the target Kafka topic for the replication already exists in the Confluent cluster, the topic is deleted and recreated. This also deletes all messages that are assigned to the topic.

Figure 10 - Truncate flag configuration setting in Replication Flows
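To make the behavior tangible: the effect of the Truncate flag on an existing topic is roughly that of deleting and recreating it, as in the following sketch with the confluent-kafka AdminClient. The topic name, partition count, and replication factor are assumptions taken from the example; SAP Datasphere performs these steps internally, so the code is purely illustrative.

from confluent_kafka.admin import AdminClient, NewTopic

admin = AdminClient({"bootstrap.servers": "<bootstrap-server>:9092"})  # SASL/TLS settings omitted

# Truncate, approximately: delete the existing topic together with all of its messages ...
admin.delete_topics(["Demo_Table"])["Demo_Table"].result()

# ... and recreate it with the Replication Flow settings Number of Partitions and Replication Factor.
# (In practice the deletion may need a moment to propagate before the topic can be recreated.)
topic = NewTopic("Demo_Table", num_partitions=3, replication_factor=3)
admin.create_topics([topic])["Demo_Table"].result()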

In the following section we will have a closer look at schema and message creation.

 

4. Details on Kafka Message and Schema creation

Before describing the details regarding schema and message creation, we start with a listing of the producer configurations that are used in SAP Datasphere Replication Flows. The parameter values are fixed and cannot be changed in SAP Datasphere.

Kafka Producer Configuration Parameter | Value used by Replication Flows | Remark
max.request.size | 1048576 (1 MB) | Confluent Default
request.timeout.ms | 30 seconds | Confluent Default
max.in.flight.requests.per.connection | 5 | Confluent Default
retry.backoff.ms | 100 | Confluent Default
acks | all (-1) | Confluent Default
retries | 3 |
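Restated as a plain Python dictionary (for readers used to client-style configuration), the fixed producer parameters look as follows. This is only a rephrasing of the table above; the values cannot be changed in SAP Datasphere.

# Producer parameters used by Replication Flows, as listed in the table above.
replication_flow_producer_parameters = {
    "max.request.size": 1048576,                 # 1 MB, Confluent default
    "request.timeout.ms": 30000,                 # 30 seconds, Confluent default
    "max.in.flight.requests.per.connection": 5,  # Confluent default
    "retry.backoff.ms": 100,                     # Confluent default
    "acks": "all",                               # i.e. -1, Confluent default
    "retries": 3,
}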

 

 

4.1 Message & Schema Creation

The following section is only applicable if the Use Schema Registry toggle is activated (true) in the Replication Flow/Task properties panel. In this case, schemas are written to the schema registry as described in the following paragraphs.

No schema definition for the Kafka message key (e.g. […]-key) is created in Confluent. Instead, the message key is a string consisting of the primary key values of the source dataset separated by underscores.

Figure 11 - Kafka message key generation

If, in our example above, the two columns ID and First_Name together formed the primary key, the generated message keys would look as follows.

Figure 12 - Second example for Kafka message key generation
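Conceptually, the key generation can be summarized as joining the primary key values with underscores. The following minimal sketch illustrates the rule; the helper function and the sample row are assumptions for illustration, not SAP Datasphere code, and details such as the formatting of non-string key columns are not covered.

# Minimal sketch of the message-key rule: primary key values joined by underscores.
def build_message_key(row: dict, primary_key_columns: list[str]) -> str:
    return "_".join(str(row[column]) for column in primary_key_columns)

sample_row = {"ID": 1, "First_Name": "John", "Second_Name": "Doe", "Age": 42}  # assumed values
print(build_message_key(sample_row, ["ID"]))                # single-column key, e.g. "1"
print(build_message_key(sample_row, ["ID", "First_Name"]))  # composite key, e.g. "1_John"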

The schema definition for the Kafka message body is constructed based on the structure of the source dataset, the configuration settings of the executed Replication Flow, and the change data capture mechanisms of SAP Datasphere. The following screenshot shows how the different settings are fed into the Kafka schema.

Figure 13 - Schema creation for the Kafka message body

The corresponding Confluent schema and subject names, the serialization format and the compatibility mode are derived from the configuration values of the Replication Flow and Replication Task definition, respectively.

The schema definition for the message body itself is derived from the source dataset definition and the metadata for the delta information of the Replication Flow. In our example the schema of the Demo Table is translated into an AVRO schema that contains the four fields ID, First_Name, Second_Name and Age as well as the three delta columns that are always added and which contain the Replication Flow specific delta information. The AVRO type of the message body is always record.
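For orientation, an AVRO record schema for the Demo Table could have roughly the following shape. This is a hedged sketch derived from the field list above and the type mappings in section 4.2; the exact field order, nullability handling, and record name depend on the Replication Flow settings and may differ from what is actually registered.

# Plausible shape of an AVRO record schema for Demo_Table (illustrative, not the registered schema).
demo_table_avro_schema = """
{
  "type": "record",
  "name": "Demo_Table",
  "fields": [
    {"name": "ID",                "type": ["null", "long"]},
    {"name": "First_Name",        "type": ["null", "string"]},
    {"name": "Second_Name",       "type": ["null", "string"]},
    {"name": "Age",               "type": ["null", "long"]},
    {"name": "__operation_type",  "type": ["null", "string"]},
    {"name": "__sequence_number", "type": ["null", "long"]},
    {"name": "__timestamp",       "type": ["null", {"type": "long", "logicalType": "timestamp-micros"}]}
  ]
}
"""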

If the serialization format is set to JSON, a corresponding JSON schema that follows the JSON Schema standard is registered in the Confluent schema registry. The following schema definition would have been written to the Confluent schema registry if the JSON serialization format had been used in our example above:

 

{
  "$schema": "https://json-schema.org/draft/2020-12/schema",
  "properties": {
    "Age": {
      "anyOf": [
        {
          "maximum": 9223372036854776000,
          "minimum": -9223372036854776000,
          "type": "integer"
        },
        {
          "type": "null"
        }
      ]
    },
    "First_Name": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ]
    },
    "ID": {
      "anyOf": [
        {
          "maximum": 9223372036854776000,
          "minimum": -9223372036854776000,
          "type": "integer"
        },
        {
          "type": "null"
        }
      ]
    },
    "Second_Name": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ]
    },
    "__operation_type": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ]
    },
    "__sequence_number": {
      "anyOf": [
        {
          "maximum": 18446744073709552000,
          "minimum": 0,
          "type": "integer"
        },
        {
          "type": "null"
        }
      ]
    },
    "__timestamp": {
      "anyOf": [
        {
          "format": "date-time",
          "type": "string"
        },
        {
          "type": "null"
        }
      ]
    }
  },
  "title": "Demo_Table",
  "type": "object"
}

 

The topic itself is created and configured based on the Replication Flow/Task settings Number of Partitions, Replication Factor and Topic Name. All other topic specific configuration parameters are inherited from the Confluent Cluster settings.

For initial or delta loads during a Replication Flow run, exactly one Kafka message is added to the target Kafka topic for each row in the source data set.

In the case of our example (initial load of an SAP Datasphere table), the final Kafka messages look as follows (an overview of the Kafka topic is shown in Figure 9 above).

Figure 14 - Example Kafka messages for Demo Table replication
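A downstream consumer reads these replicated messages like any other Kafka messages. The following minimal sketch (JSON encoding; broker, credentials, and consumer group are placeholder assumptions) prints the message key, the operation type, and two of the business fields for each record.

import json
from confluent_kafka import Consumer

consumer = Consumer({
    "bootstrap.servers": "<bootstrap-server>:9092",  # plus the SASL/TLS settings from section 2
    "group.id": "demo-table-reader",
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["Demo_Table"])

while True:
    msg = consumer.poll(1.0)
    if msg is None or msg.error():
        continue
    key = msg.key().decode("utf-8")        # e.g. "1" or "1_John", see Figures 11 and 12
    record = json.loads(msg.value())       # body follows the JSON schema shown above
    print(key, record["__operation_type"], record["ID"], record["First_Name"])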

The next section contains an overview of type mappings between selected source systems and JSON/AVRO schema types.

 

4.2 Data Type Mappings

The following two tables contain the data type mappings for the scenarios where SAP HANA/SAP Datasphere or ABAP artifacts are chosen as a source.

Scenario: SAP HANA/SAP Datasphere to Confluent

SAP HANA Type | JSON Type | AVRO Type
TINYINT | number | int
SMALLINT | number | int
INTEGER | number | int
BIGINT | number | long
REAL | number | float
DOUBLE | number | double
SMALLDECIMAL | number | type:"bytes", logical type:"decimal", scale:6, precision:28
DECIMAL | number | type:"bytes", logical type:"decimal", scale:6, precision:38
DECIMAL(p,s) | number | type:"bytes", logical type:"decimal", scale:s, precision:p
FLOAT | number | double
FLOAT(n), 1<=n<=24 | number | float
FLOAT(n), 25<=n<=53 | number | double
BOOLEAN | boolean | boolean
DATE | string ('YYYY-MM-DD') | type:"int", logical type:"date" (days from UNIX 0)
TIME | string ('HH:MM:SS.NNNNNNNNN') | type:"long", logical type:"time-micros"
CLOB/NCLOB | string | string
SECONDDATE | string ('YYYY-MM-DDTHH:MM:SS.NNNNNNNNNZ') | type:"long", logical type:"timestamp-micros" (microseconds after UNIX 0)
TIMESTAMP | string ('YYYY-MM-DDTHH:MM:SS.NNNNNNNNNZ') | type:"long", logical type:"timestamp-micros" (microseconds after UNIX 0)
VARCHAR(n) | string | string
NVARCHAR(n) | string | string
ALPHANUM(n) | string | string
SHORTTEXT(n) | string | string
VARBINARY(n) | string | bytes

Scenario: ABAP to Confluent

ABAP Data Dictionary Type (DDIC) | JSON Type | AVRO Type
INT1 | number | int
INT2 | number | int
INT4 | number | int
INT8 | number | long
DEC | number | type:"bytes", logical type:"decimal", scale:s, precision:d
DF16_DEC / D16D | number | type:"bytes", logical type:"decimal", scale:6, precision:28
DF34_DEC / D34D | number | type:"bytes", logical type:"decimal", scale:6, precision:38
FLTP | number | double
CURR | number | type:"bytes", logical type:"decimal", scale:s, precision:d
QUAN | number | type:"bytes", logical type:"decimal", scale:s, precision:d
DECFLOAT16 / D16N | number | type:"bytes", logical type:"decimal", scale:6, precision:28
DECFLOAT34 / D34N | number | type:"bytes", logical type:"decimal", scale:6, precision:38
DF16_RAW / D16R | number | type:"bytes", logical type:"decimal", scale:6, precision:28
DF34_RAW / D34R | number | type:"bytes", logical type:"decimal", scale:6, precision:38
RAW | string | bytes
LRAW | string | bytes
RAWSTRING / RSTR | string | bytes
SRAWSTRING / SRST | string | bytes
CHAR | string | string
LCHR | string | string
SSTRING / SSTR | string | string
DATS | string ('YYYY-MM-DD') | type:"int", logical type:"date" (days from UNIX 0)
DATN | string ('YYYY-MM-DD') | type:"int", logical type:"date" (days from UNIX 0)
TIMS | string ('HH:MM:SS.NNNNNNNNN') | type:"long", logical type:"time-micros"
ACCP | number | ???
NUMC | string | string
CLNT | string | string
LANG | string | string
UTCLONG / UTCL | string ('YYYY-MM-DDTHH:MM:SS.NNNNNNNNNZ') | type:"long", logical type:"timestamp-micros" (microseconds after UNIX 0)
CUKY | string | string
UNIT | string | string
STRING / STRG | string | string
GEOM_EWKB / GGM1 | string | string
TIMN | string ('HH:MM:SS.NNNNNNNNN') | type:"long", logical type:"time-micros"
Domain TZNTSTMPL | string ('YYYY-MM-DDTHH:MM:SS.NNNNNNNNNZ') | type:"long", logical type:"timestamp-micros" (microseconds after UNIX 0)
Domain TZNTSTMPS | string ('YYYY-MM-DDTHH:MM:SS.NNNNNNNNNZ') | type:"long", logical type:"timestamp-micros" (microseconds after UNIX 0)
Domain SYSUUID_X16 and SYSUUID | string | string
Domain SYSUUID_C22 and SYSUUID_22 | string | string
Domain SYSUUID_C26 | string | string
Domain SYSUUID_C32 and SYSUUID_C | string | string
Domain SYSUUID_C36 | string | string
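One practical note on the decimal mappings above: in AVRO, the logical type "decimal" is carried as bytes containing a big-endian two's-complement unscaled integer, so consumers working with the raw bytes have to apply the scale from the schema themselves (deserializers that understand the logical type do this automatically). A minimal sketch, assuming the raw bytes and the scale are already at hand:

from decimal import Decimal

def decode_avro_decimal(raw: bytes, scale: int) -> Decimal:
    # AVRO "decimal" logical type: big-endian two's-complement unscaled integer stored in bytes.
    unscaled = int.from_bytes(raw, byteorder="big", signed=True)
    return Decimal(unscaled).scaleb(-scale)

# Example: the unscaled value 1234567 with scale 6 represents 1.234567.
print(decode_avro_decimal((1234567).to_bytes(4, "big", signed=True), scale=6))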

In the final section we will have a look at the behavior of SAP Datasphere Replication Flows when they are scheduled and a Confluent cluster is selected as the target system.

 

5. Scenarios or what happens if...

In this section we assume that the Kafka topic a Replication Flow/Task is supposed to write to already exists in the target Confluent cluster. Under different assumptions, we explain the behavior of a Replication Flow/Task that is configured to use this existing topic. We again leverage our small Demo Table setup and assume that the target Kafka topic Demo Table already exists.

In general, the concepts of the Kafka schema registry are always applied when a new schema is registered during a Replication Flow run.
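Before walking through the scenarios, it can be helpful to inspect what is already registered for a subject, for example with the confluent-kafka schema registry client as sketched below. Endpoint, credentials, and the subject name are assumptions; the actual subject name depends on the chosen subject name strategy.

from confluent_kafka.schema_registry import SchemaRegistryClient

sr = SchemaRegistryClient({
    "url": "https://<schema-registry-endpoint>",
    "basic.auth.user.info": "<schema registry API key>:<schema registry API secret>",
})

subject = "Demo_Table-value"             # topic name strategy; other strategies yield other subject names
latest = sr.get_latest_version(subject)  # latest schema version registered for the subject
print(latest.version, latest.schema.schema_type)
print(sr.get_compatibility(subject))     # subject-level compatibility type (may raise if none is set)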

Scenario 1: A schema entry in the schema registry does not exist.

A schema is registered in the Schema registry using the subject name strategy that is specified in the Replication Flow. The messages are written by the Replication Flow into the already existing topic.

Scenario 2: A schema entry in the schema registry exists, but with a different subject name strategy than the one specified in the Replication Flow

A new schema is registered in the Schema registry using the subject name strategy that is specified in the Replication Flow. The messages are written by the Replication Flow into the already existing topic.

Figure 15 - Demo Table Example for Scenario 2

Scenario 3: A schema entry in the schema registry exists with the same subject name strategy that is specified in the Replication Flow

Assumption A: The compatibility type that is specified in the Replication Flow coincides with the compatibility type of the already existing schema/subject definition and the new schema definition is different from the already existing one but complies with the compatibility type.

  • A new schema version is registered based on the compatibility type that was specified for the existing schema/subject. The messages are written by the Replication Flow into the already existing topic.
    Figure 16 - Demo Table Example for Scenario 3 Assumption A

Assumption B: The compatibility type that is specified in the Replication Flow is different from the compatibility type of the already existing Schema/Subject definition, but the schema definition is the same.

  • The compatibility type is overwritten, and no new schema is registered in the schema registry. The messages are written by the Replication Flow into the already existing topic.
    Figure 17 - Demo Table Example for Scenario 3 Assumption B Part I
  • Be aware that such a change may produce schema version chains with changes that do not comply with the compatibility type that is specified in the schema definition.
    Figure 18 - Demo Table Example for Scenario 3 Assumption B Part II

Assumption C: The compatibility type that is specified in the Replication Flow is different from the compatibility type of the already existing schema/subject definition and the schema definition is different but complies with the new compatibility type.

  • The old compatibility type is overwritten, and a new schema version for the already existing subject is registered in the schema registry, applying the new compatibility type.
    Figure 19 - Example: Compatibility type Backward is overwritten by Forward and a new schema version is registered that introduces a forward compatible change.

Assumption D: The compatibility type that is specified in the Replication Flow/Task is the same as the one specified in the already existing schema/subject definition, but the schema in the Replication Flow is different or introduces a change that does not comply with the compatibility type.

  • The Replication Flow fails with an error message indicating that the new message schema that is supposed to be registered in the schema registry is incompatible with the already existing one.
    Figure 20 - Demo Table Example for Scenario 3 Assumption D

Summary

In this blog post we introduced the new integration capabilities of SAP Datasphere with Confluent Cloud and Confluent Platform. The intention was to provide as many details as possible along with step-by-step guidance.
