
Predictive Analytics at the EDGE as a Python script within a message bus

This example is an alternative to chapter “6. Implementation” of this blog post.

Python bundle variant

There are several reasons why I decided to implement these two Python-based variants.

The main reason is related to usage: unlike Java, Python is a widely used language in data science, and several libraries are available only for Python and C (wrapped with Cython).

A second reason is that this method is very generic and can be considered a ready-made box for running any algorithm already implemented in your favorite programming language.

This is the schema of the implemented solution

We will focus on the components in the grey boxes.

As in the pure Java implementation, I have leveraged the official SAP example https://github.com/SAP/iot-edge-services-samples/tree/master/persistence-aggregation-max-temp to implement the algorithm that reads the data from the Persistence Service.

The Persistence Service Java client classes are very similar in this implementation, so you can start from that sample as well if you would like to reproduce the same example.

I have modified Engine.java so that it invokes, through a ScheduledExecutorService, the custom predictive module that is responsible for interacting with your Python code, which computes the prediction.

package com.sap.iot.edgeservices.persistence.sample;

import java.util.concurrent.*;

/**
 * The Engine will continuously run the calculation 
 */
public class Engine extends Thread {
	
	////////////////////
	// class fields
	////////////////////
	
	private Calculation calculation;						// the parent interface to the calculation class
	private boolean bStop = false;							// flag that determines if the engine should continue
	private ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(1);
	private ScheduledFuture<?> thread;

	////////////////////
	// Constructors
	////////////////////
	
	/**
	 * Ctor for the engine.  
	 * @param calculation
	 */
	public Engine( Calculation calculation ) {
		PersistenceSampleActivator.printlnDebug("Engine:ctor - called");
		this.calculation = calculation;
		initialize();
	}
	
	////////////////////
	// Public methods
	////////////////////
	
	public void run() {
		PersistenceSampleActivator.printlnDebug("Engine:run - called");
        try {
			thread = threadPool.scheduleAtFixedRate(calculation,0, 10000, TimeUnit.MILLISECONDS);

		} catch (Exception e) {
            PersistenceSampleActivator.println("ERROR: Engine: Problem executing the calculation: ");
            e.printStackTrace();
        }
	}
	
	
	public void stopGracefully() {
		PersistenceSampleActivator.println("Engine:stopGracefully - called");
		thread.cancel(true);
	}
	
	////////////////////
	// private methods
	////////////////////
	
	private void initialize() {
		PersistenceSampleActivator.printlnDebug("Engine:initialize - called");
	}
}

Other modifications have been introduced into the pom.xml file to permit the usage of the ZeroMQ Java libraries; ZeroMQ is the simple message bus we have decided to use to communicate with the Python module.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.sap.iot.edgeservices.persistence.sample</groupId>
    <artifactId>PredictiveModel</artifactId>
    <version>1.0.0</version>
    <properties>
        <bundle.symbolicName>${project.artifactId}</bundle.symbolicName>
        <bundle.version>${project.version}</bundle.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>com.sap.iot.edgeservices</groupId>
            <artifactId>PersistenceService</artifactId>
            <version>3.1909.0</version>
            <type>bundle</type>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.osgi</groupId>
            <artifactId>org.osgi.core</artifactId>
            <version>4.3.1</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.osgi</groupId>
            <artifactId>org.osgi.service.component.annotations</artifactId>
            <version>1.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-osgi</artifactId>
            <version>2.9.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-math3</artifactId>
            <version>3.6.1</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.zeromq</groupId>
            <artifactId>jeromq</artifactId>
            <version>0.5.1</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.zeromq</groupId>
            <artifactId>jnacl</artifactId>
            <version>0.1.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>2.9.7</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.9.7</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.3</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifestFile>META-INF/MANIFEST.MF</manifestFile>
                    </archive>
                </configuration>
            </plugin>
            <!-- the versions plugin can be used to check for new plugin versions
                like this: mvn versions:display-plugin-updates -->
            <plugin>
                <groupId>org.apache.felix</groupId>
                <artifactId>maven-bundle-plugin</artifactId>
                <extensions>true</extensions>
                <executions>
                    <!-- Configure extra execution of 'manifest' in process-classes
                        phase to make sure SCR metadata is generated before unit test runs -->
                    <execution>
                        <id>scr-metadata</id>
                        <goals>
                            <goal>manifest</goal>
                        </goals>
                        <configuration>
                            <supportIncrementalBuild>true</supportIncrementalBuild>
                        </configuration>
                    </execution>
                </executions>
                <configuration>
                    <exportScr>true</exportScr>
                    <manifestLocation>META-INF</manifestLocation>
                    <instructions>
                        <!-- Enable processing of OSGI DS component annotations -->
                        <_dsannotations>*</_dsannotations>
                        <!-- Enable processing of OSGI metatype annotations -->
                        <_metatypeannotations>*</_metatypeannotations>
                    </instructions>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

In the current codebase I have used ZeroMQ as a simple socket-based implementation that allows our Java code to connect and communicate over a socket. This means that the Java side of the message bus acts as the client, while the Python code implements the server.
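
As a minimal, hypothetical sketch of that REQ/REP round trip (the address and payload are placeholders; the jeromq API is the same one used by the classes below):

import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;

public class ReqRepSketch {
    public static void main(String[] args) {
        // The Java side is the REQ client; the Python side binds the REP socket on the same port.
        try (ZContext context = new ZContext()) {
            ZMQ.Socket client = context.createSocket(SocketType.REQ);
            client.connect("tcp://127.0.0.1:5555");
            client.send("hello".getBytes(ZMQ.CHARSET), 0);      // REQ sockets alternate one send...
            byte[] reply = client.recv(0);                      // ...and one receive per round trip
            System.out.println(new String(reply, ZMQ.CHARSET));
        }
    }
}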

  • QueryData.java is a refactoring and extension of all the queries that are executed against the SQL Anywhere database through the Persistence Service Java client.

I have modified the method that fetches the data because, unlike the persistence aggregation example, we are reading multiple properties instead of one. Here is the content:

package com.sap.iot.edgeservices.persistence.sample.custom;

import com.sap.iot.edgeservices.persistence.sample.PersistenceException;
import com.sap.iot.edgeservices.persistence.sample.PersistenceSampleActivator;
import com.sap.iot.edgeservices.persistence.sample.db.PersistenceClient;
import com.sap.iot.edgeservices.persistence.sample.db.Table;
import com.sap.iot.edgeservices.persistenceservice.model.PSStatementObject;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class QueryData {
    private static Boolean CLOUD_EDGE_SERVICES;
    private PersistenceClient persistenceClient;
    private List<String> properties = null;

    QueryData(PersistenceClient persistenceClient, Boolean cloudEdgeServices){
        this.persistenceClient = persistenceClient;
        CLOUD_EDGE_SERVICES = cloudEdgeServices;
    }

    String resetQueryTime() {
        // reset the query time for the next query
        String mostRecentQueryTime = null;
        try {
            mostRecentQueryTime = persistenceClient.getFirstRowFirstColumn(persistenceClient.executeQuery("SELECT NOW()"));
            PersistenceSampleActivator.printlnDebug("new date is: " + mostRecentQueryTime );
        } catch (PersistenceException e1) {
            // TODO Auto-generated catch block
            e1.printStackTrace();
        }
        return mostRecentQueryTime;
    }
    
    // SQL used to select the data
    private String getSqlForMetadata( String profileId, String objectId) {

        // NOTE: only top 1000 records are returned.  If more data is expected, then
        // query should be changed to use database aggregation instead of Java
        String sql = "SELECT top 1000 m.PROP_ID, "
                + "          m.PROP_SEQ,  "
                + "          m.TYPE_ID "
                + "     FROM EFPS.MEASURE_TYPE_PROPERTY m "
                + "    WHERE m.OBJECT_ID = '" + objectId + "'";
        // Add PROFILE_ID only in case of OP Edition
        if(!CLOUD_EDGE_SERVICES){
            sql += "      AND m.PROFILE_ID = '" + profileId + "'";
        }
        sql += " ORDER BY m.PROP_SEQ ASC";


        PersistenceSampleActivator.printlnDebug("getSqlForMetadata: ============");
        PersistenceSampleActivator.printlnDebug(sql);

        return sql;
    }

    // SQL used to select the data
    String getSqlForMeasureValues(String profileId, String objectId, String sinceDate) {

        // NOTE: only top 1000 records are returned.  If more data is expected, then
        // query should be changed to use database aggregation instead of Java
        String sql = "SELECT top 1000 m.DEVICE_ADDRESS, "
                + "          CAST(m.MEASURE_VALUE AS VARCHAR(32)) MEASURE_VALUE,  "
                + "          m.DATE_RECEIVED "
                + "     FROM EFPS.MEASURE m "
                + "    WHERE m.OBJECT_ID = '" + objectId + "'"
                + "      AND m.DATE_RECEIVED > '" + sinceDate + "'";
        // Add PROFILE_ID only in case of OP Edition
        if(!CLOUD_EDGE_SERVICES){
            sql += "      AND m.PROFILE_ID = '" + profileId + "'";
        }
        sql += " ORDER BY m.DATE_RECEIVED DESC";


        PersistenceSampleActivator.printlnDebug("getSqlForMeasureValues: ============");
        PersistenceSampleActivator.printlnDebug(sql);

        return sql;
    }

    // create a table to store the results (if you want... ) not implemented in this sample
    private void createMaxTemperatureTable() {
        PersistenceSampleActivator.printlnDebug("CalculateMaxTemperature:createMaxTemperatureTable - called");

        String sql = "CREATE TABLE MaxTemperature ( "
                + "              id INT PRIMARY KEY DEFAULT AUTOINCREMENT, "
                + "          device VARCHAR(32), "
                + "         maxTemp FLOAT NOT NULL, "
                + "        windowMS INTEGER NOT NULL, "
                + "        calcTime DATE DEFAULT CURRENT UTC TIMESTAMP); ";
        Table table = new Table(persistenceClient);
        try {
            // if table already exists, it will not modify it
            table.createIfNeeded(sql, "MaxTemperature");
            PersistenceSampleActivator.println("CalculateMaxTemperature:createMaxTemperatureTable - success");
        } catch (PersistenceException e) {
            // TODO Auto-generated catch block
            PersistenceSampleActivator.println("ERROR: CalculateMaxTemperature:createMaxTemperatureTable - failed");
            //state = Calculation.State.ERROR;
            e.printStackTrace();
        }
    }


    // convert the result set into a map of value maps (property name -> Float), keyed by device
    Map<String, ArrayList<Map<String, Float>>> getValuesAsFloatMapsByDevice(PSStatementObject statementObject, String sensorTypeAlternateId, String capabilityAlternateId) {
        properties = this.getMetadata(sensorTypeAlternateId, capabilityAlternateId);
        //ArrayList<Double> values = new ArrayList<>();
        Map<String, ArrayList<Map<String, Float>>> valuesByDevice = new HashMap<>();

        PersistenceSampleActivator.printlnDebug("getValuesAsDoublesByDevice start-------------");
        if (statementObject.hasResultList()) {
            statementObject.getResultList().forEach((row) -> {
                String device = row.get(0).getValue().toString();
                PersistenceSampleActivator.printlnDebug("device = " + device);
                ArrayList<Map<String, Float>> valueMap = valuesByDevice.get(device);
                if ( valueMap == null ) {
                    valueMap = new ArrayList<>();
                    valuesByDevice.put(device, valueMap);
                }
                String values = row.get(1).getValue().toString();
                PersistenceSampleActivator.printlnDebug("value = " + values);
                PersistenceSampleActivator.printlnDebug(device + ":" + values);

                String date = row.get(2).getValue().toString();
                PersistenceSampleActivator.printlnDebug("date = " + date);

                Map<String,Float> mapValues = new HashMap<>();
                try {
                    //Split into properties
                    String[] valuesArray = values.split(" ");
                    for (int i = 0; i < valuesArray.length; i++){
                        String prop = properties.get(i);
                        Float f = Float.valueOf(valuesArray[i]);
                        mapValues.put(prop, f);
                    }
                    valueMap.add(mapValues);
                    PersistenceSampleActivator.printlnDebug("value added");

                } catch (NumberFormatException nfe) {
                    StringBuffer errMsg = new StringBuffer("Error: not a number: ");
                    row.forEach((column) -> {
                        errMsg.append(column.getValue() + "(" + column.getColumnName() + ":" + column.getMetadata() + "), ");
                    });
                    PersistenceSampleActivator.printlnDebug(errMsg.toString());
                }
            });
        } else {
            PersistenceSampleActivator.printlnDebug(" ResultSet is empty");
        }

        PersistenceSampleActivator.printlnDebug("getValuesAsDoublesByDevice end---------------");
        return valuesByDevice;
    }

    // extract the property names (metadata) from the result set
    private List<String> getMetadataFromResultset(PSStatementObject statementObject) {
        //ArrayList<Double> values = new ArrayList<>();
        List<String> metadata = new ArrayList<>();

        PersistenceSampleActivator.printlnDebug("getMetadataFromResultset start-------------");
        if (statementObject.hasResultList()) {
            statementObject.getResultList().forEach((row) -> {
                String type = row.get(0).getValue().toString();
                metadata.add(type);
                PersistenceSampleActivator.printlnDebug("type = " + type);
            });
        } else {
            PersistenceSampleActivator.printlnDebug(" ResultSet is empty");
        }

        PersistenceSampleActivator.printlnDebug("getMetadataFromResultset end---------------");
        return metadata;
    }

    List<String> getMetadata(String sensorTypeAlternateId, String capabilityAlternateId) {

        PSStatementObject stmt;
        String sql = getSqlForMetadata(sensorTypeAlternateId,
                capabilityAlternateId);

        List<String> types = new ArrayList<>();
        try {
            stmt = persistenceClient.executeQuery(sql);
            types = getMetadataFromResultset(stmt);
        } catch (PersistenceException pe) {
            pe.printStackTrace();
        } catch ( Exception e ) {
            e.printStackTrace();
        }
        return types;
    }
}
  • The DataStreamer class contains just the methods to stream the results to SAP Cloud Platform Internet of Things, based on a simple HTTP client.
  • ZMQAdapter.java is a helper class that wraps the features exposed by ZeroMQ:
package com.sap.iot.edgeservices.persistence.sample.custom;

import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;

import java.io.IOException;

public class ZMQAdapter {
    private ZMQ.Socket socket;
    private String addressAndport;
    private Process pair;
    ZMQAdapter(String addressAndport){
        ZContext context = new ZContext();
        // Socket to talk to clients
        socket = context.createSocket(SocketType.REQ);
        socket.setReceiveTimeOut(10000);
        socket.connect("tcp://" + addressAndport);
        this.addressAndport = addressAndport;
    }
    void closeSocket(){
        socket.disconnect("tcp://" + addressAndport);
        socket.close();
        if (pair != null){
            pair.destroyForcibly();
        }
    }

    String receive(){
        byte[] reply = socket.recv(0);
        System.out.println("Received: [" + new String(reply, ZMQ.CHARSET) + "]");
        return new String(reply, ZMQ.CHARSET);
    }

    void send(String measurement) {
        socket.send(measurement.getBytes(ZMQ.CHARSET), 0);
    }

    boolean runPair(String process, String args){
        try{
            send("hello");
            String resp = receive();
            if (resp.isEmpty() || !resp.contentEquals("hello")){
                return false;
            }
            System.out.println("Server response: " + resp);
        }
        catch (Exception e){
            try {
                pair = new ProcessBuilder(process, args).start();
                send("hello");
                String msg = receive();
                if (msg.isEmpty() || !msg.contentEquals("hello")){
                    return false;
                }
                System.out.println("Server response: " + msg);
            } catch (IOException ex) {
                ex.printStackTrace();
                return false;
            }
        }
        return true;
    }
}

There are two methods, one to send data to the message bus and the other to receive data from it. We are going to use them to exchange JSON packets.

The runPair method first pings the server to verify that it is alive. If it is not, it starts the server process.

The closeSocket method is invoked once when the service is stopped. It also stops the server process (when possible).
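
As a hypothetical usage sketch of this lifecycle (paths and the JSON payload are placeholders; the real calls are made from PredictValues, shown below):

ZMQAdapter zmq = new ZMQAdapter("127.0.0.1:5555");            // connect the REQ socket
boolean started = zmq.runPair("python", "./mypredictor.py");  // ping the server and start it if needed
if (started) {
    zmq.send("{\"R\":10.0,\"G\":20.0,\"B\":30.0}");           // send one JSON sample
    String reply = zmq.receive();                             // blocking read of the JSON prediction
    System.out.println(reply);
}
zmq.closeSocket();                                            // close the socket and stop the server process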

  • The core of the logic is inside our PredictValues class.

Starting from the initialize method, you can find the details of the connection to the server.

The rest of the logic is called inside the run method, which is automatically scheduled every 10 seconds.

This method fetches the data through the Persistence Service Java client using a time window of 10 seconds.

If there is any data, each sample is sent to the message bus, and we wait to receive the result from the ZeroMQ socket, which contains a JSON document with the predicted value.

Then we build an index, derived from a mathematical formula that aggregates the predictions of all the samples.

Finally, we can stream the punctual prediction and the global prediction (represented through the index) back to SAP Cloud Platform Internet of Things.

This operation allows additional manipulation of the predicted values, such as sending them to a third-party system in the cloud or, as in this scenario, evaluating them with a Rule computed within the Streaming Service.
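
The index is computed sample by sample in the checkPrediction method shown below; the following simplified sketch (field count and values are invented) restates the aggregation rule it applies:

// Simplified restatement of the aggregation in checkPrediction (illustrative numbers).
// For one sample: average the predicted output fields, scaling out-of-range values.
float n = 3f;                                   // number of output fields
float sum = 40f + 60f + 120f * 1.25f;           // 120 > PLANT_LIMIT_OUT_OF_RANGE (100), so it is scaled by 1.25
float sampleScore = sum / n;                    // ~83.3

// Combine with the running index (validPrediction):
//  - first sample: the index is simply sampleScore
//  - if either value exceeds 100: keep the worse (larger) of the two
//  - otherwise: average the two
float previous = 70f;
float index = (sampleScore > 100f || previous > 100f)
        ? Math.max(sampleScore, previous)
        : (sampleScore + previous) / 2f;        // ~76.7 in this example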

package com.sap.iot.edgeservices.persistence.sample.custom;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.sap.iot.edgeservices.persistence.sample.Calculation;
import com.sap.iot.edgeservices.persistence.sample.PersistenceException;
import com.sap.iot.edgeservices.persistence.sample.PersistenceSampleActivator;
import com.sap.iot.edgeservices.persistence.sample.db.PersistenceClient;
import com.sap.iot.edgeservices.persistenceservice.model.PSStatementObject;

import java.io.IOException;
import java.io.InputStream;
import java.util.*;
import java.util.concurrent.atomic.AtomicReference;

public class PredictValues extends Calculation {
	private static final String PYTHON_CMD = "python os command here";
	private static final String PYTHON_SCRIPT_PATH = "mypythonmodule.py here";

	////////////////////
	// Static fields
	////////////////////

	// this needs to be static since the Engine will be asking for this before it has instantiated it
	private static String logLevel = "INFO";

	private static Boolean CLOUD_EDGE_SERVICES = true;  //SET TO false for ON-PREMISE

	private static String IOTS_MEASURES_URL = "http://localhost:8699/measures/";	// destination to send calculated values
	private static String TRANSFORMED_SENSOR_TYPE_ALTERNATE_ID = "255";			// sensorTypeAlternateId to send calculated values
	private static String TRANSFORMED_CAPABILITY_ALTERNATE_ID = "color prediction";		// capabilityAlternateId to send calculated values
	private static String RESULT_CAPABILITY_ALTERNATE_ID = "validity color score";		// capabilityAlternateId to send calculated values
	private static String TRANSFORMED_SENSOR_ALTERNATE_ID = "color sensor";			// sensorAlternateId to send calculated values
	private static final float PLANT_SCALE_OUT_OF_RANGE = 1.25f;
	private static final float PLANT_LIMIT_OUT_OF_RANGE = 100f;

	////////////////////
	// Class fields
	////////////////////

	private int calculationFrequencyMS;	// frequency that the engine will calculate
	private String sensorTypeAlternateId;	// the sensorTypeAlternateId that the engine will calculate on
	private String capabilityAlternateId = "color";	// the capabilityAlternateId that the engine will calculate on

	private QueryData queryData;
	private String mostRecentQueryTime = null;
	private ZMQAdapter zmq;
	
	
	////////////////////
	// constructors
	////////////////////

	@Override
	public void stopGracefully() {
		zmq.closeSocket();
	}

	public PredictValues(PersistenceClient persistenceClient ) {
		super(persistenceClient);

		PersistenceSampleActivator.printlnDebug("PredictValues:ctor - called");

		// properties that control how the Calculation will be done
		calculationFrequencyMS = 10000;	// 10 seconds

		if (CLOUD_EDGE_SERVICES) {
			// cloud edition
			sensorTypeAlternateId = "*"; 	// only for this sensorType
		} else {
			// on-premise edition
			sensorTypeAlternateId = "color"; 	// only for this Sensor Profile
		}

		mostRecentQueryTime = queryData.resetQueryTime();
	}

	////////////////////
	// public methods
	////////////////////

	public void run() {
		PersistenceSampleActivator.printlnDebug("PredictValues:run - called");

		// ensure we have initialized and in a good state
		if ( state != State.RUNNING ) {
			PersistenceSampleActivator.println("Warning: NOT RUNNING: PredictValues.state = " + state );
			return;
		}

		// determine if any configuration changes have been sent
		updateConfigurations();

		// get the data and create a primitive array
		Map<String, ArrayList<Map<String, Float>>> valuesByDevice = getSampleData();

		// for each device that sent in a value, send out the max
		valuesByDevice.forEach((device, values) -> {
			PersistenceSampleActivator.println( "======================== Computing prediction" );

			Float validPrediction = null;

			for (Map<String, Float> rgbmap : values ) {

				ObjectMapper objectMapper = new ObjectMapper();
				String measurement;
				try {
					measurement = objectMapper.writeValueAsString(rgbmap);
					System.out.println("json = " + measurement);
				} catch (JsonProcessingException e) {
					e.printStackTrace();
					continue;
				}
				// Send measurement
				zmq.send(measurement);
				String reply = zmq.receive();
				Map<String, ?> prediction;
				try {
					prediction = objectMapper.readValue(reply, Map.class);
				} catch (IOException e) {
					e.printStackTrace();
					continue;
				}
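				// NOTE: 'evaluator' and 'OutputField' are not declared in this bundle; they appear to come
				// from the PMML-based Java variant of this example and have to be adapted here (for instance
				// by deriving the output field names from the keys of the 'prediction' map).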
				validPrediction = checkPrediction(prediction, validPrediction, evaluator.getOutputFields());
				if (CLOUD_EDGE_SERVICES) {
					// send the results back into IOT Service engine as a different capability
					// FOR ON-PREMISE EDGE SERVICES, comment out this line
					DataStreamer.streamResults( IOTS_MEASURES_URL,
							device,
							TRANSFORMED_SENSOR_TYPE_ALTERNATE_ID,
							TRANSFORMED_CAPABILITY_ALTERNATE_ID,
							TRANSFORMED_SENSOR_ALTERNATE_ID,
							prediction);
				}
			}

			if (CLOUD_EDGE_SERVICES) {
				// send the results back into IOT Service engine as a different capability
				// FOR ON-PREMISE EDGE SERVICES, comment out this line
				DataStreamer.streamResult( IOTS_MEASURES_URL,
						device,
						TRANSFORMED_SENSOR_TYPE_ALTERNATE_ID,
						RESULT_CAPABILITY_ALTERNATE_ID,
						TRANSFORMED_SENSOR_ALTERNATE_ID,
						validPrediction);
			}
		});
	}

	private Float checkPrediction(Map<String, ?> prediction, Float validPrediction, List<OutputField> outputFields) {
		AtomicReference<Float> distance = new AtomicReference<>(0f);
		outputFields.forEach(field -> {
			try{
				float val = Float.parseFloat(String.valueOf(prediction.get(field.getName().getValue())));
				if(val > PLANT_LIMIT_OUT_OF_RANGE){
					val *= PLANT_SCALE_OUT_OF_RANGE;
				}
				float finalVal = val;
				distance.updateAndGet(v -> v + finalVal);
			}
			catch (Exception e){
				distance.updateAndGet(v -> v + 2 * outputFields.size() * PLANT_LIMIT_OUT_OF_RANGE);
			}
		});
		if (validPrediction == null){
			return distance.get() / outputFields.size();
		}
		//prediction not in range
		if ((distance.get() / outputFields.size()) > PLANT_LIMIT_OUT_OF_RANGE || validPrediction > PLANT_LIMIT_OUT_OF_RANGE){
			return (distance.get() / outputFields.size()) > validPrediction ? (distance.get() / outputFields.size()) : validPrediction;
		}

		return ((distance.get() / outputFields.size()) + validPrediction) / 2;
	}

	// this is made available to the engine
	@Override
	public int getPollingFreqencyMS() {
		// return the calculation delay
		return calculationFrequencyMS;
	}

	@Override
	public String getLogLevel() {
		return logLevel;
	}

	////////////////////
	// private methods
	////////////////////

	// this is called by the super class, Calculation
	protected void initialize() {
		//init auxiliary classes
		queryData = new QueryData(persistenceClient,CLOUD_EDGE_SERVICES);
		/*python ZMQ wrapper*/
		zmq = new ZMQAdapter("127.0.0.1:5555");
		boolean started = zmq.runPair(PYTHON_CMD, PYTHON_SCRIPT_PATH);
		/*end python*/
		PersistenceSampleActivator.printlnDebug("PredictValues:initialize - called");
		// if you want to store the result to a custom table, create a table to store them.
		this.state = started ? State.RUNNING : State.ERROR;
	}

	// selects the sample data from the persistence database which is constantly being updated
	private Map<String, ArrayList<Map<String, Float>>> getSampleData() {

		PSStatementObject stmt;
		String sql = queryData.getSqlForMeasureValues( sensorTypeAlternateId,
				capabilityAlternateId,
				mostRecentQueryTime );

		mostRecentQueryTime = queryData.resetQueryTime();

		Map<String, ArrayList<Map<String, Float>>> valuesByDevice = new HashMap<>();
		try {
			stmt = persistenceClient.executeQuery(sql);
			valuesByDevice = queryData.getValuesAsFloatMapsByDevice(stmt, sensorTypeAlternateId, capabilityAlternateId);
		} catch (PersistenceException pe) {
			// TODO Auto-generated catch block
			pe.printStackTrace();
		} catch ( Exception e ) {
			e.printStackTrace();
		}
		return valuesByDevice;
	}

	// go to the database and see if any of the configurations have changed
	private void updateConfigurations() {
	}

}

On the Python side, I have first created the “server” end of the message bus connection.

Then I have started an infinite loop that waits for a message, parses it into a usable format, and invokes the prediction through a Python function call that has already been implemented (for example within a Jupyter notebook in SAP Data Intelligence in the cloud).

Once the result is ready, it is transformed into several key-value pairs (the equivalent of a Java Map), converted to a JSON string, and sent to the message bus.

# Python system libraries
import json
import os.path
# Third-party libraries
import sklearn.neighbors
import zmq
# Change this to the folder containing the model data
DATA_DIR = '.'
TRAINING_DATASET_FILENAME = os.path.join(DATA_DIR, 'standard-colors.json')
f = open(TRAINING_DATASET_FILENAME)
dataset = json.load(f)
f.close()
points = [el['data'] for el in dataset]
labels = [el['label'] for el in dataset]
knn_classifier = sklearn.neighbors.KNeighborsClassifier(3)
knn_classifier.fit(points, labels)

def predict(array_data):
    knn_classifier.verbose=True
    predicted = knn_classifier.predict(array_data)
    print(predicted)
    distances,indexes = knn_classifier.kneighbors(array_data)
    print(distances)
    print([labels[i] for i in indexes[0]])
    print(indexes)
    data = {}
    data['label'] = predicted[0]
    data['neighbor(1)'] = distances[0][0]
    data['neighbor(2)'] = distances[0][1]
    data['neighbor(3)'] = distances[0][2]
    print(data)
    return data

def zmq_start_server(port):
    global socket
    context = zmq.Context()
    socket = context.socket(zmq.REP)
    socket.bind("tcp://*:" + port)
    
    
zmq_start_server("5555")

while True:
    #  Wait for next request from client
    message = socket.recv()
    print("Received request: %s" % message)
    if(message == b'hello'):
        socket.send(b'hello')
        continue
    
    #  Parse Json
    obj = json.loads(message.decode('utf-8'))
    array1 = []
    array1.append([obj['R'],obj['G'],obj['B']])
    #  Do prediction
    prediction = predict(array1)
    
    #  Create json
    jsonprediction = json.dumps(prediction)
    print(jsonprediction)
    #  Send reply back to client
    socket.send((jsonprediction.encode('utf-8')))

This is a very good approach: it provides a great continuum from the cloud to the edge, and the methodology is completely generic: if you replace Python with, for example, R, you can implement a very similar solution with very limited effort.

There are also two drawbacks to this approach that should be considered.

The first comes from the fact that you need a Python environment, and not just a Java environment, installed on the machine that runs the SAP Cloud Platform Internet of Things Edge Platform.

The second is that you must deploy the Python script manually onto the machine, since it is a completely external module, or implement some code to automate the deployment from SAP Edge Services (for example through a custom dashboard or some exposed APIs).

 

Hard real-time Java (+ Python) implementation

This implementation uses an interceptor, implemented with the SAP Cloud Platform Internet of Things Edge Platform SDK, to access in real time the data to be processed by the external predictive module.

As in the other Python example explained in this blog post, data is sent to the Python predictive analytics module through a data bus; I have used ZeroMQ.

The Python script is the same one used in the other part of the article.

Create a standard interceptor project as specified in the official documentation and remove all the autogenerated Java files.

These are the classes for the interceptor:

  • InterceptorActivator.java
package com.sap.iotservices.gateway.interceptor;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;

import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.component.annotations.ReferenceCardinality;
import org.osgi.service.component.annotations.ReferencePolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Socket;

import com.sap.iotservices.commandmanagement.interfaces.CommandManager;
import com.sap.iotservices.gateway.datamanager.IDataManager;
import com.sap.iotservices.gateway.devicemanager.IDeviceManagerAsync;
import com.sap.iotservices.gateway.eventsender.EventSender;
import com.sap.iotservices.gateway.interceptor.custom.CustomParameters;
import com.sap.iotservices.gateway.properties.IGatewayProperties;
import com.sap.iotservices.gateway.topology.IGatewayTopology;
import com.sap.iotservices.hooks.gateway.IGatewayInterceptor;
import com.sap.iotservices.hooks.gateway.IGatewayInterceptorService;
import com.sap.iotservices.utils.DSUtils;
import com.sap.iotservices.xmlparse.IJMSFrameParser;


/**
 * This class starts the actual implementation for the Interceptor
 * 
 */
@Component(immediate = true, service = {})
public class InterceptorActivator {

	private static final Logger log = LoggerFactory.getLogger(InterceptorActivator.class);

	private static Object lock = new Object();
	private static Object zmqlock = new Object();
	private boolean registered = false;
	private static boolean initialized = false;

	private IGatewayInterceptor interceptor;

	private static Socket socket;

	private static String addressAndport = CustomParameters.ZMQ_CONNECTION;

	private static Process pair;

	private static String pythonPath = CustomParameters.PYTHON_PATH;

	private static String scriptName = CustomParameters.PYTHON_SCRIPT_PATH;

	private static volatile boolean zmq = false;
	public static boolean isZmq() {
		synchronized (zmqlock) {
			return zmq;
		}
	}

	public static void setZmq(boolean zmq) {
		InterceptorActivator.zmq = zmq;
	}

	@Activate
	public void start() {
		log.info("Starting Gateway Interceptor...");
		this.interceptor = new InterceptorImpl();

		new Thread(() -> {
			IGatewayInterceptorService interceptorMng = getInterceptorManager();

			synchronized (lock) {
				if ((interceptorMng != null) && (!registered)) {
					log.info("Registering implementation of the test interceptor");
					registered = interceptorMng.addInterceptor(interceptor);
				}
			}
		}).start();

	}

	@Deactivate
	public void stop() {
		log.info("Stopping Interceptor...");
		IGatewayInterceptorService interceptorMng = getInterceptorManager();

		synchronized (lock) {
			if ((interceptorMng != null) && (registered)) {
				log.info("Unregistering implementation of the test interceptor");
				interceptorMng.removeInterceptor(this.interceptor);
				registered = false;
			}
		}
		closeSocket();
	}
	
	static void closeSocket(){
        socket.disconnect("tcp://" + addressAndport);
        socket.close();
        if (pair != null){
            pair.destroyForcibly();
        }
    }
	
	static String receive(){
        byte[] reply = socket.recv(0);
        System.out.println("Received: [" + new String(reply, ZMQ.CHARSET) + "]");
        return new String(reply, ZMQ.CHARSET);
    }

	static void send(String measurement) {
        socket.send(measurement.getBytes(ZMQ.CHARSET), 0);
    }

	static void initSocket() {
		synchronized (zmqlock) {
			socket = new ZContext().createSocket(SocketType.REQ);
	        socket.setReceiveTimeOut(10000);
	        //update from configreader
	        socket.connect("tcp://" + addressAndport);
	        runPair(pythonPath, scriptName);
	        zmq = true;
		}
	}
	
    static boolean runPair(String process, String args){
        try{
            send("hello");
            String resp = receive();
            if (resp.isEmpty() || !resp.contentEquals("hello")){
                return false;
            }
            log.info("Server response: {}", resp);
        }
        catch (Exception e){
            try {
                pair = new ProcessBuilder(process, args).start();
                send("hello");
                String msg = receive();
                if (msg.isEmpty() || !msg.contentEquals("hello")){
                    return false;
                }
                log.info("Server response: {}", msg);
            } catch (IOException ex) {
                log.error(ex.getMessage(), ex);
                return false;
            }
        }
        return true;
    }

	public static boolean isInitialized() {
		return initialized;
	}

	public static void setInitialized(boolean initialized) {
		InterceptorActivator.initialized = initialized;
	}

	//////////////////////////////////////////////////////////////////////////////////////////
	// Available Declarative Services
	// To define and consume services via XML metadata, see OSGI-INF/InterceptorActivator.xml

	/**
	 * Interceptor Manager
	 */
	private static AtomicReference<IGatewayInterceptorService> interceptorMngr = new AtomicReference<>();

	@Reference(cardinality = ReferenceCardinality.AT_LEAST_ONE, policy = ReferencePolicy.DYNAMIC)
	void setInterceptorManager(IGatewayInterceptorService arg) {
		DSUtils.setRef(log, interceptorMngr, arg, IGatewayInterceptorService.class, this.getClass());
	}

	void unsetInterceptorManager(IGatewayInterceptorService arg) {
		DSUtils.removeRef(log, interceptorMngr, arg, IGatewayInterceptorService.class, this.getClass());
	}

	public static IGatewayInterceptorService getInterceptorManager() {
		return DSUtils.get(log, interceptorMngr, DSUtils.WAIT_FOR_VALID_REFERENCE);
	}

	/**
	 * Frame Parser
	 */
	private static AtomicReference<IJMSFrameParser> frameParser = new AtomicReference<>();

	@Reference(cardinality = ReferenceCardinality.AT_LEAST_ONE, policy = ReferencePolicy.DYNAMIC)
	void setFrameParser(IJMSFrameParser arg) {
		DSUtils.setRef(log, frameParser, arg, IJMSFrameParser.class, this.getClass());
	}

	void unsetFrameParser(IJMSFrameParser arg) {
		DSUtils.removeRef(log, frameParser, arg, IJMSFrameParser.class, this.getClass());
	}

	public static IJMSFrameParser getFrameParser() {
		return DSUtils.get(log, frameParser, DSUtils.WAIT_FOR_VALID_REFERENCE);
	}

	/**
	 * Device manager service
	 */
	private static AtomicReference<IDeviceManagerAsync> deviceManager = new AtomicReference<>(); // NOSONAR

	@Reference(cardinality = ReferenceCardinality.OPTIONAL, policy = ReferencePolicy.DYNAMIC)
	void setDeviceManagerService(IDeviceManagerAsync arg) {
		DSUtils.setRef(log, deviceManager, arg, IDeviceManagerAsync.class, this.getClass());
	}

	void unsetDeviceManagerService(IDeviceManagerAsync arg) {
		DSUtils.removeRef(log, deviceManager, arg, IDeviceManagerAsync.class, this.getClass());
	}

	public static IDeviceManagerAsync getDeviceManagerService() {
		return DSUtils.get(log, deviceManager, !DSUtils.WAIT_FOR_VALID_REFERENCE);
	}
	/**
	 * Gateway manager service
	 */
	private static AtomicReference<IGatewayProperties> gatewayManagerService = new AtomicReference<>(); // NOSONAR

	@Reference(cardinality = ReferenceCardinality.OPTIONAL, policy = ReferencePolicy.DYNAMIC)
	void setGatewayManagerService(IGatewayProperties arg) {
		DSUtils.setRef(log, gatewayManagerService, arg, IGatewayProperties.class, this.getClass());
	}

	void unsetGatewayManagerService(IGatewayProperties arg) {
		DSUtils.removeRef(log, gatewayManagerService, arg, IGatewayProperties.class, this.getClass());
	}

	public static IGatewayProperties getGatewayManagerService() {
		return DSUtils.get(log, gatewayManagerService, !DSUtils.WAIT_FOR_VALID_REFERENCE);
	}
	/**
	 * Event Sender
	 */
	private static AtomicReference<EventSender> eventSender = new AtomicReference<>(); // NOSONAR

	@Reference(cardinality = ReferenceCardinality.OPTIONAL, policy = ReferencePolicy.DYNAMIC)
	void setEventSender(EventSender arg) {
		DSUtils.setRef(log, eventSender, arg, EventSender.class, this.getClass());
	}

	void unsetEventSender(EventSender arg) {
		DSUtils.removeRef(log, eventSender, arg, EventSender.class, this.getClass());
	}

	public static EventSender getEventSender() {
		return DSUtils.get(log, eventSender, !DSUtils.WAIT_FOR_VALID_REFERENCE);
	}

	/**
	 * Data Manager
	 */
	private static AtomicReference<IDataManager> dataManager = new AtomicReference<>(); // NOSONAR

	@Reference(cardinality = ReferenceCardinality.OPTIONAL, policy = ReferencePolicy.DYNAMIC)
	void setDataManagerService(IDataManager arg) {
		DSUtils.setRef(log, dataManager, arg, IDataManager.class, this.getClass());
	}

	void unsetDataManagerService(IDataManager arg) {
		DSUtils.removeRef(log, dataManager, arg, IDataManager.class, this.getClass());
	}

	public static IDataManager getDataManagerService() {
		return DSUtils.get(log, dataManager, !DSUtils.WAIT_FOR_VALID_REFERENCE);
	}

	/**
	 * Topology Service
	 */
	private static AtomicReference<IGatewayTopology> topologyService = new AtomicReference<>(); // NOSONAR

	@Reference(cardinality = ReferenceCardinality.OPTIONAL, policy = ReferencePolicy.DYNAMIC)
	void setTopologyManagerService(IGatewayTopology arg) {
		DSUtils.setRef(log, topologyService, arg, IGatewayTopology.class, this.getClass());
	}

	void unsetTopologyManagerService(IGatewayTopology arg) {
		DSUtils.removeRef(log, topologyService, arg, IGatewayTopology.class, this.getClass());
	}

	public static IGatewayTopology getTopologyService() {
		return DSUtils.get(log, topologyService, !DSUtils.WAIT_FOR_VALID_REFERENCE);
	}

	/**
	 * Command Manager
	 */
	private static AtomicReference<CommandManager> commandManager = new AtomicReference<>(); // NOSONAR

	@Reference(cardinality = ReferenceCardinality.OPTIONAL, policy = ReferencePolicy.DYNAMIC)
	void setCommandManager(CommandManager arg) {
		DSUtils.setRef(log, commandManager, arg, CommandManager.class, this.getClass());
	}

	void unsetCommandManager(CommandManager arg) {
		DSUtils.removeRef(log, commandManager, arg, CommandManager.class, this.getClass());
	}

	public static CommandManager getCommandManager() {
		return DSUtils.get(log, commandManager, !DSUtils.WAIT_FOR_VALID_REFERENCE);
	}
}
  • InterceptorImpl.java
package com.sap.iotservices.gateway.interceptor;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.sap.iotservices.api.models.gateway.Device;
import com.sap.iotservices.api.models.gateway.Sensor;
import com.sap.iotservices.data.WSNdataType;
import com.sap.iotservices.gateway.datamanager.IDataManager;
import com.sap.iotservices.gateway.interceptor.custom.CustomParameters;
import com.sap.iotservices.gateway.topology.IGatewayTopology;
import com.sap.iotservices.hooks.gateway.IGatewayInterceptor;
import com.sap.iotservices.hooks.gateway.IoTServicesPointcut;
import com.sap.iotservices.network.node.data.Value;
import com.sap.iotservices.network.node.data.WSNParsedMeasure;

public class InterceptorImpl
implements IGatewayInterceptor {

	private static final Logger log = LoggerFactory.getLogger(InterceptorImpl.class);

	List<Double> intervalValues = new ArrayList<>();

	private static ObjectMapper mapper = new ObjectMapper();

	@Override
	public void processObject(String pointcutName, Object... args)
			throws Exception {
		// check the messagebus is running
		if (!InterceptorActivator.isZmq()) {
			InterceptorActivator.initSocket();
		}
		try {
			IoTServicesPointcut pointcut = IoTServicesPointcut.valueOf(pointcutName);
			switch (pointcut) {
			case GATEWAY_PARSED_DATA_DISPATCH:
				// triggered upon dispatch of parsed sensor data
				manageIncomingMeasures(args);
				break;
			default:
				break;
			}
		} catch (Exception e) {
			log.error(e.getMessage());
		}
	}

	@Override
	public List<String> getBranchPoints() {
		List<String> list = new ArrayList<>();
		list.add(IoTServicesPointcut.GATEWAY_PARSED_DATA_DISPATCH.name());
		return list;
	}

	@Override
	public void onError(Object event, String pointcut, Exception e) {
		log.info("OnError triggered; pointcut is {}", pointcut);
	}

	// Go through measure list
	private void manageIncomingMeasures(Object... args) {
		@SuppressWarnings("unchecked")
		List<WSNParsedMeasure> measures = (List<WSNParsedMeasure>) args[0];
		// list of measures that are going to be dropped
		List<WSNParsedMeasure> toBeRemoved = new ArrayList<>();

		if (measures != null) {
			for (WSNParsedMeasure wsnParsedMeasure : measures) {

				List<Value<?>> valueList = wsnParsedMeasure.getValues();

				log.info("Measurements received for CapabilityID: {}", wsnParsedMeasure.getCapabilityAlternateId());
				manageIncomingValues(wsnParsedMeasure, valueList, toBeRemoved);
			}

			for (WSNParsedMeasure wsnParsedMeasure : toBeRemoved) {
				measures.remove(wsnParsedMeasure);
				log.info("Measure for Capability {}, Device {} and Sensor {} will be dropped",
						wsnParsedMeasure.getCapabilityAlternateId(), wsnParsedMeasure.getDeviceAlternateId(),
						wsnParsedMeasure.getSensorAlternateId());
			}
		}
	}

	// Go through values for measure
	@SuppressWarnings("unchecked")
	private void manageIncomingValues(WSNParsedMeasure wsnParsedMeasure, List<Value<?>> valueList,
			List<WSNParsedMeasure> toBeRemoved) {
		if(wsnParsedMeasure.getCapabilityAlternateId()
				.equals(CustomParameters.TARGET_CAPABILITY_ALTERNATEID_COLOR) && 
				valueList.size() == CustomParameters.Colors.values().length) {
			// create a map to be serialized for the predictive module
			Map<String,Object> map = new HashMap<String, Object>();
			map.put(valueList.get(0).getMeasureName(), valueList.get(0).getInnerMeasure());
			map.put(valueList.get(1).getMeasureName(), valueList.get(1).getInnerMeasure());
			map.put(valueList.get(2).getMeasureName(), valueList.get(2).getInnerMeasure());
			String measurement;
			try {
				// serialize the map 
				measurement = mapper.writeValueAsString(map);
				log.info("json = {}", measurement);
			} catch (JsonProcessingException e) {
				log.error(e.getMessage(), e);
				return;
			}
			// Send measurement
			InterceptorActivator.send(measurement);
			String reply = InterceptorActivator.receive();
			Map<String, ?> prediction;
			try {
				//read the predicted value into a map
				prediction = mapper.readValue(reply, Map.class);
			} catch (IOException e) {
				log.error(e.getMessage(), e);
				return;
			}
			// send the prediction in the data ingestion pipeline
			sendPrediction(wsnParsedMeasure, prediction);
		}

	}

	public void sendPrediction(WSNParsedMeasure wsnParsedMeasure, Map<String, ?> rgbPrediction) {

		// send Measure
		IGatewayTopology topologyService = InterceptorActivator.getTopologyService();
		String deviceAlternateId = wsnParsedMeasure.getDeviceAlternateId();
		String sensorAlternateId = wsnParsedMeasure.getSensorAlternateId();
		int sensorTypeAlternateId = wsnParsedMeasure.getSensorTypeAlternateId();
		String capabilityAlternateId = CustomParameters.TARGET_CAPABILITY_ALTERNATEID_PREDICTION;

		if (topologyService != null) {
			// Device should be present
			Device device = topologyService.getDevice(deviceAlternateId);
			if (device == null) {
				log.warn("No device: {}", deviceAlternateId);
				return;
			}
			// Sensor should be present
			Sensor sensor = topologyService.getSensor(deviceAlternateId, sensorAlternateId);
			if (sensor == null) {
				log.warn("No sensor: {}", sensorAlternateId);
				return;
			}

			List<WSNParsedMeasure> measureList = new ArrayList<>();

			// Add predicted label
			Value<String> label = new Value<>();
			label.setDataType(WSNdataType.ASCIIStr);
			label.setInnerMeasure((String) rgbPrediction.get(CustomParameters.Prediction.label.toString()));
			label.setMeasureName(CustomParameters.Prediction.label.name());
			label.setMeasureUnit("");

			// Add first neighbor
			Value<Float> neighbor1 = new Value<>();
			neighbor1.setDataType(WSNdataType.Single_prec);
			neighbor1.setInnerMeasure(((Double)rgbPrediction.get(CustomParameters.Prediction.neighbor1.toString())).floatValue());
			neighbor1.setMeasureName(CustomParameters.Prediction.neighbor1.name());
			neighbor1.setMeasureUnit("");

			// Add second neighbor
			Value<Float> neighbor2 = new Value<>();
			neighbor2.setDataType(WSNdataType.Single_prec);
			neighbor2.setInnerMeasure(((Double)rgbPrediction.get(CustomParameters.Prediction.neighbor2.toString())).floatValue());
			neighbor2.setMeasureName(CustomParameters.Prediction.neighbor2.name());
			neighbor2.setMeasureUnit("");

			// Add third neighbor
			Value<Float> neighbor3 = new Value<>();
			neighbor3.setDataType(WSNdataType.Single_prec);
			neighbor3.setInnerMeasure(((Double)rgbPrediction.get(CustomParameters.Prediction.neighbor3.toString())).floatValue());
			neighbor3.setMeasureName(CustomParameters.Prediction.neighbor3.name());
			neighbor3.setMeasureUnit("");

			List<Value<?>> values = new ArrayList<>();
			values.add(label);
			values.add(neighbor1);
			values.add(neighbor2);
			values.add(neighbor3);

			// create the measure for the predicted value
			WSNParsedMeasure prediction = new WSNParsedMeasure(deviceAlternateId, sensorAlternateId,
					sensorTypeAlternateId, capabilityAlternateId, values, new Date());

			measureList.add(prediction);

			IDataManager dataManager = InterceptorActivator.getDataManagerService();
			dataManager.sendMeasures(device, measureList);


		}
	}
}
  • CustomParameters.java
package com.sap.iotservices.gateway.interceptor.custom;

/**
 * It is possible to customize all the values present in this class to customize this interceptor example
 * 
 */
public class CustomParameters {

	public static enum Colors{
		R,
		G,
		B
	}
	public static enum Prediction{
		label,
		neighbor1,
		neighbor2,
		neighbor3;

		@Override
		public String toString() {
			//map the property name to the prediction output: name() matches the property name, while toString() matches the keys produced by the Python script
			switch(this) {
			case neighbor1: return "neighbor(1)";
			case neighbor2: return "neighbor(2)";
			case neighbor3: return "neighbor(3)";
			default:
				break;
			}
			return super.toString();
		}
	}

	// ****** Constants related to INTERCEPTORIMPL
	// - Target Device
	public static final String ZMQ_CONNECTION = "127.0.0.1:5555";
	public static final String PYTHON_PATH = "python";
	public static final String PYTHON_SCRIPT_PATH = "./knn.py";

	// - Target Sensor
	// public static final String TARGET_SENSOR_ALTERNATEID = "1";

	// - Target Capabilities
	public static final String TARGET_CAPABILITY_ALTERNATEID_COLOR = "color";
	public static final String TARGET_CAPABILITY_ALTERNATEID_PREDICTION = "color prediction";


}
  • MANIFEST.MF
Manifest-Version: 1.0
Bundle-ManifestVersion: 2
Bundle-Name: predictive model
Bundle-SymbolicName: predictive model
Bundle-Version: 4.0.0.SNAPSHOT
Bundle-Vendor: XYZ Company
Export-Package: com.sap.iotservices.gateway.interceptor;uses:="com.sap.iotservices.hooks.gateway,com.sap.iotservices.xmlparse"
Require-Bundle: org.eclipse.osgi.services,
 com.sap.iotservices.common.basic,
 com.sap.iotservices.common.interface,
 com.sap.iotservices.gateway.topology-interface,
 com.sap.iotservices.gateway.topology-service,
 com.sap.iotservices.gateway.device-manager-interface,
 com.sap.iotservices.gateway.properties-interface,
 com.sap.iotservices.gateway.eventsender-interface,
 com.sap.iotservices.gateway.data-manager-interface,
 org.zeromq.jeromq,
 com.fasterxml.jackson.core.jackson-databind,
 com.fasterxml.jackson.core.jackson-core
Bnd-LastModified: 1481173207479
Require-Capability: osgi.service;filter:="(objectClass=com.sap.iotservices.hooks.gateway.IGatewayInterceptorService)";effective:=active;cardinality:=multiple,osgi.service;filter:="(objectClass=com.sap.iotservices.xmlparse.IJMSFrameParser)";effective:=active;cardinality:=multiple,osgi.ee;filter:="(&(osgi.ee=JavaSE)(version=1.8))"
Build-Jdk: 1.8.0_92
Service-Component: OSGI-INF/com.sap.iotservices.gateway.interceptor.InterceptorActivator.xml
Bundle-ActivationPolicy: lazy
Bundle-RequiredExecutionEnvironment: JavaSE-1.8
Import-Package: com.sap.iotservices.commandmanagement.interfaces,
 com.sap.iotservices.commandmanagement.interfaces.bean,
 com.sap.iotservices.gateway.response.manager.beans,
 org.slf4j
Bundle-ClassPath: third-parties-libs/,
 .
  • com.sap.iotservices.gateway.interceptor.InterceptorActivator.xml is untouched

Let’s have a look at the implementation in the InterceptorImpl class.

For every measurement received by the interceptor, the manageIncomingValues method is invoked.

This method builds a key-value map from the measurement and serializes it to the Python script over our ZeroMQ bus.

As in the other example, the Python code fetches the measurement from the message bus, calculates the prediction, and sends the predicted data back on the bus.

The InterceptorImpl class receives these values in JSON format through the InterceptorActivator.receive() call; after the conversion, we are ready to send the data back to SAP Cloud Platform Internet of Things.
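
As an illustration of the exchange (the values are invented), the following self-contained sketch uses the same Jackson ObjectMapper to build a request like the one produced by manageIncomingValues and to parse a reply like the one produced by the Python kNN script:

import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.LinkedHashMap;
import java.util.Map;

public class ExchangeSketch {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();

        // Request built from the R, G, B values of the incoming measure (invented values)
        Map<String, Object> rgb = new LinkedHashMap<>();
        rgb.put("R", 12.0);
        rgb.put("G", 230.0);
        rgb.put("B", 40.0);
        System.out.println(mapper.writeValueAsString(rgb));    // {"R":12.0,"G":230.0,"B":40.0}

        // Reply produced by the Python script: predicted label plus the three neighbor distances
        String reply = "{\"label\":\"green\",\"neighbor(1)\":10.2,\"neighbor(2)\":15.7,\"neighbor(3)\":21.3}";
        Map<String, ?> prediction = mapper.readValue(reply, Map.class);
        System.out.println(prediction.get("label"));            // green
    }
}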

Here you can also implement your custom algorithms, such as the creation of an overall index used within an SAP Edge Services Streaming Service Rule to validate whether the prediction is valid or not.

This has not been implemented in this sample code, but you can easily copy and adapt it from the other Python example in this blog post.

To improve performance, I have also created a sendPrediction method that sends the predicted measurements directly into the Edge Platform through the native, integrated data ingestion pipeline, without any (external) HTTP client, which could significantly increase latency.

Build the interceptor and deploy it to your Edge Platform by using the contextual menu.

During the deployment, you are asked for the Server URL, which should be provided in the form:

https://<HOSTNAME>:443/<INSTANCE ID>/iot/core/api/v1/tenant/<TENANT ID>

You are also asked for the Gateway ID (not the alternate ID); you can easily get it from your SAP Cloud Platform Internet of Things cockpit.

This is the expected output in the Eclipse console:

[31/10/19 14.30] Deployment successful; Filename=predictive model.jar
[31/10/19 15.19] Build was successful.
[31/10/19 15.19] 
Sending 'POST' request to URL : https://HOSTNAME:443/INSTANCE ID/iot/core/api/v1/tenant/TENANT ID/gateways/GATEWAY ID/bundles

 

Conclusions

In this blog post, I have explained two alternatives for implementing a predictive analytics scenario at the edge by using an external implementation of the algorithm in Python.

You should also have some pointers on how to replace Python with your favorite language, such as R.

This example uses and integrates the features of the SAP Cloud Platform Internet of Things SDK and SAP Edge Services, in particular the Persistence Service, the Streaming Service, and Essential Business Functions.
