
Implement Predictive Analytics at the Edge

The rise of the IoT has connected a multitude of devices to the cloud and made a huge amount of data available.

The next-generation scenario, when we talk about IoT, is the possibility of running analysis and prediction over the large amount of data that we receive through our IoT-enabled devices.

Within the SAP ecosystem, prediction is possible both in the cloud and at the edge.

In some particular environments, such as low-connectivity or real-time scenarios, it becomes really important and useful to be able to run the predictive algorithm at the edge.

You can natively create such a scenario by leveraging the capabilities of SAP Cloud Platform Internet of Things and SAP Edge Services.

In the following article, I'm going to explain how you can implement a realistic industrial scenario that makes predictions at the edge.

0. Story

Our story is about Paint Ink, a company that provides a professional painting service for several industries; for example, it has orders from several automotive manufacturers.

Even if current technology makes it possible to replicate the color used to paint an object from a color code, the company in this example wants to guarantee a high level of quality in its industrial process and products.

The validation consists of analyzing the painting, piece by piece. The process is built around a conveyor belt with a sensor, such as a camera, that reads the color components (RGB) of the pieces on the belt.

The painting of each piece is not completely uniform, and the color can change in its different areas due, for example, to the nature of the paint or to the shape of the object.

They have already implemented code to validate the analysis of the painting, based on the KNN (K-Nearest Neighbors) algorithm. It analyzes several points of the object and establishes whether the painting quality is high enough before the object proceeds to packaging. If the quality is not high enough, the piece is automatically discarded.

As you can imagine, in this scenario it's important to have real-time analysis of the colors, in order to speed up all the post-production tasks with no blocks on the belt.

They have decided to implement the analysis at the edge to satisfy this goal.

1. Implementation prerequisites

The training of the base KNN algorithm has been done in the cloud with SAP Data Intelligence.

Inside my SAP Data Intelligence tenant, I have used Jupyter to implement, in a notebook, a Python algorithm that trains a KNN model on a large amount of already-categorized data and saves the output to a file as a PMML model.

The notebook could also expose an endpoint to recompute the model, to improve the recognition with additional training data.
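Conceptually, the trained model encodes nothing more than a set of labelled reference points plus a distance metric. The following self-contained Java sketch (the class name and sample values are invented for illustration; the real training happens in the Data Intelligence notebook and is exported as PMML) shows the k-nearest-neighbors idea that the exported model captures:

```java
import java.util.*;

/** Conceptual sketch of k-nearest-neighbors classification over RGB samples
    (hypothetical class, not part of the sample project). */
public class KnnSketch {

    // Euclidean distance between two points with the same number of components
    static double distance(double[] a, double[] b) {
        double sum = 0;
        for (int i = 0; i < a.length; i++) {
            double d = a[i] - b[i];
            sum += d * d;
        }
        return Math.sqrt(sum);
    }

    /** Classify a sample by majority vote among the k nearest training points. */
    static String classify(double[][] trainX, String[] trainY, double[] sample, int k) {
        Integer[] idx = new Integer[trainX.length];
        for (int i = 0; i < idx.length; i++) idx[i] = i;
        // sort training indices by distance to the sample
        Arrays.sort(idx, Comparator.comparingDouble(i -> distance(trainX[i], sample)));
        Map<String, Integer> votes = new HashMap<>();
        for (int i = 0; i < k; i++) {
            votes.merge(trainY[idx[i]], 1, Integer::sum);
        }
        return Collections.max(votes.entrySet(), Map.Entry.comparingByValue()).getKey();
    }

    public static void main(String[] args) {
        // invented reference colors, already categorized
        double[][] trainX = { {250, 10, 10}, {245, 5, 20}, {10, 10, 250}, {20, 5, 240} };
        String[] trainY = { "red", "red", "blue", "blue" };
        System.out.println(classify(trainX, trainY, new double[]{255, 0, 0}, 3)); // prints "red"
    }
}
```

The PMML file produced by the notebook stores the equivalent information (reference points, labels, and the comparison function), so any compliant evaluator can reproduce this behavior at the edge.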

2. Edge Implementation schema

To implement the predictive analysis at the edge, we have extended the features exposed by the SAP Cloud Platform Internet of Things Edge Platform by creating a new OSGi bundle that runs in parallel on our Edge Platform, and by leveraging the features of the SAP Edge Services Persistence Service.

 

The implemented module imports into the OSGi environment the libraries required to use the PMML model in Java, such as the JPMML library.

In this scenario, we need to analyze several measurements for each painted object, representing the RGB color in its different areas. To improve system performance, since the time required to complete the acquisition of the measurements is around 10 seconds, we have decided to fetch all the required measurements in a single batch.

This is easy and native with the SAP Edge Services Persistence Service module, which is installed on top of the Edge Platform.

The Persistence Service stores all the measurements in a local SQL Anywhere database and permits fetching them through REST/Java APIs or with a custom SQL query.

All the data is passed to the evaluator module to get a KNN prediction based on the input values.

Each analyzed point contributes to the result of the analysis by building a prediction index, which is then passed through certain policies to validate the overall quality of the painting.
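As an illustration of how such an index can be built, here is a simplified, self-contained sketch. The constants and helper are hypothetical, loosely modelled on the scaling and threshold policy applied later in this article (out-of-range distances are penalised before averaging):

```java
/** Simplified sketch (hypothetical helper, values invented) of folding the
    per-point prediction distances into one quality index for the object. */
public class PredictionIndex {

    static final float LIMIT_OUT_OF_RANGE = 100f;  // threshold above which a point counts as an outlier
    static final float SCALE_OUT_OF_RANGE = 1.25f; // penalty factor applied to outliers

    /** Average the per-point distances, penalising points beyond the limit. */
    static float index(float[] distances) {
        float sum = 0f;
        for (float d : distances) {
            sum += d > LIMIT_OUT_OF_RANGE ? d * SCALE_OUT_OF_RANGE : d;
        }
        return sum / distances.length;
    }

    public static void main(String[] args) {
        System.out.println(index(new float[]{10f, 20f, 30f})); // all points in range: plain average, 20.0
        System.out.println(index(new float[]{10f, 200f}));     // one outlier is scaled up: 130.0
    }
}
```

A policy can then simply compare the resulting index against a threshold to accept or discard the piece.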

Both the point-by-point predictions and the overall index are streamed to SAP Cloud Platform Internet of Things, to permit further analysis, to improve the KNN model, and to allow rules to be applied on top of the streamed values by using the capabilities of SAP Edge Services Streaming Service.

You can easily extend this scenario to the ones described in this other blog post. In the setup of the rule, you apply it, for example, to the prediction index instead of to the temperature used in that example.

This example uses JPMML, but you can easily replace this library with your own preferred library or a custom implementation.

3. Implementation

As the first step, create the device model inside your SAP Cloud Platform Internet of Things tenant. If possible, in order to have an immediate mapping between the model and the input of your KNN PMML model file, I suggest using the same property names specified as labels for the input parameters.

For this example, we have created in SAP Cloud Platform Internet of Things one sensor type with 3 capabilities:

  • color: this capability receives as a measurement the RGB values captured by the RGB sensor
  • color prediction: contains the label of the computed prediction (point by point), with the 3 closest neighbors that are used to compute the affinity index
  • validity color score: one per analyzed object (1 measurement every 10 seconds); it gives a proximity index based on all the measurements received in the last 10 seconds and permits us to identify whether the painting quality is high enough
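To make the mapping concrete: in this setup the RGB components of a color measurement travel as a single space-separated string, and are split back into named properties by their sequence (which is what the QueryData class shown later does). A minimal sketch, with a hypothetical class name and hypothetical property names "r", "g", "b":

```java
import java.util.*;

/** Sketch of decoding one "color" measure: the RGB components arrive as a
    space-separated string and are mapped back to property names by sequence.
    (Hypothetical helper; property names are assumptions for illustration.) */
public class ColorMeasureSketch {

    static Map<String, Float> decode(List<String> propertyNames, String measureValue) {
        String[] parts = measureValue.split(" ");
        Map<String, Float> result = new LinkedHashMap<>();
        for (int i = 0; i < parts.length; i++) {
            // property order must match the PROP_SEQ order stored in the metadata
            result.put(propertyNames.get(i), Float.valueOf(parts[i]));
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(decode(Arrays.asList("r", "g", "b"), "255 128 0"));
    }
}
```

Keeping these property names aligned with the input labels of the PMML model is what makes the mapping from device model to evaluator immediate.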

4. Deploy Persistence Service

Open the Edge Services Policy Service, click the Edge Services Management tile, and in the list search for and click your gateway.

Now you can deploy the Persistence Service to your gateway, to enable the automatic storage of the measurements.

Click the tab named Services and then the plus button to deploy a new service to your SAP Edge Platform.

In the next window, select Persistence Service in the dropdown and click Save to start the deployment. When the deployment of the service has completed, the status changes from Requested to Installed. You can also configure the user management and the User Identity Provider for this service according to your invocation rights policies.

Now you can also deploy the Streaming Service and Essential Business Functions and their configuration according to this blog post. In this example, I've used the validity color score capability in the streaming rule: if it is over a certain threshold (e.g. 100), an SAP Field Service Management service call is created.

5. Create the pure Java Service

Would you rather use a Python implementation? Have a look at this alternative Python implementation.

 

Now we can proceed with the custom logic that implements the Predictive Analytics Service.

I have leveraged the official SAP sample https://github.com/SAP/iot-edge-services-samples/tree/master/persistence-aggregation-max-temp to implement the algorithm that reads the data through the Persistence Service.

The Persistence Java client classes are very similar in implementation, so you can also start from that sample in case you would like to reproduce this example.

I have made some changes to the Engine.java class to invoke the main loop that makes the prediction with a ScheduledExecutorService.

package com.sap.iot.edgeservices.persistence.sample;

import java.util.concurrent.*;

/**
 * The Engine will continuously run the calculation 
 */
public class Engine extends Thread {
	
	////////////////////
	// class fields
	////////////////////
	
	private Calculation calculation;						// the parent interface to the calculation class
	private boolean bStop = false;							// flag that determines if the engine should continue
	private ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(1);
	private ScheduledFuture<?> thread;

	////////////////////
	// Constructors
	////////////////////
	
	/**
	 * Ctor for the engine.  
	 * @param calculation
	 */
	public Engine( Calculation calculation ) {
		PersistenceSampleActivator.printlnDebug("Engine:ctor - called");
		this.calculation = calculation;
		initialize();
	}
	
	////////////////////
	// Public methods
	////////////////////
	
	public void run() {
		PersistenceSampleActivator.printlnDebug("Engine:run - called");
        try {
			thread = threadPool.scheduleAtFixedRate(calculation,0, 10000, TimeUnit.MILLISECONDS);

		} catch (Exception e) {
            PersistenceSampleActivator.println("ERROR: Engine: Problem executing the calculation: ");
            e.printStackTrace();
        }
	}
	
	
	public void stopGracefully() {
		PersistenceSampleActivator.println("Engine:stopGracefully - called");
		thread.cancel(true);
	}
	
	////////////////////
	// private methods
	////////////////////
	
	private void initialize() {
		PersistenceSampleActivator.printlnDebug("Engine:initialize - called");
	}
}

 

The required logic to implement the scenario is inside the classes defined in the custom package:

  • QueryData.java is a refactoring and extension of all the queries that are executed against the SQL Anywhere database through the Persistence Java client.

I have modified the method that gets the data because, unlike the persistence aggregation example, we are fetching several properties instead of one. Here is the content:

package com.sap.iot.edgeservices.persistence.sample.custom;

import com.sap.iot.edgeservices.persistence.sample.PersistenceException;
import com.sap.iot.edgeservices.persistence.sample.PersistenceSampleActivator;
import com.sap.iot.edgeservices.persistence.sample.db.PersistenceClient;
import com.sap.iot.edgeservices.persistence.sample.db.Table;
import com.sap.iot.edgeservices.persistenceservice.model.PSStatementObject;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class QueryData {
    private static Boolean CLOUD_EDGE_SERVICES;
    private PersistenceClient persistenceClient;
    private List<String> properties = null;

    QueryData(PersistenceClient persistenceClient, Boolean cloudEdgeServices){
        this.persistenceClient = persistenceClient;
        CLOUD_EDGE_SERVICES = cloudEdgeServices;
    }

    String resetQueryTime() {
        // reset the query time for the next query
        String mostRecentQueryTime = null;
        try {
            mostRecentQueryTime = persistenceClient.getFirstRowFirstColumn(persistenceClient.executeQuery("SELECT NOW()"));
            PersistenceSampleActivator.printlnDebug("new date is: " + mostRecentQueryTime );
        } catch (PersistenceException e1) {
            // TODO Auto-generated catch block
            e1.printStackTrace();
        }
        return mostRecentQueryTime;
    }
    
    // SQL used to select the data
    private String getSqlForMetadata( String profileId, String objectId) {

        // NOTE: only top 1000 records are returned.  If more data is expected, then
        // query should be changed to use database aggregation instead of Java
        String sql = "SELECT top 1000 m.PROP_ID, "
                + "          m.PROP_SEQ,  "
                + "          m.TYPE_ID "
                + "     FROM EFPS.MEASURE_TYPE_PROPERTY m "
                + "    WHERE m.OBJECT_ID = '" + objectId + "'";
        // Add PROFILE_ID only in case of OP Edition
        if(!CLOUD_EDGE_SERVICES){
            sql += "      AND m.PROFILE_ID = '" + profileId + "'";
        }
        sql += " ORDER BY m.PROP_SEQ ASC";


        PersistenceSampleActivator.printlnDebug("getSqlForMetadata: ============");
        PersistenceSampleActivator.printlnDebug(sql);

        return sql;
    }

    // SQL used to select the data
    String getSqlForMeasureValues(String profileId, String objectId, String sinceDate) {

        // NOTE: only top 1000 records are returned.  If more data is expected, then
        // query should be changed to use database aggregation instead of Java
        String sql = "SELECT top 1000 m.DEVICE_ADDRESS, "
                + "          CAST(m.MEASURE_VALUE AS VARCHAR(32)) MEASURE_VALUE,  "
                + "          m.DATE_RECEIVED "
                + "     FROM EFPS.MEASURE m "
                + "    WHERE m.OBJECT_ID = '" + objectId + "'"
                + "      AND m.DATE_RECEIVED > '" + sinceDate + "'";
        // Add PROFILE_ID only in case of OP Edition
        if(!CLOUD_EDGE_SERVICES){
            sql += "      AND m.PROFILE_ID = '" + profileId + "'";
        }
        sql += " ORDER BY m.DATE_RECEIVED DESC";


        PersistenceSampleActivator.printlnDebug("getSqlForMeasureValues: ============");
        PersistenceSampleActivator.printlnDebug(sql);

        return sql;
    }

    // create a table to store the results (if you want... ) not implemented in this sample
    private void createMaxTemperatureTable() {
        PersistenceSampleActivator.printlnDebug("CalculateMaxTemperature:createMaxTemperatureTable - called");

        String sql = "CREATE TABLE MaxTemperature ( "
                + "              id INT PRIMARY KEY DEFAULT AUTOINCREMENT, "
                + "          device VARCHAR(32), "
                + "         maxTemp FLOAT NOT NULL, "
                + "        windowMS INTEGER NOT NULL, "
                + "        calcTime DATE DEFAULT CURRENT UTC TIMESTAMP); ";
        Table table = new Table(persistenceClient);
        try {
            // if table already exists, it will not modify it
            table.createIfNeeded(sql, "MaxTemperature");
            PersistenceSampleActivator.println("CalculateMaxTemperature:createMaxTemperatureTable - success");
        } catch (PersistenceException e) {
            // TODO Auto-generated catch block
            PersistenceSampleActivator.println("ERROR: CalculateMaxTemperature:createMaxTemperatureTable - failed");
            //state = Calculation.State.ERROR;
            e.printStackTrace();
        }
    }


    // convert the result set into an array of Doubles
    Map<String, ArrayList<Map<String, Float>>> getValuesAsFloatMapsByDevice(PSStatementObject statementObject, String sensorTypeAlternateId, String capabilityAlternateId) {
        properties = this.getMetadata(sensorTypeAlternateId, capabilityAlternateId);
        //ArrayList<Double> values = new ArrayList<>();
        Map<String, ArrayList<Map<String, Float>>> valuesByDevice = new HashMap<>();

        PersistenceSampleActivator.printlnDebug("getValuesAsDoublesByDevice start-------------");
        if (statementObject.hasResultList()) {
            statementObject.getResultList().forEach((row) -> {
                String device = row.get(0).getValue().toString();
                PersistenceSampleActivator.printlnDebug("device = " + device);
                ArrayList<Map<String, Float>> valueMap = valuesByDevice.get(device);
                if ( valueMap == null ) {
                    valueMap = new ArrayList<>();
                    valuesByDevice.put(device, valueMap);
                }
                String values = row.get(1).getValue().toString();
                PersistenceSampleActivator.printlnDebug("value = " + values);
                PersistenceSampleActivator.printlnDebug(device + ":" + values);

                String date = row.get(2).getValue().toString();
                PersistenceSampleActivator.printlnDebug("date = " + date);

                Map<String,Float> mapValues = new HashMap<>();
                try {
                    //Split into properties
                    String[] valuesArray = values.split(" ");
                    for (int i = 0; i < valuesArray.length; i++){
                        String prop = properties.get(i);
                        Float f = Float.valueOf(valuesArray[i]);
                        mapValues.put(prop, f);
                    }
                    valueMap.add(mapValues);
                    PersistenceSampleActivator.printlnDebug("value added");

                } catch (NumberFormatException nfe) {
                    StringBuffer errMsg = new StringBuffer("Error: not a number: ");
                    row.forEach((column) -> {
                        errMsg.append(column.getValue() + "(" + column.getColumnName() + ":" + column.getMetadata() + "), ");
                    });
                    PersistenceSampleActivator.printlnDebug(valueMap.toString());
                }
            });
        } else {
            PersistenceSampleActivator.printlnDebug(" ResultSet is empty");
        }

        PersistenceSampleActivator.printlnDebug("getValuesAsDoublesByDevice end---------------");
        return valuesByDevice;
    }

    // convert the result set into an array of Doubles
    private List<String> getMetadataFromResultset(PSStatementObject statementObject) {
        //ArrayList<Double> values = new ArrayList<>();
        List<String> metadata = new ArrayList<>();

        PersistenceSampleActivator.printlnDebug("getMetadataFromResultset start-------------");
        if (statementObject.hasResultList()) {
            statementObject.getResultList().forEach((row) -> {
                String type = row.get(0).getValue().toString();
                metadata.add(type);
                PersistenceSampleActivator.printlnDebug("type = " + type);
            });
        } else {
            PersistenceSampleActivator.printlnDebug(" ResultSet is empty");
        }

        PersistenceSampleActivator.printlnDebug("getMetadataFromResultset end---------------");
        return metadata;
    }

    List<String> getMetadata(String sensorTypeAlternateId, String capabilityAlternateId) {

        PSStatementObject stmt;
        String sql = getSqlForMetadata(sensorTypeAlternateId,
                capabilityAlternateId);

        List<String> types = new ArrayList<>();
        try {
            stmt = persistenceClient.executeQuery(sql);
            types = getMetadataFromResultset(stmt);
        } catch (PersistenceException pe) {
            pe.printStackTrace();
        } catch ( Exception e ) {
            e.printStackTrace();
        }
        return types;
    }
}
  • The DataStreamer class contains just the methods to stream the results to SAP Cloud Platform Internet of Things, based on a simple HTTP client.
  • The core of the logic is inside our PredictValues class.
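The DataStreamer described above boils down to building a JSON measure payload and POSTing it to the local gateway REST endpoint. A minimal sketch, assuming the endpoint URL scheme (http://localhost:8699/measures/<deviceAlternateId>) and a payload shape that you should verify against your gateway's REST protocol:

```java
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

/** Minimal sketch of a DataStreamer-style HTTP client. The payload shape and
    endpoint are assumptions to be checked against your landscape. */
public class DataStreamerSketch {

    /** Build the JSON body for a single measure value. */
    static String buildPayload(String sensorAlternateId, String capabilityAlternateId, String value) {
        return "{\"sensorAlternateId\":\"" + sensorAlternateId + "\","
             + "\"capabilityAlternateId\":\"" + capabilityAlternateId + "\","
             + "\"measures\":[[\"" + value + "\"]]}";
    }

    /** POST the measure to the local gateway and return the HTTP status code. */
    static int post(String measuresUrl, String deviceAlternateId, String json) throws Exception {
        HttpURLConnection conn = (HttpURLConnection) new URL(measuresUrl + deviceAlternateId).openConnection();
        conn.setRequestMethod("POST");
        conn.setRequestProperty("Content-Type", "application/json");
        conn.setDoOutput(true);
        try (OutputStream os = conn.getOutputStream()) {
            os.write(json.getBytes(StandardCharsets.UTF_8));
        }
        return conn.getResponseCode();
    }

    public static void main(String[] args) {
        // build (but do not send) a payload for the overall quality index
        System.out.println(buildPayload("color sensor", "validity color score", "42.5"));
    }
}
```

Because the gateway runs on the same host as the bundle, a plain HttpURLConnection is enough; no authentication or TLS handling is required for the local measures endpoint in this setup.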

First, at the startup of the bundle, the initialize method is invoked; here you need to import the saved PMML model with your Java library. I'm loading it through the classloader, since the PMML model is saved as a resource of the project. You can load the model dynamically by changing the implementation of this method.

After that, we load it into a KNN evaluator implementation. This is the object that I'm going to use to make predictions over the streamed data.

Now let's have a look at the run method, which is automatically scheduled every 10 seconds (the time required to analyze each painted object).

This method fetches the data through the persistence client in a 10-second time window. If there is any data, for each sample we evaluate with the KNN evaluator whether the data is close enough to the values used to train the model.

It returns the distance from the top 3 neighbors and the prediction, with the label that contains the predicted value.

Then we build an index with a mathematical formula that aggregates the calculated predictions for all the existing samples.

Finally, we can stream the point-by-point predictions and the calculated index (which represents the overall prediction) back to SAP Cloud Platform Internet of Things.

This permits additional computation on the predicted values, such as sending them to a third-party system in the cloud or, as in this scenario, applying a rule computed by the Streaming Service.

package com.sap.iot.edgeservices.persistence.sample.custom;

import com.sap.iot.edgeservices.persistence.sample.Calculation;
import com.sap.iot.edgeservices.persistence.sample.PersistenceException;
import com.sap.iot.edgeservices.persistence.sample.PersistenceSampleActivator;
import com.sap.iot.edgeservices.persistence.sample.db.PersistenceClient;
import com.sap.iot.edgeservices.persistenceservice.model.PSStatementObject;
import org.dmg.pmml.FieldName;
import org.dmg.pmml.PMML;
import org.jpmml.evaluator.*;
import org.jpmml.evaluator.nearest_neighbor.NearestNeighborModelEvaluator;
import org.jpmml.model.PMMLUtil;

import java.io.IOException;
import java.io.InputStream;
import java.util.*;
import java.util.concurrent.atomic.AtomicReference;

public class PredictValues extends Calculation {

	////////////////////
	// Static fields
	////////////////////

	// this needs to be static since the Engine will be asking for this before it has instantiated it
	private static String logLevel = "INFO";

	private static Boolean CLOUD_EDGE_SERVICES = true;  //SET TO false for ON-PREMISE

	private static String IOTS_MEASURES_URL = "http://localhost:8699/measures/";	// destination to send calculated values
	private static String TRANSFORMED_SENSOR_TYPE_ALTERNATE_ID = "255";			// sensorTypeAlternateId to send calculated values
	private static String TRANSFORMED_CAPABILITY_ALTERNATE_ID = "color prediction";		// capabilityAlternateId to send calculated values
	private static String RESULT_CAPABILITY_ALTERNATE_ID = "validity color score";		// capabilityAlternateId to send calculated values
	private static String TRANSFORMED_SENSOR_ALTERNATE_ID = "color sensor";			// sensorAlternateId to send calculated values
	private static final float PLANT_SCALE_OUT_OF_RANGE = 1.25f;                            // scale factor for the computation of the index
	private static final float PLANT_LIMIT_OUT_OF_RANGE = 100f;                             // upper threshold to identify an outlier

	////////////////////
	// Class fields
	////////////////////

	private int calculationFrequencyMS;	// frequency that the engine will calculate
	private String sensorTypeAlternateId;	// the sensorTypeAlternateId that the engine will calculate on 
	private String capabilityAlternateId = "color";	// the capabilityAlternateId that the engine will calculate on

	private QueryData queryData;
	private String mostRecentQueryTime = null;
	private Evaluator evaluator;
	private PMML pmml;
	
	
	////////////////////
	// constructors
	////////////////////

	@Override
	public void stopGracefully() {
	}

	public PredictValues(PersistenceClient persistenceClient ) {
		super(persistenceClient);

		PersistenceSampleActivator.printlnDebug("PredictValues:ctor - called");

		// properties that control how the Calculation will be done
		calculationFrequencyMS = 10000;	// 10 seconds

		if (CLOUD_EDGE_SERVICES) {
			// cloud edition
			sensorTypeAlternateId = "*"; 	// only for this sensorType
		} else {
			// on-premise edition
			sensorTypeAlternateId = "color"; 	// only for this Sensor Profile
		}

		mostRecentQueryTime = queryData.resetQueryTime();
	}

	////////////////////
	// public methods
	////////////////////

	public void run() {
		PersistenceSampleActivator.printlnDebug("PredictValues:run - called");

		// ensure we have initialized and in a good state
		if ( state != State.RUNNING ) {
			PersistenceSampleActivator.println("Warning: NOT RUNNING: PredictValues.state = " + state );
			return;
		}

		// determine if any configuration changes have been sent
		updateConfigurations();

		// get the data and create a primitive array
		Map<String, ArrayList<Map<String, Float>>> valuesByDevice = getSampleData();

		// for each device that sent in a value, send out the max
		valuesByDevice.forEach((device, values) -> {
			PersistenceSampleActivator.println( "======================== Calculating prediction" );

			Float validPrediction = null;

			for (Map<String, Float> rgbmap : values ) {

				Map<String, ?> prediction = EvaluateModel(evaluator, rgbmap);
				validPrediction = checkPrediction(prediction, validPrediction, evaluator.getOutputFields());
				if (CLOUD_EDGE_SERVICES) {
					// send the results back into IOT Service engine as a different capability
					// FOR ON-PREMISE EDGE SERVICES, comment out this line
					DataStreamer.streamResults( IOTS_MEASURES_URL,
							device,
							TRANSFORMED_SENSOR_TYPE_ALTERNATE_ID,
							TRANSFORMED_CAPABILITY_ALTERNATE_ID,
							TRANSFORMED_SENSOR_ALTERNATE_ID,
							prediction);
				}
			}

			if (CLOUD_EDGE_SERVICES) {
				// send the results back into IOT Service engine as a different capability
				// FOR ON-PREMISE EDGE SERVICES, comment out this line
				DataStreamer.streamResult( IOTS_MEASURES_URL,
						device,
						TRANSFORMED_SENSOR_TYPE_ALTERNATE_ID,
						RESULT_CAPABILITY_ALTERNATE_ID,
						TRANSFORMED_SENSOR_ALTERNATE_ID,
						validPrediction);
			}
		});
	}

	private Float checkPrediction(Map<String, ?> prediction, Float validPrediction, List<OutputField> outputFields) {
		AtomicReference<Float> distance = new AtomicReference<>(0f);
		outputFields.forEach(field -> {
			try{
				float val = Float.parseFloat(String.valueOf(prediction.get(field.getName().getValue())));
				if(val > PLANT_LIMIT_OUT_OF_RANGE){
					val *= PLANT_SCALE_OUT_OF_RANGE;
				}
				float finalVal = val;
				distance.updateAndGet(v -> v + finalVal);
			}
			catch (Exception e){
				distance.updateAndGet(v -> v + 2 * outputFields.size() * PLANT_LIMIT_OUT_OF_RANGE);
			}
		});
		if (validPrediction == null){
			return distance.get() / outputFields.size();
		}
		//prediction not in range
		if ((distance.get() / outputFields.size()) > PLANT_LIMIT_OUT_OF_RANGE || validPrediction > PLANT_LIMIT_OUT_OF_RANGE){
			return (distance.get() / outputFields.size()) > validPrediction ? (distance.get() / outputFields.size()) : validPrediction;
		}

		return ((distance.get() / outputFields.size()) + validPrediction) / 2;
	}

	// this is made available to the engine
	@Override
	public int getPollingFreqencyMS() {
		// return the calculation delay
		return calculationFrequencyMS;
	}

	@Override
	public String getLogLevel() {
		return logLevel;
	}

	/**
	 * Load a PMML model from the file system.
	 *
	 * @param file
	 * @return PMML
	 * @throws Exception
	 */

	
	private static PMML loadModel(final InputStream file) throws Exception {

		PMML pmml = null;

		try( InputStream in = file){
			pmml = PMMLUtil.unmarshal(in);

		} catch( Exception e) {
			throw e;
		}
		return pmml;
	}

	private static Evaluator EvaluatorModel(PMML pmml) {
		Evaluator evaluator = new NearestNeighborModelEvaluator(pmml);
		// Performing the self-check
		evaluator.verify();
		// Printing input (x1, x2, .., xn) fields
		List<? extends InputField> inputFields = evaluator.getInputFields();
		System.out.println("Input fields: " + inputFields);

		// Printing primary result (y) field(s)
		List<? extends TargetField> targetFields = evaluator.getTargetFields();
		System.out.println("Target field(s): " + targetFields);

		// Printing secondary result (eg. probability(y), decision(y)) fields
		List<? extends OutputField> outputFields = evaluator.getOutputFields();
		System.out.println("Output fields: " + outputFields);

		return evaluator;
	}

	private static Map<String, ?> EvaluateModel(Evaluator evaluator, Map<String, ?> inputRecord) {
		// Get the list of required feature set model needs to predict.
		List<? extends InputField> inputFields = evaluator.getInputFields();
		List<? extends OutputField> outputFields = evaluator.getOutputFields();
		List<TargetField> target = evaluator.getTargetFields();

		if(inputRecord == null){
			return null;
		}

		Map<FieldName, FieldValue> arguments = new LinkedHashMap<>();

		// Mapping the record field-by-field from data source schema to PMML schema
		for(InputField inputField : inputFields){
			FieldName inputName = inputField.getName();

			Object rawValue = inputRecord.get(inputName.getValue());

			// Transforming an arbitrary user-supplied value to a known-good PMML value
			FieldValue inputValue = inputField.prepare(rawValue);

			arguments.put(inputName, inputValue);
		}
		Map<FieldName, ?> results = null;
		try {
			// Evaluating the model with known-good arguments
			results = evaluator.evaluate(arguments);
		}
		catch (Exception e){
			PersistenceSampleActivator.println(e.getMessage());
			e.printStackTrace();
		}

		// Decoupling results from the JPMML-Evaluator runtime environment
		Map<String, ?> resultRecord = EvaluatorUtil.decodeAll(results);

		FieldName targetField = target.get(0).getFieldName();
		ValueMap distances = ((Classification) results.get(targetField)).getValues();

		//put the distance instead of the index
		Map<String,Object> tmpMap = new HashMap<>(resultRecord);
		for(OutputField outputField : outputFields){
			FieldName outputName = outputField.getName();

			Object rawValue = resultRecord.get(outputName.getValue());
			String i;
			try{
				i = (String) rawValue;
			}
			catch(Exception e){
				e.printStackTrace();
				continue;
			}
			tmpMap.put(outputName.getValue(), distances.get(i));
		}
		return tmpMap;
	}

	////////////////////
	// private methods
	////////////////////

	// this is called by the super class, Calculation
	protected void initialize() {
		//init auxiliary classes
		queryData = new QueryData(persistenceClient,CLOUD_EDGE_SERVICES);
		//Load the model
		try {
			this.pmml = loadModel(getClass().getClassLoader().getResourceAsStream("knn-color-model.pmml"));
		} catch (Exception e) {
			e.printStackTrace();
		}
		this.evaluator = EvaluatorModel(pmml);
		PersistenceSampleActivator.printlnDebug("PredictValues:initialize - called");
		// if you want to store the result to a custom table, create a table to store them.
		this.state = started ? State.RUNNING : State.ERROR;
	}

	// selects the sample data from the persistence database which is constantly being updated
	private Map<String, ArrayList<Map<String, Float>>> getSampleData() {

		PSStatementObject stmt;
		String sql = queryData.getSqlForMeasureValues( sensorTypeAlternateId,
				capabilityAlternateId,
				mostRecentQueryTime );

		mostRecentQueryTime = queryData.resetQueryTime();

		Map<String, ArrayList<Map<String, Float>>> valuesByDevice = new HashMap<>();
		try {
			stmt = persistenceClient.executeQuery(sql);
			valuesByDevice = queryData.getValuesAsFloatMapsByDevice(stmt, sensorTypeAlternateId, capabilityAlternateId);
		} catch (PersistenceException pe) {
			// TODO Auto-generated catch block
			pe.printStackTrace();
		} catch ( Exception e ) {
			e.printStackTrace();
		}
		return valuesByDevice;
	}

	// go to the database and see if any of the configurations have changed
	private void updateConfigurations() {
	}

}

I've also made some modifications to the pom.xml Maven file to enable it to load all the required dependencies:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.sap.iot.edgeservices.persistence.sample</groupId>
    <artifactId>PredictiveModel</artifactId>
    <version>1.0.0</version>
    <properties>
        <bundle.symbolicName>${project.artifactId}</bundle.symbolicName>
        <bundle.version>${project.version}</bundle.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>com.sap.iot.edgeservices</groupId>
            <artifactId>PersistenceService</artifactId>
            <version>3.1909.0</version>
            <type>bundle</type>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.osgi</groupId>
            <artifactId>org.osgi.core</artifactId>
            <version>4.3.1</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.osgi</groupId>
            <artifactId>org.osgi.service.component.annotations</artifactId>
            <version>1.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-osgi</artifactId>
            <version>2.9.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-math3</artifactId>
            <version>3.6.1</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.jpmml</groupId>
            <artifactId>pmml-evaluator</artifactId>
            <version>1.4.13</version>
            <type>bundle</type>
            <scope>provided</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.3</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifestFile>META-INF/MANIFEST.MF</manifestFile>
                    </archive>
                </configuration>
            </plugin>
            <!-- the versions plugin can be used to check for new plugin versions
                like this: mvn versions:display-plugin-updates -->
            <plugin>
                <groupId>org.apache.felix</groupId>
                <artifactId>maven-bundle-plugin</artifactId>
                <extensions>true</extensions>
                <executions>
                    <!-- Configure extra execution of 'manifest' in process-classes
                        phase to make sure SCR metadata is generated before unit test runs -->
                    <execution>
                        <id>scr-metadata</id>
                        <goals>
                            <goal>manifest</goal>
                        </goals>
                        <configuration>
                            <supportIncrementalBuild>true</supportIncrementalBuild>
                        </configuration>
                    </execution>
                </executions>
                <configuration>
                    <exportScr>true</exportScr>
                    <manifestLocation>META-INF</manifestLocation>
                    <instructions>
                        <!-- Enable processing of OSGI DS component annotations -->
                        <_dsannotations>*</_dsannotations>
                        <!-- Enable processing of OSGI metatype annotations -->
                        <_metatypeannotations>*</_metatypeannotations>
                    </instructions>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

The scope provided means that these dependencies are not packaged with the bundle: you need to satisfy them yourself, by sideloading them and all their transitive dependencies to your Edge Platform within the SAP Cloud Platform Internet of Things Cockpit.

 – Satisfy the dependencies

Open the Cockpit, go to the Gateways list, and open your gateway. Go to Bundle Management and use the upload button to upload a new OSGi bundle jar to the Edge Platform.

Repeat this for each dependency marked with scope provided in your pom file (except for the PersistenceService, which is already provided by Edge Services).

After each jar upload, have a look at the OSGi console of your Edge Platform; you may see errors like the following:

org.osgi.framework.BundleException: The bundle "<BUNDLE NAME HERE>" could not be resolved. Reason: Missing Constraint: Import-Package: <MISSING PACKAGE HERE>

This means that a transitive dependency is missing from the OSGi environment. My suggestion is to search for the library that exports the missing Import-Package named in the error message and load it into the Edge Platform with the same procedure.
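To match a Missing Constraint error against the jar that actually exports the missing package, it can help to inspect the OSGi headers of the jars you have at hand. The following is a small sketch using only the JDK; the jar path in main is just an example:

```java
import java.io.IOException;
import java.util.jar.JarFile;
import java.util.jar.Manifest;

// Prints the OSGi Import-Package / Export-Package headers of a jar, which
// helps to find the bundle that exports a package reported as missing.
public class BundleHeaders {

    static String header(String jarPath, String name) throws IOException {
        try (JarFile jar = new JarFile(jarPath)) {
            Manifest mf = jar.getManifest();
            // a jar without a manifest (or without OSGi headers) is not a bundle
            return mf == null ? null : mf.getMainAttributes().getValue(name);
        }
    }

    public static void main(String[] args) throws IOException {
        String path = args.length > 0 ? args[0] : "commons-math3-3.6.1.jar";
        System.out.println("Import-Package: " + header(path, "Import-Package"));
        System.out.println("Export-Package: " + header(path, "Export-Package"));
    }
}
```

If the Export-Package header of a candidate jar contains the package named in the error, sideloading that jar should resolve the constraint.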

Some projects are not OSGi compatible, so you might need to build your own OSGi bundle for them. To transform a non-OSGi project into an OSGi one, I generally add these plugins to its pom.xml file:

			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-jar-plugin</artifactId>
				<configuration>
					<archive>
						<manifestFile>META-INF/MANIFEST.MF</manifestFile>
					</archive>
				</configuration>
			</plugin>
			<!-- the versions plugin can be used to check for new plugin versions
				like this: mvn versions:display-plugin-updates -->
			<plugin>
				<groupId>org.apache.felix</groupId>
				<artifactId>maven-bundle-plugin</artifactId>
				<extensions>true</extensions>
				<executions>
					<!-- Configure extra execution of 'manifest' in process-classes
						phase to make sure SCR metadata is generated before unit test runs -->
					<execution>
						<id>scr-metadata</id>
						<goals>
							<goal>manifest</goal>
						</goals>
						<configuration>
							<supportIncrementalBuild>true</supportIncrementalBuild>
						</configuration>
					</execution>
				</executions>
				<configuration>
					<exportScr>true</exportScr>
					<manifestLocation>META-INF</manifestLocation>
					<instructions>
						<!-- Enable processing of OSGI DS component annotations -->
						<_dsannotations>*</_dsannotations>
						<!-- Enable processing of OSGI metatype annotations -->
						<_metatypeannotations>*</_metatypeannotations>
					</instructions>
				</configuration>
			</plugin> 

 – Build the Service

Now we can proceed with compiling the customized service.

First, I need to import the Edge Services persistence Java library.

Copy the file osgi-bundle-PersistenceService-3.1909.0.jar from the custombundles folder of your Edge Platform to the root of your project.

Run the following command from a bash console in the root of your project to install the library into your local Maven repository:

mvn install:install-file -Dfile=PersistenceService-3.1909.0.jar -DgroupId=com.sap.iot.edgeservices -DartifactId=PersistenceService -Dversion=3.1909.0 -Dpackaging=jar

Replace the version in the file name and in the project’s pom.xml file if it differs from the version you are using.

Build the project with Maven: go to the root of your project and type in a bash console:

mvn clean install

This creates, in the target folder of your project, the OSGi bundle of our service, named PredictiveModel-1.0.0.jar.

6. Deploy the Service

It is possible to load the service from the SAP Cloud Platform Internet of Things Cockpit into your Edge Platform, as I did for the dependencies, but I suggest using this method only during development. Upload the “released” version within Edge Services instead: the main advantage is that you can easily deploy it to several gateways, or groups of gateways, and it is always available in the cloud.

Open Edge Services and click Settings; ensure that the flag Allow upload of Custom Services (OSGi bundles) to my IoT Edge Platforms is checked and press Save.

Open Edge Service Management, click Services in the left menu, and press the plus button in the button bar of the services list to create a new service.

Specify a name such as RGB Predictive Analytics and any name for the configuration topic (we are not using it).

The File Name field contains the OSGi bundle we are going to load with this service, which is the PredictiveModel-1.0.0.jar we have built in the previous section.

Now go to the Groups and Gateways screen and open your gateway, switch to the Services tab and press the plus button to start the deployment of a new service.

The dropdown now contains a custom service with the name you specified during its creation. Select it and press Save.

Installation takes a while; after some time the service status in the list becomes Installed. Go to the OSGi console of your Edge Platform and check that:

  • No errors have been generated during the installation and the start of the service
  • The status of the service is STARTED (type “lb” command into the OSGi console)

7. Test the Service

Since we are using a REST Edge Platform, I’ve used Postman to test it.

I’m going to send one measurement: R:235, G:64, B:52.

This value should be classified as valid, with an overall index lower than the threshold of 100 and the label brown.
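The KNN validation described in the story can be sketched as plain Java 8 code. The K value of 3 and the threshold of 100 come from the article; the labeled reference samples below are purely hypothetical, standing in for the company’s real training data:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ColorKnn {

    // distance threshold below which a piece is considered well painted
    static final double THRESHOLD = 100.0;
    static final int K = 3;

    static class Sample {
        final String label;
        final float[] rgb;
        Sample(String label, float... rgb) { this.label = label; this.rgb = rgb; }
    }

    // hypothetical labeled reference samples
    static final List<Sample> SAMPLES = new ArrayList<>();
    static {
        SAMPLES.add(new Sample("brown", 220f, 70f, 60f));
        SAMPLES.add(new Sample("brown", 200f, 90f, 50f));
        SAMPLES.add(new Sample("red", 255f, 0f, 0f));
        SAMPLES.add(new Sample("blue", 0f, 0f, 255f));
    }

    // Euclidean distance between two RGB readings
    static double distance(float[] a, float[] b) {
        double sum = 0;
        for (int i = 0; i < a.length; i++) {
            double d = a[i] - b[i];
            sum += d * d;
        }
        return Math.sqrt(sum);
    }

    // majority vote of the K nearest samples; returns "discard" when the
    // overall index (mean distance of the K neighbors) exceeds the threshold
    static String classify(float[] rgb) {
        List<Sample> byDistance = new ArrayList<>(SAMPLES);
        byDistance.sort(Comparator.comparingDouble(s -> distance(rgb, s.rgb)));

        double overall = 0;
        Map<String, Integer> votes = new HashMap<>();
        for (Sample n : byDistance.subList(0, K)) {
            overall += distance(rgb, n.rgb) / K;
            votes.merge(n.label, 1, Integer::sum);
        }
        if (overall > THRESHOLD) {
            return "discard";
        }
        return votes.entrySet().stream()
                .max(Map.Entry.comparingByValue()).get().getKey();
    }
}
```

With these samples, the measurement R:235, G:64, B:52 lands near the two brown references, so the majority vote is brown and the overall index stays below the threshold; a reading far from every sample is discarded.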

Now we can check the predicted value in the SAP Cloud Platform Internet of Things Cockpit: open the device and check Data Visualization.

As expected, the bundle has streamed back to SAP Cloud Platform Internet of Things the prediction, which is brown, the top 3 neighbors, which all have a Euclidean distance below 100, and the overall evaluation, which is also lower than 100.

If we send a value that is out of range, so that the validity index exceeds the threshold, a new Service Call is created in SAP Field Service Management. This notifies the employees of the Paint Ink company that something went wrong in the painting, so they have to check the object manually and, if needed, search for the root cause of the problem and fix it (e.g. the presence of some dirt).

8. Conclusions

With this example, I have explained how to build a service that runs a predictive model at the edge in a pure Java implementation.

In this other blog post, you can also find a Python implementation that could easily be extended to several other languages.

Several technologies and services are involved, and this article has also shown how to put them all together to create a realistic scenario and how to grant the continuum from the cloud to the edge.

 
