SAP Analytics Cloud Custom Widget to Send and Receive Messages with Kafka

In this tutorial, we’ll learn how to create a SAC custom widget that sends messages to a NodeJS app through Kafka. The previous tutorial showed how to achieve the same thing with RabbitMQ.

Kafka

Apache Kafka is an open-source distributed event streaming platform capable of handling trillions of events a day. It is designed for high-volume publish-subscribe messaging and streams, and it is meant to be durable, fast, and scalable. A Kafka cluster is highly scalable and fault-tolerant, and it typically achieves much higher throughput than message brokers such as RabbitMQ. If you want to read more on when to use Kafka or RabbitMQ, you can refer to this blog.

Diagram

The high-level architecture consists of the following components:

  • SAP Analytics Cloud, Analytic App with the custom widget, SAPUI5 component and Socket.IO JavaScript client.
  • Producer – A NodeJS server with Socket.IO library and KafkaJS library.
  • An Apache Kafka server.
  • Consumer – A client app.

Kafka Server Setup

Download and install the Kafka server on your machine. You can refer to this tutorial on how to install it on an Ubuntu machine.

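  • Once you have installed and started the server, create a topic named SACTopic. Recent Kafka releases (3.0 and later) no longer accept the --zookeeper option for this script; on those versions, pass --bootstrap-server localhost:9092 instead:
    ~/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic SACTopic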

  • Let’s test sending a message to the topic from the command line:
    echo "Hello World from Kafka" | ~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic SACTopic > /dev/null

  • Then consume the messages in the topic from the command line:
    ~/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic SACTopic --from-beginning

    If there is no configuration error, you should see “Hello World from Kafka” in your console.
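The topic can also be created from Node.js rather than from the shell. The following is a minimal sketch, assuming the KafkaJS admin client (kafka.admin()) is passed in; the helper name ensureTopic is hypothetical and not part of the original setup. It creates the topic with one partition and a replication factor of 1, matching the command above, and does nothing if the topic already exists.

```javascript
// Sketch: create the topic from Node.js instead of the shell.
// `admin` is assumed to be a KafkaJS admin client, e.g. kafka.admin().
// `ensureTopic` is a hypothetical helper name used for illustration.
async function ensureTopic(admin, topic) {
    await admin.connect();
    try {
        const existing = await admin.listTopics();
        if (existing.includes(topic)) {
            return false; // topic already exists, nothing to create
        }
        await admin.createTopics({
            topics: [{ topic, numPartitions: 1, replicationFactor: 1 }]
        });
        return true; // topic was created by this call
    } finally {
        // Always release the admin connection, even if a call above failed
        await admin.disconnect();
    }
}

// Possible usage (requires a running broker):
// const { Kafka } = require('kafkajs');
// const kafka = new Kafka({ clientId: 'my-app', brokers: ['localhost:9092'] });
// ensureTopic(kafka.admin(), 'SACTopic').then(console.log).catch(console.error);
```

Passing the admin client in as a parameter keeps the helper easy to try out with a stub object before pointing it at a real broker.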

NodeJS Server (Producer)

The NodeJS server acts as a “Producer” that publishes messages to topics. It uses Socket.IO, a WebSocket-based library, to relay messages between the SAC widget and Kafka.

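Install the required libraries. The http, fs and url modules are built into Node.js and need no installation, while socket.io, which the server code loads, must be installed as well. The version is pinned to 2.x here because the code uses the require('socket.io').listen(server) form from that release line; the Socket.IO client used in the widget should come from the same major version:

npm install kafkajs express body-parser socket.io@2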

nodeserver.js

Before you run the server, make sure the topic variable is set to SACTopic.

"use strict";
const port = process.env.PORT || 3000;
const server = require("http").createServer();
const express = require("express");
const bodyParser = require("body-parser");


const {
    Kafka
} = require('kafkajs')
const kafka = new Kafka({
    clientId: 'my-app',
    brokers: ['localhost:9092']
})
const producer = kafka.producer()
const consumer = kafka.consumer({
    groupId: 'test-group'
})
const topic = 'SACTopic';
var sockett;

var app = express();
app.use(bodyParser.json());


// Route HTTP requests to the Express app
server.on("request", app);

// Attach Socket.IO to the HTTP server (Socket.IO 2.x API)
var io = require('socket.io').listen(server);

// define interactions with client
io.on('connection', function(socket) {
    // Remember the most recent connection so Kafka messages can be relayed to it
    sockett = socket;

    socket.on('fr_sac', function(data) {
        console.log('--from SAC: ' + data);

        // Publish the text received from the widget to Kafka with key "key1"
        var sendMessage = async () => {
            await producer.connect()
            await producer.send({
                topic: topic,
                messages: [
                    { key: 'key1', value: data }
                ],
            })
            await producer.disconnect()
        }

        sendMessage().catch(console.error);
    });
});


// Start the server
server.listen(port, function() {
    console.info(`HTTP Server: ${server.address().port}`);
});


const listenMessage = async () => {
    // Consuming
    await consumer.connect()
    await consumer.subscribe({
        topic: topic,
        fromBeginning: false
    })

    await consumer.run({
        eachMessage: async ({
            topic,
            partition,
            message
        }) => {
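            // The key is null when a producer sends no key (e.g. the console producer)
            const key = message.key ? message.key.toString() : null;
            console.log({
                partition,
                offset: message.offset,
                value: message.value.toString(),
                key
            })

            // Relay only "key2" messages, and only once a widget has connected
            if (key === "key2" && sockett) {
                sockett.emit("client_data", message.value.toString());
            }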
        },
    })
}

listenMessage().catch(console.error)

Run the server with the following command:

node nodeserver.js
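The server above connects and disconnects the producer for every message it receives from the widget. One possible variation, shown here as a sketch and not as the author's implementation, is to connect once and reuse the connection. The helper name createSender is hypothetical; the producer argument is assumed to be a KafkaJS producer (kafka.producer()), and the default key 'key1' mirrors the listing above.

```javascript
// Sketch: connect the producer lazily, once, and reuse it for every message.
// `producer` is assumed to be a KafkaJS producer, e.g. kafka.producer().
// `createSender` is a hypothetical helper name used for illustration.
function createSender(producer, topic) {
    let connecting = null; // shared promise so concurrent calls connect only once

    return async function send(value, key = 'key1') {
        if (!connecting) {
            connecting = producer.connect().catch((err) => {
                connecting = null; // allow a retry on the next call
                throw err;
            });
        }
        await connecting;
        return producer.send({
            topic,
            messages: [{ key, value }]
        });
    };
}

// Possible usage inside the 'connection' handler:
// const sendToKafka = createSender(kafka.producer(), 'SACTopic');
// socket.on('fr_sac', (data) => sendToKafka(data).catch(console.error));
```

With this approach the producer stays connected for the lifetime of the process, so it would be disconnected on shutdown rather than after each send.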

SAC Custom Widget

In the widget, we use the Socket.IO client library to connect to the NodeJS server and listen for the client_data event from the server.

function loadthis(that, changedProperties) {
    var that_ = that;
    var socketid;
    //Socket Connection
    //******************************************
    socket = io("http://localhost:3000");
    socket.on('disconnect', function() {
        console.log("socket disconnected: " + socketid);
        UI5(changedProperties, that, "");
    });
    socket.on('connect', function() {
        socketid = socket.id;
        console.log("socket connected: " + socketid);
        UI5(changedProperties, that);
    });
    socket.on('client_data', function(data) {
        console.log('Message from server: ' + data);
        UI5(changedProperties, that, data);
    });
}

The onButtonPress() function gets the value from the SAPUI5 Text Area and sends it to the Node server via Socket.IO.

onButtonPress: async function(oEvent) {
    var oInput = this.byId("textArea_" + widgetName);
    console.log(oInput.getValue());

    socket.emit("fr_sac", oInput.getValue());
}
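The handler above emits whatever is in the Text Area, even if it is empty or the socket has dropped. A small guard could be placed in front of the emit call. This is only a sketch under assumptions: sendToServer is a hypothetical helper name, and the socket argument is assumed to be a Socket.IO client socket, which exposes a connected flag and an emit method.

```javascript
// Sketch: send the text only when there is something to send and the socket is connected.
// `socket` is assumed to be a Socket.IO client socket (`connected` flag, `emit` method).
// `sendToServer` is a hypothetical helper name used for illustration.
function sendToServer(socket, text, eventName = 'fr_sac') {
    const value = (text || '').trim();
    if (!value || !socket || !socket.connected) {
        return false; // nothing was sent
    }
    socket.emit(eventName, value);
    return true;
}

// Possible usage inside onButtonPress:
// var sent = sendToServer(socket, oInput.getValue());
// if (!sent) { console.log("Nothing sent: empty text or no connection"); }
```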

Client App (Consumer)

A client app acts as a “Consumer” that reads messages from topics.

const { Kafka } = require('kafkajs')
 
const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['localhost:9092']
})
 
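const producer = kafka.producer()
// Use a group ID different from the NodeJS server's ('test-group'): consumers in the
// same group split the partitions between them, so with a single partition only one
// of the two apps would receive the messages
const consumer = kafka.consumer({ groupId: 'client-group' })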
 
let topic = 'SACTopic'

const listenMessage = async () => {
    // Consuming
    await consumer.connect()
    await consumer.subscribe({
        topic: topic,
        fromBeginning: false
    })

    await consumer.run({
        eachMessage: async ({
            topic,
            partition,
            message
        }) => {
            console.log({
                partition,
                offset: message.offset,
                value: message.value.toString(),
            })
        }
    })
}

listenMessage().catch(console.error)
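The NodeJS server forwards any message whose key is key2 to the widget through the client_data event, and the client app declares a producer that the listing does not use. A reply from the client app back to the widget could therefore look like the sketch below. The helper name sendReply is hypothetical, and the producer argument is assumed to be the KafkaJS producer created above.

```javascript
// Sketch: publish a reply that the NodeJS server will forward to the widget.
// `producer` is assumed to be a KafkaJS producer, e.g. kafka.producer().
// `sendReply` is a hypothetical helper name used for illustration.
async function sendReply(producer, topic, text) {
    await producer.connect();
    try {
        await producer.send({
            topic,
            // The server relays only messages with this key to the widget
            messages: [{ key: 'key2', value: text }]
        });
    } finally {
        // Release the connection even if the send failed
        await producer.disconnect();
    }
}

// Possible usage in the client app:
// sendReply(producer, topic, 'Hello SAC from the client app').catch(console.error);
```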
