Mohammed Saifulla Shafiulla

Apache Kafka & Asset Intelligence Network

Consuming Kafka messages – Asset Intelligence Network

Asset Intelligence Network (AIN) is part of the Intelligent Asset Management portfolio, alongside Asset Strategy and Performance Management (ASPM) and Predictive Asset Insights (PAI). All three share a common core component, Asset Central Foundation (ACF).

Intelligent Asset Management Portfolio

Asset Intelligence Network triggers Kafka messages for various objects and for a growing list of scenarios. These messages let customers be notified of changes to objects in AIN so that they can use them in various integration scenarios.

 

Apache Kafka

Apache Kafka is an open-source distributed event streaming platform used for data pipelines, streaming analytics, data integration and various other use cases. More details can be found on the Apache Kafka website.

AIN and Apache Kafka

AIN utilises Apache Kafka as a service on Business Technology Platform (BTP) and provides a mechanism through which a Kafka cluster and relevant topics can be shared with its customers.

Customers can develop their own Kafka consumers to consume messages from the topics that are shared. In order to utilise the feature, customers need to place a request for the cluster / topics to be shared as described here.

High Level Architecture

 

How to Consume?

Once advertised, the details for the cluster and topics can be checked as follows. Use the REST API to fetch the details:

API: https://kafka-marketplace.cf.<landscape>.hana.ondemand.com/v1/marketplace?org=<cloud foundry org name>

Method: GET

Response:

Advertisement API response

Note: Pass a bearer token to the API. To generate one, use the cf oauth-token command.
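The request described above can be sketched in Python using only the standard library. This is a minimal sketch, not an official client: the function names are hypothetical, the response is assumed to be JSON, and the token handling assumes that cf oauth-token prints the token with a leading "bearer " prefix.

```python
import json
import urllib.parse
import urllib.request


def build_marketplace_request(landscape: str, org: str, token: str) -> urllib.request.Request:
    """Build the GET request for the advertisement (marketplace) API."""
    query = urllib.parse.urlencode({"org": org})
    url = f"https://kafka-marketplace.cf.{landscape}.hana.ondemand.com/v1/marketplace?{query}"
    # Assumption: the token may arrive as "bearer <token>"; normalise it.
    if token.lower().startswith("bearer "):
        token = token[len("bearer "):]
    return urllib.request.Request(
        url,
        method="GET",
        headers={"Authorization": f"Bearer {token}", "Accept": "application/json"},
    )


def fetch_advertisements(landscape: str, org: str, token: str):
    """Call the API and parse the body (assumed to be JSON)."""
    request = build_marketplace_request(landscape, org, token)
    with urllib.request.urlopen(request) as response:
        return json.load(response)
```

Splitting request construction from the network call keeps the URL and header logic easy to check without access to a landscape.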

  • Add or assign the necessary entitlements to your subaccount with the “reference” plan.
  • In your Cloud Foundry space, create a service instance for the advertised cluster using the information from the above API response. Use the Cloud Foundry CLI:
cf create-service kafka reference <instance name> -c '{"advertisement":"<advertisement id from the API response>"}'
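When the service instance is created from a script, quoting the JSON parameter by hand is error-prone. The following is a minimal sketch that assembles the same cf create-service command as an argument list; the function name and the example values are hypothetical.

```python
import json
import shlex
from typing import List


def create_service_command(instance_name: str, advertisement_id: str) -> List[str]:
    """Return the cf CLI arguments for creating a 'reference' plan instance."""
    # json.dumps takes care of escaping inside the -c parameter.
    params = json.dumps({"advertisement": advertisement_id})
    return ["cf", "create-service", "kafka", "reference", instance_name, "-c", params]


if __name__ == "__main__":
    command = create_service_command("my-kafka", "<advertisement id from the API response>")
    # shlex.join renders a shell-safe string for copy and paste;
    # the list itself can be handed to subprocess.run(command, check=True).
    print(shlex.join(command))
```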

How to write a consumer?

Java:

The following example is written as a reactive Vert.x application that consumes Kafka messages. It is only one of many ways in which such a consumer can be written.

import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;
import io.vertx.kafka.client.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.Base64;
import java.util.Properties;

public class MainVerticle extends AbstractVerticle {

  @Override
  public void start(Promise<Void> startPromise) throws Exception {

    String consumerGroup = "DemoGroup";
    // Refer to the latest documentation or the marketplace API response for the current topic name format
    String topicName = "<subaccountid>.com.sap.dsc.ac.equipment";

    Properties props = new Properties();
    props.setProperty(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1");
    props.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000");
    props.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "30000");
    props.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "20000");

    // Use the consumer for interacting with Apache Kafka.
    // KafkaRulesConsumerFactory is a helper of this example (not shown) that supplies the connection settings.
    KafkaConsumer<String, String> consumer = KafkaConsumer.create(vertx,
      KafkaRulesConsumerFactory.kafkaConsumerCreate(consumerGroup, props));

    consumer.handler(record -> {
      String rec = record.value();
      // The payload is base64 encoded; the MIME decoder tolerates line breaks
      byte[] decoded = Base64.getMimeDecoder().decode(rec);
      System.out.println("Processing key=" + record.key() + ",value=" + rec +
        ",partition=" + record.partition() + ",offset=" + record.offset());
      try {
        String result = KafkaUtils.readAvroMessageWithSchema(decoded);
        System.out.println(result);
      } catch (Exception e) {
        System.out.println(e);
      }
    });

    // Subscribe to a single topic and report the outcome to the deployment promise
    consumer
      .subscribe(topicName)
      .onSuccess(v -> {
        System.out.println("subscribed");
        startPromise.complete();
      })
      .onFailure(cause -> {
        System.out.println("Could not subscribe " + cause.getMessage());
        startPromise.fail(cause);
      });
  }
}

To connect to the Kafka cluster, the SASL_SSL protocol is used with the PLAIN mechanism. The root certificate is downloaded using the APIs listed in the service key of the instance. A keystore is then created from it and passed in the properties used to connect to the Kafka instance.

How to decode the message?

The messages from ACF use an Avro-based schema and are base64 encoded.

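import java.io.IOException;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.SeekableByteArrayInput;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

public static String readAvroMessageWithSchema(byte[] message) throws IOException {
  SeekableByteArrayInput input = new SeekableByteArrayInput(message);
  GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>();
  // The writer schema is read from the message itself; close the reader when done
  try (DataFileReader<GenericRecord> dataFileReader = new DataFileReader<>(input, reader)) {
    GenericRecord data = dataFileReader.next();
    return data.toString();
  }
}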
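The base64 step that precedes the Avro read can be checked in isolation. The sketch below is an illustration in Python, not part of the original consumer: it decodes a record value and tests for the four-byte header that every Avro object container file starts with. It assumes the payload is such a container file, which the use of DataFileReader above suggests; the function names are hypothetical.

```python
import base64

# Every Avro object container file starts with the bytes 'O', 'b', 'j', 0x01.
AVRO_CONTAINER_MAGIC = b"Obj\x01"


def decode_payload(value: bytes) -> bytes:
    """Base64-decode a Kafka record value.

    validate=False discards characters outside the base64 alphabet
    (such as line breaks), similar to the MIME decoder in the Java example.
    """
    return base64.b64decode(value, validate=False)


def is_avro_container(data: bytes) -> bool:
    """Return True if the decoded bytes look like an Avro container file."""
    return data.startswith(AVRO_CONTAINER_MAGIC)
```

A payload that fails this check is probably not base64 encoded Avro at all, which is worth ruling out before debugging schema problems.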

 

Some key dependencies used:

"io.vertx:vertx-kafka-client:4.0.3"
"io.projectreactor.kafka:reactor-kafka:1.3.3"
"org.apache.avro:avro:1.10.2"

Consumer in Java

 

Node.js:

async function kConnect() {
  // config() is a helper of this example (not shown) that returns the token used as the SASL password
  const tokenVal = await config().token();

  const kafka = new Kafka({
    clientId: 'node-kconsumer',
    // credentials is assumed to hold the credentials of the Kafka service instance
    brokers: credentials.sslBroker.split(','),
    ssl: {
      // false disables certificate verification; set to true outside of test setups
      rejectUnauthorized: false,
      ca: [fs.readFileSync('./kConsumer/ssl/kafka.crt', 'utf-8')],
    },
    sasl: {
      mechanism: 'plain', // scram-sha-256 or scram-sha-512
      username: credentials.username,
      password: tokenVal,
    },
  });

  const consumer = kafka.consumer({ groupId: 'my-group1' });
  await consumer.connect();
  // Refer to the documentation or the advertisement API response for the topic name format
  await consumer.subscribe({ topic: '<subaccountid>.com.sap.dsc.ac.equipment', fromBeginning: true });

  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      console.log({
        key: message.key ? message.key.toString() : null,
        value: message.value,
        headers: message.headers,
      });

      console.log('JSON  ' + JSON.stringify(message.value.toJSON()));
      console.log('STRR  ' + message.value.toString());
      // avro() is a helper of this example (not shown) that decodes the base64 encoded Avro payload
      avro().avroConvert(message.value.toString());
    },
  });
}


This example also uses the SASL_SSL protocol to connect to the Kafka instance. The root certificate can be downloaded using the APIs exposed in the service key of the instance:

curl https://kafka-service-broker.cf.<landscape>.hana.ondemand.com/certs/rootCA/current > kafka.crt

The node example uses the following key dependencies:

const avro = require('avro-js');
const { Kafka } = require('kafkajs');

Consumer in Nodejs

Python:

def main():
    # AppEnv is assumed to come from the cfenv package; getToken is a helper of this example (not shown)
    env = AppEnv()
    kafka_env = env.get_service(label='kafka')
    token = getToken(kafka_env)
    # Refer to the documentation or the advertisement API response for the topic name format
    consumer = KafkaConsumer(
        '<subaccountid>.com.sap.dsc.ac.equipment',
        bootstrap_servers=kafka_env.credentials['cluster']['brokers'].split(','),
        auto_offset_reset='earliest',
        enable_auto_commit=True,
        client_id='py-kconsumer',
        group_id='Pyconn',
        security_protocol='SASL_SSL',
        ssl_cafile='./ssl/kafka.crt',
        sasl_mechanism='PLAIN',
        sasl_plain_username=kafka_env.credentials['username'],
        sasl_plain_password=token,
        api_version=(2, 5, 1)
    )
    print("subscribed")
    for message in consumer:
        print(message)


The message can be converted using the avro Python library as follows:

reader = DataFileReader(open("./message.avro", "rb"), DatumReader())
for data in reader:
    print(data)
reader.close()

Key dependencies used:

from kafka import KafkaConsumer
import avro.schema
from avro.datafile import DataFileReader
from avro.io import DatumReader

In addition, customers can leverage the Kafka Adapter provided by Cloud Platform Integration.

Conclusion

Kafka event streaming is a powerful way to get real-time or near real-time updates for your data. AIN uses it to provide a pathway for customers who wish to integrate with a myriad of disparate systems, and it helps keep data consistent and efficiently synchronised across distributed systems. Kafka in conjunction with Apache Avro makes integration scenarios more stable, providing better compatibility support across releases and upgrades.

See also:

Apache Kafka

Apache Avro

Events in AIN

SAP Cloud Platform Integration Kafka Adapter blog

      3 Comments
      Rutika Bodas

      Nice write-up Saif. Well explained!

      Lochner Louw

      Pretty cool and will need to try this.

       

      Question on the messages from Kafka.

      From reading the External Events PDF and asking around I heard that the messages are supposed to be in the CloudEvents spec. I assume the content of the AVRO payload is the same CloudEvents spec?

       

      Thanks

      Lochner

       

      Mohammed Saifulla Shafiulla
      Blog Post Author

      Lochner Louw Yes, the Avro schema is aligned with the CloudEvents spec.