Skip to Content

Sometimes the Internet of Things creates a situation in which a lot of processing is required in order to make sense of all the data being generated by these newly connected, sensor-enabled “things” that can now constantly transmit potentially useful data from wherever they are. In many cases, it’s precisely the location that can be the problem. We often refer to this as the “edge.”

Put differently, the edge is simply any location that is close to the source of the data, that is, where the IoT devices exist and generate data. So an edge location could be an off-shore oil right at sea, a moving vehicle (like a truck or train), a manufacturing shop floor, or a retail outlet. When there are a lot of data generated at the edge, it is often important to process that data at the edge so that actions can be taken in near real time while allowing for a significant decrease in the volume of data that need be transferred to the cloud.

Usually at these edge locations is a device called an IoT gateway, which is a dedicated computing device that provides connectivity, data processing, analysis, and storage for the local edge “things.” The IoT gateways come in all sorts of shapes and sizes and can be everything from a raspberry Pi to an industrial, ruggedized PC. They often run a variety of processors, with a wide range of processing, memory, and storage capabilities.

So now that you know what the edge is, the question is, how do you go about taking what happens at the edge and integrating it back into your digital core for better business outcomes? Because of latency as well as transmission and storage costs, it makes sense to have a way to configure how to take what happens at the edge and send the highlights or anomalies back to the digital core rather than simply the raw data.

This concept is sometimes referred to as aggregation. And SAP Edge Services, as part of SAP Leonardo, makes this possible. SAP Edge Services brings the intelligent enterprise to the edge, by extending the processing power of the cloud to the edge, with local compute, processing and business semantics combined to create the intelligent edge. We’ve put together two samples of how this works and have made them available on GitHub here.

Let’s take a closer look at the first sample, what we call the streaming aggregation sample.

Streaming Aggregation Sample

Description

This is a streaming aggregation sample which can be used to implement your own aggregation needs. There are many reasons why aggregations are important for IOT applications at the edge. While creating rule, instead of every event, you may want to aggregate the last few messages. This is because sensor data are known to be fluctuating. Also, when you run ML/AI models at the edge, you may want to run these models at aggregated data rather than every event. That way your ML/AI will use the less compute resource but delivers the same result.

Once you convert this sample to your own streaming aggregation service, make sure you run thorough testing before deploying to the production landscape. Though this is a Java sample, internally it uses custom rule feature of the Streaming service to compute the aggregation. More in High Level Design section.

What it does?

It basically creates two types of streaming aggregations. We call it persistence aggregation and streaming rule aggregation.

Persistence Aggregation

The aggregations are calculated in time window. For example, if you define a time window of 30 seconds then, aggregations are calculated at the end of 30 seconds. Then the window is cleared, and it starts accumulating event for next 30 seconds. The sample just saves the aggregated data in to a file. You can extend this to save in the persistence service.

Streaming Rule Aggregation

The aggregations are calculated in a time window, but aggregation is generated every time new event arrives. Every second, the time window slides one second and so to removing beginning one second data. The calculated streaming aggregations can be sent back to the same rule in the streaming servicei that it came from. The aggregation will go as event of the rule.

How does this Aggregation Service receive input data?

The inputs are received from the Streaming rule. You can define a Custom Rule in the Streaming rule engine. In this setup usually, you tell the rule engine which host and port your custom engine is running. In the JSON configuration of this streaming aggregation service, put the same host and port. With this, the aggregation service will bind this port when it starts. And the streaming rule engine sends data to this port. The data transfer to streaming aggregation service and subscription of the aggregated result back to rule engine are handled by the streaming service.

How the aggregations are computed?

The streaming aggregation service internally uses streaming engine in another process to compute the aggregation. The aggregation logics are written in CCL (continuous computation language). Depending on the configuration of the streaming aggregation service, CCL is generated, compiled and run to compute the aggregation.

How can I use streaming aggregations in the streaming rule

The streaming aggregations created in the aggregation service can be brought back to the custom rule of streaming service as an event. If you need the aggregated data then you can use enterprise plugin to send the aggregated to the appropriate target.

What is the expected performance?

Aggregations are computed using streaming engine and so performance is highly depend on the available memory and compute resources. The streaming aggregation service runs in a separate process and so there is an inherent network delay to get the event from the streaming service to the aggregation service.

If your aggregation time buckets are small enough – say it doesn’t capture more than few thousands of events within this time, then you wouldn’t see any performance degradation even if data are continuously input at the same speed.

You should test the performance to see if it satisfies your requirements.

