Complex Event Processing with SAP Data Intelligence

In this blog post, I’ll describe how to get started with complex event processing in SAP Data Intelligence. The goal is to reconcile, in real time, the various messages associated with a common event when those messages arrive in no particular order.

Consider credit card transactions: after a transaction is recorded, various systems post additional information in real time, such as a risk score, a machine-learning-based categorisation, location information, etc. These new messages are produced with no guaranteed order.

We will use 3 types of messages to enrich a credit card transaction:

  • category (groceries, entertainment…)
  • location (airport, mall)
  • risk score and confidence

We want to trigger a specific process as soon as all 3 messages are produced. If the message sequence is location, category and risk, then within microseconds of receiving the risk message, the final event should be triggered.
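To make the goal concrete before looking at the pipeline, here is a minimal plain-Python sketch of the same idea: buffer messages per transaction and fire as soon as all three types are present. It is only an illustration, not how the streaming operator works internally; the names on_message, pending and REQUIRED are made up for this example.

```python
from collections import defaultdict

# The three message types that must all be seen for a transaction
REQUIRED = {"category", "location", "risk"}

# transaction_id -> {message type -> payload}; illustrative in-memory buffer
pending = defaultdict(dict)

def on_message(transaction_id, msg_type, payload):
    """Store the message; return the merged event once all types are present, else None."""
    pending[transaction_id][msg_type] = payload
    if REQUIRED.issubset(pending[transaction_id]):
        # All three types have arrived, in whatever order: emit and forget
        parts = pending.pop(transaction_id)
        merged = {"transaction_id": transaction_id}
        for part in parts.values():
            merged.update(part)
        return merged
    return None
```

Unlike the windowed pattern used later in this post, this sketch never expires incomplete transactions, so it is only suitable for illustrating the matching rule.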

In this picture, each color represents a transaction, and each shape is a message type. As soon as all message types have been produced for the same transaction, the final event is triggered.

Even in situations where latency is not measured in micro- or milliseconds, it’s still very handy to be able to handle various patterns of event sequences. Other rules could require a particular order, combine logic, or include the absence of an event. For instance, after receiving category and risk, trigger an event if the location message arrives within the following 30 seconds.
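The 30-second rule in this example can be expressed as a simple timestamp check. The sketch below is plain Python rather than CCL, and the function name location_within_window is an assumption made for illustration.

```python
from datetime import datetime, timedelta

WINDOW = timedelta(seconds=30)

def location_within_window(cat_ts, risk_ts, loc_ts, window=WINDOW):
    """True if location arrives after both category and risk, at most `window` later."""
    # The clock starts once both prerequisite messages have been received
    ready_at = max(cat_ts, risk_ts)
    return ready_at <= loc_ts <= ready_at + window
```

A rule based on the absence of an event would invert the check: fire when the window has elapsed and no location message has been seen.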

The pipeline below consists of a message generator, a streaming operator and two wiretaps.

In the wiretap at the bottom, the generated data looks like:

The operator script is:

import uuid
from datetime import datetime
import random
import json

#list of possible message types 
msg_types = ['category', 'location', 'risk']

#define some values for each attribute
cat = ['travel', 'entertainment', 'groceries', 'health', 'government/taxes']
loc = ['beach', 'airport', 'mall', 'rural', 'home']
country = ['France', 'Italy', 'Spain', 'Greece', 'N/A']

#return attributes 
def gen_msg(attr: str):
    
    if (attr=='category'):
        j={'category':random.choice(cat), 'category_confidence': round(random.random(),1)}
    elif (attr=='location'):
        j={'location':random.choice(loc), 'country':random.choice(country)}
    else:
        j={'risk':round(random.random(), 2), 'risk_confidence':round(random.random(),1)}
    return j
    

events={}

def gen_event_id():
    return str(uuid.uuid4())[:8]

#add a message for a given transaction id
def add_msg(transaction_id):

    #get the list of possible message types
    missing_attr=msg_types.copy()
    
    if transaction_id in events: 
        #remove message types already sent
        for a in events[transaction_id]:
            missing_attr.remove(a)
    else:
        events[transaction_id]={}
    #pick a message type not sent yet
    new_attr=random.choice(missing_attr)
    #create the corresponding message
    events[transaction_id][new_attr] = gen_msg(new_attr)
    return new_attr

def gen():
    
    #reuse or create an event id
    if(len(events)<= 15):
        transaction_id = gen_event_id()
    else:
        transaction_id = random.choice(list(events.keys()))
    
    #add an attribute to it
    new_attr = add_msg(transaction_id)
    
    #prepare a message by adding the timestamp and the transaction_id
    mymsg={'transaction_id':transaction_id, 'ts':datetime.now().isoformat()}
    
    mymsg.update(events[transaction_id][new_attr])
    
    #send a json object inside an array because streaming operator only accepts json arrays
    api.send(new_attr,json.dumps([mymsg]))
    
    #send mymsg to the debugging port
    api.send("alloutput", json.dumps(mymsg))
    
    #if all the attributes are set for an event, delete it from the queue
    if (len(events[transaction_id])==len(msg_types)):
        del events[transaction_id]

#2 messages per second
api.add_timer("0.5s", gen)
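
The script above relies on the api object that the Python operator provides at runtime (api.send and api.add_timer). To exercise a callback outside a pipeline, one option is a small stand-in. The FakeApi class and its tick method below are hypothetical helpers invented for this sketch; they are not part of SAP Data Intelligence and only mimic the two calls used in this post.

```python
class FakeApi:
    """Hypothetical stand-in for the operator's `api` object, for local testing only."""

    def __init__(self):
        self.sent = []    # (port, data) pairs recorded by send()
        self.timers = []  # callbacks registered with add_timer()

    def send(self, port, data):
        # Record the message instead of writing it to an output port
        self.sent.append((port, data))

    def add_timer(self, period, callback):
        # The period is ignored here; callbacks are run manually by tick()
        self.timers.append(callback)

    def tick(self, n=1):
        # Invoke every registered callback n times
        for _ in range(n):
            for callback in self.timers:
                callback()

api = FakeApi()

def gen_demo():
    # Placeholder callback; the real gen() from the script could be used instead
    api.send("alloutput", "hello")

api.add_timer("0.5s", gen_demo)
api.tick(3)
```

After the three ticks, api.sent holds the three recorded messages, which makes it easy to inspect what the generator would have sent to each port.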

The streaming operator receives each message type on a dedicated input port. Each message is passed as a JSON string. The alternative input format for streaming data is CSV, but keeping the columns in the right order can be cumbersome.
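
The difference between the two formats can be seen with a small sketch. The sample values are invented, and the column order is taken from the LOC stream schema shown later in this post. The exact CSV conventions expected by the streaming operator are not covered here, so treat the CSV line as an illustration of positional encoding only.

```python
import csv
import io
import json

# Sample location message (values invented for illustration)
msg = {"transaction_id": "1a2b3c4d", "ts": "2024-01-01T12:00:00",
       "location": "mall", "country": "Spain"}

# JSON: fields are named, so the key order in the message does not matter
as_json = json.dumps([msg])

# CSV: values are positional, so they must follow the stream's column order
columns = ["transaction_id", "location", "country", "ts"]
buf = io.StringIO()
csv.writer(buf, lineterminator="\n").writerow([msg[c] for c in columns])
as_csv = buf.getvalue()
```

With JSON, the generator can build the dictionary in any order; with CSV, every producer has to know and respect the schema's column order.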

The operator is configured with JSON as the input/output format, with no opcodes.

 

Inside the streaming operator, each port is an input stream, and the streams are combined into an output stream using a pattern matched over a 1-minute sliding window. The visual representation is:

And the CCL Script source code:

//Use the first letter of location, category and risk
//  to display the sequence in which the messages have arrived
DECLARE string displaySequence (bigdatetime loc_ts, bigdatetime cat_ts, bigdatetime risk_ts) {

    bigdatetime min_ts := minof(loc_ts, cat_ts, risk_ts);
    IF (min_ts=loc_ts)
        IF (cat_ts<risk_ts)
            return 'L,C,R';
        else
            return 'L,R,C';
    ELSE IF (min_ts=cat_ts) 
        IF (loc_ts<risk_ts)
            return 'C,L,R';
        ELSE
            return 'C,R,L';
    ELSE
        if (loc_ts<cat_ts)
            return 'R,L,C';
        else
            return 'R,C,L';
}
END;

CREATE INPUT STREAM LOC SCHEMA (
    transaction_id string,
    location string,
    country  string,
    ts       bigdatetime
);

CREATE INPUT STREAM CAT SCHEMA (
    transaction_id string,
    category string,
    category_confidence float,
    ts       bigdatetime
);

CREATE INPUT STREAM RISK SCHEMA (
    transaction_id string,
    risk     float,
    risk_confidence float,
    ts       bigdatetime
);

create output stream COMPLETE as
    select LOC.transaction_id,
           LOC.location,
           CAT.category,
           RISK.risk,
           //compute the time delta between the first and last event
           //convert the result from microseconds to seconds
           //round to 2 digits
           round((maxof(LOC.ts, RISK.ts, CAT.ts) - 
           minof(LOC.ts, RISK.ts, CAT.ts))/(1000.*1000), 2) as time_diff_s,
           //use a custom function to display the sequence order
           displaySequence( LOC.ts, CAT.ts, RISK.ts) as sequence_order
    FROM LOC, CAT, RISK
    MATCHING [1 minute : LOC && CAT && RISK] 
     on LOC.transaction_id = CAT.transaction_id = RISK.transaction_id ;
 

It is more productive to develop and test the CCL script using HANA Express and then bring it into Data Intelligence.

 

Finally, the streaming operator has an output port named COMPLETE (the name must be identical to the name of an output stream) of type string, connected to a wiretap. In the final result, the message is reassembled with all its attributes. Two examples of KPIs computed over the stream are added:

  • The latency between the first and last message
  • The order in which the messages have been produced.
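
To cross-check these two KPIs against known inputs, the same calculations can be mirrored in a few lines of Python. This is a sketch for offline testing, not a translation of the CCL engine; the function names are chosen for this example, and ties between identical timestamps are broken alphabetically here, which may differ from the CCL function.

```python
from datetime import datetime

def display_sequence(loc_ts, cat_ts, risk_ts):
    """Return the arrival order as initials, e.g. 'C,R,L'."""
    # Sort (timestamp, label) pairs; equal timestamps fall back to label order
    arrivals = sorted([(loc_ts, "L"), (cat_ts, "C"), (risk_ts, "R")])
    return ",".join(label for _, label in arrivals)

def time_diff_s(loc_ts, cat_ts, risk_ts):
    """Seconds between the first and last message, rounded to 2 digits."""
    stamps = [loc_ts, cat_ts, risk_ts]
    return round((max(stamps) - min(stamps)).total_seconds(), 2)
```

For example, if category arrives first, risk one second later and location one second after that, display_sequence gives the label C,R,L and time_diff_s gives a latency of 2.0 seconds.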

 
