CCL Tips and Techniques: Sampling – Output every N seconds
CCL (Continuous Computation Language) is the scripting language used to develop projects for HANA streaming analytics and Streaming Lite. CCL is derived from SQL, which eases the learning curve for experienced database developers who are starting to work with real-time streaming analytics. As with other development languages, examples of common design patterns are also useful for ramping up.
HANA streaming analytics provides real-time event processing, which means that incoming events are processed as they arrive. One of the basic structures in a streaming project is a “window”. A window persists data for a specific interval, which can be defined either as a number of records or as a length of time. One operation that windows enable is the calculation of aggregate values such as an average. The default behavior is a “sliding window”: each time an input value is received, or a previous value ages out, the aggregate calculation is updated and the new result is output.
An example of a sliding window used to calculate an average temperature would look like this:
CREATE OUTPUT WINDOW AVG_TEMP
PRIMARY KEY DEDUCED KEEP 30 SEC
AS SELECT
    EVENTS.MACHINEID MACHINEID ,
    LAST ( EVENTS.EVENT_TIME ) EVENT_TIME ,
    avg ( to_decimal ( EVENTS.EVENT_VALUE, 4, 2 ) ) AVG_TEMP ,
    EVENTS.MAX_TEMP MAX_TEMP ,
    EVENTS.MIN_TEMP MIN_TEMP ,
    EVENTS.LOCATION LOCATION ,
    EVENTS.TEMP_UNIT TEMP_UNIT
FROM EVENTS
GROUP FILTER EVENTS.EVENT_NAME = 'TEMP'
GROUP BY EVENTS.MACHINEID ;
In this example, we are receiving an incoming stream of data for several different machines with each machine uniquely identified by a MACHINEID. The window persists each event for 30 seconds from the time it arrives and the average temperature is calculated on a per machine basis.
If 10 events are received for MACHINEID = 1 in a 30 second period, then the average is calculated based on the 10 events. If only 1 event is received in a given 30 second period, then the average is calculated based on that single event. In addition to the average value being updated every time a new event is received, the average is also updated every time an event ages out. Thirty seconds after the 1st event is received, it will age out and the average value will be updated based on the remaining events.
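The arrival and age-out behavior described above can be modeled outside CCL. The following is a minimal Python sketch, not CCL and not a description of how the streaming server is implemented. The function name `sliding_average`, the sample readings, and the choice to treat an event as aged out once exactly 30 seconds have elapsed are all assumptions made for illustration.

```python
def sliding_average(events, now, keep_seconds=30.0):
    """Average the values of events that are still inside the window at time `now`.

    `events` is a list of (arrival_time, value) pairs for a single machine.
    Assumption: an event ages out once `keep_seconds` have fully elapsed.
    """
    live = [
        value
        for arrival, value in events
        if arrival <= now and now - arrival < keep_seconds
    ]
    return sum(live) / len(live) if live else None


# Hypothetical readings for one machine: (arrival time in seconds, temperature)
readings = [(0.0, 70.0), (10.0, 74.0), (20.0, 78.0)]

print(sliding_average(readings, now=20.0))  # 74.0, all three events are in the window
print(sliding_average(readings, now=30.0))  # 76.0, the first event has aged out
print(sliding_average(readings, now=45.0))  # 78.0, only the last event remains
```

The same set of readings yields a different average at each point in time, which is why a sliding window emits an update both when an event arrives and when one ages out.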
As mentioned earlier, the default behavior is what is known as a sliding window. Each time an event enters the window or ages out of the window, the aggregate calculation is updated and the new value is output from the window. This gives you real time updates and is most likely the behavior you want when monitoring a production process to generate alerts or control a piece of manufacturing equipment.
However, when it comes to capturing and storing high-volume data for later analysis, review, and reporting, you often don’t need to keep the full volume of the raw data. For example, you may get temperature readings every second but want to consolidate the raw data and store an average for each 30-second interval in your data warehouse. This is where “jumping windows” come in.
A jumping window still persists events for a specified interval, defined either as a number of rows or as a length of time. However, instead of aging out rows individually, it discards the full set of rows at the end of each interval and starts accumulating a new set of rows.
Jumping windows still generate output each time an aggregate value is updated. If 100 incoming events are received for a given SensorId in the 30-second interval, then 100 output events will be generated. If 1,000 incoming events are received, then 1,000 output events will be generated. To reduce the volume of records output from the project so that only 1 output event is generated per SensorId per interval, we need to use a FLEX element in addition to the jumping window.
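To make the contrast with a sliding window concrete, here is a small Python model of jumping-window aggregation. It is a sketch, not CCL. The function name `jumping_averages`, the sample readings, and the assumption that intervals are aligned to time zero are illustrative only; the streaming server decides the actual interval boundaries.

```python
def jumping_averages(events, interval=30.0):
    """Average values per (interval index, sensor id).

    `events` is a list of (arrival_time, sensor_id, value) tuples.
    Assumption: intervals start at time 0 and are `interval` seconds long.
    """
    buckets = {}
    for arrival, sensor_id, value in events:
        key = (int(arrival // interval), sensor_id)
        buckets.setdefault(key, []).append(value)
    # Each bucket is averaged on its own; no row carries over to the next interval.
    return {key: sum(vals) / len(vals) for key, vals in buckets.items()}


# Hypothetical readings for one sensor across two 30-second intervals
readings = [
    (1.0, "A", 70.0),
    (12.0, "A", 74.0),
    (29.0, "A", 78.0),
    (31.0, "A", 80.0),
    (40.0, "A", 82.0),
]

result = jumping_averages(readings)
print(result)  # {(0, 'A'): 74.0, (1, 'A'): 81.0}
```

Each interval produces exactly one final average per sensor, and the rows from the first interval have no influence on the second.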
This particular example shows how to calculate an aggregate from a stream of input data and at a periodic interval of N seconds publish the aggregate for the previous interval. N happens to be set to 30 seconds here, but it can be a longer or shorter value.
CREATE INPUT STREAM events
SCHEMA (
    SensorId string ,
    Temperature float ,
    ReadingDate msdate ) ;

/* Average per SensorId; the input keeps a jumping window of 30 seconds */
/**@SIMPLEQUERY=AGGREGATE*/
CREATE OUTPUT WINDOW aAverageTemperatureBySensor
PRIMARY KEY DEDUCED
KEEP ALL
AS
SELECT
    events.SensorId SensorId ,
    avg (events.Temperature ) Temperature ,
    last(events.ReadingDate) ReadingDate
FROM events KEEP EVERY 30 SECONDS PER(SensorId)
GROUP BY events.SensorId ;

CREATE FLEX ThrottledEvents
IN aAverageTemperatureBySensor
OUT OUTPUT STREAM ThrottledEvents
SCHEMA (SensorId string, Temperature float, ReadingDate msdate)
BEGIN
    ON aAverageTemperatureBySensor{
        /* Forward only the final event of each interval, re-tagged as an insert */
        if (getOpcode(aAverageTemperatureBySensor) = delete) {
            output setOpcode(aAverageTemperatureBySensor, insert);
        }
    };
END;
The window used to calculate the average temperature looks fairly similar to the sliding window example, but note that the KEEP policy has a different format and is declared in a different location in the CREATE OUTPUT WINDOW statement: KEEP EVERY 30 SECONDS PER(SensorId) is attached to the events input in the FROM clause, while the output window itself is declared with KEEP ALL.
The aAverageTemperatureBySensor window still generates output events every time a new event comes into the window. Each of the output events generated in response to incoming data will have an OpCode (operation code) of “insert” for the first output event for a given SensorId in each interval, or “update” for subsequent output events. At the end of each interval, an output event with a “delete” OpCode is generated when the aAverageTemperatureBySensor window contents are cleared.
Each of the events output from the aAverageTemperatureBySensor window is received by the ThrottledEvents FLEX element, which acts as a filter to reduce the volume of output from the streaming project. Within the ThrottledEvents FLEX element, the “ON aAverageTemperatureBySensor” block runs for every event received from the window, and the if statement inside it passes only those events with an OpCode of “delete”. These are the final events generated from the aAverageTemperatureBySensor window at the end of each interval for each SensorId.
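The sequence of OpCodes described above can be illustrated with a short Python model. This is a sketch under stated assumptions, not CCL: the generator name `window_output` is hypothetical, it covers a single sensor in a single interval, and it assumes the “delete” event carries the last computed average, which is what the rest of this pattern relies on.

```python
def window_output(values):
    """Yield (opcode, running_average) pairs for one sensor during one interval."""
    total = 0.0
    average = None
    for count, value in enumerate(values, start=1):
        total += value
        average = total / count
        # First row for the key in this interval is an insert, later ones are updates.
        yield ("insert" if count == 1 else "update", average)
    if average is not None:
        # End of interval: the window is cleared and the row is deleted.
        yield ("delete", average)


events = list(window_output([70.0, 74.0, 78.0]))
print(events)
# [('insert', 70.0), ('update', 72.0), ('update', 74.0), ('delete', 74.0)]
```

Three input readings produce three output events plus one final “delete”, and only that last event is guaranteed to reflect the complete interval.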
...
    ON aAverageTemperatureBySensor{
        if (getOpcode(aAverageTemperatureBySensor) = delete) {
            output setOpcode(aAverageTemperatureBySensor, insert);
        }
...
The FLEX element then changes the OpCode from “delete” to “insert” and passes the event downstream in the project via the “output” statement. Events received from the aAverageTemperatureBySensor window with an OpCode of “insert” or “update” fail the if (getOpcode(aAverageTemperatureBySensor) = delete) test and are not produced as output from the ThrottledEvents FLEX element.
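The filtering logic of the FLEX element can be mirrored in a few lines of Python to show its effect on output volume. This is an illustrative model rather than CCL; the function name `throttle`, the tuple representation of events, and the sample rows are assumptions.

```python
def throttle(events):
    """Keep only 'delete' events and re-tag them as 'insert'.

    `events` is a list of (opcode, row) pairs as they would leave the
    aggregate window; every other opcode is dropped.
    """
    output = []
    for opcode, row in events:
        if opcode == "delete":
            output.append(("insert", row))
    return output


# Hypothetical output of the aggregate window for one sensor in one interval
window_events = [
    ("insert", {"SensorId": "A", "Temperature": 70.0}),
    ("update", {"SensorId": "A", "Temperature": 72.0}),
    ("update", {"SensorId": "A", "Temperature": 74.0}),
    ("delete", {"SensorId": "A", "Temperature": 74.0}),
]

throttled = throttle(window_events)
print(throttled)  # [('insert', {'SensorId': 'A', 'Temperature': 74.0})]
```

Four events enter the filter and one leaves it, so downstream consumers see a single row per sensor per interval regardless of how many readings arrived.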