Build Your First Scenario with SAP Cloud Integration Kafka Adapter
Introduction
SAP has recently released the Kafka Adapter in both the Cloud Foundry and Neo environments. This blog describes, step by step, how to build your first simple demo using Cloud Integration as both producer and consumer of Kafka, so you can run the scenario end-to-end.
Some important Kafka concepts and the Cloud Integration Kafka Adapter configuration are described in Cloud Integration – What You Need to Know About the Kafka Adapter.
The Cloud Integration Kafka Adapter currently has the limitation that Cloud Connector support is not available. Therefore, the Kafka instance used in this article is a Confluent Cloud trial.
Getting a Confluent Cloud Trial
1. Visit https://confluent.cloud/, register, and get the free trial (Basic version)
You can sign up for Confluent Cloud for free. Confluent Cloud provides a simple, scalable, resilient, and secure event streaming platform for us to easily test the Kafka scenario.
2. Create a cluster
A Basic cluster is sufficient for our demo. A USD 200 credit is applied to each of your first three monthly bills, so you won't be charged for this test.
3. Create Topic
The next essential step is to create a topic. It can be created with the default configuration, or you can customize the configuration according to your requirements.
4. Create Kafka API Key
Then you need to create an API key for Cloud Integration access.
Copy the key and secret. We will need them when creating the Kafka credential in Cloud Integration.
Design the iFlow in Cloud Integration as Producer for Kafka
1. Set up the Kafka Receiver Adapter
a. Copy the host from the Confluent cluster settings
b. Authentication
SASL with the PLAIN mechanism is the easiest to set up in this scenario. We need to save the Confluent API credential in Cloud Integration for access.
To create the credential, we use the API key and secret from the step above as user and password.
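For readers familiar with the plain Kafka client, the adapter settings above correspond roughly to the following client configuration. This is only an illustrative sketch; the host, API key, and API secret are placeholders for your own cluster values, and the adapter builds the equivalent configuration for you from the UI fields.
def props = new Properties()
// Host copied from the Confluent cluster settings (placeholder)
props.put("bootstrap.servers", "<cluster-host>:9092")
// SASL PLAIN over TLS, which is what Confluent Cloud expects
props.put("security.protocol", "SASL_SSL")
props.put("sasl.mechanism", "PLAIN")
// The API key and secret act as user and password
props.put("sasl.jaas.config",
    'org.apache.kafka.common.security.plain.PlainLoginModule required username="<API key>" password="<API secret>";')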
2. Set up an HTTP Sender Adapter to trigger the process
Configure the iFlow to be a producer of the topic you created in Confluent. Configure the parameters as required.
3. Deploy the iFlow
Design the iFlow in Cloud Integration as Consumer (group) for Kafka
1. Set up the Kafka Sender Adapter
The host and authentication settings are the same as in the receiver adapter.
Set the adapter to be a consumer of the topic you created. Configure the other parameters as required.
2. Set up a script to log the message.
import com.sap.gateway.ip.core.customdev.util.Message

def Message processData(Message message) {
    // Read the consumed Kafka payload as a string
    def body = message.getBody(java.lang.String) as String
    // Attach the payload to the message processing log so it can be inspected in the monitor
    def messageLog = messageLogFactory.getMessageLog(message)
    if (messageLog != null) {
        messageLog.setStringProperty("Logging#1", "Printing Payload As Attachment")
        messageLog.addAttachmentAsString("ResponsePayload:", body, "text/plain")
    }
    return message
}
3. You can also customize the message and send it to any downstream process. The payload written by the script shows up as an attachment in the message processing log, so you can verify there what was consumed.
4. Deploy the iFlow
Test the iFlow
1. Send an HTTP request from Postman (or any other HTTP client; see the sketch after this list)
2. See the message arrive in Confluent
3. Check the iFlow's message processing in Cloud Integration
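If you prefer a script over Postman, the request can also be sent with a few lines of Groovy. This is a minimal sketch only: the endpoint address /kafka/produce, the tenant URL, and the basic-authentication credentials are placeholders, so adjust them to your own iFlow and your tenant's authentication setup.
// Placeholder endpoint of the producer iFlow's HTTP sender channel
def url = new URL("https://<tenant>.<region>.hana.ondemand.com/http/kafka/produce")
def conn = (HttpURLConnection) url.openConnection()
conn.requestMethod = "POST"
conn.doOutput = true
// Placeholder credentials for basic authentication
def auth = "<user>:<password>".bytes.encodeBase64().toString()
conn.setRequestProperty("Authorization", "Basic " + auth)
conn.setRequestProperty("Content-Type", "application/json")
// Sample payload that will become the Kafka record value
conn.outputStream.withWriter("UTF-8") { it << '{"orderId":"12345","status":"NEW"}' }
println "Response code: ${conn.responseCode}"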
With the above steps you will be able to set up a very simple end-to-end scenario using Kafka on Confluent Cloud and the Kafka Adapter in Cloud Integration. There are many more settings available in the adapter that you can try out; this blog is just to help you get started.
Feedback on the content is welcome in the comment section.
Thanks!
Thanks for sharing the detailed blog. I would like to try this.
Regards,
Souvik
Where do you define the Avro schema registry? Almost all data in Kafka uses Avro as the payload. This is supported, isn't it?
Hi Werner,
To my understanding, the schema needs to be created in Kafka when you create the topic (I haven't tried that out in Confluent Cloud though). The payload produced or consumed via CPI, which is JSON-like content, is transferred as plain JSON, just like any other data format.
Hope this makes sense to you.
Best regards,
Chris
Thanks for the answer, would love to see that, Chris Qi.
Are you saying the Avro payload is converted to JSON and back automatically? How does it even know whether the payload is Avro, binary data, JSON, Protobuf, or text? In Kafka all of these are possible, so you need to configure something in CPI. And without a schema registry known to CPI, it does not even know which schema to use for the Avro deserialization.
:confused:
Hi Werner,
So far we've only used the Kafka Sender Adapter in consumer scenarios, but I can tell you there is no possibility to specify schema registries inside CPI. That means in our case we had to develop a ksqlDB transformation on the Kafka side in order to receive the messages in a simple key-value JSON format.
Otherwise the messages arrived in CPI and the adapter was not able to interpret them, as it didn't have the schema anywhere.
This was raised to SAP and I can tell you that their development team is analysing it. In other tools such as SAP PO you have the option to specify the Avro schema URL, so I understand this is simply a feature that should be released for CPI anytime soon.
Cheers.
Alex.
Thanks, that's what I feared but could not believe.
Then the Kafka connectivity is practically non-existent. It is like saying, yes, we can connect to other databases via JDBC, but the tables must have two columns, id and text, and hence you have to create views that concatenate all columns into a single text.
Very much so.
Actually, this is known by SAP, as they stated at the moment of the release that no schema registry was supported. Quite shocking I would say, as producing into a Kafka topic without a schema is not what I would call a best practice.
You can check Cloud Integration – What You Need to Know About the Kafka Adapter | SAP Blogs for the limitations at the moment of the release (May 2021).
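For anyone hitting the same limitation before official support arrives, one possible workaround on the CPI side is to decode the Avro value in a Groovy script, assuming the writer schema is known up front and the Apache Avro library has been added to the iFlow as an archive resource. The following is a rough sketch, not an adapter feature: the avroSchema property and the handling of Confluent's 5-byte prefix are illustrative only.
import com.sap.gateway.ip.core.customdev.util.Message
import org.apache.avro.Schema
import org.apache.avro.generic.GenericDatumReader
import org.apache.avro.generic.GenericRecord
import org.apache.avro.io.DecoderFactory

def Message processData(Message message) {
    // Read the raw Kafka payload
    byte[] payload = message.getBody(java.io.InputStream).bytes
    // Messages produced with the Confluent serializer carry a 5-byte prefix
    // (magic byte + 4-byte schema ID) before the Avro body
    byte[] avroBytes = payload[5..-1] as byte[]
    // The writer schema must be known up front, e.g. maintained as an iFlow resource
    // and read into the "avroSchema" property in a previous step (placeholder here)
    def schema = new Schema.Parser().parse(message.getProperty("avroSchema") as String)
    def reader = new GenericDatumReader<GenericRecord>(schema)
    def decoder = DecoderFactory.get().binaryDecoder(avroBytes, null)
    GenericRecord record = reader.read(null, decoder)
    // GenericRecord.toString() renders the record as JSON text
    message.setBody(record.toString())
    return message
}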
Hi Chris,
We got this error when we tried to connect Kafka with SCPI:
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed
The Kafka team told me that, as a Kafka client, this error appears because there is an attempt to verify the server hostname, and we need to disable this by setting "ssl.endpoint.identification.algorithm" to empty in our client tool.
Is it necessary to set this "ssl.endpoint.identification.algorithm" property in SAP CPI? And where exactly in SCPI?
Regards,
Hi Chris,
We are getting the below error while producing a message from SAP CPI to a Kafka topic in Confluent Cloud.
Could you please let me know what I am missing?
Error Details
com.sap.it.rt.adapter.http.api.exception.HttpResponseException: An internal server error occured: Topic purXXXXXXXXXXX not present in metadata after 15000 ms..
The MPL ID for the failed message is : AGOr_HXXXXXXXXXX
Kafka connection configuration:
Authentication : SASL
Connect with TLS : <<Disabled >>
SASL mechanism : PLAIN
Credentials name : << Stored the api-key and api-secret as user-credentials under "manage security material">>
NOTE: Kafka topic is up & running.
I tried the connectivity test in the Integration Suite and got the below error as well:
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Call(callName=listTopics, deadlineMs=1672226173167, tries=1, nextAllowedTryMs=1672226173268) timed out at 1672226173168 after 1 attempt(s) Cause: org.apache.kafka.common.errors.TimeoutException: Call(callName=listTopics, deadlineMs=1672226173167, tries=1, nextAllowedTryMs=1672226173268) timed out at 1672226173168 after 1 attempt(s) Cause: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: listTopics
Regards,
Rajkumar
I was able to resolve the issue. I can produce and consume messages to the Kafka topic now.
Hello @Rajkumar G: I am also getting the same issue. Could you please let me know how you resolved it? I appreciate your help. Thanks
I would also be very interested in the details that enabled you to produce and consume messages to Kafka topic, @Rajkumar G.
Many thanks in advance, Frank
Anyone stuck with the certificate error on the handshake shown below may follow these steps to resolve the SSL chain issue: download the Kafka broker's certificate chain (for example via the TLS connectivity test in the Monitor section) and import it into the tenant keystore (Monitor > Manage Security > Keystore).
That should fix the cert issue. If required, restart the flow and test again.
Thanks!
Sudhir
"com.sap.it.rt.adapter.http.api.exception.HttpResponseException: An internal server error occured: sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target."
Great blog, Chris.
Where in CI do I create the User Credentials for my Kafka connection, please? I do not seem to be able to find what your screenshot shows.
Many thanks in advance, Frank