Skip to Content
Author's profile photo Jeff Wootton

Fixed interval publishing – one row at a time

This example strikes me as a rather unusual use case, but someone definitely needed it, so I thought I would share it.

Here, the requirement is to buffer incoming events and publish them – one at a time, at a regular/fixed interval.  Thus a stream of input events that arrive in bursts can be :”smoothed” out to become a stream with reguarly spaced events – for example publishing one event every 10 seconds.  There was a further requirement to be able to adjust the publishing frequency at any time, specifying the interval between events in seconds.

The following example implements this.  Note that there is a danger here:  the publishing needs to be able to “keep up” with the input stream.  if the publishing interval is too low for the average incoming message rate, eventually the project will run out of memory trying to hold everything.

While I was at it, I show how you can use an input stream to receive changes to parameter values.  Here I structured the ParameterInput to be able to receive values for multiple parameters, even though I am only using one in this example

CREATE INPUT STREAM myInput
  SCHEMA ( id integer , value string ) ;

CREATE INPUT WINDOW ParameterInput
  SCHEMA ( ParameterName string , Value integer )
  PRIMARY KEY ( ParameterName ) ;

CREATE FLEX myOut
  IN myInput , ParameterInput
  OUT OUTPUT STREAM myOut
  SCHEMA ( id integer , value string )
BEGIN
       DECLARE
              integer publishFreq := 10;
              integer counter := 0;
              eventCache(myInput) queue;
              long i;
       END
       ;
       ON myInput {
    } ;
       ON ParameterInput {
              if (ParameterInput.ParameterName = ‘publishFreq’) publishFreq := ParameterInput.Value;
    } ;
    EVERY 1 second  {
       counter := counter+1;
       if (counter >= publishFreq) {
              i := cacheSize(queue);
              if (i>0) {
                 output getCache(queue,(i-1));
                 deleteCache(queue,(i-1));
              }
         counter := 0;     
              }     
    };
END;

Assigned Tags

      Be the first to leave a comment
      You must be Logged on to comment or reply to a post.