Skip to Content
Technical Articles

SAP Analytics Cloud Custom Widget to Send and Receive Messages with RabbitMQ

In this tutorial, we’ll learn how we can create a SAC custom widget to send and receive messages with RabbitMQ.

RabbitMQ

RabbitMQ is a free, open-source and extensible message queuing Protocol. It is one of the most widely used message brokers in the world. It also known as a message broker or queue manager. A message broker can act as a middleman for various services. A broker can be used to reduce loads and delivery times by web application servers.

Diagram

Below is the high-level diagram with the following components:

  • SAP Analytics Cloud, Analytic App with the custom widget and SAPUI5 component.
    We will build a user interface to send and receive messages from the SAC Analytic App.
  • A JavaScript MQTT library Paho JavaScript client.
  • A RabbitMQ server with Web MQTT Plugin.
  • A simple Python client to send and receive messages.

RabbitMQ Server Setup

Download and install the RabbitMQ server on your machine.

  • Once you have installed, start the service with this command:
    sudo service rabbitmq-server start​
  • Enable RabbitMQ Web MQTT Plugin. The Web MQTT plugin makes it possible to use MQTT over a WebSocket connection. Refer to this document for more details.
    rabbitmq-plugins enable rabbitmq_web_mqtt​
  • Enable the RabbitMQ Management Plugin, web based API for management and monitoring of RabbitMQ nodes and clusters. Refer to this document for more details.
  • Once you have enabled, you can go to this URL http://localhost:15672/
  • Go to Queues tab and create a new queue, SAC.
  • You should see now the SAC queue has been created. The second queue in the screenshot is created when you run the SAC custom widget app. We will walkthrough it shortly.
  • Go to Exchanges tab and select amq.topic.
  • We need to add binding to SAC queue with Routing Key events. Go ahead to add and bind it.

SAP Analytics Cloud Custom Widget

Now we need to create a WebComponents for custom widget.

The widget will first load the MQTT JavaScript library (mqttws31.js) and initiate the connection to RabbitMQ server.

Once the connection is established, it will subscribe to the Routing Key events that we have defined earlier. By subscribing to this event, it listens to the incoming messages – see the function client.onMessageArrived().

 

let mqttjs = "http://localhost:7800/mqttws31.js";
async function LoadLibs() {
    try {
        await loadScript(mqttjs, _shadowRoot);
    } catch (e) {
        alert(e);
    } finally {
        var wsbroker = 'localhost'; //mqtt websocket enabled broker
        console.log(wsbroker);
        var wsport = 15675; // port for above

        client = new Paho.MQTT.Client(wsbroker, wsport, "/ws",
            "myclientid_" + parseInt(Math.random() * 100, 10));

        console.log(client);

        client.onConnectionLost = function(responseObject) {
            console.log("CONNECTION LOST - " + responseObject.errorMessage);
        };

        client.onMessageArrived = function(message) {
            console.log("RECEIVE ON " + message.destinationName + " PAYLOAD " + message.payloadString);

            if (message.payloadString.split(":")[0] === "loc") {
                loadthis(that, changedProperties, message.payloadString);
            }
        };
        var options = {
            timeout: 3,
            keepAliveInterval: 30,
            onSuccess: function() {
                console.log("CONNECTION SUCCESS !!");
                client.subscribe("events", {
                    qos: 1
                });
            },
            onFailure: function(message) {
                console.log("CONNECTION FAILURE - " + message.errorMessage);
            }
        };
        if (location.protocol == "https:") {
            options.useSSL = true;
        }
        console.log("CONNECT TO " + wsbroker + ":" + wsport);
        client.connect(options);
        loadthis(that, changedProperties, '');
    }
}
LoadLibs();

From the UI5 codes that we embedded in the WebComponents, we create a TextArea for user to type and send messages.

return Controller.extend("myView.Template", {

    onInit: function() {
        console.log("-------oninit--------");
        console.log("widgetName:" + that.widgetName);

        if (that._firstConnection === 0) {
            that._firstConnection = 1;
            var oData = {
                "value": parseInt(that._export_settings.subtitle)
            };
            var oModel = new JSONModel(oData);
            this.getView().setModel(oModel, that.widgetName);
        } else {
            console.log("After-------------");
            console.log(that.widgetName);
            MessageToast.show(message.split(":")[1]);
        }
    },

    handleLiveChange: function(oEvent) {
        var oTextArea = oEvent.getSource(),
            iValueLength = oTextArea.getValue().length,
            iMaxLength = oTextArea.getMaxLength(),
            sState = iValueLength > iMaxLength ? "Error" : "None";

        _input = oTextArea.getValue();
        console.log("_input:" + _input);
        that._firePropertiesChanged();
        this.settings = {};
        this.settings.input = "";
        that.dispatchEvent(new CustomEvent("onStart", {
            detail: {
                settings: this.settings
            }
        }));
        oTextArea.setValueState(sState);

        var message = new Paho.MQTT.Message("sac:" + _input);
        message.destinationName = "events";
        console.log("SEND ON " + message.destinationName + " PAYLOAD " + _input);
        client.send(message);
    }
});

Python App

The last thing we need to build is a Python client app to send and receive messages from/to SAC widget.

Send.py

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='SAC', durable=True) #task_queue will be permanent

message = ' '.join(sys.argv[1:]) or "Hello World from SAC"
channel.basic_publish(
    exchange='amq.topic',
    routing_key='events',
    body='loc:' + message,
    properties=pika.BasicProperties(
        delivery_mode=2,  # make message persistent
    ))
print(" [x] Sent %r" % message)
connection.close()

Receive.py

#!/usr/bin/env python
import pika
import time

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='SAC', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')

def callback(ch, method, properties, body):
	msg = body.decode().split(":")
	print(" [x] Received %r" % msg[1])
	time.sleep(body.count(b'.'))
	print(" [x] Done")
	ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='SAC', on_message_callback=callback)

channel.start_consuming()

Quick Demo

Conclusion

For the use cases that I can think of is we can implement this method to train the ML model directly from the SAC itself by just clicking a button and get the result updated in the SAC model. Or perform RPC (Remote Procedure Call) directly from the SAC to execute the heavy duty jobs.

In the next tutorial, we will build the same thing with Apache Kafka.

References

Be the first to leave a comment
You must be Logged on to comment or reply to a post.