
Predictive Analytics at the EDGE as a Python script within a message bus

This example is an alternative for chapter “6. Implementation” of this blog post

Python bundle variant

There are several reasons why I decided to implement these two Python-based variants.

The main reason is adoption: unlike Java, Python is widely used in data science, and several libraries are available only for Python and C (wrapped with Cython).

A second reason is that this method is very generic: it can be treated as a ready-made container for running any algorithm you have already implemented in your favorite programming language.

This is the schema of the implemented solution

We will focus on the components in the grey boxes.

As in the pure Java implementation, the source code used to implement this sample is available in the official SAP GitHub repository.

The project structure and the classes that implement the persistence Java client are very similar to the example I’ve already explained in the other blog post. The project looks like the following picture.

One of the main modifications I’ve introduced is in the pom.xml file, to permit the use of the ZeroMQ Java libraries; ZeroMQ is the simple message bus we have decided to use to communicate with the Python module.


I have used ZeroMQ in the current codebase as a simple socket-based implementation that lets our Java code connect and communicate over a socket. This means that the Java message bus implementation acts as the client in the communication, while the Python code implements the server.

  • orchestrates all the queries which are executed from the SQLAnywhere database within the Persistence java client.
  • DataStreamer class contains just the methods to stream the results to SAP Cloud Platform Internet of Things based on a simple HTTP client.
  • is a helper class that wraps the features exposed by ZMQ.

This class contains the methods to send and receive data over the message bus and to initialize the bus correctly.

The runPair method first pings the server to verify that it is alive. If it is not, the method starts it.

The close method is invoked once, when the service is stopped. It also stops the server (when possible).
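The ping-then-start behavior of runPair can be sketched in a few lines of Python. This only illustrates the control flow and is not the Java code from the repository: ensure_server, fake_ping and fake_start are hypothetical names, and the two callables stand in for the ZeroMQ ping and for launching the Python process.

```python
def ensure_server(ping, start):
    """Return True if the server answers a ping, starting it first if needed.

    ping  -- hypothetical callable: returns True when the server replies
    start -- hypothetical callable: launches the server process
    """
    try:
        if ping():
            return True
    except OSError:
        pass  # no reply: fall through and start the server
    start()
    return bool(ping())


# Stand-ins that record the sequence of calls
calls = []
server = {"up": False}


def fake_ping():
    calls.append("ping")
    return server["up"]


def fake_start():
    calls.append("start")
    server["up"] = True


started = ensure_server(fake_ping, fake_start)
```

With these stand-ins the first ping fails, the server is started once, and the second ping succeeds; any later call only pings.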

  • The core of the logic is inside our PredictValues class.

The initialize method contains the details needed to connect to the server.

The rest of the logic is called inside the run method, which is automatically scheduled every 10 seconds.

This method fetches the data through the Java Persistence Service Client using a time window of 10 seconds.

If any data is available, each sample is sent to the message bus, and the method waits for the result from the ZeroMQ socket, which contains a JSON document with the predicted value.

Then we build an index derived from a mathematical formula that aggregates the predictions for all the samples.

Finally, we stream the individual (per-sample) predictions and the global prediction (represented through the index) back to SAP Cloud Platform Internet of Things.
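The per-window processing described above can be sketched as follows. This is a minimal illustration under stated assumptions: process_window and fake_bus are hypothetical names, the request callable stands in for the ZeroMQ round trip, the R, G, B property names are assumed, and the index (the mean distance to the nearest neighbor) is only a placeholder, since the formula actually used in the sample is not shown here.

```python
import json
from statistics import mean


def process_window(samples, request):
    """Send each sample over the bus and aggregate the predictions.

    samples -- list of dicts, one per measurement in the time window
    request -- callable that sends a JSON string and returns the JSON reply
    """
    predictions = [json.loads(request(json.dumps(s))) for s in samples]
    if not predictions:
        return [], None
    # Placeholder index (assumption): mean distance to the nearest neighbor
    index = mean(p["neighbor(1)"] for p in predictions)
    return predictions, index


def fake_bus(message):
    """Stand-in for the ZeroMQ round trip; always answers 'red'."""
    sample = json.loads(message)
    return json.dumps({"label": "red", "neighbor(1)": float(255 - sample["R"])})


window = [{"R": 255, "G": 0, "B": 0}, {"R": 245, "G": 0, "B": 0}]
predictions, index = process_window(window, fake_bus)
```

Both the individual predictions and the aggregated index are returned, so the caller can stream either or both.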

This operation makes it possible to process the predicted values further, for example by sending them to a third-party system in the cloud or, as in this scenario, by evaluating them with a Rule computed within the Streaming Service.

Python module

In the Python module, I first created the “server” connection for the message bus.

Then I started an infinite loop that waits for a message, parses it into a usable format, and invokes the prediction through a Python sub-call that has already been implemented (for example within a Jupyter Notebook in SAP Data Intelligence in the cloud).

Once the result is ready, it is transformed into several key-value pairs (the equivalent of a Java Map), converted to a JSON string and sent to the message bus.

# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
# Copyright (c) 2020 SAP SE or an affiliate company. All rights reserved.
# The sample is not intended for production use.  Provided "as is".
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #

'''
This program implements a KNN algorithm to recognize the color of an input sample,
in terms of RGB components. The model is trained on a red-scale dataset,
so it is able to recognize which colours are red, and provides as output the euclidean distances
of the top 3 nearest neighbors.
'''
import json
import os.path

# Third-party libraries
import sklearn.neighbors
import zmq

# This folder must contain the model data
DATA_DIR = '.'
TRAINING_DATASET_FILENAME = os.path.join(DATA_DIR, 'red-colors.json')

def predict(array_data, labels, knn_classifier):
    '''Predict the color name by its RGB components

    Args:
        array_data (list): a list of samples; each sample is a list of 3 integer elements: R, G, B. Each element must be in the 0..255 range.
        labels (list): a list of labels that come from the dataset.
        knn_classifier (object): The classifier used to make the prediction.
    Returns:
        A dictionary with neighbor names as keys and distances from the RGB point to each neighbor as values
    '''
    predicted = knn_classifier.predict(array_data)
    distances, indexes = knn_classifier.kneighbors(array_data)
    print([labels[i] for i in indexes[0]])
    data = {}
    data['label'] = predicted[0]
    data['neighbor(1)'] = distances[0][0]
    data['neighbor(2)'] = distances[0][1]
    data['neighbor(3)'] = distances[0][2]
    return data

def zmq_start_server(port):
    '''Start the ZMQ message bus at the specified port (bound to any available IP)

    Args:
        port (string): A string with the port used to bind the socket at the server side.
    Returns:
        The bound ZMQ socket.
    '''
    context = zmq.Context()
    socket = context.socket(zmq.REP)
    socket.bind("tcp://*:" + port)
    return socket

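def main():
    '''The entry point of the predictive algorithm'''
    # Load the training dataset and fit the classifier
    with open(TRAINING_DATASET_FILENAME) as f:
        dataset = json.load(f)
    points = [el['data'] for el in dataset]
    labels = [el['label'] for el in dataset]
    knn_classifier = sklearn.neighbors.KNeighborsClassifier(3).fit(points, labels)
    # Create the server
    socket = zmq_start_server("5555")
    # Process messages
    while True:
        #  Wait for next request from client
        message = socket.recv()
        print("Received request: %s" % message)
        if message == b'hello':
            #  Ping from the client: echo it back
            socket.send(message)
            continue
        try:
            #  Parse Json
            objSample = json.loads(message.decode('utf-8'))
            rgbSamples = []
            #  Assumption: the sample exposes its components as 'R', 'G' and 'B'
            rgbSamples.append([objSample['R'], objSample['G'], objSample['B']])
            #  Do prediction
            prediction = predict(rgbSamples, labels, knn_classifier)
            #  Create json
            jsonprediction = json.dumps(prediction)
            #  Send reply back to client
            socket.send(jsonprediction.encode('utf-8'))
        except Exception as e:
            print(e)
            #  A REP socket must always answer before the next recv
            socket.send(b'{}')

if __name__ == '__main__':
    # Run the main process
    main()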
This is a very good approach because it provides a seamless continuum from the cloud to the edge, and the methodology is completely generic: if you replace Python with, for example, R, you can implement a very similar solution with very limited effort.

There are also two drawbacks to this approach that should be considered.

The first is that the Python environment, and not just a Java environment, must be installed on the machine that runs the SAP Cloud Platform Internet of Things Edge Platform.

The second is that you must deploy the Python script to the machine manually, since it is an entirely external module, or you have to implement some code to automate the deployment from SAP Edge Services (for example through a custom dashboard or some exposed APIs).


Hard real-time Java (+ Python) implementation

This implementation uses an interceptor, implemented with the SAP Cloud Platform Internet of Things Edge Platform SDK, to access in real time the data to be processed by the external predictive module.

As in the other Python example explained in this blog post, data is sent to the Python Predictive Analytics module through a data bus; I’ve used ZMQ.

The Python script is the same one used in the other part of the article.

Create a standard interceptor project as specified in the official documentation and remove all the autogenerated Java files.

These are the classes for the interceptor:


import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;

import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.component.annotations.ReferenceCardinality;
import org.osgi.service.component.annotations.ReferencePolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Socket;


/**
 * This class starts the actual implementation for the Interceptor
 */
@Component(immediate = true, service = {})
public class InterceptorActivator {

	private static final Logger log = LoggerFactory.getLogger(InterceptorActivator.class);

	private static Object lock = new Object();
	private static Object zmqlock = new Object();
	private boolean registered = false;
	private static boolean initialized = false;

	private IGatewayInterceptor interceptor;

	private static Socket socket;

	private static String addressAndport = CustomParameters.ZMQ_CONNECTION;

	private static Process pair;

	private static String pythonPath = CustomParameters.PYTHON_PATH;

	private static String scriptName = CustomParameters.PYTHON_SCRIPT_PATH;

	private static volatile boolean zmq = false;

	public static boolean isZmq() {
		synchronized (zmqlock) {
			return zmq;
		}
	}

	public static void setZmq(boolean zmq) {
		InterceptorActivator.zmq = zmq;
	}

	@Activate
	public void start() {
		log.info("Starting Gateway Interceptor...");
		this.interceptor = new InterceptorImpl();

		new Thread(() -> {
			IGatewayInterceptorService interceptorMng = getInterceptorManager();

			synchronized (lock) {
				if ((interceptorMng != null) && (!registered)) {
					log.info("Registering implementation of the test interceptor");
					registered = interceptorMng.addInterceptor(interceptor);
				}
			}
		}).start();
	}

	@Deactivate
	public void stop() {
		log.info("Stopping Interceptor...");
		IGatewayInterceptorService interceptorMng = getInterceptorManager();

		synchronized (lock) {
			if ((interceptorMng != null) && (registered)) {
				log.info("Unregistering implementation of the test interceptor");
				registered = false;
			}
		}
	}

	static void closeSocket() {
		socket.disconnect("tcp://" + addressAndport);
		if (pair != null) {
			// stop the Python server started by this bundle
			pair.destroy();
		}
	}

	static String receive() {
		byte[] reply = socket.recv(0);
		System.out.println("Received: [" + new String(reply, ZMQ.CHARSET) + "]");
		return new String(reply, ZMQ.CHARSET);
	}

	static void send(String measurement) {
		socket.send(measurement.getBytes(ZMQ.CHARSET), 0);
	}

	static void initSocket() {
		synchronized (zmqlock) {
			socket = new ZContext().createSocket(SocketType.REQ);
			// update from configreader
			socket.connect("tcp://" + addressAndport);
			runPair(pythonPath, scriptName);
			zmq = true;
		}
	}

	static boolean runPair(String process, String args) {
		try {
			// ping the server
			send("hello");
			String resp = receive();
			if (resp.isEmpty() || !resp.contentEquals("hello")) {
				return false;
			}
			log.info("Server response: {}", resp);
		} catch (Exception e) {
			try {
				// the server did not answer: start it and ping again
				pair = new ProcessBuilder(process, args).start();
				send("hello");
				String msg = receive();
				if (msg.isEmpty() || !msg.contentEquals("hello")) {
					return false;
				}
				log.info("Server response: {}", msg);
			} catch (IOException ex) {
				log.error(ex.getMessage(), ex);
				return false;
			}
		}
		return true;
	}

	public static boolean isInitialized() {
		return initialized;
	}

	public static void setInitialized(boolean initialized) {
		InterceptorActivator.initialized = initialized;
	}

	// Available Declarative Services
	// To define and consume services via XML metadata, see OSGI-INF/InterceptorActivator.xml

	/**
	 * Interceptor Manager
	 */
	private static AtomicReference<IGatewayInterceptorService> interceptorMngr = new AtomicReference<>();

	@Reference(cardinality = ReferenceCardinality.AT_LEAST_ONE, policy = ReferencePolicy.DYNAMIC)
	void setInterceptorManager(IGatewayInterceptorService arg) {
		DSUtils.setRef(log, interceptorMngr, arg, IGatewayInterceptorService.class, this.getClass());
	}

	void unsetInterceptorManager(IGatewayInterceptorService arg) {
		DSUtils.removeRef(log, interceptorMngr, arg, IGatewayInterceptorService.class, this.getClass());
	}

	public static IGatewayInterceptorService getInterceptorManager() {
		return DSUtils.get(log, interceptorMngr, DSUtils.WAIT_FOR_VALID_REFERENCE);
	}

	/**
	 * Frame Parser
	 */
	private static AtomicReference<IJMSFrameParser> frameParser = new AtomicReference<>();

	@Reference(cardinality = ReferenceCardinality.AT_LEAST_ONE, policy = ReferencePolicy.DYNAMIC)
	void setFrameParser(IJMSFrameParser arg) {
		DSUtils.setRef(log, frameParser, arg, IJMSFrameParser.class, this.getClass());
	}

	void unsetFrameParser(IJMSFrameParser arg) {
		DSUtils.removeRef(log, frameParser, arg, IJMSFrameParser.class, this.getClass());
	}

	public static IJMSFrameParser getFrameParser() {
		return DSUtils.get(log, frameParser, DSUtils.WAIT_FOR_VALID_REFERENCE);
	}

	/**
	 * Device manager service
	 */
	private static AtomicReference<IDeviceManagerAsync> deviceManager = new AtomicReference<>(); // NOSONAR

	@Reference(cardinality = ReferenceCardinality.OPTIONAL, policy = ReferencePolicy.DYNAMIC)
	void setDeviceManagerService(IDeviceManagerAsync arg) {
		DSUtils.setRef(log, deviceManager, arg, IDeviceManagerAsync.class, this.getClass());
	}

	void unsetDeviceManagerService(IDeviceManagerAsync arg) {
		DSUtils.removeRef(log, deviceManager, arg, IDeviceManagerAsync.class, this.getClass());
	}

	public static IDeviceManagerAsync getDeviceManagerService() {
		return DSUtils.get(log, deviceManager, !DSUtils.WAIT_FOR_VALID_REFERENCE);
	}

	/**
	 * Gateway manager service
	 */
	private static AtomicReference<IGatewayProperties> gatewayManagerService = new AtomicReference<>(); // NOSONAR

	@Reference(cardinality = ReferenceCardinality.OPTIONAL, policy = ReferencePolicy.DYNAMIC)
	void setGatewayManagerService(IGatewayProperties arg) {
		DSUtils.setRef(log, gatewayManagerService, arg, IGatewayProperties.class, this.getClass());
	}

	void unsetGatewayManagerService(IGatewayProperties arg) {
		DSUtils.removeRef(log, gatewayManagerService, arg, IGatewayProperties.class, this.getClass());
	}

	public static IGatewayProperties getGatewayManagerService() {
		return DSUtils.get(log, gatewayManagerService, !DSUtils.WAIT_FOR_VALID_REFERENCE);
	}

	/**
	 * Event Sender
	 */
	private static AtomicReference<EventSender> eventSender = new AtomicReference<>(); // NOSONAR

	@Reference(cardinality = ReferenceCardinality.OPTIONAL, policy = ReferencePolicy.DYNAMIC)
	void setEventSender(EventSender arg) {
		DSUtils.setRef(log, eventSender, arg, EventSender.class, this.getClass());
	}

	void unsetEventSender(EventSender arg) {
		DSUtils.removeRef(log, eventSender, arg, EventSender.class, this.getClass());
	}

	public static EventSender getEventSender() {
		return DSUtils.get(log, eventSender, !DSUtils.WAIT_FOR_VALID_REFERENCE);
	}

	/**
	 * Data Manager
	 */
	private static AtomicReference<IDataManager> dataManager = new AtomicReference<>(); // NOSONAR

	@Reference(cardinality = ReferenceCardinality.OPTIONAL, policy = ReferencePolicy.DYNAMIC)
	void setDataManagerService(IDataManager arg) {
		DSUtils.setRef(log, dataManager, arg, IDataManager.class, this.getClass());
	}

	void unsetDataManagerService(IDataManager arg) {
		DSUtils.removeRef(log, dataManager, arg, IDataManager.class, this.getClass());
	}

	public static IDataManager getDataManagerService() {
		return DSUtils.get(log, dataManager, !DSUtils.WAIT_FOR_VALID_REFERENCE);
	}

	/**
	 * Topology Service
	 */
	private static AtomicReference<IGatewayTopology> topologyService = new AtomicReference<>(); // NOSONAR

	@Reference(cardinality = ReferenceCardinality.OPTIONAL, policy = ReferencePolicy.DYNAMIC)
	void setTopologyManagerService(IGatewayTopology arg) {
		DSUtils.setRef(log, topologyService, arg, IGatewayTopology.class, this.getClass());
	}

	void unsetTopologyManagerService(IGatewayTopology arg) {
		DSUtils.removeRef(log, topologyService, arg, IGatewayTopology.class, this.getClass());
	}

	public static IGatewayTopology getTopologyService() {
		return DSUtils.get(log, topologyService, !DSUtils.WAIT_FOR_VALID_REFERENCE);
	}

	/**
	 * Command Manager
	 */
	private static AtomicReference<CommandManager> commandManager = new AtomicReference<>(); // NOSONAR

	@Reference(cardinality = ReferenceCardinality.OPTIONAL, policy = ReferencePolicy.DYNAMIC)
	void setCommandManager(CommandManager arg) {
		DSUtils.setRef(log, commandManager, arg, CommandManager.class, this.getClass());
	}

	void unsetCommandManager(CommandManager arg) {
		DSUtils.removeRef(log, commandManager, arg, CommandManager.class, this.getClass());
	}

	public static CommandManager getCommandManager() {
		return DSUtils.get(log, commandManager, !DSUtils.WAIT_FOR_VALID_REFERENCE);
	}
}

import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

public class InterceptorImpl
implements IGatewayInterceptor {

	private static final Logger log = LoggerFactory.getLogger(InterceptorImpl.class);

	List<Double> intervalValues = new ArrayList<>();

	private static ObjectMapper mapper = new ObjectMapper();

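	public void processObject(String pointcutName, Object... args)
			throws Exception {
		// check the messagebus is running
		if (!InterceptorActivator.isZmq()) {
			InterceptorActivator.initSocket();
		}
		try {
			IoTServicesPointcut pointcut = IoTServicesPointcut.valueOf(pointcutName);
			switch (pointcut) {
			// triggered upon dispatch of parsed sensor data
			// (the pointcut name GATEWAY_PARSED_DATA_DISPATCH is an assumption)
			case GATEWAY_PARSED_DATA_DISPATCH:
				manageIncomingMeasures(args);
				break;
			default:
				break;
			}
		} catch (Exception e) {
			log.error(e.getMessage(), e);
		}
	}

	public List<String> getBranchPoints() {
		List<String> list = new ArrayList<>();
		// assumption: register for the same pointcut handled in processObject
		list.add(IoTServicesPointcut.GATEWAY_PARSED_DATA_DISPATCH.name());
		return list;
	}

	public void onError(Object event, String pointcut, Exception e) {
		log.info("OnError triggered; pointcut is {}", pointcut);
	}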

	// Go through measure list
	private void manageIncomingMeasures(Object... args) {
		List<WSNParsedMeasure> measures = (List<WSNParsedMeasure>) args[0];
		// list of measures that are going to be dropped
		List<WSNParsedMeasure> toBeRemoved = new ArrayList<>();

		if (measures != null) {
			for (WSNParsedMeasure wsnParsedMeasure : measures) {

				List<Value<?>> valueList = wsnParsedMeasure.getValues();
				log.info("Measurements received for CapabilityID: {}", wsnParsedMeasure.getCapabilityAlternateId());
				manageIncomingValues(wsnParsedMeasure, valueList, toBeRemoved);
			}

			for (WSNParsedMeasure wsnParsedMeasure : toBeRemoved) {
				measures.remove(wsnParsedMeasure);
				log.info("Measure for Capability {}, Device {} and Sensor {} will be dropped",
						wsnParsedMeasure.getCapabilityAlternateId(), wsnParsedMeasure.getDeviceAlternateId(),
						wsnParsedMeasure.getSensorAlternateId());
			}
		}
	}

	// Go through values for measure
	private void manageIncomingValues(WSNParsedMeasure wsnParsedMeasure, List<Value<?>> valueList,
			List<WSNParsedMeasure> toBeRemoved) {
		if (wsnParsedMeasure.getCapabilityAlternateId()
				.equals(CustomParameters.TARGET_CAPABILITY_ALTERNATEID_COLOR) &&
				valueList.size() == CustomParameters.Colors.values().length) {
			// create a map to be serialized for the predictive module
			Map<String, Object> map = new HashMap<String, Object>();
			map.put(valueList.get(0).getMeasureName(), valueList.get(0).getInnerMeasure());
			map.put(valueList.get(1).getMeasureName(), valueList.get(1).getInnerMeasure());
			map.put(valueList.get(2).getMeasureName(), valueList.get(2).getInnerMeasure());
			String measurement;
			try {
				// serialize the map
				measurement = mapper.writeValueAsString(map);
				log.info("json = {}", measurement);
			} catch (JsonProcessingException e) {
				log.error(e.getMessage(), e);
				return;
			}
			// Send measurement
			InterceptorActivator.send(measurement);
			String reply = InterceptorActivator.receive();
			Map<String, ?> prediction;
			try {
				// read the predicted value into a map
				prediction = mapper.readValue(reply, Map.class);
			} catch (IOException e) {
				log.error(e.getMessage(), e);
				return;
			}
			// send the prediction in the data ingestion pipeline
			sendPrediction(wsnParsedMeasure, prediction);
		}
	}


	public void sendPrediction(WSNParsedMeasure wsnParsedMeasure, Map<String, ?> rgbPrediction) {

		// send Measure
		IGatewayTopology topologyService = InterceptorActivator.getTopologyService();
		String deviceAlternateId = wsnParsedMeasure.getDeviceAlternateId();
		String sensorAlternateId = wsnParsedMeasure.getSensorAlternateId();
		int sensorTypeAlternateId = wsnParsedMeasure.getSensorTypeAlternateId();
		String capabilityAlternateId = CustomParameters.TARGET_CAPABILITY_ALTERNATEID_PREDICTION;

		if (topologyService != null) {
			// Device should be present
			Device device = topologyService.getDevice(deviceAlternateId);
			if (device == null) {
				log.warn("No device: {}", deviceAlternateId);
				return;
			}
			// Sensor should be present
			Sensor sensor = topologyService.getSensor(deviceAlternateId, sensorAlternateId);
			if (sensor == null) {
				log.warn("No sensor: {}", sensorAlternateId);
				return;
			}

			List<WSNParsedMeasure> measureList = new ArrayList<>();

			// Add predicted label
			Value<String> label = new Value<>();
			label.setInnerMeasure((String) rgbPrediction.get(CustomParameters.Prediction.label.toString()));

			// Add first neighbor
			Value<Float> neighbor1 = new Value<>();
			neighbor1.setInnerMeasure(((Number) rgbPrediction.get(CustomParameters.Prediction.neighbor1.toString())).floatValue());

			// Add second neighbor
			Value<Float> neighbor2 = new Value<>();
			neighbor2.setInnerMeasure(((Number) rgbPrediction.get(CustomParameters.Prediction.neighbor2.toString())).floatValue());

			// Add third neighbor
			Value<Float> neighbor3 = new Value<>();
			neighbor3.setInnerMeasure(((Number) rgbPrediction.get(CustomParameters.Prediction.neighbor3.toString())).floatValue());

			List<Value<?>> values = new ArrayList<>();
			values.add(label);
			values.add(neighbor1);
			values.add(neighbor2);
			values.add(neighbor3);

			// create the measure for the predicted value
			WSNParsedMeasure prediction = new WSNParsedMeasure(deviceAlternateId, sensorAlternateId,
					sensorTypeAlternateId, capabilityAlternateId, values, new Date());
			measureList.add(prediction);

			IDataManager dataManager = InterceptorActivator.getDataManagerService();
			dataManager.sendMeasures(device, measureList);
		}
	}
}


/**
 * It is possible to customize all the values present in this class to customize this interceptor example
 */
public class CustomParameters {

	public static enum Colors {
		// assumed constants: one entry per color component sent by the sensor
		R, G, B
	}

	public static enum Prediction {
		label, neighbor1, neighbor2, neighbor3;

		@Override
		public String toString() {
			// map the property name to the output of the prediction: name() equals the
			// property name, while toString() matches the key produced by the Python script
			switch (this) {
			case neighbor1: return "neighbor(1)";
			case neighbor2: return "neighbor(2)";
			case neighbor3: return "neighbor(3)";
			default: return super.toString();
			}
		}
	}

	// ****** Constants related to INTERCEPTORIMPL
	// - Target Device
	public static final String ZMQ_CONNECTION = "";
	public static final String PYTHON_PATH = "python";
	public static final String PYTHON_SCRIPT_PATH = "./";

	// - Target Sensor
	// public static final String TARGET_SENSOR_ALTERNATEID = "1";

	// - Target Capabilities
	public static final String TARGET_CAPABILITY_ALTERNATEID_COLOR = "color";
	public static final String TARGET_CAPABILITY_ALTERNATEID_PREDICTION = "color prediction";
}

Manifest-Version: 1.0
Bundle-ManifestVersion: 2
Bundle-Name: predictive model
Bundle-SymbolicName: predictive model
Bundle-Version: 4.0.0.SNAPSHOT
Bundle-Vendor: XYZ Company
Bnd-LastModified: 1481173207479
Require-Capability: osgi.service;filter:="(";effective:=active;cardinality:=multiple,osgi.service;filter:="(";effective:=active;cardinality:=multiple,;filter:="(&("
Build-Jdk: 1.8.0_92
Service-Component: OSGI-INF/
Bundle-ActivationPolicy: lazy
Bundle-RequiredExecutionEnvironment: JavaSE-1.8
Bundle-ClassPath: third-parties-libs/,
  • is untouched

Let’s have a look at the implementation in the InterceptorImpl class.

For every measurement received within the interceptor, we invoke the manageIncomingValues method.

This method builds a key-value map with the measurement, serializes it and sends it to the Python script over our ZMQ bus.

As in the other example, the Python code fetches the measurement from the message bus, calculates the prediction and sends the predicted data back on the bus.

The InterceptorImpl class receives these values in JSON format through the InterceptorActivator.receive() call; after the conversion we are ready to send the data back to SAP Cloud Platform Internet of Things.
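The JSON exchanged on the bus can be illustrated with a short Python sketch. The reply keys (label, neighbor(1), neighbor(2), neighbor(3)) are the ones produced by the predict function shown earlier; the request property names R, G and B, as well as the build_request and parse_prediction helpers, are assumptions made for this example.

```python
import json


def build_request(r, g, b):
    """Serialize one RGB sample; the property names R, G, B are assumptions."""
    return json.dumps({"R": r, "G": g, "B": b})


def parse_prediction(reply):
    """Split the JSON reply into the label and the three neighbor distances."""
    data = json.loads(reply)
    distances = [float(data["neighbor(%d)" % i]) for i in (1, 2, 3)]
    return data["label"], distances


request = build_request(250, 12, 8)
reply = '{"label": "red", "neighbor(1)": 1.5, "neighbor(2)": 2, "neighbor(3)": 3.25}'
label, distances = parse_prediction(reply)
```

On the Java side the same conversion is done by the Jackson ObjectMapper, which reads the reply into a Map.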

Here you can also implement your custom algorithms, such as the creation of an overall index that an SAP Edge Services Streaming Service Rule can use to check whether the prediction is valid or not.

This is not currently implemented in this sample code, but you can easily copy and adapt it from the other Python example in this blog post.

To improve performance, I have also created a sendPrediction method that sends the predicted measures directly into the Edge Platform through the native, integrated data ingestion pipeline, without any (external) HTTP client, which could noticeably increase the latency.

Build the interceptor and deploy it to your Edge Platform by using the contextual menu.

During the deployment, you are asked for the Server URL, which should be provided in the form

https://<HOSTNAME>:443/<INSTANCE ID>/iot/core/api/v1/tenant/<TENANT ID>

You are also asked for the Gateway ID (not the alternate ID), which you can easily get from your SAP Cloud Platform Internet of Things Cockpit.
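As a small aid, the two URLs involved in the deployment can be assembled as sketched below. The helper names and the sample values are hypothetical; the URL patterns are the Server URL form given above and the bundles endpoint that appears in the Eclipse console output.

```python
def server_url(hostname, instance_id, tenant_id):
    """Build the Server URL requested by the deployment dialog."""
    return "https://%s:443/%s/iot/core/api/v1/tenant/%s" % (
        hostname, instance_id, tenant_id)


def bundles_url(base_url, gateway_id):
    """Build the endpoint the bundle is POSTed to (as seen in the console log)."""
    return "%s/gateways/%s/bundles" % (base_url, gateway_id)


# Hypothetical values, for illustration only
base = server_url("example.host", "my-instance", "1234")
target = bundles_url(base, "42")
```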

This is the expected output of the Eclipse console

[31/10/19 14.30] Deployment successful; Filename=predictive model.jar
[31/10/19 15.19] Build was successful.
[31/10/19 15.19] 
Sending 'POST' request to URL : https://HOSTNAME:443/INSTANCE ID/iot/core/api/v1/tenant/TENANT ID/gateways/GATEWAY ID/bundles



In this blog post, I’ve explained two alternative ways to implement a predictive analytics scenario at the edge by using an external implementation of the algorithm in Python.

You should now also have some pointers on how to replace Python with your favorite language, such as R.

This example uses and integrates the features of the SAP Cloud Platform Internet of Things SDK and SAP Edge Services, in particular the Persistence Service, the Streaming Service, and Essential Business Functions.
