Someone recently asked me for an example of how they could implement a custom aggregation using an event cache, so I thought I would share it with you.
This simple aggregation counts how many of the last 100 events for a given id had a value greater than the value in the current event:
CREATE INPUT STREAM myInput
SCHEMA ( id integer , value float ) ;
// Groups by ID and counts how many of the last 100 events had a "value" greater than current event
CREATE FLEX Flex1
IN myInput
OUT OUTPUT WINDOW myOut
SCHEMA ( id integer , currentvalue float, ct integer ) PRIMARY KEY ( id )
BEGIN
DECLARE
eventCache ( myInput[id], 100 events) cache ;
typeof ( myInput ) rec ;
typeof (myOut) outrec;
END;
ON myInput {
integer i := 0;
long n := cacheSize ( cache ) - 1 ;
WHILE ( n > - 1 )
{
rec := getCache ( cache , n ) ;
if (rec.value > myInput.value) i := i+1;
n := n - 1 ;
}
outrec.id := myInput.id ;
outrec.currentvalue := myInput.value;
outrec.ct :=i;
output setOpcode ( outrec , upsert ) ;
};
END;
You must be a registered user to add a comment. If you've already registered, sign in. Otherwise, register and sign in.
User | Count |
---|---|
35 | |
25 | |
17 | |
13 | |
8 | |
7 | |
6 | |
6 | |
6 | |
6 |