This blog is part of a blog series from SAP Datasphere product management with the focus on the Replication Flow capabilities in SAP Datasphere:
Replication Flow Blog Series Part 1 – Overview | SAP Blogs
Replication Flow Blog Series Part 2 – Premium Outbound Integration | SAP Blogs
Replication Flows Blog Series Part 3 – Integration with Kafka
Replication Flows Blog Series Part 4 – Sizing
Replication Flows Blog Series Part 5 – Integration between SAP Datasphere and Databricks
Replication Flows Blog Series Part 6 – Confluent as a Replication Target
Data integration is an essential topic in a Business Data Fabric like SAP Datasphere. Replication Flow is the cornerstone for fueling SAP Datasphere with data, especially from SAP ABAP sources. There is also a strong need to move enriched data from SAP Datasphere into external environments to enable certain use cases.
In this part of the Replication Flow blog series we focus on the usage of Confluent as a target in a Replication Flow. We explain in detail the new capabilities that were introduced with SAP Datasphere release 2024.08.
The purpose of the additional features described in this blog, next to the generic Kafka integration that has been available since the end of last year, is to provide tailor-made integration with Confluent Cloud and Confluent Platform, the managed Kafka offerings of our dedicated partner Confluent.
The Confluent integration described in this blog is only usable in SAP Datasphere Replication Flows.
For the examples and step-by-step instructions in the blog, we assume that a properly configured SAP Datasphere tenant and a Confluent Cloud cluster are available. In addition, we assume that the reader is familiar with the basic concepts around Replication Flows and Connection Management in SAP Datasphere as well as with the Kafka capabilities of Confluent Cloud.
At the beginning of this year, a new connection type was introduced for Apache Kafka to offer basic integration with Apache Kafka based sinks. With the 2024.08 release of SAP Datasphere, a new dedicated connection type is introduced that is tailored to Confluent in order to support Confluent-specific capabilities such as Confluent's built-in schema registry.
Here are the different configuration options for Confluent.
Connection Details
Cloud Connector (only available if System Type Confluent Platform is chosen)
Authentication
The following SASL-based authentication methods are supported.
For Confluent Cloud:
| Authentication Type | SASL Authentication Type | Properties |
|---|---|---|
| No Authentication | n/a | |
| API Key and Secret | PLAIN | API Key* |

*mandatory
For Confluent Platform:
| Authentication Type | SASL Authentication Type | Properties |
|---|---|---|
| No Authentication | n/a | |
| User Name and Password | PLAIN | Kafka SASL User Name* |
| Salted Challenge Response Authentication Mechanism (256) | SCRAM256 | Kafka SASL User Name* |
| Salted Challenge Response Authentication Mechanism (512) | SCRAM512 | Kafka SASL User Name* |
| Kerberos with User Name and Password | GSSAPI | Kafka Kerberos Service Name* |
| Kerberos with Keytab File | GSSAPI | Kafka Kerberos Service Name* |

*mandatory
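For readers who want to experiment with these options outside SAP Datasphere, the following minimal sketch shows how the authentication types above map onto standard Kafka client (librdkafka) properties. The broker address and credentials are placeholders, and the mapping to SASL mechanism names is our assumption based on common Kafka conventions, not SAP Datasphere internals.

```python
# Hypothetical mapping of the authentication types above to librdkafka
# SASL mechanism names (assumption; placeholders throughout).
sasl_mechanism_by_auth_type = {
    "User Name and Password": "PLAIN",
    "Salted Challenge Response Authentication Mechanism (256)": "SCRAM-SHA-256",
    "Salted Challenge Response Authentication Mechanism (512)": "SCRAM-SHA-512",
    "Kerberos with User Name and Password": "GSSAPI",
    "Kerberos with Keytab File": "GSSAPI",
}

client_conf = {
    "bootstrap.servers": "broker1.example.com:9092",  # placeholder broker
    "security.protocol": "SASL_SSL",                  # TLS encryption + SASL auth
    "sasl.mechanism": sasl_mechanism_by_auth_type["User Name and Password"],
    "sasl.username": "<kafka-sasl-user-name>",
    "sasl.password": "<kafka-sasl-password>",
}
```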
Security
Transport Layer Security (TLS) settings for encryption as well as server certificate validation are supported for system type Confluent Cloud. In addition, Confluent Platform offers client certificate validation via mTLS.
Schema Registry
Additional configuration options to specify schema registry endpoint and credentials.
Remark: A schema registry configuration is mandatory when creating connections of type Confluent.
Connections of connection type Confluent can currently only be used as targets in Replication Flows.
Example: Creating a connection to Confluent Cloud in SAP Datasphere
We now showcase how to create a connection to Confluent Cloud, including the schema registry configuration. The procedure for Confluent Platform is similar, assuming that a Cloud Connector has been configured properly.
Use the Bootstrap Server URL(s) of your Confluent cluster as an entry in the Kafka Brokers property.
Choose User Name And Password as authentication type and use an API key and secret pair as Kafka SASL User Name and Kafka SASL Password.
Remark: It is assumed that the owner of the API key has sufficient rights to create/delete and access topics.
Use the Stream Governance API Endpoint as the Schema Registry URL.
Choose User Name And Password as the authentication type for the Schema Registry and use an API key and secret pair as User Name and Password, respectively.
Remark: It is assumed that the owner of the API credentials has sufficient rights to create and update schemas.
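To sanity-check both credential pairs before using them in Connection Management, a small script along the following lines can be used with the confluent-kafka Python client. The endpoints and API keys are placeholders for the values entered above; this is a verification sketch, not part of SAP Datasphere.

```python
from confluent_kafka.admin import AdminClient
from confluent_kafka.schema_registry import SchemaRegistryClient

# Kafka cluster: bootstrap server plus the API key/secret pair that is used
# as Kafka SASL User Name / Kafka SASL Password in the connection.
admin = AdminClient({
    "bootstrap.servers": "pkc-xxxxx.eu-central-1.aws.confluent.cloud:9092",  # placeholder
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "PLAIN",
    "sasl.username": "<cluster-api-key>",
    "sasl.password": "<cluster-api-secret>",
})
print(list(admin.list_topics(timeout=10).topics))  # reachable -> lists topics

# Schema registry: Stream Governance API endpoint plus its own key/secret pair.
schema_registry = SchemaRegistryClient({
    "url": "https://psrc-xxxxx.eu-central-1.aws.confluent.cloud",  # placeholder
    "basic.auth.user.info": "<sr-api-key>:<sr-api-secret>",
})
print(schema_registry.get_subjects())  # reachable -> lists existing subjects
```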
For the remainder of this blog post, we will describe the capabilities of the Confluent integration with Replication Flows alongside the following source data set that is assumed to be stored in a local table (Business Name: Demo Table) in SAP Datasphere.
Let’s assume we want to replicate this table into a Confluent Cloud instance. We create a corresponding Replication Flow design-time artifact, select SAP Datasphere as the source with the local table Demo Table as the source object, and choose the connection CONFLUENT_DEMO (see section 1) as the sink of the Replication Flow.
The following screenshot highlights the configuration options for the Confluent sink.
Compared to the Apache Kafka connection type, there are the following additional configuration options: Use Schema Registry, Record Name, Subject Name Strategy, Compatibility Type, and Clamp Decimal Floating Point Data Types. They are described in the following table together with the other properties.
| Setting | Value | Explanation |
|---|---|---|
| Replication Thread Limit | Number | The number of parallel replication threads that can be executed during the replication process. Only available in the global configuration. |
| Number of Partitions | Number | The number of Kafka partitions for the target Kafka topic. |
| Replication Factor | Number | The Kafka replication factor for the target Kafka topic. |
| Message Encoder | AVRO or JSON | The message format for the Kafka topic. |
| Message Compression | No Compression | The compression method for the Kafka messages that are sent to a Kafka topic. |
| Use Schema Registry | True or False | True: schemas are written to the Confluent schema registry. False: the schema registry is not used. |
| Topic Name | string | The name of the Kafka topic to be used as the target. It can only be changed by renaming the target object. |
| Record Name | string | The record name that is used for the schema registry entry when the subject name strategy is applied. It is also referenced in the schema definition itself. |
| Subject Name Strategy | Topic | Choose the subject name strategy for the schema. |
| Compatibility Type | Default | Choose the compatibility type for the schema registry subject. |
| Clamp Decimal Floating Point Data Types | True | Immutable setting (always True). |
| Overwrite Target Settings at Object Level | True or False | If checked at Replication Flow level, the flow-level target settings overwrite the settings made at object (task) level. |
The settings on Replication Task level take precedence over the settings on Replication Flow level unless the Overwrite Target Settings at Object Level checkbox is checked in the settings on Replication Flow level.
After the Replication Flow has been deployed and run, the Kafka topic and a corresponding schema registry entry are created (see section 4 for details on schema and message creation).
Remark:
As with the generic Kafka integration, a Truncate flag is supported for Confluent. If it is set and the target Kafka topic already exists in the Confluent cluster, the topic is deleted and recreated. This also deletes all messages that are assigned to the topic.
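The effect of the Truncate flag can be pictured with the Kafka admin API. The helper below is a hypothetical sketch of the semantics (drop and recreate), not SAP's implementation:

```python
from confluent_kafka.admin import AdminClient, NewTopic

def truncate_topic(admin: AdminClient, topic: str,
                   partitions: int, replication_factor: int) -> None:
    """Hypothetical helper mirroring the Truncate semantics."""
    # Deleting the topic discards every message assigned to it.
    admin.delete_topics([topic])[topic].result()
    # Topic deletion completes asynchronously on the brokers, so a real
    # implementation would need to wait/retry before recreating the topic.
    admin.create_topics([NewTopic(topic,
                                  num_partitions=partitions,
                                  replication_factor=replication_factor)]
                        )[topic].result()
```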
In the following section we take a closer look at schema and message creation.
Before describing the details regarding schema and message creation, we start with a listing of the producer configurations that are used in SAP Datasphere Replication Flows. The parameter values are fixed and cannot be changed in SAP Datasphere.
| Kafka Producer Configuration Parameter | Value used by Replication Flows | Remark |
|---|---|---|
| | 1048576 (1 MB) | Confluent Default |
| | 30 seconds | Confluent Default |
| | 5 | Confluent Default |
| | 100 | Confluent Default |
| | all (-1) | Confluent Default |
| | 3 | |
The following section is only applicable if the Use Schema Registry toggle is activated (True) in the Replication Flow/Task properties panel. In this case, schemas are written to the schema registry as described in the following paragraphs.
No schema definition for the Kafka message key (e.g. […]-key) is created in Confluent. Instead, the message key is a string consisting of the primary key values of the source dataset separated by underscores.
If, in our example above, the two columns ID and First_Name together formed the primary key, the generated message keys would be composed as in the following sketch.
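A minimal sketch of this key construction, using two hypothetical Demo Table rows (the actual demo values are not reproduced here):

```python
def message_key(row: dict, primary_key_columns: list[str]) -> str:
    # Key = primary key values of the source row, separated by underscores.
    return "_".join(str(row[column]) for column in primary_key_columns)

# Hypothetical Demo Table rows for illustration.
rows = [
    {"ID": 1, "First_Name": "John", "Second_Name": "Doe", "Age": 42},
    {"ID": 2, "First_Name": "Jane", "Second_Name": "Doe", "Age": 39},
]
for row in rows:
    print(message_key(row, ["ID", "First_Name"]))  # -> "1_John", "2_Jane"
```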
The schema definition for the Kafka message body is constructed and created based on the structure/schema of the source dataset, the configuration settings of the executed Replication Flow and the change data capture mechanisms of SAP Datasphere. The following screenshot shows how the different settings are fed into the Kafka schema.
The corresponding Confluent schema and subject names, the serialization format and the compatibility mode are derived from the configuration values of the Replication Flow and Replication Task definition, respectively.
The schema definition for the message body itself is derived from the source dataset definition and the metadata for the delta information of the Replication Flow. In our example the schema of the Demo Table is translated into an AVRO schema that contains the four fields ID, First_Name, Second_Name and Age as well as the three delta columns that are always added and which contain the Replication Flow specific delta information. The AVRO type of the message body is always record.
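As an illustration, registering such an AVRO record schema manually with the confluent-kafka client could look as follows. This is a sketch under assumptions: the field types are inferred from the JSON schema shown below, the subject name Demo_Table-value assumes the topic name strategy, and the AVRO type chosen for __sequence_number is our guess.

```python
from confluent_kafka.schema_registry import Schema, SchemaRegistryClient

avro_schema = """
{
  "type": "record",
  "name": "Demo_Table",
  "fields": [
    {"name": "ID",          "type": ["null", "long"]},
    {"name": "First_Name",  "type": ["null", "string"]},
    {"name": "Second_Name", "type": ["null", "string"]},
    {"name": "Age",         "type": ["null", "long"]},
    {"name": "__operation_type",  "type": ["null", "string"]},
    {"name": "__sequence_number", "type": ["null", "string"]},
    {"name": "__timestamp",
     "type": ["null", {"type": "long", "logicalType": "timestamp-micros"}]}
  ]
}
"""

schema_registry = SchemaRegistryClient({"url": "<schema-registry-url>",
                                        "basic.auth.user.info": "<key>:<secret>"})
# Subject name "Demo_Table-value" assumes the topic name strategy.
schema_id = schema_registry.register_schema("Demo_Table-value",
                                            Schema(avro_schema, "AVRO"))
print(schema_id)
```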
If the serialization format is set to JSON, a corresponding JSON schema that follows the JSON Schema standard is registered in the Confluent schema registry. The following schema definition would have been written to the Confluent schema registry if the JSON serialization format had been used in our example above:
{
"$schema": "https://json-schema.org/draft/2020-12/schema",
"properties": {
"Age": {
"anyOf": [
{
"maximum": 9223372036854776000,
"minimum": -9223372036854776000,
"type": "integer"
},
{
"type": "null"
}
]
},
"First_Name": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
]
},
"ID": {
"anyOf": [
{
"maximum": 9223372036854776000,
"minimum": -9223372036854776000,
"type": "integer"
},
{
"type": "null"
}
]
},
"Second_Name": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
]
},
"__operation_type": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
]
},
"__sequence_number": {
"anyOf": [
{
"maximum": 18446744073709552000,
"minimum": 0,
"type": "integer"
},
{
"type": "null"
}
]
},
"__timestamp": {
"anyOf": [
{
"format": "date-time",
"type": "string"
},
{
"type": "null"
}
]
}
},
"title": "Demo_Table",
"type": "object"
}
The topic itself is created and configured based on the Replication Flow/Task settings Number of Partitions, Replication Factor and Topic Name. All other topic-specific configuration parameters are inherited from the Confluent cluster settings.
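To see which topic-level parameters were inherited from the cluster, the admin client's describe_configs call can be used. A sketch, assuming the generated topic is named Demo_Table and reusing the SASL/TLS settings from the connectivity sketch above:

```python
from confluent_kafka.admin import AdminClient, ConfigResource

admin = AdminClient({"bootstrap.servers": "<bootstrap-server:9092>"})  # plus SASL/TLS settings as above

# Inspect the effective configuration of the created topic; everything except
# partitions and replication factor is inherited from the cluster defaults.
resource = ConfigResource(ConfigResource.Type.TOPIC, "Demo_Table")  # assumed topic name
topic_config = admin.describe_configs([resource])[resource].result()
for name, entry in sorted(topic_config.items()):
    print(f"{name} = {entry.value}")  # e.g. retention.ms, cleanup.policy
```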
For initial and delta loads, exactly one Kafka message is added to the target Kafka topic for each row in the source data set during a Replication Flow run.
In case of our example (initial load of an SAP Datasphere table), the final Kafka messages look as follows (an overview of the Kafka topic is shown in figure 10 above).
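Such messages can be inspected with a schema-registry-aware consumer. A minimal sketch for the AVRO message encoder, assuming the topic name Demo_Table; endpoints and credentials are placeholders:

```python
from confluent_kafka import Consumer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka.serialization import MessageField, SerializationContext

schema_registry = SchemaRegistryClient({"url": "<schema-registry-url>",
                                        "basic.auth.user.info": "<key>:<secret>"})
deserializer = AvroDeserializer(schema_registry)

consumer = Consumer({
    "bootstrap.servers": "<bootstrap-server:9092>",  # plus SASL/TLS settings as above
    "group.id": "demo-table-reader",
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["Demo_Table"])  # assumed topic name

while True:
    message = consumer.poll(timeout=10.0)
    if message is None or message.error():
        continue
    value = deserializer(message.value(),
                         SerializationContext(message.topic(), MessageField.VALUE))
    # The key is a plain string of primary key values; the value contains the
    # source columns plus __operation_type, __sequence_number and __timestamp.
    print(message.key().decode("utf-8"), value)
```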
The next section contains an overview of type mappings between selected source systems and JSON/AVRO schema types.
The following two tables contain the data type mappings for the scenarios where SAP HANA/SAP Datasphere or ABAP artifacts are chosen as a source.
Scenario: SAP HANA/SAP Datasphere to Confluent
| SAP HANA Type | JSON Type | AVRO Type |
|---|---|---|
| TINYINT | number | int |
| SMALLINT | number | int |
| INTEGER | number | int |
| BIGINT | number | long |
| REAL | number | float |
| DOUBLE | number | double |
| SMALLDECIMAL | number | type: "bytes", logicalType: "decimal", scale: 6, precision: 28 |
| DECIMAL | number | type: "bytes", logicalType: "decimal", scale: 6, precision: 38 |
| DECIMAL(p,s) | number | type: "bytes", logicalType: "decimal", scale: s, precision: p |
| FLOAT | number | double |
| FLOAT(n), 1<=n<=24 | number | float |
| FLOAT(n), 25<=n<=53 | number | double |
| BOOLEAN | boolean | boolean |
| DATE | string ('YYYY-MM-DD') | type: "int", logicalType: "date" (days since Unix epoch) |
| TIME | string ('HH:MM:SS.NNNNNNNNN') | type: "long", logicalType: "time-micros" |
| CLOB/NCLOB | string | string |
| SECONDDATE | string ('YYYY-MM-DDTHH:MM:SS.NNNNNNNNNZ') | type: "long", logicalType: "timestamp-micros" (microseconds since Unix epoch) |
| TIMESTAMP | string ('YYYY-MM-DDTHH:MM:SS.NNNNNNNNNZ') | type: "long", logicalType: "timestamp-micros" (microseconds since Unix epoch) |
| VARCHAR(n) | string | string |
| NVARCHAR(n) | string | string |
| ALPHANUM(n) | string | string |
| SHORTTEXT(n) | string | string |
| VARBINARY(n) | string | bytes |
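As the table shows, decimal-like types land in AVRO as bytes with a decimal logical type. Per the AVRO specification, decoding such a value means interpreting the bytes as a big-endian two's-complement unscaled integer and applying the scale. A small sketch:

```python
from decimal import Decimal

def decode_avro_decimal(raw: bytes, scale: int) -> Decimal:
    # AVRO "decimal" logical type: big-endian two's-complement unscaled integer.
    unscaled = int.from_bytes(raw, byteorder="big", signed=True)
    return Decimal(unscaled).scaleb(-scale)

# 123.456000 with scale 6 is stored as the unscaled integer 123456000.
raw = (123456000).to_bytes(4, byteorder="big", signed=True)
assert decode_avro_decimal(raw, scale=6) == Decimal("123.456000")
```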
Scenario: ABAP to Confluent
| ABAP Data Dictionary Type (DDIC) | JSON Type | AVRO Type |
|---|---|---|
| INT1 | number | int |
| INT2 | number | int |
| INT4 | number | int |
| INT8 | number | long |
| DEC | number | type: "bytes", logicalType: "decimal", scale: s, precision: d |
| DF16_DEC / D16D | number | type: "bytes", logicalType: "decimal", scale: 6, precision: 28 |
| DF34_DEC / D34D | number | type: "bytes", logicalType: "decimal", scale: 6, precision: 38 |
| FLTP | number | double |
| CURR | number | type: "bytes", logicalType: "decimal", scale: s, precision: d |
| QUAN | number | type: "bytes", logicalType: "decimal", scale: s, precision: d |
| DECFLOAT16 / D16N | number | type: "bytes", logicalType: "decimal", scale: 6, precision: 28 |
| DECFLOAT34 / D34N | number | type: "bytes", logicalType: "decimal", scale: 6, precision: 38 |
| DF16_RAW / D16R | number | type: "bytes", logicalType: "decimal", scale: 6, precision: 28 |
| DF34_RAW / D34R | number | type: "bytes", logicalType: "decimal", scale: 6, precision: 38 |
| RAW | string | bytes |
| LRAW | string | bytes |
| RAWSTRING / RSTR | string | bytes |
| SRAWSTRING / SRST | string | bytes |
| CHAR | string | string |
| LCHR | string | string |
| SSTRING / SSTR | string | string |
| DATS | string ('YYYY-MM-DD') | type: "int", logicalType: "date" (days since Unix epoch) |
| DATN | string ('YYYY-MM-DD') | type: "int", logicalType: "date" (days since Unix epoch) |
| TIMS | string ('HH:MM:SS.NNNNNNNNN') | type: "long", logicalType: "time-micros" |
| ACCP | number | ??? |
| NUMC | string | string |
| CLNT | string | string |
| LANG | string | string |
| UTCLONG / UTCL | string ('YYYY-MM-DDTHH:MM:SS.NNNNNNNNNZ') | type: "long", logicalType: "timestamp-micros" (microseconds since Unix epoch) |
| CUKY | string | string |
| UNIT | string | string |
| STRING / STRG | string | string |
| GEOM_EWKB / GGM1 | string | string |
| TIMN | string ('HH:MM:SS.NNNNNNNNN') | type: "long", logicalType: "time-micros" |
| Domain TZNTSTMPL | string ('YYYY-MM-DDTHH:MM:SS.NNNNNNNNNZ') | type: "long", logicalType: "timestamp-micros" (microseconds since Unix epoch) |
| Domain TZNTSTMPS | string ('YYYY-MM-DDTHH:MM:SS.NNNNNNNNNZ') | type: "long", logicalType: "timestamp-micros" (microseconds since Unix epoch) |
| Domain SYSUUID_X16 and SYSUUID | string | string |
| Domain SYSUUID_C22 and SYSUUID_22 | string | string |
| Domain SYSUUID_C26 | string | string |
| Domain SYSUUID_C32 and SYSUUID_C | string | string |
| Domain SYSUUID_C36 | string | string |
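Similarly, the date and timestamp logical types in both tables are plain integers counted from the Unix epoch. A minimal decoding sketch:

```python
from datetime import date, datetime, timedelta, timezone

EPOCH_DATE = date(1970, 1, 1)
EPOCH_TS = datetime(1970, 1, 1, tzinfo=timezone.utc)

def decode_avro_date(days: int) -> date:
    # logicalType "date": days since the Unix epoch.
    return EPOCH_DATE + timedelta(days=days)

def decode_timestamp_micros(micros: int) -> datetime:
    # logicalType "timestamp-micros": microseconds since the Unix epoch (UTC).
    return EPOCH_TS + timedelta(microseconds=micros)

print(decode_avro_date(19874))                    # a day in 2024
print(decode_timestamp_micros(1_717_200_000_000_000))  # 2024-06-01T00:00:00+00:00
```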
In the final section we look at the behavior of SAP Datasphere Replication Flows when they are scheduled and a Confluent cluster is selected as the target system.
In this section we assume that the Kafka topic a Replication Flow/Task is supposed to write to already exists in the target Confluent cluster. Under different assumptions, we explain the behavior of a Replication Flow/Task that is configured to use the already existing topic. We again leverage our small Demo Table setup and assume that the target Kafka topic Demo Table already exists.
In general, the compatibility concepts of the Kafka schema registry always apply when a new schema is registered during a Replication Flow run.
Scenario 1: A schema entry in the schema registry does not exist.
A schema is registered in the schema registry using the subject name strategy that is specified in the Replication Flow. The messages are written by the Replication Flow into the already existing topic.
Scenario 2: A schema entry in the schema registry exists, but with a different subject name strategy than the one specified in the Replication Flow.
A new schema is registered in the schema registry using the subject name strategy that is specified in the Replication Flow. The messages are written by the Replication Flow into the already existing topic.
Scenario 3: A schema entry in the schema registry exists with the same subject name strategy as the one specified in the Replication Flow.
Assumption A: The compatibility type specified in the Replication Flow coincides with the compatibility type of the already existing schema/subject definition, and the new schema definition is different from the already existing one but complies with the compatibility type.
Assumption B: The compatibility type specified in the Replication Flow is different from the compatibility type of the already existing schema/subject definition, but the schema definition is the same.
Assumption C: The compatibility type specified in the Replication Flow is different from the compatibility type of the already existing schema/subject definition, and the schema definition is different but complies with the new compatibility type.
Assumption D: The compatibility type specified in the Replication Flow/Task is the same as the one specified in the already existing schema/subject definition, but the schema in the Replication Flow is different or introduces a change that does not comply with the compatibility type.
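The outcome in each of these scenarios can be anticipated with the schema registry API before deploying. A hedged sketch that reads the subject's current compatibility type and tests whether a new schema definition would comply; the subject name assumes the topic name strategy, and the schema string is a placeholder:

```python
from confluent_kafka.schema_registry import Schema, SchemaRegistryClient

schema_registry = SchemaRegistryClient({"url": "<schema-registry-url>",
                                        "basic.auth.user.info": "<key>:<secret>"})

subject = "Demo_Table-value"                          # topic name strategy assumed
candidate = Schema("<new-avro-schema-json>", "AVRO")  # placeholder schema string

# Compatibility type currently set on the subject (relevant to assumptions A-D).
print(schema_registry.get_compatibility(subject))
# True if registering the candidate schema would comply with that type.
print(schema_registry.test_compatibility(subject, candidate))
```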
In this blog post we introduced the new integration capabilities of SAP Datasphere with Confluent Cloud and Confluent Platform. The intention was to provide as much detail as possible along with step-by-step guidance.