
Occasionally people want to sort data in ESP; for example, they may want to keep a top 10 list. This is easy enough to do in ESP, but it’s not obvious how, since there is no built-in sorting mechanism in CCL, i.e. there is no ORDER BY clause. What’s more, some people see the GROUP ORDER BY clause, think they can use that, and are surprised by the results. So let me provide one way of maintaining a sorted list, using a Flex Operator, and also clarify how the GROUP ORDER BY clause works.

Let me start by reiterating this basic point:  CCL does not support an ORDER BY clause.  There is no way to maintain the contents of an ESP window in a sorted order directly within a CREATE WINDOW statement.  The only way to do it is with a Flex Operator where you have direct control over the contents of the window and you can add and delete rows as needed.

Here’s one example that maintains a sorted top 10 list, in this case the 10 user IDs registering the most “events” in the last 2 hours:

// input window receives user events and keeps the last 2 hours of events
CREATE INPUT WINDOW EventsIn
SCHEMA (SeqNo long , UID integer , Action string )
PRIMARY KEY (SeqNo)
KEEP 2 HOURS
AUTOGENERATE ( SeqNo );

// now group events by user ID and count the total number of events per UID

/**@SIMPLEQUERY=AGGREGATE*/
CREATE OUTPUT WINDOW counts
PRIMARY KEY DEDUCED
AS SELECT
   EventsIn.UID UID ,
   count ( EventsIn.Action) ActionCount
FROM EventsIn
GROUP BY EventsIn.UID ;

CREATE FLEX Top10sorted
  IN counts
  OUT OUTPUT WINDOW Top10 SCHEMA ( Position integer , UID integer, ActionCount integer ) PRIMARY KEY ( Position )
BEGIN
DECLARE
    //cache is an event cache that will hold the top 10 list, sorted by ActionCount
    eventCache (counts[], coalesce, ActionCount desc) cache;
    typeof(Top10) outrec;
    typeof(counts) temp;
  END;

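ON counts
{
    // the cache is sorted by ActionCount desc, so index 10 is the lowest entry;
    // drop it to keep at most 10 rows
    if (cacheSize(cache) > 10) deleteCache(cache, 10);
    // re-publish every cached row, using its position in the cache as the key
    integer i := 0;
    while (i < cacheSize(cache) and i < 10) {
        temp := getCache(cache,i);
        outrec := [Position = i; UID = temp.UID; ActionCount = temp.ActionCount];
        output setOpcode(outrec, upsert);
        i++ ;
    }
};

END;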

Of course, you’re updating the entire output window every time there’s a change in the top 10 list, but that’s the nature of sorted lists. If you want a top 10 list but don’t need it sorted, this alternative to the Flex above is a little more efficient: it doesn’t update every row in the top 10 list on any change to the list, but only updates rows that change and deletes the bottom row when a new one gets added.

CREATE FLEX Top10unsorted
  IN counts
  OUT OUTPUT WINDOW Top10un SCHEMA ( UID integer, ActionCount integer ) PRIMARY KEY ( UID )
BEGIN
DECLARE
    //cache is an event cache that will hold the top 10 list, sorted by ActionCount
    eventCache (counts[], coalesce, ActionCount desc) cache;
    typeof(Top10un) outrec;
    typeof(counts) temp;
  END;

ON counts
{
  if (cacheSize(cache) > 10) {
    temp := getCache(cache,10);
    if (counts.ActionCount > temp.ActionCount) {
      deleteCache(cache, 10);
      outrec := [UID = temp.UID; ActionCount = temp.ActionCount];
      output setOpcode(outrec, safedelete);
      outrec := [UID = counts.UID; ActionCount = counts.ActionCount];
      output setOpcode(outrec, upsert);
    }
  }
  else {
    outrec := [UID = counts.UID; ActionCount = counts.ActionCount];
    output setOpcode(outrec, upsert);
  }
};

END;
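
To make the trade-off between the two Flex operators concrete, here is a rough model in Python (not CCL, and not how ESP implements anything). The function names and the tie-breaking rule are my own assumptions for illustration. It compares what each approach would publish after one change: the sorted list, keyed by position, re-sends every row, while the unsorted list, keyed by UID, sends only the rows that differ.

```python
def top_n(counts, n=10):
    """Return the n (uid, count) pairs with the highest counts, best first."""
    # Ties are broken by uid so the result is deterministic (an assumption of this sketch).
    return sorted(counts.items(), key=lambda kv: (-kv[1], kv[0]))[:n]


def sorted_updates(counts, n=10):
    """Sorted list keyed by position: every position is re-sent on each change."""
    return [("upsert", pos, uid, cnt) for pos, (uid, cnt) in enumerate(top_n(counts, n))]


def unsorted_updates(previous, counts, n=10):
    """Unsorted list keyed by uid: send only changed rows, delete uids that dropped out."""
    current = dict(top_n(counts, n))
    deletes = [("delete", uid) for uid in previous if uid not in current]
    upserts = [("upsert", uid, cnt) for uid, cnt in current.items()
               if previous.get(uid) != cnt]
    return deletes + upserts, current


# A top 2 list for brevity: uid 3 overtakes uid 2.
counts = {1: 5, 2: 3, 3: 1}
_, published = unsorted_updates({}, counts, n=2)
counts[3] = 4
print(sorted_updates(counts, n=2))                  # [('upsert', 0, 1, 5), ('upsert', 1, 3, 4)]
print(unsorted_updates(published, counts, n=2)[0])  # [('delete', 2), ('upsert', 3, 4)]
```

In the sorted case both positions are re-sent even though uid 1 did not change; in the unsorted case only the departing and arriving rows are touched.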

A little explanation of an eventCache is probably in order, since they are less frequently used. EventCaches are a cool data structure for maintaining custom windows, but unless you understand how they work, they can seem a bit mysterious when you just look at the code. Here’s why: they are typically used in “automatic” mode, where the contents of the event cache are updated automatically based on incoming events. So in the example above, we’ve declared:

             eventCache (counts[], coalesce, ActionCount desc) cache;

This creates an eventCache named cache that automatically adds every event received from the counts window to the cache. And the reason we are turning to the eventCache here is that (drum roll please) you can specify a sort order on an eventCache. Here we’ve told it that cache should be sorted by ActionCount in desc(ending) order. Coalesce tells it that we want one row in the cache for each UID, so updates are applied to the existing row rather than added to the cache as separate events. Finally, note that “counts[]” sets this up as one “bucket”. You can also have one bucket for each key value or other field value, but I won’t get into that here. You can read all about eventCaches here.
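
If the behaviour described above is hard to picture, the following toy model in Python (again, not CCL) may help. The class and method names are hypothetical and only loosely mirror cacheSize, getCache and deleteCache; it assumes a single bucket, coalescing on one key field, and a descending sort on one field, which is all the example above relies on.

```python
class CoalescingSortedCache:
    """Toy model of a single-bucket event cache with coalesce and a descending sort."""

    def __init__(self, key, sort_field):
        self.key = key
        self.sort_field = sort_field
        self._rows = {}  # one row per key value (the "coalesce" behaviour)

    def insert(self, row):
        # An event for an existing key replaces the cached row instead of adding another.
        self._rows[row[self.key]] = dict(row)

    def _ordered(self):
        # Highest sort_field value first; the order of ties is not meaningful in this sketch.
        return sorted(self._rows.values(), key=lambda r: r[self.sort_field], reverse=True)

    def size(self):
        return len(self._rows)

    def get(self, index):
        # Index 0 is the row with the highest sort_field value.
        return self._ordered()[index]

    def delete(self, index):
        row = self._ordered()[index]
        del self._rows[row[self.key]]


cache = CoalescingSortedCache(key="UID", sort_field="ActionCount")
for uid, count in [(7, 1), (9, 4), (7, 6)]:
    cache.insert({"UID": uid, "ActionCount": count})

print(cache.size())   # 2, because the second event for UID 7 replaced the first
print(cache.get(0))   # {'UID': 7, 'ActionCount': 6}
```

The point to take away is that the Flex code never sorts anything itself: it reads rows by index and relies on the cache keeping them in order.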

Finally, back to GROUP ORDER BY and GROUP FILTER…

These CCL clauses are only used with a GROUP BY clause, and the key is that they are applied before the aggregation is done.

So if you added the following window definition to the example above:

/**@SIMPLEQUERY=AGGREGATE*/
CREATE OUTPUT WINDOW TopUserActivity
PRIMARY KEY DEDUCED
AS SELECT
  1 id,
  sum ( counts.ActionCount ) TopUserTotal
  FROM counts
  GROUP FILTER rank() < 10
  GROUP BY 1
  GROUP ORDER BY counts.ActionCount DESC ;

This would first sort all the rows in counts by ActionCount. It would then apply the group filter, only  keeping the rows with a rank in the sort order of <10.  And then finally it would aggregate – here the GROUP BY 1 will put all rows in one group, thus this window will only have one row, and it will contain the total action count for the top 10 users.  The key thing to remember is that GROUP ORDER BY and GROUP FILTER are applied before the aggregation.
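
The order of operations can be sketched in a few lines of Python (a model of the semantics described above, not CCL). The function name and its parameters are hypothetical. In particular, first_rank is an assumption: I believe rank() starts counting at 1, but check the rank() documentation for your ESP version, because that determines how many rows a filter like rank() < 10 actually keeps.

```python
def top_user_total(rows, first_rank=1, limit=10):
    """Sort, filter by rank, then aggregate, in that order."""
    # GROUP ORDER BY counts.ActionCount DESC
    ordered = sorted(rows, key=lambda r: r["ActionCount"], reverse=True)
    # GROUP FILTER rank() < limit; first_rank is an assumption about where ranks start
    kept = [r for rank, r in enumerate(ordered, start=first_rank) if rank < limit]
    # GROUP BY 1 with sum(): every surviving row lands in the same single group
    return sum(r["ActionCount"] for r in kept)


# Twelve users with action counts 1 through 12.
rows = [{"UID": uid, "ActionCount": uid} for uid in range(1, 13)]

print(top_user_total(rows))                # 72: ranks 1..9 kept, i.e. counts 12 down to 4
print(top_user_total(rows, first_rank=0))  # 75: ranks 0..9 kept, i.e. counts 12 down to 3
```

Either way the result is less than 78, the sum over all twelve rows, which shows the filter running before the aggregation. If ranks do start at 1, rank() < 10 keeps nine rows, so rank() <= 10 would be the filter to use for a true top 10.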