What are the aggregations it supports?

The following 11 aggregations are supported by this sample aggregation service:

AVG, SUM, COUNT, MIN, MAX, MEDIAN, STDDEV (standard deviation), COUNTDISTINCT (count of distinct values), WEIGHTEDAVG (weighted average), LASTVALUE (last value in the time bucket), and FIRSTVALUE (first value in the time bucket). All the aggregations are grouped by device id, sensor id and profile id.

High Level Design

Under the hood a CCL is automatically generated by this service and using custom rule feature of streaming engine data are passed to this service.

The streaming aggregation service is mainly control by the JSON configuration file. The JSON configuration file has two type of configurations: streams and rules.

  1. Streams: This section defines the supported aggregations and whether a persistence aggregation need to be calculated.
  2. Rules: This section defines what are aggregations are subscribed by a sensor profile and rule.

Depending on the parameters of this JSON configuration, at the start of this service, CCL codes are generated, compiled and started. Once it is started, then the rule subscription configuration is published to an internal control stream of this CCL. The CCL can dynamically adjust the rule subscription.

Once it starts, streaming rule which is configured to send data to custom rule can start sending data to this aggregation service. The same streaming aggregation servive can handle data from many sensor profiles.

Depending on the rule subscription, streaming aggregations are published to a pre-defined stream where the streaming rule engine gets the aggregated data.

The streaming aggregation service has a subscriber which listens to the computed persistence aggregations. When it receives the aggregations, it stores these values in to a CSV file. It can be extended to store in persistence service.

How to change CCL code?

CCL code cannot be changed since it is generated from JSON configuration every time the service starts. Most of the simple things you should be able to handle by simply changing JSON configuration file. If required then you should change CCL generator i.e., CclGen.java.

Requirements

Configuration

By simply changing the JSON file you can control this streaming aggregation service. There are two sections in the JSON file: streams and rules. The streams section is fixed. This means this section must be there for this service to function properly.

Each section in the streams have three fixed and one optional name/value pairs. The properties “name”, “time” and “type” are fixed. The property “persist” is optional. If you put “persist” property for a aggregation then this service will create the aggregation and save in a table.

In the rule section of the JSON file there are three name value pairs you define for each section. These are “ruleId”, “profileId” and “aggr”. This is where you configure which streaming aggregation to send to which streaming rule. You may choose not to have rule section at all. In that case, there will be no streaming aggregations are generated.

Example

I have two sensor profiles Temperature_0_1 and Humidity_0_1. I have already defined two CUSTOM RULES : “TempAvg” and “HumdAvg” one for each profile.

I want to generate AVG and SUM in every 30 seconds for both of these sensor measures and save in the persistence service for later use.

I would also like to bring the streaming AVG of Temperature_0_1 to a rule “TempAvg” and AVG of Humidity_0_1 to a rule “HumdAvg”.

Solution:

  1. First, you configure the streams section of the JSON file. Put “persist”: true as shown below ONLY for the AVG and SUM to indicate the streaming aggregation service to generate and save these persistence aggregations. Also change the value of the “time” to 30 SECONDS for both AVG and SUM.
    {
      "name": "AVGSTREAM",
      "time": "10SEC",
      "type": "AVG",
      "persist": true
    },
    {
      "name": "SUMSTREAM",
      "time": "10SEC",
      "type": "SUM",
      "persist": true
    },
  1. Now you need to configure the rule section to indicate that AVG calculated for sensor profiles Temperature_0_1 and Humidity_0_1 should be sent to rules TempAvg and HumdAvg respectively.
    {
      "ruleId": "TempAvg",
      "profileId": "Temperature_0_1",
      "aggr": "AVG"
    },
    {
      "ruleId": "HumidAvg",
      "profileId": "Humdity_0_1",
      "aggr": "AVG"
    }
  1. Make sure you have Temperature_0_1 and Humidity_0_1 profiles are created in the rule engine. And also make sure to create CUSTOM RULES TempAvg and HumidAvg for each profile.
  2. Set the host name as localhost and port as 9090 for CUSTOM RULES. Make sure that host and port are same in the the in the JSON file too…
  3. Once JSON file is changed and saved, you need to restart the streaming aggregation service so that it will regenerate the CCL and start the aggregations.

Limitations

There are some limitations of what you can do with this aggregation service. Here are the list of functioanlities that are supported or not supported:

For Persistence Aggregation:

  • You can only define ONE Custom Rule for ONE sensor profile (sensor data)
  • You can define only one type of aggregation once for each sensor profile. You can not define same aggregation type for a sensor profile twice and provide different time windowing property
  • You can define one or more unique aggregations for each sensor profile

