Chris Qi

Build Your First Scenario with SAP Cloud Integration Kafka Adapter

Introduction

SAP has recently released the Kafka adapter in both the Cloud Foundry and Neo environments. This blog describes, step by step, how to build your first simple demo using Cloud Integration as both producer and consumer of Kafka, so that you can run a scenario end-to-end.

Some important concepts of Kafka and of the Cloud Integration Kafka adapter configuration are described in Cloud Integration – What You Need to Know About the Kafka Adapter.

The Cloud Integration Kafka adapter currently has the limitation that Cloud Connector support is not available. Therefore, the Kafka instance used in this article is a cloud trial (Confluent Cloud).

 

Getting Kafka Cloud Trial

1. Visit https://confluent.cloud/. Register and get the free trial (Basic version).

You can sign up for Confluent Cloud for free. Confluent Cloud provides a simple, scalable, resilient, and secure event streaming platform for us to easily test the Kafka scenario.

2. Create cluster

A Basic cluster is sufficient for our demo. A USD 200 credit is offered on each of your first three monthly bills, so you won't be charged for this test.


3. Create Topic

The next essential step is to create a topic. It can be created with the default configuration, or you can customize the configuration according to your requirements.

4. Create Kafka API Key

Then you need to create an API key for Cloud Integration access.

Copy the key and secret shown here. We will need them when creating the Kafka credential in Cloud Integration.

 

Design the iFlow in Cloud Integration as Producer for Kafka

1. Set up the Kafka Receiver Adapter

a. Copy the host from the Confluent cluster settings

b. Authentication

SASL with the PLAIN mechanism is the easiest to set up in this scenario. We need to save the Confluent API credential in Cloud Integration for access.

To create the credential (a User Credentials artifact under Manage Security Material in the Monitor section), we use the API key and secret from the step above as user and password.
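If you want to double-check what these adapter settings correspond to on the Kafka side, the following standalone Groovy sketch may help. It is for illustration only and is not something you deploy in Cloud Integration; the bootstrap server, API key, secret placeholders and the kafka-clients dependency are assumptions for this demo. Listing the topics is a quick way to verify host, TLS and SASL settings, similar to the connectivity test mentioned later in the comments.

// Standalone Groovy sketch (not a CI script). Replace the placeholders with the
// bootstrap server from the Confluent cluster settings and the API key created above.
@Grab('org.apache.kafka:kafka-clients:3.4.0')
import org.apache.kafka.clients.admin.AdminClient

def props = new Properties()
props.put('bootstrap.servers', '<bootstrap-server-from-confluent-settings>')
props.put('security.protocol', 'SASL_SSL')   // Confluent Cloud requires TLS
props.put('sasl.mechanism', 'PLAIN')         // same mechanism as in the adapter
// The API key and secret act as user and password, exactly as stored in the CI credential
props.put('sasl.jaas.config',
    'org.apache.kafka.common.security.plain.PlainLoginModule required ' +
    'username="<api-key>" password="<api-secret>";')

def admin = AdminClient.create(props)
try {
    // If this prints your topics, the host, TLS and SASL settings are correct
    println admin.listTopics().names().get()
} finally {
    admin.close()
}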

 

2. Set up the HTTP Sender Adapter to trigger the process

Configure the iFlow to be a producer of the topic you created in Confluent. Configure the parameters as required.

3. Deploy the iFlow

 

Design the iFlow in Cloud Integration as Consumer (group) for Kafka

1. Set up Kafka Sender Adapter

The host and authentication settings are the same as in the receiver setup.

Set the adapter to be a consumer of the topic you created. Configure the other parameters as required.
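For background only, this is roughly what the adapter does for you, shown as a standalone Groovy consumer sketch (again an illustration with placeholder values, not something you deploy in Cloud Integration): consumers sharing the same group id form one consumer group and split the topic between them.

// Standalone Groovy sketch; uses the same placeholder bootstrap server, API key
// and secret as the sketch in the producer section above, plus a placeholder topic.
@Grab('org.apache.kafka:kafka-clients:3.4.0')
import org.apache.kafka.clients.consumer.KafkaConsumer
import java.time.Duration

def props = new Properties()
props.put('bootstrap.servers', '<bootstrap-server-from-confluent-settings>')
props.put('security.protocol', 'SASL_SSL')
props.put('sasl.mechanism', 'PLAIN')
props.put('sasl.jaas.config',
    'org.apache.kafka.common.security.plain.PlainLoginModule required ' +
    'username="<api-key>" password="<api-secret>";')
props.put('group.id', 'demo-consumer-group')   // consumers with the same group id share the topic
props.put('key.deserializer', 'org.apache.kafka.common.serialization.StringDeserializer')
props.put('value.deserializer', 'org.apache.kafka.common.serialization.StringDeserializer')

def consumer = new KafkaConsumer<String, String>(props)
consumer.subscribe(['<your-topic>'])
try {
    // Poll a few times and print whatever records arrive
    5.times {
        consumer.poll(Duration.ofSeconds(2)).each { record ->
            println "offset=${record.offset()} value=${record.value()}"
        }
    }
} finally {
    consumer.close()
}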

2. Set up a Script step to log the message.

import com.sap.gateway.ip.core.customdev.util.Message

def Message processData(Message message) {
    def body = message.getBody(java.lang.String) as String;
    def messageLog = messageLogFactory.getMessageLog(message);
    if (messageLog != null) {
        messageLog.setStringProperty("Logging#1", "Printing Payload As Attachment")
        messageLog.addAttachmentAsString("ResponsePayload:", body, "text/plain");
    }
    return message;
}
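A short note on this script (standard Cloud Integration behavior): the payload written by addAttachmentAsString appears as an attachment of the corresponding message processing log in the Monitor section, which is a convenient way to verify what was consumed from the topic.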

 

3. You can also customize the message and send it to any downstream process, as sketched below.
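For example, a minimal Groovy script sketch like the following could wrap the consumed payload into a new JSON structure and set a header for downstream steps. The field names and the header are made up for this demo; adjust them to your own requirements.

import com.sap.gateway.ip.core.customdev.util.Message
import groovy.json.JsonOutput

def Message processData(Message message) {
    def body = message.getBody(java.lang.String) as String

    // Wrap the consumed Kafka payload into a new JSON envelope
    // ("source" and "receivedAt" are example fields for this demo)
    def enriched = [source: 'kafka-demo', receivedAt: new Date().toString(), payload: body]
    message.setBody(JsonOutput.toJson(enriched))

    // Example header that downstream steps or receiver adapters could evaluate
    message.setHeader('DemoProcessed', 'true')
    return message
}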

4. Deploy the iFlow

 

Test the iFlow

1. Send HTTP request from postman

2. See message in Confluent

3. Check iFlow from Cloud Integration

With the above steps you will be able to set up a very simple end-to-end scenario that uses Kafka on Confluent Cloud and the Kafka adapter in Cloud Integration. There are many more settings available in the adapter that you can try and test; this blog is just to help you get started.

You are welcome to provide feedback on the content in the comment section.

Thanks!

      15 Comments
      Souvik Sinha

      Thanks for sharing the detailed blog. I would like to try this.

       

      Regards,

      Souvik

      Werner Dähn

      Where do you define the Avro schema registry? Almost all data in Kafka is using Avro as a payload. This is supported, isn't it?

      Chris Qi (Blog Post Author)

      Hi Werner,

       

      To my understanding, the schema needs to be created in Kafka when you create the topic (I haven't tried that out in Confluent Cloud though). The payloads produced or consumed via CPI, which are JSON-like content, will be transferred as plain JSON, the same as data in any other format.

      Hope this makes sense to you.

       

      Best regards,

      Chris

      Werner Dähn

      Thanks for the answer, would love to see that, Chris Qi .

      Are you saying the Avro payload is converted to JSON back and forth automatically? How does it even know whether the payload is Avro, binary data, JSON, Protobuf, or text data? In Kafka all of these are possible, so you need to configure something in CPI. And without a schema registry known to CPI, it does not even know the schema to use for the Avro deserialization.


       

       

      Alejandro Sainz

      Hi Werner,

      We've so far only used the Kafka Sender Adapter in consumer scenarios, but I can tell you there is no possibility to specify schema registries inside CPI. That means in our case we had to develop some kind of ksqlDB transformation on the Kafka side in order to receive the messages in simple key-value JSON format.

      Otherwise the messages were coming into CPI and the adapter was not able to interpret them, as it didn't have the schema anywhere.

       

      This was raised to SAP and I can tell that their development team is analysing it. In other tools such as SAP PO you have the option to specify the Avro schema URL, so I understand this is simply a feature that should be released for CPI anytime soon.

       

      Cheers.

      Alex.

      Werner Dähn

      Thanks, that's what I feared but could not believe.

      Then the Kafka connectivity is practically non-existent. It is like saying, yes we can connect to other databases via JDBC. But the tables must have two columns with id and text and hence you have to create views that concat all columns into a single text.

       

       

      Alejandro Sainz

      Very much so.

      Actually this is known by SAP, as they stated at the moment of the release that no schema registry was supported. Quite shocking I would say, as producing into a Kafka topic without a schema is not what I would call a best practice.

       

      You can check in Cloud Integration – What You Need to Know About the Kafka Adapter | SAP Blogs which were the limitations at the moment of the release (May 2021):

      Current Limitations

      The following features are not yet available and may come with a later release of the adapter:

      • Schema registry support
      • Cloud Connector support
      • Integration Flow monitor does not reflect all connection problems of a consumer (some improvements are on its way as described in Cloud Integration – Monitoring Polling Status in Kafka Sender Adapter)
      • ksqlDB support
      • SASL/OAUTHBEARER
      • Consumption of arbitrary offsets
      • Consuming a certain partition only (as in contrast to consume from a topic)
      • Transactional delivery

      The adapter can currently not connect to brokers, which are not exposed via the Internet. This limitation is based on a lack of proxy support of the kafka library itself, which itself is based on a lack of proxy support of the used java NIO library (https://bugs.openjdk.java.net/browse/JDK-8199457).

      SANA BOUKALLI

      Hi Chris,

      We got this error when we tried to connect Kafka with SCPI:

      java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed

      And the Kafka team told me that - As a Kafka client, this error appears because there is an attempt to check the server hostname, and we need to disable this by setting "ssl.endpoint.identification.algorithm" to empty in our client tool.

      Is it necessary to set this "ssl.endpoint.identification.algorithm" property in SAP CPI? and where exactly in SCPI?

      Regards,

       

      Rajkumar G

      Hi Chris,

      We are getting the below error while producing the message from SAP CPI to a Kafka topic in Confluent Cloud.

      Could you please let me know what I am missing.

      Error Details
      com.sap.it.rt.adapter.http.api.exception.HttpResponseException: An internal server error occured: Topic purXXXXXXXXXXX not present in metadata after 15000 ms..
      The MPL ID for the failed message is : AGOr_HXXXXXXXXXX

      Kafka connection configuration:

      Authentication : SASL

      Connect with TLS : <<Disabled >>

      SASL mechanism : PLAIN

      Credentials name : << Stored the api-key and api-secret as user-credentials under "manage security material">>

       

      NOTE: Kafka topic is up & running.

      Tried the connectivity test in Integration suite and getting the below error as well,

      java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Call(callName=listTopics, deadlineMs=1672226173167, tries=1, nextAllowedTryMs=1672226173268) timed out at 1672226173168 after 1 attempt(s) Cause: org.apache.kafka.common.errors.TimeoutException: Call(callName=listTopics, deadlineMs=1672226173167, tries=1, nextAllowedTryMs=1672226173268) timed out at 1672226173168 after 1 attempt(s) Cause: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: listTopics

       

      Regards,

      Rajkumar

       

      Rajkumar G

      I was able to resolve the issue. Able to produce and consume messages to kafka topic now.

      Ankush Dhar

      Hello @Rajkumar G: I am also getting the same issue, could you please help and let me know how to resolve this issue. Appreciate your help. Thanks

      Frank Schuler

      I would also be very interested in the details that enabled you to produce and consume messages to Kafka topic, @Rajkumar G.

      Many thanks in advance, Frank

      Sudhir Tiwari

      Anyone stuck with the certificate error on handshake as shown below may follow these steps to resolve the SSL chain issue.

      1. Go to the Connectivity Tests tile under the Monitoring section.
      2. Type the bootstrap server URL (without any prefix or port) in the TLS tab and keep the port as 443.
      3. Click on the Send button.
      4. Click on the Download button next to Server Certificate Chain to download the certificates in ZIP format.
      5. Unzip the file to extract the certs.
      6. Upload the certs to the Keystore tile under Monitoring.

      That should fix the cert issue. If required, restart the flow and test again.

       

      Thanks!

      Sudhir

       

       

       

       

      "com.sap.it.rt.adapter.http.api.exception.HttpResponseException: An internal server error occured: sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target."

      Frank Schuler

      Great blog, Chris.

      Where in CI do I create the User Credentials for my Kafka connection, please? I do not seem to be able to find what your screenshot shows.

      Many thanks in advance, Frank

      Rajesh PS

      Chris Qi

       

      There seems to be a big problem with SAP CPI connecting with advanced Apache Kafka.
      Below are the limitations:
      1) Client authentication (client keystore) not supported (JKS file deployment with passphrase).
      2) Server authentication (server truststore) not supported (P12 file deployment with passphrase).
      3) SASL/SCRAM-SHA-512 authentication mechanism not supported.
      4) Consuming from an offset (latest, oldest, full load) not supported.
      5) Schema registry configuration for reading not supported.
      6) Avro deserializer/decoder not supported.
      7) Consumer group config not supported.
      8) Content conversion not supported.
      Really there is a big lag in both SAP CPI and SAP BTP Integration Suite when connecting to event-driven orchestration platforms like Apache Kafka; it's a great miss by SAP that has not yet been addressed.