Skip to Content

Someone recently asked me for an example of how they could implement a custom aggregation using an event cache, so I thought I would share it with you.

This simple aggregation counts how many of the last 100 events for a given id had a value greater than the value in the current event:

CREATE INPUT STREAM myInput
  SCHEMA ( id integer , value float ) ;

// Groups by ID and counts how many of the last 100 events had a “value” greater than current event
CREATE FLEX Flex1
IN myInput
OUT OUTPUT WINDOW myOut
SCHEMA ( id integer , currentvalue float, ct integer ) PRIMARY KEY ( id )
  BEGIN
DECLARE
  eventCache ( myInput[id], 100 events) cache ;
  typeof ( myInput ) rec ;
  typeof (myOut) outrec;
END;

ON myInput {
  integer i := 0;
  long n := cacheSize ( cache ) – 1 ;
  WHILE ( n > – 1 )
    {
    rec := getCache ( cache , n ) ;
    if (rec.value > myInput.value) i := i+1;
    n := n – 1 ;
    }
  outrec.id := myInput.id ;
  outrec.currentvalue := myInput.value;
  outrec.ct :=i;
  output setOpcode ( outrec , upsert ) ;
  };
END;

To report this post you need to login first.

Be the first to leave a comment

You must be Logged on to comment or reply to a post.

Leave a Reply