For Streaming Aggregation:

  • You can only define ONE Custom Rule for ONE sensor profile (sensor data)
  • You can define only one type of aggregation once for each sensor profile. You can not define same aggregation type for a sensor profile twice and provide different time windowing property
  • You can define one or more unique aggregations for each sensor profile
  • You can only bring ONE type of aggregation as an event back to streaming rule engine

How do I deploy this service?

This service is not yet wrapped in OSGI container. If you wrap this in OSGI container then you can use IoT Service to deploy this to the gateway. For now, this sample you need to manually deploy in the gateway. The sections below describes how you build and run this sample at the gateway. One of the extension of this sample is to wrap this with OSGI and deploy from the cloud.

Prerequisites

We assumed that Maven is installed in your system. We also assume that you have already installed Edge Services. You can find the required libraries in the pom.xml file. The Edge Services libraries are in $STREAMING_HOME/libj folder.

You can use on-premise Edge Services or Edge Service cloud edition for this sample. If you are using on-premise, then download the Edge Services from the SAP Service Marketplace and install it as per the instructions provided in the instruction guide.

For cloud edition, you first need a tenant for Edge Services Policy Service. Then go to SAP Service Marketplace, download the IoT Services Gateway Edge and install it. Then from the Policy Service tenant, you can install the Edge Services in to your gateway.

We assumed that you are familiar with setting up streaming rules including custom rules.

Download the sample app

git clone https://github.com/SAP/iot-edge-services-samples.git
cd iot-edge-services-samples
git checkout streaming-aggregation

or if you want to clone the single branch only:

git clone -b streaming-aggregation --single-branch git://github.com/SAP/iot-edge-services-samples.git
cd streaming-aggregation

Compile and Package

  1. Go to the folder where pom.xml file is located.
  2. Copy libraries to Maven repository by running following command:
mvn eclipse:eclipse
  1. The above command will through errors for streaming libraries. It will also show you the command that you need to run in the command line. Follow those commands to copy all the libraries.
  2. Run following command to compile and build the package:
mvn clean install
  1. Create tar or zip which will include all the 3rd party jars:
cp aggr-cnf.json target
cd target
tar -cvf ../EdgeAnalytics.tar SampleEdgeAnalytics-null.jar  aggr-cnf.json lib/*.jar

Deploy

Here are the steps that you can follow to deploy, run and test it:

  1. Un-tar the tar file in to a folder
  2. Open the aggr-conf.json file in Notepad
  3. Start your gateway if it is already not running
  4. Login to the Edge Service’s console using browser: https://localhost
  5. If not yet then create a sensor profile: Temperature_0_1
  6. Create a CUSTOM rule called: TempAggregation
  7. Configure the custom rule with host localhost and port 9090
  8. Save it.
  9. Copy the sensor profile id and rule id (both are hexadecimal string)
  10. Go back to your aggr-conf.json file in the Notepad and in the bottom find the section called “rules” and change the ruleId and profileId for one of the section
  11. Save it. Exit the notepad.
  12. In the command line, set the STREAMING_HOME. For example, if your “edgeservice” folder is at “c:\Gateway\edgeservice” then your STREAMING_HOME should be set to “C:\Gateway\edgeservice\esp\ESP-5_1”
  13. To run aggregation service from command line:
java -jar SampleEdgeAnalytics-null.jar
  1. Shutdown the gateway and start again
  2. Open MQTTBox (if you don’t have then download this for Chrome) and send messages to Edge Service’s Temperature_0_1 sensor profile. You can use other methods to send data to Edge Services too.
  3. Check the file for aggregations and events in the UI. If you are using “persist” aggregation then you should see new aggregations are created in the CSV file in the same folder. If you have configured JSON to use aggregations in the rule then you should see the rules events generated in the streaming rule engine.

Troubleshoot

For any issues, you can check the log file that this aggregation service generates as well as Edge Services log files located at …/dep_iot_edge/log/

Limitations / Disclaimer

Note: Sample scenarios/applications are designed to help you get an overall understanding of various extensibility concepts/patterns. SAP recommends not to use these samples for any productive usage. They show basic interaction with an SAP IoT Edge Services system. Topics like authentication, error handling, transactional correctness, security, caching, tests were omitted on purpose for the sake of simplicity.

To report this post you need to login first.

Be the first to leave a comment

You must be Logged on to comment or reply to a post.

Leave a Reply