Technology Blogs by SAP
JWootton
Advisor

This example strikes me as a rather unusual use case, but someone definitely needed it, so I thought I would share it.

Here, the requirement is to buffer incoming events and publish them one at a time at a fixed interval. A stream of input events that arrive in bursts can thus be "smoothed" into a stream of regularly spaced events, for example publishing one event every 10 seconds. A further requirement was to be able to adjust the publishing frequency at any time by specifying the interval between events in seconds.

The example below implements this. Note that there is a danger here: the publishing needs to be able to "keep up" with the input stream. If the publishing interval is too long for the average incoming message rate, the queue keeps growing and the project will eventually run out of memory trying to hold everything.
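
To put a rough number on the "keep up" condition, here is a back-of-the-envelope sketch. The symbols are my own shorthand and not part of the project: λ is the average arrival rate in events per second, T is the publishing interval in seconds (the value of publishFreq), and B(t) is the approximate backlog after t seconds.

```latex
% Assumed symbols: \lambda = average arrivals per second,
% T = publishing interval in seconds, B(t) = backlog after t seconds.
\[
  \text{queue stays bounded} \iff \lambda \le \frac{1}{T} \iff \lambda T \le 1
\]
\[
  B(t) \approx \left(\lambda - \frac{1}{T}\right) t
  \qquad \text{when } \lambda > \frac{1}{T}
\]
```

For example, if events arrive on average once every 2 seconds (λ = 0.5) and T = 10, the backlog grows by roughly 0.5 - 0.1 = 0.4 events per second, which is about 1,440 events per hour.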


While I was at it, I also show how you can use an input stream to receive changes to parameter values. Here I structured ParameterInput so that it can receive values for multiple parameters, even though I am only using one in this example.

CREATE INPUT STREAM myInput
  SCHEMA ( id integer , value string ) ;

CREATE INPUT WINDOW ParameterInput
  SCHEMA ( ParameterName string , Value integer )
  PRIMARY KEY ( ParameterName ) ;

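CREATE FLEX myOut
  IN myInput , ParameterInput
  OUT OUTPUT STREAM myOut
  SCHEMA ( id integer , value string )
BEGIN
    DECLARE
        // Seconds between published events; can be changed at run time via ParameterInput
        integer publishFreq := 10;
        // Seconds elapsed since the last publishing attempt
        integer counter := 0;
        // Buffer of events received on myInput
        eventCache(myInput) queue;
        long i;
    END;

    // Nothing to do on arrival: the event waits in the queue until the timer publishes it
    ON myInput {
    };

    // Apply a new publishing interval (in seconds)
    ON ParameterInput {
        if (ParameterInput.ParameterName = 'publishFreq') publishFreq := ParameterInput.Value;
    };

    // Tick once per second; publish one buffered event every publishFreq seconds
    EVERY 1 second {
        counter := counter + 1;
        if (counter >= publishFreq) {
            i := cacheSize(queue);
            if (i > 0) {
                // Publish one event from the queue, then remove it
                output getCache(queue, (i-1));
                deleteCache(queue, (i-1));
            }
            counter := 0;
        }
    };
END;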
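
Because ParameterInput is keyed by ParameterName, handling a second parameter only takes another branch in the ON ParameterInput block. The variant below is a sketch along those lines, not something from the original project: publishEnabled is a hypothetical parameter I made up for illustration (1 = publish, 0 = pause), and I have not run this version.

```ccl
// Sketch only: publishEnabled is a hypothetical second parameter,
// not part of the original project.
CREATE INPUT STREAM myInput
  SCHEMA ( id integer , value string ) ;

CREATE INPUT WINDOW ParameterInput
  SCHEMA ( ParameterName string , Value integer )
  PRIMARY KEY ( ParameterName ) ;

CREATE FLEX myOut
  IN myInput , ParameterInput
  OUT OUTPUT STREAM myOut
  SCHEMA ( id integer , value string )
BEGIN
    DECLARE
        integer publishFreq := 10;
        integer publishEnabled := 1;   // hypothetical: 1 = publish, 0 = pause
        integer counter := 0;
        eventCache(myInput) queue;
        long i;
    END;

    ON myInput {
    };

    // One branch per parameter name
    ON ParameterInput {
        if (ParameterInput.ParameterName = 'publishFreq') publishFreq := ParameterInput.Value;
        if (ParameterInput.ParameterName = 'publishEnabled') publishEnabled := ParameterInput.Value;
    };

    EVERY 1 second {
        counter := counter + 1;
        if (counter >= publishFreq) {
            i := cacheSize(queue);
            // Publish only while enabled; buffered events stay queued during a pause
            if (publishEnabled = 1) {
                if (i > 0) {
                    output getCache(queue, (i-1));
                    deleteCache(queue, (i-1));
                }
            }
            counter := 0;
        }
    };
END;
```

Sending a row with ParameterName 'publishEnabled' and Value 0 would pause publishing, and Value 1 would resume it. Keep in mind that the queue continues to fill while publishing is paused, so the memory concern mentioned earlier applies even more here.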