
Cloud Integration – What You Need to Know About the Kafka Adapter

This blog post describes the Kafka adapter of SAP Cloud Integration.

Introduction

Apache Kafka is an event streaming platform. It is often described as a publish/subscribe messaging system or as a distributed commit log. Kafka stores key-value messages (records) in topics that can be partitioned. Each partition stores these records in order, using an incremental offset (the position of a record within a partition). Records are not deleted upon consumption; they remain until the retention time or retention size is reached on the broker side. Until then, messages can be re-processed repeatedly by one or multiple (different) consumers.

To optimize efficiency, the underlying messaging protocol is TCP-based. Messages are usually grouped together to reduce network overhead, which results in larger network packets.

Kafka runs on a cluster of one or more servers (brokers), and all topics and partitions are distributed (and replicated) across these brokers. This architecture allows you to distribute the load and increase fault tolerance. Each broker that is part of a cluster acts as the leader for certain partitions and as a replica for partitions led by other brokers.


Kafka Cluster [1]

Records & Batches

Kafka messages or records are key-value pairs (with a timestamp). The optional key is typically used to send related records to the same partition. Records that are produced for the same topic and the same partition are usually grouped into batches to reduce network overhead, which results in larger network packets. Batches may be compressed to reduce their size and optimize data transfer. The additional CPU time required for compression is often negligible.
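To make the role of the record key concrete, here is a minimal sketch using the plain Apache Kafka Java producer client (not the adapter itself); the broker address, topic name and key are made-up values for illustration only.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class KeyedRecordSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-1.example.com:9092"); // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());

        try (KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props)) {
            // Both records carry the same key, so they end up in the same partition of "MyTopic"
            producer.send(new ProducerRecord<>("MyTopic", "order-4711", "created".getBytes()));
            producer.send(new ProducerRecord<>("MyTopic", "order-4711", "paid".getBytes()));
        }
    }
}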

Topics & Partitions

Kafka records are sent to (and retained in) so-called topics. Kafka topics are divided into one or several partitions. Records are assigned an (incremental) offset within a partition and are retained until the defined retention time or retention size on the broker is reached. This prevents infinite retention of records on the broker. The records may be consumed multiple times and without any limitations as long as they are retained on the broker.

Topic names are restricted as follows:

  • [a-zA-Z0-9._-]
  • max. 249 characters
  • use either . (dot) or _ (underscore), but not both to avoid collisions


Topic with 4 partitions [1]
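For illustration, here is a hedged sketch of how such a topic could be created with the plain Kafka AdminClient (topic creation happens on the broker side, not via the adapter); the broker address, topic name, partition count and retention values are assumptions.

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.TopicConfig;

import java.util.Collections;
import java.util.Map;
import java.util.Properties;

public class TopicSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-1.example.com:9092"); // placeholder

        try (AdminClient admin = AdminClient.create(props)) {
            // 4 partitions, replication factor 3, retained for 7 days or up to ~1 GB per partition
            NewTopic topic = new NewTopic("MyTopic", 4, (short) 3)
                    .configs(Map.of(
                            TopicConfig.RETENTION_MS_CONFIG, "604800000",
                            TopicConfig.RETENTION_BYTES_CONFIG, "1073741824"));
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}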

The Kafka Sender Adapter

The Kafka sender adapter fetches Kafka record batches from one or more topics. You can find detailed information about all possible parameters in the Configure the Kafka Sender Adapter documentation.

The Connection tab contains the basic connectivity settings to address the broker. You can maintain more than one bootstrap server to cope with the unavailability of individual brokers. Cloud Integration offers SASL and Client Certificate authentication options. In Kafka terms, this translates to the following supported security protocols: SSL, SASL_PLAINTEXT and SASL_SSL, with the supported SASL mechanisms PLAIN and SCRAM-SHA-256.


Sender Adapter – Connection tab

Note: Any change of key or trust material requires a restart (or redeployment) of the integration flow.

SASL

In order to enable SASL_PLAINTEXT

  • the broker has to support SASL_PLAINTEXT (listener configuration, keystore and truststore configuration, etc.)
  • Authentication has to be set to SASL
  • Connect with TLS has to be disabled
  • Credential Name needs to point to a deployed credential

In order to enable SASL_SSL

  • the broker has to support SASL_SSL (listener configuration, keystore and truststore configuration, etc.)
  • the Fully-Qualified Domain Name (FQDN) of the broker has to fit to the Common Name (CN) or Subject Alternative Name (SAN) of its certificate (hostname verification)
  • Authentication has to be set to SASL
  • Connect with TLS has to be enabled
  • Credential Name needs to point to a deployed credential
  • the root certificate of the broker has to be added to the keystore (you can usually use the TLS Connection Test to download the certificate)
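For orientation, the adapter settings above roughly correspond to the following plain Kafka client properties. This is only a sketch (the adapter manages these values internally based on the UI fields); all hosts and credentials are placeholders.

import java.util.Properties;

public class SaslSslSettingsSketch {
    // Authentication = SASL, Connect with TLS = enabled, Credential Name -> user/password
    static Properties saslSslProperties() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker-1.example.com:9093,broker-2.example.com:9093");
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN"); // or SCRAM-SHA-256
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"myUser\" password=\"myPassword\";"); // from the deployed credential
        // Trust: the broker's root certificate must be trusted;
        // in Cloud Integration, the tenant keystore takes this role.
        return props;
    }
}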

Client Certificate

In order to enable SSL

  • the broker has to support SSL (listener configuration, keystore and truststore configuration, etc.)
  • the FQDN of the broker has to fit to the CN or SAN of its certificate (hostname verification)
  • Authentication has to be set to Client Certificate
  • Private Key Alias needs to point to a Key Pair in the keystore
  • the root certificate of the broker has to be added to the keystore (you can usually use the TLS Connection Test to download the certificate)
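Again only as a rough orientation: in plain Kafka client terms, client-certificate authentication corresponds to properties like the following. In Cloud Integration, the key pair referenced by the Private Key Alias and the imported broker root certificate come from the tenant keystore, so the file paths below are plain-Kafka placeholders that do not exist in the adapter.

import java.util.Properties;

public class ClientCertificateSettingsSketch {
    // Authentication = Client Certificate (mutual TLS)
    static Properties sslProperties() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker-1.example.com:9093");
        props.put("security.protocol", "SSL");
        props.put("ssl.keystore.location", "/path/to/client-keystore.jks"); // holds the client key pair
        props.put("ssl.keystore.password", "changeit");
        props.put("ssl.truststore.location", "/path/to/truststore.jks");    // holds the broker root certificate
        props.put("ssl.truststore.password", "changeit");
        return props;
    }
}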

The Processing tab allows you to specify the topic(s) to subscribe to. You can configure one or more (comma-separated) topics, as well as a simple pattern like MyTopic* to consume from all topics starting with MyTopic (e.g. MyTopic1, MyTopic2).


Sender Adapter – Processing tab
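Conceptually, subscribing to a list of topics or a prefix pattern maps to the pattern-based subscription of the Kafka consumer API, as in this hedged sketch (broker address, group id and topic prefix are placeholders):

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Properties;
import java.util.regex.Pattern;

public class PatternSubscriptionSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-1.example.com:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "MyIntegrationFlowId"); // the adapter derives this from the iflow id
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

        try (KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props)) {
            // "MyTopic*" in the adapter roughly corresponds to a prefix pattern subscription
            consumer.subscribe(Pattern.compile("MyTopic.*"));
            ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, byte[]> record : records) {
                System.out.printf("%s-%d@%d%n", record.topic(), record.partition(), record.offset());
            }
        }
    }
}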

Consumer Groups

Kafka consumers are typically part of a consumer group. Different consumer groups are separated from each other, even if they consume from the same topics.

The integration flow ID (the initial integration flow name upon creation) is used as the consumer group. This ensures that other integration flows cannot interfere with record consumption. Other integration flows may consume from the same topics, but they will do so within their own, individual consumer groups.

If a consumer group loses all of its consumers, the broker remembers the group and its last committed offsets per partition for a defined period of time (broker setting: offsets.retention.minutes). This way, a consumer can rejoin its group and continue where the group stopped (e.g. after downtimes, software updates, or un- and redeployments).

If a consumer group is not yet (first deployment) or no longer (offset retention) known to the broker, a new consumer starts from the latest (newest) offset. It does not consume all records of a topic from the beginning, but listens for new records only. The consumer might also encounter a situation in which the last-known offset (and subsequent ones) no longer exists (the partition size exceeded its threshold or the records expired; both are broker configurations). In such situations, the consumer also resets to the latest offset (auto.offset.reset = latest).
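Summarizing this behavior in plain Kafka consumer properties (a sketch of the effective defaults as described in this post, not an adapter configuration you could deploy):

import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.Properties;

public class ConsumerGroupDefaultsSketch {
    // The group id is the integration flow id and is not user-configurable in the adapter.
    static Properties groupDefaults(String integrationFlowId) {
        Properties props = new Properties();
        props.put(ConsumerConfig.GROUP_ID_CONFIG, integrationFlowId);       // consumer group == iflow id
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");       // unknown/expired offsets -> start at newest
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");        // offsets committed automatically ...
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");   // ... every 5 seconds (see "Automatic Commit")
        return props;
    }
}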

No support for user-configurable consumer groups

Cloud Integration does not offer user-defined consumer groups in the sender adapter (e.g. via a text field). This is a conscious decision for several reasons:

  • Avoid productive scenarios being negatively impacted from the sidelines by other teams or people working on other integration flows.
    • This can happen by accident and is not unlikely if you define your own consumer groups in all integration flows.
    • Copying integration flows can be especially critical (conflicts may start immediately or with a delay, as outlined further below).
  • We would lose the clear relation between integration flow and consumer group. This relation is usually desirable (especially in exceptional / support cases).
    • You might claim that messages have not been processed, and neither you nor Cloud Integration support could be certain that no other integration flow has been processing the same records (maybe only for some time, e.g. in a conflict case).
    • There is no historic data of previous configurations, and involved integration flows might already be gone.
  • Conflicting configurations of different integration flows might not be visible immediately.
    • There is only one consumer per topic partition (per consumer group), but there might be more consumer threads on the client side waiting to get a chance to bind to an unassigned topic partition.
    • Depending on the context, the problem might only appear after a software update, tenant restart (or crash), integration flow un- or redeployments, broker rebalancing etc. (and maybe it takes several restarts, since the consumer threads compete with each other).
  • The current behavior is conflict-free across integration flows on one tenant.

If you are in need of a specific consumer group name, you can always create the integration flow with exactly the required name.

Parallel Consumption

Within one consumer group, only one Kafka consumer may fetch records from one partition. Topics can be consumed in parallel, if

  • the topic is divided into multiple partitions and
  • several consumers are spawned within the consumer group.

Example: Topic A with 3 partitions (0, 1 and 2)
Records from this topic can be consumed in parallel by up to 3 consumers (one per partition).

Cloud Integration allows you to define the number of Parallel Consumers within a range of 1 to 25. This configuration scales with the number of worker nodes. To distribute the load across these nodes, it is necessary to set Parallel Consumers to less than the number of partitions. Otherwise, one worker node creates all the necessary consumers and binds to all partitions before a second worker node gets the chance to do the same.

Example: Topic B with 24 partitions and Parallel Consumers set to 24 (3 worker nodes)
All 3 worker nodes spawn 24 consumer threads each, while most probably only the first node is able to connect to all 24 partitions. The other worker nodes do not consume any messages from Topic B. To balance this fairly across all 3 worker nodes, Parallel Consumers needs to be set to 8. As the number of worker nodes may change over time, it makes sense to leave ‘some room’ for later, e.g. to set the configuration to 5. This means that one consumer might read from several partitions, as long as there are fewer consumers than partitions.
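The sizing rule from this example can be written down as a small back-of-the-envelope calculation (a sketch, not an official formula):

public class ParallelConsumerSizingSketch {
    // For a fair distribution, Parallel Consumers (configured per integration flow and
    // spawned on every worker node) should be at most partitions / workerNodes,
    // with some room left for additional worker nodes later.
    static int suggestedParallelConsumers(int partitions, int workerNodes) {
        int fairShare = Math.max(1, partitions / workerNodes); // 24 partitions, 3 nodes -> 8
        return Math.max(1, (fairShare * 2) / 3);               // leave some room -> 5 in the example
    }
}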

For the above reasons, it makes sense to leave this setting at its default value unless performance requirements and test results show a need to raise it. In many cases (depending on the scenario, of course), a single consumer is perfectly capable of handling many partitions and messages.

Retry Handling

A message fetched by the Kafka sender adapter may fail during processing. The sender may either skip the failed message and continue with the next offset (at-most-once semantics, unless the consumer dies within auto.commit.interval.ms) or retry the failed message (at-least-once semantics, within the record retention on the broker). The option to retry a failed message can potentially lead to infinite retries. Therefore, it is recommended to handle failed messages in an exception subprocess.

The Kafka sender adapter offers additional configuration options to define retry intervals and reconnect intervals. High-frequency retries can put considerable load on the broker (and on Cloud Integration), which can be avoided by increasing the intervals (i.e. slowing down).

The retry behavior is not easy to comprehend at first. Let us look behind the curtains with some examples. In the examples, we consider one topic with 6 partitions, and trying to process a message from partition 6 always leads to an error.


Retry Behavior: Example 1

The single consumer C1 fetches messages from all 6 partitions. As defined, it fails when trying to process a record fetched from partition 6. Since Error Handling is set to Retry Failed Message, the consumer disconnects from the broker and waits for the configured Max. Retry Backoff (in ms) before reconnecting. In the meantime, no record is consumed from any partition.

Now we add a second worker node to the picture.


Retry Behavior: Example 2

Both consumers C1 and D1 are part of the same consumer group. Hence, they share the message consumption of the topic, each working on 3 partitions. Once D1 fails and disconnects, a rebalance takes place on the broker side and C1 takes over the other 3 partitions in addition. As C1 also fails when trying to process a message from partition 6, it potentially leaves all partitions unattended until D1 reconnects. This depends on the configured Max. Retry Backoff (in ms) and the time the broker needs to finish the rebalance.

The exact same situation may occur on a single node with Parallel Consumers set to 2 as depicted below.


Retry Behavior: Example 3

In reality, the setup usually looks more complex. The last example tries to provide a glimpse of that.


Retry Behavior: Example 4

I outlined a single worker node with 6 parallel consumers. We could multiply this by additional worker nodes to complicate things further, but I think you get the picture. This time, I reduced the Max. Retry Backoff (in ms) so that C6 is back and ready for re-processing within 2 rebalances. In such a case, C5 and C6 take turns working on the last two partitions. The example also shows that the continuous failure of partition 6 affects the processing of partition 5, while the other partitions continue to be processed without any problems.

Based on what we saw, Max. Retry Backoff (in ms) is not necessarily the time gap before the next processing attempt. This is the case in Example 1, but not in the other examples. If another consumer is available, it takes over in the meantime and the observed retry interval might just equal the rebalance time on the broker.

Fetch Optimization

The Kafka sender adapter fetches records in batches. It offers numerous configuration options to tweak the min. and max. sizes to fetch, limit the number of polled records, and set a max. waiting time. Even when misconfigured, the sender adapter always tries to make progress, i.e. it fetches a record even if it is larger than the defined fetch.max.bytes. However, it does not add further messages to the fetched batch if the max. size has already been exceeded.


Sender Adapter – Advanced tab
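For reference, the fields on the Advanced tab map to plain Kafka consumer fetch properties like the ones below; the values shown are the regular Kafka client defaults and serve as a sketch only.

import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.Properties;

public class FetchTuningSketch {
    static Properties fetchTuning() {
        Properties props = new Properties();
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1");                 // respond as soon as data is available ...
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");             // ... or after this max. waiting time
        props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "52428800");          // upper bound per fetch (50 MB)
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "1048576"); // upper bound per partition (1 MB)
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");              // limit the number of polled records
        return props;
    }
}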

Automatic Commit

Consumed records / offsets are committed automatically in 5-second intervals (auto.commit.interval.ms). If a message fails in between (with retry enabled), the last successfully processed offset is committed immediately. If the consumer shuts down (e.g. upon undeployment of the integration flow), it also commits the last offset that was processed.

This leaves one potential gap: if a rebalance happens on the broker side (e.g. a consumer / worker node crashes), there is a time window of up to 5 seconds of uncommitted offsets. These records are processed again once a consumer is up and running again, since the broker is unaware of the processing state on the client.

Heartbeat

Each consumer comes with a processing thread and a heartbeat thread. The job of the heartbeat thread is to keep the connection alive and signal liveness to the broker. If no heartbeat is detected by the broker within the Session Timeout, the broker disconnects the consumer from its partitions and triggers a rebalance. It is recommended to set the Heartbeat Interval to less than 1/3 of the Session Timeout, so the client has at least three chances to reach the broker (i.e. to compensate for temporary network problems).
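In plain Kafka consumer properties, this rule of thumb looks like the following (illustrative values only):

import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.Properties;

public class LivenessSettingsSketch {
    static Properties livenessSettings() {
        Properties props = new Properties();
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");   // broker declares the consumer dead after 30 s ...
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "9000"); // ... heartbeat every 9 s (< 1/3 of the timeout)
        return props;
    }
}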

Max. Processing Time

Kafka offers the setting max.poll.interval.ms to identify whether the processing thread has died. If the processing time exceeds the configured value, the processing thread is assumed to have died: the consumer is disconnected from the broker and a rebalance takes place. Another (or a new) consumer connects to the topic partition and tries to re-process the message.

However, it is possible that there is no problem with the processing thread and the processing time simply exceeds the configured value. This can potentially lead to infinite retries (disconnects, rebalances, etc.) if processing continues to take longer than the configured max.poll.interval.ms.

Cloud Integration decided to sacrifice this mechanism to some extent, both to avoid this kind of problem (which may only surface late in production, e.g. when message sizes change) and for simplicity reasons, since the impact of misconfiguration is quite high.

Instead, Cloud Integration sets this value to 1 day, which is considered high enough to avoid infinite retries (in almost all cases).

Message Headers

Stored Kafka record headers are automatically transformed into message headers and can be used within an integration flow (if configured in the Allowed Header(s) field of the Runtime Configuration). The same is true for the following additional headers:

  • kafka.TOPIC (String) – The topic from which the message originated.
  • kafka.PARTITION (Integer) – The partition where the message was stored.
  • kafka.OFFSET (Long) – The offset of the message.
  • kafka.KEY (Object) – The key of the message, if configured.
  • kafka.TIMESTAMP (Long) – The timestamp of the message.
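Since the adapter is Camel-based under the hood, these headers can be read like any other exchange header. The following is a hedged sketch in plain Camel Java terms; in Cloud Integration you would typically use a Script step or a Content Modifier instead, and the header name originInfo is a made-up example.

import org.apache.camel.Exchange;
import org.apache.camel.Processor;

public class LogKafkaOriginProcessor implements Processor {
    @Override
    public void process(Exchange exchange) throws Exception {
        String topic = exchange.getIn().getHeader("kafka.TOPIC", String.class);
        Integer partition = exchange.getIn().getHeader("kafka.PARTITION", Integer.class);
        Long offset = exchange.getIn().getHeader("kafka.OFFSET", Long.class);
        // Combine the origin of the record into a custom header for later use
        exchange.getIn().setHeader("originInfo", topic + "-" + partition + "@" + offset);
    }
}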

Kafka Receiver Adapter

The Kafka receiver adapter sends Kafka records (or batches) to exactly one topic (or partition). You can find detailed information about all possible parameters in the Configure the Kafka Receiver Adapter documentation.

The connection settings are identical to the sender side. Refer to the sender section above for further details.


Receiver Adapter – Connection tab

The Processing tab allows you to define the topic to send messages to. This field also accepts a header (${header.myHeader}) or property (${property.myProperty}) reference to define the topic dynamically at runtime. The Kafka record key can be defined by setting the header kafka.KEY upfront (e.g. using a Content Modifier). By setting the key of a record, you can influence the partition to send to: the same key always maps to the same partition, even though the exact partition number is not important. You can also define the exact partition number with the help of the header kafka.PARTITION_KEY. The latter throws an exception if the partition does not exist.


Receiver Adapter – Processing tab
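The following sketch shows, in plain Camel Java terms, what a Content Modifier or Script step effectively does before the message reaches the receiver adapter; the key value and partition number are made-up examples.

import org.apache.camel.Exchange;
import org.apache.camel.Processor;

public class KafkaTargetHeadersProcessor implements Processor {
    @Override
    public void process(Exchange exchange) throws Exception {
        // Same key -> same partition (the exact partition number is left to Kafka)
        exchange.getIn().setHeader("kafka.KEY", "order-4711");
        // Optional: pin an exact partition; fails if the partition does not exist
        // exchange.getIn().setHeader("kafka.PARTITION_KEY", 2);
    }
}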

Batching

The Kafka receiver adapter can batch records to optimize network traffic. Batching usually happens during high-load situations, as the producer does not wait infinitely for additional messages. It is possible to linger for a certain (short) period of time in case batching is more important than fast message processing. The Batch Size (in KB) operates on the uncompressed message sizes; optional compression happens afterwards.

Compression

The following compression algorithms are supported:

  • gzip
  • lz4
  • snappy
  • zstd

The record contains information about whether a message is compressed and which compression algorithm (of the above) is used. The Kafka consumer decompresses the record / batch automatically using the appropriate algorithm.
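In plain Kafka producer properties, batching and compression map to settings like the following; the values are illustrative only, and the adapter exposes them via the Batch Size (in KB), compression and Max. Request Size (in KB) fields.

import org.apache.kafka.clients.producer.ProducerConfig;

import java.util.Properties;

public class ProducerBatchingSketch {
    static Properties batchingAndCompression() {
        Properties props = new Properties();
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");         // 16 KB per batch (uncompressed)
        props.put(ProducerConfig.LINGER_MS_CONFIG, "5");              // wait up to 5 ms for additional records
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");    // gzip, lz4, snappy or zstd
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "1048576"); // align with the broker's message.max.bytes
        return props;
    }
}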

Max. Message Size

Kafka is not designed to handle large messages. It works best with messages that are large in number, but not in size. The broker can restrict the largest allowed record batch size (after compression) via message.max.bytes. Kafka itself does not put any restrictions on this setting, so it is capped at ~2 GB (the integer max. value). However, broker providers usually restrict this setting further (to a few megabytes, which makes a lot of sense). Check with your broker provider for more details.

The Kafka receiver adapter can restrict the max. uncompressed record batch size (before compression) via Max. Request Size (in KB). It is recommended to align this setting with the message.max.bytes of the broker. Sending requests that exceed the broker limit results in errors that are hard to detect, as well as client retries. Cloud Integration limits the max. request size to 20 MB.

Message Headers

Camel exchange headers are automatically transferred as Kafka record headers and vice versa. The following header types are supported (other types are skipped): String, Integer, Long, Double, Boolean, byte[]

Headers starting with Camel, org.apache.camel or kafka. are not propagated but filtered out.

Custom Timestamp

Usually, the timestamp transferred along with the record reflects the time when the record is sent. By setting the header kafka.OVERRIDE_TIMESTAMP, you can specify a custom timestamp instead. The value of this header needs to be of type java.lang.Long, which is somewhat inconvenient (e.g. when trying to send along the timestamp via common REST clients). There are simple options to convert the data type using a Content Modifier or a Script step in Cloud Integration.

Custom or human-readable date formats are not supported.
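As an example of such a conversion, the following sketch (written in plain Camel Java terms; a Script step would look very similar) parses an ISO-8601 string from an assumed inbound header eventTime and sets kafka.OVERRIDE_TIMESTAMP as a java.lang.Long:

import org.apache.camel.Exchange;
import org.apache.camel.Processor;

import java.time.Instant;

public class OverrideTimestampProcessor implements Processor {
    @Override
    public void process(Exchange exchange) throws Exception {
        String eventTime = exchange.getIn().getHeader("eventTime", String.class); // e.g. "2021-03-16T10:15:30Z"
        if (eventTime != null) {
            long epochMillis = Instant.parse(eventTime).toEpochMilli();
            exchange.getIn().setHeader("kafka.OVERRIDE_TIMESTAMP", Long.valueOf(epochMillis));
        }
    }
}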

Performance

This section is not intended as a performance guide or anything close to it. Instead, I want to provide some “food for thought” in this context.

  • Do you have a performance problem? Have you clearly defined your performance expectations / KPIs? Performance needs to be measured to assert that the requirements are met and to be prepared for adjustments.
  • Performance is not bound to the client and its configuration only (i.e. Cloud Integration), but it is highly influenced by other factors:
    • broker (hardware, configuration)
    • network
    • topic partitioning
    • record size
    • record distribution
    • etc.
  • Do not simply set the number of Parallel Consumers to its upper limit of 25 assuming this would improve performance. It might, but it might also degrade your performance (and stability), depending on your context. This article outlined that this setting scales with the number of worker nodes and recommended leaving some room. Start adjusting the configuration options only if you cannot meet your performance requirements (which requires clear KPIs and measurements, as stated before).
  • There is no one-size-fits-all client configuration. Use the offered “advanced” configuration options of both sender and receiver adapters and monitor whether your performance improves as expected. If you don’t have a performance problem, stick to the defaults.

Troubleshooting

Missing Broker Certificate

If the client needs to connect to the broker using SSL or SASL_SSL (“Connect with TLS“), the root certificate of the broker needs to be part of the tenant’s keystore. If the certificate is missing, this usually results in an error like the one below when trying to send a message to the broker:

com.sap.it.rt.adapter.http.api.exception.HttpResponseException: An internal server error occured: sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target.

Usually, the certificate can easily be obtained by executing a TLS Connectivity Test directly from the tenant. Simply type in host and port of the broker, uncheck “Valid Server Certificate Required” and download the resulting certificates. Only the root certificate needs to be imported into the tenant’s keystore.

Remember to restart (or redeploy) already deployed Kafka-related integration flows afterwards for the change to take effect.

Incorrect Host Configuration

If the provided broker host does not exist, the integration flow deployment fails with the following error message:

[CAMEL][IFLOW][EXCEPTION] : org.apache.kafka.common.KafkaException: Failed to construct kafka <consumer / producer>  [CAMEL][IFLOW][CAUSE] : Cause: org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers

Double-check for typos or contact your broker provider to get the correct connection details.

Missing Topic

Trying to send a record to a non-existing topic results in an error unless the broker is configured to create topics automatically (auto.create.topics.enable). If automatic topic creation is disabled, any attempt to send a message leads to the following error after the configured Max. Blocking Time (in s), which defaults to 30 seconds:

com.sap.it.rt.adapter.http.api.exception.HttpResponseException: An internal server error occured: Topic <topic name> not present in metadata after 30000 ms..

Create the topic on broker side or correct your configuration to solve this problem.

Trying to consume records from a non-existing topic leads to infinite client retries. These retries are not visible to the user, but create some load on the tenant and the broker. Due to current limitations, the integration flow appears to be running successfully (“The Integration Flow is deployed successfully”). Since the topic does not exist, you won’t see any messages being processed. You can use the Kafka Connectivity Test to check the connectivity upfront; it lists up to 200 topics that are available on the broker side. The Kafka Connectivity Test is mentioned in the blog post Cloud Integration – Monitoring Polling Status in Kafka Sender Adapter.

Message Too Large

Sending a single message that is larger than the max. allowed size the broker / topic accepts (message.max.bytes) results in the following exception:

com.sap.it.rt.adapter.http.api.exception.HttpResponseException: An internal server error occured: The request included a message larger than the max message size the server will accept..

It is highly recommended to check the message.max.bytes setting of the broker and set the Max. Request Size (in KB) correspondingly (note the different units: bytes vs. KB) in the receiver adapter. Of course, you will still see an error when trying to send a message that is too large, but the size verification now happens on the client side and avoids putting additional, unnecessary load on the broker. The error message changes slightly:

com.sap.it.rt.adapter.http.api.exception.HttpResponseException: An internal server error occured: The message is <number of bytes> bytes when serialized which is larger than <configured max. request size in bytes>, which is the value of the max.request.size configuration..

If you need to send larger messages, you have to adjust the broker (or topic) configuration first and adapt the receiver adapter configuration afterwards.

Batch Too Large

There might be cases where a single message is smaller than the max. allowed size of the broker / topic, but a batch of several of these messages is larger than the max. allowed size. This situation leads to the following error:

com.sap.it.rt.adapter.http.api.exception.HttpResponseException: An internal server error occured: Expiring <n> record(s) for <topic>-<partition>:<time> ms has passed since batch creation.

The client tries to send the message many times before this error appears, putting considerable load on client and broker. It is highly recommended to align the message.max.bytes setting of the broker with the Max. Request Size (in KB) of the receiver adapter. The adapter makes sure that the Batch Size (in KB) is configured smaller than the Max. Request Size (in KB). Following this recommendation helps you to avoid this error situation completely.

No Tenant Separation

The design of the consumer group (being the integration flow id) avoids any conflicts between integration flows on the tenant. The same is not true across tenants (by choice).

Isolating tenants is not desirable in all cases. If you want to migrate content from one tenant to another tenant, you can easily export and import the content. The new tenant is able to join the existing consumer group and continue where the previous tenant stopped.

However, this means you need to take care when importing content as-is on any tenant. If the integration flow ID remains unchanged (it can be defined during import), a consumer is able to join an existing consumer group. This is no different from many other adapters (e.g. SFTP, Mail, AMQP) polling data from somewhere.

Current Limitations

The following features are not yet available and may come with a later release of the adapter:

  • Schema registry support
  • Cloud Connector support
  • Integration Flow monitor does not reflect all connection problems of a consumer (some improvements are on their way, as described in Cloud Integration – Monitoring Polling Status in Kafka Sender Adapter)
  • ksqlDB support
  • SASL/OAUTHBEARER
  • Consumption of arbitrary offsets
  • Consuming from a certain partition only (in contrast to consuming from a topic)
  • Transactional delivery

The adapter currently cannot connect to brokers that are not exposed via the Internet. This limitation is based on a lack of proxy support in the Kafka client library itself, which in turn is based on a lack of proxy support in the underlying Java NIO library (https://bugs.openjdk.java.net/browse/JDK-8199457).

TL;DR

  • Security protocols SSL, SASL_SSL and SASL_PLAINTEXT are supported with SASL mechanisms PLAIN and SCRAM-SHA-256.
  • “Connect with TLS” requires the root certificate of the broker in the tenant’s keystore. Run a TLS Connectivity Test to obtain it.
  • Any change of key or trust material requires a restart (or redeployment) of the integration flow.

Consumer (Sender adapter)

  • Consumer Group == Integration Flow ID (~ name)
  • auto.offset.reset = latest
  • Parallel Consumers (1 to 25) scale with the number of worker nodes; do not set this value to the number of partitions to leave room to scale (i.e. divide it by the number of worker nodes for fair load distribution)
  • auto.commit.interval.ms = 5000
  • max.poll.interval.ms = 86400000 (1 day)
  • key.deserializer = StringDeserializer
  • value.deserializer = ByteArrayDeserializer

Producer (Receiver adapter)

  • Topic field can be set dynamically (${header…} or ${property…})
  • Kafka key can be defined by manually setting the header kafka.KEY upfront (e.g. using a Content Modifier)
  • Partition can be defined by setting kafka.PARTITION_KEY
  • Custom timestamp can be defined by setting kafka.OVERRIDE_TIMESTAMP (datatype java.lang.Long)
  • max.request.size is limited to 20MB
  • buffer.memory = 32MB (per endpoint)
  • key.serializer = StringSerializer
  • value.serializer = ByteArraySerializer

Conclusion

This blog post started with some basics about Kafka and continued with the basic configuration settings and requirements to connect to a Kafka broker. I hope this helps you get a smooth start when integrating your Kafka cluster with Cloud Integration. We looked at several complex topics and internals, which might be of interest to you and are useful together with the official Kafka documentation. The troubleshooting section highlighted a few of the most common error messages and how to resolve or avoid them. Finally, I outlined some of the current limitations and condensed the most relevant information (for the technical readers among you) in the last two sections.

References

[1] Kafka: The Definitive Guide – Neha Narkhede, Gwen Shapira, and Todd Palino (First Edition)

Comments
      Sunil John:

      Nice to hear about SAP’s integration offerings around Apache Kafka and stream processing.

      Jeremy Ma:

      Thanks Jens! Well done, this is detailed and concise!

      Karl Bergstrom:

      Thx for an excellent blog!

       

      Matthias Lüthi:

      Hi Jens

      Thx for this great blog.

      Short question about Cloud Connector support:

      Do you have any information on when SAP will release Cloud Connector support for the Kafka receiver adapter?

      Thx and regards

      Matthias

      Jens Kordowski (Blog Post Author):

      Hi Matthias,

      This limitation is based on a lack of proxy support in Kafka itself. After clarification with the Kafka developers, this does not even seem to be on their roadmap currently. Hence, this will not come any time soon and there is no release date.

      Best regards

      Jens

      Matthias Lüthi:

      Hi Jens

      Thx for your reply.

      So the Kafka adapter will not be usable on CPI for a long time, unless SAP itself has a solution for this problem.

       

      Best regards

      Matthias

      Jens Kordowski (Blog Post Author):

      Hi Matthias,

      Well, the Kafka adapter is usable on Cloud Integration. But it is not usable with brokers that are not exposed to the Internet.

      Best regards

      Jens

      Faraz Anwar:

      Excellent and comprehensive blog!

      One thing I would appreciate in the Introduction is a sentence or two on why it should be used (common use cases) and whether SAP has an equivalent service in BTP (re: Event Mesh?).

      Thanks and best regards!

      Jens Kordowski (Blog Post Author):

      Hi Faraz,

      There are dozens of online articles trying to answer this from a generic point of view. I might be missing good use cases and highlighting unimportant ones (very subjective), so I would rather skip this altogether. You obviously want to use the adapter if you already have a Kafka broker or need to integrate with one of another party. People starting out usually want to know their best option compared to other / similar technologies. As the devil is in the details and a comparison requires good knowledge of all compared technologies, I wouldn't dare to start one. It would always contain flaws, might be slightly different in a certain use case, and could potentially even mislead customers. This would definitely be worth its own blog post.

      SAP BTP does not offer Kafka brokers for external usage that I know of.

      Best regards

      Jens

      Alejandro Sainz:

      Hi Jens,

      Thanks for sharing. We are already using Kafka Sender Adapter in Cloud Integration in one production scenario.

      However, the missing Schema Registry functionality gave us a hard time, as we had to convert the messages on the Kafka side so they could be interpreted by CPI.

      Do you have any information when this functionality will be released?

       

      Thanks!

      Alex.

      Jens Kordowski (Blog Post Author):

      Hi Alex,

      we are already looking into schema registry support. No promises or forecast on a release date or feature-set from my end.

      What kind of schemas do you use and what exactly do you intend to do with defined data types within Cloud Integration?

      Best regards

      Jens

      Maik Offerle:

      Hi Jens Kordowski,

      Any news on when the limitation of only auto.offset.reset = latest will be solved?

      This leads to issues in our projects, as the topics need to be recreated by the Kafka team every time, and we cannot really decide from what offset to start.

      Thanks and regards

      Maik

      Jens Kordowski (Blog Post Author):

      Hi Maik,

      what exactly do you require and why do you need to re-create topics? Consumption of arbitrary offsets has some side-effects / risks, if simply applied.

      • Let us assume you want to start with offset #3. At some point in the future (the latest offset could be #4890954), somebody forgets to adapt this value, but redeploys the scenario (maybe they fixed something else, but did not think about the Kafka sender). The client would again try to start with offset #3, leading to hundreds of thousands of message processings. -> easy to make a mistake
      • Software update: New nodes are started and the integration flow will be deployed on the new nodes. You most certainly do not want to start with a previously defined offset. After all, you do not even control the node restart.
      • A node might crash (maybe for unrelated reasons). The node will be replaced, the scenario would start over from the defined offset.

      While I don't like the first point above, the second and third ones are unacceptable as outlined. Hence, we need to take some precautions to avoid these problems and this is what makes this story complicated. (And those are just a few examples, there are more things to consider.)

      I cannot comment on release dates for any feature increment. Feel free to add new requests in our influence campaign: https://influence.sap.com/sap/ino/#/campaign/2282

      Best regards

      Jens

      Maik Offerle:

      Hi Jens Kordowski,

      The issue is that we are using Kafka not only for messaging but heavily for actually storing data. See the following blog for more info: https://www.confluent.io/blog/okay-store-data-apache-kafka/

      This means it can happen from time to time (a new field is introduced, or other changes) that a complete reload of all the data stored in the topics is needed.

      But at the moment there is the limitation of only auto.offset.reset = latest.

      This leads to issues, as topics need to be recreated and reconnected. We would like to have more possibilities here, e.g. to work with auto.offset.reset = earliest.

      See for more info: https://stackoverflow.com/questions/48320672/what-is-the-difference-between-kafka-earliest-and-latest-offset-values

      I created the Influence: https://influence.sap.com/sap/ino/#/idea/275105

      Thanks and regards

      Maik

      Jens Kordowski (Blog Post Author):

      Hi Maik,

      I agree this is a valid (yet risky) option in certain scenarios. Thanks for sharing your use-case!

      Best regards

      Jens

      Sebastian Alvarez:

      Hi Jens Kordowski, great blog and very clear!

      I would like to know if there is any update about Schema Registry and Avro.

      Regards,

      Sebastian

      Jens Kordowski (Blog Post Author):

      Hi Sebastian,

      it is on the roadmap, but I cannot promise any release dates. Feel free to add new requests (or support existing ones) in our influence campaign: https://influence.sap.com/sap/ino/#/campaign/2282

      Best regards

      Jens

      Stanislav Deviatov:

      Hi Jens Kordowski, great blog and very clear!

      I would like to know if there are any updates about transactional delivery and exactly-once delivery (KIP-98).

      Best regards,

      Stanislav

      Jens Kordowski (Blog Post Author):

      Hi Stanislav,

      There is no update in that regard.

      Best regards

      Jens

      Jan Engelmohr:

      Hi Jens Kordowski,

      Thank you for the blog 🙂 Do you know how the Kafka adapter deals with enable.idempotence on the receiver / producer side? Is this option always enabled? I could not find anything in the official docs either, that's why I'm asking here 😉

      Best regards,

      Jan

      Jens Kordowski (Blog Post Author):

      Hi Jan,

      enable.idempotence is set to false and not configurable today.

      Best regards

      Jens

      Rajesh PS:

      Hi Jens Kordowski

      Nice blog indeed. Cheers to this!

      I would like to know whether the Kafka adapter fully supports SAP CPI and how flexible it is (in comparison with the SAP PO Kafka adapter)?

      Secondly, does SAP CPI support schema registry and serialization as of now, and how about Avro & JSON conversions?

      Is it a tactical, long-term reliable solution to use via SAP CPI Cloud?

      Also, I am not sure about the license / subscription cost of using the Kafka adapter, which ideally should not end up in capacity or feature constraints; each option has its pros and cons?

      Looking forward to your valuable thoughts. Thanks in advance!

      Jens Kordowski (Blog Post Author):

      Hi Rajesh

      I am not sure what you mean by "supports CPI fully fledged". The documentation and this blog are quite comprehensive regarding what the adapter is capable of and which limitations exist. Can you provide more details on what exactly you are looking for that is not already captured here? I have no knowledge about the PO adapter to compare it. Schema registry support is not there yet, as outlined in the "current limitations" section above. This is the official Kafka adapter provided by Cloud Integration and is considered long-term and reliable as such. There are no additional licence costs imposed by this adapter. I cannot comment on any (at least not-yet-planned) future changes regarding this aspect.

      Best regards

      Jens

      Rajesh PS:

      Jens Kordowski

      There seems to be a big problem with SAP CPI connecting to the latest Apache Kafka versions to produce and consume events.
      Below are the limitations:
      1) Client authentication (client keystore) not supported (jks file deployment with passphrase).
      2) Server authentication (server truststore) not supported (p12 file deployment with passphrase).
      3) SASL/SCRAM-SHA-512 authentication mechanism not supported.
      4) Consuming offsets (latest, oldest, full load) not supported.
      5) Schema registry configuration not supported.
      6) Avro deserializer/decoder not supported.
      7) Consumer group configuration not supported.
      8) Content conversion not supported.
      Really, there is a big gap in both SAP CPI and SAP BTP Integration Suite when it comes to event-driven orchestration with platforms like Apache Kafka; it is a great miss by SAP and has not been upgraded to date.
      Do we have any Open Connectors available, or can this be achieved completely using JavaScript?

      Jens Kordowski (Blog Post Author):

      Hi Rajesh

      I acknowledge there are several improvements / extensions that are not addressed yet. You can influence the roadmap and its priorities via the influence campaign: https://influence.sap.com/sap/ino/#/campaign/2282

      Having said that, let me give you my view on the points you raised:

      1 & 2: We do have an architecture for how to handle / deploy key material and how to authenticate against systems. We try to align this across our adapter portfolio for consistency reasons. I don't see how 1 & 2 fit into that. In case you want to raise this via the mentioned influence campaign, I'd appreciate more details (including the significance for you or why our model does not work for you).

      3: true -> influence campaign. No significance wrt. security, but still a valid ask.

      4: auto.offset.reset = latest. If this is about arbitrary offset consumption, this creates many problems, as I already outlined in a previous comment (https://blogs.sap.com/2021/03/16/cloud-integration-what-you-need-to-know-about-the-kafka-adapter/comment-page-1/#comment-596402)

      5: true

      6: true

      7: not supported by choice, explained in the blog

      8: would need more input on this. Is this schema registry related? CPI itself offers many options to convert or transform data.

      I am not aware of any open connector or javascript.

      If you want to raise awareness or have an impact on our roadmap, the blog is not the best choice. The influence campaign is the way to go: https://influence.sap.com/sap/ino/#/campaign/2282

      Best regards

      Jens

      Rajesh PS:

      Thanks Jens Kordowski for your reply. I feel there should be some open connectors or Java libraries to connect with the latest Apache Kafka version. Event-driven solutions need to be much improved in SAP CPI and BTP IS.

      Sumit Sharma:

      Hello Jens Kordowski

      Nice Blog.

      I am currently following the blog below to use the Kafka sender adapter to connect to an Azure Event Hub.

      https://blogs.sap.com/2022/01/19/connect-cloud-integration-with-azure-event-hubs/

      My need is to connect to a specific Consumer Group created in the Azure EventHub.

      I see my iflow is running with successful polling status, but no messages are getting processed. I even tried renaming my integration flow to the name of the consumer group (as mentioned in this article), but it is still not working.

      Below are some more details:

      Integration Flow Name/Id: sap-cpi-cf-test

      Azure Event Hub Consumer Grop (to read messages from): sap-cpi-cf-test

      Below is the screenshot from the Manage Integration Status -> Status Detail of the Integration Flow after deployment.

      Wanted to check if you can help me in this regard?

      Thank you,

      Sumit Sharma

      Jens Kordowski (Blog Post Author):

      Hi Sumit

      I am not familiar with Azure Event Hubs and its concepts. Apparently, an "Azure Consumer Group" is an entity to consume from? It seems to be something else than a Kafka consumer group. Based on the screenshot, I can see that the groupId is set to your desired name (meaning your strategy to name the iflow accordingly worked so far), so it should join the right Kafka consumer group. That property maps to https://kafka.apache.org/documentation/#consumerconfigs_group.id

      Being a generic Kafka adapter (and not an Azure Event Hub adapter), there is nothing else I can do in this context.

      I took a brief look into the blog post that you mentioned and it states:

      Update 2nd March 2023: The sender AMQP adapter does not support Consumer Groups, just queues. However Azure Event Hubs requires to put the Consumer Group as queue name. As a result the integration flow consumes again and again the messages from Event Hubs. According to this, we can state that consumption of events from Azure Event Hubs is not supported through AMQP adapter. The following section is just for connection test purposes.

      Best regards

      Jens

      Sumit Sharma:

      Hello Jens Kordowski,

      Thank you for looking into my question; I appreciate your reply!