remi_astier
In this blog post, I'll describe how to get started with complex event processing in SAP Data Intelligence to reconcile, in real time, the various messages that relate to a common event when those messages are produced in no particular order.

We consider credit card transactions: after a transaction is recorded, various systems post additional information in real time, such as a risk score, a machine-learning-based categorisation, location information, and so on. These new messages are produced with no ordering guarantee.

We will use three types of messages to enrich a credit card transaction:

  • category (groceries, entertainment...)

  • location (airport, mall)

  • risk score and confidence


We want to trigger a specific process as soon as all three messages have been produced. If the message sequence is location, category and risk, then the final event should be triggered within microseconds of receiving the risk message.

In this picture, each color represents a transaction and each shape a message type. As soon as all message types have been produced for the same transaction, the final event is triggered.


Even in situations where latency is not measured in micro- or milliseconds, it is still very handy to be able to handle various patterns of event sequences. Other rules could require a particular order, combine logical conditions, or test for the absence of an event. For instance: after receiving the category and risk messages, trigger an event if the location message arrives within the following 30 seconds.
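To give an idea of how such a rule might look, here is a hypothetical sketch (untested, reusing the CAT, RISK and LOC streams defined later in this post) of a pattern that requires a strict arrival order; check the exact pattern syntax against the CCL reference, which also documents ! for testing the non-occurrence of an event:

// Hypothetical sketch: fire only when the category, risk and location
// messages for a transaction arrive in that exact order within 30 seconds.
CREATE OUTPUT STREAM ORDERED AS
SELECT CAT.transaction_id,
       LOC.location
FROM CAT, RISK, LOC
MATCHING [30 seconds : CAT, RISK, LOC]
ON CAT.transaction_id = RISK.transaction_id = LOC.transaction_id;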

The pipeline below consists of a message generator, a streaming operator and two wiretaps.


In the wiretap at the bottom, the generated data looks like:
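Each line is a JSON message carrying the transaction_id, a timestamp and the attributes of one message type. A few illustrative lines (values are random; field names are the ones produced by the script below):

{"transaction_id": "3f2a9b1c", "ts": "2024-05-04T10:15:23.123456", "location": "airport", "country": "France"}
{"transaction_id": "3f2a9b1c", "ts": "2024-05-04T10:15:24.623411", "category": "travel", "category_confidence": 0.7}
{"transaction_id": "9d41c0aa", "ts": "2024-05-04T10:15:25.124902", "risk": 0.42, "risk_confidence": 0.9}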


The operator script is:
import uuid
from datetime import datetime
import random
import json

# list of possible message types
msg_types = ['category', 'location', 'risk']

# define some values for each attribute
cat = ['travel', 'entertainment', 'groceries', 'health', 'government/taxes']
loc = ['beach', 'airport', 'mall', 'rural', 'home']
country = ['France', 'Italy', 'Spain', 'Greece', 'N/A']

# return the attributes for a given message type
def gen_msg(attr: str):
    if attr == 'category':
        j = {'category': random.choice(cat), 'category_confidence': round(random.random(), 1)}
    elif attr == 'location':
        j = {'location': random.choice(loc), 'country': random.choice(country)}
    else:
        j = {'risk': round(random.random(), 2), 'risk_confidence': round(random.random(), 1)}
    return j

# transactions for which not all message types have been sent yet
events = {}

def gen_event_id():
    return str(uuid.uuid4())[:8]

# add a message for a given transaction id
def add_msg(transaction_id):
    # get the list of possible message types
    missing_attr = msg_types.copy()
    if transaction_id in events:
        # remove message types already sent
        for a in events[transaction_id]:
            missing_attr.remove(a)
    else:
        events[transaction_id] = {}
    # pick a message type not sent yet
    new_attr = random.choice(missing_attr)
    # create the corresponding message
    events[transaction_id][new_attr] = gen_msg(new_attr)
    return new_attr

def gen():
    # reuse or create an event id
    if len(events) <= 15:
        transaction_id = gen_event_id()
    else:
        transaction_id = random.choice(list(events.keys()))

    # add an attribute to it
    new_attr = add_msg(transaction_id)

    # prepare a message by adding the timestamp and the transaction_id
    mymsg = {'transaction_id': transaction_id, 'ts': datetime.now().isoformat()}
    mymsg.update(events[transaction_id][new_attr])

    # send a json object inside an array because the streaming operator only accepts json arrays
    api.send(new_attr, json.dumps([mymsg]))

    # send mymsg to the debugging port
    api.send("alloutput", json.dumps(mymsg))

    # if all the attributes are set for an event, delete it from the queue
    if len(events[transaction_id]) == len(msg_types):
        del events[transaction_id]

# 2 messages per second
api.add_timer("0.5s", gen)
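The api object (api.send, api.add_timer) is injected by the Python operator runtime in Data Intelligence. To experiment with the generator logic outside a pipeline, you can define a small stub before running the script above; a minimal, hypothetical sketch (FakeApi is not part of the product):

# Hypothetical stand-alone harness: stub the runtime-injected `api` object.
class FakeApi:
    def send(self, port, payload):
        # print instead of routing the payload to a pipeline port
        print(port, payload)

    def add_timer(self, interval, callback):
        # ignore timers when testing offline
        pass

api = FakeApi()

# ...paste the generator script here, then drive it manually:
for _ in range(10):
    gen()  # each call emits one message, as the timer would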

The streaming operator receives each message type on a dedicated input port; the message is passed as a JSON string. The alternative input format for streaming data is CSV, but handling column order can be cumbersome.
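For instance, a single location message arrives on its port as a one-element JSON array (illustrative values):

[{"transaction_id": "3f2a9b1c", "ts": "2024-05-04T10:15:23.123456", "location": "airport", "country": "France"}]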

The operator is configured with JSON as the input/output format, with no opcodes.



 

Inside the streaming operator, each port is an input stream, and the streams are combined into an output stream using a pattern over a one-minute sliding window: the pattern fires as soon as events with the same transaction_id have arrived on all three streams within the window. The visual representation is:


And the CCL Script source code:
// Use the first letter of location, category and risk
// to display the sequence in which the messages arrived
DECLARE string displaySequence(bigdatetime loc_ts, bigdatetime cat_ts, bigdatetime risk_ts) {
    bigdatetime min_ts := minof(loc_ts, cat_ts, risk_ts);
    IF (min_ts = loc_ts)
        IF (cat_ts < risk_ts)
            return 'L,C,R';
        ELSE
            return 'L,R,C';
    ELSE IF (min_ts = cat_ts)
        IF (loc_ts < risk_ts)
            return 'C,L,R';
        ELSE
            return 'C,R,L';
    ELSE
        IF (loc_ts < cat_ts)
            return 'R,L,C';
        ELSE
            return 'R,C,L';
}
END;

CREATE INPUT STREAM LOC SCHEMA (
    transaction_id string,
    location string,
    country string,
    ts bigdatetime
);

CREATE INPUT STREAM CAT SCHEMA (
    transaction_id string,
    category string,
    category_confidence float,
    ts bigdatetime
);

CREATE INPUT STREAM RISK SCHEMA (
    transaction_id string,
    risk float,
    risk_confidence float,
    ts bigdatetime
);

CREATE OUTPUT STREAM COMPLETE AS
SELECT LOC.transaction_id,
       LOC.location,
       CAT.category,
       RISK.risk,
       // compute the time delta between the first and the last event,
       // convert the result from microseconds to seconds,
       // and round it to 2 digits
       round((maxof(LOC.ts, RISK.ts, CAT.ts) -
              minof(LOC.ts, RISK.ts, CAT.ts)) / (1000. * 1000), 2) AS time_diff_s,
       // use a custom function to display the sequence order
       displaySequence(LOC.ts, CAT.ts, RISK.ts) AS sequence_order
FROM LOC, CAT, RISK
MATCHING [1 minute : LOC && CAT && RISK]
ON LOC.transaction_id = CAT.transaction_id = RISK.transaction_id;

For productivity, it helps to develop and test the CCL Script on SAP HANA Express first and then bring it into Data Intelligence.

 

Finally, the streaming operator has an output port named COMPLETE (the port name must be identical to the name of an output stream) and of type string, connected to a wiretap. Here is the final result: the message with all attributes is reassembled. Two examples of KPIs computed over the stream are added:

  • The latency between the first and the last message

  • The order in which the messages were produced.
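With the sample generator above, an output event might look like this (illustrative values; field names as defined in the COMPLETE stream):

{"transaction_id": "3f2a9b1c", "location": "airport", "category": "travel", "risk": 0.42, "time_diff_s": 12.34, "sequence_order": "L,C,R"}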



 