Chris Qi

Build Your First Scenario with SAP Cloud Integration Kafka Adapter


SAP has recently released the Kafka Adapter in both the Cloud Foundry and Neo environments. This blog describes, step by step, how to build your first simple demo using Cloud Integration as both producer and consumer for Kafka, running the scenario end to end.

Some important concepts of Kafka and the Cloud Integration Kafka Adapter configuration have been described in Cloud Integration – What You Need to Know About the Kafka Adapter.

The Cloud Integration Kafka Adapter currently has the limitation that Cloud Connector support is not available. Therefore, the Kafka instance used in this article is a Confluent Cloud trial.


Getting Kafka Cloud Trial

1. Register and get a free trial (Basic version)

You can sign up for Confluent Cloud for free. Confluent Cloud provides a simple, scalable, resilient, and secure event streaming platform that lets us easily test the Kafka scenario.

2. Create cluster

A Basic cluster is sufficient for our demo. A $200 USD credit is applied to each of your first three monthly bills, so you won't be charged for the test.




3. Create Topic

The next essential step is to create a topic. It can be created with the default configuration, or you can customize the configuration according to your requirements.

4. Create Kafka API Key

Then you need to create an API key for Cloud Integration access.

Copy the Key and Secret here. We will need them when creating Kafka credential in Cloud Integration.


Design the iFlow in Cloud Integration as Producer for Kafka

1. Set up the Kafka Receiver Adapter

a. Copy the host from the Confluent settings

b. Authentication

SASL with the PLAIN mechanism is the easiest to set up in this scenario. We need to save the Confluent API credential in Cloud Integration for access.

To create the credential, use the API key and secret from the step above as user and password.
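As background, SASL PLAIN simply sends both credential parts in a single NUL-separated string over the (TLS-protected) connection, which is why the API key and secret map directly to user and password. Below is a minimal Python sketch of how a Kafka client encodes the PLAIN initial response per RFC 4616; the key and secret shown are placeholders, not real credentials:

```python
import base64

def sasl_plain_initial_response(username: str, password: str, authzid: str = "") -> bytes:
    # RFC 4616: message = [authzid] NUL authcid NUL passwd
    return f"{authzid}\0{username}\0{password}".encode("utf-8")

# Placeholder Confluent API key/secret; the real values come from the
# "Create Kafka API Key" step and are stored as a CPI User Credential.
api_key = "ABCDEFGHIJKLMNOP"
api_secret = "s3cr3t"

msg = sasl_plain_initial_response(api_key, api_secret)
print(base64.b64encode(msg).decode("ascii"))
```

Because the credential travels essentially in the clear, PLAIN should always be combined with TLS, which is why the adapter's "Connect with TLS" option should stay enabled for Confluent Cloud.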


2. Set up the HTTP Sender Adapter to trigger the process

Configure the iFlow to be a producer for the topic you created in Confluent. Configure the parameters as required.

3. Deploy the iFlow


Design the iFlow in Cloud Integration as Consumer (group) for Kafka

1. Set up Kafka Sender Adapter

The host and authentication settings are the same as for the receiver.

Set the adapter to be a consumer of the topic you created. Configure the other parameters as required.

2. Set up a script to log the message.

import com.sap.gateway.ip.core.customdev.util.Message

def Message processData(Message message) {
    // Read the consumed Kafka payload as a string
    def body = message.getBody(java.lang.String) as String
    def messageLog = messageLogFactory.getMessageLog(message)
    if (messageLog != null) {
        messageLog.setStringProperty("Logging#1", "Printing Payload As Attachment")
        messageLog.addAttachmentAsString("ResponsePayload:", body, "text/plain")
    }
    return message
}

3. You can also customize the message and send it to any downstream process.

4. Deploy the iFlow


Test the iFlow

1. Send an HTTP request from Postman

2. See the message in Confluent

3. Check the iFlow in Cloud Integration
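If you prefer to script the test instead of using Postman, the HTTP call in step 1 can be sent with a few lines of Python. The endpoint path and credentials below are placeholders for your own tenant's values; a local stub server stands in for the CPI HTTP endpoint so the sketch is runnable as-is:

```python
import base64
import json
import threading
import urllib.request
from http.server import BaseHTTPRequestHandler, HTTPServer

class StubCpiEndpoint(BaseHTTPRequestHandler):
    """Local stand-in for the iFlow's HTTP sender endpoint."""
    def do_POST(self):
        length = int(self.headers["Content-Length"])
        self.server.last_body = self.rfile.read(length)  # capture the payload
        self.send_response(200)
        self.end_headers()
    def log_message(self, *args):  # silence request logging
        pass

def send_message(url: str, payload: dict, user: str, password: str) -> int:
    """POST a JSON payload with HTTP basic authentication; return the status code."""
    token = base64.b64encode(f"{user}:{password}".encode()).decode()
    req = urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json",
                 "Authorization": f"Basic {token}"},
    )
    with urllib.request.urlopen(req) as resp:
        return resp.status

# Start the stub server on a free local port.
server = HTTPServer(("127.0.0.1", 0), StubCpiEndpoint)
threading.Thread(target=server.serve_forever, daemon=True).start()

# Against a real tenant, the URL would be the iFlow's HTTP endpoint
# (placeholder path /http/kafka-demo shown here for illustration).
url = f"http://127.0.0.1:{server.server_port}/http/kafka-demo"
status = send_message(url, {"orderId": 42, "status": "NEW"}, "user", "pass")
print(status)
server.shutdown()
```

A 200 response from the real endpoint means the iFlow accepted the message; you should then see it arrive on the topic in Confluent and in the consumer iFlow's log attachment.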

With the above steps you will be able to set up a very simple end-to-end scenario using Kafka on Confluent Cloud and the Kafka Adapter in Cloud Integration. There are many more settings in the adapter that you can try and test. This blog is just to help you get started.

Feel free to provide feedback on the content in the comment section.


      Souvik Sinha

Thanks for sharing the detailed blog. I would like to try this.




      Werner Dähn

      Where do you define the Avro schema registry? Almost all data in Kafka is using Avro as a payload. This is supported, isn't it?

      Chris Qi
      Blog Post Author

      Hi Werner,


As to my understanding, the schema needs to be created in Kafka when you create the topic (I haven't tried that out in Confluent Cloud though). The payload produced or consumed via CPI is JSON-like content and will be transferred as plain JSON or any other data format.

Hope this makes sense to you.


      Best regards,


      Werner Dähn

Thanks for the answer; I would love to see that, Chris Qi.

Are you saying the Avro payload is converted to JSON back and forth automatically? How does it even know whether the payload is Avro, binary data, JSON, ProtoBuf, or text data? In Kafka all are possible, so you need to configure something in CPI. And without a schema registry known to CPI, it does not even know which schema to use for Avro deserialization.




      Alejandro Sainz

      Hi Werner,

We've so far only used the Kafka Sender Adapter in consumer scenarios, but I can tell you there is no possibility to specify schema registries inside CPI. That means in our case we had to develop a ksqlDB transformation on the Kafka side in order to receive the messages in a simple key-value JSON format.

Otherwise the messages arrived in CPI and the adapter was not able to interpret them, as it didn't have the schema anywhere.


This was raised to SAP and I can tell that their development team is analysing it. In other tools such as SAP PO you have the option to specify the Avro schema URL, so I understand this is simply a feature that should be released for CPI anytime soon.




      Werner Dähn

      Thanks, that's what I feared but could not believe.

      Then the Kafka connectivity is practically non-existent. It is like saying, yes we can connect to other databases via JDBC. But the tables must have two columns with id and text and hence you have to create views that concat all columns into a single text.



      Alejandro Sainz

      Very much so.

Actually this is known by SAP, as they stated at the moment of the release that no schema registry was supported. Quite shocking, I would say, as producing into a Kafka topic without a schema is not what I would call best practice.


You can check Cloud Integration – What You Need to Know About the Kafka Adapter | SAP Blogs for the limitations at the moment of release (May 2021):

      Current Limitations

      The following features are not yet available and may come with a later release of the adapter:

      • Schema registry support
      • Cloud Connector support
      • Integration Flow monitor does not reflect all connection problems of a consumer (some improvements are on their way, as described in Cloud Integration – Monitoring Polling Status in Kafka Sender Adapter)
      • ksqlDB support
      • Consumption of arbitrary offsets
      • Consuming a certain partition only (as in contrast to consume from a topic)
      • Transactional delivery

The adapter currently cannot connect to brokers that are not exposed via the Internet. This limitation is due to a lack of proxy support in the Kafka library itself, which in turn stems from missing proxy support in the underlying Java NIO library.

SANA BOUKALLI

      Hi Chris,

      We got this error when we tried to connect kafka with SCPI:

      java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed

And the Kafka team told me that, as a Kafka client, this error appears because there is an attempt to verify the server hostname, and we need to disable this by setting "ssl.endpoint.identification.algorithm" to empty in our client tool.

      Is it necessary to set this "ssl.endpoint.identification.algorithm" property in SAP CPI? and where exactly in SCPI?



      Rajkumar G

      Hi Chris,

We are getting the below error while producing a message from SAP CPI to a Kafka topic in Confluent Cloud.

Could you please let me know what I am missing?

      Error Details An internal server error occured: Topic purXXXXXXXXXXX not present in metadata after 15000 ms..
      The MPL ID for the failed message is : AGOr_HXXXXXXXXXX

      Kafka connection configuration:

      Authentication : SASL

      Connect with TLS : <<Disabled >>

      SASL mechanism : PLAIN

      Credentials name : << Stored the api-key and api-secret as user-credentials under "manage security material">>


      NOTE: Kafka topic is up & running.

      Tried the connectivity test in Integration suite and getting the below error as well,

      java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Call(callName=listTopics, deadlineMs=1672226173167, tries=1, nextAllowedTryMs=1672226173268) timed out at 1672226173168 after 1 attempt(s) Cause: org.apache.kafka.common.errors.TimeoutException: Call(callName=listTopics, deadlineMs=1672226173167, tries=1, nextAllowedTryMs=1672226173268) timed out at 1672226173168 after 1 attempt(s) Cause: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: listTopics





      Rajkumar G

I was able to resolve the issue and can now produce and consume messages to the Kafka topic.

      Ankush Dhar

Hello Rajkumar G: I am also getting the same issue. Could you please help and let me know how to resolve it? Appreciate your help. Thanks