Fixed interval publishing – one row at a time
This example strikes me as a rather unusual use case, but someone definitely needed it, so I thought I would share it.
Here, the requirement is to buffer incoming events and publish them one at a time, at a fixed interval. A stream of input events that arrive in bursts can thus be “smoothed” out into a stream of regularly spaced events, for example publishing one event every 10 seconds. There was a further requirement to be able to adjust the publishing frequency at any time, specifying the interval between events in seconds.
The following example implements this. Note that there is a danger here: the publishing needs to be able to “keep up” with the input stream. If the publishing interval is too long for the average incoming message rate, the queue keeps growing and eventually the project will run out of memory trying to hold everything.
While I was at it, I also show how you can use an input window to receive changes to parameter values. Here I structured ParameterInput to be able to receive values for multiple parameters, even though I am only using one in this example.
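CREATE INPUT STREAM myInput
    SCHEMA ( id integer , value string ) ;

CREATE INPUT WINDOW ParameterInput
    SCHEMA ( ParameterName string , Value integer )
    PRIMARY KEY ( ParameterName ) ;

CREATE FLEX myOut
    IN myInput , ParameterInput
    OUT OUTPUT STREAM myOut
    SCHEMA ( id integer , value string )
BEGIN
    DECLARE
        integer publishFreq := 10;   // seconds between published events
        integer counter := 0;        // seconds elapsed since the last publish slot
        eventCache(myInput) queue;   // buffers the rows arriving on myInput
        long i;
    END;

    // Nothing to do per event: rows are held in the event cache until published.
    ON myInput {
    } ;

    // Pick up parameter changes; only publishFreq is used in this example.
    ON ParameterInput {
        if (ParameterInput.ParameterName = 'publishFreq') publishFreq := ParameterInput.Value;
    } ;

    // Once per second, check whether the interval has elapsed and, if so, publish one buffered row.
    EVERY 1 second {
        counter := counter+1;
        if (counter >= publishFreq) {
            i := cacheSize(queue);
            if (i>0) {
                output getCache(queue,(i-1));
                deleteCache(queue,(i-1));
            }
            counter := 0;
        }
    };
END;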
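
If you want to sanity-check the timing logic without running the project, here is a rough Python model of the Flex operator above. It is only a sketch, not CCL: the names FixedIntervalPublisher and simulate are made up for illustration, and it assumes rows are published oldest-first, one per elapsed interval. It also shows the backlog problem mentioned earlier, by counting how many rows are still queued at the end of a run.

```python
from collections import deque


class FixedIntervalPublisher:
    """Toy model of the Flex operator: buffer rows, emit one per interval."""

    def __init__(self, publish_freq=10):
        self.publish_freq = publish_freq  # seconds between published rows
        self.counter = 0                  # seconds since the last publish slot
        self.queue = deque()              # buffered rows, oldest on the left

    def on_input(self, row):
        # Stands in for the event cache filling up from myInput.
        self.queue.append(row)

    def on_parameter(self, name, value):
        # Mirrors the ParameterInput handler; other names are ignored.
        if name == "publishFreq":
            self.publish_freq = value

    def tick(self):
        """Advance one second; return the published row, or None."""
        self.counter += 1
        if self.counter < self.publish_freq:
            return None
        self.counter = 0
        return self.queue.popleft() if self.queue else None


def simulate(arrivals, seconds, publish_freq=10):
    """arrivals maps a second (1-based) to the list of rows arriving then.

    Returns the list of (second, row) publications and the final backlog size.
    """
    pub = FixedIntervalPublisher(publish_freq)
    published = []
    for t in range(1, seconds + 1):
        for row in arrivals.get(t, []):
            pub.on_input(row)
        row = pub.tick()
        if row is not None:
            published.append((t, row))
    return published, len(pub.queue)


if __name__ == "__main__":
    # A burst of three rows in the first second is spread over 30 seconds.
    print(simulate({1: ["a", "b", "c"]}, seconds=30))
    # One row every 5 s against a 10 s interval: the backlog keeps growing.
    print(simulate({t: [t] for t in range(5, 101, 5)}, seconds=100)[1])
```

In the second run, rows arrive twice as fast as they are published, so half of them are still queued after 100 seconds. In the real project that backlog sits in memory.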